Kafka配置示例

一、kafka消息系统是一个高吞吐分布式高性能的消息系统,支持在线实时消息处理,他可持久化消息到本地磁盘(有生命周期)支持离线消息处理,随着集群负载的增加可横向扩展机器来实现集群扩容,通过zookeeper来实现负载均衡及broker的管理等等。

平台消息系统已上线,可支持大批量消息的传输,支持平台日志系统采集的各种日志的消息订阅,现有土豆实时计算,大屏幕,无线push等项目在使用。

在使用消息系统之前需要先申请访问Token及对应消息通道,平台另外提供了对消息系统调用api封装包,具体权限的申请及api的调用方式大家可以参考一下几节的介绍。

二、KafKa的token账号申请

邮件示例:

Hi,***您好!

我是互动娱乐的**,申请KafKa系统的使用,用于业务的实时数据计算,现申请1种日志的通道!

1.Token:需要申请新Token

2.应用描述:

【消息量】每天约在百万条级别

业务:直播互动礼物实时排行榜日志:log_yule_gittop;

3.申请的通道权限(读或写或读写):读权限

4.申请人联系方式(姓名,邮箱,手机):

三、KafKa日志可以用大数据的同事配置到kafka中

四、KafKa的使用

1、引包

<dependency>

<groupId>com.youku.data</groupId>

<artifactId>data-mq-common</artifactId>

<version>0.8.0</version><!--如不使用新版本特性可以不升级成最新版本-->

</dependency>

2、

五、KafKa的使用

1、发送消息示例:

#创建生产者连接

#MqSystem.CHANNEL_MODE_WRITE说明此通道为生产者模式

#topicName为消息通道名称,管理员创建

#token用户访问通道的token,一个token可以读或写多个消息

#groupId生产者组名(冗余字段,暂时保留)

IChannelwriteChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_WRITE,"topicName","token","groupId");

#为通道设置队列大小,如果不设置默认为1个,此设置请在MqSystem.getMqSystem().open(...)方法前调用

MqSystem.getMqSystem().setMaxChannleQue(clients);

#设置是否启动metrics统计,如果不设置默认不统计,此设置请在MqSystem.getMqSystem().open(...)方法前调用

MqSystem.getMqSystem().registerMeter(topicName,meterName);

其中meterName命名规范为:sysId_scenId_min_IP.name

说明sysId:用户申请时的分配给的sysId

sceneId:用户申请时的分配给的场景Id

min:代表分钟类型

IP:用户运行程序的机器ip地址

name:用户定义的维度名格式形如:("log"、"ad.vv")注意name的命名中请不需要含有下划线'_',同时也不要含有meter,timer,histograms字符

//发送消息,目前只支持字符串

writeChannel.putMsg("testmsg");

2、消费消息示例:

#创建一个消费者连接

#MqSystem.CHANNEL_MODE_READ说明通道为读取模式

#topicName为消息通道名称,管理员创建

#token用户访问通道的token,一个token可以读或写多个消息

#groupId消费者组名(一个或多个消费者在同一个组里)

IChannelreadChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_READ,"topicName","token","group");

#为通道设置队列大小,如果不设置默认为1个,此设置请在MqSystem.getMqSystem().open(...)方法前调用

MqSystem.getMqSystem().setMaxChannleQue(clients);

#设置是否启动metrics统计,如果不设置默认不统计,此设置请在MqSystem.getMqSystem().open(...)方法前调用

MqSystem.getMqSystem().registerMeter(topicName,meterName);

其中meterName命名规范为:sysId_scenId_min_IP.name

说明sysId:用户申请时的分配给的sysId

sceneId:用户申请时的分配给的场景Id

min:代表分钟类型

IP:用户运行程序的机器ip地址

name:用户定义的维度名格式形如:("log"、"ad.vv")注意name的命名中请不需要含有下划线'_',同时也不要含有meter,timer,histograms字符

#接收消息

Stringmsg=readChannel.getMsg();

3、生产者代码示例:

publicclassProducer{

//........

publicstaticvoidmain(String[]args){

......//你的代码业务逻辑

//设置队列大小,可以不设置,默认为1个

MqSystem.getMqSystem().setMaxChannleQue(1);

//统计此topic的meter信息,名字为:meterName;可以不设置,默认不统计

MqSystem.getMqSystem().registerMeter(topic,meterName);

newThread(newSend()).start();

...........

}

classSendimplementsRunnable{

privateIChannelwriteChannel;

publicSend(){

try{

//初始化时获取通道连接

writeChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_WRITE,topic,token,group);

}catch(MqExceptione){

e.printStackTrace();

}

}

publicvoidrun(){

........//你的业务逻辑

writeChannel.putMsg(msg);//将生产的msg消息放入kafka

}

}

}

4、消费者代码示例:

publicclassComsumer{

//........

publicstaticvoidmain(String[]args){

......//你的代码业务逻辑

//设置队列大小,可以不设置,默认为1个

//MqSystem.getMqSystem().setMaxChannleQue(1);

统计此topic的meter信息,名字为:meterName;可以不设置,默认不统计

//MqSystem.getMqSystem().registerMeter(topic,meterName);

newThread(newReceive()).start();

.......

}

classReceiveimplementsRunnable{

privateIChannelreadChannel;

publicSend(){

try{

//初始化时获取通道连接

readChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_READ,topic,token,group);

}catch(MqExceptione){

e.printStackTrace();

}

}

publicvoidrun(){

........//你的业务逻辑

readChannel.getMsg();

........

}

}

}

相关推荐