Using PySpark in Python to read and parse log data, then write it into Elasticsearch

Code:

import re
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkContext
from elasticsearch import Elasticsearch
spark = SparkSession.builder.appName("lz").getOrCreate()
sc = SparkContext.getOrCreate()
es = Elasticsearch()
month_map = {'Jan': '1', 'Feb': '2', 'Mar': '3', 'Apr': '4', 'May': '5', 'Jun': '6', 'Jul': '7',
    'Aug': '8', 'Sep': '9', 'Oct': '10', 'Nov': '11', 'Dec': '12'}

log_data = sc.textFile("/Desktop/data_doc/data_Log/sshlogin/03.txt")  # read the local log file with Spark


for b in log_data.toLocalIterator(): 
    # Iterate over the records one at a time, parse each with a regex, and finally index the resulting dict into ES as the body
    # e = 'Ambari:Mar  2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2'  # sample log line
    log_group = re.search(r'^(\S+):(\w{3})\s+(\d{1,2})\s(\d{2}:\d{2}:\d{2})\s(\S+)\s(\S+)\[(\d+)\]:\s(.+)', b)
    if log_group:
        year = '2019'
        logtime = None  # fall back to None if the timestamp cannot be parsed
        try:
            # join the fields into a 'year-month-day time' string
            logtime = year + '-' + month_map[log_group.group(2)] + '-' + log_group.group(3) + ' ' + log_group.group(4)
            logtime = datetime.datetime.strptime(logtime, '%Y-%m-%d %H:%M:%S')
        except Exception:
            pass
        row = dict(_hostname=log_group.group(1),  # assemble the parsed fields into a dict of k, v pairs
                   syslog_timestamp=logtime,
                   hostname=log_group.group(5),
                   program=log_group.group(6),
                   pid=log_group.group(7),
                   msg=log_group.group(8))
        if re.match('^Accepted password for', row['msg']) or re.match('^Accepted publickey for', row['msg']):
            # pull the username, source IP, and port out of a successful-login message
            msg_a = re.search(r'Accepted\s\w+\sfor\s(\S+)\sfrom\s(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\sport\s(\d+)', row['msg'])
            if msg_a:
                row['login_success'] = True
                row['login_success_msg'] = {'username': msg_a.group(1), 'user_ip': msg_a.group(2), 'user_port': msg_a.group(3)}
        es.index(index='data_log02', doc_type='test02', body=row)  # write the document into ES
    else:
        continue  # skip lines that do not match the log format
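
To sanity-check the pattern before running the full job, you can feed it the sample line quoted in the comment above. This is a standalone sketch; the expected groups are shown as comments:

import re

sample = 'Ambari:Mar  2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2'
pattern = r'^(\S+):(\w{3})\s+(\d{1,2})\s(\d{2}:\d{2}:\d{2})\s(\S+)\s(\S+)\[(\d+)\]:\s(.+)'
m = re.search(pattern, sample)
# groups: 1=source tag, 2=month, 3=day, 4=time, 5=hostname, 6=program, 7=pid, 8=msg
print(m.groups())
# ('Ambari', 'Mar', '2', '02:14:16', 'ambari', 'sshd', '16716',
#  'Accepted password for root from 172.21.202.174 port 59886 ssh2')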

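Calling es.index() once per document costs one HTTP round trip per line, which gets slow on large files. If throughput matters, the elasticsearch client's helpers.bulk can batch the writes. A minimal sketch, assuming the regex-to-dict logic above has been factored into a hypothetical parse_line() that returns the row dict or None:

from elasticsearch import Elasticsearch, helpers

def gen_actions(rdd):
    # yield one bulk action per successfully parsed log line
    for b in rdd.toLocalIterator():
        row = parse_line(b)  # hypothetical helper wrapping the regex logic above
        if row:
            yield {'_index': 'data_log02', '_type': 'test02', '_source': row}

es = Elasticsearch()
helpers.bulk(es, gen_actions(log_data))

Note that toLocalIterator() pulls every partition through the driver, so neither version is distributed; for a Spark-native write, the elasticsearch-hadoop connector ("org.elasticsearch.spark.sql" as a DataFrame write format) is the usual route.
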
Reposted from: https://www.cnblogs.com/wangkun122/articles/10936938.html