Storm的配置
2.2 Storm的配置 2.2.1 Storm的配置类型 Storm有大量的配置,可以调整Nimbus、Supervisor、拓扑运行的参数,其中有些配置是不能修改的系统配置,而其他配置是可以修改的。 每个配置会有一个默认值,该值定义在Storm代码库的defaults.yaml文件中。在Nimbus和Supervisor的类路径中定义一个storm.yaml文件,可以覆盖这些配置值。使用StormSubmitter提交拓扑的时候,可以定义一个指定拓扑的配置,但是只能覆盖前缀为TOPOLOGY的配置项。 Storm 0.7.0以后的版本开始允许在Spout/Bolt中覆盖配置,可以修改的配置主要有: "topology.debug"。 "topology.max.spout.pending"。 "topology.max.task.parallelism"。 "topology.kryo.register"。 topology.kryo.register与其他的配置有所不同,它的序列化会应用到拓扑上的所有组件。 Storm的Java API也提供了两种方式指定组件的配置。 内部的(Internally) 在Spout或者Bolt类中,覆盖getComponentConfiguration方法,返回组件配置的Map对象。 getComponentConfiguration方法定义如下: Map<String, Object> getComponentConfiguration() 外部的(Externally) 使用TopologyBuilder类的setSpout方法返回SpoutDeclarer对象,使用setBolt方法返回BoltDeclarer对象。SpoutDeclarer与BoltDeclarer实现了ComponentConfigurationDeclarer接口,该接口有addConfiguration方法和addConfigurations方法,可以通过调用这两个方法来覆盖组件的配置。 SpoutDeclarer接口的定义代码如下: public interface SpoutDeclarer extends ComponentConfigurationDeclarer<SpoutDeclarer> { } BoltDeclarer接口的定义代码如下: public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>, ComponentConfigurationDeclarer<BoltDeclarer> { } ComponentConfigurationDeclarer接口的定义代码如下: public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> { T addConfigurations(Map conf); T addConfiguration(String config, Object value); T setDebug(boolean debug); T setMaxTaskParallelism(Number val); T setMaxSpoutPending(Number val); T setNumTasks(Number val); } Storm配置值的优先顺序为: defaults.yaml < storm.yaml < 特定拓扑的配置 < 内部特定组件的配置 < 外部特定组件的配置
-------------------------------------------------
public interface IComponent extends Serializable { void declareOutputFields(OutputFieldsDeclarer var1); Map<String, Object> getComponentConfiguration(); } public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); return conf; }
--------------------------------------------------
又看了一下RollingTopWords,让我比较关注的地方是在RollingCountBolt类里,是通过方法TupleHelpers.isTickTuple(tuple)来判断是否应该发射当前窗口数据,但是判断的依据一开始让我很迷惑,居然是判断该tuple是否来源于“__system”的组件和“__tick”流。 作为对storm了解不多的人,我真的糊涂了,tuple不都是上游的spout发射来的吗,哪里冒出来源不同的tuple。 好吧,我就开始猜了,莫非有个隐藏的spout?或者RollingCountBolt自己给自己发什么特殊的tuple。 正毫无头绪时,奇迹出现了,我把鼠标移到Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS这个常量上时,出现了一行小提示: How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a component-specific configuration. 哦,在方法getComponentConfiguration() 里 conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); 这句话告诉系统,需要按照emitFrequencyInSeconds的频率,产生来源于“__system”的组件和“__tick”流的tuple给task。 好了,这就是RollingTopWords中,定时产生特殊tuple的方法,对于我这种新手,算是又有点进步了。
------------------------------------------------
滑动窗口在监控和统计应用的场景比较广泛,比如每隔一段时间(10s)统计最近30s的请求量或者异常次数,根据请求或者异常次数采取相应措施;这里说一下滑动窗口在storm中实现的原理。参见下图:
窗口大小为30s,每10s就统计一次,那么窗口一共有3个slot,可以对窗口建立长度为3的数组;在storm的blot中在10s内通过execute(tuple)功能不停的把接收的tuple进行count个数(假如内置变量为tuple_count),每10s会自动触发滑动窗口的移动工作(Array[slot3]=》Array[slot2],Array[slot2]=》Array[slot1]),并存储当前tuple_count值到Array[slot3],随之可以进行统计窗口的数据了。
那么如何每10s进行自动触发,storm有一个TickTuple可以满足这个要求,
"__system"component会定时往task发送"__tick"stream的tuple
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置,可以在default.ymal里面配置
也可以在代码里面通过getComponentConfiguration()来进行配置,
publicMap<String,Object>getComponentConfiguration(){
Map<String,Object>conf=newHashMap<String,Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,emitFrequencyInSeconds);
returnconf;
配置完成后,storm就会定期的往task发送ticktuple
只需要通过isTickTuple来判断是否为tickTuple,就可以完成定时触发的功能
publicstaticbooleanisTickTuple(Tupletuple){
returntuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)\\SYSTEM_COMPONENT_ID=="__system"
&&tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);\\SYSTEM_TICK_STREAM_ID=="__tick"
}