监控kafka队列长度
#!/usr/local/python37/bin/python
#获取kafka命令中lags的值,来判定现在队列中有多少未消费,如果超过5000,则可能有延迟
import os
import re
import datetime
import time
import pandas as pd
import numpy as np
import subprocess
#now=datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
now=time.time()
lagsInfos=os.popen("sh /opt/elk/kafka_node1/bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.10.100:9092 --group logstash | awk ‘{if($5>20){print $1,$5}}‘").read()
#定义dataframe的index及value的列表
columnList=[]
#lagList=[]
#print(lagsInfos.splitlines())
for i in range(1,len(lagsInfos.splitlines())):
lagList=[]
lagInfo=lagsInfos.splitlines()[i].split()
lagList.append(lagInfo[0])
lagList.append(int(lagInfo[1]))
columnList.append(lagList)
df=pd.DataFrame(columnList,columns=["topics","LAG"])
dfResult=df.groupby("topics",as_index=False).sum()
h1=dfResult.loc[dfResult["LAG"]>100,["topics","LAG"]]
#print(h1)
if len(h1)==0:
print("OK")
else:
#将要发送的短信内容中的空格和换行符替换成url里面的格式,否则在发送短信时会报错
msg=str(h1).replace(" ","%20").replace("\n","%0a")
url="‘http://sms.domain.com/Smsweb/sms?pid=smsPid&pwd=Mjdfadklfae&phone=1111111111&msg="+msg+"‘"
print(url)
result=subprocess.getoutput("curl " + url)
print(result)
相关推荐
Kafka 2020-09-18
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01
PoppyEvan 2020-07-29
sweetgirl0 2020-07-27