Reading the Kafka Producer Source Code: Sending Messages

2020-6-8

Today I read the message-sending part of the Kafka source code (version 0.8.2.1). For message sending, the partitioning strategy is as follows:

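1 Kafka's partitioning strategy

 1.1 If a partition is specified, the message is sent to that partition.

 1.2 If no partition is specified but a key is, a partition is chosen from the hash of the key.

   If the key is always the same value, every message lands on the same partition. So avoid a constant key; if you must use one, you would need to modify the Kafka source so that the data is spread evenly across partitions.

 1.3 If neither a key nor a partition is specified, messages are distributed across the partitions round-robin.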
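The three rules above can be sketched in a few lines of Java. This is a simplified illustration, not the Kafka class itself: `PartitionChooser` and `choose` are hypothetical names, `Arrays.hashCode` is only a placeholder for whatever hash the producer applies to the key bytes, and the counter here starts at 0 for predictability.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the producer's partition selection. */
public class PartitionChooser {
    // Shared counter used for round-robin when no key is given.
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * @param partition     explicit partition, or null if not specified
     * @param key           serialized key bytes, or null if not specified
     * @param numPartitions number of partitions of the topic (must be > 0)
     */
    public int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            // Rule 1.1: an explicit partition wins, after a range check.
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition: " + partition);
            return partition;
        }
        if (key != null) {
            // Rule 1.2: hash the key. Arrays.hashCode is a placeholder hash (assumption).
            // Masking the sign bit keeps the result non-negative.
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // Rule 1.3: no key and no partition, so rotate through the partitions.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Because the key hash is deterministic, calling `choose(null, key, n)` repeatedly with the same key always returns the same partition, which is exactly the hot-partition effect described in 1.2.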

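2 Message sending is always asynchronous. The process is as follows

Three objects are involved:

2.1 RecordAccumulator

It maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches object.

Each partition maps to an ArrayDeque of RecordBatch.

A call to KafkaProducer.send ultimately appends the record to the RecordAccumulator.

If the RecordBatch is full or a new RecordBatch was created, the Sender is woken up.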
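To make the per-partition queue and the wake-up condition concrete, here is a minimal model of the append path. It is a sketch under assumptions, not the Kafka code: `MiniAccumulator`, `Batch` and `AppendResult` are hypothetical names, partitions are identified by a plain string, and a record is represented only by its size in bytes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified model of a per-partition batch queue. */
public class MiniAccumulator {
    /** Simplified batch: only tracks how many bytes have been written. */
    public static final class Batch {
        final int capacity;
        int used = 0;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(int size) {
            if (used + size > capacity) return false; // no room left in this batch
            used += size;
            return true;
        }
        boolean isFull() { return used >= capacity; }
    }

    /** Outcome of an append; the caller wakes the sender if either flag is set. */
    public static final class AppendResult {
        public final boolean batchIsFull;
        public final boolean newBatchCreated;
        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
        public boolean shouldWakeSender() { return batchIsFull || newBatchCreated; }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    public MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    public AppendResult append(String topicPartition, int recordSize) {
        // One deque of batches per partition, created on first use.
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize))
                // More than one queued batch means an earlier one is already complete.
                return new AppendResult(dq.size() > 1 || last.isFull(), false);
            // No batch yet, or the last one has no room: start a new batch.
            Batch batch = new Batch(Math.max(batchSize, recordSize));
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return new AppendResult(dq.size() > 1 || batch.isFull(), true);
        }
    }
}
```

With a batch size of 100, appending three 40-byte records to one partition creates a batch on the first call, fills it further on the second, and rolls over to a second batch on the third. Only the first and third calls would wake the sender.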

                 

2.2 Sender

 The background thread that handles the sending of produce requests to the Kafka cluster

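The Sender uses the KafkaClient to write the data held in the RecordAccumulator to the server in batches.

The Sender's run method loops, calling run(long now) on each pass.

The logic in run(long now) is as follows:

2.2.1 First, determine the nodes that have data ready to send

2.2.2 Remove the Kafka nodes that cannot currently be sent to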

                          

2.2.3 Get the list of data to send

    Loop over the partitions for which this node is the leader

          For each partition, take its RecordBatch and add it to this node's List<RecordBatch>
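Steps 2.2.2 and 2.2.3 can be illustrated roughly as below. This is a sketch with hypothetical names (`DrainSketch`, `removeUnready`, `drain`): nodes are plain integer ids, partitions and batches are strings, and the per-request size limit that a real drain would have to respect is left out.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntPredicate;

/** Hypothetical sketch of filtering ready nodes and grouping batches per node. */
public class DrainSketch {
    /** Step 2.2.2: drop the nodes the client cannot currently send to. */
    public static Set<Integer> removeUnready(Set<Integer> readyNodes, IntPredicate canSend) {
        Set<Integer> result = new LinkedHashSet<>(readyNodes);
        result.removeIf(node -> !canSend.test(node));
        return result;
    }

    /** Step 2.2.3: for each node, take the first batch of every partition it leads. */
    public static Map<Integer, List<String>> drain(Set<Integer> nodes,
                                                    Map<String, Integer> leaderOf,
                                                    Map<String, Deque<String>> batches) {
        Map<Integer, List<String>> perNode = new LinkedHashMap<>();
        for (int node : nodes) {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
                if (e.getValue() != node) continue;      // this node is not the leader
                Deque<String> dq = batches.get(e.getKey());
                if (dq == null) continue;                // nothing queued for the partition
                String first = dq.pollFirst();           // oldest batch goes out first
                if (first != null) ready.add(first);
            }
            perNode.put(node, ready);
        }
        return perNode;
    }
}
```

Taking only the head of each deque keeps batches for a partition in order and gives every partition led by the node a turn in the same request.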

                                                                

2.2.4 Assemble the request objects and send them to the Kafka nodes

Compute pollTimeout, then send each request object to its Kafka node

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);

2.2.5 Handle the returned responses

        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
        for (ClientResponse response : responses) {
            if (response.wasDisconnected())
                handleDisconnect(response, now);
            else
                handleResponse(response, now);
        }

2.3 KafkaClient

Its implementation class is NetworkClient, which exchanges data with the server over sockets.

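3 Kafka configuration parameters

Buffer size for one batch of records (class: MemoryRecords): batch.size, default 16384 bytes

Total memory the client may use to buffer records waiting to be sent (class: BufferPool): BUFFER_MEMORY (buffer.memory), default 32 * 1024 * 1024L, i.e. 32 MB

Send delay (LINGER_MS, linger.ms): if set to 1 second, records are first placed in the client buffer and sent after waiting up to 1 second. The default is 0, which means send immediately

Compression type: compression.type, one of none, gzip, snappy, lz4; the default is none

In theory, setting LINGER_MS to a non-zero value raises throughput, since more records can accumulate into each batch before it is sent, at the cost of added latency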
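As a rough illustration, the settings above could be collected into a Properties object like this. The class and method names are hypothetical, the linger and compression values are arbitrary examples rather than recommendations, and a real producer would also need its serializer settings before it can be constructed.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the producer settings discussed above. */
public class ProducerConfigSketch {
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Per-batch buffer size in bytes (the default mentioned above).
        props.put("batch.size", "16384");
        // Total client-side buffer: 32 MB, the default mentioned above.
        props.put("buffer.memory", String.valueOf(32L * 1024 * 1024));
        // Example value: wait up to 5 ms so more records can join a batch.
        props.put("linger.ms", "5");
        // Example value: one of none, gzip, snappy, lz4.
        props.put("compression.type", "snappy");
        return props;
    }
}
```

The resulting object would then be passed to the KafkaProducer constructor, for example with `producerProps("localhost:9092")`, where the broker address is a placeholder.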

Author: cloud-coder