storm drpc实例
序
本文主要演示一下storm drpc实例
配置
version: '2' services: supervisor: image: storm container_name: supervisor command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774 depends_on: - nimbus - zookeeper links: - nimbus - zookeeper restart: always ports: - 6700:6700 - 6701:6701 - 6702:6702 - 6703:6703 - 8000:8000 drpc: image: storm container_name: drpc command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774 depends_on: - nimbus - supervisor - zookeeper links: - nimbus - supervisor - zookeeper restart: always ports: - 3772:3772 - 3773:3773 - 3774:3774
- 这里对supervisor配置drpc.servers及drpc.port、drpc.invocations.port,好让worker通过drpc.invocations.port去访问drpc节点
- 对于drpc服务,则暴露drpc.port(
好让外部的DRPCClient访问
)、drpc.invocations.port(让worker访问
)
TridentTopology
@Test public void testDeployDRPCStateQuery() throws InterruptedException, TException { TridentTopology topology = new TridentTopology(); FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) //NOTE transforms a Stream into a TridentState object .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6); topology.newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); StormTopology stormTopology = topology.build(); //远程提交 mvn clean package -Dmaven.test.skip=true //storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交 System.setProperty("storm.jar",TOPOLOGY_JAR); Config conf = new Config(); conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1 conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627 conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个 conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181 StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology); }
- 这里newStream创建了一个TridentState,然后newDRPCStream创建了一个DRPCStream,其stateQuery指定为前面创建的TridentState
- 由于TridentState把结果存储到了MemoryMapState,因而这里的DRPCStream通过drpc进行stateQuery
DRPCClient
@Test public void testLaunchDrpcClient() throws TException { Config conf = new Config(); //NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针 conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName()); conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3); conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000); conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000); conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772); System.out.println(client.execute("words", "cat dog the man")); }
- 注意这里的配置项不能少,否则会引发空指针
- Config.DRPC_THRIFT_TRANSPORT_PLUGIN这里使用的是SimpleTransportPlugin.class.getName(),虽然该类被废弃了,不过还可以跑通
- 由于使用了SimpleTransportPlugin.class,因而这里要配置Config.DRPC_MAX_BUFFER_SIZE
- DRPCClient配置了drpc的地址及port
- client.execute这里要传入newDRPCStream指定的function名称
小结
- 使用drpc的时候,需要通过storm drpc启动drpc server服务节点,另外要暴露两个端口,一个drpc.port是供外部DRPCClient调用,一个drpc.invocations.port是给worker来访问;drpc.http.port端口是暴露给http协议调用的(
DRPCClient使用的是thrift协议调用
) - supervisor要配置drpc.servers、drpc.invocations.port,好让worker去访问到drpc server
- DRPCClient使用drpc.port指定的端口来访问,另外client.execute这里要传入newDRPCStream指定的function名称
doc
相关推荐
枫叶上的雨露 2020-05-02
LandryBean 2020-03-12
一名java从业者 2020-01-09
weeniebear 2013-03-25
weeniebear 2014-05-28
sfqbluesky 2019-12-12
AbnerSunYH 2016-08-12
weeniebear 2016-08-11
Stereo 2016-07-27
芒果先生Mango 2018-05-31
dykun 2019-08-16
GimmeS 2016-10-11
benbendy 2016-09-30
Johnhao 2016-09-30
AbnerSunYH 2016-04-28
benbendy 2016-04-15