Apache Kafka 0.10.0.0 Stable Release and Its New Features

http://www.iteblog.com/archives/1677

In Kafka 0.9.0.0, developers using poll() on the new consumer had almost no control over how many messages a single call returned. The good news is that this release introduces the max.poll.records parameter, which lets developers cap the number of records returned by each poll() call.
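A minimal sketch of how the new parameter might be used (the broker address and topic name here are placeholders, not from the original post):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.records", "100"); // new in 0.10.0.0: at most 100 records per poll()

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo")); // "foo" is a placeholder topic
ConsumerRecords<String, String> records = consumer.poll(100); // records.count() <= 100

With max.poll.records set, a single poll() never returns more records than the configured cap, which makes the processing time of each loop iteration much easier to bound.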

New Consumer API

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>

kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

poll

public ConsumerRecords<K,V> poll(long timeout)

Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have subscribed to any topics or partitions before polling for data.

On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions.

Specified by:
poll in interface Consumer<K,V>

Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.

Returns:
map of topic to records since the last fetch for the subscribed list of topics and partitions

Throws:
InvalidOffsetException - if the offset for a partition or set of partitions is undefined or out of range and no offset reset policy has been configured
WakeupException - if wakeup() is called before or while this function is called
AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the configured groupId
KafkaException - for any other unrecoverable errors (e.g. invalid groupId or session timeout, errors deserializing key/value pairs, or any new error cases in future versions)

See Also:
poll(long)
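The Javadoc above mentions that the starting offset can be set manually through seek(TopicPartition, long). A minimal sketch of that, assuming a KafkaConsumer<String, String> named consumer configured as elsewhere in this post (the topic name, partition number, and offset are illustrative):

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

TopicPartition partition = new TopicPartition("foo", 0); // hypothetical topic/partition
consumer.assign(Arrays.asList(partition)); // manual assignment, no consumer group rebalance
consumer.seek(partition, 42L);             // the next poll() starts fetching at offset 42
ConsumerRecords<String, String> records = consumer.poll(100);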

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // offsets are committed manually below
props.put("auto.commit.interval.ms", "1000"); // ignored while auto commit is disabled
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        // Commit only after the batch is safely in the DB: if the process dies
        // before commitSync(), the batch is consumed again (at-least-once).
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

SimpleConsumer demo:

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Reading the Data

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
 
if (fetchResponse.hasError()) {
        // See code in previous section
}
numErrors = 0;
 
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
 
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}
 
if (numRead == 0) {
    try {
        // Nothing came back on this fetch; back off briefly so we
        // don't hammer Kafka when there is no data.
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}

Note that the ‘readOffset’ asks the last read message what the next offset would be. This way when the block of messages is processed we know where to ask Kafka where to start the next fetch.

Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
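One possible shape of that fetchSize retry, sketched against the SimpleConsumer API used above (the doubling strategy is an assumption; the original example only says to increase the size):

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.message.ByteBufferMessageSet;

int fetchSize = 100000; // same starting size as the example above
while (true) {
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    if (fetchResponse.hasError()) {
        break; // handle errors as in the previous section
    }
    ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic, a_partition);
    if (messageSet.iterator().hasNext()) {
        break; // non-empty set: process the messages as shown above
    }
    fetchSize *= 2; // empty set: the producer batch may be larger than fetchSize
}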

Finally, we keep track of the number of messages read. If we didn't read anything on the last request we go to sleep for a second so we aren't hammering Kafka when there is no data.

http://zqhxuyuan.github.io/2016/02/20/Kafka-Consumer-New/

http://blog.csdn.net/xianzhen376/article/details/51167742
