.NET Core下开源任务调度框架Hangfire的Api任务拓展(支持秒级任务)

2019年04月09日





有考虑过 “fluentscheduler” ,使用简单,但是管理配置也很麻烦,我希望能做到配置简单,管理方便,高性能。最后想到了以前听过的hangfire,它的好处就是自带控制面板,在园子里看了很多相关资料,偶然发现了有人拓展过hangfire通过调用api接口来执行任务,这种方式可以避免依赖本地代码,方便部署,在此基础上,我用空闲时间拓展了一下现在已经基本可以满足需求。





由于更新到最新版hangfire 1.7支持秒级任务,使用的在线表达式生成部分表达式有问题,注掉了秒级任务表达式生成,有时间需要详细测试更改,可以参考(hangfire官方提供的表达式)


1,部署及调试:只需要配置数据库连接,然后编译即可运行,无需建表,支持(redis,mysql, sqlserver)其他数据库暂时用不到没测试。推荐使用redis集群。项目中直接添加了redis的存储包,已经更新stackexchange.redis到最新版本方便拓展,调试时可以直接调试。部署,只需要发布项目,运行创建windows服务的bat命令,命令已经包含在项目中,或者发布至linux。








 1  //只读面板,只能读取不能操作
 2             app.usehangfiredashboard("/job-read", new dashboardoptions
 3             {
 4                 apppath = "#",//返回时跳转的地址
 5                 displaystorageconnectionstring = false,//是否显示数据库连接信息
 6                 isreadonlyfunc = context =>
 7                 {
 8                     return true;
 9                 },
10                 authorization = new[] { new basicauthauthorizationfilter(new basicauthauthorizationfilteroptions
11                 {
12                     requiressl = false,//是否启用ssl验证,即https
13                     sslredirect = false,
14                     logincasesensitive = true,
15                     users = new []
16                     {                    
17                         new basicauthauthorizationuser
18                         {
19                             login = "read",
20                             passwordclear = "only"
21                         },
22                         new basicauthauthorizationuser
23                         {
24                             login = "test",
25                             passwordclear = "123456"
26                         },
27                         new basicauthauthorizationuser
28                         {
29                             login = "guest",
30                             passwordclear = "123@123"
31                         }
32                     }
33                 })
34                 }
35             });




 1   catch (exception ex)
 2             {
 3                 //获取重试次数
 4                 var count = context.getjobparameter<string>("retrycount");
 5                 context.settextcolor(consoletextcolor.red);
 6                 //signalr推送
 7                 //sendrequest(configsettings.instance.url+"/api/publish/everyone", "测试");
 8                 if (count == "3")//重试达到三次的时候发邮件通知
 9                 {
10                     sendemail(item.jobname, item.url, ex.tostring());
11                 }
12                 logger.error(ex, "httpjob.excute");
13                 context.writeline($"执行出错:{ex.message}");
14                 throw;//不抛异常不会执行重试操作
15             }
 1 /// <summary>
 2         /// 邮件模板
 3         /// </summary>
 4         /// <param name="jobname"></param>
 5         /// <param name="url"></param>
 6         /// <param name="exception"></param>
 7         /// <returns></returns>
 8         private static string sethtmlbody(string jobname, string url, string exception)
 9         {
10             var htmlbody = $@"<h3 align='center'>{hangfirehttpjoboptions.smtpsubject}</h3>
11                             <h3>执行时间:</h3>
12                             <p>
13                                 {datetime.now}
14                             </p>
15                             <h3>
16                                 任务名称:<span> {jobname} </span><br/>
17                             </h3>
18                             <h3>
19                                 请求路径:{url}
20                             </h3>
21                             <h3><span></span> 
22                                 执行结果:<br/>
23                             </h3>
24                             <p>
25                                 {exception}
26                             </p> ";
27             return htmlbody;
28         }
 1  //使用redis
 2                         config.useredisstorage(redis, new hangfire.redis.redisstorageoptions()
 3                         {
 4                             fetchtimeout=timespan.fromminutes(5),
 5                             prefix = "{hangfire}:",
 6                             //活动服务器超时时间
 7                             invisibilitytimeout = timespan.fromhours(1),
 8                             //任务过期检查频率
 9                             expirycheckinterval = timespan.fromhours(1),
10                             deletedlistsize = 10000,
11                             succeededlistsize = 10000
12                         })
13                         .usehangfirehttpjob(new hangfirehttpjoboptions()
14                         {
15                             sendtomaillist = hangfiresettings.instance.sendmaillist,
16                             sendmailaddress = hangfiresettings.instance.sendmailaddress,
17                             smtpserveraddress = hangfiresettings.instance.smtpserveraddress,
18                             smtpport = hangfiresettings.instance.smtpport,
19                             smtppwd = hangfiresettings.instance.smtppwd,
20                             smtpsubject = hangfiresettings.instance.smtpsubject
21                         })


6,signalr 推送:宿主程序使用的weapi,因此可以通过webapi推送,这样做的好处是可以将服务当作推送服务使用,第三方接口也可以利用此来推送,


 1  /// <summary>
 2        ///用户加入组处理
 3        /// </summary>
 4        /// <param name="userid">用户唯一标识</param>
 5        /// <param name="groupname">组名称</param>
 6        /// <returns></returns>
 7         public task initusers(string userid,string groupname)
 8         {
 9             console.writeline($"{userid}加入用户组");
10             groups.addtogroupasync(context.connectionid, groupname);
11             signalrgroups.usergroups.add(new signalrgroups()
12             {
13                 connectionid = context.connectionid,
14                 groupname = groupname,
15                 userid = userid
16             });
17             return clients.all.sendasync("userjoin", "用户组数据更新,新增id为:" + context.connectionid + " pid:" + userid);
18         }
19         /// <summary>
20         /// 断线的时候处理
21         /// </summary>
22         /// <param name="exception"></param>
23         /// <returns></returns>
24         public override task ondisconnectedasync(exception exception)
25         {
26             //掉线移除用户,不给其推送
27             var user = signalrgroups.usergroups.firstordefault(c => c.connectionid == context.connectionid);
29             if (user != null)
30             {
31                 console.writeline($"用户:{user.userid}已离线");
32                 signalrgroups.usergroups.remove(user);
33                 groups.removefromgroupasync(context.connectionid, user.groupname);
34             }
35             return base.ondisconnectedasync(exception);
36         }
 1  /// <summary>
 2         /// 单个connectionid推送
 3         /// </summary>
 4         /// <param name="groups"></param>
 5         /// <returns></returns>
 6         [httppost, route("anyone")]
 7         public iactionresult anyone([frombody]ienumerable<signalrgroups> groups)
 8         {
 9             if (groups != null && groups.any())
10             {
11                 var ids = groups.select(c => c.userid);
12                 var list = signalrgroups.usergroups.where(c => ids.contains(c.userid));
13                 foreach (var item in list)
14                     hubcontext.clients.client(item.connectionid).sendasync("anyone", $"{item.connectionid}: {item.content}");
15             }
16             return ok();
17         }
19         /// <summary>
20         /// 全部推送
21         /// </summary>
22         /// <param name="message"></param>
23         /// <returns></returns>
24         [httppost, route("everyone")]
25         public iactionresult everyone([frombody] msg body)
26         {
27             var data = httpcontext.response.body;
28             hubcontext.clients.all.sendasync("everyone", $"{body.message}");
29             return ok();
30         }
32         /// <summary>
33         /// 单个组推送
34         /// </summary>
35         /// <param name="group"></param>
36         /// <returns></returns>
37         [httppost, route("anygroups")]
38         public iactionresult anygroups([frombody]signalrgroups group)
39         {
40             if (group != null)
41             {
42                 hubcontext.clients.group(group.groupname).sendasync("anygroups", $"{group.content}");
43             }
44             return ok();
45         }



 1 /*健康检查配置项*/
 2   "healthchecks-ui": {
 3     /*检查地址,可以配置当前程序和外部程序*/
 4     "healthchecks": [
 5       {
 6         "name": "hangfire api 健康检查",
 7         "uri": "http://localhost:9006/healthz"
 8       }
 9     ],
10     /*需要检查的api地址*/
11     "checkurls": [
12       {
13         "uri": "http://localhost:17600/cityservice.svc/healthycheck",
14         "httpmethod": "get"
15       },
16       {
17         "uri": "http://localhost:9098/checkhelath",
18         "httpmethod": "post"
19       },
20       {
21         "uri": "http://localhost:9067/grthelathcheck",
22         "httpmethod": "get"
23       },
24       {
25         "uri": "http://localhost:9043/grthelathcheck",
26         "httpmethod": "get"
27       }
28     ],
29     "webhooks": [], //钩子配置
30     "evaluationtimeonseconds": 10, //检测频率
31     "minimumsecondsbetweenfailurenotifications": 60, //推送间隔时间
32     "healthcheckdatabaseconnectionstring": "data source=\\healthchecksdb" //-> sqlite库存储检查配置及日志信息
33   }



1  //添加健康检查地址
2             hangfiresettings.instance.hostservers.foreach(s =>
3             {
4                 services.addhealthchecks().addurlgroup(new uri(s.uri), s.httpmethod.tolower() == "post" ? httpmethod.post : httpmethod.get, $"{s.uri}");
5             });
 1  app.usehealthchecks("/healthz", new healthcheckoptions()
 2             {
 3                 predicate = _ => true,
 4                 responsewriter = uiresponsewriter.writehealthcheckuiresponse
 5             });
 6             app.usehealthchecks("/health", options);//获取自定义格式的json数据
 7             app.usehealthchecksui(setup =>
 8             {
 9                 setup.uipath = "/hc"; // 健康检查的ui面板地址
10                 setup.apipath = "/hc-api"; // 用于api获取json的检查数据
11             });




 1 [{
 2     "id": 1,
 3     "status": "unhealthy",
 4     "onstatefrom": "2019-04-07t18:00:09.6996751+08:00",
 5     "lastexecuted": "2019-04-07t18:05:03.4761739+08:00",
 6     "uri": "http://localhost:53583/healthz",
 7     "name": "hangfire api 健康检查",
 8     "discoveryservice": null,
 9     "entries": [{
10         "id": 1,
11         "name": "http://localhost:17600/cityservice.svc/healthycheck",
12         "status": "unhealthy",
13         "description": "an error occurred while sending the request.",
14         "duration": "00:00:04.3907375"
15     }, {
16         "id": 2,
17         "name": "http://localhost:9098/checkhelath",
18         "status": "unhealthy",
19         "description": "an error occurred while sending the request.",
20         "duration": "00:00:04.4140310"
21     }, {
22         "id": 3,
23         "name": "http://localhost:9067/grthelathcheck",
24         "status": "unhealthy",
25         "description": "an error occurred while sending the request.",
26         "duration": "00:00:04.3847367"
27     }, {
28         "id": 4,
29         "name": "http://localhost:9043/grthelathcheck",
30         "status": "unhealthy",
31         "description": "an error occurred while sending the request.",
32         "duration": "00:00:04.4499007"
33     }],
34     "history": []
35 }]
 1 {
 2     "status": "unhealthy",
 3     "errors": [{
 4         "key": "http://localhost:17600/cityservice.svc/healthycheck",
 5         "value": "unhealthy"
 6     }, {
 7         "key": "http://localhost:9098/checkhelath",
 8         "value": "unhealthy"
 9     }, {
10         "key": "http://localhost:9067/grthelathcheck",
11         "value": "unhealthy"
12     }, {
13         "key": "http://localhost:9043/grthelathcheck",
14         "value": "unhealthy"
15     }]
16 }
 1  //重写json报告数据,可用于远程调用获取健康检查结果
 2             var options = new healthcheckoptions
 3             {
 4                 responsewriter = async (c, r) =>
 5                 {
 6                     c.response.contenttype = "application/json";
 8                     var result = jsonconvert.serializeobject(new
 9                     {
10                         status = r.status.tostring(),
11                         errors = r.entries.select(e => new { key = e.key, value = e.value.status.tostring() })
12                     });
13                     await c.response.writeasync(result);
14                 }
15             };



  1  /// <summary>
  2         /// 添加一个队列任务立即被执行
  3         /// </summary>
  4         /// <param name="httpjob"></param>
  5         /// <returns></returns>
  6         [httppost, route("addbackgroundjob")]
  7         public jsonresult addbackgroundjob([frombody] hangfire.httpjob.server.httpjobitem httpjob)
  8         {
  9             var addreslut = string.empty;
 10             try
 11             {
 12                 addreslut = backgroundjob.enqueue(() => hangfire.httpjob.server.httpjob.excute(httpjob, httpjob.jobname, null));
 13             }
 14             catch (exception ec)
 15             {
 16                 return json(new message() { code = false, errormessage = ec.tostring() });
 17             }
 18             return json(new message() { code = true, errormessage = "" });
 19         }
 21         /// <summary>
 22         /// 添加一个周期任务
 23         /// </summary>
 24         /// <param name="httpjob"></param>
 25         /// <returns></returns>
 26         [httppost, route("addorupdaterecurringjob")]
 27         public jsonresult addorupdaterecurringjob([frombody] hangfire.httpjob.server.httpjobitem httpjob)
 28         {
 29             try
 30             {
 31                 recurringjob.addorupdate(httpjob.jobname, () => hangfire.httpjob.server.httpjob.excute(httpjob, httpjob.jobname, null), httpjob.corn, timezoneinfo.local);
 32             }
 33             catch (exception ec)
 34             {
 35                 return json(new message() { code = false, errormessage = ec.tostring() });
 36             }
 37             return json(new message() { code = true, errormessage = "" });
 38         }
 40         /// <summary>
 41         /// 删除一个周期任务
 42         /// </summary>
 43         /// <param name="jobname"></param>
 44         /// <returns></returns>
 45         [httpget,route("deletejob")]
 46         public jsonresult deletejob(string jobname)
 47         {
 48             try
 49             {
 50                 recurringjob.removeifexists(jobname);
 51             }
 52             catch (exception ec)
 53             {
 54                 return json(new message() { code = false, errormessage = ec.tostring() });
 55             }
 56             return json(new message() { code = true, errormessage = "" });
 57         }
 58         /// <summary>
 59         /// 手动触发一个任务
 60         /// </summary>
 61         /// <param name="jobname"></param>
 62         /// <returns></returns>
 63         [httpget, route("triggerrecurringjob")]
 64         public jsonresult triggerrecurringjob(string jobname)
 65         {
 66             try
 67             {
 68                 recurringjob.trigger(jobname);
 69             }
 70             catch (exception ec)
 71             {
 72                 return json(new message() { code = false, errormessage = ec.tostring() });
 73             }
 74             return json(new message() { code = true, errormessage = "" });
 75         }
 76         /// <summary>
 77         /// 添加一个延迟任务
 78         /// </summary>
 79         /// <param name="httpjob">httpjob.delayfromminutes(延迟多少分钟执行)</param>
 80         /// <returns></returns>
 81         [httppost, route("addschedulejob")]
 82         public jsonresult addschedulejob([frombody] hangfire.httpjob.server.httpjobitem httpjob)
 83         {
 84             var reslut = string.empty;
 85             try
 86             {
 87                 reslut = backgroundjob.schedule(() => hangfire.httpjob.server.httpjob.excute(httpjob, httpjob.jobname, null), timespan.fromminutes(httpjob.delayfromminutes));
 88             }
 89             catch (exception ec)
 90             {
 91                 return json(new message() { code = false, errormessage = ec.tostring() });
 92             }
 93             return json(new message() { code = true, errormessage = "" });
 94         }
 95         /// <summary>
 96         /// 添加连续任务,多个任务依次执行,只执行一次
 97         /// </summary>
 98         /// <param name="httpjob"></param>
 99         /// <returns></returns>
100         [httppost, route("addcontinuejob")]
101         public jsonresult addcontinuejob([frombody] list<hangfire.httpjob.server.httpjobitem> httpjobitems)
102         {
103             var reslut = string.empty;
104             var jobid = string.empty;
105             try
106             {
107                 httpjobitems.foreach(k =>
108                 {
109                     if (!string.isnullorempty(jobid))
110                     {
111                         jobid = backgroundjob.continuejobwith(jobid, () => runcontinuejob(k));
112                     }
113                     else
114                     {
115                         jobid = backgroundjob.enqueue(() => hangfire.httpjob.server.httpjob.excute(k, k.jobname, null));
116                     }
117                 });
118                 reslut = "true";
119             }
120             catch (exception ec)
121             {
122                 return json(new message() { code = false, errormessage = ec.tostring() });
123             }
124             return json(new message() { code = true, errormessage = "" });
125         }




 通过特性来添加任务重试时间间隔(hangfire 1.7 新增,单位/秒),重试次数,队列名称,任务名称,以及分布式锁超时时间

 1 /// <summary>
 2         /// 执行任务,delaysinseconds(重试时间间隔/单位秒)
 3         /// </summary>
 4         /// <param name="item"></param>
 5         /// <param name="jobname"></param>
 6         /// <param name="context"></param>
 7         [automaticretry(attempts = 3, delaysinseconds = new[] { 30, 60, 90 }, logevents = true, onattemptsexceeded = attemptsexceededaction.fail)]
 8         [displayname("api任务:{1}")]
 9         [queue("apis")]
10         [jobfilter(timeoutinseconds: 3600)]


 1 //设置分布式锁,分布式锁会阻止两个相同的任务并发执行,用任务名称和方法名称作为锁
 2             var jobresource = $"{filtercontext.backgroundjob.job.args[1]}.{filtercontext.backgroundjob.job.method.name}";
 3             var locktimeout = timespan.fromseconds(_timeoutinseconds);
 4             try
 5             {
 6                 //判断任务是否被暂停
 7                 using (var connection = jobstorage.current.getconnection())
 8                 {
 9                     var conts = connection.getallitemsfromset($"jobpauseof:{filtercontext.backgroundjob.job.args[1]}");
10                     if (conts.contains("true"))
11                     {
12                         filtercontext.canceled = true;//任务被暂停不执行直接跳过
13                         return;
14                     }
15                 }
16                 //申请分布式锁
17                 var distributedlock = filtercontext.connection.acquiredistributedlock(jobresource, locktimeout);
18                 filtercontext.items["distributedlock"] = distributedlock;
19             }
20             catch (exception ec)
21             {
22                 //获取锁超时,取消任务,任务会默认置为成功
23                 filtercontext.canceled = true;
24                 logger.info($"任务{filtercontext.backgroundjob.job.args[1]}超时,任务id{filtercontext.backgroundjob.id}");
25             }


1  if (!filtercontext.items.containskey("distributedlock"))
2             {
3                 throw new invalidoperationexception("找不到分布式锁,没有为该任务申请分布式锁.");
4             }
5             //释放分布式锁
6             var distributedlock = (idisposable)filtercontext.items["distributedlock"];
7             distributedlock.dispose();




1 public void onstateapplied(applystatecontext context, iwriteonlytransaction transaction)
2         {
3             //设置过期时间,任务将在三天后过期,过期的任务会自动被扫描并删除
4             context.jobexpirationtimeout = timespan.fromdays(3);
5         }




