How Kafka fetches metadata
Problem:
<Failed to update metadata after 3000 ms.>
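This message is what a caller sees when it blocks waiting for fresh metadata and none arrives in time. The following is a minimal sketch of such a bounded wait, assuming a version counter guarded by a monitor. The class `MetadataWaitSketch` and its methods are hypothetical names for illustration, and `java.util.concurrent.TimeoutException` stands in for the client's own exception type; this is not the Kafka source.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a client-side metadata holder; not Kafka's class.
final class MetadataWaitSketch {
    private int version = 0; // bumped each time fresh metadata arrives

    // Called by the network thread once a metadata response has been processed.
    synchronized void update() {
        version++;
        notifyAll();
    }

    // Blocks until the version moves past lastVersion or maxWaitMs elapses.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            wait(remaining);
        }
    }

    // Demo helper: returns the timeout message, or null if the wait succeeded.
    static String tryAwait(MetadataWaitSketch m, int lastVersion, long maxWaitMs) {
        try {
            m.awaitUpdate(lastVersion, maxWaitMs);
            return null;
        } catch (TimeoutException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

If the network thread never manages to send a metadata request, for example because no node is available, `update()` is never called and the wait expires with this message.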
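When the Sender class sends data, it calls:
    List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now); // sends the data over NIO
The poll method of the NetworkClient class checks whether the metadata needs to be updated.
Method: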
    /**
     * Add a metadata request to the list of sends if we can make one
     */
    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        // Pick the least loaded node that is currently available
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            // mark the timestamp for no node available to connect
            this.lastNoNodeAvailableMs = now;
            return;
        }

        log.debug("Trying to send metadata request to node {}", node.id());
        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
            Set<String> topics = metadata.topics();
            this.metadataFetchInProgress = true;
            // Build the metadata request and add it to the sends queue
            ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            sends.add(metadataRequest.request());
            this.inFlightRequests.add(metadataRequest);
        } else if (connectionStates.canConnect(node.id(), now)) {
            // we don't have a connection to this node right now, make one
            log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
            initiateConnect(node, now);
            // If initiateConnect failed immediately, this node will be put into blackout and we
            // should allow immediately retrying in case there is another candidate node. If it
            // is still connecting, the worst case is that we end up setting a longer timeout
            // on the next round and then wait for the response.
        } else {
            // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network event to let us know the selected
            // connection might be usable again.
            this.lastNoNodeAvailableMs = now;
        }
    }
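The branching in maybeUpdateMetadata can be reduced to four outcomes. The sketch below models only that decision, assuming the connection state is passed in as plain booleans. The class `MetadataDecisionSketch` and the `Action` enum are hypothetical names for illustration and are not part of the Kafka client.

```java
// Hypothetical model of the decision made by maybeUpdateMetadata; not Kafka code.
final class MetadataDecisionSketch {
    enum Action {
        GIVE_UP,           // leastLoadedNode returned null
        SEND_REQUEST,      // connected and allowed to send more
        INITIATE_CONNECT,  // no connection yet, but connecting is allowed
        WAIT;              // connected but saturated, or still connecting

        // In the method above, these two outcomes set lastNoNodeAvailableMs = now.
        boolean recordsNoNodeAvailable() {
            return this == GIVE_UP || this == WAIT;
        }
    }

    static Action decide(boolean nodeFound, boolean connected,
                         boolean canSendMore, boolean canConnect) {
        if (!nodeFound) {
            return Action.GIVE_UP;
        }
        if (connected && canSendMore) {
            return Action.SEND_REQUEST;
        }
        if (canConnect) {
            return Action.INITIATE_CONNECT;
        }
        return Action.WAIT;
    }
}
```

Only `SEND_REQUEST` actually puts a metadata request on the wire; every other outcome defers the fetch to a later poll.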
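maybeUpdateMetadata fetches the metadata from the host that has the fewest in-flight requests and a usable connection.
That host is chosen by leastLoadedNode: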
    /**
     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
     * connection if all existing connections are in use. This method will never choose a node for which there is no
     * existing connection and from which we have disconnected within the reconnect backoff period.
     * @return The node with the fewest in-flight requests.
     */
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        for (int i = 0; i < nodes.size(); i++) {
            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
            if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
                // if we find an established connection with no in-flight requests we can stop right away
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
                // otherwise if this is the best we have found so far, record that
                inflight = currInflight;
                found = node;
            }
        }
        return found;
    }
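To see how the selection behaves, here is a self-contained sketch of the same loop over an in-memory list. `NodeState` and `LeastLoadedSketch` are hypothetical types for illustration: the in-flight count, connection flag, and blackout flag are stored directly on each node instead of being looked up in the client's connection state, and -1 plays the role of the null return.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical snapshot of one broker as the client sees it; not a Kafka class.
final class NodeState {
    final int id;
    final int inFlight;        // outstanding requests to this node
    final boolean connected;   // an established connection exists
    final boolean blackedOut;  // disconnected and still inside the reconnect backoff

    NodeState(int id, int inFlight, boolean connected, boolean blackedOut) {
        this.id = id;
        this.inFlight = inFlight;
        this.connected = connected;
        this.blackedOut = blackedOut;
    }
}

final class LeastLoadedSketch {
    // Returns the id of the chosen node, or -1 when no node is eligible.
    static int leastLoaded(List<NodeState> nodes, int offset) {
        int best = Integer.MAX_VALUE;
        int found = -1;
        for (int i = 0; i < nodes.size(); i++) {
            // Start at a rotating offset so the same node is not always tried first.
            NodeState n = nodes.get(Math.floorMod(offset + i, nodes.size()));
            if (n.inFlight == 0 && n.connected) {
                return n.id; // idle established connection: stop right away
            }
            if (!n.blackedOut && n.inFlight < best) {
                best = n.inFlight; // best candidate so far
                found = n.id;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<NodeState> nodes = Arrays.asList(
                new NodeState(1, 3, true, false),
                new NodeState(2, 0, true, false),
                new NodeState(3, 1, true, false));
        System.out.println(leastLoaded(nodes, 0)); // node 2 is connected and idle
    }
}
```

When every node is blacked out, the sketch returns -1. That is the case in which maybeUpdateMetadata logs "Give up sending metadata request since no node is available" and sends nothing.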