Event Stream System (1) Basic Setup

Event Stream System (1) Basic Setup

Install ZooKeeper 3.4.10, the latest version

> wget http://apache.claz.org/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

> tar zxvf zookeeper-3.4.10.tar.gz

> sudo ln -s ~/tool/zookeeper-3.4.10 /opt/zookeeper-3.4.10

> sudo ln -s /opt/zookeeper-3.4.10 /opt/zookeeper

Add to PATH

> cp conf/zoo_sample.cfg conf/zoo.cfg

Start the service

> zkServer.sh start conf/zoo.cfg

Connect to the server

> zkCli.sh -server localhost:2181

Install the latest stable Kafka version, 2.10-0.10.2.0

> wget http://mirror.olnevhost.net/pub/apache/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz

> tar zxvf kafka_2.10-0.10.2.0.tgz

Add to PATH

Start the service

> kafka-server-start.sh config/server.properties

Or start in the background

> nohup kafka-server-start.sh config/server.properties &

Create a topic "test"

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List all the topics

> bin/kafka-topics.sh --list --zookeeper localhost:2181

Produce some messages

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consume the messages

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Spark Streaming

I use the latest package, and I try to use the sample below.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the sillycat-eventstream sample: Spark Streaming 2.1.0
     consuming Kafka 0.8-protocol topics, packaged as a runnable fat jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sillycat</groupId>
    <artifactId>sillycat-eventstream</artifactId>
    <version>1.0</version>
    <description>sillycat spark stream</description>
    <name>eventstream spark</name>
    <packaging>jar</packaging>
    <properties>
        <springframework.version>4.3.7.RELEASE</springframework.version>
        <logging.version>1.7.25</logging.version>
    </properties>
    <dependencies>
        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- cache -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>21.0</version>
        </dependency>
        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${logging.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${logging.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <!-- springframework -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${springframework.version}</version>
            <exclusions>
                <!-- commons-logging is replaced by jcl-over-slf4j above -->
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- apache -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <!-- testing -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${springframework.version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Builds the runnable jar-with-dependencies during `package` -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.sillycat.sillycateventstream.ExecutorApp</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Shade plugin merges Spring's META-INF handler/schema files,
                 which would otherwise clobber each other in the fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Sample codes from WordCount

package com.sillycat.sillycateventstream.apps;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

/**
 * Consumes messages from one or more topics in Kafka and does word count.
 *
 * <p>Usage: JavaKafkaWordCount &lt;zkQuorum&gt; &lt;group&gt; &lt;topics&gt; &lt;numThreads&gt;
 * where &lt;zkQuorum&gt; is a list of one or more ZooKeeper servers that make quorum,
 * &lt;group&gt; is the name of the Kafka consumer group, &lt;topics&gt; is a list of one or
 * more Kafka topics to consume from, and &lt;numThreads&gt; is the number of threads
 * the Kafka consumer should use.
 *
 * <p>To run this example: {@code $ bin/run-example
 * org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02,zoo03
 * my-consumer-group topic1,topic2 1}
 */
public class JavaKafkaWordCount implements Serializable {

    private static final long serialVersionUID = -4598672873749563084L;

    // Split lines on a single space. NOTE: the previous literal was "" (empty
    // pattern), which splits between every character instead of between words;
    // the upstream Spark example uses " ".
    private static final Pattern SPACE = Pattern.compile(" ");

    // Utility-style entry-point class; not meant to be instantiated.
    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");

        // Create the context with a 10-second batch interval (10 * 1000 ms).
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10 * 1000));

        // One consumer thread for the single "test" topic.
        int numThreads = 1;
        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("test", numThreads);

        // Receiver-based Kafka stream: key/value pairs where the value is the message body.
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc,
                "fr-stage-consumer:2181", "spark-group", topicMap);

        // Keep only the message payload (the Tuple2's second element).
        JavaDStream<String> lines = messages.map(x -> x._2);

        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

        JavaPairDStream<String, Integer> wordCounts = words
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Cassandra

> wget http://www.gtlib.gatech.edu/pub/apache/cassandra/3.10/apache-cassandra-3.10-bin.tar.gz

Place it and add to PATH

Command to start:

> cassandra -Dcassandra.config="file:///opt/cassandra/conf/cassandra.yaml"

References:

http://docs.datastax.com/en/archived/cassandra/2.0/cassandra/tools/toolsCUtility_t.html

相关推荐