聊聊elasticsearch的ElectMasterService
序
本文主要研究一下elasticsearch的ElectMasterService
ElectMasterService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java
public class ElectMasterService { private static final Logger logger = LogManager.getLogger(ElectMasterService.class); public static final Setting<Integer> DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING = Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope, Property.Deprecated); private volatile int minimumMasterNodes; /** * a class to encapsulate all the information about a candidate in a master election * that is needed to decided which of the candidates should win */ public static class MasterCandidate { public static final long UNRECOVERED_CLUSTER_VERSION = -1; final DiscoveryNode node; final long clusterStateVersion; public MasterCandidate(DiscoveryNode node, long clusterStateVersion) { Objects.requireNonNull(node); assert clusterStateVersion >= -1 : "got: " + clusterStateVersion; assert node.isMasterNode(); this.node = node; this.clusterStateVersion = clusterStateVersion; } public DiscoveryNode getNode() { return node; } public long getClusterStateVersion() { return clusterStateVersion; } @Override public String toString() { return "Candidate{" + "node=" + node + ", clusterStateVersion=" + clusterStateVersion + '}'; } /** * compares two candidates to indicate which the a better master. * A higher cluster state version is better * * @return -1 if c1 is a batter candidate, 1 if c2. */ public static int compare(MasterCandidate c1, MasterCandidate c2) { // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted // list, so if c2 has a higher cluster state version, it needs to come first. 
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); if (ret == 0) { ret = compareNodes(c1.getNode(), c2.getNode()); } return ret; } } public ElectMasterService(Settings settings) { this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes); } public void minimumMasterNodes(int minimumMasterNodes) { this.minimumMasterNodes = minimumMasterNodes; } public int minimumMasterNodes() { return minimumMasterNodes; } public int countMasterNodes(Iterable<DiscoveryNode> nodes) { int count = 0; for (DiscoveryNode node : nodes) { if (node.isMasterNode()) { count++; } } return count; } public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) { if (candidates.isEmpty()) { return false; } if (minimumMasterNodes < 1) { return true; } assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() : "duplicates ahead: " + candidates; return candidates.size() >= minimumMasterNodes; } /** * Elects a new master out of the possible nodes, returning it. Returns {@code null} * if no master has been elected. 
*/ public MasterCandidate electMaster(Collection<MasterCandidate> candidates) { assert hasEnoughCandidates(candidates); List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates); sortedCandidates.sort(MasterCandidate::compare); return sortedCandidates.get(0); } /** selects the best active master to join, where multiple are discovered */ public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) { return activeMasters.stream().min(ElectMasterService::compareNodes).get(); } public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) { final int count = countMasterNodes(nodes); return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes); } public boolean hasTooManyMasterNodes(Iterable<DiscoveryNode> nodes) { final int count = countMasterNodes(nodes); return count > 1 && minimumMasterNodes <= count / 2; } public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, ClusterState newState) { // check if min_master_nodes setting is too low and log warning if (hasTooManyMasterNodes(oldState.nodes()) == false && hasTooManyMasterNodes(newState.nodes())) { logger.warn("value for setting \"{}\" is too low. This can result in data loss! Please set it to at least a quorum of master-" + "eligible nodes (current value: [{}], total number of master-eligible nodes used for publishing in this round: [{}])", ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNodes(), newState.getNodes().getMasterNodes().size()); } } /** * Returns the given nodes sorted by likelihood of being elected as master, most likely first. 
* Non-master nodes are not removed but are rather put in the end */ static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) { ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes); CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes); return sortedNodes; } /** * Returns a list of the next possible masters. */ public DiscoveryNode[] nextPossibleMasters(ObjectContainer<DiscoveryNode> nodes, int numberOfPossibleMasters) { List<DiscoveryNode> sortedNodes = sortedMasterNodes(Arrays.asList(nodes.toArray(DiscoveryNode.class))); if (sortedNodes == null) { return new DiscoveryNode[0]; } List<DiscoveryNode> nextPossibleMasters = new ArrayList<>(numberOfPossibleMasters); int counter = 0; for (DiscoveryNode nextPossibleMaster : sortedNodes) { if (++counter >= numberOfPossibleMasters) { break; } nextPossibleMasters.add(nextPossibleMaster); } return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]); } private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) { List<DiscoveryNode> possibleNodes = CollectionUtils.iterableAsArrayList(nodes); if (possibleNodes.isEmpty()) { return null; } // clean non master nodes possibleNodes.removeIf(node -> !node.isMasterNode()); CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes); return possibleNodes; } /** master nodes go before other nodes, with a secondary sort by id **/ private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) { if (o1.isMasterNode() && !o2.isMasterNode()) { return -1; } if (!o1.isMasterNode() && o2.isMasterNode()) { return 1; } return o1.getId().compareTo(o2.getId()); } }
- ElectMasterService的构造器读取discovery.zen.minimum_master_nodes配置到变量minimumMasterNodes
- ElectMasterService定义了静态类MasterCandidate,它提供了compare静态方法用于比较两个MasterCandidate:首先对比clusterStateVersion(版本高者更优,排在前面),如果该值相同再进行compareNodes;compareNodes会先判断两者是否为masterNode(master节点排在非master节点之前),若两者同为或同不为masterNode,则对比他们各自的node的id
- electMaster方法首先通过assert hasEnoughCandidates来断言有足够的candidates(真正的判断由调用方完成,例如ZenDiscovery.findMaster),然后对candidates进行排序,最后取第一个作为master返回
ZenDiscovery.findMaster
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener { //...... private DiscoveryNode findMaster() { logger.trace("starting to ping"); List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList(); if (fullPingResponses == null) { logger.trace("No full ping responses"); return null; } if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); if (fullPingResponses.size() == 0) { sb.append(" {none}"); } else { for (ZenPing.PingResponse pingResponse : fullPingResponses) { sb.append("\n\t--> ").append(pingResponse); } } logger.trace("full ping responses:{}", sb); } final DiscoveryNode localNode = transportService.getLocalNode(); // add our selves assert fullPingResponses.stream().map(ZenPing.PingResponse::node) .filter(n -> n.equals(localNode)).findAny().isPresent() == false; fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState())); // filter responses final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger); List<DiscoveryNode> activeMasters = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without // any check / verifications from other nodes in ZenDiscover#innerJoinCluster() if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) { activeMasters.add(pingResponse.master()); } } // nodes discovered during pinging List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { if (pingResponse.node().isMasterNode()) { masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion())); } } if (activeMasters.isEmpty()) { if (electMaster.hasEnoughCandidates(masterCandidates)) { final 
ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates); logger.trace("candidate {} won election", winner); return winner.getNode(); } else { // if we don't have enough master nodes, we bail, because there are not enough master to elect from logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again", masterCandidates, electMaster.minimumMasterNodes()); return null; } } else { assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; // lets tie break between discovered nodes return electMaster.tieBreakActiveMasters(activeMasters); } } //...... }
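- ZenDiscovery的findMaster方法在activeMasters.isEmpty()时,会先通过electMaster.hasEnoughCandidates(masterCandidates)判断候选节点是否足够,足够则通过electMaster.electMaster(masterCandidates)来选取winner作为master返回,不足则打印warn日志并返回null;activeMasters不为空时,则通过electMaster.tieBreakActiveMasters(activeMasters)从中选取一个返回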
小结
- ElectMasterService的构造器读取discovery.zen.minimum_master_nodes配置到变量minimumMasterNodes;ElectMasterService定义了静态类MasterCandidate,它提供了compare静态方法用于比较两个MasterCandidate:首先对比clusterStateVersion(版本高者更优,排在前面),如果该值相同再进行compareNodes;compareNodes会先判断两者是否为masterNode(master节点排在非master节点之前),若两者同为或同不为masterNode,则对比他们各自的node的id
- electMaster方法首先通过assert hasEnoughCandidates来断言有足够的candidates(真正的判断由调用方完成,例如ZenDiscovery.findMaster),然后对candidates进行排序,最后取第一个作为master返回
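- ZenDiscovery的findMaster方法在activeMasters.isEmpty()时,会先通过electMaster.hasEnoughCandidates(masterCandidates)判断候选节点是否足够,足够则通过electMaster.electMaster(masterCandidates)来选取winner作为master返回,不足则打印warn日志并返回null;activeMasters不为空时,则通过electMaster.tieBreakActiveMasters(activeMasters)从中选取一个返回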
doc
相关推荐
newbornzhao 2020-09-14
做对一件事很重要 2020-09-07
renjinlong 2020-09-03
明瞳 2020-08-19
李玉志 2020-08-19
mengyue 2020-08-07
molong0 2020-08-06
AFei00 2020-08-03
molong0 2020-08-03
wenwentana 2020-08-03
YYDU 2020-08-03
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。
sifeimeng 2020-08-03
心丨悦 2020-08-03
liangwenrong 2020-07-31
sifeimeng 2020-08-01
mengyue 2020-07-30
tigercn 2020-07-29
IceStreamLab 2020-07-29