Flink二次开发
一、KafkaSink
1、按流内容分发到对应topic,隔天自动切换
在flink自带的kafka sink实现里,只支持写到固定topic,而我们的kafka2kafka日志处理逻辑要求消息要按照ds字段值写入到对应topic,topic名前缀相同,后面跟ds字段值,需要进行改造
具体实现思路如下:
(1)由如下源码可知KeyedSerializationSchema对象才能赋值schema,从而可以通过schema.getTargetTopic获得每条对应的topic,而常用的SimpleStringSchema并未继承KeyedSerializationSchema,无法得到对应的topic,只能使用固定topic
FlinkKafkaProducer011
private final KeyedSerializationSchema<IN> schema; @Override public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception { String targetTopic = schema.getTargetTopic(next); if (targetTopic == null) { targetTopic = defaultTopicId; } ... } (2)继承合适的父类进行二次开发 实现了KeyedSerializationSchema的类有两种: TypeInformationKeyValueSerializationSchema<K, V>,可序列化也可反序列化,即既可用于生产者又可用于消费者 KeyedSerializationSchemaWrapper,只能序列化,只能用于生产者,传入SerializationSchema对象后可以自动调用其序列化方法进行序列化 还可以自己实现KeyedSerializationSchema接口 为了最大限度降低代码复杂度,提高执行效率,一定要选择合适的父类进行二次开发。只有序列化逻辑需要修改,因此选择实现KeyedSerializationSchema接口,重写部分方法,实现按json流内容中的ds字段值与指定的前缀组合成每条kafka信息的目标topic JsonKeyedSerializationSchemaWrapper public class JsonKeyedSerializationSchemaWrapper implements KeyedSerializationSchema<JSONObject> { private String topicPrefix; public JsonKeyedSerializationSchemaWrapper(String topicPrefix) { this.topicPrefix = topicPrefix; } @Override public byte[] serializeValue(JSONObject element) { return element.toJSONString().getBytes(); } @Override public byte[] serializeKey(JSONObject element) { return null; } @Override public String getTargetTopic(JSONObject element) { return this.topicPrefix+"_"+element.getString("ds"); } } (3)改造业务代码 LogParserBallEnconfEntopic JsonKeyedSerializationSchemaWrapper serial = new JsonKeyedSerializationSchemaWrapper( String.format(parameterTool.get("kafka.sink.topicPrefix"),parameterTool.get("game")) ); FlinkKafkaProducer011<JSONObject> kafkaResultProducer = new FlinkKafkaProducer011<>( parameterTool.get("kafka.sink.bootstrap.servers"), String.format(parameterTool.get("kafka.sink.topicPrefix"),parameterTool.get("game")) ,serial); 二、HdfsSink 1、按流内容分发到对应目录 flink自带的hdfs sink只支持写入到固定目录,而我们的kafka2hdfs处理逻辑要求消息要按照header_filepath字段值写入到对应的目录,类似如下形式: /logs/ball/json/Chat/ds=2018-11-12//logs/ball/json/Chat/status-1/ds=2018-11-12//logs/ball/json/status-2-CheckWg/ds=2018-10-31/ 具体实现思路如下: (1)由源码可知BucketingSink类的setBucketer(Bucketer<T> bucketer)方法确定要写入的文件目录 (2)继承合适的父类进行二次开发 实现了Bucketer接口的类有以下两种: DateTimeBucketer 写入到固定目录的桶内,桶是按给定日期格式生成的 BasePathBucketer 写入到固定目录 我们所要写入的目录不需要分桶,因此继承BasePathBucketer类,重写部分方法 HdfsBasePathBucketer public class HdfsBasePathBucketer extends BasePathBucketer<JSONObject> { private static final long serialVersionUID = 1L; @Override public Path getBucketPath(Clock clock, Path basePath, JSONObject element) { String header_filepath = element.getString("header_filepath"); return super.getBucketPath(clock, new Path(basePath+"/"+header_filepath), element); } } (3)改造业务代码 Bucketer bucketer = new HdfsBasePathBucketer(); hdfsSink.setBucketer(bucketer);
2、定制文件内容
flink自带的hdfs sink只支持将接收到的消息整体使用UTF-8格式写入到文件,而我们的kafka2hdfs处理逻辑要求只写body字段内容到文件
具体实现思路如下:
(1)由源码可知BucketingSink类的setWriter(Writer<T> writer)方法确定要写入的内容
(2)继承合适的父类进行二次开发
AvroKeyValueSinkWriter 生成avro文件
- SequenceFileWriter 生成hadoop sequencefile文件,可指定压缩级别
- StringWriter 默认生成UTF-8编码的文本文件
继承StringWriter,重写部分方法
HdfsStringWriter
@Override public void write(JSONObject element) throws IOException { String body = element.getString("body"); try { cs = Charset.forName(csn); } catch (IllegalCharsetNameException e) { throw new IOException("The charset " + csn + " is not valid.", e); } catch (UnsupportedCharsetException e) { throw new IOException("The charset " + csn + " is not supported.", e); } FSDataOutputStream outputStream = getStream(); outputStream.write(body.getBytes(cs)); outputStream.write(‘\n‘); } @Override public StringWriter<JSONObject> duplicate() { return new HdfsStringWriter(); }
(3)改造业务代码
hdfsSink.setWriter(new HdfsStringWriter());
相关推荐
raidtest 2020-10-09
匆匆那些年 2020-06-27
oXiaoChong 2020-06-20
yuchuanchen 2020-06-16
Spark高级玩法 2020-06-14
Leonwey 2020-06-11
Spark高级玩法 2020-06-09
文报 2020-06-09
xorxos 2020-06-07
xiaoyutongxue 2020-05-27
yuchuanchen 2020-05-27
阿尼古 2020-05-26
千慧 2020-05-18
yuchuanchen 2020-05-17
yuchuanchen 2020-05-16
Spark高级玩法 2020-05-11
yuchuanchen 2020-05-11