Kafka+Storm+HDFS
cd /myhome/usr/storm
bin/storm nimbus &
bin/storm supervisor &
bin/storm ui &
Kafka+Storm+HDFS整合实践
kafka_2.9.2-0.8.1.1.tgz
apache-storm-0.9.2-incubating.tar.gz
Kafka安装配置
我们使用3台机器搭建Kafka集群:
1
192.168.4.142 h1
2
192.168.4.143 h2
3
192.168.4.144 h3
在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令:
1
cd /usr/local/
2
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
3
tar xvzf kafka_2.9.2-0.8.1.1.tgz
4
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
5
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:
1
broker.id=0
2
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:
1
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
1
cd /usr/local/zookeeper
2
bin/zkCli.sh
在ZooKeeper执行如下命令创建chroot路径:
1
create /kafka ''
这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:
1
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
2
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
1
cd /usr/local/
kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.130:2181 192.168.3.140:2181 192.168.3.142:2181
kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.3.130:2181 192.168.3.140:2181 192.168.3.142:2181
2
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
3
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:
1
broker.id=1 # 在h1修改
2
3
broker.id=2 # 在h2修改
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:
1
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
1
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
查看创建的Topic,执行如下命令:
1
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
结果信息如下所示:
1
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
2
Topic: my-replicated-topic5 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
3
Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
4
Topic: my-replicated-topic5 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,0,1
5
Topic: my-replicated-topic5 Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
6
Topic: my-replicated-topic5 Partition: 4 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
上面Leader、Replicas、Isr的含义如下:
1
Partition: 分区
2
Leader : 负责读写指定分区的节点
3
Replicas : 复制该分区log的节点列表
4
Isr : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
1
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
1
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
Storm安装配置
Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:
1
192.168.4.142 h1
2
192.168.4.143 h2
3
192.168.4.144 h3
首先,在h1节点上,执行如下命令安装:
1
cd /usr/local/
2
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
3
tar xvzf apache-storm-0.9.2-incubating.tar.gz
4
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
5
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
然后,修改配置文件conf/storm.yaml,内容如下所示:
01
storm.zookeeper.servers:
02
- "h1"
03
- "h2"
04
- "h3"
05
storm.zookeeper.port: 2181
06
#
07
nimbus.host: "h1"
08
09
supervisor.slots.ports:
10
- 6700
11
- 6701
12
- 6702
13
- 6703
14
15
storm.local.dir: "/tmp/storm"
将配置好的安装文件,分发到其他节点上:
1
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
2
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
1
cd /usr/local/
2
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
3
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
1
bin/storm nimbus &
2
bin/storm supervisor &
为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
1
bin/storm ui &
这样可以通过访问http://h2:8080/来查看Topology的运行状况。
整合Kafka+Storm
消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,
然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。
实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,
例如我使用的Maven依赖配置,如下所示:
org.apache.stormstorm-core0.9.2-incubatingprovidedorg.apache.stormstorm-kafka0.9.2-incubatingorg.apache.kafkakafka_2.9.20.8.1.1org.apache.zookeeperzookeeperlog4jlog4j
下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,
然后再做词频统计计算,实现的Topology的代码,如下所示:
package org.shirdrn.storm.examples;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class MyKafkaTopology {
public static class KafkaWordSplitter extends BaseRichBolt {
private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getString(0);
LOG.info("RECV[kafka -> splitter] " + line);
String[] words = line.split("\\s+");
for(String word : words) {
LOG.info("EMIT[splitter -> counter] " + word);
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static class WordCounter extends BaseRichBolt {
private static final Log LOG = LogFactory.getLog(WordCounter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map counterMap;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap();
}
@Override
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
LOG.info("RECV[splitter -> counter] " + word + " : " + count);
AtomicInteger ai = this.counterMap.get(word);
if(ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
LOG.info("CHECK statistics map: " + this.counterMap);
}
@Override
public void cleanup() {
LOG.info("The final result:");
Iterator> iter = this.counterMap.entrySet().iterator();
while(iter.hasNext()) {
Entry entry = iter.next();
LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
Config conf = new Config();
String name = MyKafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
// Nimbus host name passed from command line
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}
上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,
在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
然后,就可以提交我们开发的Topology程序了:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
spoutConf.forceFromStart = false;
该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。
整合Storm+HDFS
Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
package org.shirdrn.storm.examples;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class StormToHDFSTopology {
public static class EventSpout extends BaseRichSpout {
private static final Log LOG = LogFactory.getLog(EventSpout.class);
private static final long serialVersionUID = 886149197481637894L;
private SpoutOutputCollector collector;
private Random rand;
private String[] records;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
records = new String[] {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
};
}
@Override
public void nextTuple() {
Utils.sleep(1000);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
Date d = new Date(System.currentTimeMillis());
String minute = df.format(d);
String record = records[rand.nextInt(records.length)];
LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
collector.emit(new Values(minute, record));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("minute", "record"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(" : ");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log");
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event-spout", new EventSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
Config conf = new Config();
String name = StormToHDFSTopology.class.getSimpleName();
if (args != null && args.length > 0) {
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}
上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
org.apache.maven.pluginsmaven-shade-plugin1.4truepackageshade
整合Kafka+Storm+HDFS
上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,
然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,
写入到HDFS存储,代码如下所示:
package org.shirdrn.storm.examples;
import java.util.Arrays;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class DistributeWordTopology {
public static class KafkaWordToUpperCase extends BaseRichBolt {
private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -5207232012035109026L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("RECV[kafka -> splitter] " + line);
if(!line.isEmpty()) {
String upperLine = line.toUpperCase();
LOG.info("EMIT[splitter -> counter] " + upperLine);
collector.emit(input, new Values(upperLine, upperLine.length()));
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line", "len"));
}
}
public static class RealtimeBolt extends BaseRichBolt {
private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -4115132557403913367L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("REALTIME: " + line);
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
// Configure Kafka
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;
// Configure HDFS bolt
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
// configure & build topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");
// submit topology
Config conf = new Config();
String name = DistributeWordTopology.class.getSimpleName();
if (args != null && args.length > 0) {
String nimbus = args[0];
conf.put(Config.NIMBUS_HOST, nimbus);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}
上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。