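Creating a Kafka source table with the Flink SQL Client and running a real-time computation
1. Produce data with the console producer of a self-hosted Kafka
/bin/kafka-console-producer.sh --broker-list 192.168.58.177:9092 --topic my_topic
Data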
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
2. Consume the data from Kafka to verify it arrived
/bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --topic my_topic --partition 0 --offset 0

3. Create the table in the Flink SQL Client
CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    -- must match the topic written to in step 1
    'connector.topic' = 'my_topic',
    -- read the topic from the beginning
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',
    'connector.properties.bootstrap.servers' = '192.168.58.177:9092',
    'format.type' = 'json'
);
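The table definition above uses the legacy connector.* / format.* property keys. On Flink 1.11 and later the Kafka SQL connector is normally declared with shorter option keys. The following is only a sketch of an equivalent definition under that assumption: the table name user_log1_new is hypothetical, and the topic and broker address are the ones used in step 1.

```sql
-- Sketch for Flink 1.11+ option keys; user_log1_new is a hypothetical name.
CREATE TABLE user_log1_new (
    user_id STRING,
    item_id STRING,
    category_id STRING,
    behavior STRING,
    ts STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = '192.168.58.177:9092',
    'properties.group.id' = 'testGroup',
    -- read the topic from the beginning, like connector.startup-mode above
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```

The ZooKeeper address is left out here because the newer connector talks to the brokers only through bootstrap.servers.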
Real-time computation
select item_id,count(*) from user_log1 group by item_id;
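On a streaming source this GROUP BY is a continuous aggregation: the count for an item_id is updated each time a new record for that item arrives in the topic. A slightly more explicit variant might look like the sketch below; the column alias pv_cnt and the filter on behavior are illustrative additions, not part of the original query.

```sql
-- Sketch: count page views per item; pv_cnt is a hypothetical alias.
SELECT item_id, COUNT(*) AS pv_cnt
FROM user_log1
WHERE behavior = 'pv'
GROUP BY item_id;
```

With the five sample records from step 1, every item_id is distinct and every behavior is pv, so each item should appear with a count of 1 until more records are produced.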