当前位置: 移动技术网 > IT编程>开发语言>Java > 史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

2020年04月14日  | 移动技术网IT编程  | 我要评论
这篇文章的诞生要感谢一位读者,是他让这篇 优秀的文章 有了和大家见面的机会,重点是 优秀文章 ,哈哈。 事情的经过是这样的... 不用谢我,送人玫瑰,手有余香。 相信接下来的内容一定不会让你失望,因为它将是目前市面上最好的关于“延迟任务”的文章 ,这也一直是我写作追求的目标,让我的每一篇文章都比市面 ...

这篇文章的诞生要感谢一位读者,是他让这篇优秀的文章有了和大家见面的机会,重点是优秀文章,哈哈。

事情的经过是这样的...

微信聊天.png

不用谢我,送人玫瑰,手有余香。相信接下来的内容一定不会让你失望,因为它将是目前市面上最好的关于“延迟任务”的文章,这也一直是我写作追求的目标,让我的每一篇文章都比市面上的好那么一点点。

好了,话不多说,直接进入今天的主题,本文的主要内容如下图所示:
image.png

什么是延迟任务?

顾明思议,我们把需要延迟执行的任务叫做延迟任务

延迟任务的使用场景有以下这些:

  1. 红包 24 小时未被查收,需要延迟执退还业务;
  2. 每个月账单日,需要给用户发送当月的对账单;
  3. 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。

等事件都需要使用延迟任务。

延迟任务实现思路分析

延迟任务实现的关键是在某个时间节点执行某个任务。基于这个信息我们可以想到实现延迟任务的手段有以下两个:

  1. 自己手写一个“死循环”一直判断当前时间节点有没有要执行的任务;
  2. 借助 jdk 或者第三方提供的工具类来实现延迟任务。

而通过 jdk 实现延迟任务我们能想到的关键词是:delayqueue、scheduledexecutorservice,而第三方提供的延迟任务执行方法就有很多了,例如:redis、netty、mq 等手段。

延迟任务实现

下面我们将结合代码来讲解每种延迟任务的具体实现。

1.无限循环实现延迟任务

此方式我们需要开启一个无限循环一直扫描任务,然后使用一个 map 集合用来存储任务和延迟执行的时间,实现代码如下:

import java.time.instant;
import java.time.localdatetime;
import java.util.hashmap;
import java.util.iterator;
import java.util.map;

/**
 * 延迟任务执行方法汇总
 */
public class delaytaskexample {
    // 存放定时任务
    private static map<string, long> _taskmap = new hashmap<>();

    public static void main(string[] args) {
        system.out.println("程序启动时间:" + localdatetime.now());
        // 添加定时任务
        _taskmap.put("task-1", instant.now().plusseconds(3).toepochmilli()); // 延迟 3s

        // 调用无限循环实现延迟任务
        looptask();
    }

    /**
     * 无限循环实现延迟任务
     */
    public static void looptask() {
        long itemlong = 0l;
        while (true) {
            iterator it = _taskmap.entryset().iterator();
            while (it.hasnext()) {
                map.entry entry = (map.entry) it.next();
                itemlong = (long) entry.getvalue();
                // 有任务需要执行
                if (instant.now().toepochmilli() >= itemlong) {
                    // 延迟任务,业务逻辑执行
                    system.out.println("执行任务:" + entry.getkey() +
                            " ,执行时间:" + localdatetime.now());
                    // 删除任务
                    _taskmap.remove(entry.getkey());
                }
            }
        }
    }
}

以上程序执行的结果为:

程序启动时间:2020-04-12t18:51:28.188

执行任务:task-1 ,执行时间:2020-04-12t18:51:31.189

可以看出任务延迟了 3s 钟执行了,符合我们的预期。

2.java api 实现延迟任务

java api 提供了两种实现延迟任务的方法:delayqueue 和 scheduledexecutorservice。

① scheduledexecutorservice 实现延迟任务

我们可以使用 scheduledexecutorservice 来以固定的频率一直执行任务,实现代码如下:

public class delaytaskexample {
    public static void main(string[] args) {
        system.out.println("程序启动时间:" + localdatetime.now());
        scheduledexecutorservicetask();
    }

    /**
     * scheduledexecutorservice 实现固定频率一直循环执行任务
     */
    public static void scheduledexecutorservicetask() {
        scheduledexecutorservice executor = executors.newscheduledthreadpool(1);
        executor.schedulewithfixeddelay(
                new runnable() {
                    @override
                    public void run() {
                        // 执行任务的业务代码
                        system.out.println("执行任务" +
                                " ,执行时间:" + localdatetime.now());
                    }
                },
                2, // 初次执行间隔
                2, // 2s 执行一次
                timeunit.seconds);
    }
}

以上程序执行的结果为:

程序启动时间:2020-04-12t21:28:10.416

执行任务 ,执行时间:2020-04-12t21:28:12.421

执行任务 ,执行时间:2020-04-12t21:28:14.422

......

可以看出使用 scheduledexecutorservice#schedulewithfixeddelay(...) 方法之后,会以某个频率一直循环执行延迟任务。

② delayqueue 实现延迟任务

delayqueue 是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现 delayed 接口,并重写 getdelay(timeunit) 和 compareto(delayed) 方法,delayqueue 实现延迟队列的完整代码如下:

public class delaytest {
    public static void main(string[] args) throws interruptedexception {
        delayqueue delayqueue = new delayqueue();
        // 添加延迟任务
        delayqueue.put(new delayelement(1000));
        delayqueue.put(new delayelement(3000));
        delayqueue.put(new delayelement(5000));
        system.out.println("开始时间:" +  dateformat.getdatetimeinstance().format(new date()));
        while (!delayqueue.isempty()){
            // 执行延迟任务
            system.out.println(delayqueue.take());
        }
        system.out.println("结束时间:" +  dateformat.getdatetimeinstance().format(new date()));
    }

    static class delayelement implements delayed {
        // 延迟截止时间(单面:毫秒)
        long delaytime = system.currenttimemillis();
        public delayelement(long delaytime) {
            this.delaytime = (this.delaytime + delaytime);
        }
        @override
        // 获取剩余时间
        public long getdelay(timeunit unit) {
            return unit.convert(delaytime - system.currenttimemillis(), timeunit.milliseconds);
        }
        @override
        // 队列里元素的排序依据
        public int compareto(delayed o) {
            if (this.getdelay(timeunit.milliseconds) > o.getdelay(timeunit.milliseconds)) {
                return 1;
            } else if (this.getdelay(timeunit.milliseconds) < o.getdelay(timeunit.milliseconds)) {
                return -1;
            } else {
                return 0;
            }
        }
        @override
        public string tostring() {
            return dateformat.getdatetimeinstance().format(new date(delaytime));
        }
    }
}

以上程序执行的结果为:

开始时间:2020-4-12 20:40:38

2020-4-12 20:40:39

2020-4-12 20:40:41

2020-4-12 20:40:43

结束时间:2020-4-12 20:40:43

3.redis 实现延迟任务

使用 redis 实现延迟任务的方法大体可分为两类:通过 zset 数据判断的方式,和通过键空间通知的方式

① 通过数据判断的方式

我们借助 zset 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 jedis 框架):

import redis.clients.jedis.jedis;
import utils.jedisutils;
import java.time.instant;
import java.util.set;

public class delayqueueexample {
    // zset key
    private static final string _key = "mydelayqueue";
    
    public static void main(string[] args) throws interruptedexception {
        jedis jedis = jedisutils.getjedis();
        // 延迟 30s 执行(30s 后的时间)
        long delaytime = instant.now().plusseconds(30).getepochsecond();
        jedis.zadd(_key, delaytime, "order_1");
        // 继续添加测试数据
        jedis.zadd(_key, instant.now().plusseconds(2).getepochsecond(), "order_2");
        jedis.zadd(_key, instant.now().plusseconds(2).getepochsecond(), "order_3");
        jedis.zadd(_key, instant.now().plusseconds(7).getepochsecond(), "order_4");
        jedis.zadd(_key, instant.now().plusseconds(10).getepochsecond(), "order_5");
        // 开启延迟队列
        dodelayqueue(jedis);
    }

    /**
     * 延迟队列消费
     * @param jedis redis 客户端
     */
    public static void dodelayqueue(jedis jedis) throws interruptedexception {
        while (true) {
            // 当前时间
            instant nowinstant = instant.now();
            long lastsecond = nowinstant.plusseconds(-1).getepochsecond(); // 上一秒时间
            long nowsecond = nowinstant.getepochsecond();
            // 查询当前时间的所有任务
            set<string> data = jedis.zrangebyscore(_key, lastsecond, nowsecond);
            for (string item : data) {
                // 消费任务
                system.out.println("消费:" + item);
            }
            // 删除已经执行的任务
            jedis.zremrangebyscore(_key, lastsecond, nowsecond);
            thread.sleep(1000); // 每秒轮询一次
        }
    }
}

② 通过键空间通知

默认情况下 redis 服务器端是不开启键空间通知的,需要我们通过 config set notify-keyspace-events ex 的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,我们利用这个机制实现了给每个人开启一个定时任务的功能,实现代码如下:

import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispubsub;
import utils.jedisutils;

public class taskexample {
    public static final string _topic = "__keyevent@0__:expired"; // 订阅频道名称
    public static void main(string[] args) {
        jedis jedis = jedisutils.getjedis();
        // 执行定时任务
        dotask(jedis);
    }

    /**
     * 订阅过期消息,执行定时任务
     * @param jedis redis 客户端
     */
    public static void dotask(jedis jedis) {
        // 订阅过期消息
        jedis.psubscribe(new jedispubsub() {
            @override
            public void onpmessage(string pattern, string channel, string message) {
                // 接收到消息,执行定时任务
                system.out.println("收到消息:" + message);
            }
        }, _topic);
    }
}

4.netty 实现延迟任务

netty 是由 jboss 提供的一个 java 开源框架,它是一个基于 nio 的客户、服务器端的编程框架,使用 netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 tcp 和 udp 的 socket 服务开发。

可以使用 netty 提供的工具类 hashedwheeltimer 来实现延迟任务,实现代码如下。

首先在项目中添加 netty 引用,配置如下:

<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupid>io.netty</groupid>
    <artifactid>netty-common</artifactid>
    <version>4.1.48.final</version>
</dependency>

netty 实现的完整代码如下:

public class delaytaskexample {
    public static void main(string[] args) {
        system.out.println("程序启动时间:" + localdatetime.now());
        nettytask();
    }

    /**
     * 基于 netty 的延迟任务
     */
    private static void nettytask() {
        // 创建延迟任务实例
        hashedwheeltimer timer = new hashedwheeltimer(3, // 时间间隔
                timeunit.seconds,
                100); // 时间轮中的槽数
        // 创建一个任务
        timertask task = new timertask() {
            @override
            public void run(timeout timeout) throws exception {
                system.out.println("执行任务" +
                        " ,执行时间:" + localdatetime.now());
            }
        };
        // 将任务添加到延迟队列中
        timer.newtimeout(task, 0, timeunit.seconds);

    }
}

以上程序执行的结果为:

程序启动时间:2020-04-13t10:16:23.033

执行任务 ,执行时间:2020-04-13t10:16:26.118

hashedwheeltimer 是使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,如下图所示:
时间轮.jpg
(图片来源于网络)

以上的图片可以理解为,时间轮大小为 8,某个时间转一格(例如 1s),每格指向一个链表,保存着待执行的任务。

5.mq 实现延迟任务

如果专门开启一个 mq 中间件来执行延迟任务,就有点杀鸡用宰牛刀般的奢侈了,不过已经有了 mq 环境的话,用它来实现延迟任务的话,还是可取的。

几乎所有的 mq 中间件都可以实现延迟任务,在这里更准确的叫法应该叫延队列。本文就使用 rabbitmq 为例,来看它是如何实现延迟任务的。

rabbitmq 实现延迟队列的方式有两种:

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  • 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能。

注意: 延迟插件 rabbitmq-delayed-message-exchange 是在 rabbitmq 3.5.7 及以上的版本才支持的,依赖 erlang/opt 18.0 及以上运行环境。

由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式 rabbitmq-delayed-message-exchange 插件的方式实现延迟队列的功能。

首先,我们需要下载并安装 rabbitmq-delayed-message-exchange 插件,下载地址:

选择相应的对应的版本进行下载,然后拷贝到 rabbitmq 服务器目录,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件,在使用命令 rabbitmq-plugins list 查询安装的所有插件,安装成功如下图所示:

最后重启 rabbitmq 服务,使插件生效。

首先,我们先要配置消息队列,实现代码如下:

import com.example.rabbitmq.mq.directconfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;

@configuration
public class delayedconfig {
    final static string queue_name = "delayed.goods.order";
    final static string exchange_name = "delayedec";
    @bean
    public queue queue() {
        return new queue(delayedconfig.queue_name);
    }

    // 配置默认的交换机
    @bean
    customexchange customexchange() {
        map<string, object> args = new hashmap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new customexchange(delayedconfig.exchange_name, "x-delayed-message", true, false, args);
    }
    // 绑定队列到交换器
    @bean
    binding binding(queue queue, customexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delayedconfig.queue_name).noargs();
    }
}

然后添加增加消息的代码,具体实现如下:

import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import java.text.simpledateformat;
import java.util.date;

@component
public class delayedsender {
    @autowired
    private amqptemplate rabbittemplate;

    public void send(string msg) {
        simpledateformat sf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
        system.out.println("发送时间:" + sf.format(new date()));

        rabbittemplate.convertandsend(delayedconfig.exchange_name, delayedconfig.queue_name, msg, new messagepostprocessor() {
            @override
            public message postprocessmessage(message message) throws amqpexception {
                message.getmessageproperties().setheader("x-delay", 3000);
                return message;
            }
        });
    }
}

再添加消费消息的代码:

import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.text.simpledateformat;
import java.util.date;

@component
@rabbitlistener(queues = "delayed.goods.order")
public class delayedreceiver {
    @rabbithandler
    public void process(string msg) {
        simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
        system.out.println("接收时间:" + sdf.format(new date()));
        system.out.println("消息内容:" + msg);
    }
}

最后,我们使用代码测试一下:

import com.example.rabbitmq.rabbitmqapplication;
import com.example.rabbitmq.mq.delayed.delayedsender;
import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;

import java.text.simpledateformat;
import java.util.date;

@runwith(springrunner.class)
@springboottest
public class delayedtest {

    @autowired
    private delayedsender sender;

    @test
    public void test() throws interruptedexception {
        simpledateformat sf = new simpledateformat("yyyy-mm-dd");
        sender.send("hi admin.");
        thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
    }
}

以上程序的执行结果如下:

发送时间:2020-04-13 20:47:51

接收时间:2020-04-13 20:47:54

消息内容:hi admin.

从结果可以看出,以上程序执行符合延迟任务的实现预期。

6.使用 spring 定时任务

如果你使用的是 spring 或 springboot 的项目的话,可以使用借助 scheduled 来实现,本文将使用 springboot 项目来演示 scheduled 的实现,实现我们需要声明开启 scheduled,实现代码如下:

@springbootapplication
@enablescheduling
public class application {
    public static void main(string[] args) {
        springapplication.run(application.class, args);
    }
}

然后添加延迟任务,实现代码如下:

@component
public class schedulejobs {
    @scheduled(fixeddelay = 2 * 1000)
    public void fixeddelayjob() throws interruptedexception {
        system.out.println("任务执行,时间:" + localdatetime.now());
    }
}

此时当我们启动项目之后就可以看到任务以延迟了 2s 的形式一直循环执行,结果如下:

任务执行,时间:2020-04-13t14:07:53.349

任务执行,时间:2020-04-13t14:07:55.350

任务执行,时间:2020-04-13t14:07:57.351

...

我们也可以使用 corn 表达式来定义任务执行的频率,例如使用 @scheduled(cron = "0/4 * * * * ?") 。

7.quartz 实现延迟任务

quartz 是一款功能强大的任务调度器,可以实现较为复杂的调度功能,它还支持分布式的任务调度。

我们使用 quartz 来实现一个延迟任务,首先定义一个执行任务代码如下:

import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
import org.springframework.scheduling.quartz.quartzjobbean;

import java.time.localdatetime;

public class samplejob extends quartzjobbean {
    @override
    protected void executeinternal(jobexecutioncontext jobexecutioncontext)
            throws jobexecutionexception {
        system.out.println("任务执行,时间:" + localdatetime.now());
    }
}

在定义一个 jobdetail 和 trigger 实现代码如下:

import org.quartz.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class samplescheduler {
    @bean
    public jobdetail samplejobdetail() {
        return jobbuilder.newjob(samplejob.class).withidentity("samplejob")
                .storedurably().build();
    }

    @bean
    public trigger samplejobtrigger() {
        // 3s 后执行
        simpleschedulebuilder schedulebuilder =
                simpleschedulebuilder.simpleschedule().withintervalinseconds(3).withrepeatcount(1);
        return triggerbuilder.newtrigger().forjob(samplejobdetail()).withidentity("sampletrigger")
                .withschedule(schedulebuilder).build();
    }
}

最后在 springboot 项目启动之后开启延迟任务,实现代码如下:

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.scheduling.quartz.schedulerfactorybean;

/**
 * springboot 项目启动后执行
 */
public class mystartuprunner implements commandlinerunner {

    @autowired
    private schedulerfactorybean schedulerfactorybean;

    @autowired
    private samplescheduler samplescheduler;

    @override
    public void run(string... args) throws exception {
        // 启动定时任务
        schedulerfactorybean.getscheduler().schedulejob(
                samplescheduler.samplejobtrigger());
    }
}

以上程序的执行结果如下:

2020-04-13 19:02:12.331  info 17768 --- [  restartedmain] com.example.demo.demoapplication         : started demoapplication in 1.815 seconds (jvm running for 3.088)

任务执行,时间:2020-04-13t19:02:15.019

从结果可以看出在项目启动 3s 之后执行了延迟任务。

总结

本文讲了延迟任务的使用场景,以及延迟任务的 10 种实现方式:

  1. 手动无线循环;
  2. scheduledexecutorservice;
  3. delayqueue;
  4. redis zset 数据判断的方式;
  5. redis 键空间通知的方式;
  6. netty 提供的 hashedwheeltimer 工具类;
  7. rabbitmq 死信队列;
  8. rabbitmq 延迟消息插件 rabbitmq-delayed-message-exchange;
  9. spring scheduled;
  10. quartz。

最后的话

俗话说:台上一分钟,台下十年功。本文的所有内容皆为作者多年工作积累的结晶,以及熬夜呕心沥血的整理,如果觉得本文有帮助到你,请帮我分享出去,让更多的人看到,谢谢你。

本文由博客一文多发平台 openwrite 发布!

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网