Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

Apache Flink创建模板项目有2种方式:

1. 通过Maven archetype命令创建;

2. 通过Flink 提供的Quickstart shell脚本创建;

关于Apache Flink的环境搭建,请参考相关链接:

Apache Flink快速入门-基本架构、核心概念和运行流程

Apache Flink v1.8 本地单机环境安装和运行Flink应用

1. 通过Maven archetype创建Flink项目

#使用Maven创建

mvn archetype:generate

-DarchetypeGroupId=org.apache.flink

-DarchetypeArtifactId=flink-quickstart-java

-DarchetypeVersion=1.8.0

-DgroupId=com.rickie

-DartifactId=flink-tutorial

-Dversion=0.1

-Dpackage=com.rickie.tutorial

-DinteractiveMode=false

参数说明:

原型archetype有关参数表

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

项目相关参数:

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

通过上述mvn 命令创建的Java模板项目结构。

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

从上述项目结构可以看出,该项目是一个比较完善的Maven项目,其中Java代码部分,BatchJob.java和StreamingJob.java 分别对应Flink 批量接口DataSet的实例代码和流式接口DataStream的实例代码。

2. 编写业务代码

将上述项目导入到IDEA中,Flink应用程序模板如下图所示。

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

打开StreamingJob.java文件,实现简单的单词统计(Word Count)业务功能。

具体代码如下所示。

package com.rickie.tutorial;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

/**

* Skeleton for a Flink Streaming Job.

*

* <p>For a tutorial how to write a Flink streaming application, check the

* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.

*

* <p>To package your application into a JAR file for execution, run

* 'mvn clean package' on the command line.

*

* <p>If you change the name of the main class (with the public static void main(String[] args))

* method, change the respective entry in the POM.xml file (simply search for 'mainClass').

*/

public class StreamingJob {

public static void main(String[] args) throws Exception {

// set up the streaming execution environment

// 设置streaming运行环境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 连接socket获取输入的数据

DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);

// split("\W+") 使用非数字字母切分字符串

DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

@Override

public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

String[] tokens = s.toLowerCase().split("\W+");

for(String token : tokens) {

if(token.length() > 0) {

collector.collect(new Tuple2<String, Integer>(token, 1));

}

}

}

}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

dataStream.print();

// execute program

env.execute("Flink Streaming Java API Skeleton");

}

}

3. IDEA中运行应用、测试执行效果

首先,使用nc命令启动一个本地监听9000端口,和上一步代码中的端口号保持一致。

nc -l -p 9000

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

然后,在IDEA中运行StreamingJob,和本地server socket 9000端口建立连接,如下图所示。

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

接着,在nc命令窗口,输入一些单词,如下所示。

StreamingJob应用根据实现的业务逻辑,进行单词聚合,并输出。单词在5秒的时间窗口(翻滚时间窗口)中计算并打印到stdout。

说明:timeWindow(Time.seconds(5)),只有一个参数,表示是翻滚时间窗口(Tumbling window),即不重叠的时间窗口,只统计本窗口内的数据。

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

因为没启动Flink服务,所以去localhost:8081的web UI中进行监控。代码 StreamExecutionEnvironment.getExecutionEnvironment()会创建一个LocalEnvironment,然后在Java虚拟机上执行。

在Linux/Flink单机模式下运行

Linux 单机模式下启动Flink相当简单,直接运行bin/start-cluster.sh,会启动Flink的JobManager和TaskManager两个进程。如果想将上述程序提交到Flink,需要执行maven命令打成jar包,然后在命令行中,进入到bin目录下执行 flink run xxx/xxx/xxx.jar 即可,输出结果会在TaskManager的服务窗口中输出。

4. 使用IDEA开发批计算应用

Flink支持DataSet API 用于处理批量数据,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)完成数据集转换操作,然后通过sink进行存储,既可以写入HDFS这种分布式文件系统,也可以打印控制台。

在IDEA中,打开BatchJob.java文件,编写如下代码,实现批量计算逻辑。

package com.rickie.tutorial;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

/**

* Skeleton for a Flink Batch Job.

*

* <p>For a tutorial how to write a Flink batch application, check the

* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.

*

* <p>To package your application into a JAR file for execution,

* change the main class in the POM.xml file to this class (simply search for 'mainClass')

* and run 'mvn clean package' on the command line.

*/

public class BatchJob {

public static void main(String[] args) throws Exception {

// set up the batch execution environment

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 通过字符串构建数据集

DataSet<String> text = env.fromElements(

"I am Rickie ",

"Hello Rickie",

"Good morning ... Rickie"

);

// 分割字符串,按照key进行分组,统计相同的key个数

DataSet<Tuple2<String, Integer>> wordCount = text

.flatMap(new LineSplitter())

.groupBy(0)

.sum(1);

// Print

wordCount.print();

// execute program

// env.execute("Flink Batch Java API Skeleton");

}

// 分割字符串的方法

public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

@Override

public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {

for (String word : line.split(" ")) {

out.collect(new Tuple2<String, Integer>(word, 1));

}

}

}

}

在IDEA中运行,查看输出的单词聚合结果。

Apache Flink-基于Java项目模板创建Flink应用(流计算和批计算)

相关推荐