当前位置: 移动技术网 > IT编程>开发语言>Java > 多线程编程学习十一(ThreadPoolExecutor 详解).

多线程编程学习十一(ThreadPoolExecutor 详解).

2019年09月20日  | 移动技术网IT编程  | 我要评论

一、threadpoolexecutor 参数说明

    public threadpoolexecutor(int corepoolsize,
                              int maximumpoolsize,
                              long keepalivetime,
                              timeunit unit,
                              blockingqueue<runnable> workqueue,
                              threadfactory threadfactory,
                              rejectedexecutionhandler handler)
  • corepoolsize:核心线程池的大小。当提交一个任务到线程池时,核心线程池会创建一个核心线程来执行任务,即使其他核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。如果调用了线程池的 prestartallcorethreads() 方法,核心线程池会提前创建并启动所有核心线程。
  • workqueue:任务队列。当核心线程池中没有线程时,所提交的任务会被暂存在队列中。java 提供了多种。
  • maximumpoolsize:线程池允许创建的最大线程数。如果队列也满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的空闲线程执行任务。值得注意的是,如果使用了无界的任务队列则这个参数不起作用。
  • keepalivetime:当线程池中的线程数大于 corepoolsize 时,keepalivetime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。值得注意的是,如果使用了无界的任务队列则这个参数不起作用。
  • timeunit:线程活动保持时间的单位。
  • threadfactory:创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置符合业务的名字。

    // 依赖 guava
    new threadfactorybuilder().setnameformat("xx-task-%d").build();
  • handler:饱和策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。java 提供了以下4种策略:

    • abortpolicy:默认。直接抛出异常。
    • callerrunspolicy:只用调用者所在线程来运行任务。
    • discardoldestpolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • discardpolicy:不处理,丢弃掉。

tips: 一般我们称核心线程池中的线程为核心线程,这部分线程不会被回收;超过任务队列后,创建的线程为空闲线程,这部分线程会被回收(回收时间即 keepalivetime)

二、常见的 threadpoolexecutor 介绍

executors 是创建 threadpoolexecutor 和 scheduledthreadpoolexecutor 的工厂类。

java 提供了多种类型的 threadpoolexecutor,比较常见的有 fixedthreadpool、singlethreadexecutor、cachedthreadpool等。

fixedthreadpool

    public static executorservice newfixedthreadpool(int nthreads) {
        return new threadpoolexecutor(nthreads, nthreads,
                                      0l, timeunit.milliseconds,
                                      new linkedblockingqueue<runnable>());
    }

fixedthreadpool 被称为可重用固定线程数的线程池。可以看到 corepoolsize 和 maximumpoolsize 都被设置成了 nthreads;keepalivetime设置为0l,意味着多余的空闲线程会被立即终止;使用了阻塞队列 linkedblockingqueue 作为线程的工作队列(队列的容量为 integer.max_value)。

fixedthreadpool 所存在的问题是,由于队列的容量为 integer.max_value,基本可以认为是无界的,所以 maximumpoolsize 和 keepalivetime 参数都不会生效,饱和拒绝策略也不会执行,会造成任务大量堆积在阻塞队列中。

fixedthreadpool 适用于为了满足资源管理的需求,而需要限制线程数量的应用场景。


singlethreadexecutor

    public static executorservice newsinglethreadexecutor() {
        return new finalizabledelegatedexecutorservice
            (new threadpoolexecutor(1, 1,
                                    0l, timeunit.milliseconds,
                                    new linkedblockingqueue<runnable>()));
    }

singlethreadexecutor 是使用单个线程的线程池。可以看到 corepoolsize 和 maximumpoolsize 被设置为1,其他参数与 fixedthreadpool 相同,所以所带来的风险也和 fixedthreadpool 一致,就不赘述了。

singlethreadexecutor 适用于需要保证顺序的执行各个任务。


cachedthreadpool

    public static executorservice newcachedthreadpool() {
        return new threadpoolexecutor(0, integer.max_value,
                                      60l, timeunit.seconds,
                                      new synchronousqueue<runnable>());
    }

cachedthreadpool 是一个会根据需要创建新线程的线程池。可以看到 corepoolsize 被设置为 0,所以创建的线程都为空闲线程;maximumpoolsize 被设置为 integer.max_value(基本可认为无界),意味着可以创建无限数量的空闲线程;keepalivetime 设置为60l,意味着空闲线程等待新任务的最长时间为60秒;使用没有容量的 synchronousqueue 作为线程池的工作队列。

cachedthreadpool 所存在的问题是, 如果主线程提交任务的速度高于maximumpool 中线程处理任务的速度时,cachedthreadpool 会不断创建新线程。极端情况下,cachedthreadpool会因为创建过多线程而耗尽cpu和内存资源。

cachedthreadpool 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

三、 自建 threadpoolexecutor 线程池

鉴于上面提到的风险,我们更提倡使用 threadpoolexecutor 去创建线程池,而不用 executors 工厂去创建。

以下是一个 threadpoolexecutor 创建线程池的 demo 实例:

public class pool {

    static threadfactory threadfactory = new threadfactorybuilder().setnameformat("pool-task-%d").build();
    static executorservice executor = new threadpoolexecutor(runtime.getruntime().availableprocessors() * 2,
            200, 0l, timeunit.milliseconds, new linkedblockingqueue<>(1024),
            threadfactory, new threadpoolexecutor.abortpolicy());

    public static void main(string[] args) throws executionexception, interruptedexception {
        // 1. 无返回值的任务执行 -> runnable
        executor.execute(() -> system.out.println("hello world"));
        // 2. 有返回值的任务执行 -> callable
        future<string> future = executor.submit(() -> "hello world");
        // get 方法会阻塞线程执行等待返回结果
        string result = future.get();
        system.out.println(result);

        // 3. 监控线程池
        monitor();

        // 4. 关闭线程池
        shutdownandawaittermination();

        monitor();
    }

    private static void monitor() {
        threadpoolexecutor threadpoolexecutor = (threadpoolexecutor) pool.executor;
        system.out.println("【线程池任务】线程池中曾经创建过的最大线程数:" + threadpoolexecutor.getlargestpoolsize());
        system.out.println("【线程池任务】线程池中线程数:" + threadpoolexecutor.getpoolsize());
        system.out.println("【线程池任务】线程池中活动的线程数:" + threadpoolexecutor.getactivecount());
        system.out.println("【线程池任务】队列中等待执行的任务数:" + threadpoolexecutor.getqueue().size());
        system.out.println("【线程池任务】线程池已执行完任务数:" + threadpoolexecutor.getcompletedtaskcount());
    }

    /**
     * 关闭线程池
     * 1. shutdown、shutdownnow 的原理都是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程。
     * 2. shutdownnow:将线程池的状态设置成 stop,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
     * 3. shutdown:将线程池的状态设置成 shutdown 状态,然后中断所有没有正在执行任务的线程。
     */
    private static void shutdownandawaittermination() {
        // 禁止提交新任务
        executor.shutdown();
        try {
            // 等待现有任务终止
            if (!executor.awaittermination(60, timeunit.seconds)) {
                // 取消当前正在执行的任务
                executor.shutdownnow();
                // 等待一段时间让任务响应被取消
                if (!executor.awaittermination(60, timeunit.seconds)) {
                    system.err.println("pool did not terminate");
                }
            }
        } catch (interruptedexception ie) {
            // 如果当前线程也中断,则取消
            executor.shutdownnow();
            // 保留中断状态
            thread.currentthread().interrupt();
        }
    }
}

创建线程池需要注意以下几点:

  1. cpu 密集型任务应配置尽可能小的线程,如配置 ncpu+1 个线程。
  2. io 密集型任务(数据库读写等)应配置尽可能多的线程,如配置 ncpu*2 个线程。
  3. 优先级不同的任务可以使用优先级队列 priorityblockingqueue 来处理。
  4. 建议使用有界队列。可以避免创建数量非常多的线程,甚至拖垮系统。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网