kafka开发实例
1.启动kafka。
//启动zookeeper server (用&是为了能退出命令行):
bin/zookeeper-server-start.sh config/zookeeper.properties &
//启动kafka server:
bin/kafka-server-start.sh config/server.properties &
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-annotation</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>zkclient</groupId> <artifactId>zkclient</artifactId> <version>0.3</version> </dependency> </dependencies>
2.新建一个生产者例子
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaTest { public static void main(String[] args) { Properties props = new Properties(); props.put("zk.connect", "10.103.22.47:2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "10.103.22.47:9092"); props.put("request.required.acks", "1"); //props.put("partitioner.class", "com.xq.SimplePartitioner"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); String ip = "192.168.2.3"; String msg ="this is a messageuuu!"; KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg); producer.send(data); producer.close(); } }
3.新建一个消费者例子
import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata; public class ConsumerSample { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zookeeper.connect", "10.103.22.47:2181"); props.put("zookeeper.connectiontimeout.ms", "1000000"); props.put("group.id", "test_group"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String,Integer> topics = new HashMap<String,Integer>(); topics.put("test", 2); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test"); ExecutorService threadPool = Executors.newFixedThreadPool(2); for (final KafkaStream<byte[], byte[]> stream : streams) { threadPool.submit(new Runnable() { public void run() { for (MessageAndMetadata msgAndMetadata : stream) { // process message (msgAndMetadata.message()) System.out.println("topic: " + msgAndMetadata.topic()); Message message = (Message) msgAndMetadata.message(); ByteBuffer buffer = message.payload(); byte[] bytes = new byte[message.payloadSize()]; buffer.get(bytes); String tmp = new String(bytes); System.out.println("message content: " + tmp); } } }); } } }
相关推荐
Kafka 2020-09-18
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01
PoppyEvan 2020-07-29