Storm

HDFS topology: a Storm topology that consumes NDS job-status messages from a Kafka topic and writes them to HDFS.

package com.ssc.arqe.hadoop.topology;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.HistoricalJobDataFormat;
import org.apache.storm.hdfs.bolt.format.NDSJobRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

import com.ssc.arqe.hadoop.bolt.JobStatusBolt;

public class NDSStatusTopology {

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        String hadoop_env = args[0]; // Hadoop environment (used as the HDFS filesystem URL)
        System.out.println("hadoop_env: " + hadoop_env);
        String path = args[1]; // output directory in HDFS
        System.out.println("path: " + path);
        String topic = args[2]; // Kafka topic to consume from
        System.out.println("kafka_topic: " + topic);
        String zkHost = args[3]; // ZooKeeper host:port
        System.out.println("zkHost_port: " + zkHost);
        String kafka_id = args[4]; // Kafka consumer group ID
        System.out.println("consumer group ID: " + kafka_id);

        // sync the filesystem after every 2 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(2);

        // rotate files once per day
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f,
                TimedRotationPolicy.TimeUnit.DAYS);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(
                path).withExtension(".txt");

        // JobStatusHandler handler = new JobStatusHandler(hadoop_env, path);
        BrokerHosts brokerHosts = new ZkHosts(zkHost);
        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, topic, "",
                kafka_id);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        HdfsBolt bolt = new HdfsBolt().withFsUrl(hadoop_env)
                .withRecordFormat(new HistoricalJobDataFormat())
                .withFileNameFormat(fileNameFormat)
                .withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("GetNDSJobStatusMessage", new KafkaSpout(kafkaConfig),
                2);
        builder.setBolt("SendNDSJobStatusMessage", bolt, 2).shuffleGrouping(
                "GetNDSJobStatusMessage");

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(2); // number of workers to run the topology

        StormSubmitter.submitTopologyWithProgressBar("NDSJobStatusTopology",
                conf, builder.createTopology());
    }
}
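HistoricalJobDataFormat is a custom RecordFormat whose source is not shown here. A minimal sketch of what such a class might look like, assuming each Kafka message should be written out as one line (the field name "str" is what storm.kafka.StringScheme emits):

package org.apache.storm.hdfs.bolt.format;

import backtype.storm.tuple.Tuple;

// Hypothetical sketch; the real HistoricalJobDataFormat is project-specific.
public class HistoricalJobDataFormat implements RecordFormat {

    // StringScheme delivers each Kafka message under the single field "str";
    // write it to HDFS as one newline-terminated record.
    @Override
    public byte[] format(Tuple tuple) {
        return (tuple.getStringByField("str") + "\n").getBytes();
    }
}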

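To run the topology, package it into a jar and submit it with the storm CLI, passing the five arguments in the order main() reads them (HDFS URL, output path, Kafka topic, ZooKeeper host:port, consumer group ID). The jar name and argument values below are placeholders:

storm jar nds-topology.jar com.ssc.arqe.hadoop.topology.NDSStatusTopology \
    hdfs://namenode:8020 /nds/job_status nds_status zk1:2181 nds_consumer_group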