当前位置: 移动技术网 > IT编程>脚本编程>Python > python获取微信企业号打卡数据并生成windows计划任务

python获取微信企业号打卡数据并生成windows计划任务

2019年06月05日  | 移动技术网IT编程  | 我要评论

alcmtr,魅惑暴王别宠我,夏川纯子

由于公司的系统用的是java版本,开通了企业号打卡之后又没有预算让供应商做数据对接,所以只能自己捣鼓这个,以下是个人设置的一些内容,仅供大家参考

安装python

python的安装,这里就不详细写了,大家可自行度娘或google。

安装第三方库

python安装好之后别忘记配置环境变量!另外,所以的内容都是安装在服务器上的,且服务器需要能够上外网,否则,只能配置在本地,因为需要外网连接微信企业号的接口。这里需要用到几个第三方库:

python的pip命令,一般python安装好之后都会默认有,如果不确定,可输入命令查询,通过cmd进入命令提示符,输入

pip list

如果提示你需要更新,你可以更新,也可以不更新,更新命令其实给到你了python -m pip install --upgrade pip

安装所需要的库

step.1

pip install pymssql

如果安装pymssql出错,提示什么visual c++ 14,则先安装wheel,如不报错则忽略step2、step3

step.2

pip install wheel

step.3

下载pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl

可去这里下载最新版本的。

下载好之后,进入该文件所在的目录,通过pip install安装即可cd d:\

pip install pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl

step.4

pip install requests

至此,所有第三方库都配置好了。

写主程序

# !/usr/bin/python
# -*- coding:utf-8 -*-
# @time: 2018/7/26 16:05
# @author: hychen.cc
import json # 因微信企业号返回的格式为json,所以引入json
import requests
import pymssql
import math # 引入数学方法
import time
import datetime
server = 'xx.xx.xx.xx' # 数据库服务器地址
user = 'sa' # 数据库登录名,可以用sa
password = '******' # 数据库用户对应的密码
dbname = 'dbname' # 数据库名称
corp_id = 'xxxxxx' # 微信企业号提供的corp_id
corp_secret = 'xxxxx' # 微信企业号提供的corp_secret
"""

因微信接口所需要unix时间戳,所以需要把时间转为为unix时间戳格式

定义时间转换为unix时间方法

"""def datetime_timestamp(dt):
 # dt为字符串
 # 中间过程,一般都需要将字符串转化为时间数组
 time.strptime(dt, '%y-%m-%d %h:%m:%s')
 ## time.struct_time(tm_year=2018, tm_mon=10, tm_mday=25, tm_hour=10, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=88, tm_isdst=-1)
 # 将"2018-10-25 10:00:00"转化为时间戳
 s = time.mktime(time.strptime(dt, '%y-%m-%d %h:%m:%s'))
 return int(s)
# 定义连接数据库方法
def get_link_server():
 connection = pymssql.connect(server, user, password, database=dbname)
 if connection:
  return connection
 else:
  raise valueerror('connect dbserver failed.')
"""

定义获取用户列表,因为微信企业号一次最大只能获取100个,所以需要转换为列表格式,分批次获取

我这里设置是从db中获取有权限微信打卡的人员(select * from table),换成自己的方式即可

"""
def get_userid_list():
 """
 获取用户列表
 :return:
 """
 conn = get_link_server()
 cursor = conn.cursor()
 sql = "select * from table"
 cursor.execute(sql)
 row = cursor.fetchone()
 userlist = []
 while row:
  userlist.append(row[0])
  row = cursor.fetchone()
 if userlist:
  return userlist
 else:
  raise valueerror('get userlist failed.')
 conn.close()
"""

获取access_token,因为token有时效(2小时),所以需要存在本地,这样不需要频繁调用,所以我定义了存储过程(sp_getwx_access_token)来判断之前存储的token是否有效,有效的话就不需要重复获取了

"""
def get_access_token(refresh=false):
 """
 获取access token
 :return:
 """
 if not refresh:
  api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
   corp_id, corp_secret)
  response = requests.get(api_access_token_url, verify=false)
  if response.status_code == 200:
   rep_dict = json.loads(response.text)
   errcode = rep_dict.get('errcode')
   if errcode:
    raise valueerror('get wechat access token failed, errcode=%s.' % errcode)
   else:
    access_token = rep_dict.get('access_token')
    if access_token:
     conn = get_link_server()
     cursor = conn.cursor()
     cursor.execute('exec sp_getwx_access_token @access_token=%s', access_token)
     conn.commit()
     conn.close()
     return access_token
    else:
     raise valueerror('get wechat access token failed.')
  else:
   raise valueerror('get wechat access token failed.')
 else:
  conn = get_link_server()
  cursor = conn.cursor()
  cursor.execute("select access_token from wx_accesstoken where id=1")
  access_token = cursor.fetchone()
  if access_token:
   return access_token[0]
   conn.close()
  else:
   api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
    corp_id, corp_secret)
   response = requests.get(api_access_token_url, verify=false)
   if response.status_code == 200:
    rep_dict = json.loads(response.text)
    errcode = rep_dict.get('errcode')
    if errcode:
     raise valueerror('get wechat access token failed, errcode=%s.' % errcode)
    else:
     access_token = rep_dict.get('access_token')
     if access_token:
      conn = get_link_server()
      cursor = conn.cursor()
      cursor.execute('exec sp_getwx_access_token @access_token=%s', access_token)
      conn.commit()
      conn.close()
      return access_token
     else:
      raise valueerror('get wechat access token failed.')
   else:
    raise valueerror('get wechat access token failed.')
# 获取微信打卡的json格式
def get_punchcard_info(access_token, opencheckindatatype, starttime, endtime, useridlist):
 api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
 json_str = json.dumps(
  {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime, 'useridlist': useridlist})
 response = requests.post(api_punch_card_url, data=json_str, verify=false)
 if response.status_code == 200:
  rep_dic = json.loads(response.text)
  errcode = rep_dic.get('errcode')
  if errcode == 42001:
   access_token = get_access_token(true)
   api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
   json_str = json.dumps(
    {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime,
    'useridlist': useridlist})
   response = requests.post(api_punch_card_url, data=json_str, verify=false)
   rep_dic = json.loads(response.text)
   errcode = rep_dic.get('errcode')
   if errcode:
    raise valueerror('get punch data failed1, errcode=%s' % errcode)
   else:
    value_str = rep_dic.get('checkindata')
    if value_str:
     return value_str
    else:
     raise valueerror('get punch data failed2.')
  elif errcode:
   raise valueerror ('get punch data failed3, errcode=%s' % errcode)
  else:
   value_str = rep_dic.get('checkindata')
   if value_str:
    return value_str
   else:
    raise valueerror('i do not find employee punch data.')
 else:
  raise valueerror ('get punch data failed5.')
# 调用接口,获得数据
if __name__ == '__main__':
 today = datetime.date.today()
 oneday = datetime.timedelta(days=3) # days,即获取几天内的
 yesterday = today - oneday
 starttime = datetime_timestamp(yesterday.strftime('%y-%m-%d') + ' 00:00:00')
 endtime = datetime_timestamp(today.strftime('%y-%m-%d') + ' 23:59:59')
 opencheckindatatype = 3
 access_token = get_access_token()
 if access_token:
  useridlist = get_userid_list()
  if useridlist:
   step = 100
   total = len(useridlist)
   n = math.ceil(total/step)
   for i in range(n):
    # print (useridlist[i*step:(i+1)*step])
    punch_card = get_punchcard_info(access_token, opencheckindatatype, starttime, endtime,useridlist[i*step:(i+1)*step])
    # print (punch_card)
    if punch_card:
     conn = get_link_server()
     cursor = conn.cursor()
     for dic_obj in punch_card:
      cursor.execute('exec sp_analysispunchcard @json=%s',
          (json.dumps(dic_obj, ensure_ascii=false)))
      # print((json.dumps(dic_obj, ensure_ascii=false))),sp_analysispunchcard把获取到的数据解析后存入数据库中
      conn.commit()
     conn.close()
     print ('get punch card successed.')
    else:
     raise valueerror('no userlist exists')

设置windows计划任务

通过控制面板-管理工具-任务计划程序,右击选择创建基本任务,这里注意的是路径和程序。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

程序或脚本:python.exe

添加参数(可选)(a):你的py文件目录

起始于:python目录,如果不知道python安装到哪去了,按照下列cmd命令,输入python后进入python命令查询

import sys
sys.prefix,回车

到此,配置完成,可自行右击任务-执行查询效果,或者通过python命令执行py文件

进入到py文件目录

python xxx.py

总结

以上所述是小编给大家介绍的python获取微信企业号打卡数据并生成windows计划任务,希望对大家有所帮助

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网