kafka-storm 详细
kafka storm 安装:
15. 安装 kafka
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
tar xf kafka_2.10-0.10.0.0.tgz
ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
/usr/local/zookeeper/bin/zkCli.sh create /kafka ''
vim /usr/local/kafka/config/server.properties
broker.id=0
zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties
(注意:其他节点需先同样解压安装包并建立 /usr/local/kafka 软链接;server.properties 分发后,每台机器的 broker.id 必须改成互不相同的值,例如 1、2,否则 broker 无法同时启动。)
master slave 启动
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
创建 topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
16. storm 安装
cd /usr/local/
wget http://mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
tar xf apache-storm-0.10.0.tar.gz
ln -s /usr/local/apache-storm-0.10.0 /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.10.0 /usr/local/storm
chown -R root:root /usr/local/apache-storm-0.10.0 /usr/local/storm
mkdir -p /tmp/storm/data/
cd storm
vim conf/storm.yaml
storm.zookeeper.servers:
  - "dev10.aoiplus.openpf"
  - "dev05.aoiplus.openpf"
  - "dev06.aoiplus.openpf"
storm.zookeeper.port: 2181
nimbus.host: "dev10.aoiplus.openpf"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
storm.local.dir: "/tmp/storm/data"
scp -r /usr/local/storm/conf/storm.yaml root@dev05.aoiplus.openpf:/usr/local/storm/conf/
scp -r /usr/local/storm/conf/storm.yaml root@dev06.aoiplus.openpf:/usr/local/storm/conf/
启动
master
/usr/local/storm/bin/storm nimbus >/dev/null 2>&1 &
/usr/local/storm/bin/storm ui >/dev/null 2>&1 &
slaves
/usr/local/storm/bin/storm supervisor >/dev/null 2>&1 &
查看(Storm UI 默认端口为 8080)
http://dev10.aoiplus.openpf:8080/index.html
cp /usr/local/kafka/libs/kafka_2.10-0.10.0.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.10.6.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.1.2.4.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.8.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.21.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
/usr/local/storm/bin/storm jar /home/baoy/soft/storm/KafkaStormJavaDemo_main_start.jar com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology "terminalInfosAnalysisTopology"
mkdir -p /home/baoy/soft/storm/logs
chmod -R 777 /home/baoy/soft/storm/logs
cd /usr/local/storm/log4j2/
vim cluster.xml
<property name="logpath">/home/baoy/soft/storm</property>
关闭 storm
/usr/local/storm/bin/storm kill terminalInfosAnalysisTopology
pom.xml
<!--
  Maven build for the KafkaStormJavaDemo topology jar.

  Dependencies: org.json 20160810, storm-core 0.9.6 (scope "provided"),
  storm-kafka 0.9.6, and kafka_2.10 0.9.0.1 with its zookeeper, log4j and
  slf4j-log4j12 transitive dependencies excluded.

  Build: maven-compiler-plugin compiles with source/target 1.7 and UTF-8.
  maven-shade-plugin is bound to the "package" phase and sets
  TerminalInfosAnalysisTopology as the manifest main class.
  maven-assembly-plugin is configured for the "jar-with-dependencies"
  descriptor with final name ${project.artifactId}_main_start, but no
  execution is declared for it in this POM.

  NOTE(review): the spring.version and java.version properties are not
  referenced anywhere in this POM.
  NOTE(review): the install notes that accompany this POM deploy Storm 0.10.0
  and Kafka 0.10.0.0, while the versions pinned here are 0.9.6 and 0.9.0.1.
  Confirm the combination is intended.
  NOTE(review): the bootclasspath entry joins its two jars with ":", which is
  the path separator on Unix-like systems only. Confirm the build never runs
  on Windows.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.curiousby.baoy.cn</groupId> <artifactId>KafkaStormJavaDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SpringKafkaStormDemo</name> <url>http://maven.apache.org</url> <!-- properties constant --> <properties> <spring.version>4.2.5.RELEASE</spring.version> <java.version>1.7</java.version> </properties> <repositories> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20160810</version> </dependency> <!-- <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.6</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.6</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <finalName>SpringKafkaStormDemo</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> 
<version>3.3</version> <dependencies> <dependency> <groupId>org.codehaus.plexus</groupId> <artifactId>plexus-compiler-javac</artifactId> <version>2.5</version> </dependency> </dependencies> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> <compilerArguments> <verbose /> <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath> </compilerArguments> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> <configuration> <appendAssemblyId>false</appendAssemblyId> <finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <finalName>${project.artifactId}_main_start</finalName> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
package com.curiousby.baoyou.cn.storm;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

/**
 * Entry point that wires a Kafka spout to the two terminal-info analysis bolts and submits the
 * resulting topology.
 *
 * <p>When at least one command-line argument is supplied, the first argument is used as the
 * topology name and the topology is submitted through {@link StormSubmitter} with two workers.
 * Without arguments the topology is submitted to an in-process {@link LocalCluster}.
 *
 * <p>Created 2016-12-15 16:54:50.
 *
 * @author cmcc-B100036
 */
public class TerminalInfosAnalysisTopology {

    /** Kafka topic the spout reads from. */
    private static final String TOPIC_NAME = "baoy-topic";

    /**
     * ZooKeeper parent path under which the spout stores its consumer offsets. A dedicated,
     * non-empty root keeps the per-run consumer nodes grouped together instead of creating a new
     * top-level znode for every run.
     */
    private static final String ZK_ROOT = "/" + TOPIC_NAME;

    /** ZooKeeper connect string (including the {@code /kafka} chroot) used to discover brokers. */
    private static final String BROKER_ZK_CONNECT =
            "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka";

    /** Component id of the Kafka spout; both bolts subscribe to it. */
    private static final String SPOUT_ID = "kafkaSpout";

    /** Topology name used when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "terminalInfosAnalysisTopology";

    /**
     * Builds the topology and submits it.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission. When
     *     absent or empty the topology runs in a {@link LocalCluster}.
     * @throws IllegalStateException if cluster submission is rejected because a topology with the
     *     same name is already alive or the topology is invalid
     */
    public static void main(String[] args) {
        TopologyBuilder builder = buildTopology();

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(2);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                // Propagate so the launching process exits non-zero instead of reporting success.
                throw new IllegalStateException("Failed to submit topology " + args[0], e);
            }
        } else {
            conf.setMaxSpoutPending(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
        }
    }

    /** Creates the builder holding the Kafka spout and the two bolts shuffle-grouped on it. */
    private static TopologyBuilder buildTopology() {
        BrokerHosts hosts = new ZkHosts(BROKER_ZK_CONNECT);

        // A fresh random consumer id per run, as in the original code.
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, TOPIC_NAME, ZK_ROOT, UUID.randomUUID().toString());
        spoutConfig.forceFromStart = false;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new KafkaSpout(spoutConfig));
        builder.setBolt("terminalInfosAnalysisRedisBolt", new TerminalInfosAnalysisRedisBolt(), 2)
                .shuffleGrouping(SPOUT_ID);
        builder.setBolt(
                        "terminalInfosAnalysisElasticsearchBolt",
                        new TerminalInfosAnalysisElasticsearchBolt(),
                        2)
                .shuffleGrouping(SPOUT_ID);
        return builder;
    }
}
public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt { private Logger logger =LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class); private OutputCollector collector; @Override public void execute(Tuple tuple) { JSONObject formate = TerminalInfos.formate(tuple.getString(0)); TerminalInfos entity = new TerminalInfos(); entity.formate(formate); if (entity != null) { System.out.println(entity); logger.info("==========================================================="); logger.info("========================="+entity+"========================="); logger.info("==========================================================="); } } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
使用 kafka 客户端 定时发送
kafka-storm 及时处理
这里面 我使用的是本地模式
捐助开发者
在兴趣的驱动下,写一个免费的东西,有欣喜,也有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做的更好!
相关推荐
Kafka 2020-09-18
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01
PoppyEvan 2020-07-29