Kafka(2)Install ubuntu and Try more JAVA client

Kafka(2) Install ubuntu and Try more JAVA client

1. Try to set this up on Windows.

Download and install this file:

http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi

Unzip Kafka to the working directory:

D:\tool\kafka-0.7.0

> sbt update

> sbt package

sbt is installed on Windows, but it is still hard to install Kafka on Windows.

2. Try to set up on Ubuntu 12.04

> wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

> tar zxvf kafka-0.7.0-incubating-src.tar.gz

> mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0

> cd /opt/tools/kafka-0.7.0

> ./sbt update

> ./sbt package

Start the server:

> bin/zookeeper-server-start.sh config/zookeeper.properties

> bin/kafka-server-start.sh config/server.properties

3. Fix the Java Client Problem

Error Message:

[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)

[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)

Solution:

server.properties

# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned

# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost

# may not be what you want.

hostname=x.x.x.x

#zk.connect=localhost:2181

zk.connect=x.x.x.x:2181

# Timeout in ms for connecting to zookeeper

zk.connectiontimeout.ms=1000000

zk.sessiontimeout.ms=60000

zookeeper.properties

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config

maxClientCnxns=0

tickTime=8000

We need to use the real IP address here in the configuration.

The Java client sample code is under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples

The classes are as follows:

packagecom.sillycat.magicneptune.example;

importjava.util.Properties;

importkafka.javaapi.producer.Producer;

importkafka.javaapi.producer.ProducerData;

importkafka.producer.ProducerConfig;

publicclassTestProducerMain{

publicstaticvoidmain(String[]args){

Propertiesprops2=newProperties();

props2.put("zk.connect","192.168.56.101:2181");

props2.put("serializer.class","kafka.serializer.StringEncoder");

//Thisisaddedbymyselfforchangingthedefaulttimeout6000.

props2.put("zk.connectiontimeout.ms","15000");

ProducerConfigconfig=newProducerConfig(props2);

Producer<String,String>producer=newProducer<String,String>(config);

//ThemessageissenttoarandomlyselectedpartitionregisteredinZK

ProducerData<String,String>data=newProducerData<String,String>(

"test","test-message,itisoknow.adsfasdf1111222");

producer.send(data);

producer.close();

}

}

packagecom.sillycat.magicneptune.example;

importjava.net.InetAddress;

importjava.net.UnknownHostException;

importkafka.api.FetchRequest;

importkafka.javaapi.consumer.SimpleConsumer;

importkafka.javaapi.message.ByteBufferMessageSet;

importkafka.message.MessageAndOffset;

publicclassTestConsumerMain{

publicstaticvoidmain(String[]args){

try{

System.out.println(InetAddress.getLocalHost().getHostAddress());

}catch(UnknownHostExceptione){

e.printStackTrace();

}

SimpleConsumerconsumer=newSimpleConsumer("192.168.56.101",9092,10000,

1024000);

longoffset=0;

while(true){

//createafetchrequestfortopictest,partition0,current

//offset,andfetchsizeof1MB

FetchRequestfetchRequest=newFetchRequest("test",0,offset,

1000000);

//getthemessagesetfromtheconsumerandprintthemout

ByteBufferMessageSetmessages=consumer.fetch(fetchRequest);

for(MessageAndOffsetmsg:messages){

System.out.println(ExampleUtils.getMessage(msg.message())+"offset="+offset);

//advancetheoffsetafterconsumingeachmessage

offset=msg.offset();

}

}

//consumer.close();

}

}

packagecom.sillycat.magicneptune.example;

importjava.nio.ByteBuffer;

importkafka.message.Message;

publicclassExampleUtils

{

publicstaticStringgetMessage(Messagemessage)

{

ByteBufferbuffer=message.payload();

byte[]bytes=newbyte[buffer.remaining()];

buffer.get(bytes);

returnnewString(bytes);

}

}

references:

http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/

https://github.com/harrah/xsbt/wiki/Getting-Started-Setup

http://incubator.apache.org/kafka/faq.html

http://incubator.apache.org/kafka/quickstart.html

http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html

http://incubator.apache.org/kafka/faq.html

相关推荐