[case43] A Look at Storm's LinearDRPCTopologyBuilder
Published: 2019-06-22


This article takes a look at Storm's LinearDRPCTopologyBuilder.

Examples

Manual DRPC

@Test
public void testManualDRPC() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();
    DRPCSpout spout = new DRPCSpout("exclamation"); // emits Fields("args", "return-info")
    // the spout is a DRPCSpout, registered with component id "drpc"
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ManualExclaimBolt(), 3).shuffleGrouping("drpc"); // emits Fields("result", "return-info")
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
    SubmitHelper.submitRemote("manualDrpc", builder.createTopology());
}
  • This shows the most primitive way to build a DRPC topology: it starts with a DRPCSpout and ends with a ReturnResults bolt
  • DRPCSpout's output fields are Fields("args", "return-info"), and ReturnResults expects input fields Fields("result", "return-info")
  • The custom ManualExclaimBolt is therefore required to declare output fields Fields("result", "return-info"), where return-info can simply be taken from the input tuple and result carries the processing result (see the sketch below)
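
The ManualExclaimBolt used in the test above is not shown anywhere; below is a minimal sketch of what it could look like. Only the field contract (Fields("result", "return-info"), with return-info copied from the input) comes from the bullets above; the class body and the "append an exclamation mark" logic are assumptions for illustration.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical implementation: appends "!" to the DRPC args and passes return-info
// through untouched, so the output matches what ReturnResults expects downstream.
public class ManualExclaimBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String args = input.getStringByField("args");               // emitted by DRPCSpout
        String returnInfo = input.getStringByField("return-info");  // forwarded as-is
        collector.emit(new Values(args + "!", returnInfo));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "return-info"));
    }
}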

Using LinearDRPCTopologyBuilder

@Test
public void testBasicDRPCTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    SubmitHelper.submitRemote("basicDrpc", builder.createRemoteTopology());
}
  • LinearDRPCTopologyBuilder automatically builds the DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults components for you, which makes it very concise to use
  • Because the builder wires in different upstream and downstream components, the user-defined bolt must take input fields Fields("request", "args") and emit output fields new Fields("id", "result"); the request field is the requestId (a long) and becomes the id of the output, args is the input argument, and result is the processing result (a sketch of such a bolt follows)
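
A minimal sketch of such an ExclaimBolt, modeled on the ExclaimBolt in storm-starter's BasicDRPCTopology (treat it as illustrative rather than the exact class used in the test):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Receives ("request", "args") from PrepareRequest and must emit ("id", "result"),
// keeping the request id as the first field so the framework can join it back.
public class ExclaimBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object requestId = tuple.getValue(0);   // the long request id
        String args = tuple.getString(1);       // the DRPC arguments
        collector.emit(new Values(requestId, args + "!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}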

LinearDRPCTopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java

public class LinearDRPCTopologyBuilder {
    String function;
    List<Component> components = new ArrayList<>();

    public LinearDRPCTopologyBuilder(String function) {
        this.function = function;
    }

    private static String boltId(int index) {
        return "bolt" + index;
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
        return addBolt(new BatchBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
        return addBolt(bolt, 1);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
        if (parallelism == null) {
            parallelism = 1;
        }
        Component component = new Component(bolt, parallelism.intValue());
        components.add(component);
        return new InputDeclarerImpl(component);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
        return addBolt(bolt, null);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
        return addBolt(new BasicBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
        return addBolt(bolt, null);
    }

    public StormTopology createLocalTopology(ILocalDRPC drpc) {
        return createTopology(new DRPCSpout(function, drpc));
    }

    public StormTopology createRemoteTopology() {
        return createTopology(new DRPCSpout(function));
    }

    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
               .noneGrouping(SPOUT_ID);
        int i = 0;
        for (; i < components.size(); i++) {
            Component component = components.get(i);

            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
            if (i == 1) {
                source.put(boltId(i - 1), SourceArgs.single());
            } else if (i >= 2) {
                source.put(boltId(i - 1), SourceArgs.all());
            }
            IdStreamSpec idSpec = null;
            if (i == components.size() - 1 && component.bolt instanceof FinishedCallback) {
                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
            }
            BoltDeclarer declarer = builder.setBolt(
                boltId(i),
                new CoordinatedBolt(component.bolt, source, idSpec),
                component.parallelism);

            for (SharedMemory request : component.sharedMemory) {
                declarer.addSharedMemory(request);
            }

            if (!component.componentConf.isEmpty()) {
                declarer.addConfigurations(component.componentConf);
            }

            if (idSpec != null) {
                declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
            }
            if (i == 0 && component.declarations.isEmpty()) {
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                String prevId;
                if (i == 0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i - 1);
                }
                for (InputDeclaration declaration : component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if (i > 0) {
                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
            }
        }

        IRichBolt lastBolt = components.get(components.size() - 1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if (streams.size() != 1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if (fields.size() != 2) {
            throw new RuntimeException(
                "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                + "The first should be the request id, and the second should be the result.");
        }

        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
               .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
               .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        builder.setBolt(boltId(i), new ReturnResults())
               .noneGrouping(boltId(i - 1));
        return builder.createTopology();
    }

    //......
}
  • From createTopology we can see that the topology starts with the DRPCSpout (spout), followed by PrepareRequest (prepare-request)
  • Each user-supplied bolt is then wrapped in a CoordinatedBolt; if there is more than one bolt, the second and subsequent bolts are given directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID), on which Fields("id", "count") tuples are emitted via emitDirect
  • After the user-supplied bolts come JoinResult and, finally, ReturnResults (a caller-side sketch of invoking the resulting DRPC function follows)
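
For completeness, here is a hedged caller-side sketch of invoking the "exclamation" function built above. The host name is hypothetical, and the snippet assumes a DRPC server is reachable on the default invocation port and that the topology has already been submitted.

import java.util.Map;
import org.apache.storm.utils.DRPCClient;
import org.apache.storm.utils.Utils;

public class ExclamationCaller {
    public static void main(String[] args) throws Exception {
        // read defaults plus storm.yaml so the thrift transport settings are present
        Map<String, Object> conf = Utils.readStormConfig();
        // "drpc-server-host" is a placeholder; 3772 is the default DRPC port
        DRPCClient client = new DRPCClient(conf, "drpc-server-host", 3772);
        try {
            // the function name must match the one passed to LinearDRPCTopologyBuilder
            System.out.println(client.execute("exclamation", "hello")); // e.g. "hello!"
        } finally {
            client.close();
        }
    }
}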

DRPCSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java

public class DRPCSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = 2387848310969237877L;
    final String _function;
    final String _local_drpc_id;
    SpoutOutputCollector _collector;
    List<DRPCInvocationsClient> _clients = new ArrayList<>();
    transient LinkedList<Future<Void>> _futures = null;
    transient ExecutorService _backround = null;

    public DRPCSpout(String function) {
        _function = function;
        if (DRPCClient.isLocalOverride()) {
            _local_drpc_id = DRPCClient.getOverrideServiceId();
        } else {
            _local_drpc_id = null;
        }
    }

    //......

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        if (_local_drpc_id == null) {
            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                        60L, TimeUnit.SECONDS,
                                                        new SynchronousQueue<Runnable>());
            _futures = new LinkedList<>();

            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            int index = context.getThisTaskIndex();

            int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
            List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
            if (servers == null || servers.isEmpty()) {
                throw new RuntimeException("No DRPC servers configured for topology");
            }

            if (numTasks < servers.size()) {
                for (String s : servers) {
                    _futures.add(_backround.submit(new Adder(s, port, conf)));
                }
            } else {
                int i = index % servers.size();
                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
            }
        }
    }

    @Override
    public void close() {
        for (DRPCInvocationsClient client : _clients) {
            client.close();
        }
    }

    @Override
    public void nextTuple() {
        if (_local_drpc_id == null) {
            int size = 0;
            synchronized (_clients) {
                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
            }
            for (int i = 0; i < size; i++) {
                DRPCInvocationsClient client;
                synchronized (_clients) {
                    client = _clients.get(i);
                }
                if (!client.isConnected()) {
                    LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
                    reconnectAsync(client);
                    continue;
                }
                try {
                    DRPCRequest req = client.fetchRequest(_function);
                    if (req.get_request_id().length() > 0) {
                        Map<String, Object> returnInfo = new HashMap<>();
                        returnInfo.put("id", req.get_request_id());
                        returnInfo.put("host", client.getHost());
                        returnInfo.put("port", client.getPort());
                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
                                        new DRPCMessageId(req.get_request_id(), i));
                        break;
                    }
                } catch (AuthorizationException aze) {
                    reconnectAsync(client);
                    LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
                } catch (TException e) {
                    reconnectAsync(client);
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                } catch (Exception e) {
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                }
            }
            checkFutures();
        } else {
            //......
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
        DRPCMessageId did = (DRPCMessageId) msgId;
        DistributedRPCInvocations.Iface client;

        if (_local_drpc_id == null) {
            client = _clients.get(did.index);
        } else {
            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
        }

        int retryCnt = 0;
        int maxRetries = 3;

        while (retryCnt < maxRetries) {
            retryCnt++;
            try {
                client.failRequest(did.id);
                break;
            } catch (AuthorizationException aze) {
                LOG.error("Not authorized to failRequest from DRPC server", aze);
                throw new RuntimeException(aze);
            } catch (TException tex) {
                if (retryCnt >= maxRetries) {
                    LOG.error("Failed to fail request", tex);
                    break;
                }
                reconnectSync((DRPCInvocationsClient) client);
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("args", "return-info"));
    }

    //......
}
  • open prepares the DRPCInvocationsClient instances
  • nextTuple fetches a DRPCRequest via DRPCInvocationsClient.fetchRequest(_function)
  • It then builds the returnInfo and emits the data, with msgId DRPCMessageId and tuple Values(req.get_func_args(), JSONValue.toJSONString(returnInfo))
  • The fail method is overridden: a failed request is reported back to the DRPC server via failRequest, retried up to 3 times by default

PrepareRequest

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java

public class PrepareRequest extends BaseBasicBolt {
    public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
    public static final String RETURN_STREAM = "ret";
    public static final String ID_STREAM = "id";

    Random rand;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext context) {
        rand = new Random();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String args = tuple.getString(0);
        String returnInfo = tuple.getString(1);
        long requestId = rand.nextLong();
        collector.emit(ARGS_STREAM, new Values(requestId, args));
        collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
        collector.emit(ID_STREAM, new Values(requestId));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
        declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
        declarer.declareStream(ID_STREAM, new Fields("request"));
    }
}
  • PrepareRequest takes the args and returnInfo, generates a requestId, and emits to three streams: ARGS_STREAM, RETURN_STREAM and ID_STREAM
  • JoinResult subscribes to PrepareRequest's RETURN_STREAM, while the first CoordinatedBolt subscribes to ARGS_STREAM

CoordinatedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java

/**
 * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries.
 */
public class CoordinatedBolt implements IRichBolt {

    private TimeCacheMap<Object, TrackingInfo> _tracked;

    //......

    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized (_tracked) {
            track = _tracked.get(id);
            if (track == null) {
                track = new TrackingInfo();
                if (_idStreamSpec == null) {
                    track.receivedId = true;
                }
                _tracked.put(id, track);
            }
        }

        if (type == TupleType.ID) {
            synchronized (_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if (type == TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized (_tracked) {
                track.reportCount++;
                track.expectedTupleCount += count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized (_tracked) {
                _delegate.execute(tuple);
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _delegate.declareOutputFields(declarer);
        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
    }

    //......

    public static class TrackingInfo {
        int reportCount = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        boolean failed = false;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean receivedId = false;
        boolean finished = false;
        List<Tuple> ackTuples = new ArrayList<>();

        @Override
        public String toString() {
            return "reportCount: " + reportCount + "\n"
                   + "expectedTupleCount: " + expectedTupleCount + "\n"
                   + "receivedTuples: " + receivedTuples + "\n"
                   + "failed: " + failed + "\n"
                   + taskEmittedTuples.toString();
        }
    }
}
  • In declareOutputFields, besides calling the delegate bolt's declareOutputFields, CoordinatedBolt also declares a direct stream Constants.COORDINATED_STREAM_ID carrying Fields("id", "count")
  • execute first makes sure every requestId has a TrackingInfo, which keeps the expectedTupleCount and receivedTuples counters as well as taskEmittedTuples (the name is slightly misleading: it actually records how many tuples this bolt has emitted to each downstream task, so that when the request finishes, checkFinishId can emitDirect that count to each downstream task, which then adds it to its own expectedTupleCount)
  • execute receives several kinds of tuples: TupleType.ID (when _idStreamSpec is not null), TupleType.COORD (the Fields("id", "count") stream, which triggers checkFinishId to decide whether the request is done), and TupleType.REGULAR (simply handed to the delegate bolt's execute)
  • checkFinishId checks that track.reportCount == _numSourceReports and track.expectedTupleCount == track.receivedTuples; if both hold it marks track.finished = true and, if there is a further downstream bolt, tells it how many tuples it should expect (see the simplified sketch below)
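
checkFinishId itself is elided from the excerpt above (//......); the following is a simplified paraphrase of its finish condition based on the description in the last bullet, not the actual Storm source:

// Paraphrased sketch, not the real CoordinatedBolt.checkFinishId: a request id counts
// as finished once the ID tuple has been seen (when an id stream is configured), every
// upstream task has reported its emit count on COORDINATED_STREAM_ID, and the number
// of tuples actually received equals the announced total.
private boolean requestFinished(TrackingInfo track, int numSourceReports) {
    return track.receivedId
        && track.reportCount == numSourceReports
        && track.expectedTupleCount == track.receivedTuples;
}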

JoinResult

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java

public class JoinResult extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);

    String returnComponent;
    Map<Object, Tuple> returns = new HashMap<>();
    Map<Object, Tuple> results = new HashMap<>();
    OutputCollector _collector;

    public JoinResult(String returnComponent) {
        this.returnComponent = returnComponent;
    }

    public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        Object requestId = tuple.getValue(0);
        if (tuple.getSourceComponent().equals(returnComponent)) {
            returns.put(requestId, tuple);
        } else {
            results.put(requestId, tuple);
        }

        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            Tuple result = results.remove(requestId);
            Tuple returner = returns.remove(requestId);
            LOG.debug(result.getValue(1).toString());
            List<Tuple> anchors = new ArrayList<>();
            anchors.add(result);
            anchors.add(returner);
            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
            _collector.ack(result);
            _collector.ack(returner);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "return-info"));
    }
}
  • If a tuple comes from PrepareRequest it is stored in returns, otherwise in results
  • When both maps contain the same requestId, the request has been matched with its result, and a tuple is emitted downstream
  • The first emitted field is the result, the second is the returnInfo

ReturnResults

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java

public class ReturnResults extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = -774882142710631591L;
    OutputCollector _collector;
    boolean local;
    Map<String, Object> _conf;
    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        _conf = topoConf;
        _collector = collector;
        local = topoConf.get(Config.STORM_CLUSTER_MODE).equals("local");
    }

    @Override
    public void execute(Tuple input) {
        String result = (String) input.getValue(0);
        String returnInfo = (String) input.getValue(1);
        if (returnInfo != null) {
            Map<String, Object> retMap;
            try {
                retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
            } catch (ParseException e) {
                LOG.error("Parseing returnInfo failed", e);
                _collector.fail(input);
                return;
            }
            final String host = (String) retMap.get("host");
            final int port = ObjectReader.getInt(retMap.get("port"));
            String id = (String) retMap.get("id");

            DistributedRPCInvocations.Iface client;
            if (local) {
                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
            } else {
                List server = new ArrayList() {
                    {
                        add(host);
                        add(port);
                    }
                };

                if (!_clients.containsKey(server)) {
                    try {
                        _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
                    } catch (TTransportException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                client = _clients.get(server);
            }

            int retryCnt = 0;
            int maxRetries = 3;
            while (retryCnt < maxRetries) {
                retryCnt++;
                try {
                    client.result(id, result);
                    _collector.ack(input);
                    break;
                } catch (AuthorizationException aze) {
                    LOG.error("Not authorized to return results to DRPC server", aze);
                    _collector.fail(input);
                    throw new RuntimeException(aze);
                } catch (TException tex) {
                    if (retryCnt >= maxRetries) {
                        LOG.error("Failed to return results to DRPC server", tex);
                        _collector.fail(input);
                    }
                    reconnectClient((DRPCInvocationsClient) client);
                }
            }
        }
    }

    private void reconnectClient(DRPCInvocationsClient client) {
        if (client instanceof DRPCInvocationsClient) {
            try {
                LOG.info("reconnecting... ");
                client.reconnectClient(); //Blocking call
            } catch (TException e2) {
                LOG.error("Failed to connect to DRPC server", e2);
            }
        }
    }

    @Override
    public void cleanup() {
        for (DRPCInvocationsClient c : _clients.values()) {
            c.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
  • ReturnResults is mainly responsible for sending the result back to the requesting DRPC server through a DRPCInvocationsClient
  • The returnInfo contains the target host and port; a DRPCInvocationsClient is built (and cached) for that host and port
  • It then calls DRPCInvocationsClient.result(id, result) to return the result, retrying up to 3 times by default; on AuthorizationException the tuple is failed immediately, and on success it is acked

Summary

  • LinearDRPCTopologyBuilder was marked @Deprecated in v0.9.1-incubating (back in 2012), when Trident's newDRPCStream was seen as its replacement; since that would force anyone who wants DRPC to use Trident, the deprecation was removed later (April 2018), so in 2.0.0, 1.1.3, 1.0.7 and 1.2.2 it is no longer marked as deprecated
  • LinearDRPCTopologyBuilder wraps and composes DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults, exposing a simple API so that users no longer have to build these components themselves

    • DRPCSpout mainly builds the args and returnInfo;
    • PrepareRequest splits the data into ARGS_STREAM, RETURN_STREAM and ID_STREAM;
    • CoordinatedBolt mainly ensures that the tuples flowing between these bolts are fully delivered and acked;
    • JoinResult matches each requestId with its result, pairing request and response, and then emits downstream;
    • ReturnResults sends the data back to the client according to returnInfo
  • When using LinearDRPCTopologyBuilder, the first bolt receives input Fields("request", "args"); the last bolt must output new Fields("id", "result"); and every bolt other than the last must emit the request id as its first output field, so that CoordinatedBolt can track and count tuples and confirm whether a bolt has received all tuples sent by its upstream bolt (a sketch of such an intermediate bolt follows).
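
As an illustration of that contract, here is a hypothetical intermediate (non-last) bolt for a multi-bolt LinearDRPC topology. Only the field layout follows from the rules above; the word-splitting logic and the class name are made up.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical intermediate bolt: receives ("request", "args"), fans out one tuple per
// word, and keeps the request id as the FIRST output field so CoordinatedBolt can
// correlate and count tuples per request. Only the final bolt in the chain must emit
// exactly ("id", "result").
public class SplitWordsBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object requestId = tuple.getValue(0);
        for (String word : tuple.getString(1).split("\\s+")) {
            collector.emit(new Values(requestId, word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "word"));
    }
}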

doc

Reprinted from: http://yrkta.baihongyu.com/
