当前位置: 移动技术网 > IT编程>开发语言>Java > Java中有界队列的饱和策略(reject policy)原理解析

Java中有界队列的饱和策略(reject policy)原理解析

2020年05月13日  | 移动技术网IT编程  | 我要评论

h漫画图,岳阳火车站订票电话,中国环境影响评价

我们在使用executorservice的时候知道,在executorservice中有个一个queue来保存提交的任务,通过不同的构造函数,我们可以创建无界的队列(executorservice.newcachedthreadpool)和有界的队列(executorservice newfixedthreadpool(int nthreads))。

无界队列很好理解,我们可以无限制的向executorservice提交任务。那么对于有界队列来说,如果队列满了该怎么处理呢?

今天我们要介绍一下java中executorservice的饱和策略(reject policy)。

以executorservice的具体实现threadpoolexecutor来说,它定义了4种饱和策略。分别是abortpolicy,discardpolicy,discardoldestpolicy和callerrunspolicy。

如果要在threadpoolexecutor中设定饱和策略可以调用setrejectedexecutionhandler方法,如下所示:

    threadpoolexecutor threadpoolexecutor= new threadpoolexecutor(5, 10, 10, timeunit.seconds, new linkedblockingdeque<runnable>(20));
    threadpoolexecutor.setrejectedexecutionhandler(
        new threadpoolexecutor.abortpolicy()
    );

上面的例子中我们定义了一个初始5个,最大10个工作线程的thread pool,并且定义其中的queue的容量是20。如果提交的任务超出了容量,则会使用abortpolicy策略。

abortpolicy

abortpolicy意思是如果队列满了,最新的提交任务将会被拒绝,并抛出rejectedexecutionexception异常:

  public static class abortpolicy implements rejectedexecutionhandler {
    /**
     * creates an {@code abortpolicy}.
     */
    public abortpolicy() { }

    /**
     * always throws rejectedexecutionexception.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws rejectedexecutionexception always
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
      throw new rejectedexecutionexception("task " + r.tostring() +
                         " rejected from " +
                         e.tostring());
    }
  }

上面的代码中,rejectedexecution方法中我们直接抛出了rejectedexecutionexception异常。

discardpolicy

discardpolicy将会悄悄的丢弃提交的任务,而不报任何异常。

public static class discardpolicy implements rejectedexecutionhandler {
    /**
     * creates a {@code discardpolicy}.
     */
    public discardpolicy() { }

    /**
     * does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
    }
  }

discardoldestpolicy

discardoldestpolicy将会丢弃最老的任务,保存最新插入的任务。

  public static class discardoldestpolicy implements rejectedexecutionhandler {
    /**
     * creates a {@code discardoldestpolicy} for the given executor.
     */
    public discardoldestpolicy() { }

    /**
     * obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
      if (!e.isshutdown()) {
        e.getqueue().poll();
        e.execute(r);
      }
    }
  }

我们看到在rejectedexecution方法中,poll了最老的一个任务,然后使用threadpoolexecutor提交了一个最新的任务。

callerrunspolicy

callerrunspolicy和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务,从而降低调用者的调用速度。我们看下是怎么实现的:

public static class callerrunspolicy implements rejectedexecutionhandler {
    /**
     * creates a {@code callerrunspolicy}.
     */
    public callerrunspolicy() { }

    /**
     * executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedexecution(runnable r, threadpoolexecutor e) {
      if (!e.isshutdown()) {
        r.run();
      }
    }
  }

在rejectedexecution方法中,直接调用了 r.run()方法,这会导致该方法直接在调用者的主线程中执行,而不是在线程池中执行。从而导致主线程在该任务执行结束之前不能提交任何任务。从而有效的阻止了任务的提交。

使用semaphore

如果我们并没有定义饱和策略,那么有没有什么方法来控制任务的提交速度呢?考虑下之前我们讲到的semaphore,我们可以指定一定的资源信号量来控制任务的提交,如下所示:

public class semaphoreusage {

  private final executor executor;
  private final semaphore semaphore;

  public semaphoreusage(executor executor, int count) {
    this.executor = executor;
    this.semaphore = new semaphore(count);
  }

  public void submittask(final runnable command) throws interruptedexception {
    semaphore.acquire();
    try {
      executor.execute(() -> {
            try {
              command.run();
            } finally {
              semaphore.release();
            }
          }
      );
    } catch (rejectedexecutionexception e) {
      semaphore.release();
    }
  }
}

本文的例子可参考https://github.com/ddean2009/learn-java-concurrency/tree/master/rejectpolicy

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网