Kafka代码API

1.建立工程,导入相应的jar包

Procuder类

package cn.itcast.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
 
 //要读取的数据主题
 private static final String topic = "kfc";
 //消费者的数量
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  //指定zookeeper的地址
  props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");
  //消费组的编号
  props.put("group.id", "1111");
  //偏移量,从哪个位置读
  props.put("auto.offset.reset", "smallest");
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  //根据map获取所有的主题对应的消息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  //获取某个主题的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  //开启两个消费者进程,读取主题下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

consumer--消费者类

package cn.itcast.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
 
 //要读取的数据主题
 private static final String topic = "kfc";
 //消费者的数量
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  //指定zookeeper的地址
  props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");
  //消费组的编号
  props.put("group.id", "1111");
  //偏移量,从哪个位置读
  props.put("auto.offset.reset", "smallest");
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  //根据map获取所有的主题对应的消息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  //获取某个主题的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  //开启两个消费者进程,读取主题下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里

相关推荐