storm
hdfs topology:
packagecom.ssc.arqe.hadoop.topology;
importorg.apache.storm.hdfs.bolt.HdfsBolt;
importorg.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
importorg.apache.storm.hdfs.bolt.format.FileNameFormat;
importorg.apache.storm.hdfs.bolt.format.HistoricalJobDataFormat;
importorg.apache.storm.hdfs.bolt.format.NDSJobRecordFormat;
importorg.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
importorg.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
importorg.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
importorg.apache.storm.hdfs.bolt.sync.SyncPolicy;
importstorm.kafka.BrokerHosts;
importstorm.kafka.KafkaSpout;
importstorm.kafka.SpoutConfig;
importstorm.kafka.StringScheme;
importstorm.kafka.ZkHosts;
importbacktype.storm.Config;
importbacktype.storm.StormSubmitter;
importbacktype.storm.generated.AlreadyAliveException;
importbacktype.storm.generated.InvalidTopologyException;
importbacktype.storm.spout.SchemeAsMultiScheme;
importbacktype.storm.topology.TopologyBuilder;
importcom.ssc.arqe.hadoop.bolt.JobStatusBolt;
publicclassNDSStatusTopology{
publicstaticvoidmain(String[]args)throwsAlreadyAliveException,
InvalidTopologyException{
//TODOAuto-generatedmethodstub
Stringhadoop_env=args[0];//hadoopenvironment,like
System.out.println("hadoop_env:"+hadoop_env);
Stringpath=args[1];
System.out.println("path:"+path);
Stringtopic=args[2];
System.out.println("kafka_topic:"+topic);
StringzkHost=args[3];
System.out.println("zkHost_port:"+zkHost);
Stringkafka_id=args[4];
System.out.println("consumergroupID:"+kafka_id);//consumergroupid
//syncthefilesystemafterevery1ktuples
SyncPolicysyncPolicy=newCountSyncPolicy(2);
//rotatefileswhentheyreach5MB
FileRotationPolicyrotationPolicy=newTimedRotationPolicy(1.0f,
TimedRotationPolicy.TimeUnit.DAYS);
FileNameFormatfileNameFormat=newDefaultFileNameFormat().withPath(
path).withExtension(".txt");
//JobStatusHandlerhandler=newJobStatusHandler(hadoop_env,path);
BrokerHostsbrokerHosts=newZkHosts(zkHost);
SpoutConfigkafkaConfig=newSpoutConfig(brokerHosts,topic,"",
kafka_id);
kafkaConfig.scheme=newSchemeAsMultiScheme(newStringScheme());
HdfsBoltbolt=newHdfsBolt().withFsUrl(args[0])
.withRecordFormat(newHistoricalJobDataFormat())
.withFileNameFormat(fileNameFormat)
.withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("GetNDSJobStatusMessage",newKafkaSpout(kafkaConfig),
2);
builder.setBolt("SendNDSJobStatusMessage",bolt,2).shuffleGrouping(
"GetNDSJobStatusMessage");
Configconf=newConfig();
conf.setDebug(true);
conf.setNumWorkers(2);//setworkerstorunningtopology
StormSubmitter.submitTopologyWithProgressBar("NDSJobStatusTopology",
conf,builder.createTopology());
}
}