当前位置: 移动技术网 > IT编程>开发语言>Java > springboot+Quartz实现任务调度的示例代码

springboot+Quartz实现任务调度的示例代码

2019年07月19日  | 移动技术网IT编程  | 我要评论

在 Spring 框架中,通过 @Scheduled 注解可以实现定时任务:Spring 会按照该注解的 cron 属性所描述的规则去调用被标注的方法。

Spring 已经简单粗暴地实现了定时任务,为什么还要使用 Quartz?

如果你现在有很多个定时任务,规则各不相同,而且希望把规则保存在数据库中,在运行时动态地新增、暂停、删除任务或修改 cron 表达式(而不是把规则写死在注解里),就可以用 Quartz 来统一调度,例如:

  1. 想每月25号,信用卡自动还款
  2. 想每年4月1日自己给当年暗恋女神发一封匿名贺卡
  3. 想每隔1小时,备份一下自己的爱情动作片 学习笔记到云盘

maven 依赖

<dependency>
  <groupId>org.quartz-scheduler</groupId>
  <artifactId>quartz</artifactId>
  <version>2.2.1</version>
</dependency>
<dependency>
  <groupId>org.quartz-scheduler</groupId>
  <artifactId>quartz-jobs</artifactId>
  <version>2.2.1</version>
</dependency>

以下是bootdo开源学习框架的源码

我并没有用到全部的字段,思路是先查询db,

封装以下两个对象

  1. JobDetail 负责存放 Job 所需要的数据
  2. Trigger 设置 Job 的 key、规则(cron)以及何时开启任务等属性

当触发条件满足时,会根据所设置的 beanclass 找到该类(必须实现 org.quartz.Job),这时可以取出 JobDetail 中的数据,执行具体业务逻辑。

/**
 * Minimal example of a schedulable job: a component that implements the
 * {@code job} interface and leaves {@code execute} as a placeholder.
 *
 * NOTE(review): identifiers in this listing are lower-cased; the type names
 * presumably correspond to the Quartz/Spring types of the same spelling —
 * confirm before compiling.
 */
@component
public class welcomejob implements job{
  /**
   * Entry point of the job. The body is intentionally empty in this example.
   *
   * @param arg0 execution context supplied by the caller (unused here)
   * @throws jobexecutionexception declared by the signature; never thrown by this empty body
   */
  @override
  public void execute(jobexecutioncontext arg0) throws jobexecutionexception {
    // your business logic goes here
  }
}

表结构

-- Task definition table: one row per schedulable task.
-- The COMMENT '...' strings below are part of the schema and are left as-is;
-- the trailing "--" notes are English glosses of those strings.
-- NOTE(review): the service code in this article compares the job status with "1"
-- when loading tasks at start-up; presumably that maps to `jobstatus` — confirm.
create table `sys_task` (
 `id` bigint(20) not null auto_increment,
 `cronexpression` varchar(255) default null comment 'cron表达式', -- cron expression
 `methodname` varchar(255) default null comment '任务调用的方法名', -- name of the method the task invokes
 `isconcurrent` varchar(255) default null comment '任务是否有状态', -- whether the task is stateful
 `description` varchar(255) default null comment '任务描述', -- task description
 `updateby` varchar(64) default null comment '更新者', -- last updater
 `beanclass` varchar(255) default null comment '任务执行时调用哪个类的方法 包名+类名', -- fully qualified class whose method is invoked
 `createdate` datetime default null comment '创建时间', -- creation time
 `jobstatus` varchar(255) default null comment '任务状态', -- task status
 `jobgroup` varchar(255) default null comment '任务分组', -- task group
 `updatedate` datetime default null comment '更新时间', -- last update time
 `createby` varchar(64) default null comment '创建者', -- creator
 `springbean` varchar(255) default null comment 'spring bean', -- spring bean
 `jobname` varchar(255) default null comment '任务名', -- task name
 primary key (`id`)
) engine=innodb auto_increment=3 default charset=utf8 row_format=compact;

配置类

import java.io.ioexception;
import java.util.properties;
import org.quartz.scheduler;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.config.propertiesfactorybean;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.core.io.classpathresource;
import org.springframework.scheduling.quartz.schedulerfactorybean;
import com.txgl.common.quartz.factory.jobfactory;
/**
 * Configuration class that declares three beans: the scheduler factory, the
 * properties loaded from {@code /config/quartz.properties}, and the scheduler
 * obtained from the factory.
 */
@configuration
public class quartzconfigration {

  // Injected job factory that is handed to the scheduler factory below.
  @autowired
  jobfactory jobfactory;

  /**
   * Builds the scheduler factory: enables overwriting of existing jobs, applies
   * the properties returned by {@link #quartzproperties()} and installs the
   * injected job factory.
   *
   * NOTE(review): the three setters share one try block. If quartzproperties()
   * throws, the exception is only printed, setjobfactory(...) is skipped, and a
   * partially configured factory is still returned. Consider moving
   * setjobfactory out of the try or failing fast.
   *
   * @return the factory instance, returned even when loading the properties failed
   */
  @bean
  public schedulerfactorybean schedulerfactorybean() {
    schedulerfactorybean schedulerfactorybean = new schedulerfactorybean();
    try {
      schedulerfactorybean.setoverwriteexistingjobs(true);
      schedulerfactorybean.setquartzproperties(quartzproperties());
      schedulerfactorybean.setjobfactory(jobfactory);
    } catch (ioexception e) {
      // Failure is printed and otherwise ignored; see the review note above.
      e.printstacktrace();
    }
    return schedulerfactorybean;
  }

  /**
   * Loads the properties file at classpath location {@code /config/quartz.properties}.
   *
   * @return the properties object produced by the properties factory
   * @throws ioexception declared by the signature; presumably raised when the
   *         resource cannot be read — confirm
   */
  @bean
  public properties quartzproperties() throws ioexception {
    propertiesfactorybean propertiesfactorybean = new propertiesfactorybean();
    propertiesfactorybean.setlocation(new classpathresource("/config/quartz.properties"));
    // Initialise the factory explicitly before asking it for the object.
    propertiesfactorybean.afterpropertiesset();
    return propertiesfactorybean.getobject();
  }

  /**
   * Exposes the scheduler under the bean name {@code "scheduler"}.
   *
   * NOTE(review): this calls schedulerfactorybean() directly; it presumably
   * relies on the container returning the already-initialised factory bean
   * rather than a fresh instance — confirm.
   *
   * @return the scheduler obtained from the scheduler factory
   */
  @bean(name = "scheduler")
  public scheduler scheduler() {
    return schedulerfactorybean().getscheduler();
  }
}

quartzmanager的代码是关键,通过注入scheduler 对任务进行操作

import java.util.arraylist;
import java.util.list;
import java.util.set;
import org.apache.log4j.logger;
import org.quartz.cronschedulebuilder;
import org.quartz.crontrigger;
import org.quartz.datebuilder;
import org.quartz.datebuilder.intervalunit;
import org.quartz.job;
import org.quartz.jobbuilder;
import org.quartz.jobdetail;
import org.quartz.jobexecutioncontext;
import org.quartz.jobkey;
import org.quartz.scheduler;
import org.quartz.schedulerexception;
import org.quartz.trigger;
import org.quartz.triggerbuilder;
import org.quartz.triggerkey;
import org.quartz.impl.matchers.groupmatcher;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.scheduling.quartz.schedulerfactorybean;
import org.springframework.stereotype.service;
import org.springframework.web.context.support.springbeanautowiringsupport;
import com.bootdo.common.domain.schedulejob;
import com.bootdo.common.quartz.factory.*;
import com.bootdo.common.utils.springcontextholder;;

/**
 * Scheduled-task manager (file: quartzmanager.java).
 *
 * Thin service around an injected {@code scheduler}: adds, lists, pauses,
 * resumes, deletes, triggers and reschedules jobs identified by the job name
 * and job group carried in a {@code schedulejob}.
 */
@service
public class quartzmanager {
  public final logger log = logger.getlogger(this.getclass());
  // Earlier ways of obtaining the scheduler factory, left commented out:
  // private schedulerfactorybean schedulerfactorybean
  // =springcontextholder.getbean(schedulerfactorybean.class);
  // @autowired
  // @qualifier("schedulerfactorybean")
  // private schedulerfactorybean schedulerfactorybean;
  @autowired
  private scheduler scheduler;

  /**
   * Adds a job: loads the class named by {@code job.getbeanclass()}, builds a
   * job detail and a cron trigger that both use (job name, job group) as their
   * key, registers them with the scheduler, and starts the scheduler if it has
   * not been shut down.
   *
   * Any exception is caught and printed; nothing is propagated to the caller.
   *
   * NOTE(review): the previous javadoc declared "@throws schedulerexception",
   * but the method neither declares nor propagates it, so callers cannot tell
   * that scheduling failed. The class has a {@code log} field that is not used here.
   *
   * @param job task description providing bean class, name, group and cron expression
   */
  
  public void addjob(schedulejob job) {
    try {
      // Create the job detail and bind it to the job implementation class:
      // name, group and job class are specified here.
      // NOTE(review): newinstance() creates a throwaway instance only to call
      // getclass() on it; the class object from forname(...) is already available.
      class<? extends job> jobclass = (class<? extends job>) (class.forname(job.getbeanclass()).newinstance()
          .getclass());
      jobdetail jobdetail = jobbuilder.newjob(jobclass).withidentity(job.getjobname(), job.getjobgroup())// job name + group form the job key
          .build();
      // Define the trigger rule, using a cron trigger.
      // NOTE(review): the chain calls both startat(now + 1 second) and startnow();
      // presumably only one of them takes effect — confirm and drop the other.
      trigger trigger = triggerbuilder.newtrigger().withidentity(job.getjobname(), job.getjobgroup())// trigger key
          .startat(datebuilder.futuredate(1, intervalunit.second))
          .withschedule(cronschedulebuilder.cronschedule(job.getcronexpression())).startnow().build();
      // Register the job and its trigger with the scheduler.
      scheduler.schedulejob(jobdetail, trigger);
      // Start the scheduler unless it has been shut down.
      if (!scheduler.isshutdown()) {
        scheduler.start();
      }
    } catch (exception e) {
      e.printstacktrace();
    }
  }
// Previous implementation of addjob, kept commented out for reference:
// public void addjob(schedulejob job) throws schedulerexception {
//   if (job == null || !schedulejob.status_running.equals(job.getjobstatus())) {
//     return;
//   }
//
//   triggerkey triggerkey = triggerkey.triggerkey(job.getjobname(), job.getjobgroup());
//
//   crontrigger trigger = (crontrigger) scheduler.gettrigger(triggerkey);
//
//   // not found: create one
//
//   if (null == trigger) {
//     class<? extends job> clazz = schedulejob.concurrent_is.equals(job.getisconcurrent())
//         ? quartzjobfactory.class
//         : quartzjobfactorydisallowconcurrentexecution.class;
//
//     jobdetail jobdetail = jobbuilder.newjob(clazz).withidentity(job.getjobname(), job.getjobgroup()).build();
//
//     jobdetail.getjobdatamap().put("schedulejob", job);
//
//     cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(job.getcronexpression());
//
//     trigger = triggerbuilder.newtrigger().withidentity(job.getjobname(), job.getjobgroup())
//         .withschedule(schedulebuilder).build();
//
//     scheduler.schedulejob(jobdetail, trigger);
//   } else {
//     // trigger already exists: update its schedule
//
//     cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(job.getcronexpression());
//
//     // rebuild the trigger from the new cron expression
//
//     trigger = trigger.gettriggerbuilder().withidentity(triggerkey).withschedule(schedulebuilder).build();
//
//     // reschedule the job with the new trigger
//
//     scheduler.reschedulejob(triggerkey, trigger);
//   }
// }

  /**
   * Lists all scheduled jobs.
   *
   * Walks every job key in any group and emits one {@code schedulejob} per
   * trigger of that job, so a job with several triggers appears several times.
   * Each entry carries the job name and group, a description built from the
   * trigger key, the trigger state name, and the cron expression when the
   * trigger is a cron trigger.
   *
   * @return one entry per (job, trigger) pair
   * @throws schedulerexception propagated from the scheduler calls
   */
  public list<schedulejob> getalljob() throws schedulerexception {
    groupmatcher<jobkey> matcher = groupmatcher.anyjobgroup();
    set<jobkey> jobkeys = scheduler.getjobkeys(matcher);
    list<schedulejob> joblist = new arraylist<schedulejob>();
    for (jobkey jobkey : jobkeys) {
      list<? extends trigger> triggers = scheduler.gettriggersofjob(jobkey);
      for (trigger trigger : triggers) {
        schedulejob job = new schedulejob();
        job.setjobname(jobkey.getname());
        job.setjobgroup(jobkey.getgroup());
        job.setdescription("触发器:" + trigger.getkey());
        trigger.triggerstate triggerstate = scheduler.gettriggerstate(trigger.getkey());
        job.setjobstatus(triggerstate.name());
        // Only cron triggers expose a cron expression.
        if (trigger instanceof crontrigger) {
          crontrigger crontrigger = (crontrigger) trigger;
          string cronexpression = crontrigger.getcronexpression();
          job.setcronexpression(cronexpression);
        }
        joblist.add(job);
      }
    }
    return joblist;
  }

  /**
   * Lists the jobs the scheduler reports as currently executing.
   *
   * Produces one {@code schedulejob} per execution context, filled the same way
   * as in {@link #getalljob()} but using the trigger of that execution.
   *
   * @return one entry per currently executing job
   * @throws schedulerexception propagated from the scheduler calls
   */
  public list<schedulejob> getrunningjob() throws schedulerexception {
    list<jobexecutioncontext> executingjobs = scheduler.getcurrentlyexecutingjobs();
    // Pre-size the result to the number of executing jobs.
    list<schedulejob> joblist = new arraylist<schedulejob>(executingjobs.size());
    for (jobexecutioncontext executingjob : executingjobs) {
      schedulejob job = new schedulejob();
      jobdetail jobdetail = executingjob.getjobdetail();
      jobkey jobkey = jobdetail.getkey();
      trigger trigger = executingjob.gettrigger();
      job.setjobname(jobkey.getname());
      job.setjobgroup(jobkey.getgroup());
      job.setdescription("触发器:" + trigger.getkey());
      trigger.triggerstate triggerstate = scheduler.gettriggerstate(trigger.getkey());
      job.setjobstatus(triggerstate.name());
      // Only cron triggers expose a cron expression.
      if (trigger instanceof crontrigger) {
        crontrigger crontrigger = (crontrigger) trigger;
        string cronexpression = crontrigger.getcronexpression();
        job.setcronexpression(cronexpression);
      }
      joblist.add(job);
    }
    return joblist;
  }

  /**
   * Pauses the job identified by the given name and group.
   *
   * @param schedulejob supplies the job name and job group used to build the job key
   * @throws schedulerexception propagated from the scheduler
   */
  public void pausejob(schedulejob schedulejob) throws schedulerexception {
    jobkey jobkey = jobkey.jobkey(schedulejob.getjobname(), schedulejob.getjobgroup());
    scheduler.pausejob(jobkey);
  }

  /**
   * Resumes the job identified by the given name and group.
   *
   * @param schedulejob supplies the job name and job group used to build the job key
   * @throws schedulerexception propagated from the scheduler
   */
  public void resumejob(schedulejob schedulejob) throws schedulerexception {
    jobkey jobkey = jobkey.jobkey(schedulejob.getjobname(), schedulejob.getjobgroup());
    scheduler.resumejob(jobkey);
  }

  /**
   * Deletes the job identified by the given name and group from the scheduler.
   *
   * @param schedulejob supplies the job name and job group used to build the job key
   * @throws schedulerexception propagated from the scheduler
   */
  public void deletejob(schedulejob schedulejob) throws schedulerexception {
    jobkey jobkey = jobkey.jobkey(schedulejob.getjobname(), schedulejob.getjobgroup());
    scheduler.deletejob(jobkey);

  }

  /**
   * Triggers the job identified by the given name and group immediately.
   *
   * @param schedulejob supplies the job name and job group used to build the job key
   * @throws schedulerexception propagated from the scheduler
   */
  public void runajobnow(schedulejob schedulejob) throws schedulerexception {
    jobkey jobkey = jobkey.jobkey(schedulejob.getjobname(), schedulejob.getjobgroup());
    scheduler.triggerjob(jobkey);
  }

  /**
   * Replaces the cron expression of an existing trigger.
   *
   * Looks up the trigger keyed by (job name, job group), rebuilds it from its
   * own builder with the new cron schedule, and reschedules it under the same key.
   *
   * NOTE(review): the result of gettrigger(...) is cast to a cron trigger and
   * dereferenced without a null or type check; presumably this fails when no
   * trigger exists for the key or the trigger is not cron-based — confirm and guard.
   *
   * @param schedulejob supplies the job name, job group and the new cron expression
   * @throws schedulerexception propagated from the scheduler
   */
  public void updatejobcron(schedulejob schedulejob) throws schedulerexception {

    triggerkey triggerkey = triggerkey.triggerkey(schedulejob.getjobname(), schedulejob.getjobgroup());

    crontrigger trigger = (crontrigger) scheduler.gettrigger(triggerkey);

    cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(schedulejob.getcronexpression());

    // Rebuild from the existing trigger, keeping the same key.
    trigger = trigger.gettriggerbuilder().withidentity(triggerkey).withschedule(schedulebuilder).build();

    scheduler.reschedulejob(triggerkey, trigger);
  }
}

service实现

import com.bootdo.common.config.constant;
import com.bootdo.common.dao.taskdao;
import com.bootdo.common.domain.schedulejob;
import com.bootdo.common.domain.taskdo;
import com.bootdo.common.quartz.utils.quartzmanager;
import com.bootdo.common.service.jobservice;
import com.bootdo.common.utils.schedulejobutils;
import org.quartz.schedulerexception;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import java.util.hashmap;
import java.util.list;
import java.util.map;

/**
 * Job service implementation: delegates persistence to the injected task DAO
 * and keeps the scheduler in step through the injected {@code quartzmanager}.
 *
 * NOTE(review): in this lower-cased listing the identifier {@code schedulejob}
 * is used both as a local variable of type {@code taskdo} and, in expressions
 * such as {@code schedulejob.status_running}, presumably as the domain class
 * holding the status constants — confirm against the original source.
 */
@service
public class jobserviceimpl implements jobservice {

  // DAO for task records.
  @autowired
  private taskdao taskschedulejobmapper;

  // Scheduler facade used to add/delete/reschedule jobs.
  @autowired
  quartzmanager quartzmanager;

  /** Returns the task with the given id, as returned by the DAO. */
  @override
  public taskdo get(long id) {
    return taskschedulejobmapper.get(id);
  }

  /** Returns the tasks the DAO yields for the given query map. */
  @override
  public list<taskdo> list(map<string, object> map) {
    return taskschedulejobmapper.list(map);
  }

  /** Returns the count the DAO yields for the given query map. */
  @override
  public int count(map<string, object> map) {
    return taskschedulejobmapper.count(map);
  }

  /** Persists a new task via the DAO and returns the DAO's result. */
  @override
  public int save(taskdo taskschedulejob) {
    return taskschedulejobmapper.save(taskschedulejob);
  }

  /** Updates a task via the DAO and returns the DAO's result. */
  @override
  public int update(taskdo taskschedulejob) {
    return taskschedulejobmapper.update(taskschedulejob);
  }

  /**
   * Deletes the job from the scheduler, then removes the task record.
   *
   * NOTE(review): the result of get(id) is passed on without a null check;
   * behaviour for an unknown id depends on schedulejobutils.entitytodata — confirm.
   *
   * @param id task id
   * @return the DAO's remove result, or 0 when the scheduler call throws
   */
  @override
  public int remove(long id) {
    try {
      taskdo schedulejob = get(id);
      quartzmanager.deletejob(schedulejobutils.entitytodata(schedulejob));
      return taskschedulejobmapper.remove(id);
    } catch (schedulerexception e) {
      // Failure is printed; the record is left in place.
      e.printstacktrace();
      return 0;
    }

  }

  /**
   * Deletes each job from the scheduler, then removes all task records in one call.
   *
   * NOTE(review): on the first scheduler failure this returns 0 without
   * removing any records, although jobs earlier in the loop have already been
   * deleted from the scheduler — scheduler and table can diverge.
   *
   * @param ids task ids
   * @return the DAO's batch-remove result, or 0 when a scheduler call throws
   */
  @override
  public int batchremove(long[] ids) {
    for (long id : ids) {
      try {
        taskdo schedulejob = get(id);
        quartzmanager.deletejob(schedulejobutils.entitytodata(schedulejob));
      } catch (schedulerexception e) {
        e.printstacktrace();
        return 0;
      }
    }
    return taskschedulejobmapper.batchremove(ids);
  }

  /**
   * Loads every task from the DAO (empty query map) and adds to the scheduler
   * those whose job status equals the string "1".
   *
   * @throws schedulerexception declared by the signature
   */
  @override
  public void initschedule() throws schedulerexception {
    // Fetch the task records here.
    list<taskdo> joblist = taskschedulejobmapper.list(new hashmap<string, object>(16));
    for (taskdo schedulejob : joblist) {
      if ("1".equals(schedulejob.getjobstatus())) {
        schedulejob job = schedulejobutils.entitytodata(schedulejob);
        quartzmanager.addjob(job);
      }

    }
  }

  /**
   * Starts or stops a task according to {@code cmd} and persists the record.
   *
   * Stop command: the job is deleted from the scheduler and the status is set
   * to not-running. Start command: the status is set to running and the job is
   * added to the scheduler. Any other command: no scheduler action. In every
   * case where the task exists, update(...) is called afterwards.
   *
   * @param jobid task id; nothing happens when no task is found
   * @param cmd   command compared against the stop/start constants
   * @throws schedulerexception propagated from the scheduler delete call
   */
  @override
  public void changestatus(long jobid, string cmd) throws schedulerexception {
    taskdo schedulejob = get(jobid);
    if (schedulejob == null) {
      return;
    }
    if (constant.status_running_stop.equals(cmd)) {
      quartzmanager.deletejob(schedulejobutils.entitytodata(schedulejob));
      schedulejob.setjobstatus(schedulejob.status_not_running);
    } else {
      // NOTE(review): empty branch — commands other than start/stop are silently ignored.
      if (!constant.status_running_start.equals(cmd)) {
      } else {
        schedulejob.setjobstatus(schedulejob.status_running);
        quartzmanager.addjob(schedulejobutils.entitytodata(schedulejob));
      }
    }
    update(schedulejob);
  }

  /**
   * Pushes the task's cron expression to the scheduler when the task's status
   * equals the running constant, then persists the record.
   *
   * @param jobid task id; nothing happens when no task is found
   * @throws schedulerexception propagated from the reschedule call
   */
  @override
  public void updatecron(long jobid) throws schedulerexception {
    taskdo schedulejob = get(jobid);
    if (schedulejob == null) {
      return;
    }
    if (schedulejob.status_running.equals(schedulejob.getjobstatus())) {
      quartzmanager.updatejobcron(schedulejobutils.entitytodata(schedulejob));
    }
    update(schedulejob);
  }
}

启动一个监听去初始化quartz

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.core.annotation.order;
import org.springframework.stereotype.component;
import com.bootdo.common.quartz.utils.quartzmanager;
import com.bootdo.common.service.jobservice;
/**
 * Start-up hook: a component implementing {@code commandlinerunner} whose
 * {@code run} method asks the job service to initialise the schedule.
 * Annotated with order value 1.
 */
@component
@order(value = 1)
public class schedulejobinitlistener implements commandlinerunner {

  // Service whose initschedule() is invoked at start-up.
  @autowired
  jobservice schedulejobservice;

  // NOTE(review): injected but not referenced anywhere in this class.
  @autowired
  quartzmanager quartzmanager;

  /**
   * Calls {@code schedulejobservice.initschedule()}. Any exception is caught
   * and printed, so it is not propagated to the caller.
   *
   * @param arg0 command-line arguments (unused)
   * @throws exception declared by the signature; the body catches everything it raises
   */
  @override
  public void run(string... arg0) throws exception {
    try {
      schedulejobservice.initschedule();
    } catch (exception e) {
      e.printstacktrace();
    }
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网