# python3检测ossfs可用性+钉钉通知

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019-12-02 15:16
# @Author  : Anthony
# @Email   : 
# @File    : check-ossfs.py

# 检测ossfs进程是否存在
# 检测/xxxx/file是否挂载成功
# ossfs可用性检测

import psutil
import requests
import json
import subprocess
import threading

class DingTalk_Base:
    """Minimal client that posts plain-text messages to a DingTalk robot webhook."""

    def __init__(self):
        # Name-mangled to _DingTalk_Base__headers; only used inside this class.
        self.__headers = {'Content-Type': 'application/json;charset=utf-8'}
        # Webhook URL of the robot; subclasses may overwrite this attribute.
        self.url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'

    def _build_payload(self, text):
        """Return the JSON-serialisable body for a text message containing *text*."""
        return {
            "msgtype": "text",
            "text": {
                "content": text
            },
            "at": {
                "atMobiles": [
                    ""
                ],
                "isAtAll": False
            }
        }

    def send_msg(self, text, timeout=10):
        """POST *text* to ``self.url`` as a DingTalk text message.

        Args:
            text: Message content to send.
            timeout: Seconds to wait for the webhook before giving up. Without
                a timeout ``requests`` may block indefinitely on a stalled
                connection, which would hang the calling check.

        Returns:
            The raw response body (bytes) returned by the webhook.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        response = requests.post(
            self.url,
            json.dumps(self._build_payload(text)),
            headers=self.__headers,
            timeout=timeout,
        )
        return response.content
class DingTalk_Disaster(DingTalk_Base):
    """Notifier subclass that points the inherited sender at its own robot URL."""

    def __init__(self):
        # Run the base initialiser first so the request headers are set up.
        super(DingTalk_Disaster, self).__init__()
        # Fill in the robot's webhook URL here.
        robot_url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx"
        self.url = robot_url

def check_oss_pid():
    """Check that a process named ``ossfs`` is running; send an alert if not.

    Relies on the module-level ``ding`` notifier created in the ``__main__``
    block.

    Returns:
        ``True`` when a process whose name is exactly ``ossfs`` is found;
        otherwise the value returned by ``ding.send_msg`` for the alert.
    """
    for proc in psutil.process_iter():
        try:
            name = proc.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited, or cannot be inspected, between enumeration
            # and the name lookup. Skip it instead of aborting the whole check.
            continue
        if name.strip() == "ossfs":
            return True

    # No matching process was found: raise the alert.
    return ding.send_msg("""ossfs报警通知:
机器:机器名称
消息内容:ossfs进程不存在,请及时处理...""")



def get_ossfs_name():
    """Check the owner of the ``file`` entry under ``/alidata/`` and alert if it is not root.

    Runs ``ls -lh /alidata/``, keeps the lines containing ``file`` and extracts
    the third column (the owner in ``ls -l`` output). The first owner that is
    not ``root`` triggers a "mount failed" alert through the module-level
    ``ding`` notifier.
    NOTE(review): presumably the mount point is owned by root only while ossfs
    is mounted — confirm against the deployment.

    Returns:
        The value returned by ``ding.send_msg`` when an alert is sent,
        otherwise ``None``.
    """
    try:
        # The awk program must be wrapped in ASCII single quotes so the shell
        # passes it through verbatim and does not expand $3 itself.
        args = "ls -lh /alidata/|grep file|awk '{print $3}'"
        completed = subprocess.run(args,
                                   shell=True,
                                   stdin=subprocess.DEVNULL,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        if completed.stderr:
            # Surface pipeline errors (e.g. missing directory) instead of
            # silently discarding them.
            print(completed.stderr.decode(errors="replace").strip())
        for line in completed.stdout.splitlines():
            owner = line.decode().strip()
            if owner != "root":
                return ding.send_msg("""ossfs报警通知:
机器:语文课堂
消息内容:/xxxx/file/挂载失败,请及时处理...""")
    except Exception as e:
        # Thread entry point: report the failure rather than letting the
        # thread die with an unhandled traceback.
        print(e)

if __name__ == "__main__":
    # Module-level notifier; both check functions look it up as a global.
    ding = DingTalk_Disaster()
    # One thread per check, created in the same order as before and then started.
    threads = [threading.Thread(target=check)
               for check in (get_ossfs_name, check_oss_pid)]
    for worker in threads:
        worker.start()

# 相关推荐