You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

100 lines
11 KiB

2 years ago
![[Pasted image 20231123194707.png]]
生产者**主线程**发送消息经过拦截器、序列化器、分区器后到达消息收集器RecordAccumulator其作用是缓存消息来实现批量发送。
ProducerRecord会被放入到ProducerBatch中一个batch是16K。然后Batch会存放到消息收集器队列此时**Sender**线程会从队列头读取批次缓存从而发送到Kafka实例。
# 一、简单demo
## 1.1 kafka 连接配置
![[Pasted image 20231123171533.png]]
## 1.2 发送消息
必须指定topic和value
```java
public class ProducerDemo01 {
/**
* 生产者
*/
private static final KafkaProducer<String, String> PRODUCER = new KafkaProducer<String, String>(KafkaConf.getKafkaConf());
public static void main(String[] args) {
//发送100条数据
for (int i = 0; i < 20; i++) {
//指定topic和value
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", "hello-" + i);
//指定topic\key\value
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", "key-" + i, "value-" + i);
//指定topic\分区\kv
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", 0, "key-" + i, "value-" + i);
PRODUCER.send(record);
}
PRODUCER.close();
}
}
```
消息体对象属性:
![[Pasted image 20231123174033.png]]
# 二、消息发送模式
## 2.1 发送并忘记
把消息发送给服务器但并不关心消息是否正常到达也就是上面样例中的方式。大多数情况下消息会正常到达这可以由Kafka的高可用性和自动重发机制来保证。不过有时候也会丢失消息
## 2.2 同步发送
使用send()方法发送消息它会返回一个Future对象调用get()方法进行等待,我们就可以知道消息是否发送成功。
前2种的区别在于在send()方法后是否获取返回值RecordMetadata对象包含了偏移量等信息
```java
try {
RecordMetadata rt = PRODUCER.send(record).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
```
![[Pasted image 20231123180041.png]]
## 2.3 异步发送
调用send()方法时,同时指定一个回调函数,服务器在返回响应时调用该函数。
```java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
```
例:
```java
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", "hello-" + i);
//异步发送
PRODUCER.send(record, (recordMetadata, e) -> {
if (Objects.nonNull(e)) {
//异常处理
} else {
//读取recordMetadata对象信息
System.out.println(recordMetadata.offset());
}
});
}
PRODUCER.close();
}
```
# 三、序列化
KV 序列化kafka自带的有String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long它们都实现了org.apache.kafka.common.serialization接口也可以自定义数据类型。
如果自带序列化不满足可以选择第三方库Avro、JSON、Thrift、ProtoBuf等
# 四、分区器
分区器的作用就是根据key为消息分配分区。
DefaultPartitioner是默认分区器规则为如果key不为空对key进行hash计算根据hash值取模的到分区号如果key为空则使用轮询方式发往分区。
# 五、生产者参数
| | 定义 | 用途 | 默认值或示例 |
| ------------------------------------- | -------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| bootstrap.servers | broker地址 | 指定kafka服务地址 | 127.0.0.1:9092 |
| key.serializer | key序列化 | 指定key的序列化方式 | StringSerializer |
| value.serializer | value序列化 | 指定value的序列化方式 | StringSerializer |
| acks | 应答参数 | 指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 | acks=0 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;<br/> acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;<br/> acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 |
| buffer.memory | 生产者缓冲区大小 | 用来设置生产者内存缓冲区消息收集器的大小生产者用它缓冲要发送到服务器的消息。如果程序发送消息的速度超过了发送到服务器的速度会导致生产者缓冲区空间不足这时候调用send()方法要么被阻塞,要么抛出异常 | 32M |
| compression.type | 消息压缩类型 | 指定了消息被发送给broker之前使用哪一种压缩算法进行压缩 | 可选值有 snappy占用CPU少关注性能和网络带宽时选用gzip占用CPU多更高压缩比网络带宽有限时选用lz4 |
| retries | 重试次数 | | 0 |
| linger.ms | ProducerBatch发送前等待时间 | 生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。 | 0 |
| max.in.flight.requests.per.connection | 指定了生产者在收到服务器响应之前可以发送多少个消息 | 把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试 | |
| batch.size | 单个ProducerBatch大| | |