Flink输出到Redis

   1.代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.redis.RedisSinkimport org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfigimport org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}//温度传感器读取样例类case class SensorReading(id: String, timestamp: Long, temperature: Double)object RedisSinkTest {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1)    //source    val inputStream  = env.readTextFile("sensor1.txt")    //transform    import org.apache.flink.api.scala._    val dataStream = inputStream.map(x => {      val arr = x.split(",")      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)    })    //sink    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))    env.execute("redis sink test")  }}//定义一个redis的mapper类,用于定义保存到redis时调用的命令class MyRedisMapper extends RedisMapper[SensorReading] {  override def getCommandDescription: RedisCommandDescription = {    //把传感器id和温度值保存成哈希表: HSET key field value    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")  }  //相当于是field  override def getKeyFromData(data: SensorReading): String = {    data.id  }  override def getValueFromData(data: SensorReading): String = {    data.temperature.toString  }}2.结果 

Flink输出到Redis

相关推荐