Kafka - PHP 使用 Rdkafka 生产/消费数据
Kafka集群部署
安装rdkafka
rdkafka
依赖 libkafka
yum install rdkafka rdkafka-devel pecl install rdkafka php --ri rdkafka
http://pecl.php.net/package/r... 可以参阅支持的kafka
客户端版本
生产者
连接集群,创建 topic
,生产数据。
<?php $rk = new Rdkafka\Producer(); $rk->setLogLevel(LOG_DEBUG); // 链接kafka集群 $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); // 创建topic $topic = $rk->newTopic("topic_1"); while (true) { $message = "hello kafka " . date("Y-m-d H:i:s"); echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL; try { $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); sleep(2); } catch (\Exception $e) { echo $e->getMessage() . PHP_EOL; } }
消费者-HighLevel
自动分配partition
,rebalance
,comsumer group
。
<?php $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignments (optional) $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(null); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'group_1'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', '192.168.20.6:9092,192.168.20.6:9093'); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // Subscribe to topic 'topic_1' $consumer->subscribe(['topic_1']); echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joining the group after leaving it.)\n"; while (true) { $message = $consumer->consume(3e3); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: sleep(2); case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
消费者-LowLevel
指定partition
消费。php consumer_lowlevel.php [partitonNuo]
LowLevel
没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。
<?php if (!isset($argv[1])) { fwrite(STDOUT, "请指定消费分区:"); $partition = (int) fread(STDIN, 1024); } else { $partition = (int) $argv[1]; } $topic = "topic_1"; $conf = new RdKafka\Conf(); // Set the group id. This is required when storing offsets on the broker $conf->set('group.id', 'group_2'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers('192.168.20.6:9092,192.168.20.6:9093'); $topicConf = new RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 2000); // Set the offset store method to 'file' // $topicConf->set('offset.store.method', 'file'); // $topicConf->set('offset.store.path', sys_get_temp_dir()); // Alternatively, set the offset store method to 'broker' $topicConf->set('offset.store.method', 'broker'); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic($topic, $topicConf); // Start consuming partition 0 $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume($partition, 3 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
相关推荐
zyyjay 2020-11-09
xuebingnan 2020-11-05
samtrue 2020-11-22
stefan0 2020-11-22
yifangs 2020-10-13
songshijiazuaa 2020-09-24
hebiwtc 2020-09-18
天步 2020-09-17
83911535 2020-11-13
whatsyourname 2020-11-13
zhouyuqi 2020-11-10
Noneyes 2020-11-10
mathchao 2020-10-28
王志龙 2020-10-28
wwwsurfphpseocom 2020-10-28
diskingchuan 2020-10-23
savorTheFlavor 2020-10-23