Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)
Apache Flink创建模板项目有2种方式:
1. 通过Maven archetype命令创建;
2. 通过Flink 提供的Quickstart shell脚本创建;
关于Apache Flink的环境搭建,请参考相关链接:
Apache Flink快速入门-基本架构、核心概念和运行流程
Apache Flink v1.8 本地单机环境安装和运行Flink应用
1. 通过Maven archetype创建Flink项目
#使用Maven创建
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.8.0
-DgroupId=com.rickie
-DartifactId=flink-tutorial
-Dversion=0.1
-Dpackage=com.rickie.tutorial
-DinteractiveMode=false
参数说明:
原型archetype有关参数表
项目相关参数:
通过上述mvn 命令创建的Java模板项目结构。
从上述项目结构可以看出,该项目是一个比较完善的Maven项目,其中Java代码部分,BatchJob.java和StreamingJob.java 分别对应Flink 批量接口DataSet的实例代码和流式接口DataStream的实例代码。
2. 编写业务代码
将上述项目导入到IDEA中,Flink应用程序模板如下图所示。
打开StreamingJob.java文件,实现简单的单词统计(Word Count)业务功能。
具体代码如下所示。
package com.rickie.tutorial;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Streaming Job.
*
* <p>For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*/
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
// 设置streaming运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 连接socket获取输入的数据
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
// split("\W+") 使用非数字字母切分字符串
DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\W+");
for(String token : tokens) {
if(token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
dataStream.print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
3. IDEA中运行应用、测试执行效果
首先,使用nc命令启动一个本地监听9000端口,和上一步代码中的端口号保持一致。
nc -l -p 9000
然后,在IDEA中运行StreamingJob,和本地server socket 9000端口建立连接,如下图所示。
接着,在nc命令窗口,输入一些单词,如下所示。
StreamingJob应用根据实现的业务逻辑,进行单词聚合,并输出。单词在5秒的时间窗口(翻滚时间窗口)中计算并打印到stdout。
说明:timeWindow(Time.seconds(5)),只有一个参数,表示是翻滚时间窗口(Tumbling window),即不重叠的时间窗口,只统计本窗口内的数据。
因为没启动Flink服务,所以去localhost:8081的web UI中进行监控。代码 StreamExecutionEnvironment.getExecutionEnvironment()会创建一个LocalEnvironment,然后在Java虚拟机上执行。
在Linux/Flink单机模式下运行
Linux 单机模式下启动Flink相当简单,直接运行bin/start-cluster.sh,会启动Flink的JobManager和TaskManager两个进程。如果想将上述程序提交到Flink,需要执行maven命令打成jar包,然后在命令行中,进入到bin目录下执行 flink run xxx/xxx/xxx.jar 即可,输出结果会在TaskManager的服务窗口中输出。
4. 使用IDEA开发批计算应用
Flink支持DataSet API 用于处理批量数据,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)完成数据集转换操作,然后通过sink进行存储,既可以写入HDFS这种分布式文件系统,也可以打印控制台。
在IDEA中,打开BatchJob.java文件,编写如下代码,实现批量计算逻辑。
package com.rickie.tutorial;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Batch Job.
*
* <p>For a tutorial how to write a Flink batch application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution,
* change the main class in the POM.xml file to this class (simply search for 'mainClass')
* and run 'mvn clean package' on the command line.
*/
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 通过字符串构建数据集
DataSet<String> text = env.fromElements(
"I am Rickie ",
"Hello Rickie",
"Good morning ... Rickie"
);
// 分割字符串,按照key进行分组,统计相同的key个数
DataSet<Tuple2<String, Integer>> wordCount = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
wordCount.print();
// execute program
// env.execute("Flink Batch Java API Skeleton");
}
// 分割字符串的方法
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
在IDEA中运行,查看输出的单词聚合结果。