高吞吐的分布式发布订阅消息系统——Kafka
标题就是Kafka的定义,它用够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。
我主要使用它来作数据实时计算,统计各种报表,如:小时报表、周报表、月报表、年报表等,以及其它报表,如:复购率统计,当然还有其它用途,这里只是抛砖引玉。
Kafka使用场景
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和storm
Kafka特性
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
可扩展性:kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写
基本概念介绍
Broker:Kafka集群包含一个或多个服务器,这些服务器就是Broker
Topic:每条发布到Kafka集群的消息都必须有一个Topic
Partition:是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹
Producer:消息产生者,负责生产消息并发送到Kafka Broker
Consumer:消息消费者,向kafka broker读取消息并处理的客户端。
Consumer Group:每个Consumer属于一个特定的组,组可以用来实现一条消息被组内多个成员消费等功能。
下面介绍Kafka 在Centos 7上的安装和使用,包括功能验证和集群的简单配置。
1.安装JDK
若已安装jdk环境,跳过此步骤。Kafka 使用Zookeeper 来保存相关配置信息,Kafka及Zookeeper 依赖Java 运行环境,从oracle网站下载JDK 安装包,解压安装:
$ tar zxvf jdk-8u65-linux-x64.tar.gz
$ mv jdk1.8.0_65 java
设置Java 环境变量:
JAVA_HOME=/opt/java
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME PATH
也可以选择yum install安装,相应设置环境变量。
2. 安装Kafka
从官网(http://kafka.apache.org/downloads.html)下载Kafka 安装包,解压安装:
tar zxvf kafka_2.11-0.8.2.2.tgz
mv kafka_2.11-0.8.2.2 /opt/zookeeper/kafka
cd /opt/zookeeper/kafka
3.相关配置
kafak的配置文件在/opt/kafka_2.11/config下叫server.propertie,释义如下:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样,但是不管你怎么配,别配0就是,不然创建Topic的时候回报错。
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
实际上需要修改的就几个:
broker.id=133 #每台服务器的broker.id都不能相同
host.name=192-168-253-133 #主机名
listeners=PLAINTEXT://PLAINTEXT://192.168.1.220:9092 #监听地址
advertised.listeners=PLAINTEXT://192.168.1.220:9092
并开启端口
#在log.retention.hours=168 下追加
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.1.220:2181
4.启动Zookeeper
使用安装包中的脚本启动单节点Zookeeper 实例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[2015-10-26 04:26:59,585] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)....
当kafka启动成功后,在终端上输入“jps”,会得到相应如下:
7253 Jps
5850 ZooKeeperMain
6076 QuorumPeerMain
6093 Kafka
QuorumPeerMain是zookeeper的守护进程,kafka是kafka的守护进程。
5.启动Kafka 服务
使用kafka-server-start.sh 启动kafka 服务:
bin/kafka-server-start.sh config/server.properties
[2015-10-26 04:28:56,115] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-10-26 04:28:56,141] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
6.创建topic
使用kafka-topics.sh 创建单分区单副本的topic test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
7.查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
8.产生消息
使用kafka-console-producer.sh 发送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello world!
Hello Kafka!
9.消费消息
使用kafka-console-consumer.sh 接收消息并在终端打印:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 kafka:2181 --topic test --from-beginning
Hello world!
Hello Kafka!
10.删除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
11.查看描述 Topic 信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
12.关闭
bin/kafka-server-stop.sh
13.关不掉可以强制kill
kill -9 进程id
14.查看消费分组
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
test2
test
console-consumer-95156
test1
15.查看特定consumer group 详情,使用--group与--describe参数
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test --describe
集群配置
1.单机多broker 集群配置
利用单节点部署多个broker。 不同的broker 设置不同的 id,监听端口及日志目录。 例如:
cp config/server.properties config/server-1.properties
编辑配置:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
启动Kafka服务:
bin/kafka-server-start.sh config/server-1.properties &
启动多个服务,按上文类似方式产生和消费消息。
2.多机多broker 集群配置
分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。 例如: 在192.168.1.221,192.168.1.222,192.168.1.223 三台机器部署,Zookeeper配置如下:
initLimit=5
syncLimit=2
server.1=192.168.1.221:2888:3888
server.2=192.168.1.222:2888:3888
server.3=192.168.1.223:2888:3888
分别配置多个机器上的Kafka服务 设置不同的broke id,zookeeper.connect设置如下:
zookeeper.connect=192.168.1.221:2181,192.168.1.222:2181,192.168.1.223:2181
启动Zookeeper与Kafka服务,按上文方式产生和消费消息,验证集群功能。
好了,Kafka已经安装完毕。它支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。
赶快试试吧!