
Integrating Quartz Jobs with Spring Boot 2.0 for Real-Time Scheduler Management

June 6, 2019  | 移动技术网 IT编程

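I. Introduction to Quartz

1. In one sentence

Quartz is an open-source job scheduling framework written entirely in Java; it is simple to use yet powerful.

2. Core API

(1) Scheduler
Represents an independent Quartz runtime container. The Scheduler binds a Trigger to a specific JobDetail, so that when the Trigger fires, the corresponding Job is executed.
(2) Trigger
Describes the timing rule that fires a Job. The two main types are SimpleTrigger and CronTrigger, and each trigger is uniquely identified by a TriggerKey.
(3) Job
Defines a task and what it does when it runs. JobExecutionContext provides the scheduler's context information, and the job's data can be read from the JobDataMap.
(4) JobDetail
Quartz creates a new Job instance every time it executes a Job, so it does not accept a Job instance directly; it accepts a Job implementation class instead. JobDetail describes that implementation class along with other static information such as the job's name and description.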

II. Integration with Spring Boot 2.0

1. Project structure

Version notes

spring-boot: 2.1.3.RELEASE
quartz: 2.3.0

2. Scheduler configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.Properties;
@Configuration
public class ScheduleConfig {
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        // Quartz properties
        Properties prop = new Properties();
        // Name of the scheduler instance
        prop.put("org.quartz.scheduler.instanceName", "HuskyScheduler");
        // With AUTO the instance id is generated; the default generator,
        // org.quartz.simpl.SimpleInstanceIdGenerator, builds it from the host name and a timestamp.
        prop.put("org.quartz.scheduler.instanceId", "AUTO");
        // Thread pool
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.put("org.quartz.threadPool.threadCount", "20");
        prop.put("org.quartz.threadPool.threadPriority", "5");
        // JobStore: where the scheduler keeps jobs, triggers, and state at runtime.
        // JobStoreTX is a JDBC job store: it keeps scheduling data in a relational database
        // and manages its own transactions, committing or rolling back after each operation.
        prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        // Clustering: must be true when several scheduler instances share the same tables
        prop.put("org.quartz.jobStore.isClustered", "true");
        // Clustering: how often (ms) this instance checks in with the other instances in the cluster
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
        // Maximum number of misfired triggers the job store handles in one pass
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        // How late (ms) a trigger may be before it is treated as misfired
        prop.put("org.quartz.jobStore.misfireThreshold", "12000");
        // Database table prefix
        prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
        // SQL that selects a row from the LOCKS table and locks it (UPDLOCK is a SQL Server table hint)
        prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");

        // Scheduler factory configuration
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);
        factory.setQuartzProperties(prop);
        factory.setSchedulerName("HuskyScheduler");
        // Delay scheduler startup by 30 seconds after initialization
        factory.setStartupDelay(30);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // Optional: overwrite jobs that already exist when the scheduler starts
        factory.setOverwriteExistingJobs(true);
        // Start automatically; the default is true
        factory.setAutoStartup(true);
        return factory;
    }
}
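
The misfire threshold configured above is easier to reason about with a small model. The following is a simplified sketch, not Quartz's internal code: `MisfireCheck` and `isMisfired` are hypothetical names, and the only rule modeled is that a trigger counts as misfired once it is later than the tolerated threshold. Exact boundary handling is left to Quartz.

```java
// Simplified model of the misfire rule; MisfireCheck is a hypothetical helper, not a Quartz class.
final class MisfireCheck {
    private MisfireCheck() {}

    /** Returns true when the trigger is later than the tolerated threshold. */
    static boolean isMisfired(long scheduledFireMillis, long nowMillis, long thresholdMillis) {
        return nowMillis - scheduledFireMillis > thresholdMillis;
    }

    public static void main(String[] args) {
        long threshold = 12_000L; // same value as the misfire threshold in the configuration
        // 5 seconds late: still within tolerance
        System.out.println(isMisfired(0L, 5_000L, threshold));
        // 20 seconds late: treated as a misfire
        System.out.println(isMisfired(0L, 20_000L, threshold));
    }
}
```

What happens to a misfired trigger depends on its misfire instruction. With the "do nothing" instruction used for the cron triggers in the utility class of the next section, the missed firing is skipped and the trigger waits for its next scheduled time.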

3. Scheduler utility class

import com.quart.job.entity.ScheduleJobBean;
import org.quartz.*;
/**
 * Scheduler utility class
 */
public class ScheduleUtil {
    private ScheduleUtil() {}
    private static final String SCHEDULE_NAME = "HUSKY_";
    /**
     * Trigger key
     */
    public static TriggerKey getTriggerKey(Long jobId) {
        return TriggerKey.triggerKey(SCHEDULE_NAME + jobId);
    }
    /**
     * Job key
     */
    public static JobKey getJobKey(Long jobId) {
        return JobKey.jobKey(SCHEDULE_NAME + jobId);
    }
    /**
     * Look up the cron trigger of a job; returns null if none is registered
     */
    public static CronTrigger getCronTrigger(Scheduler scheduler, Long jobId) {
        try {
            return (CronTrigger) scheduler.getTrigger(getTriggerKey(jobId));
        } catch (SchedulerException e) {
            throw new RuntimeException("getCronTrigger fail", e);
        }
    }
    /**
     * Create a job
     */
    public static void createJob(Scheduler scheduler, ScheduleJobBean scheduleJob) {
        try {
            // Build the job detail and its cron trigger
            JobDetail jobDetail = JobBuilder.newJob(TaskJobLog.class).withIdentity(getJobKey(scheduleJob.getJobId())).build();
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder
                    .cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();
            CronTrigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(getTriggerKey(scheduleJob.getJobId()))
                    .withSchedule(scheduleBuilder).build();
            jobDetail.getJobDataMap().put(ScheduleJobBean.JOB_PARAM_KEY, scheduleJob);
            scheduler.scheduleJob(jobDetail, trigger);
            // If the job is stored as paused, pause it right away
            if (scheduleJob.getStatus() == 1) {
                pauseJob(scheduler, scheduleJob.getJobId());
            }
        } catch (SchedulerException e) {
            throw new RuntimeException("createJob fail", e);
        }
    }
    /**
     * Update a job
     */
    public static void updateJob(Scheduler scheduler, ScheduleJobBean scheduleJob) {
        try {
            // Rebuild the trigger with the new cron expression
            TriggerKey triggerKey = getTriggerKey(scheduleJob.getJobId());
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();
            CronTrigger trigger = getCronTrigger(scheduler, scheduleJob.getJobId());
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            trigger.getJobDataMap().put(ScheduleJobBean.JOB_PARAM_KEY, scheduleJob);
            scheduler.rescheduleJob(triggerKey, trigger);
            // If the job is stored as paused, pause it right away
            if (scheduleJob.getStatus() == 1) {
                pauseJob(scheduler, scheduleJob.getJobId());
            }
        } catch (SchedulerException e) {
            throw new RuntimeException("updateJob fail", e);
        }
    }
    /**
     * Pause a job
     */
    public static void pauseJob(Scheduler scheduler, Long jobId) {
        try {
            scheduler.pauseJob(getJobKey(jobId));
        } catch (SchedulerException e) {
            throw new RuntimeException("pauseJob fail", e);
        }
    }
    /**
     * Resume a job
     */
    public static void resumeJob(Scheduler scheduler, Long jobId) {
        try {
            scheduler.resumeJob(getJobKey(jobId));
        } catch (SchedulerException e) {
            throw new RuntimeException("resumeJob fail", e);
        }
    }
    /**
     * Delete a job
     */
    public static void deleteJob(Scheduler scheduler, Long jobId) {
        try {
            scheduler.deleteJob(getJobKey(jobId));
        } catch (SchedulerException e) {
            throw new RuntimeException("deleteJob fail", e);
        }
    }
    /**
     * Run a job once, immediately
     */
    public static void run(Scheduler scheduler, ScheduleJobBean scheduleJob) {
        try {
            JobDataMap dataMap = new JobDataMap();
            dataMap.put(ScheduleJobBean.JOB_PARAM_KEY, scheduleJob);
            scheduler.triggerJob(getJobKey(scheduleJob.getJobId()), dataMap);
        } catch (SchedulerException e) {
            throw new RuntimeException("run fail", e);
        }
    }
}
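
The create and update methods above hand the stored cron expression straight to Quartz, so a malformed value only surfaces when the job is scheduled. Quartz cron expressions have six or seven space-separated fields (seconds, minutes, hours, day of month, month, day of week, and an optional year), unlike the five-field Unix format. The sketch below is a cheap pre-check of the field count only; `CronFieldCheck` is a hypothetical helper, and for real validation Quartz's own `CronExpression.isValidExpression(String)` is the better choice.

```java
// Hypothetical pre-check: counts fields only, it does not validate their contents.
final class CronFieldCheck {
    private CronFieldCheck() {}

    /** Returns true when the expression has the 6 or 7 fields a Quartz cron expression needs. */
    static boolean hasQuartzFieldCount(String cron) {
        if (cron == null || cron.trim().isEmpty()) {
            return false;
        }
        int fields = cron.trim().split("\\s+").length;
        return fields == 6 || fields == 7;
    }

    public static void main(String[] args) {
        // Six fields, starting with seconds: accepted
        System.out.println(hasQuartzFieldCount("0 0/5 * * * ?"));
        // Five-field Unix style: rejected
        System.out.println(hasQuartzFieldCount("*/5 * * * *"));
    }
}
```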

4. Job execution and logging

import com.quart.job.entity.ScheduleJobBean;
import com.quart.job.entity.ScheduleJobLogBean;
import com.quart.job.service.ScheduleJobLogService;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.lang.reflect.Method;
import java.util.Date;
/**
 * Runs a scheduled job and records an execution log
 */
public class TaskJobLog extends QuartzJobBean {

    private static final Logger LOG = LoggerFactory.getLogger(TaskJobLog.class);

    @Override
    protected void executeInternal(JobExecutionContext context) {
        ScheduleJobBean jobBean = (ScheduleJobBean) context.getMergedJobDataMap().get(ScheduleJobBean.JOB_PARAM_KEY);
        ScheduleJobLogService scheduleJobLogService = (ScheduleJobLogService) SpringContextUtil.getBean("scheduleJobLogService");
        // Prepare the execution log record
        ScheduleJobLogBean logBean = new ScheduleJobLogBean();
        logBean.setJobId(jobBean.getJobId());
        logBean.setBeanName(jobBean.getBeanName());
        logBean.setParams(jobBean.getParams());
        logBean.setCreateTime(new Date());
        long beginTime = System.currentTimeMillis();
        try {
            // Look up the target bean and invoke its run(String) method by reflection
            Object target = SpringContextUtil.getBean(jobBean.getBeanName());
            Method method = target.getClass().getDeclaredMethod("run", String.class);
            method.invoke(target, jobBean.getParams());
            long executeTime = System.currentTimeMillis() - beginTime;
            logBean.setTimes((int) executeTime);
            logBean.setStatus(0);
            LOG.info("job === >> " + jobBean.getJobId() + " succeeded, elapsed (ms) === >> " + executeTime);
        } catch (Exception e) {
            // Record the failure: status 1 plus the exception message
            long executeTime = System.currentTimeMillis() - beginTime;
            logBean.setTimes((int) executeTime);
            logBean.setStatus(1);
            logBean.setError(e.getMessage());
            LOG.error("job === >> " + jobBean.getJobId() + " failed", e);
        } finally {
            scheduleJobLogService.insert(logBean);
        }
    }
}
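
The dispatch pattern in the class above (look a bean up by name, call its run method reflectively, time the call, record a status) can be tried without Spring or Quartz. The sketch below is a minimal stand-in under stated assumptions: `ReflectiveRunner`, `Result`, `EchoTask`, and `FailingTask` are hypothetical names, and a plain map replaces the Spring context. One detail it adds: when the task itself throws, reflection wraps the error in an `InvocationTargetException` whose own message is usually null, so the sketch reports the cause's message instead.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the job wrapper: a plain map replaces the Spring context.
final class ReflectiveRunner {

    /** Outcome of one run; status 0 = success, 1 = failure, as in the log bean. */
    static final class Result {
        final int status;
        final long elapsedMillis;
        final String error;

        Result(int status, long elapsedMillis, String error) {
            this.status = status;
            this.elapsedMillis = elapsedMillis;
            this.error = error;
        }
    }

    /** Sample task that succeeds. */
    static final class EchoTask {
        public void run(String params) {
            System.out.println("params = " + params);
        }
    }

    /** Sample task that always fails. */
    static final class FailingTask {
        public void run(String params) {
            throw new IllegalStateException("boom");
        }
    }

    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    Result run(String beanName, String params) {
        long begin = System.currentTimeMillis();
        try {
            Object target = beans.get(beanName);
            if (target == null) {
                throw new IllegalArgumentException("no bean named " + beanName);
            }
            Method method = target.getClass().getMethod("run", String.class);
            method.invoke(target, params);
            return new Result(0, System.currentTimeMillis() - begin, null);
        } catch (InvocationTargetException e) {
            // The task itself threw: report its exception, not the reflection wrapper.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return new Result(1, System.currentTimeMillis() - begin, String.valueOf(cause.getMessage()));
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Missing bean, missing method, or inaccessible method.
            return new Result(1, System.currentTimeMillis() - begin, String.valueOf(e.getMessage()));
        }
    }

    public static void main(String[] args) {
        ReflectiveRunner runner = new ReflectiveRunner();
        runner.register("echo", new EchoTask());
        runner.register("fail", new FailingTask());
        System.out.println(runner.run("echo", "hello").status);
        System.out.println(runner.run("fail", "hello").error);
    }
}
```

If every task implements a common interface with a `run(String)` method, the reflective call could be replaced by a cast and a direct call, which also removes the wrapper exception.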

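III. Wrapping the scheduler in a service

1. Initializing jobs at startup

import org.quartz.CronTrigger;
import org.quartz.Scheduler;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
// Imports of the project's own classes (ScheduleJobBean, ScheduleJobMapper, ...) are omitted here.
@Service
public class ScheduleJobServiceImpl implements ScheduleJobService {

    @Resource
    private Scheduler scheduler;
    @Resource
    private ScheduleJobMapper scheduleJobMapper;

    /**
     * Load every stored job and register it with the scheduler
     */
    @PostConstruct
    public void init() {
        ScheduleJobExample example = new ScheduleJobExample();
        List<ScheduleJobBean> scheduleJobBeanList = scheduleJobMapper.selectByExample(example);
        for (ScheduleJobBean scheduleJobBean : scheduleJobBeanList) {
            // Create the job if it has no trigger yet; otherwise refresh the existing one
            CronTrigger cronTrigger = ScheduleUtil.getCronTrigger(scheduler, scheduleJobBean.getJobId());
            if (cronTrigger == null) {
                ScheduleUtil.createJob(scheduler, scheduleJobBean);
            } else {
                ScheduleUtil.updateJob(scheduler, scheduleJobBean);
            }
        }
    }
}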

2. Adding a job

@Override
@Transactional(rollbackFor = Exception.class)
public int insert(ScheduleJobBean record) {
    ScheduleUtil.createJob(scheduler, record);
    return scheduleJobMapper.insert(record);
}

3. Running a job once, immediately

@Override
@Transactional(rollbackFor = Exception.class)
public void run(Long jobId) {
    ScheduleJobBean scheduleJobBean = scheduleJobMapper.selectByPrimaryKey(jobId);
    ScheduleUtil.run(scheduler, scheduleJobBean);
}

4. Updating a job

@Override
@Transactional(rollbackFor = Exception.class)
public int updateByPrimaryKeySelective(ScheduleJobBean record) {
    ScheduleUtil.updateJob(scheduler, record);
    return scheduleJobMapper.updateByPrimaryKeySelective(record);
}

5. Pausing a job

@Override
@Transactional(rollbackFor = Exception.class)
public void pauseJob(Long jobId) {
    ScheduleJobBean scheduleJobBean = scheduleJobMapper.selectByPrimaryKey(jobId);
    ScheduleUtil.pauseJob(scheduler, jobId);
    // Status 1 marks the job as paused
    scheduleJobBean.setStatus(1);
    scheduleJobMapper.updateByPrimaryKeySelective(scheduleJobBean);
}

6. Resuming a job

@Override
@Transactional(rollbackFor = Exception.class)
public void resumeJob(Long jobId) {
    ScheduleJobBean scheduleJobBean = scheduleJobMapper.selectByPrimaryKey(jobId);
    ScheduleUtil.resumeJob(scheduler, jobId);
    // Status 0 marks the job as running normally
    scheduleJobBean.setStatus(0);
    scheduleJobMapper.updateByPrimaryKeySelective(scheduleJobBean);
}

7. Deleting a job

@Override
@Transactional(rollbackFor = Exception.class)
public void delete(Long jobId) {
    ScheduleUtil.deleteJob(scheduler, jobId);
    scheduleJobMapper.deleteByPrimaryKey(jobId);
}
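
The service methods above store the job state as a bare number: 0 after a resume and 1 after a pause. One possible way to make those codes self-describing is a small enum. This is only a sketch: `JobStatus`, `code()`, and `fromCode` are hypothetical names that do not appear in the project.

```java
// Hypothetical enum for the two status codes used by the service methods.
enum JobStatus {
    NORMAL(0),
    PAUSED(1);

    private final int code;

    JobStatus(int code) {
        this.code = code;
    }

    /** The numeric value stored in the database. */
    int code() {
        return code;
    }

    /** Maps a stored code back to its status; rejects unknown codes. */
    static JobStatus fromCode(int code) {
        for (JobStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("unknown job status code: " + code);
    }
}
```

A call such as `setStatus(JobStatus.PAUSED.code())` then states the intent at the call site, and `fromCode` fails fast on a value the application does not know.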

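IV. Configuring a test job

1. The task interface

public interface TaskService {
    void run(String params);
}

2. A test job

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component("getTimeTask")
public class GetTimeTask implements TaskService {
    private static final Logger LOG = LoggerFactory.getLogger(GetTimeTask.class.getName());
    // MM = month, HH = 24-hour clock. SimpleDateFormat is not thread-safe,
    // so this shared instance is only safe while runs of the job never overlap.
    private static final SimpleDateFormat FORMAT =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    public void run(String params) {
        LOG.info("params === >> " + params);
        LOG.info("current time::::" + FORMAT.format(new Date()));
    }
}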
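
Two details of the timestamp formatting in the test job are worth spelling out. Pattern letters are case-sensitive: `MM` is the month and `mm` the minute, `HH` is the 24-hour clock and `hh` the 12-hour one. And because Quartz runs jobs on a pool of worker threads, a formatter shared through a static field should be thread-safe. The sketch below uses `java.time.format.DateTimeFormatter`, which is immutable and thread-safe, as one possible alternative; `TimeStamps` is a hypothetical helper name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper showing a thread-safe formatter with a case-correct pattern.
final class TimeStamps {
    // DateTimeFormatter is immutable, so a single shared instance is safe across threads.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private TimeStamps() {}

    static String format(LocalDateTime time) {
        return FORMAT.format(time);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.now()));
    }
}
```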

V. Source code

GitHub: 知了一笑
https://github.com/cicadasmile/middle-ware-parent
