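spring-kafka is a Java client that applies Spring concepts on top of Apache Kafka, the messaging system open-sourced by LinkedIn and since donated to the Apache Software Foundation. It provides interfaces that make it easier to send messages to and receive messages from the Kafka server, and it supports Spring annotations. Messaging is inherently asynchronous, and below we will also trace through the source to show how that asynchrony is implemented. Let's start from a demo and analyze the spring-kafka source from there.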
public static void main(String[] args) throws InterruptedException {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
System.out.println("received: " + message);
}
});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
// ---------------------------------------------------------
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, "foo");
template.flush();
container.stop();
}
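The method above is split into two parts by the dashed line: the upper part configures receiving messages from the Kafka server, and the lower part configures sending messages to it. This article analyzes the spring-kafka source along the same two lines (a messaging system is, after all, about sending and receiving messages).
Sending: the main thread (the thread that calls send)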
KafkaTemplate
KafkaTemplate is the class Spring provides for sending messages. Here is how to create one:
KafkaTemplate<Integer, String> template = createTemplate();
private static KafkaTemplate<Integer, String> createTemplate() {
Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(pf);
return template;
}
private static Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
As you can see, you first create a DefaultKafkaProducerFactory, which needs configuration such as the Kafka server address, and then pass that factory to the KafkaTemplate constructor:
public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
this(producerFactory, false);
}
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
this.producerFactory = producerFactory;
this.autoFlush = autoFlush;
}
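A note on autoFlush: it defaults to false; if set to true, the template calls flush() after every send.
Sending from the main thread: KafkaTemplate.send
In the main method at the top of the article, template.sendDefault eventually calls KafkaTemplate.doSend: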
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
}
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
getTheProducer(); // ---(1)---
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
}
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
getTheProducer().send(producerRecord, new Callback() { // ---(2)---
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
future.set(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null
&& KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
}
}
else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
}
}
}
});
if (this.autoFlush) {
flush();
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sent: " + producerRecord);
}
return future;
}
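Next, let's look at getTheProducer(). Its sequence diagram is shown below.
[Figure: sequence diagram of KafkaTemplate.getTheProducer()]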
KafkaTemplate
private Producer<K, V> getTheProducer() {
if (this.producer == null) {
synchronized (this) {
if (this.producer == null) {
this.producer = this.producerFactory.createProducer();
}
}
}
return this.producer;
}
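getTheProducer() in KafkaTemplate lazily creates the producer, and it uses synchronized with a second null check (double-checked locking) so that the producer is created only once. The class-level comment of KafkaProducer says that it is the Kafka client that publishes records to the Kafka cluster, that it is thread safe, and that sharing a single producer instance across threads will generally be faster than having multiple instances.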
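The lazy initialization in getTheProducer() is the double-checked locking idiom. The sketch below isolates it in a hypothetical LazyHolder class (not a spring-kafka type) and calls get() from many threads at once to check that the factory runs exactly once. One detail the excerpt above does not show: under the Java memory model the idiom is only safe when the field is volatile, so the sketch declares it that way.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper with the same double-checked locking shape as getTheProducer().
class LazyHolder<T> {
    private final Supplier<T> factory;
    // volatile so no thread can see a reference to a partially constructed object
    private volatile T instance;

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = instance;            // first check, without the lock
        if (result == null) {
            synchronized (this) {
                result = instance;      // second check, under the lock
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }
}

class LazyHolderDemo {
    // Releases `threads` callers of get() at the same moment and
    // returns how many times the factory actually ran.
    static int concurrentGets(int threads) {
        AtomicInteger created = new AtomicInteger();
        LazyHolder<Object> holder = new LazyHolder<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                start.await();
                return holder.get();
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return created.get();
    }
}
```

The first unsynchronized read keeps the common path (producer already created) lock-free, which matters because every send goes through this method.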
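Note also the last step of the sequence diagram above: the KafkaProducer constructor initializes the producer's settings. Among other things, it creates a RecordAccumulator that works much like a message queue (more on this later) and starts a sender thread, which is essentially the Kafka producer's I/O thread. You may wonder whether the producer really relies on a single thread. It does: as we will see, it uses the NIO selector event loop and registers the channels for all the Kafka broker nodes it talks to on that one selector.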
KafkaProducer.send
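Back to the line marked (2) in the doSend code above: getTheProducer().send(x, x). getTheProducer() returns a CloseSafeProducer which, as the first figure of this article shows, wraps and delegates to a KafkaProducer. The call ultimately reaches KafkaProducer.doSend(ProducerRecord<K, V> record, Callback callback):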
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// (exception handling omitted from this excerpt: API exceptions are
// returned through the future, other exceptions are thrown directly)
}
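The code first waits until metadata for the topic is available, then serializes the record's key and value, determines the target partition, checks that the serialized record size is within the allowed limits, and finally calls RecordAccumulator.append. That method appends the record to the accumulator, from which the producer's I/O thread later sends it to the Kafka server. If the append filled a batch or started a new one, the sender thread is woken up.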
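The wake-up condition at the end of doSend (result.batchIsFull || result.newBatchCreated) is easier to see in a toy model. ToyAccumulator below is a simplified stand-in, not Kafka's RecordAccumulator: it keeps one deque of byte-counted batches per partition and, as an assumption modeled on the condition above, treats a batch as full once it reaches the batch size or once a newer batch is queued behind it. Memory pooling, compression, and timestamps are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for RecordAccumulator: per-partition deques of size-limited batches.
class ToyAccumulator {
    // One batch: only the running totals matter for this sketch.
    static final class Batch {
        int sizeInBytes;
        int recordCount;
    }

    // Mirrors the two flags the producer checks before waking the sender.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }

        boolean shouldWakeSender() {
            return batchIsFull || newBatchCreated;
        }
    }

    private final int batchSize; // plays the role of batch.size
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    ToyAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    synchronized AppendResult append(String topicPartition, byte[] serializedRecord) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        boolean created = false;
        // open a new batch if there is none yet or the record does not fit in the last one
        if (last == null || last.sizeInBytes + serializedRecord.length > batchSize) {
            last = new Batch();
            dq.addLast(last);
            created = true;
        }
        last.sizeInBytes += serializedRecord.length;
        last.recordCount++;
        boolean full = dq.size() > 1 || last.sizeInBytes >= batchSize;
        return new AppendResult(full, created);
    }

    synchronized int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        return dq == null ? 0 : dq.size();
    }
}
```

With a 10-byte batch size and 4-byte records, the first append opens a batch (wake the sender), the second fits silently, and the third overflows into a second batch, which both opens a batch and leaves a full one behind it. With batch.size = 16384 from senderProps(), small records accumulate for a while, and linger.ms bounds how long a non-full batch waits before it is sent.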
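To sum up the send path: the main thread (the one calling template.sendDefault) never actually sends the message to the Kafka server. It only appends the record to the RecordAccumulator, a queue-like buffer. That is why sending is usually fast: the work happens in memory, with no network I/O. (send() can still block, for up to max.block.ms, while it waits for topic metadata or for buffer space in the accumulator.) It is easy to guess that the sender thread mentioned above takes records from the RecordAccumulator and sends them to the Kafka server, and this hand-off is exactly what makes sending asynchronous.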
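The hand-off described above, where send() only touches memory and a background thread does the I/O, is the classic producer/consumer pattern. ToyAsyncProducer below is a hypothetical and heavily simplified model (no batching, no network, made-up thread name): send() enqueues and returns a CompletableFuture, and one background thread drains the queue and completes the futures, so completion happens on that thread rather than on the caller's.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model: the caller thread only enqueues; one background thread does the "I/O".
class ToyAsyncProducer implements AutoCloseable {
    private static final class Pending {
        final String value;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(String value) {
            this.value = value;
        }
    }

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final Thread ioThread = new Thread(this::runLoop, "toy-producer-io-thread");

    ToyAsyncProducer() {
        ioThread.start();
    }

    // Runs on the caller's thread: an in-memory append, no network I/O.
    CompletableFuture<String> send(String value) {
        Pending pending = new Pending(value);
        queue.add(pending);
        return pending.future;
    }

    // Runs on the background thread until closed and the queue is drained.
    private void runLoop() {
        while (running || !queue.isEmpty()) {
            try {
                Pending pending = queue.poll(10, TimeUnit.MILLISECONDS);
                if (pending != null) {
                    // stand-in for writing to a broker and receiving its acknowledgment
                    pending.future.complete(
                            "acked:" + pending.value + " on " + Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void close() {
        running = false;
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The returned string records which thread completed the future; in this model it is always the background thread, never the caller.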
The thread that actually talks to the server
[Figure: sequence diagram of the sender thread]
Sender
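The class-level comment of Sender tells us that it is the background thread that actually talks to the Kafka server, sending produce requests to the appropriate nodes of the cluster. It implements Runnable; here is its run method: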
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
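The code above falls roughly into three parts:
- While the sender thread is running, it keeps calling run(now) to send messages.
- Once it stops running, unless the caller forced a close, it keeps calling run(now) as long as the accumulator still holds unsent records or some requests are still waiting for an acknowledgment from the Kafka server.
- If it has stopped running and a forced close was requested, it aborts all incomplete batches, failing their futures so that waiting threads wake up.
In every case the thread then closes its network client.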
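The three phases can be modeled in a few lines. ToySender below is hypothetical: "sending" a record just moves it to a list, in-flight requests are not modeled, and the method names initiateClose and forceCloseNow are my own. The usage stops the sender before calling run() on the current thread, so only the shutdown phases are exercised, deterministically; a real I/O thread would block in poll() during the first phase instead of spinning.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the three-phase structure of Sender.run().
class ToySender implements Runnable {
    private final Deque<String> accumulator = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();
    final List<String> aborted = new ArrayList<>();
    volatile boolean clientClosed = false;
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    synchronized void append(String record) {
        accumulator.addLast(record);
    }

    // graceful close: leave the main loop but finish pending work
    void initiateClose() {
        running = false;
    }

    // forced close: leave the main loop and fail pending work
    void forceCloseNow() {
        forceClose = true;
        running = false;
    }

    // one pass of the loop; stands in for Sender.run(long now)
    private synchronized void runOnce() {
        String record = accumulator.pollFirst();
        if (record != null) {
            sent.add(record);
        }
    }

    private synchronized boolean hasUnsent() {
        return !accumulator.isEmpty();
    }

    @Override
    public void run() {
        // phase 1: normal operation (a real I/O thread blocks in poll() here instead of spinning)
        while (running) {
            runOnce();
        }
        // phase 2: graceful shutdown keeps sending until nothing is left
        while (!forceClose && hasUnsent()) {
            runOnce();
        }
        // phase 3: forced shutdown fails whatever is still queued
        if (forceClose) {
            synchronized (this) {
                aborted.addAll(accumulator);
                accumulator.clear();
            }
        }
        // finally, always release the (simulated) network client
        clientClosed = true;
    }
}
```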
Let's focus on the run(now) method called in the first part:
void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
}
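It first reads from the accumulator the records waiting to be sent (the thread that called template.sendDefault has already appended them there). Since this code is fairly involved, we will focus on just two aspects:
- How a connection to the Kafka server is established
- How data is sent to the Kafka server
Establishing a connection to the Kafka server
NetworkClient class: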
/**
* Initiate a connection to the given node
*/
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
this.connectionStates.connecting(nodeConnectionId, now);
selector.connect(nodeConnectionId,
new InetSocketAddress(node.host(), node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
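This method also runs on the sender thread. It calls selector.connect. This Selector (org.apache.kafka.common.network.Selector) belongs to the Kafka client library rather than to spring-kafka; it is a selector-like wrapper that holds a java.nio.channels.Selector field and uses non-blocking, event-driven I/O to connect, send, and receive, as discussed in our earlier analysis of the NIO source. Its code: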
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (this.channels.containsKey(id))
throw new IllegalStateException("There is already a connection for id " + id);
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
boolean connected;
try {
connected = socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", channel.id());
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
}
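We analyzed this code in the earlier post "A Simple Understanding of NIO". It opens a non-blocking SocketChannel to the Kafka server and registers it on a java.nio.channels.Selector with interest in OP_CONNECT. If the connection completed immediately, it adds the key to immediatelyConnectedKeys and clears the interest set, because OP_CONNECT never fires for such channels. This is textbook NIO selector usage.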
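The same connect pattern can be tried without a Kafka broker. The sketch below is a stand-in, not Kafka's Selector: it connects a non-blocking SocketChannel to a local ServerSocketChannel on an ephemeral loopback port (an assumption to keep it self-contained), handles the immediately-connected case the way the code above does, and otherwise waits for OP_CONNECT and completes the handshake with finishConnect(). Switching the interest set to OP_READ afterwards is the usual next step in an NIO client.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectDemo {
    // Returns true if the non-blocking connect completes within the timeout.
    static boolean connectLocally(long timeoutMs) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // port 0 = any free port; the OS completes the TCP handshake even before accept()
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            channel.configureBlocking(false);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            boolean connected = channel.connect(server.getLocalAddress());
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            if (connected) {
                // OP_CONNECT won't fire for a channel that connected immediately
                key.interestOps(0);
                return true;
            }
            if (selector.select(timeoutMs) == 0) {
                return false; // timed out waiting for OP_CONNECT
            }
            if (key.isConnectable() && channel.finishConnect()) {
                // handshake done: stop watching OP_CONNECT, start watching OP_READ
                key.interestOps(SelectionKey.OP_READ);
                return channel.isConnected();
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```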
Actually sending messages to the Kafka server
First, look at this method in the KafkaChannel class:
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
As you can see, it adds OP_WRITE to the channel's interest set.
Now back to the last part of the Sender's run(long now) method:
for (ClientRequest request : requests)
client.send(request, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
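It first hands each request destined for the Kafka server to the network client (client.send), which records the request as in flight and stages it on the target KafkaChannel through the setSend method above, and then it calls client.poll: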
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
This method calls this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)) and then invokes the request callbacks. Let's look at the callbacks first:
ListenableFuture<SendResult<Integer, String>> future = template.sendDefault(0, "foo");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("xiaoming send success");
}
@Override
public void onFailure(Throwable ex) {
}
});
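template.sendDefault returns a ListenableFuture<SendResult<Integer, String>>. If you care about the outcome of the send (for example, whether it succeeded), you can call future.get(), but that is a blocking call and holds up the current thread. Instead, you can register a callback with future.addCallback, as above.
Ultimately, response.request().callback().onComplete(response) is what triggers this callback (we won't go into exactly how the chain of callbacks is wired here).
So the callback clearly does not run on the thread that called template.sendDefault. It runs on the Kafka producer's I/O thread, the thread running the NIO event loop. Avoid time-consuming work directly inside the callback, because it stalls the whole producer; if you must do slow work, hand it off to another thread.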
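The threading rule above can be demonstrated with CompletableFuture, used here as a stand-in for Spring's ListenableFuture (an assumption: the sketch does not use spring-kafka, and the thread names are made up). A plain callback registered before completion runs on whichever thread completes the future, here a thread playing the producer's I/O thread, while thenApplyAsync with an executor moves the work to a separate worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadDemo {
    // Returns {thread that ran the inline callback, thread that ran the offloaded callback}.
    static String[] run() {
        CompletableFuture<String> sendResult = new CompletableFuture<>();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "slow-callback-worker"));
        try {
            // registered before completion, so it runs on the thread that completes the future
            CompletableFuture<String> inline =
                    sendResult.thenApply(ack -> Thread.currentThread().getName());
            // slow work (a database write, a remote call, ...) is handed to the worker instead
            CompletableFuture<String> offloaded =
                    sendResult.thenApplyAsync(ack -> Thread.currentThread().getName(), worker);
            // plays the role of the producer I/O thread receiving the broker's acknowledgment
            new Thread(() -> sendResult.complete("ack"), "fake-io-thread").start();
            return new String[] {inline.join(), offloaded.join()};
        } finally {
            worker.shutdown();
        }
    }
}
```

Anything slow placed in the inline callback would delay the completing thread, which in the real producer is the one thread doing all network I/O.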
Now back to this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)). It eventually calls Selector.pollSelectionKeys:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
} else
continue;
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
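As its name suggests, this method goes through the keys whose registered events (OP_CONNECT, OP_WRITE, and OP_READ, the last of which we will cover when we get to receiving messages) are ready, and handles each one accordingly. For sending, as shown above, if channel.ready() && key.isWritable() holds, it calls Send send = channel.write():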
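The select-and-dispatch loop can be reproduced with plain java.nio. The sketch below uses a Pipe instead of a socket (an assumption to keep it self-contained, and the class is hypothetical): the sink is registered for OP_WRITE and the source for OP_READ, each selected key is removed from the selected set before it is handled, and OP_WRITE is dropped once the buffer has been fully written, mirroring what KafkaChannel.send does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class DispatchLoopDemo {
    // Runs a small select loop over a pipe and returns the events it handled, in order.
    // Assumes an ASCII message so that bytes and chars line up.
    static List<String> run(String message) {
        List<String> events = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.sink().configureBlocking(false);
                pipe.source().configureBlocking(false);
                pipe.sink().register(selector, SelectionKey.OP_WRITE);
                pipe.source().register(selector, SelectionKey.OP_READ);

                ByteBuffer pending = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                StringBuilder received = new StringBuilder();
                for (int round = 0; round < 20 && received.length() < message.length(); round++) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // a handled key must leave the selected set
                        if (key.isReadable()) {
                            ByteBuffer buf = ByteBuffer.allocate(64);
                            int n = pipe.source().read(buf);
                            if (n > 0) {
                                received.append(new String(buf.array(), 0, n, StandardCharsets.UTF_8));
                                if (received.length() == message.length()) {
                                    events.add("read:" + received);
                                }
                            }
                        }
                        if (key.isValid() && key.isWritable()) {
                            pipe.sink().write(pending);
                            if (!pending.hasRemaining()) {
                                // write complete: drop OP_WRITE so select() stops reporting this key
                                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                events.add("write-done");
                            }
                        }
                    }
                }
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }
}
```

In the first round only the sink is ready, so the write always happens before any read; later rounds pick up the data on the source side.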
public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}
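This is where data is actually written to the socket. Once the send has been fully written, OP_WRITE must be removed from the interest set: a socket with free buffer space is almost always writable, so otherwise select() would keep returning immediately and the event loop would spin. We covered this in the earlier NIO post as well.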
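The spinning is easy to observe. In the sketch below (a Pipe again stands in for a socket, and the class is hypothetical), a writable channel with nothing to write either keeps OP_WRITE in its interest set or has it removed. With OP_WRITE kept, every non-blocking selectNow() pass reports the key as ready, which in a real event loop means select() returning immediately over and over; with it removed, no pass reports anything.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class WriteInterestSpinDemo {
    // Counts how many of `passes` non-blocking selects report the idle sink as ready.
    static int readyPasses(boolean keepOpWrite, int passes) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.sink().configureBlocking(false);
                SelectionKey key = pipe.sink().register(selector, SelectionKey.OP_WRITE);
                if (!keepOpWrite) {
                    // what KafkaChannel.send does once the pending Send has been fully written
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
                int ready = 0;
                for (int i = 0; i < passes; i++) {
                    if (selector.selectNow() > 0) {
                        ready++;
                    }
                    selector.selectedKeys().clear();
                }
                return ready;
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```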