kafka 获取metadata

问题:

<Failed to update metadata after 3000 ms.>

sender类的发送数据时候,会

List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);//nio 发送数据

NetworkClient类,方法poll,检查metadata是否需要更新

方法:

/**
     * Add a metadata request to the list of sends if we can make one
     */
    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        // 最新的可用node
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            // mark the timestamp for no node available to connect
            this.lastNoNodeAvailableMs = now;
            return;
        }

        log.debug("Trying to send metadata request to node {}", node.id());
        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
            Set<String> topics = metadata.topics();
            this.metadataFetchInProgress = true;
            //生成metadata请求,加入到sends队列中
            ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            sends.add(metadataRequest.request());
            this.inFlightRequests.add(metadataRequest);
        } else if (connectionStates.canConnect(node.id(), now)) {
            // we don't have a connection to this node right now, make one
            log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
            initiateConnect(node, now);
            // If initiateConnect failed immediately, this node will be put into blackout and we
            // should allow immediately retrying in case there is another candidate node. If it
            // is still connecting, the worst case is that we end up setting a longer timeout
            // on the next round and then wait for the response.
        } else { // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network tevent to let us know the seleced
            // connection might be usable again.
            this.lastNoNodeAvailableMs = now;
        }
    }

    

选择一个请求最少,并且链接状态可用的host,作为获取metadata的host

这里

/**
     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
     * connection if all existing connections are in use. This method will never choose a node for which there is no
     * existing connection and from which we have disconnected within the reconnect backoff period.
     * @return The node with the fewest in-flight requests.
     */
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        for (int i = 0; i < nodes.size(); i++) {
            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
            if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
                // if we find an established connection with no in-flight requests we can stop right away
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
                // otherwise if this is the best we have found so far, record that
                inflight = currInflight;
                found = node;
            }
        }

        return found;
    }

相关推荐