当前位置: 移动技术网 > IT编程>软件设计>架构 > 推荐一款高效的处理延迟任务神器

推荐一款高效的处理延迟任务神器

2020年03月09日  | 移动技术网IT编程  | 我要评论

时间轮算法

时间轮是一种高效、低延迟的调度数据结构。其在linux内核中广泛使用,是linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景一般只需要用到原始时间轮就可以了。

代码案例

推荐使用netty提供的hashedwheeltimer工具类来实现延迟任务。

引入依赖:

<dependency>
      <groupid>io.netty</groupid>
      <artifactid>netty-common</artifactid>
      <version>4.1.23.final</version>
</dependency>

红包过期队列信息:

/**
 * 红包过期队列信息
 */
public class redpackettimertask implements timertask {

    private static final datetimeformatter f = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss.sss");

    /**
     * 红包 id
     */
    private final long redpacketid;

    /**
     * 创建时间戳
     */
    private final long timestamp;

    public redpackettimertask(long redpacketid) {
        this.redpacketid = redpacketid;
        this.timestamp = system.currenttimemillis();
    }

    @override
    public void run(timeout timeout) {
        //异步处理任务
        system.out.println(string.format("任务执行时间:%s,红包创建时间:%s,红包id:%s",
                localdatetime.now().format(f), localdatetime.ofinstant(instant.ofepochmilli(timestamp), zoneid.systemdefault()).format(f), redpacketid));
    }
}

测试用例:

/**
 * 基于 netty 的时间轮算法 hashedwheeltimer 实现的延迟任务
 */
public class redpackethashedwheeltimer {

    private static final datetimeformatter f = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss.sss");

    public static void main(string[] args) throws exception {
        threadfactory factory = r -> {
            thread thread = new thread(r);
            thread.setdaemon(true);
            thread.setname("redpackethashedwheeltimerworker");
            return thread;
        };
        /**
         * @param tickduration - 每tick一次的时间间隔
         * @param unit - tickduration 的时间单位
         * @param ticksperwheel - 时间轮中的槽数
         * @param leakdetection - 检查内存溢出
         */
        timer timer = new hashedwheeltimer(factory, 1,
                                           timeunit.seconds, 100,true);
        system.out.println(string.format("开始任务时间:%s",localdatetime.now().format(f)));
        for(int i=1;i<10;i++){
            timertask timertask = new redpackettimertask(i);
            timer.newtimeout(timertask, i, timeunit.seconds);
        }
        thread.sleep(integer.max_value);
    }
}

打印任务执行日志:

开始任务时间:2020-02-12 15:22:23.404
任务执行时间:2020-02-12 15:22:25.410,红包创建时间:2020-02-12 15:22:23.409,红包id:1
任务执行时间:2020-02-12 15:22:26.411,红包创建时间:2020-02-12 15:22:23.414,红包id:2
任务执行时间:2020-02-12 15:22:27.424,红包创建时间:2020-02-12 15:22:23.414,红包id:3
任务执行时间:2020-02-12 15:22:28.410,红包创建时间:2020-02-12 15:22:23.414,红包id:4
任务执行时间:2020-02-12 15:22:29.411,红包创建时间:2020-02-12 15:22:23.414,红包id:5
任务执行时间:2020-02-12 15:22:30.409,红包创建时间:2020-02-12 15:22:23.414,红包id:6
任务执行时间:2020-02-12 15:22:31.411,红包创建时间:2020-02-12 15:22:23.414,红包id:7
任务执行时间:2020-02-12 15:22:32.409,红包创建时间:2020-02-12 15:22:23.414,红包id:8
任务执行时间:2020-02-12 15:22:33.411,红包创建时间:2020-02-12 15:22:23.414,红包id:9

源码相关

其核心是workerthread线程,主要负责每过tickduration时间就累加一次tick。同时也负责执行到期的timeout任务以及添加timeout任务到指定的wheel中。

构造方法:

public hashedwheeltimer(
            threadfactory threadfactory,
            long tickduration, timeunit unit, int ticksperwheel, boolean leakdetection,
            long maxpendingtimeouts) {

        if (threadfactory == null) {
            throw new nullpointerexception("threadfactory");
        }
        if (unit == null) {
            throw new nullpointerexception("unit");
        }
        if (tickduration <= 0) {
            throw new illegalargumentexception("tickduration must be greater than 0: " + tickduration);
        }
        if (ticksperwheel <= 0) {
            throw new illegalargumentexception("ticksperwheel must be greater than 0: " + ticksperwheel);
        }

        // normalize ticksperwheel to power of two and initialize the wheel.
        wheel = createwheel(ticksperwheel);
        mask = wheel.length - 1;

        // convert tickduration to nanos.
        this.tickduration = unit.tonanos(tickduration);

        // prevent overflow.
        if (this.tickduration >= long.max_value / wheel.length) {
            throw new illegalargumentexception(string.format(
                    "tickduration: %d (expected: 0 < tickduration in nanos < %d",
                    tickduration, long.max_value / wheel.length));
        }
        //这里-爪洼笔记
        workerthread = threadfactory.newthread(worker);

        leak = leakdetection || !workerthread.isdaemon() ? leakdetector.track(this) : null;

        this.maxpendingtimeouts = maxpendingtimeouts;

        if (instance_counter.incrementandget() > instance_count_limit &&
            warned_too_many_instances.compareandset(false, true)) {
            reporttoomanyinstances();
        }
}

新增任务,创建即启动:

public timeout newtimeout(timertask task, long delay, timeunit unit) {
        if (task == null) {
            throw new nullpointerexception("task");
        }
        if (unit == null) {
            throw new nullpointerexception("unit");
        }

        long pendingtimeoutscount = pendingtimeouts.incrementandget();

        if (maxpendingtimeouts > 0 && pendingtimeoutscount > maxpendingtimeouts) {
            pendingtimeouts.decrementandget();
            throw new rejectedexecutionexception("number of pending timeouts ("
                + pendingtimeoutscount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxpendingtimeouts + ")");
        }
        //这里-爪洼笔记
        start();

        // add the timeout to the timeout queue which will be processed on the next tick.
        // during processing all the queued hashedwheeltimeouts will be added to the correct hashedwheelbucket.
        long deadline = system.nanotime() + unit.tonanos(delay) - starttime;

        // guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = long.max_value;
        }
        hashedwheeltimeout timeout = new hashedwheeltimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
}

线程启动:

   /**
     * starts the background thread explicitly.  the background thread will
     * start automatically on demand even if you did not call this method.
     *
     * @throws illegalstateexception if this timer has been
     *                               {@linkplain #stop() stopped} already
     */
    public void start() {
        switch (worker_state_updater.get(this)) {
            case worker_state_init:
                if (worker_state_updater.compareandset(this, worker_state_init, worker_state_started)) {
                    workerthread.start();
                }
                break;
            case worker_state_started:
                break;
            case worker_state_shutdown:
                throw new illegalstateexception("cannot be started once stopped");
            default:
                throw new error("invalid workerstate");
        }

        // wait until the starttime is initialized by the worker.
        while (starttime == 0) {
            try {
                starttimeinitialized.await();
            } catch (interruptedexception ignore) {
                // ignore - it will be ready very soon.
            }
        }
    }

执行相关操作:

public void run() {
            // initialize the starttime.
            starttime = system.nanotime();
            if (starttime == 0) {
                // we use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                starttime = 1;
            }

            // notify the other threads waiting for the initialization at start().
            starttimeinitialized.countdown();

            do {
                final long deadline = waitfornexttick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    processcancelledtasks();
                    hashedwheelbucket bucket =
                            wheel[idx];
                    transfertimeoutstobuckets();
                    bucket.expiretimeouts(deadline);
                    tick++;
                }
            } while (worker_state_updater.get(hashedwheeltimer.this) == worker_state_started);

            // fill the unprocessedtimeouts so we can return them from stop() method.
            for (hashedwheelbucket bucket: wheel) {
                bucket.cleartimeouts(unprocessedtimeouts);
            }
            for (;;) {
                hashedwheeltimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.iscancelled()) {
                    unprocessedtimeouts.add(timeout);
                }
            }
            processcancelledtasks();
}

小结

以上方案并没有实现持久化和分布式,生产环境可根据实际业务需求选择使用。

源码

https://gitee.com/52itstyle/spring-boot-seckill

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网