Riko: a Python Stream Processing Engine (Project Overview)
Riko is a Python stream processing engine modeled after Yahoo! Pipes. Written in pure Python, it analyzes and processes streams of structured data. It offers both synchronous and asynchronous APIs, supports parallel processing of RSS feeds, and also provides a command-line interface.

Features:

- reads csv/xml/json/html files
- builds text and data streams out of modular pipes
- parses, processes, and extracts RSS/Atom feeds (see the first sketch after the example below)
- creates powerful mashups, APIs, and maps
- supports parallel processing (see the second sketch after the example below)

Usage example:

>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko.collections.sync import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> # 1. the `detag` option will strip all html tags from the result
>>> # 2. fetch the text contained inside the 'body' tag of the hackernews
>>> # homepage
>>> # 3. replace newlines with spaces and assign the result to 'content'
>>> # 4. tokenize the resulting text using whitespace as the delimiter
>>> # 5. count the number of times each token appears
>>> # 6. obtain the raw stream
>>> # 7. extract the first word and its count
>>> # 8. extract the second word and its count
>>> # 9. extract the third word and its count
>>> url = 'https://news.ycombinator.com/'
>>> fetch_conf = {
... 'url': url, 'start': '<body>', 'end': '</body>', 'detag': True} # 1
>>>
>>> replace_conf = {
... 'rule': [
... {'find': '\r\n', 'replace': ' '},
... {'find': '\n', 'replace': ' '}]}
>>>
>>> flow = (
... SyncPipe('fetchpage', conf=fetch_conf) # 2
... .strreplace(conf=replace_conf, assign='content') # 3
... .stringtokenizer(conf={'delimiter': ' '}, emit=True) # 4
... .count(conf={'count_key': 'content'})) # 5
>>>
>>> stream = flow.output # 6
>>> next(stream) # 7
{"'sad": 1}
>>> next(stream) # 8
{'(': 28}
>>> next(stream) # 9
{'(1999)': 1}
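>>>
>>> ### Fetch and parse an RSS feed (minimal sketch) ###
>>> #
>>> # Each riko module can also be used on its own, without a `SyncPipe`
>>> # wrapper. This sketch assumes the module-level API from the riko
>>> # README, where a module such as `riko.modules.fetch` exposes a
>>> # `pipe` function that yields feed items as plain dicts; the feed
>>> # URL here is illustrative.
>>> from riko.modules import fetch
>>>
>>> stream = fetch.pipe(conf={'url': 'https://news.ycombinator.com/rss'})
>>> item = next(stream)
>>> item['title']  # items carry the usual feed keys ('title', 'link', ...)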
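>>>
>>> ### Parallel processing (minimal sketch) ###
>>> #
>>> # `SyncPipe` also accepts a `parallel` keyword, as in the riko
>>> # README's parallel example. This sketch additionally assumes the
>>> # `fetch` source pipe accepts a list of feed URLs in its conf;
>>> # both URLs are illustrative.
>>> from riko.collections.sync import SyncPipe
>>>
>>> urls = [
...     'https://news.ycombinator.com/rss',
...     'https://www.python.org/jobs/feed/rss/']
>>>
>>> # With `parallel=True` the fetches run in a worker pool instead of
>>> # one after another; the rest of the flow is unchanged.
>>> flow = (
...     SyncPipe('fetch', conf={'url': urls}, parallel=True)
...         .count())
>>> next(flow.output)  # total number of items across both feeds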