当前位置: 移动技术网 > IT编程>脚本编程>Python > python3检测ossfs可用性+钉钉通知

python3检测ossfs可用性+钉钉通知

2019年12月05日  | 移动技术网IT编程  | 我要评论

冷帝罪妃,云南公务员报名入口,英语演讲下载

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @time    : 2019-12-02 15:16
# @author  : anthony
# @email   : ianghont7@163.com
# @file    : check-ossfs.py

# 检测ossfs进程是否存在
# 检测/xxxx/file是否挂载成功
# ossfs可用性检测

import psutil
import requests
import json
import subprocess
import threading

class dingtalk_base:
    def __init__(self):
        self.__headers = {'content-type': 'application/json;charset=utf-8'}
        self.url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
    def send_msg(self,text):
        json_text = {
            "msgtype": "text",
            "text": {
                "content": text
            },
            "at": {
                "atmobiles": [
                    ""
                ],
                "isatall": false
            }
        }
        return requests.post(self.url, json.dumps(json_text), headers=self.__headers).content
class dingtalk_disaster(dingtalk_base):
    def __init__(self):
        super().__init__()
        # 填写机器人的url
        self.url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx"

def check_oss_pid():
    pids = psutil.pids()
    for pid in pids:
        p = psutil.process(pid)
        if "ossfs" == p.name().strip():
            return true
    else:
        return ding.send_msg("""ossfs报警通知:
机器:机器名称
消息内容:ossfs进程不存在,请及时处理...""")



def get_ossfs_name():
    try:
        args = "ls -lh /alidata/|grep file|awk '{print $3}'"
        running_shell = subprocess.popen(args,
                                         shell=true,
                                         stdin=subprocess.pipe,
                                         stdout=subprocess.pipe,
                                         stderr=subprocess.pipe,)
        out,err = running_shell.communicate()
        for line in out.splitlines():
            s1 = bytes.decode(line).strip()
            if s1.strip() != "root":
                return ding.send_msg("""ossfs报警通知:
机器:语文课堂
消息内容:/xxxx/file/挂载失败,请及时处理...""")
    except exception as e:
        print(e)

if __name__ == "__main__":
    ding = dingtalk_disaster()
    threads = [threading.thread(target=get_ossfs_name),
               threading.thread(target=check_oss_pid)]
    for t in threads:
        t.start()

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网