In the previous article we analyzed the spring-kafka producer; in this article we analyze the consumer, starting from a minimal usage example:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> message) {
        System.out.println("received: " + message);
    }

});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();

private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
    Map<String, Object> props = consumerProps();
    DefaultKafkaConsumerFactory<Integer, String> cf =
            new DefaultKafkaConsumerFactory<Integer, String>(props);
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<Integer, String>(cf, containerProps);
    return container;
}

private static Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<String, Object>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
As you can see, we need to create a KafkaMessageListenerContainer, and building that container in turn requires a DefaultKafkaConsumerFactory and a ContainerProperties. Note that the ContainerProperties must be given a MessageListener; this listener has an onMessage method, which is invoked whenever the consumer receives a message. Once the container is created, calling container.start() starts the consumer.
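The example above uses auto-commit. For the manual-commit path analyzed later in this article, a minimal sketch of the variant looks like this (assuming the spring-kafka 1.x API, where AckMode lives on AbstractMessageListenerContainer):

// Sketch of a manual-ack setup (spring-kafka 1.x API assumed):
// disable auto-commit and register an AcknowledgingMessageListener instead.
Map<String, Object> props = consumerProps();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

ContainerProperties manualProps = new ContainerProperties("topic1");
manualProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
manualProps.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment ack) {
        System.out.println("received: " + record);
        ack.acknowledge(); // marks this record's offset for commit
    }

});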
KafkaMessageListenerContainer.start
Reading the source, it is easy to see that the call above eventually lands in KafkaMessageListenerContainer.doStart:
KafkaMessageListenerContainer.start ==> AbstractMessageListenerContainer.start ==> KafkaMessageListenerContainer.doStart
protected void doStart() {
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();
    if (!this.consumerFactory.isAutoCommit()) {
        AckMode ackMode = containerProperties.getAckMode();
        if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
            Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
        }
        if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                && containerProperties.getAckTime() == 0) {
            containerProperties.setAckTime(5000);
        }
    }
    Object messageListener = containerProperties.getMessageListener();
    Assert.state(messageListener != null, "A MessageListener is required");
    if (messageListener instanceof GenericAcknowledgingMessageListener) {
        this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
    }
    else if (messageListener instanceof GenericMessageListener) {
        this.listener = (GenericMessageListener<?>) messageListener;
    }
    else {
        throw new IllegalStateException("messageListener must be 'MessageListener' "
                + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
    }
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-kafka-consumer-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    if (containerProperties.getListenerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-kafka-listener-");
        containerProperties.setListenerTaskExecutor(listenerExecutor);
    }
    this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
    setRunning(true);
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
}
As you can see, this method sets a consumerTaskExecutor and a listenerTaskExecutor on the containerProperties. The key part is the last statement: getConsumerTaskExecutor().submitListenable(this.listenerConsumer) starts a "-kafka-consumer-" thread, and that thread runs ListenerConsumer.run().
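As a quick standalone illustration of what that executor does (not container code): SimpleAsyncTaskExecutor spawns a fresh thread per submitted task and names it with the configured prefix plus a counter.

// Standalone sketch: SimpleAsyncTaskExecutor creates one new thread per task.
SimpleAsyncTaskExecutor exec = new SimpleAsyncTaskExecutor("testAuto-kafka-consumer-");
exec.submitListenable(new Runnable() {
    @Override
    public void run() {
        // prints something like "testAuto-kafka-consumer-1"
        System.out.println(Thread.currentThread().getName());
    }
});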
KafkaMessageListenerContainer.ListenerConsumer.run
public void run() {
    if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
    }
    this.count = 0;
    this.last = System.currentTimeMillis();
    if (isRunning() && this.definedPartitions != null) {
        initPartitionsIfNeeded();
        // we start the invoker here as there will be no rebalance calls to
        // trigger it, but only if the container is not set to autocommit
        // otherwise we will process records on a separate thread
        if (!this.autoCommit) {
            startInvoker();
        }
    }
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    while (isRunning()) {
        try {
            if (!this.autoCommit) {
                processCommits();
            }
            processSeeks();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Polling (paused=" + this.paused + ")...");
            }
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
            }
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                // if the container is set to auto-commit, then execute in the
                // same thread
                // otherwise send to the buffering queue
                if (this.autoCommit) {
                    invokeListener(records);
                }
                else {
                    if (sendToListener(records)) {
                        if (this.assignedPartitions != null) {
                            // avoid group management rebalance due to a slow
                            // consumer
                            this.consumer.pause(this.assignedPartitions);
                            this.paused = true;
                            this.unsent = records;
                        }
                    }
                }
            }
            else {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                            && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                        if (this.theListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
            this.unsent = checkPause(this.unsent);
        }
        catch (WakeupException e) {
            this.unsent = checkPause(this.unsent);
        }
        catch (Exception e) {
            if (this.containerProperties.getGenericErrorHandler() != null) {
                this.containerProperties.getGenericErrorHandler().handle(e, null);
            }
            else {
                this.logger.error("Container exception", e);
            }
        }
    }
    if (this.listenerInvokerFuture != null) {
        stopInvokerAndCommitManualAcks();
    }
    try {
        this.consumer.unsubscribe();
    }
    catch (WakeupException e) {
        // No-op. Continue process
    }
    this.consumer.close();
    if (this.logger.isInfoEnabled()) {
        this.logger.info("Consumer stopped");
    }
}
Notice that the code above checks if (!this.autoCommit), i.e. whether auto-commit is enabled, in many places; we will come back to this. For now, inside the while (isRunning()) loop, ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()) is what fetches records from the Kafka broker. Just below it, if the records are non-empty and
if (this.autoCommit) {
    invokeListener(records);
}
holds, then this branch ultimately invokes the onMessage method of the MessageListener we registered at the very beginning. The rest of this article analyzes the source along these two lines: fetching records from the broker, and consuming them.
Fetching records from the broker
KafkaConsumer
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                fetcher.sendFetches();
                client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
As shown, this method delegates to pollOnce to actually fetch the records:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
    coordinator.ensureCoordinatorReady();

    // ensure we have partitions assigned if we expect to
    if (subscriptions.partitionsAutoAssigned())
        coordinator.ensurePartitionAssignment();

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    long now = time.milliseconds();

    // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
    client.executeDelayedTasks(now);

    // init any new fetches (won't resend pending fetches)
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

    // if data is available already, e.g. from a previous network client poll() call to commit,
    // then just return it immediately
    if (!records.isEmpty())
        return records;

    fetcher.sendFetches();
    client.poll(timeout, now);
    return fetcher.fetchedRecords();
}
As the comments show, before fetching data this method first makes sure the subscribed topics or partitions are ready (coordinator ready, partitions assigned), then executes delayed tasks such as auto-commits and heartbeats, and finally calls client.poll(timeout, now). We can guess that this call parks the fetched records in some queue-like buffer, which fetcher.fetchedRecords() then drains and returns.
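That guess can be pictured with a simplified model (hypothetical names and structure, not the real Fetcher; the actual buffering lives in completedFetches, which we will see below):

// Simplified model of the fetch buffer (hypothetical, not Kafka source).
// Responses arriving from the network are parked in a queue, and the drain
// method turns them into the map that poll() ultimately returns.
private final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<CompletedFetch>();

private Map<TopicPartition, List<ConsumerRecord<K, V>>> drainFetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained =
            new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
    CompletedFetch fetch;
    while ((fetch = completedFetches.poll()) != null) {
        List<ConsumerRecord<K, V>> existing = drained.get(fetch.partition);
        if (existing == null) {
            existing = new ArrayList<ConsumerRecord<K, V>>();
            drained.put(fetch.partition, existing);
        }
        existing.addAll(parseRecords(fetch)); // parseRecords(..) is a placeholder
    }
    return drained;
}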
ConsumerNetworkClient
private void poll(long timeout, long now, boolean executeDelayedTasks) {
    // send all the requests we can send now
    trySend(now);

    // ensure we don't poll any longer than the deadline for
    // the next scheduled task
    timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
    clientPoll(timeout, now);
    now = time.milliseconds();

    // handle any disconnects by failing the active requests. note that disconnects must
    // be checked immediately following poll since any subsequent call to client.ready()
    // will reset the disconnect status
    checkDisconnects(now);

    // execute scheduled tasks
    if (executeDelayedTasks)
        delayedTasks.poll(now);

    // try again to send requests since buffer space may have been
    // cleared or a connect finished in the poll
    trySend(now);

    // fail requests that couldn't be sent if they have expired
    failExpiredRequests(now);
}

private void clientPoll(long timeout, long now) {
    client.poll(timeout, now);
    maybeTriggerWakeup();
}
NetworkClient
@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
As you can see, this bottoms out in NetworkClient.poll, a class we already covered in the previous article on the spring-kafka producer. It registers the various SocketChannels with an NIO Selector and then listens for their events.
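Stripped of Kafka specifics, the underlying mechanism is the standard java.nio pattern. A generic illustration (this is plain JDK NIO, not Kafka's own Selector class):

// Generic java.nio read loop (illustrative only; Kafka wraps this pattern in
// its own org.apache.kafka.common.network.Selector).
Selector nioSelector = Selector.open();
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9092));
channel.configureBlocking(false);
channel.register(nioSelector, SelectionKey.OP_READ);

while (nioSelector.select(1000) > 0) {
    Iterator<SelectionKey> it = nioSelector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();
        if (key.isReadable()) {
            ByteBuffer buf = ByteBuffer.allocate(4096);
            ((SocketChannel) key.channel()).read(buf); // drain the readable bytes
        }
    }
}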
Next, let's look at how Kafka's selector handles a ready read event.
org.apache.kafka.common.network.Selector
@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    maybeCloseOldestConnection();
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        /* if channel is ready read from any connections that have readable data */
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null)
                addToStagedReceives(channel, networkReceive);
        }
        // ... (connection-finish, write, and disconnect handling omitted)
    }
}
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
As shown, if the channel is ready, the SelectionKey is readable, and the channel has no staged receive yet, the data read from the channel is buffered in the Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives. After poll finishes pollSelectionKeys it calls addToCompletedReceives which, as its name suggests, moves the staged receives into the List<NetworkReceive> completedReceives that handleCompletedReceives (called from NetworkClient.poll, shown earlier) will consume.
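addToCompletedReceives itself is not shown above; roughly, based on the 0.10.x source (metrics and channel-mute checks omitted), it promotes one staged receive per channel into completedReceives:

// Rough sketch of addToCompletedReceives (0.10.x; metrics/mute checks omitted):
// one staged receive per channel is promoted to completedReceives per poll.
private void addToCompletedReceives() {
    Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> it = stagedReceives.entrySet().iterator();
    while (it.hasNext()) {
        Deque<NetworkReceive> deque = it.next().getValue();
        completedReceives.add(deque.poll());
        if (deque.isEmpty())
            it.remove();
    }
}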
NetworkClient
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}
As shown, this method walks the List<NetworkReceive> completedReceives obtained above: for each receive it completes the matching in-flight request, parses the response body, and adds a ClientResponse to the responses list. The callback invoked on each response at the end of NetworkClient.poll is the one registered in Fetcher.sendFetches:
public void sendFetches() {
    for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
        final FetchRequest request = fetchEntry.getValue();
        client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = new FetchResponse(resp.responseBody());
                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch failed", e);
                    }
                });
    }
}
As shown, the onSuccess callback wraps each partition's data as a CompletedFetch and adds it to the List<CompletedFetch> completedFetches. The fetcher.fetchedRecords() method then turns the accumulated completedFetches into the wrapped Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap that pollOnce returns.
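From the caller's side, that map comes back wrapped in ConsumerRecords. A small usage sketch (assuming a KafkaConsumer<Integer, String> named consumer, configured as in the setup code at the top):

// ConsumerRecords wraps the Map<TopicPartition, List<ConsumerRecord>> returned
// by fetchedRecords(); iterating per partition preserves per-partition order.
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<Integer, String> rec : records.records(tp)) {
        System.out.println(tp + " @" + rec.offset() + ": " + rec.value());
    }
}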
Consuming the fetched records
With auto-commit enabled
Back in the run method of KafkaMessageListenerContainer.ListenerConsumer where we started:
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
if (records != null && this.logger.isDebugEnabled()) {
    this.logger.debug("Received: " + records.count() + " records");
}
if (records != null && records.count() > 0) {
    if (this.containerProperties.getIdleEventInterval() != null) {
        lastReceive = System.currentTimeMillis();
    }
    if (this.autoCommit) {
        invokeListener(records);
    }
    else {
        if (sendToListener(records)) {
            if (this.assignedPartitions != null) {
                this.consumer.pause(this.assignedPartitions);
                this.paused = true;
                this.unsent = records;
            }
        }
    }
}
As you can see, if the fetched records are non-empty and auto-commit is enabled, invokeListener(records) is executed:
private void invokeListener(final ConsumerRecords<K, V> records) {
    if (this.isBatchListener) {
        invokeBatchListener(records);
    }
    else {
        invokeRecordListener(records);
    }
}
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
    Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
    while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
        final ConsumerRecord<K, V> record = iterator.next();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Processing " + record);
        }
        try {
            if (this.acknowledgingMessageListener != null) {
                this.acknowledgingMessageListener.onMessage(record,
                        this.isAnyManualAck
                                ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
                                : null);
            }
            else {
                this.listener.onMessage(record);
            }
            if (!this.isAnyManualAck && !this.autoCommit) {
                this.acks.add(record);
            }
            if (this.isRecordAck) {
                this.consumer.wakeup();
            }
        }
        catch (Exception e) {
            if (this.containerProperties.isAckOnError() && !this.autoCommit) {
                this.acks.add(record);
            }
            try {
                this.errorHandler.handle(e, record);
            }
            catch (Exception ee) {
                this.logger.error("Error handler threw an exception", ee);
            }
            catch (Error er) { //NOSONAR
                this.logger.error("Error handler threw an error", er);
                throw er;
            }
        }
    }
    if (this.isManualAck || this.isBatchAck) {
        this.consumer.wakeup();
    }
}
As you can see, this finally calls this.listener.onMessage(record). This listener is the MessageListener we set on ContainerProperties at the very beginning, so its onMessage method is invoked and control returns to the business logic. This mirrors the previous article: there we could register a callback for successful sends, and here the listener is effectively a callback for consuming messages. In both cases the callback runs on the IO thread that sends or receives messages, so avoid time-consuming work inside it; if heavy work is unavoidable, hand it off to a separate thread.
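One way to do that hand-off is inside onMessage itself (a sketch; the worker pool and process(..) helper are our own illustration, not spring-kafka API):

// Sketch: move expensive work off the consumer IO thread.
// The executor and process(..) below are illustrative, not spring-kafka API.
final ExecutorService workers = Executors.newFixedThreadPool(4);

containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(final ConsumerRecord<Integer, String> record) {
        workers.submit(new Runnable() {
            @Override
            public void run() {
                process(record); // process(..) is a placeholder for business logic
            }
        });
    }

});

Note that with auto-commit enabled this decouples offset commits from actual processing, so records handed to the pool can be lost if the process crashes.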
Without auto-commit
The run method of KafkaMessageListenerContainer.ListenerConsumer starts with the following logic:
if (!this.autoCommit) {
    startInvoker();
}

private void startInvoker() {
    ListenerConsumer.this.invoker = new ListenerInvoker();
    ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
            .submit(ListenerConsumer.this.invoker);
}
As you can see, when auto-commit is disabled, the listenerTaskExecutor set up in KafkaMessageListenerContainer.doStart is used, starting a "-kafka-listener-" thread.
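Since doStart only creates a SimpleAsyncTaskExecutor when none has been set, a custom executor can also be supplied up front (a small sketch; the thread-name prefix is arbitrary):

// Optional: supply your own listener executor before start(); doStart() then
// skips creating the default SimpleAsyncTaskExecutor.
SimpleAsyncTaskExecutor listenerExec = new SimpleAsyncTaskExecutor("my-listener-");
containerProps.setListenerTaskExecutor(listenerExec);

Back in the poll loop, the non-auto-commit case takes the else branch: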
if (this.autoCommit) {
    invokeListener(records);
}
else {
    if (sendToListener(records)) {
        if (this.assignedPartitions != null) {
            this.consumer.pause(this.assignedPartitions);
            this.paused = true;
            this.unsent = records;
        }
    }
}
private boolean sendToListener(final ConsumerRecords<K, V> records) throws InterruptedException {
    if (this.containerProperties.isPauseEnabled() && CollectionUtils.isEmpty(this.definedPartitions)) {
        return !this.recordsToProcess.offer(records, this.containerProperties.getPauseAfter(),
                TimeUnit.MILLISECONDS);
    }
    else {
        this.recordsToProcess.put(records);
        return false;
    }
}
As you can see, when auto-commit is off, the records are dropped into the BlockingQueue<ConsumerRecords<K, V>> recordsToProcess, which the "-kafka-listener-" thread started above consumes in ListenerInvoker.run:
public void run() {
    Assert.isTrue(this.active, "This instance is not active anymore");
    if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
    }
    try {
        this.executingThread = Thread.currentThread();
        while (this.active) {
            try {
                ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                        TimeUnit.SECONDS);
                if (this.active) {
                    if (records != null) {
                        invokeListener(records);
                    }
                    else {
                        if (ListenerConsumer.this.logger.isTraceEnabled()) {
                            ListenerConsumer.this.logger.trace("No records to process");
                        }
                    }
                }
            }
            catch (InterruptedException e) {
                if (!this.active) {
                    Thread.currentThread().interrupt();
                }
                else {
                    ListenerConsumer.this.logger.debug("Interrupt ignored");
                }
            }
            if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
                ListenerConsumer.this.consumer.wakeup();
            }
        }
    }
    finally {
        this.active = false;
        this.exitLatch.countDown();
    }
}
As you can see, this takes a batch of records from recordsToProcess and, if it is non-null, calls invokeListener(records) exactly as before. So the "-kafka-listener-" thread simply moves the actual message-consumption logic off the consumer's IO thread onto this dedicated thread.
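Stepping back, this manual-commit path boils down to a classic two-thread hand-off. A condensed model (illustrative names; it assumes a KafkaConsumer consumer and a MessageListener listener like those in the setup code, and omits the pause/resume and ack bookkeeping the real classes add):

// Condensed model of the "-kafka-consumer-" / "-kafka-listener-" hand-off.
final BlockingQueue<ConsumerRecords<Integer, String>> recordsToProcess =
        new LinkedBlockingQueue<ConsumerRecords<Integer, String>>();
final AtomicBoolean running = new AtomicBoolean(true);

// "-kafka-consumer-" side: poll the broker and enqueue the batches
Runnable pollLoop = new Runnable() {
    @Override
    public void run() {
        while (running.get()) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            if (records != null && records.count() > 0) {
                try {
                    recordsToProcess.put(records);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
};

// "-kafka-listener-" side: dequeue batches and invoke the business callback
Runnable listenerLoop = new Runnable() {
    @Override
    public void run() {
        while (running.get()) {
            try {
                ConsumerRecords<Integer, String> records =
                        recordsToProcess.poll(1, TimeUnit.SECONDS);
                if (records != null) {
                    for (ConsumerRecord<Integer, String> record : records) {
                        listener.onMessage(record);
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
};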