Getting Started with Flink [0]
Prerequisites
Make sure the following are installed correctly:
- Java 8.x
- Maven 3.0.4 (or higher)
Verify Java
$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
Verify Maven
$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T02:41:47+08:00)
Maven home: /Users/dushixiang/Library/apache-maven-3.6.0
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.14.1", arch: "x86_64", family: "mac"
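Download and Install Flink
The download page lists many builds. Since this is only for local development, a build without Hadoop is enough, and if you do not plan to develop in Scala, the Scala version does not matter either. I downloaded flink-1.7.0-bin-scala_2.12.tgz. Because writing Flink stream processing in Java is somewhat verbose, I will write both Java and Scala versions of the code for comparison.
Extract the archive
tar -xvf flink-1.7.0-bin-scala_2.12.tgz
Start in local mode
cd flink-1.7.0
bin/start-cluster.sh
Verify that Flink started
Open localhost:8081 in a browser; you should see the Flink web UI.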
Your First Flink Program
Create a quickstart project with the shell script
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.7.0
Or create a quickstart project with Maven
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.0
Once it succeeds, check the directory structure
$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties
BatchJob.java and StreamingJob.java are just empty skeletons and can be ignored. Change the Scala version in pom.xml to 2.12.
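As a rough illustration of that pom change: the fragment below assumes the generated pom.xml defines the Scala version through a `scala.binary.version` property, which is the property name the dependency snippet in this post refers to as `${scala.binary.version}`. If your generated pom names it differently, adjust accordingly.

```xml
<properties>
    <!-- other properties generated by the archetype stay unchanged -->
    <!-- assumption: the archetype exposes the Scala version under this property name -->
    <scala.binary.version>2.12</scala.binary.version>
</properties>
```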
Write the WordCount program
package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.stream.Stream;

public class WordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.fromElements(
                "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.",
                "Here, we explain important aspects of Flink’s architecture."
        );

        // Option 1: a static nested class, Tokenizer, that implements FlatMapFunction and its flatMap method
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(0) // group by the first tuple field (the word)
                .sum(1);  // sum the second tuple field to count how often each word occurs

        // Option 2: an anonymous inner class that implements FlatMapFunction and its flatMap method
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                Stream.of(line.toLowerCase().split("\\W+"))
                        .filter(token -> token.length() > 0)
                        .forEach(token -> out.collect(new Tuple2<>(token, 1)));
            }
        })
                .keyBy(0)
                .sum(1);*/

        // Option 3: a lambda expression. This version raises an error because the lambda
        // does not provide enough information for automatic type extraction; the official
        // recommendation is to use one of the two approaches above.
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) -> {
            Stream.of(line.toLowerCase().split("\\W+"))
                    .filter(token -> token.length() > 0)
                    .forEach(token -> out.collect(new Tuple2<>(token, 1)));
        })
                .keyBy(0)
                .sum(1);*/

        counts.print();

        env.execute("First Flink WordCount program");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            Stream.of(line.toLowerCase().split("\\W+"))
                    .filter(token -> token.length() > 0)
                    .forEach(token -> out.collect(new Tuple2<>(token, 1)));
        }
    }
}
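The code comment above attributes the lambda failure to missing type information. Here is a minimal plain-Java sketch of that effect, using nothing from Flink. The class name `LambdaTypeInfo` and the helper `keepsGenericInfo` are hypothetical, and `java.util.function.Function` stands in for `FlatMapFunction`. It checks by reflection whether an object's class still records the generic arguments of an interface it implements.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaTypeInfo {

    /** Returns true when the object's class records generic type arguments for an interface it implements. */
    static boolean keepsGenericInfo(Object fn) {
        for (Type implemented : fn.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler stores Function<String, Integer> in the class metadata.
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };

        // Lambda: the runtime-generated class only implements the raw Function interface.
        Function<String, Integer> lambda = s -> s.length();

        System.out.println(keepsGenericInfo(anonymous)); // true
        System.out.println(keepsGenericInfo(lambda));    // false
    }
}
```

The anonymous class keeps `Function<String, Integer>` in its metadata, while the class synthesized for the lambda reports only the raw `Function`. That is the kind of gap Flink's type extraction runs into. If you prefer to keep the lambda, Flink operators accept an explicit type hint through `returns(...)`, for example `returns(Types.TUPLE(Types.STRING, Types.INT))`.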
Because this runs in local mode, you can compile and run it directly in your IDE.
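When you run the job, note that a streaming `sum` emits an updated total for every incoming element rather than one final count per word. The sketch below imitates that behavior in plain Java without any Flink dependency. `RunningWordCount`, `tokenize`, and `runningCounts` are hypothetical names for illustration, and the single-threaded loop is only an approximation of what the job prints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {

    /** Splits a line the way the Tokenizer does: lowercase, split on non-word characters, drop empty tokens. */
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    /** Imitates keyBy(0).sum(1): one output per input token, carrying the running total for that word. */
    static List<String> runningCounts(String... lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String token : tokenize(line)) {
                int total = totals.merge(token, 1, Integer::sum);
                emitted.add("(" + token + "," + total + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Prints (flink,1) (is,1) (fast,1) (flink,2) (is,2) (stateful,1), one per line.
        runningCounts("Flink is fast", "Flink is stateful").forEach(System.out::println);
    }
}
```

In the real job, each printed line may also carry a subtask prefix, and records for different words can interleave in a different order when the parallelism is greater than one. The totals per word still grow in the same way.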
Next is the Scala version of WordCount. First add this dependency to the pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Then copy the following Scala code.
package org.myorg.quickstart.scala

import org.apache.flink.streaming.api.scala._

object WordCountScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.",
      "Here, we explain important aspects of Flink’s architecture."
    )

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+")) // split each line and flatten the results into a single stream of words
      .filter(_.nonEmpty)                   // drop empty strings
      .map((_, 1))                          // turn each word into a tuple: the word first, the number 1 second
      .keyBy(0)                             // group by the first tuple field (the word)
      .sum(1)                               // sum the second tuple field to count how often each word occurs

    counts.print()

    env.execute("First Flink WordCount program, Scala version")
  }
}