php+kafka+zookeeper+logstash
本文主要实现的目标是php连接kafka并且成功发送消息给kafka。为了验证这个连接和发送,另外配置了logstash监听kafka相对应的消息,然后转发到redis,原来我不知道对kafka比较陌生,不知道怎么看里面的消息内容(我知道安装包里有个consumer和producer的脚本) ^ _ ^
消息发送路径:php->kafka->logstash->redis
1.安装kafka
下载地址:https://kafka.apache.org/
下载解压后进入根目录,
bin/zookeeper-server-start.sh config/zookeeper.properties & 开启zookeeper bin/kafka-server-start.sh config/server.properties & 开启kafka
另开一个终端然后
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka
这样就创建了一个topic为kafka的消息通道
如果这个步骤成功的话,可以通过另开终端发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka
执行之后就可以输入消息发送了。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning
来接受上面终端发送的消息
2.安装php zookeeper扩展,并且用php发送消息
安装扩展之前需要安装zookeeper的c client,扩展有依赖,步骤如下
1.通过apt-get来安装(楼主用的是ubuntu)
2.通过源码安装,地址:https://github.com/apache/zoo...
下载下来
cd zookeeper ./configure make && sudo make install
按照如上步骤,/usr/local/bin目录下就会多一个cli_mt
php的zookeeper的源码可以去pecl.php.net下载,然后老步骤
phpize ./configure --with-libzookeeper=/usr/local/bin/cli_mt (如果你安装扩展的php不是默认的php,则需要带上--with-php-config参数) make && sudo make install
最后别忘了添加extension=zookeeper.so到php.ini
3.配置logstash
下载地址:https://www.elastic.co/downlo...
修改配置文件,由于楼主的logstash版本已经是5.2的了,所以又是一阵谷歌,才发现很多网上的配置都是1.2版本的,已经不兼容了。
input{ kafka{ bootstrap_servers=>"localhost:9092" topics=>["kafka"] } } output{ redis{ host=>"127.0.0.1" port=>6379 key=>"kafka" data_type=>"list" password=>"123456" } }
4.php发送消息
网上找了一圈,终于找到一个可以用的
https://github.com/nmred/kafk...
也可以用
composer require "nmred/kafka-php"
php代码如下:
require "./vendor/autoload.php"; $produce = \Kafka\Produce::getInstance('10.37.129.2:2181', 3000); $produce->setRequireAck(-1); $topicName = "kafka"; $partitions = $produce->getAvailablePartitions($topicName); $partCount = count($partitions); var_dump('$partCount:'.$partCount); $count = 0; $message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s'))); //发送消息到不同的partition $partitionId = $count%$partCount; $produce->setMessages($topicName, $partitionId, array($message)); $result = $produce->send(); var_dump($result);
参考文章:http://blog.kazaff.me/2015/06...
最后附一张截图
相关推荐
weikaixxxxxx 2020-08-01
goodstudy 2020-06-05
Kafka 2020-09-18
CobingLiu 2020-06-16
sweetgirl0 2020-06-09
yanghuashuiyue 2020-11-14
huangwei00 2020-10-14
hannuotayouxi 2020-08-20
guicaizhou 2020-08-01
sweetgirl0 2020-07-27
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15