spring-kafka source code analysis, part 1 (Producer)


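spring kafka UML class diagram
spring-kafka is a Java client built on Spring concepts for the Apache Kafka messaging system (originally open-sourced by LinkedIn and later donated to the Apache Software Foundation). It provides interfaces that make it easier to send messages to and receive messages from the Kafka server, and it supports Spring's annotation-driven style. Messaging is inherently asynchronous, and as we go through the source we will see how that asynchrony is implemented on the producer side. Let's walk through the source starting from a demo.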

public static void main(String[] args) throws InterruptedException {
        ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
        containerProps.setMessageListener(new MessageListener<Integer, String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                System.out.println("received: " + message);
            }

        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start();
        // ---------------------------------------------------------
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic("topic1");
        template.sendDefault(0, "foo");
        template.flush();
        container.stop();
    }

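The method above is divided into two parts by the "-----" line: the upper part configures receiving messages from the Kafka server, and the lower part configures sending messages to it. We analyze the spring-kafka source from these same two angles (a messaging system is, after all, about sending and receiving messages); this article covers the sending side.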

The main sending thread (the thread that calls send)

KafkaTemplate

KafkaTemplate is the class spring-kafka provides for sending messages. Here is how to create one:

KafkaTemplate<Integer, String> template = createTemplate();

private static KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf =
                new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(pf);
        return template;
    }

private static Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

As you can see, you first create a DefaultKafkaProducerFactory, which needs the Kafka server address and a few other producer settings, and then pass that factory to the KafkaTemplate constructor:

public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
	this(producerFactory, false);
}

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
	this.producerFactory = producerFactory;
	this.autoFlush = autoFlush;
}

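A word on autoFlush: it defaults to false. If set to true, the template calls flush() after every send, pushing buffered records out immediately at the cost of batching.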

Sending from the main thread: KafkaTemplate.send

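In the main method at the start of the article, template.sendDefault delegates to send with the default topic, which builds a ProducerRecord and eventually calls KafkaTemplate.doSend: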

	@Override
	public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
		ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
		return doSend(producerRecord);
	}

	protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
		getTheProducer();  // ------------- (1) --------------
		if (this.logger.isTraceEnabled()) {
			this.logger.trace("Sending: " + producerRecord);
		}
		final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
		/* -- (2) -- */ getTheProducer().send(producerRecord, new Callback() {

			@Override
			public void onCompletion(RecordMetadata metadata, Exception exception) {
				if (exception == null) {
					future.set(new SendResult<>(producerRecord, metadata));
					if (KafkaTemplate.this.producerListener != null
							&& KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
						KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
								producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
					}
				}
				else {
					future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
					if (KafkaTemplate.this.producerListener != null) {
						KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
								producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
					}
				}
			}

		});
		if (this.autoFlush) {
			flush();
		}
		if (this.logger.isTraceEnabled()) {
			this.logger.trace("Sent: " + producerRecord);
		}
		return future;
	}

Next, let's focus on getTheProducer(). Its sequence diagram is shown below.
getTheProducer() sequence diagram

KafkaTemplate

private Producer<K, V> getTheProducer() {
		if (this.producer == null) {
			synchronized (this) {
				if (this.producer == null) {
					this.producer = this.producerFactory.createProducer();
				}
			}
		}
		return this.producer;
	}

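getTheProducer() in KafkaTemplate lazily creates the producer through producerFactory.createProducer() (with DefaultKafkaProducerFactory this ultimately wraps a KafkaProducer), using double-checked locking with synchronized so that it is created only once. The Javadoc at the top of the KafkaProducer class describes it as a Kafka client that publishes records to the Kafka cluster. It also states that the producer is thread safe and that sharing a single producer instance across threads will generally be faster than having multiple instances.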
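The lazy, create-once logic of getTheProducer() can be reduced to the JDK-only sketch below. LazyProducerHolder and its Supplier-based factory are hypothetical stand-ins for KafkaTemplate and ProducerFactory, not spring-kafka types. The sketch declares the field volatile, because under the Java memory model double-checked locking is only safe when the checked field is volatile; check how the field is declared in the spring-kafka version you are reading.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for KafkaTemplate's lazily created producer.
class LazyProducerHolder<P> {
    private final Supplier<P> factory;   // plays the role of ProducerFactory.createProducer()
    private volatile P producer;         // volatile makes double-checked locking safe

    LazyProducerHolder(Supplier<P> factory) {
        this.factory = factory;
    }

    P getTheProducer() {
        P result = this.producer;        // first check, no lock taken on the fast path
        if (result == null) {
            synchronized (this) {
                result = this.producer;  // second check, now under the lock
                if (result == null) {
                    result = this.factory.get();
                    this.producer = result;
                }
            }
        }
        return result;
    }
}
```

Reading the field into a local variable means the fast path performs a single volatile read once the producer exists, while concurrent first callers still end up with one shared instance.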
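Note also the last step of the sequence diagram: the KafkaProducer constructor initializes the producer's components, for example a RecordAccumulator that behaves like a message queue (more on this later), and starts a sender thread. That thread is essentially the Kafka producer's I/O worker thread. You might wonder whether the producer really relies on a single thread. It does: as we'll see shortly, it uses an NIO selector event loop and registers the channels to all of the Kafka broker nodes it talks to on this one selector.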

KafkaProducer.send

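Now continue with the call marked (2) in the doSend code above, getTheProducer().send(...). getTheProducer() returns a CloseSafeProducer; as the first diagram in this article shows, this class is a wrapper that delegates to a KafkaProducer. The call therefore ends up in KafkaProducer.doSend(ProducerRecord<K, V> record, Callback callback):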

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    // first make sure the metadata for the topic is available
    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
    byte[] serializedKey;
    try {
        serializedKey = keySerializer.serialize(record.topic(), record.key());
    } catch (ClassCastException cce) {
        throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                " specified in key.serializer");
    }
    byte[] serializedValue;
    try {
        serializedValue = valueSerializer.serialize(record.topic(), record.value());
    } catch (ClassCastException cce) {
        throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                " specified in value.serializer");
    }
    int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
    int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
    ensureValidRecordSize(serializedSize);
    tp = new TopicPartition(record.topic(), partition);
    long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
    log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
    // producer callback will make sure to call both 'callback' and interceptor callback
    Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }
    return result.future;
    // (try/catch omitted in this excerpt) the original catch blocks handle exceptions and record the errors:
    // for API exceptions they return them in the future,
    // for other exceptions they throw directly
}

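In the code above, after making sure metadata for the topic is available (waitOnMetadata), the producer serializes the key and value, determines the target partition, checks that the record size is within limits (ensureValidRecordSize), and finally calls RecordAccumulator.append. This appends the record to the accumulator, from which the producer's I/O thread will send it to the Kafka server. If the append filled a batch or started a new one, the sender thread is also woken up.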
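The partition and size-check steps can be pictured with the simplified sketch below. It is an illustration under stated assumptions, not Kafka's DefaultPartitioner: the real keyed path hashes the serialized key with murmur2 (swapped here for Arrays.hashCode), and the keyless path differs across client versions (older clients rotate round-robin, which is what this sketch does). SimplePartitioner and checkRecordSize are hypothetical names; checkRecordSize only mirrors the idea of ensureValidRecordSize and throws IllegalArgumentException where Kafka throws its own RecordTooLargeException.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified partitioner; NOT Kafka's DefaultPartitioner.
class SimplePartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    int partition(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null) {
            // No key: rotate across partitions (roughly what older clients did).
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Keyed: the same key bytes always map to the same partition.
        // Kafka hashes with murmur2 here; Arrays.hashCode is a stand-in.
        return toPositive(Arrays.hashCode(serializedKey)) % numPartitions;
    }

    // Clear the sign bit so the modulo is never negative
    // (Math.abs(Integer.MIN_VALUE) would still be negative).
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Mirrors the idea of ensureValidRecordSize: reject a record larger than either limit.
    static void checkRecordSize(int size, int maxRequestSize, long totalMemorySize) {
        if (size > maxRequestSize)
            throw new IllegalArgumentException("record of " + size + " bytes exceeds max.request.size " + maxRequestSize);
        if (size > totalMemorySize)
            throw new IllegalArgumentException("record of " + size + " bytes exceeds buffer.memory " + totalMemorySize);
    }
}
```

The property that matters for ordering is the keyed path: records with the same key land in the same partition, so their relative order is preserved within that partition.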
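To sum up the send path: the main thread (the one calling template.sendDefault) never actually transmits the message to the Kafka server; it only appends it to the queue-like RecordAccumulator. That is why sending is fast: apart from possibly waiting for topic metadata or for free buffer space (both bounded by maxBlockTimeMs), it is an in-memory operation with no network I/O. It is easy to guess that the sender thread mentioned earlier takes records from the RecordAccumulator and sends them to the Kafka server, which confirms that sending is indeed asynchronous.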
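The division of labor described above, where the calling thread only appends to memory and gets a future back while a single background thread does the I/O, can be modeled with plain JDK classes. This is a toy under obvious simplifications: MiniAccumulator, MiniSender and PendingRecord are hypothetical names, a LinkedBlockingQueue stands in for RecordAccumulator's per-partition batches, and "sending" just records the payload instead of writing to a socket.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// One pending record plus the future the caller holds.
class PendingRecord {
    final String payload;
    final CompletableFuture<String> future = new CompletableFuture<>();
    PendingRecord(String payload) { this.payload = payload; }
}

// Toy accumulator: append() only touches memory and returns immediately.
class MiniAccumulator {
    final BlockingQueue<PendingRecord> queue = new LinkedBlockingQueue<>();

    CompletableFuture<String> append(String payload) {
        PendingRecord r = new PendingRecord(payload);
        queue.add(r);
        return r.future;
    }
}

// Toy sender: one background thread drains the accumulator and "sends".
class MiniSender implements Runnable {
    private final MiniAccumulator accumulator;
    final List<String> sent = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    MiniSender(MiniAccumulator accumulator) { this.accumulator = accumulator; }

    @Override
    public void run() {
        // Work while running, then drain whatever is left before exiting.
        while (running || !accumulator.queue.isEmpty()) {
            try {
                PendingRecord r = accumulator.queue.poll(10, TimeUnit.MILLISECONDS);
                if (r == null) continue;
                sent.add(r.payload);                    // stand-in for the network write
                r.future.complete("ack:" + r.payload);  // completed on the sender thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void initiateClose() { running = false; }
}
```

Note that the future is completed on the sender thread, not on the caller's thread; this is the same reason, discussed further below, that send callbacks run on the producer's I/O thread.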

The thread that actually talks to the server

Sender thread sequence diagram

Kafka producer thread sequence diagram

Sender

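According to the Javadoc at the top of the Sender class, it is the background thread that handles sending produce requests to the Kafka cluster, delivering pending records to the appropriate broker nodes. It implements Runnable; here is its run method: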

public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

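The code above breaks down roughly into three parts:

  • While the sender is in the running state, it keeps calling run(long now) to send messages.
  • Once it is no longer running, it keeps calling run(long now) as long as no forced close was requested and either the accumulator still holds unsent records or there are in-flight requests awaiting acknowledgment from the Kafka server.
  • If it is no longer running and a forced close was requested, all incomplete batches are aborted. In every case the network client is closed at the end.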
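The three phases can be condensed into the single-threaded model below. SenderLoopModel is a hypothetical class, not Kafka code: a Deque stands in for the accumulator, in-flight requests are left out, and phase 1 busy-loops only because the model has no blocking poll (the real Sender blocks in client.poll).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the three phases of Sender.run().
class SenderLoopModel {
    final Deque<String> unsent = new ArrayDeque<>();   // stand-in for the accumulator
    final List<String> sent = new ArrayList<>();
    final List<String> aborted = new ArrayList<>();
    volatile boolean running = true;
    volatile boolean forceClose = false;
    boolean clientClosed = false;

    // One unit of work, like run(long now): send at most one record.
    void runOnce() {
        String next = unsent.poll();
        if (next != null) sent.add(next);
    }

    void run() {
        // Phase 1: main loop, runs until close is requested.
        while (running) runOnce();
        // Phase 2: graceful close; keep draining unless a forced close arrives.
        while (!forceClose && !unsent.isEmpty()) runOnce();
        // Phase 3: forced close; fail everything that is still pending.
        if (forceClose) {
            aborted.addAll(unsent);
            unsent.clear();
        }
        clientClosed = true;                            // like this.client.close()
    }
}
```

Because phase 2 re-reads forceClose on every iteration, a forced close requested in the middle of a graceful drain still takes effect promptly.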

We'll focus on run(long now), which is called in the first part:

void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }

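It first asks the accumulator which partitions have data ready to send (the thread that called template.sendDefault has already appended the records there). Since this code is fairly involved, we'll only look at the following two aspects:

  • how the connection to the Kafka server is established
  • how data is sent to the Kafka server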
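Before moving on, the poll-timeout rule at the end of run(long now) is small enough to isolate. The sketch below restates it as pure functions; PollTimeouts and its parameter names are hypothetical labels for result.nextReadyCheckDelayMs, notReadyTimeout, whether result.readyNodes is still non-empty after the not-ready nodes were removed, and the extra cap applied in NetworkClient.poll.

```java
// Hypothetical restatement of how Sender.run() and NetworkClient.poll() pick the select timeout.
class PollTimeouts {
    // If some node is connected and has sendable data, poll with 0 so the loop comes straight back.
    // Otherwise wait until the earliest of: a partition becoming sendable (linger/backoff expiry)
    // or a not-yet-ready node becoming connectable again.
    static long computePollTimeout(long nextReadyCheckDelayMs, long notReadyTimeoutMs, boolean anyReadyNodes) {
        if (anyReadyNodes) return 0L;
        return Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }

    // NetworkClient.poll then caps the wait by the metadata timeout and the request timeout,
    // like Utils.min(timeout, metadataTimeout, requestTimeoutMs).
    static long effectiveSelectTimeout(long pollTimeout, long metadataTimeout, long requestTimeoutMs) {
        return Math.min(pollTimeout, Math.min(metadataTimeout, requestTimeoutMs));
    }
}
```

Deliberately excluding nodes that have data but are not connected from the "poll with 0" case is what prevents a busy loop while a connection is still being set up.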

Establishing a connection to the Kafka server

The NetworkClient class

/**
     * Initiate a connection to the given node
     */
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            selector.connect(nodeConnectionId,
                             new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer,
                             this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(nodeConnectionId, now);
            /* maybe the problem is our metadata, update it */
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

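This method also runs in the sender thread. It calls selector.connect. This Selector is org.apache.kafka.common.network.Selector from the Apache Kafka client library (kafka-clients), not a spring-kafka class; it behaves like an NIO selector, holding a java.nio.channels.Selector member and using non-blocking, event-driven I/O to connect, send and receive (we covered this in the earlier NIO source analysis). Its source: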

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);

        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

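We analyzed code like this in the earlier post "Nio学习简单理解" (a simple introduction to NIO). It first opens a non-blocking SocketChannel to the Kafka server, then registers the channel with a java.nio.channels.Selector with interest in OP_CONNECT: textbook NIO selector usage. If connect() succeeds immediately, OP_CONNECT will never fire, so the key is added to immediatelyConnectedKeys and its interest set is cleared.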
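The same non-blocking connect pattern can be reproduced with plain java.nio. NioConnectDemo and connectNonBlocking are illustrative names, not Kafka APIs, and unlike Kafka's Selector this sketch waits for the connection inside one call instead of returning to an event loop. It handles both outcomes the Kafka code handles: connect() succeeding immediately, and the usual case where finishConnect() completes the handshake once OP_CONNECT is selected.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class NioConnectDemo {
    // Connects without ever blocking inside connect(); returns the connected, non-blocking channel.
    static SocketChannel connectNonBlocking(InetSocketAddress address, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open()) {          // closing it also cancels our key
            SocketChannel channel = SocketChannel.open();
            try {
                channel.configureBlocking(false);
                channel.socket().setKeepAlive(true);
                channel.socket().setTcpNoDelay(true);
                boolean connected = channel.connect(address); // may be true at once, e.g. on loopback
                SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
                if (!connected) {
                    long deadline = System.currentTimeMillis() + timeoutMs;
                    while (!channel.finishConnect()) {        // false while the handshake is pending
                        long remaining = deadline - System.currentTimeMillis();
                        if (remaining <= 0) throw new IOException("connect timed out");
                        selector.select(remaining);           // wakes up when OP_CONNECT is ready
                        selector.selectedKeys().clear();
                    }
                }
                key.interestOps(0);                           // connected: OP_CONNECT no longer needed
                return channel;
            } catch (IOException | RuntimeException e) {
                channel.close();
                throw e;
            }
        }
    }
}
```

For example, connecting to a ServerSocketChannel bound on 127.0.0.1 succeeds even before the server calls accept(), because the TCP handshake is completed by the operating system.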

Actually sending messages to the Kafka server

First, look at this method in the KafkaChannel class:

public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

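Here, interest in OP_WRITE is added to the channel's selection key. Note also that a channel can have only one send in progress at a time: setting a new send while the previous one is unfinished throws IllegalStateException.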

Now return to the Sender's run(long now) method and look at its last part:

        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);

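It first passes each request destined for the Kafka server to client.send, which records it as in flight and queues it on the target node's channel (this is where KafkaChannel.setSend, shown above, registers OP_WRITE). It then calls client.poll: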

public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

This method calls this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)) and then invokes the callbacks of completed requests. Let's look at the callbacks first:

        ListenableFuture<SendResult<Integer, String>> future = template.sendDefault(0, "foo");
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println("xiaoming send success");
            }

            @Override
            public void onFailure(Throwable ex) {
            }

        });

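template.sendDefault returns a ListenableFuture<SendResult<Integer, String>>. If you care about the outcome of the send (for example, whether it succeeded), you could call future.get(), but that blocks the current thread. Instead, you can register a callback with future.addCallback, as above.
Ultimately, response.request().callback().onComplete(response) is what triggers this callback (I won't go into the full chain here).
So this callback clearly does not run on the thread that called template.sendDefault. It runs on the Kafka producer's I/O thread, i.e. the thread running the NIO event loop (unless the future has already completed by the time addCallback is called, in which case the callback runs immediately on the calling thread). You should therefore avoid time-consuming work inside the callback, since it would stall the whole producer; if you must do something slow, hand it off to another thread.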
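One way to follow this advice is to keep the callback trivial and push slow work onto an executor. To stay dependency-free, the sketch uses java.util.concurrent.CompletableFuture in place of Spring's ListenableFuture, and a plain thread named "fake-producer-io" stands in for the producer's I/O thread. CallbackOffloadDemo, slowWork and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class CallbackOffloadDemo {
    // Returns the name of the thread that ran the slow work.
    static String run() throws Exception {
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            CompletableFuture<String> sendFuture = new CompletableFuture<>();
            AtomicReference<String> workThread = new AtomicReference<>();

            // The *Async variant runs the action on 'workers', not on the thread that completes the future.
            CompletableFuture<Void> done = sendFuture.thenAcceptAsync(result -> {
                workThread.set(Thread.currentThread().getName());
                slowWork(result);
            }, workers);

            // Simulate the producer I/O thread completing the future when the broker acknowledges.
            Thread io = new Thread(() -> sendFuture.complete("ack"), "fake-producer-io");
            io.start();
            io.join();

            done.get();   // demo only: wait for the offloaded work so we can report its thread
            return workThread.get();
        } finally {
            workers.shutdown();
        }
    }

    // Hypothetical slow task, e.g. writing an audit record.
    static void slowWork(String result) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With plain thenAccept instead of thenAcceptAsync, a callback registered before completion would typically run on "fake-producer-io", which mirrors the situation the paragraph above warns about.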

Now back to this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)). This method eventually calls Selector.pollSelectionKeys:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            lruConnections.put(channel.id(), currentTimeNanos);

            try {

                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }

                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }

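As the method name suggests, it walks the keys whose previously registered events (OP_CONNECT, OP_WRITE, and OP_READ, which we'll cover when we get to receiving messages) are ready, and handles each one accordingly. For sending, as shown above: if channel.ready() && key.isWritable(), it calls Send send = channel.write().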

public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }

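This is where data is actually written to the socket. Once a send has been fully written, OP_WRITE must be removed from the interest set: a connected socket is almost always writable, so otherwise select() would keep returning immediately and the event loop would spin. If only part of the send was written, OP_WRITE stays registered and the remainder goes out on the next writable event. We also covered this in the earlier NIO post.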
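The write-then-clear pattern can be isolated from real sockets. In the sketch below, a hypothetical ThrottledChannel accepts at most a few bytes per write() to force partial writes, and a plain int models the key's interest set. PendingSend mirrors the shape of KafkaChannel.setSend/write but is not Kafka code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

// Hypothetical channel that accepts at most maxPerWrite bytes per call, to force partial writes.
class ThrottledChannel implements WritableByteChannel {
    final ByteArrayOutputStream written = new ByteArrayOutputStream();
    private final int maxPerWrite;

    ThrottledChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        for (int i = 0; i < n; i++) written.write(src.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}

// Models setSend()/write(): OP_WRITE stays in the interest set until the whole buffer is out.
class PendingSend {
    int interestOps = SelectionKey.OP_READ;
    private ByteBuffer send;

    void setSend(ByteBuffer buffer) {
        if (send != null) throw new IllegalStateException("previous send still in progress");
        send = buffer;
        interestOps |= SelectionKey.OP_WRITE;       // like transportLayer.addInterestOps(OP_WRITE)
    }

    // Called when the key is writable; returns true once the send has fully completed.
    boolean onWritable(WritableByteChannel ch) throws IOException {
        if (send == null) return false;
        ch.write(send);
        if (send.hasRemaining()) return false;       // partial write: keep OP_WRITE for the next round
        send = null;
        interestOps &= ~SelectionKey.OP_WRITE;       // done: drop OP_WRITE or select() would spin
        return true;
    }
}
```

For example, an 11-byte payload written through a channel that takes 3 bytes per call needs four writable events; OP_WRITE is set for the first three and cleared on the fourth, while OP_READ is left untouched throughout.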


Author: 叶明
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit 叶明 as the source when reposting.