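What is Storm DRPC? It sounds impressive at first, but the idea is simple. A Storm topology exposes functions, each with a unique name, that wrap some computation. A client calls one by giving the function name and an argument, and gets the return value back. That is really all there is to it. Here is an example of a client call: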
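// Storm 1.x package names; older 0.9.x releases use backtype.storm.*
import java.util.Map;
import org.apache.storm.utils.DRPCClient;
import org.apache.storm.utils.Utils;

public static void main(String[] args) {
    try {
        // Load the client configuration (storm.yaml); in Storm 1.x a null conf may fail during Thrift transport setup
        Map conf = Utils.readStormConfig();
        // Host and port of the DRPC server (3772 is the default drpc.port)
        DRPCClient client = new DRPCClient(conf, "192.168.19.10", 3772);
        // Call the drpcTest function with the argument "hello world "
        String result = client.execute("drpcTest", "hello world ");
        System.out.println(result);
        client.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}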
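That is a complete DRPC call. Setting aside what the drpcTest function does internally, calling DRPC really is that simple. Next, let's look at how local DRPC differs from remote DRPC: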
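import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;

public static void main(String[] args) throws Exception {
    // Register the DRPC function "drpcTest"; DrpcTestBolt (not shown) implements it with 3 executors
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("drpcTest");
    builder.addBolt(new DrpcTestBolt(), 3);
    Config conf = new Config();
    if (args == null || args.length == 0) {
        // Local mode: an in-process DRPC server plus an in-process cluster
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drpc-test", conf, builder.createLocalTopology(drpc));
        // Client calls go straight to the LocalDRPC object
        for (String word : new String[]{"hello", "storm"}) {
            System.err.println("Result for \"" + word + "\": " + drpc.execute("drpcTest", word));
        }
        // Release the local cluster and DRPC server when done
        cluster.shutdown();
        drpc.shutdown();
    } else {
        // Cluster mode: args[0] is the topology name; clients then use DRPCClient against the DRPC servers
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
    }
}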
Note that local mode passes a LocalDRPC object to the builder, while cluster mode does not. Here is what each of the two methods does:
public StormTopology createLocalTopology(ILocalDRPC drpc) {
    return this.createTopology(new DRPCSpout(this._function, drpc));
}

public StormTopology createRemoteTopology() {
    return this.createTopology(new DRPCSpout(this._function));
}
The only difference between them is whether a LocalDRPC object is passed to the DRPCSpout. Next, let's look at createTopology():
private StormTopology createTopology(DRPCSpout spout) {
    final String SPOUT_ID = "spout";
    final String PREPARE_ID = "prepare-request";
    TopologyBuilder builder = new TopologyBuilder();
    // The DRPCSpout feeds every request into a PrepareRequest bolt
    builder.setSpout(SPOUT_ID, spout);
    builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);
    // ... (omitted: the bolts registered with addBolt() are wired in here, and the counter i is defined)

    // The last user bolt must declare exactly one output stream...
    IRichBolt lastBolt = _components.get(_components.size() - 1).bolt;
    OutputFieldsGetter getter = new OutputFieldsGetter();
    lastBolt.declareOutputFields(getter);
    Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
    if (streams.size() != 1) {
        throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
    }
    // ...with exactly two fields: the request id and the result
    String outputStream = streams.keySet().iterator().next();
    List<String> fields = streams.get(outputStream).get_output_fields();
    if (fields.size() != 2) {
        throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
    }
    // JoinResult pairs each result with its return info; both inputs are grouped by request id
    builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
            .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
            .fieldsGrouping(PREPARE_ID, "ret", new Fields("request"));
    i++;
    // ReturnResults sends each joined result back to the DRPC server
    builder.setBolt(boltId(i), new ReturnResults()).noneGrouping(boltId(i - 1));
    return builder.createTopology();
}
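The method does the following:
1) Creates the spout (the DRPCSpout passed in) and a PrepareRequest bolt that receives every request.
2) Wires in the user's bolts (added with addBolt()), which process the tuples.
3) Checks that the last user bolt declares exactly one stream with exactly two fields (request id, result), then adds a JoinResult bolt that matches each result with its request's return info.
4) Uses ReturnResults to send the result back to the DRPC Server.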
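The JoinResult step is the least obvious part of createTopology(): the result from the last user bolt and the return info from prepare-request travel along different paths, so they can arrive in either order, and the fields groupings on the request id send both to the same task. The following is a plain-Java model of that pairing, not Storm's JoinResult class; the class name, the method names, and the use of a long request id are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of JoinResult (not Storm API): results and return info
// arrive independently and are paired by request id, whichever comes first.
public class JoinResultSketch {
    private final Map<Long, String> pendingResults = new HashMap<>();
    private final Map<Long, String> pendingReturnInfo = new HashMap<>();
    private final BiConsumer<String, String> emit; // receives (result, returnInfo), like ReturnResults

    public JoinResultSketch(BiConsumer<String, String> emit) {
        this.emit = emit;
    }

    // Tuple from the last user bolt: (request id, result)
    public void onResult(long requestId, String result) {
        if (pendingReturnInfo.containsKey(requestId)) {
            emit.accept(result, pendingReturnInfo.remove(requestId));
        } else {
            pendingResults.put(requestId, result);
        }
    }

    // Tuple from prepare-request on the "ret" stream: (request id, return info)
    public void onReturnInfo(long requestId, String returnInfo) {
        if (pendingResults.containsKey(requestId)) {
            emit.accept(pendingResults.remove(requestId), returnInfo);
        } else {
            pendingReturnInfo.put(requestId, returnInfo);
        }
    }

    public static void main(String[] args) {
        JoinResultSketch join = new JoinResultSketch(
                (result, info) -> System.out.println(result + " -> " + info));
        join.onReturnInfo(1L, "client-A"); // return info may arrive first...
        join.onResult(2L, "storm----");    // ...or the result may
        join.onResult(1L, "hello----");    // prints "hello---- -> client-A"
        join.onReturnInfo(2L, "client-B"); // prints "storm---- -> client-B"
    }
}
```

Buffering on both sides is what makes the order of arrival irrelevant; the real bolt also has to handle anchoring and acking, which this model leaves out.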
If you need a custom DRPC topology, you can follow the same pattern as LinearDRPCTopologyBuilder:
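import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.drpc.ReturnResults;
import org.apache.storm.topology.TopologyBuilder;

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();
    // The spout serves the DRPC function "drpcTest2" and emits (args, return-info) tuples
    DRPCSpout spout = new DRPCSpout("drpcTest2", drpc);
    builder.setSpout("drpc", spout);
    builder.setBolt("test", new TestBolt(), 3).shuffleGrouping("drpc");
    // ReturnResults reads (result, return-info) and sends the result back to the DRPC server
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("test");
    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("testDrpc2", conf, builder.createTopology());
    // The function name passed to execute() must match the one given to DRPCSpout
    System.out.println(drpc.execute("drpcTest2", "hello"));
    System.out.println(drpc.execute("drpcTest2", "storm"));
    cluster.shutdown();
    drpc.shutdown();
}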
TestBolt:
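import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public static class TestBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // ReturnResults expects the result first and the return info second
        declarer.declare(new Fields("result", "return-info"));
    }
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String arg = tuple.getString(0);    // the DRPC argument
        Object retInfo = tuple.getValue(1); // return info from DRPCSpout, passed through unchanged
        collector.emit(new Values(arg + "----", retInfo));
    }
}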
Output:
hello----
storm----
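The custom topology works only because TestBolt keeps the return info in the second position: DRPCSpout emits (args, return-info), and ReturnResults reads the result from position 0 and the return info from position 1. The following is a plain-Java model of that tuple contract, with tuples represented as lists; the class and method names are hypothetical, and the string "req-1" stands in for the JSON return info that DRPCSpout actually emits.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the custom topology's tuple contract (not Storm API).
public class TupleContractSketch {
    // Mirrors TestBolt.execute: transform field 0, pass field 1 through untouched
    public static List<Object> testBolt(List<Object> tuple) {
        String arg = (String) tuple.get(0);
        Object retInfo = tuple.get(1);
        return Arrays.asList(arg + "----", retInfo);
    }

    // Mirrors what ReturnResults relies on: result at position 0, return info at position 1
    public static String returnResults(List<Object> tuple) {
        return "send '" + tuple.get(0) + "' to " + tuple.get(1);
    }

    public static void main(String[] args) {
        // A spout tuple: (args, return info); "req-1" is a stand-in for the real JSON
        List<Object> spoutTuple = Arrays.asList("hello", "req-1");
        System.out.println(returnResults(testBolt(spoutTuple)));
        // prints: send 'hello----' to req-1
    }
}
```

If a bolt in the middle dropped or reordered the return-info field, ReturnResults would have no way to tell which request the result belongs to.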
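In summary, the DRPC flow works like this.
DRPC is implemented by a DRPC Server, whose work breaks down into four steps:
1. Receive an RPC request from a client.
2. Send the request to the topology running on Storm.
3. Receive the computed result from Storm.
4. Return the result to the client.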
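The flow in detail:
The client sends the DRPC Server the name of the DRPC function to call, along with its arguments;
The topology on Storm implements this function through a DRPCSpout, which receives the stream of function calls from the DRPC Server;
The DRPC Server generates a unique id for each function call;
The topology on Storm computes the result and finally connects to the DRPC Server through a ReturnResults bolt, sending the result for the given id;
The DRPC Server uses the id it generated for each call to match the result to the client that made the call, and returns the result to that client.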
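The key mechanism in this flow is the per-call id: the server parks each client until a result with that client's id comes back, so results may finish in any order. The following is a plain-Java model of that bookkeeping, not Storm's DRPC Server implementation; the class name, the single request queue, the long ids, and the method names are all assumptions chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of DRPC server bookkeeping (not Storm's implementation).
public class DrpcServerSketch {
    // A queued request as the topology's spout would receive it
    public static final class Request {
        public final long id;
        public final String function;
        public final String args;
        Request(long id, String function, String args) {
            this.id = id; this.function = function; this.args = args;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();
    // Requests waiting to be picked up by the topology
    public final ConcurrentLinkedQueue<Request> queue = new ConcurrentLinkedQueue<>();

    // Client side: assign a unique id, park the caller on a future, queue the request
    public CompletableFuture<String> execute(String function, String args) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        waiting.put(id, future);
        queue.add(new Request(id, function, args));
        return future;
    }

    // Topology side: a result arrives tagged with an id; hand it to the matching caller
    public void result(long id, String result) {
        CompletableFuture<String> future = waiting.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        DrpcServerSketch server = new DrpcServerSketch();
        CompletableFuture<String> a = server.execute("drpcTest", "hello");
        CompletableFuture<String> b = server.execute("drpcTest", "storm");
        // Stand-in topology: answer the two requests in reverse order
        Request first = server.queue.poll();
        Request second = server.queue.poll();
        server.result(second.id, second.args + "----");
        server.result(first.id, first.args + "----");
        System.out.println(a.get() + " " + b.get()); // prints: hello---- storm----
    }
}
```

Even though the second request is answered first, each client still receives its own result, because the match is made on the id rather than on arrival order.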