Flink输出到Redis
1.代码
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/** Sample case class for one temperature-sensor reading.
  *
  * @param id          sensor identifier (first CSV column)
  * @param timestamp   reading timestamp (second CSV column, parsed as Long)
  * @param temperature measured temperature (third CSV column, parsed as Double)
  */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/** Reads sensor readings from a text file and writes them to Redis through a Flink RedisSink. */
object RedisSinkTest {

  /** Builds and runs the pipeline: text file source -> parse to SensorReading -> Redis sink.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("sensor1.txt")

    // transform
    // Brings the implicit TypeInformation needed by `map` into scope.
    import org.apache.flink.api.scala._
    val dataStream = inputStream.map(x => {
      // Each line is expected to be "id,timestamp,temperature"; a line with
      // fewer than three fields or non-numeric values throws here.
      val arr = x.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // sink
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()
    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink test")
  }
}

/** Redis mapper that defines which command is issued when a reading is saved to Redis. */
class MyRedisMapper extends RedisMapper[SensorReading] {

  /** Describes the Redis command used by the sink. */
  override def getCommandDescription: RedisCommandDescription = {
    // Store the sensor id and temperature in a hash: HSET key field value
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }

  /** Returns the sensor id; with HSET this acts as the field. */
  override def getKeyFromData(data: SensorReading): String = {
    data.id
  }

  /** Returns the temperature rendered as a string, stored as the value. */
  override def getValueFromData(data: SensorReading): String = {
    data.temperature.toString
  }
}

2.结果
相关推荐
xiaoyutongxue 2020-04-19
raidtest 2020-10-09
匆匆那些年 2020-06-27
oXiaoChong 2020-06-20
yuchuanchen 2020-06-16
Spark高级玩法 2020-06-14
Leonwey 2020-06-11
Spark高级玩法 2020-06-09
文报 2020-06-09
xorxos 2020-06-07
xiaoyutongxue 2020-05-27
yuchuanchen 2020-05-27
阿尼古 2020-05-26
千慧 2020-05-18
yuchuanchen 2020-05-17
yuchuanchen 2020-05-16
Spark高级玩法 2020-05-11