Apache Storm v2.0入门项目的开发、测试和运行(IDEA/Maven)
第一个Apache Storm v2.0流计算入门项目的开发、测试和运行(IDEA/Maven)
关于流计算框架Apache Storm最新版的安装,可以参考:
流计算框架-最新版Apache Storm v2.0单机模式安装详细步骤
流计算框架Apache Storm核心概念、架构设计
一、基于IDEA/Maven创建一个Storm应用
应用名称:firststorm
二、添加storm-client的Maven jar包依赖
storm-client 依赖包信息,添加到项目的pom.xml文件中。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>2.0.0</version>
</dependency>
maven会自动下载相关依赖并放到Maven Dependencies下,这些jar包可以点击下拉查看,并且会自动添加到项目classpath中,作为编译使用,等jar包全部下载完毕,现在开始编写具体的计算逻辑了,在这个项目中我们把所有的类都建立在包com.rickie.bigdata.firststorm下。
storm提供了两种运行模式:本地模式(Local Mode)和分布式模式。本地模式针对开发调试storm topologies非常有用。
因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,如何在本地开发、调试storm程序呢?你可以参考本文提供的解决方案。
如下来自Storm 官方文档:
http://storm.apache.org/releases/2.0.0-SNAPSHOT/Local-mode.html
Local Mode
Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies on a cluster.
To run a topology in local mode you have two options. The most common option is to run your topology with storm local instead of storm jar.
This will bring up a local simulated cluster and force all interactions with nimbus to go through the simulated cluster instead of going to a separate process.
If you want to do some automated testing but without actually launching a storm cluster you can use the same classes internally that storm local does.
To do this you first need to pull in the dependencies needed to access these classes. For the java API you should depend on storm-server as a test dependency.
To create an in-process cluster, simply use the LocalCluster class.
如上文所述,使用本地模式(Local Mode),需要先引入storm-server 依赖包。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-server</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
引入的storm-server 依赖包。
在本地模式上运行topology类似在一个集群上运行topology。
创建一个本地集群,大致代码如下所示:
- import org.apache.storm.LocalCluster;
- LocalCluster cluster = new LocalCluster();
- 提交集群使用submitTopology,
- 杀死集群使用killTopology
- 关闭一个本地集群使用cluster.shutdown();
完整代码可以参考下面。
三、Storm 应用的代码逻辑开发
(1)首先建立RandomSpout类作为数据源,并且继承于父类BaseRichSpout,确定后可以看到系统自动补全3个方法:nextTuple,open和declareOutputFields。
我们现在就需要重写这3个方法,open方法是数据源的初始化,nextTuple的作用是把Tuple发送至下游,declareOutputFields用来定义输出字段,下面我们手动分配一个数组,并且随机取里面的元素,代码如下:
package com.rickie.bigdata;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
public class RandomSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static String[] words = {"Rickie", "Hadoop", "MapReduce", "Storm", "Spark", "Spark Streaming", "Flink"};
@Override
public void open(Map<String, Object> map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
String word = words[new Random().nextInt(words.length)];
collector.emit(new Values(word));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomString"));
}
}
(2)然后新建一个类SenqueceBolt,继承于BaseBasicBolt类,并且重写方法execute和declareOutputFields。
这个类就是用于执行具体的作业,准确的说是execute方法用来执行相关的计算,这里只是简单的输出,代码如下:
package com.rickie.bigdata;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
public class SequenceBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String word = (String) tuple.getValue(0);
String out = "Hello " + word +"!";
System.out.println(out);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
(3)最后建立一个类FirstStorm。
这个类是主类,在main方法中定义Topology,并且综合设置Spout和Bolt,从而调用其中的方法,这里流式计算时间设置为30s,代码如下:
package com.rickie.bigdata;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
public class FirstStorm
{
public static void main( String[] args ) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSpout());
builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
String name = "firststorm";
if(args != null && args.length >0){
name = args[0];
conf.setNumWorkers(3);
try {
StormSubmitter.submitTopology(name, conf, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
try(LocalCluster cluster = new LocalCluster()) {
cluster.submitTopology("firststorm", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("firststorm");
cluster.shutdown();
}
}
System.out.println( "Well done!" );
}
}
四、运行调试Storm应用
可以用本地模式运行,在IDEA中直接运行即可,方便开发调试。
在Console可以看到如下输出信息:
接下来我们将这个项目放到Storm服务器集群中运行。
storm jar 命令用于启动一个Topology。
(1)以本地模式(Local Mode)运行
storm local firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm
运行结果和之前IDEA本地模式运行输出类似。
(2)以分布式模式运行
storm jar firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm RickieStorm
可以查看worker日志,看到SequenceBolt 线程输出的信息。只有kill 这个topology,日志输出信息才会终止。
在Storm架构中,Topology代表的并不是确定的作业,而是持续的计算过程,在确定的业务逻辑处理框架下,输入数据源源不断地进入系统,经过流式处理后以较低的延迟产生输出。如果不主动结束这个Topology或者关闭Storm集群,那么数据处理的过程就会持续地进行下去。
另外需要注意的是:由于在分布式模式下运行,worker工作在独立的进程中,因此无法直接在storm jar命令行输出窗口,看到上述SequenceBolt组件的输出信息。
(3)storm list 查看正在运行的topologies和它们的状态
storm list
也可以通过访问http://192.168.56.103:8080/ storm server,查看并操作正在运行的 topology。
(4)kill正在运行的topology
storm kill [storm name]
运行“storm kill”这个命令,仅仅只是调用Nimbus的Thirft接口去kill掉相对应的Topology。
Nimbus接受到kill命令,会将”kill”事务应用到topology上,修改Topology的状态为”killed”以及将“remove”事件列入到未来几秒钟的计划中,即未来几秒后会触发remove时间。这里的kill实际上停止相关的Worker。
默认kill的等待时间是Topology消息的超时时间,但是可以通过storm kill命令中的-w标志对其进行重写,设置了以上参数之后,topology会在指定的等待时间停止运行。这样给了Topology一个机会在shutdown workers之后完成当前没有处理完成的任务;删除Topology以及清理zookeeper中的分配信息和静态信息;清理存储在本地的心跳dir和jar/configs。
现在,第一个Storm入门项目的开发和测试运行都完毕了,更复杂的流计算逻辑模式也基本相同,主要就是Maven项目中出现了更复杂的模块和调用,整个运行的流程其实都是差不多的。恭喜你,现在算是步入Storm流式计算的殿堂的大门了。
五、Storm架构是如何解决Hadoop架构瓶颈的?
- Storm的Topology只需初始化一次。在将Topology提交到Storm集群的时候,集群会针对该Topology做一次初始化的工作。此后,在Topology运行过程中,对于输入数据而言,是没有计算框架初始化耗时的,有效避免了计算框架初始化的时间损耗。
- Storm使用Netty作为底层的消息队列来传递消息,保证消息能够得到快速的处理。
- 同时Storm采用内存计算模式,无需借助文件存储,直接通过网络直传中间计算结果,避免了组件之间传输数据的大量时间损耗。
六、Apache Storm中的核心概念
- Topology:一个实时计算任务被称作为Topology,包含Spout和Bolt。
- Tuple:数据模型,代表处理单元,可以包含多个Field,K/V的Map。
- Worker:一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程worker来执行,那么每个工作进程会处理其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker,setBolt 的最后一个参数是你想为bolts的并行量。
- Spouts
- 消息源Spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的,如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
- 消息源可以发射多条消息流stream。使用OutputFieldsDeclarer。declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。代码上是这样的:collector.emit(new Values(str));
- Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
- Bolts
- 所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。
- Bolts可以简单的做消息流的传递(来一个元组,调用一次execute)。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量,第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
- Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
- Bolts的主要方法是execute,它以一个tuple作为输入,bolts使用OutputCollector来发射tuple(spout使用SpoutOutputCollector来发射指定的stream),bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。