![[Pasted image 20231123194707.png]] 生产者**主线程**发送消息经过拦截器、序列化器、分区器后到达消息收集器RecordAccumulator,其作用是缓存消息来实现批量发送。 ProducerRecord会被放入到ProducerBatch中,一个batch是16K。然后Batch会存放到消息收集器队列,此时**Sender**线程会从队列头读取批次缓存从而发送到Kafka实例。 # 一、简单demo ## 1.1 kafka 连接配置 ![[Pasted image 20231123171533.png]] ## 1.2 发送消息 必须指定topic和value ```java public class ProducerDemo01 { /** * 生产者 */ private static final KafkaProducer PRODUCER = new KafkaProducer(KafkaConf.getKafkaConf()); public static void main(String[] args) { //发送100条数据 for (int i = 0; i < 20; i++) { //指定topic和value // ProducerRecord record = new ProducerRecord("test-01", "hello-" + i); //指定topic\key\value // ProducerRecord record = new ProducerRecord("test-01", "key-" + i, "value-" + i); //指定topic\分区\kv ProducerRecord record = new ProducerRecord("test-01", 0, "key-" + i, "value-" + i); PRODUCER.send(record); } PRODUCER.close(); } } ``` 消息体对象属性: ![[Pasted image 20231123174033.png]] # 二、消息发送模式 ## 2.1 发送并忘记 把消息发送给服务器,但并不关心消息是否正常到达,也就是上面样例中的方式。大多数情况下,消息会正常到达,这可以由Kafka的高可用性和自动重发机制来保证。不过有时候也会丢失消息 ## 2.2 同步发送 使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,我们就可以知道消息是否发送成功。 前2种的区别在于在send()方法后是否获取返回值;RecordMetadata对象包含了偏移量等信息 ```java try { RecordMetadata rt = PRODUCER.send(record).get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } ``` ![[Pasted image 20231123180041.png]] ## 2.3 异步发送 调用send()方法时,同时指定一个回调函数,服务器在返回响应时调用该函数。 ```java public Future send(ProducerRecord record, Callback callback) ``` 例: ```java public static void main(String[] args) { for (int i = 0; i < 100; i++) { ProducerRecord record = new ProducerRecord("test-01", "hello-" + i); //异步发送 PRODUCER.send(record, (recordMetadata, e) -> { if (Objects.nonNull(e)) { //异常处理 } else { //读取recordMetadata对象信息 System.out.println(recordMetadata.offset()); } }); } PRODUCER.close(); } ``` # 三、序列化 KV 序列化,kafka自带的有String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long,它们都实现了org.apache.kafka.common.serialization接口,也可以自定义数据类型。 如果自带序列化不满足,可以选择第三方库:Avro、JSON、Thrift、ProtoBuf等 # 四、分区器 分区器的作用就是根据key为消息分配分区。 DefaultPartitioner是默认分区器,规则为:如果key不为空,对key进行hash计算,根据hash值取模的到分区号;如果key为空,则使用轮询方式发往分区。 # 五、生产者参数 | | 定义 | 用途 | 默认值或示例 | | ------------------------------------- | -------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | bootstrap.servers | broker地址 | 指定kafka服务地址 | 127.0.0.1:9092 | | key.serializer | key序列化 | 指定key的序列化方式 | StringSerializer | | value.serializer | value序列化 | 指定value的序列化方式 | StringSerializer | | acks | 应答参数 | 指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 | acks=0 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 | | buffer.memory | 生产者缓冲区大小 | 用来设置生产者内存缓冲区(消息收集器)的大小生产者用它缓冲要发送到服务器的消息。如果程序发送消息的速度超过了发送到服务器的速度,会导致生产者缓冲区空间不足,这时候调用send()方法要么被阻塞,要么抛出异常 | 32M | | compression.type | 消息压缩类型 | 指定了消息被发送给broker之前使用哪一种压缩算法进行压缩 | 可选值有 snappy(占用CPU少,关注性能和网络带宽时选用),gzip(占用CPU多,更高压缩比,网络带宽有限时选用),lz4 | | retries | 重试次数 | | 0 | | linger.ms | ProducerBatch发送前等待时间 | 生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。 | 0 | | max.in.flight.requests.per.connection | 指定了生产者在收到服务器响应之前可以发送多少个消息 | 把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试 | | | batch.size | 单个ProducerBatch大| | |