Apache Kafka 代码实例

前提

  • 已经配置好kafka。若未安装,可以参照【Apache Kafka 安装升级指南 http://www.linuxidc.com/Linux/2013-11/92753.htm 】 
  • 已在eclipse里面安装scala插件。Eclipse Kepler中在Help->Eclipse Markectplace中搜索Scalar,然后安装即可。
  • 使用maven构建kafka测试project在eclipse中。
  • 创建topic:在kafka的安装目录下执行bin/kafka-create-topic.sh --zookeeper 192.168.20.99:2181 --replica 1 --partition 1 --topic test
  • 启动consumer:在kafka的安装目录下执行bin/kafka-console-consumer.sh --zookeeper 192.168.20.99:2181 --topic test --from-beginning

pom.xml文件如下

所有kafka依赖的jar包都在com.sksamuel.kafka下面。其中kafka使用的版本是0.8.0-beta1,kafka是2.10。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.iflytek.cpcloud.kafka</groupId>
  <artifactId>kafkatest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafkatest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.14</version>
  </dependency>
  <dependency>
   <groupId>com.sksamuel.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <version>0.8.0-beta1</version>
  </dependency>
 </dependencies>
</project>

然后写一个kafka producer的测试程序如下:

package com.iflytek.cpcloud.kafka.kafkatest;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Test the Kafka Producer
 * @author jcsong2
 *
 */
public class ProducerTest {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("zk.connect", "192.168.20.99:2181");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("metadata.broker.list", "192.168.20.99:9092");
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  for (int i = 0; i < 10; i++)
   producer.send(new KeyedMessage<String, String>("test", "test" + i));
 }
}

在consuemr端可以看到test0到test9十行输出。

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里

相关阅读

相关推荐