Python生成器实现数据处理管道
假设现在有如下业务场景
某生产系统的日志文件如下,并且在持续增加...
[ncms@UPZZGAP02 logs]$ pwd /home/ncms/ncms/logs [ncms@UPZZGAP02 logs]$ ll 总用量 797020 -rw-rw-r-- 1 ncms ncms 495465795 11月 30 17:10 ansible.log -rw-rw-r-- 1 ncms ncms 2251937 11月 30 17:10 celery_beat.log -rw-rw-r-- 1 ncms ncms 16003 11月 15 10:26 celery_flower.log -rw-rw-r-- 1 ncms ncms 7042114 11月 30 17:10 celery_worker.log -rw-r--r-- 1 ncms ncms 24665873 11月 30 17:10 db_error.log -rw-r--r-- 1 ncms ncms 52428571 11月 28 18:46 db_error.log.1 -rw-r--r-- 1 ncms ncms 52428691 11月 24 06:43 db_error.log.2 -rw-r--r-- 1 ncms ncms 22410652 11月 19 15:16 db_error.log.3 -rw-r--r-- 1 ncms ncms 28064985 11月 30 17:10 db_info.log -rw-r--r-- 1 ncms ncms 52426630 11月 28 13:29 db_info.log.1 -rw-r--r-- 1 ncms ncms 52427357 11月 24 03:48 db_info.log.2 -rw-r--r-- 1 ncms ncms 24276767 11月 19 15:16 db_info.log.3 -rw-rw-r-- 1 ncms ncms 42490 11月 30 13:06 ncms_access.log -rw-rw-r-- 1 ncms ncms 24072 10月 30 15:33 ncms_error.log -rw-rw-r-- 1 ncms ncms 1350318 11月 30 16:38 nginx_access.log -rw-rw-r-- 1 ncms ncms 1685 11月 7 18:15 nginx_error.log -rw-rw-r-- 1 ncms ncms 24001 11月 15 10:27 supervisord.log -rw-rw-r-- 1 ncms ncms 645742 11月 30 16:38 uwsgi.log [ncms@UPZZGAP02 logs]$ du -sh * 473M ansible.log 2.2M celery_beat.log 16K celery_flower.log 6.8M celery_worker.log 24M db_error.log 51M db_error.log.1 51M db_error.log.2 22M db_error.log.3 27M db_info.log 51M db_info.log.1 51M db_info.log.2 24M db_info.log.3 44K ncms_access.log 24K ncms_error.log 1.3M nginx_access.log 4.0K nginx_error.log 24K supervisord.log 632K uwsgi.log [ncms@UPZZGAP02 logs]$
其中有应用、数据库、Celery、Nginx、uwsgi、supervisord、Ansible的日志,Ansible.log有473M,未来很定会更大。现在需要使用某些关键字对日志进行查找分析,应该如何做?
最简单粗暴的方式就是使用grep
之类的命令,递归查找所有的.log文件,但这样会耗费大量内存,影响机器性能。
可以考虑使用数据管道 (类似 Unix 管道) 的方式迭代处理数据。使用Python生成器函数是一个实现管道机制
#!/usr/bin/env python # -*- coding:utf-8 -*- # __author__ = 'liao gao xiang' import os import fnmatch import gzip import bz2 import re # 问题,你想以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,你有个大量的数据 # 需要处理,但是不能将它们一次性放入内存中。可以使用生成器实现数据处理管道 """ 文件格式如下 foo/ access-log-012007.gz access-log-022007.gz access-log-032007.gz ... access-log-012008 bar/ access-log-092007.bz2 ... access-log-022008 """ def gen_find(filepat, top): """ 查找符合Shell正则匹配的目录树下的所有文件名 :param filepat: shell正则 :param top: 目录路径 :return: 文件绝对路径生成器 """ for path, _, filenames in os.walk(top): for file in fnmatch.filter(filenames, filepat): yield os.path.join(path, file) def gen_opener(filenames): """ 每打开一个文件生成就生成一个文件对象,调用下一个迭代前关闭文件 :param filenames: 多个文件绝对路径组成的可迭代对象 :return: 文件对象生成器 """ for filename in filenames: if filename.endswith('.gz'): f = gzip.open(filename, 'r', encoding='utf-8') elif filename.endswith('.bz2'): f = bz2.open(filename, 'r', encoding='utf-8') else: f = open(filename, 'r', encoding='utf-8') yield f f.close() def gen_concatenate(iterators): """ 将输入序列拼接成一个很长的行序列。 :param iterators: :return: 返回生成器所产生的所有值 """ for it in iterators: yield from it def gen_grep(pattern, lines): """ 使用正则匹配行 :param pattern: 正则匹配 :param lines: 多行 :return: 结果生成器 """ pat = re.compile(pattern) for n, line in enumerate(lines, start=1): if pat.search(line): yield n, line if __name__ == "__main__": filenames = gen_find('*.log', '/home/ncms/ncms/logs') files = gen_opener(filenames) lines = gen_concatenate(files) user_action = gen_grep('(?i)liaogaoxiang_kd', lines) for n, line in user_action: print(line)
查询包含用户 liaogaoxiang_kd 的所有记录,数据结果如下:
[views:post]:2018-11-07 18:13:09.841490 -users- liaogaoxiang_kd登录成功! [views:get]:2018-11-07 18:16:04.681519 -users- liaogaoxiang_kd访问了用户信息列表 [views:post]:2018-11-07 18:16:23.866700 -users- liaogaoxiang_kd编辑了用户的信息 [views:get]:2018-11-07 18:16:23.878949 -users- liaogaoxiang_kd访问了用户信息列表 [views:get]:2018-11-07 18:16:25.641090 -users- liaogaoxiang_kd访问了用户信息列表 [views:post]:2018-11-07 18:16:42.671377 -users- liaogaoxiang_kd编辑了用户的信息 [views:get]:2018-11-07 18:16:42.719873 -users- liaogaoxiang_kd访问了用户信息列表 [views:post]:2018-11-08 11:17:42.627693 -users- liaogaoxiang_kd登录成功!
如需查询其它错误信息,只需替换gen_grep('(?i)liaogaoxiang_kd', lines)
中的关键字即可!以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时 轮询等。
为了理解上述代码,重点是要明白 yield
语句作为数据的生产者而 for 循环语句 作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元 素传递给迭代处理管道的下一阶段。这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就 很容易编写和维护。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。
使用这种方式的内存效率很高。上述代码即便是在一个超大型文件目录中 也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。 在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。 itertools.chain() 函数同样有类似的功能, 但是它需要将所有可迭代对象最为参数传入。在上面这个例子中,你可能会写类似这样 的语句 lines = itertools.chain(*files) ,这将导致 gen_opener() 生成器被提前全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭 代步骤时文件就关闭了,因此 chain() 在这里不能这样使用。上面的方案可以避免这 种情况。
gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父 生成器上去。语句 yield from it 简单的返回生成器 it 所产生的所有值。
程序员交流群,干货分享,加我拉你入群。