spring-kafka源码分析二(Consumer)


上一篇文章我们分析了spring-kafka的producer,这篇文章我们就要来分析下consumer。

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> message) {
        System.out.println("received: " + message);
    }

});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();

private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
    Map<String, Object> props = consumerProps();
    DefaultKafkaConsumerFactory<Integer, String> cf =
            new DefaultKafkaConsumerFactory<Integer, String>(props);
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<Integer, String>(cf, containerProps);
    return container;
}

private static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
}

可以看到我们需要创建一个KafkaMessageListenerContainer,而创建上述container又需要创建一个DefaultKafkaConsumerFactory和一个ContainerProperties。注意ContainerProperties必须设置一个MessageListener,这个listener有个onMessage方法,如果consumer收到消息,就会调用这个方法。创建了上述container之后,然后调用container.start方法就可以开启consumer了。

KafkaMessageListenerContainer.start

看源码不难发现,上述方法最终会调用到KafkaMessageListenerContainer.doStart方法
KafkaMessageListenerContainer.start==>AbstractMessageListenerContainer.start==>KafkaMessageListenerContainer.doStart

protected void doStart() {
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();

    if (!this.consumerFactory.isAutoCommit()) {
        AckMode ackMode = containerProperties.getAckMode();
        if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
            Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
        }
        if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                && containerProperties.getAckTime() == 0) {
            containerProperties.setAckTime(5000);
        }
    }

    Object messageListener = containerProperties.getMessageListener();
    Assert.state(messageListener != null, "A MessageListener is required");
    if (messageListener instanceof GenericAcknowledgingMessageListener) {
        this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
    }
    else if (messageListener instanceof GenericMessageListener) {
        this.listener = (GenericMessageListener<?>) messageListener;
    }
    else {
        throw new IllegalStateException("messageListener must be 'MessageListener' "
                + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
    }
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-kafka-consumer-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    if (containerProperties.getListenerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-kafka-listener-");
        containerProperties.setListenerTaskExecutor(listenerExecutor);
    }
    this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
    setRunning(true);
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
}

可以看到这个方法给containerProperties设置了consumerTaskExecutor和listenerTaskExecutor。然后我们重点关注下这个方法最后会调用xxx.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);这个方法会开启一个”-kafka-consumer-“线程,这个线程会运行ListenerConsumer.run()方法。
KafkaMessageListenerContainer

public void run() {
    if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
    }
    this.count = 0;
    this.last = System.currentTimeMillis();
    if (isRunning() && this.definedPartitions != null) {
        initPartitionsIfNeeded();
        // we start the invoker here as there will be no rebalance calls to
        // trigger it, but only if the container is not set to autocommit
        // otherwise we will process records on a separate thread
        if (!this.autoCommit) {
            startInvoker();
        }
    }
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    while (isRunning()) {
        try {
            if (!this.autoCommit) {
                processCommits();
            }
            processSeeks();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Polling (paused=" + this.paused + ")...");
            }
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
            }
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                // if the container is set to auto-commit, then execute in the
                // same thread
                // otherwise send to the buffering queue
                if (this.autoCommit) {
                    invokeListener(records);
                }
                else {
                    if (sendToListener(records)) {
                        if (this.assignedPartitions != null) {
                            // avoid group management rebalance due to a slow
                            // consumer
                            this.consumer.pause(this.assignedPartitions);
                            this.paused = true;
                            this.unsent = records;
                        }
                    }
                }
            }
            else {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                            && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                        if (this.theListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
            this.unsent = checkPause(this.unsent);
        }
        catch (WakeupException e) {
            this.unsent = checkPause(this.unsent);
        }
        catch (Exception e) {
            if (this.containerProperties.getGenericErrorHandler() != null) {
                this.containerProperties.getGenericErrorHandler().handle(e, null);
            }
            else {
                this.logger.error("Container exception", e);
            }
        }
    }
    if (this.listenerInvokerFuture != null) {
        stopInvokerAndCommitManualAcks();
    }
    try {
        this.consumer.unsubscribe();
    }
    catch (WakeupException e) {
        // No-op. Continue process
    }
    this.consumer.close();
    if (this.logger.isInfoEnabled()) {
        this.logger.info("Consumer stopped");
    }
}

我们需要注意一点,上述代码很多地方都判断了if (!this.autoCommit)是不是自动提交,这个地方我们后面会提到。我们先看到while(isRunning())循环里面有个ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());这个是获取从kafka broker拿到的record,下面会判断如果record不为空,并且

if (this.autoCommit) {
	invokeListener(records);
}

则最终通过这个方法会调用到我们最早注册的MessageListener的onMessage方法。下面我们就会就这两个方面来分析一下源码。

从broker获取record

public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                fetcher.sendFetches();
                client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

我们看到上述方法又会调用pollOnce方法来获取record

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
    coordinator.ensureCoordinatorReady();

    // ensure we have partitions assigned if we expect to
    if (subscriptions.partitionsAutoAssigned())
        coordinator.ensurePartitionAssignment();

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    long now = time.milliseconds();

    // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
    client.executeDelayedTasks(now);

    // init any new fetches (won't resend pending fetches)
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

    // if data is available already, e.g. from a previous network client poll() call to commit,
    // then just return it immediately
    if (!records.isEmpty())
        return records;

    fetcher.sendFetches();
    client.poll(timeout, now);
    return fetcher.fetchedRecords();
}

这个方法通过注释就能看到,它首先在获取数据之前会确保subscribed topics or partitions,然后执行autocommits and heartbeats任务,最后会调用client.poll(timeout, now)方法。我们猜测这个方法会把获取的record放在一个类似于queue里面,然后通过fetcher.fetchedRecords()返回。

private void poll(long timeout, long now, boolean executeDelayedTasks) {
    // send all the requests we can send now
    trySend(now);

    // ensure we don't poll any longer than the deadline for
    // the next scheduled task
    timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
    clientPoll(timeout, now);
    now = time.milliseconds();

    // handle any disconnects by failing the active requests. note that disconnects must
    // be checked immediately following poll since any subsequent call to client.ready()
    // will reset the disconnect status
    checkDisconnects(now);

    // execute scheduled tasks
    if (executeDelayedTasks)
        delayedTasks.poll(now);

    // try again to send requests since buffer space may have been
    // cleared or a connect finished in the poll
    trySend(now);

    // fail requests that couldn't be sent if they have expired
    failExpiredRequests(now);
}

private void clientPoll(long timeout, long now) {
    client.poll(timeout, now);
    maybeTriggerWakeup();
}

@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}

可以看到最终又会调用到NetworkClient.poll方法,上一篇关于spring-kafka producer的文章我们已经讲到了这个类。它是通过NIO的Selector,将各种socketChannel绑定在这个selector上面,然后监听各种事件。

我们接下来看下kafka的selector是怎么处理就绪的读事件的
org.apache.kafka.common.network.Selector

@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    maybeCloseOldestConnection();
}


private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

我们看到如果当前channel已经准备好并且SelectionKey已经可以读取并且没有staged receive,那么就把收到的消息缓存到Map<KafkaChannel, Deque> stagedReceives这个里面。然后我们看下后续是怎么处理这些缓存的消息的。
上面poll方法处理完了pollSelectionKeys之后,就会调用addToCompletedReceives,从方法名字也可以看出,它是把上面暂存的stagedReceives放进一个List completedReceives里面。
下面handleCompletedReceives会用到这个list

NetworkClient

public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}

可以看到上述方法会根据之前获得的List completedReceives组装response,然后调用response.request().callback().onComplete(response);最终会触发Fetcher类sendFetches方法中注册RequestFutureListener的回调onSuccess方法

public void sendFetches() {
    for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
        final FetchRequest request = fetchEntry.getValue();
        client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = new FetchResponse(resp.responseBody());
                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch failed", e);
                    }
                });
    }
}

可以看到onSuccess回调方法会把record封装后,添加到List completedFetches,之前上面讲到的pollOnce方法会调用
fetcher.fetchedRecords()方法,这个方法会根据获得的completedFetches,返回一个封装好的Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap。

消费获取的record

配置了自动commit

回到最开始的KafkaMessageListenerContainer.ListenerConsumer类的run方法

ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
if (records != null && this.logger.isDebugEnabled()) {
	this.logger.debug("Received: " + records.count() + " records");
}
if (records != null && records.count() > 0) {
	if (this.containerProperties.getIdleEventInterval() != null) {
	    lastReceive = System.currentTimeMillis();
	}
	if (this.autoCommit) {
		invokeListener(records);
	}
	else {
		if (sendToListener(records)) {
			if (this.assignedPartitions != null) {
		    	this.consumer.pause(this.assignedPartitions);
				this.paused = true;
				this.unsent = records;
			}
		}
	}
}

可以看到若获取到的records不为空,并且配置了自动commit,则会执行invokeListener(records);

private void invokeListener(final ConsumerRecords<K, V> records) {
	if (this.isBatchListener) {
	    invokeBatchListener(records);
    }else {
	    invokeRecordListener(records);
    }
}

private void invokeRecordListener(final ConsumerRecords<K, V> records) {
    Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
    while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
        final ConsumerRecord<K, V> record = iterator.next();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Processing " + record);
        }
        try {
            if (this.acknowledgingMessageListener != null) {
                this.acknowledgingMessageListener.onMessage(record,
                        this.isAnyManualAck
                                ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
                                : null);
            }
            else {
                this.listener.onMessage(record);
            }
            if (!this.isAnyManualAck && !this.autoCommit) {
                this.acks.add(record);
            }
            if (this.isRecordAck) {
                this.consumer.wakeup();
            }
        }
        catch (Exception e) {
            if (this.containerProperties.isAckOnError() && !this.autoCommit) {
                this.acks.add(record);
            }
            try {
                this.errorHandler.handle(e, record);
            }
            catch (Exception ee) {
                this.logger.error("Error handler threw an exception", ee);
            }
            catch (Error er) { //NOSONAR
                this.logger.error("Error handler threw an error", er);
                throw er;
            }
        }
    }
    if (this.isManualAck || this.isBatchAck) {
        this.consumer.wakeup();
    }
}

可以看到最终会调用到this.listener.onMessage(record);
这个listnerer就是我们最开始给ContainerProperties设置的MessageListener,这样就调用到了这个listener的onMessage方法,也就回调到了业务逻辑了。可以看到这里就和我们上一篇文章有点比较像了,上一篇文章我们也可以添加发送消息成功后的回调函数,这里也可以看做是一个回调函数用来消费消息。两者都是在发送/接受消息的io线程里面。所以尽量都不能在这里面做一些比较耗时的操作,如果实在需要,建议开启子线程来处理。

没有配置自动commit

我们可以看到KafkaMessageListenerContainer.ListenerConsumer的run方法开始,有以下逻辑

if (!this.autoCommit) {
    startInvoker();
}

private void startInvoker() {
	ListenerConsumer.this.invoker = new ListenerInvoker();
	ListenerConsumer.this.listenerInvokerFuture =      this.containerProperties.getListenerTaskExecutor().submit(ListenerConsumer.this.invoker);
}

可以看到如果没有配置自动commit,就会用到了KafkaMessageListenerContainer的doStart方法中设置的listenerTaskExecutor,并开启了一个‘-kafka-listener-‘线程

if (this.autoCommit) {
	invokeListener(records);
}
else {
	if (sendToListener(records)) {
		if (this.assignedPartitions != null) {
			this.consumer.pause(this.assignedPartitions);
			this.paused = true;
			this.unsent = records;
		}
	}
}

private boolean sendToListener(final ConsumerRecords<K, V> records) throws InterruptedException {
	if (this.containerProperties.isPauseEnabled() && CollectionUtils.isEmpty(this.definedPartitions)) {
		return !this.recordsToProcess.offer(records, this.containerProperties.getPauseAfter(),
				TimeUnit.MILLISECONDS);
	}
	else {
		this.recordsToProcess.put(records);
		return false;
	}
}

可以看到如果不是自动commit,则会把这个record丢进BlockingQueue<ConsumerRecords<K, V>> recordsToProcess 里面,然后上面开启的‘-kafka-listener-‘线程会用到这个

public void run() {
	Assert.isTrue(this.active, "This instance is not active anymore");
	if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
		((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
	}
	try {
		this.executingThread = Thread.currentThread();
		while (this.active) {
			try {
				ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
						TimeUnit.SECONDS);
				if (this.active) {
					if (records != null) {
						invokeListener(records);
					}
					else {
						if (ListenerConsumer.this.logger.isTraceEnabled()) {
							ListenerConsumer.this.logger.trace("No records to process");
						}
					}
				}
			}
			catch (InterruptedException e) {
				if (!this.active) {
					Thread.currentThread().interrupt();
				}
				else {
					ListenerConsumer.this.logger.debug("Interrupt ignored");
				}
			}
			if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
				ListenerConsumer.this.consumer.wakeup();
			}
		}
	}
	finally {
		this.active = false;
		this.exitLatch.countDown();
	}
}

可以看到这里先从recordsToProcess取出一条record,如果record不为空,则跟之前一样调用invokeListener(records);所以我们可以理解这个‘-kafka-listener-‘线程仅仅是把消息的真正消费的逻辑从consumer的io线程搬到了这个单独的线程。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
elastic-job源码分析 elastic-job源码分析
先简单介绍下elastic-job,它是当当公司开源的一个分布式调度解决方案。大家都知道,当数据量比较小的时候,我们可以只用quartz只在一台服务器上处理所有的数据。随着业务发展,数据量越来越大,一台机器已经不足以支撑,就必须想办法将一个
2017-01-24
下一篇 
spring-kafka源码分析一(Producer) spring-kafka源码分析一(Producer)
spring-kafka是运用spring的概念基于apache kafka(linkedin开源已经捐献给apache基金会)消息解决方案开发的一个java client端。它提供了一些接口来更方便收发消息与kafka server端交互
2016-12-17
  目录