当前位置: 移动技术网 > IT编程>脚本编程>Python > Tornado集成Apscheduler定时任务

Tornado集成Apscheduler定时任务

2019年06月05日  | 移动技术网IT编程  | 我要评论

南谯区招标采购网,神魄游戏,六盘水汽车站

熟悉python的人可能都知道,apscheduler是python里面一款非常优秀的任务调度框架,这个框架是从鼎鼎大名的quartz移植而来。

之前有用过flask版本的apscheduler做定时任务。刚好前不久接触了tornado,顺便玩玩tornado版本的apscheduler。

本篇做了一个简单的wdb页面,用于添加和删除定时任务,小伙伴们可以基于这个做一些扩展,比如把定时定时任务写入数据库,改变cron规则等等。

#新增任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=add

#删除任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=remov

执行结果可以在console看到

 

from datetime import datetime
from tornado.ioloop import ioloop, periodiccallback
from tornado.web import requesthandler, application
from apscheduler.schedulers.tornado import tornadoscheduler


scheduler = none
job_ids   = []

# 初始化
def init_scheduler():
    global scheduler
    scheduler = tornadoscheduler()
    scheduler.start()
    print('[scheduler init]apscheduler has been started')

# 要执行的定时任务在这里
def task1(options):
    print('{} [apscheduler][task]-{}'.format(datetime.now().strftime('%y-%m-%d %h:%m:%s.%f'), options))


class mainhandler(requesthandler):
    def get(self):
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')


class schedulerhandler(requesthandler):
    def get(self):
        global job_ids
        job_id = self.get_query_argument('job_id', none)
        action = self.get_query_argument('action', none)
        if job_id:
            # add
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[task added] - {}'.format(job_id))
                else:
                    self.write('[task exists] - {}'.format(job_id))
            # remove
            elif 'remove' == action:
                if job_id in job_ids:
                    scheduler.remove_job(job_id)
                    job_ids.remove(job_id)
                    self.write('[task removed] - {}'.format(job_id))
                else:
                    self.write('[task not found] - {}'.format(job_id))
        else:
            self.write('[invalid params] invalid job_id or action')


if __name__ == "__main__":
    routes    = [
        (r"/", mainhandler),
        (r"/scheduler/?", schedulerhandler),
    ]
    init_scheduler()
    app       = application(routes, debug=true)
    app.listen(8888)
    ioloop.current().start()

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网