高吞吐的分布式发布订阅消息系统——Kafka

标题就是Kafka的定义,它用够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。

高吞吐的分布式发布订阅消息系统——Kafka

我主要使用它来作数据实时计算,统计各种报表,如:小时报表、周报表、月报表、年报表等,以及其它报表,如:复购率统计,当然还有其它用途,这里只是抛砖引玉。

Kafka使用场景

日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

消息系统:解耦和生产者和消费者、缓存消息等。

用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

流式处理:比如spark streaming和storm

高吞吐的分布式发布订阅消息系统——Kafka

Kafka特性

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。

可扩展性:kafka集群支持热扩展

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

高并发:支持数千个客户端同时读写

基本概念介绍

Broker:Kafka集群包含一个或多个服务器,这些服务器就是Broker

Topic:每条发布到Kafka集群的消息都必须有一个Topic

Partition:是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹

Producer:消息产生者,负责生产消息并发送到Kafka Broker

Consumer:消息消费者,向kafka broker读取消息并处理的客户端。

Consumer Group:每个Consumer属于一个特定的组,组可以用来实现一条消息被组内多个成员消费等功能。

下面介绍Kafka 在Centos 7上的安装和使用,包括功能验证和集群的简单配置。

1.安装JDK

若已安装jdk环境,跳过此步骤。Kafka 使用Zookeeper 来保存相关配置信息,Kafka及Zookeeper 依赖Java 运行环境,从oracle网站下载JDK 安装包,解压安装:

$ tar zxvf jdk-8u65-linux-x64.tar.gz

$ mv jdk1.8.0_65 java

设置Java 环境变量:

JAVA_HOME=/opt/java

PATH=$PATH:$JAVA_HOME/bin

export JAVA_HOME PATH

也可以选择yum install安装,相应设置环境变量。

2. 安装Kafka

从官网(http://kafka.apache.org/downloads.html)下载Kafka 安装包,解压安装:

tar zxvf kafka_2.11-0.8.2.2.tgz

mv kafka_2.11-0.8.2.2 /opt/zookeeper/kafka

cd /opt/zookeeper/kafka

3.相关配置

kafak的配置文件在/opt/kafka_2.11/config下叫server.propertie,释义如下:

broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样,但是不管你怎么配,别配0就是,不然创建Topic的时候回报错。

port=19092 #当前kafka对外提供服务的端口默认是9092

host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

num.partitions=1 #默认的分区数,一个topic默认1个分区数

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务

replica.fetch.max.bytes=5242880 #取消息的最大直接数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除

log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能

zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

实际上需要修改的就几个:

broker.id=133 #每台服务器的broker.id都不能相同

host.name=192-168-253-133 #主机名

listeners=PLAINTEXT://PLAINTEXT://192.168.1.220:9092 #监听地址

advertised.listeners=PLAINTEXT://192.168.1.220:9092

并开启端口

#在log.retention.hours=168 下追加

message.max.byte=5242880

default.replication.factor=2

replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口

zookeeper.connect=192.168.1.220:2181

4.启动Zookeeper

使用安装包中的脚本启动单节点Zookeeper 实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

[2015-10-26 04:26:59,585] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)....

当kafka启动成功后,在终端上输入“jps”,会得到相应如下:

7253 Jps

5850 ZooKeeperMain

6076 QuorumPeerMain

6093 Kafka

QuorumPeerMain是zookeeper的守护进程,kafka是kafka的守护进程。

5.启动Kafka 服务

使用kafka-server-start.sh 启动kafka 服务:

bin/kafka-server-start.sh config/server.properties

[2015-10-26 04:28:56,115] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2015-10-26 04:28:56,141] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)

6.创建topic

使用kafka-topics.sh 创建单分区单副本的topic test:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

7.查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

test

8.产生消息

使用kafka-console-producer.sh 发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Hello world!

Hello Kafka!

9.消费消息

使用kafka-console-consumer.sh 接收消息并在终端打印:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 kafka:2181 --topic test --from-beginning

Hello world!

Hello Kafka!

10.删除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

11.查看描述 Topic 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

12.关闭

bin/kafka-server-stop.sh

13.关不掉可以强制kill

kill -9 进程id

14.查看消费分组

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

test2

test

console-consumer-95156

test1

15.查看特定consumer group 详情,使用--group与--describe参数

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test --describe

集群配置

高吞吐的分布式发布订阅消息系统——Kafka

1.单机多broker 集群配置

利用单节点部署多个broker。 不同的broker 设置不同的 id,监听端口及日志目录。 例如:

cp config/server.properties config/server-1.properties

编辑配置:

config/server-1.properties:

broker.id=1

port=9093

log.dir=/tmp/kafka-logs-1

启动Kafka服务:

bin/kafka-server-start.sh config/server-1.properties &

启动多个服务,按上文类似方式产生和消费消息。

2.多机多broker 集群配置

分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。 例如: 在192.168.1.221,192.168.1.222,192.168.1.223 三台机器部署,Zookeeper配置如下:

initLimit=5

syncLimit=2

server.1=192.168.1.221:2888:3888

server.2=192.168.1.222:2888:3888

server.3=192.168.1.223:2888:3888

分别配置多个机器上的Kafka服务 设置不同的broke id,zookeeper.connect设置如下:

zookeeper.connect=192.168.1.221:2181,192.168.1.222:2181,192.168.1.223:2181

启动Zookeeper与Kafka服务,按上文方式产生和消费消息,验证集群功能。

好了,Kafka已经安装完毕。它支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。

赶快试试吧!

相关推荐