Flink : Standalone Cluster
A Standalone Cluster is a self-contained Flink cluster; the counterpart is a Flink cluster running on YARN.
Requirements
- Java 1.8 and the JAVA_HOME environment variable
- Passwordless SSH between the machines (see the sketch after this list)
- The same Flink directory structure on every machine
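For the passwordless SSH requirement, a minimal sketch, assuming the same user account on every node (run on the master; the IPs are the ones used later in this post):
# generate a key pair if none exists yet (assumption: default key location)
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
# push the public key to every node in conf/masters and conf/slaves;
# the master also SSHes into itself when starting the cluster
ssh-copy-id 192.168.1.1
ssh-copy-id 192.168.1.2
ssh-copy-id 192.168.1.3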
Download
The Flink distribution (https://flink.apache.org/downloads.html)
wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
tar xzf flink-1.10.0-bin-scala_2.12.tgz
cd flink-1.10.0
If Hadoop integration is needed
Before Flink 1.8, you had to download a Flink package bundled with Hadoop.
Since Flink 1.8, the Hadoop dependencies are distributed separately:
cd flink-1.10.0/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
(The download page on the official site is not very clear about this.)
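Once downloaded, the shaded jar simply sits next to the other jars in lib/ and is picked up from the classpath at startup. The listing below is illustrative; the exact contents depend on the build:
$ ls lib
flink-dist_2.12-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
...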
Configuration
conf/flink-conf.yaml
$ cat conf/flink-conf.yaml | grep -v "^#" | grep -v "^$"
jobmanager.rpc.address: 192.168.1.1       # JobManager IP
jobmanager.rpc.port: 6123                 # JobManager port, used for submitting jobs
jobmanager.heap.size: 1024m               # JobManager memory
taskmanager.memory.process.size: 1568m    # TaskManager memory
taskmanager.numberOfTaskSlots: 1          # number of slots per TaskManager
parallelism.default: 1                    # default parallelism when flink run does not specify one
jobmanager.execution.failover-strategy: region   # failover strategy
# region restarts only the tasks of the affected region in the ExecutionGraph
# full restarts all tasks of the job, i.e. resets the whole ExecutionGraph
These are the default settings; there are many other options.
conf/masters configures the JobManager IP and the Web UI port
192.168.1.1:8081
conf/slaves configures the TaskManager IPs
192.168.1.2
192.168.1.3
Copy the configuration to all nodes.
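A minimal sketch, assuming Flink lives at the same path on every node (~/flink-1.10.0 here is an assumption; adjust to your layout):
# copy the three config files to each worker
for host in 192.168.1.2 192.168.1.3; do
    scp conf/flink-conf.yaml conf/masters conf/slaves ${host}:~/flink-1.10.0/conf/
done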
Start
bin/start-cluster.sh
This script logs in to each node listed in conf/masters and starts a JobManager via
bin/jobmanager.sh
then logs in to each node listed in conf/slaves and starts a TaskManager via
bin/taskmanager.sh
On the master node you can see the following Java process running:
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
On the slave nodes you can see the following Java process running:
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
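A quick way to verify is jps, which ships with the JDK (the PIDs below are illustrative):
$ jps -l                   # on the master
12345 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
$ ssh 192.168.1.2 jps -l   # on a slave
23456 org.apache.flink.runtime.taskexecutor.TaskManagerRunner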
You can open localhost:8081 on the master to view the Web UI.
It shows 2 Task Managers; since each Task Manager is configured with a single slot, there are 2 Task Slots in total.
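The same information is exposed by the monitoring REST API that backs the Web UI; a quick check with curl (the /taskmanagers endpoint is part of that API):
$ curl http://192.168.1.1:8081/taskmanagers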
Submitting a Job
Submit one of the example programs from the examples directory:
bin/flink run examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt
The command blocks until the program completes, with output like:
Job has been submitted with JobID f038a66d9b9d6c9e9b80b866dde2dacf
Program execution finished
Job with JobID f038a66d9b9d6c9e9b80b866dde2dacf has finished.
Job Runtime: 4508 ms
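Note that with a local path like ./wordcount.txt the result file is written on the node where the sink task ran, not necessarily where you submitted the job, so inspect it there:
$ head wordcount.txt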
With the -d (detached) flag, the command returns right after submission instead of waiting for the job to finish:
bin/flink run -d examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt
The output is:
Job has been submitted with JobID 161410edb6b9a28ca69e84e5fe0885c3
The job then shows up under Completed Jobs in the Web UI.
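You can also check from the command line with the list action (more on bin/flink actions below); list shows running and scheduled jobs, and -a (--all) includes finished ones:
bin/flink list
bin/flink list -a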
Besides run, bin/flink offers several other actions:
Action "run" compiles and runs a program. Syntax: run [OPTIONS] <jar-file> <arguments> Action "info" shows the optimized execution plan of the program (JSON). Syntax: info [OPTIONS] <jar-file> <arguments> Action "list" lists running and scheduled programs. Syntax: list [OPTIONS] Action "stop" stops a running program with a savepoint (streaming jobs only). Syntax: stop [OPTIONS] <Job ID> Action "cancel" cancels a running program. Syntax: cancel [OPTIONS] <Job ID> Action "savepoint" triggers savepoints for a running job or disposes existing ones. Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
Some of the options accepted by bin/flink run:
"run" action options: -c,--class <classname> Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest. -C,--classpath <url> Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}. -d,--detached If present, runs the job in detached mode -n,--allowNonRestoredState Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the triggered. -p,--parallelism <parallelism> The parallelism with which to run the program. Optional flag to override the default value specified in the configuration. -py,--python <pythonFile> Python script with the program entry point. The dependent resources can be configured with the `--pyFiles` option. -s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job from -sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C. Options for executor mode: -D <property=value> Generic configuration options for execution/deployment and for the configured executor. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stabl e/ops/config.html -e,--executor <arg> The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". Options for default mode: -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
The run action can also submit Python programs, which is not covered here.
Adding a JobManager/TaskManager to the cluster
On the master node to be added, run
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
On the slave node to be added, run
bin/taskmanager.sh start|start-foreground|stop|stop-all
There is no need to stop the running cluster.
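For example, to attach a new TaskManager on a hypothetical node 192.168.1.4, assuming the Flink directory has already been copied there:
ssh 192.168.1.4
cd ~/flink-1.10.0        # assumption: same install path as the other nodes
bin/taskmanager.sh start
Also add the node to conf/slaves so that bin/start-cluster.sh and bin/stop-cluster.sh cover it next time.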
Stopping the cluster
bin/stop-cluster.sh