Kafka配置示例
一、kafka消息系统是一个高吞吐分布式高性能的消息系统,支持在线实时消息处理,他可持久化消息到本地磁盘(有生命周期)支持离线消息处理,随着集群负载的增加可横向扩展机器来实现集群扩容,通过zookeeper来实现负载均衡及broker的管理等等。
平台消息系统已上线,可支持大批量消息的传输,支持平台日志系统采集的各种日志的消息订阅,现有土豆实时计算,大屏幕,无线push等项目在使用。
在使用消息系统之前需要先申请访问Token及对应消息通道,平台另外提供了对消息系统调用api封装包,具体权限的申请及api的调用方式大家可以参考一下几节的介绍。
二、KafKa的token账号申请
邮件示例:
Hi,***您好!
我是互动娱乐的**,申请KafKa系统的使用,用于业务的实时数据计算,现申请1种日志的通道!
1.Token:需要申请新Token
2.应用描述:
【消息量】每天约在百万条级别
业务:直播互动礼物实时排行榜日志:log_yule_gittop;
3.申请的通道权限(读或写或读写):读权限
4.申请人联系方式(姓名,邮箱,手机):
三、KafKa日志可以用大数据的同事配置到kafka中
四、KafKa的使用
1、引包
<dependency>
<groupId>com.youku.data</groupId>
<artifactId>data-mq-common</artifactId>
<version>0.8.0</version><!--如不使用新版本特性可以不升级成最新版本-->
</dependency>
2、
五、KafKa的使用
1、发送消息示例:
#创建生产者连接
#MqSystem.CHANNEL_MODE_WRITE说明此通道为生产者模式
#topicName为消息通道名称,管理员创建
#token用户访问通道的token,一个token可以读或写多个消息
#groupId生产者组名(冗余字段,暂时保留)
IChannelwriteChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_WRITE,"topicName","token","groupId");
#为通道设置队列大小,如果不设置默认为1个,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().setMaxChannleQue(clients);
#设置是否启动metrics统计,如果不设置默认不统计,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().registerMeter(topicName,meterName);
其中meterName命名规范为:sysId_scenId_min_IP.name
说明sysId:用户申请时的分配给的sysId
sceneId:用户申请时的分配给的场景Id
min:代表分钟类型
IP:用户运行程序的机器ip地址
name:用户定义的维度名格式形如:("log"、"ad.vv")注意name的命名中请不需要含有下划线'_',同时也不要含有meter,timer,histograms字符
//发送消息,目前只支持字符串
writeChannel.putMsg("testmsg");
2、消费消息示例:
#创建一个消费者连接
#MqSystem.CHANNEL_MODE_READ说明通道为读取模式
#topicName为消息通道名称,管理员创建
#token用户访问通道的token,一个token可以读或写多个消息
#groupId消费者组名(一个或多个消费者在同一个组里)
IChannelreadChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_READ,"topicName","token","group");
#为通道设置队列大小,如果不设置默认为1个,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().setMaxChannleQue(clients);
#设置是否启动metrics统计,如果不设置默认不统计,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().registerMeter(topicName,meterName);
其中meterName命名规范为:sysId_scenId_min_IP.name
说明sysId:用户申请时的分配给的sysId
sceneId:用户申请时的分配给的场景Id
min:代表分钟类型
IP:用户运行程序的机器ip地址
name:用户定义的维度名格式形如:("log"、"ad.vv")注意name的命名中请不需要含有下划线'_',同时也不要含有meter,timer,histograms字符
#接收消息
Stringmsg=readChannel.getMsg();
3、生产者代码示例:
publicclassProducer{
//........
publicstaticvoidmain(String[]args){
......//你的代码业务逻辑
//设置队列大小,可以不设置,默认为1个
MqSystem.getMqSystem().setMaxChannleQue(1);
//统计此topic的meter信息,名字为:meterName;可以不设置,默认不统计
MqSystem.getMqSystem().registerMeter(topic,meterName);
newThread(newSend()).start();
...........
}
classSendimplementsRunnable{
privateIChannelwriteChannel;
publicSend(){
try{
//初始化时获取通道连接
writeChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_WRITE,topic,token,group);
}catch(MqExceptione){
e.printStackTrace();
}
}
publicvoidrun(){
........//你的业务逻辑
writeChannel.putMsg(msg);//将生产的msg消息放入kafka
}
}
}
4、消费者代码示例:
publicclassComsumer{
//........
publicstaticvoidmain(String[]args){
......//你的代码业务逻辑
//设置队列大小,可以不设置,默认为1个
//MqSystem.getMqSystem().setMaxChannleQue(1);
统计此topic的meter信息,名字为:meterName;可以不设置,默认不统计
//MqSystem.getMqSystem().registerMeter(topic,meterName);
newThread(newReceive()).start();
.......
}
classReceiveimplementsRunnable{
privateIChannelreadChannel;
publicSend(){
try{
//初始化时获取通道连接
readChannel=MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_READ,topic,token,group);
}catch(MqExceptione){
e.printStackTrace();
}
}
publicvoidrun(){
........//你的业务逻辑
readChannel.getMsg();
........
}
}
}