Storm's CustomStreamGrouping

This article takes a look at Storm's CustomStreamGrouping.

CustomStreamGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java

public interface CustomStreamGrouping extends Serializable {
    /**
     * Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
     * target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input the number of tasks in the target bolt in prepare and returns
     * the tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
  • The interface defines two methods, prepare and chooseTasks
  • GrouperFactory defines FieldsGrouper, GlobalGrouper, NoneGrouper, AllGrouper and BasicLoadAwareCustomStreamGrouping
  • The org.apache.storm.grouping package also defines ShuffleGrouping, PartialKeyGrouping and LoadAwareShuffleGrouping
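
To make the prepare/chooseTasks lifecycle concrete, here is a minimal sketch that runs without Storm on the classpath. GroupingSketch is a hypothetical stand-in for CustomStreamGrouping with the WorkerTopologyContext and GlobalStreamId parameters dropped, and FirstValueGrouping is an invented example, not a class that ships with Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for CustomStreamGrouping, without the Storm context types.
interface GroupingSketch {
    void prepare(List<Integer> targetTasks);

    List<Integer> chooseTasks(int taskId, List<Object> values);
}

// Invented example: routes each tuple by the hash of its first value.
final class FirstValueGrouping implements GroupingSketch {
    private List<Integer> targetTasks = Collections.emptyList();

    @Override
    public void prepare(List<Integer> targetTasks) {
        // Called once before any tuple is routed; remember the candidate task ids.
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if (targetTasks.isEmpty() || values.isEmpty()) {
            return Collections.emptyList();
        }
        // Map the hash of the first value onto a position in the task list.
        int index = Math.floorMod(Objects.hashCode(values.get(0)), targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        FirstValueGrouping grouping = new FirstValueGrouping();
        grouping.prepare(Arrays.asList(11, 12, 13));
        // "a".hashCode() is 97 and floorMod(97, 3) is 1, so index 1 is chosen.
        System.out.println(grouping.chooseTasks(1, Arrays.<Object>asList("a", 1))); // prints [12]
    }
}
```

In a real topology the same two methods would be implemented against the Storm interface shown above, and prepare would also receive the context and stream metadata.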

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

public static class FieldsGrouper implements CustomStreamGrouping {
    private Fields outFields;
    private List<List<Integer>> targetTasks;
    private Fields groupFields;
    private int numTasks;

    public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
        this.outFields = outFields;
        this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<List<Integer>>();
        for (Integer targetTask : targetTasks) {
            this.targetTasks.add(Collections.singletonList(targetTask));
        }
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
        return targetTasks.get(targetTaskIndex);
    }
}
  • The values of the selected fields are passed to TupleUtils.chooseTaskIndex to pick a task index; chooseTaskIndex hashes them with Arrays.deepHashCode and then takes the floor modulus of that hash by numTasks
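
The hash-then-floor-modulus step can be sketched on its own as follows. TaskIndexSketch is a hypothetical class name, and the null handling is an assumption for the sketch rather than a quotation of TupleUtils. The point of the floor modulus is that a negative hash still maps to a valid index, which the plain % operator would not guarantee.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of "deep hash, then floor modulus" index selection.
final class TaskIndexSketch {
    static int chooseTaskIndex(List<Object> keys, int numTasks) {
        // Assumption for this sketch: a null key list gets a fixed hash of 1.
        int hash = (keys == null) ? 1 : Arrays.deepHashCode(keys.toArray());
        // floorMod always lands in [0, numTasks), even when hash is negative.
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        List<Object> key = Arrays.<Object>asList("user-42", 7);
        // The same key values always map to the same index.
        System.out.println(chooseTaskIndex(key, 4) == chooseTaskIndex(key, 4)); // prints true
        System.out.println(Math.floorMod(-7, 4)); // prints 1
        System.out.println(-7 % 4);               // prints -3
    }
}
```

Because the index depends only on the grouped values and numTasks, tuples with equal field values always go to the same task as long as the task list does not change.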

GlobalGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

public static class GlobalGrouper implements CustomStreamGrouping {
    private List<Integer> targetTasks;

    public GlobalGrouper() {
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if (targetTasks.isEmpty()) {
            return null;
        }
        // It's possible for target to have multiple tasks if it reads multiple sources
        return Collections.singletonList(targetTasks.get(0));
    }
}
  • This always picks the first task, i.e. targetTasks.get(0); when targetTasks is empty it returns null

NoneGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

public static class NoneGrouper implements CustomStreamGrouping {
    private final Random random;
    private List<Integer> targetTasks;
    private int numTasks;

    public NoneGrouper() {
        random = new Random();
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int index = random.nextInt(numTasks);
        return Collections.singletonList(targetTasks.get(index));
    }
}
  • This picks a task at random via random.nextInt(numTasks)

AllGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

public static class AllGrouper implements CustomStreamGrouping {
    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return targetTasks;
    }
}
  • This returns all of the targetTasks

ShuffleGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java

public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}
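  • prepare shuffles the ArrayList<List<Integer>> choices into a random order
  • chooseTasks uses current.incrementAndGet() to get a round-robin effect: if the incremented value is less than size, it returns the entry at that index; if it equals size, it resets the counter to 0 and returns the first entry; if it is greater than size (another thread won the race), it loops and tries again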
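
The counter logic can be tried in isolation with the sketch below. RoundRobinSketch is a hypothetical class; unlike ShuffleGrouping it skips the initial shuffle so that the output order is predictable, and it rejects an empty task list up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the incrementAndGet-based round robin described above.
final class RoundRobinSketch {
    private final List<Integer> choices;
    private final AtomicInteger current = new AtomicInteger(0);

    RoundRobinSketch(List<Integer> targetTasks) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("need at least one target task");
        }
        // No shuffle here, so the order stays fixed for the demonstration.
        this.choices = new ArrayList<>(targetTasks);
    }

    int next() {
        int size = choices.size();
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                // Wrap around: reset the counter and hand out the first entry.
                current.set(0);
                return choices.get(0);
            }
            // rightNow > size: another thread has not reset the counter yet, so retry.
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(Arrays.asList(10, 20, 30));
        // The counter is incremented before use, so the cycle starts at index 1.
        System.out.println(rr.next() + " " + rr.next() + " " + rr.next() + " " + rr.next()); // prints 20 30 10 20
    }
}
```

Note that index 0 is only handed out at the wrap-around, so the first call returns the second entry of the list; every entry is still visited exactly once per cycle.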

PartialKeyGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;
    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);
            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
            boltIds.add(selectedTask);
        }
        return boltIds;
    }
    //......
}
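  • By default, RandomTwoTaskAssignmentCreator picks two candidate taskIds for the key, and BalancedTargetSelector then chooses whichever of the two has been used fewer times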
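
The "two candidates, pick the less used one" idea can be illustrated with a simplified sketch. TwoChoicesSketch and its method names are hypothetical, String keys replace the raw key bytes, and the details (seeding, tie-breaking, requiring at least two tasks) are assumptions of this sketch rather than a copy of Storm's AssignmentCreator and TargetSelector implementations.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of key-seeded two-choice routing with usage counting.
final class TwoChoicesSketch {
    private final List<Integer> targetTasks;
    private final Map<Integer, Long> sentCounts = new HashMap<>();

    TwoChoicesSketch(List<Integer> targetTasks) {
        if (targetTasks.size() < 2) {
            throw new IllegalArgumentException("need at least two target tasks");
        }
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    // Seeding Random with the key's hash makes the two candidates deterministic per key.
    int[] candidatesFor(String key) {
        Random random = new Random(Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)));
        int n = targetTasks.size();
        int first = random.nextInt(n);
        int second = random.nextInt(n);
        if (second == first) {
            second = (second + 1) % n; // make sure the two candidates differ
        }
        return new int[] {targetTasks.get(first), targetTasks.get(second)};
    }

    long sentCount(int task) {
        return sentCounts.getOrDefault(task, 0L);
    }

    // Picks the candidate that has received fewer tuples so far (ties go to the first).
    int chooseTask(String key) {
        int[] candidates = candidatesFor(key);
        int best = candidates[0];
        for (int candidate : candidates) {
            if (sentCount(candidate) < sentCount(best)) {
                best = candidate;
            }
        }
        sentCounts.merge(best, 1L, Long::sum);
        return best;
    }

    public static void main(String[] args) {
        TwoChoicesSketch grouping = new TwoChoicesSketch(Arrays.asList(1, 2, 3, 4));
        System.out.println(Arrays.toString(grouping.candidatesFor("hot-key")));
        for (int i = 0; i < 4; i++) {
            System.out.println(grouping.chooseTask("hot-key"));
        }
    }
}
```

The effect is that a single hot key is spread over two tasks instead of one, at the cost that downstream state for that key is split across both.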

LoadAwareCustomStreamGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java

public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
}
  • This extends the CustomStreamGrouping interface and adds a refreshLoad method for refreshing the load; the load here is mainly that of the executor's receiveQueue (qMetrics.population() / qMetrics.capacity())
  • LoadAwareCustomStreamGrouping has several implementations, including BasicLoadAwareCustomStreamGrouping and LoadAwareShuffleGrouping
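
As a rough illustration of how a refreshLoad-style callback can feed routing decisions, the sketch below keeps a snapshot of per-task load and sends to the least loaded task. Everything in it is hypothetical: LeastLoadedSketch is an invented class, a plain Map stands in for LoadMapping, and "pick the least loaded task" is a deliberately simple policy chosen for the example, not the algorithm used by BasicLoadAwareCustomStreamGrouping or LoadAwareShuffleGrouping.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical least-loaded policy; not the algorithm Storm's load-aware groupings use.
final class LeastLoadedSketch {
    private final List<Integer> targetTasks;
    // Stand-in for LoadMapping: task id -> load in [0, 1]; unknown tasks count as 0.
    private volatile Map<Integer, Double> loadByTask = new HashMap<>();

    LeastLoadedSketch(List<Integer> targetTasks) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("need at least one target task");
        }
        this.targetTasks = targetTasks;
    }

    // Queue fill ratio, mirroring population / capacity from the text.
    static double queueLoad(long population, long capacity) {
        return (double) population / capacity;
    }

    // Called whenever fresh measurements arrive; swaps in a new snapshot.
    void refreshLoad(Map<Integer, Double> latest) {
        this.loadByTask = new HashMap<>(latest);
    }

    int chooseTask() {
        Map<Integer, Double> snapshot = loadByTask;
        int best = targetTasks.get(0);
        for (int task : targetTasks) {
            if (snapshot.getOrDefault(task, 0.0) < snapshot.getOrDefault(best, 0.0)) {
                best = task;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        LeastLoadedSketch grouping = new LeastLoadedSketch(Arrays.asList(1, 2, 3));
        Map<Integer, Double> load = new HashMap<>();
        load.put(1, queueLoad(900, 1000));
        load.put(2, queueLoad(100, 1000));
        load.put(3, queueLoad(500, 1000));
        grouping.refreshLoad(load);
        System.out.println(grouping.chooseTask()); // prints 2
    }
}
```

Separating refreshLoad from chooseTask keeps the per-tuple path cheap: routing only reads the latest snapshot, while the measurements are updated on their own schedule.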

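Summary

  • Storm's CustomStreamGrouping interface defines the chooseTasks method, which selects the tasks that will process a tuple
  • ShuffleGrouping works like round robin; FieldsGrouper hashes the selected field values with Arrays.deepHashCode and takes the floor modulus by numTasks; GlobalGrouper returns the taskId at index 0; NoneGrouper returns a random task; AllGrouper returns every taskId without filtering; PartialKeyGrouping uses the hash of the key as the seed of a Random to compute the indexes of two taskIds, then picks whichever of the two tasks has been used less
  • The load-aware groupings are BasicLoadAwareCustomStreamGrouping and LoadAwareShuffleGrouping; both implement the LoadAwareCustomStreamGrouping interface, whose refreshLoad method refreshes the load dynamically. The load here is mainly that of the executor's receiveQueue (qMetrics.population() / qMetrics.capacity())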

doc

  • Stream groupings