【大数据实践】KSQL流处理——如何将多个STREAM输出到一个TOPIC
【大数据实践】KSQL流处理——如何将数据处理结果推到指定Topic
需求场景描述
在生产环境中,各个业务服务产生的事件
都会被push到Kafka
消息中间件中。如:充值中心的 充值事件 会被push到kafka的recharge
topic中,玩家 结算事件 会被push到kafka的game_score
topic中。
平台希望通过处理,实时分析这些事件,筛选出满足条件的一些玩家,对其奖励相应道具。如,想做一个针对不同充值金额的玩家奖励不同道具的活动:
- 当
0 < 充值金额 < 100
时, 奖励一个10万金币卡
道具(道具ID:"10w") - 当
100 <= 充值金额
时,奖励一个100万金币卡
道具(道具ID:"100w")
方案设计
将充值的事件(原始日志数据,JSON格式),推送到Kafka的
recharge
topic中。充值事件数据格式:{"event_type" : "cash_order", "username" : "foo", "channel" : "wx_scan", "cash" : 100 }
kafka中新建一个
PROPREWARD
的topic,专门接收道具奖励
的事件,该主题事件消息格式为:{"user/name" : "foo" "prop/id" : "道具ID" "reward/reason" : "奖励的原因"}
- 一个道具发放服务(Kafka消费者)订阅该主题,当道具奖励事件到达时,获取事件中的 用户名 和 道具ID 为指定玩家发放道具。
- 使用
KSQL
创建两个派生流(Stream),分别从recharge
topic中过滤出0 < 充值金额 < 100
和100 <= 充值金额
的事件,过滤出符合条件的用户名,并组装成约定的道具奖励事件
,将其推送到Kafka的PROPREWARD
topic中。
具体实现
新建一个kafka topic : PROPREWARD (大写),用于接收和存储
道具奖励事件
。./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic PROPREWARD
从kafka topic
PROPREWARD
创建一个流(PROPREWARD
),用于输出道具奖励事件,注意:流的名字与kafka topic的名字相同。CREATE STREAM PROPREWARD (`user/name` varchar, `seed/id` varchar, `reward/reason` varchar) \ WITH (kafka_topic='PROPREWARD', value_format='JSON');
根据业务需要,新建一个查询规则为
0 < 充值金额 < 100
的派生流并插入到PROPREWARD
流中。INSERT INTO PROPREWARD \ SELECT username AS `user/name` , '10w' AS `prop/id`, '充值100元以内奖励10万金币卡道具' AS `reward/reason` \ FROM recharge \ WHERE EVENT_TYPE = 'cash_order' AND CASH > 0 AND CASH <= 100;
根据业务需要,新建一个查询规则为
100 <= 充值金额
的派生流并插入到PROPREWARD
流中。INSERT INTO PROPREWARD \ SELECT username AS `user/name` , '100w' AS `prop/id`, '充值超过100元奖励100万金币卡道具' AS `reward/reason` \ FROM recharge \ WHERE EVENT_TYPE = 'cash_order' AND CASH >= 100;
- 还可以根据业务需要,从其他kafka topic中派生出其他流,插入到
PROPREWARD
流中。
结果验证
往kafka的
recharge
topic中写入数据:{"event_type" : "cash_order", "username" : "foo", "channel" : "wx_scan", "cash" : 9 }
可以在topic
PROPREWARD
中接收到事件:{"user/name" : "foo" , "prop/id" : "10w" "reward/reason" : "充值100元以内奖励10万金币卡道具"}
往kafka的
recharge
topic中写入数据:{"event_type" : "cash_order", "username" : "foo", "channel" : "wx_scan", "cash" : 11 }
可以在topic
PROPREWARD
中接收到事件:{"user/name" : "foo" , "prop/id" : "100w", "reward/reason" : "充值100元以上奖励100万金币卡道具"}
注意
要想将上述两个派生流插入(INSERT INTO)到输出结果的
PROPREWARD
流中,需要确保:- 流
PROPREWARD
的名字与输出结果的Kafka topic名字相同,否则会抛出异常。这应该是KSQL 5.0.0 的一个BUG。 - 派生流中输出的数据结构 (SELECT username AS
user/name
, '100w' ASprop/id
, '充值超过100元奖励100万金币卡道具' ASreward/reason
) 与 流PROPREWARD
定义的结构相同。
- 流
总结
- 通过Kafka + KSQL 流式处理,可以配置出丰富的
活动
——根据各种不同的事件和规则,奖励不同的道具(或者其他类型的东西),而不需要额外的代码开发!! - KSQL官方参考文档