You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
11 KiB
11 KiB
生产者主线程发送消息经过拦截器、序列化器、分区器后到达消息收集器RecordAccumulator,其作用是缓存消息来实现批量发送。 ProducerRecord会被放入到ProducerBatch中,一个batch是16K。然后Batch会存放到消息收集器队列,此时Sender线程会从队列头读取批次缓存从而发送到Kafka实例。
一、简单demo
1.1 kafka 连接配置
1.2 发送消息
必须指定topic和value
public class ProducerDemo01 {
/**
* 生产者
*/
private static final KafkaProducer<String, String> PRODUCER = new KafkaProducer<String, String>(KafkaConf.getKafkaConf());
public static void main(String[] args) {
//发送100条数据
for (int i = 0; i < 20; i++) {
//指定topic和value
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", "hello-" + i);
//指定topic\key\value
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", "key-" + i, "value-" + i);
//指定topic\分区\kv
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", 0, "key-" + i, "value-" + i);
PRODUCER.send(record);
}
PRODUCER.close();
}
}
二、消息发送模式
2.1 发送并忘记
把消息发送给服务器,但并不关心消息是否正常到达,也就是上面样例中的方式。大多数情况下,消息会正常到达,这可以由Kafka的高可用性和自动重发机制来保证。不过有时候也会丢失消息
2.2 同步发送
使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,我们就可以知道消息是否发送成功。 前2种的区别在于在send()方法后是否获取返回值;RecordMetadata对象包含了偏移量等信息
try {
RecordMetadata rt = PRODUCER.send(record).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
2.3 异步发送
调用send()方法时,同时指定一个回调函数,服务器在返回响应时调用该函数。
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
例:
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-01", "hello-" + i);
//异步发送
PRODUCER.send(record, (recordMetadata, e) -> {
if (Objects.nonNull(e)) {
//异常处理
} else {
//读取recordMetadata对象信息
System.out.println(recordMetadata.offset());
}
});
}
PRODUCER.close();
}
三、序列化
KV 序列化,kafka自带的有String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long,它们都实现了org.apache.kafka.common.serialization接口,也可以自定义数据类型。
如果自带序列化不满足,可以选择第三方库:Avro、JSON、Thrift、ProtoBuf等
四、分区器
分区器的作用就是根据key为消息分配分区。 DefaultPartitioner是默认分区器,规则为:如果key不为空,对key进行hash计算,根据hash值取模的到分区号;如果key为空,则使用轮询方式发往分区。
五、生产者参数
定义 | 用途 | 默认值或示例 | |
---|---|---|---|
bootstrap.servers | broker地址 | 指定kafka服务地址 | 127.0.0.1:9092 |
key.serializer | key序列化 | 指定key的序列化方式 | StringSerializer |
value.serializer | value序列化 | 指定value的序列化方式 | StringSerializer |
acks | 应答参数 | 指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 | acks=0 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应; acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应; acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 |
buffer.memory | 生产者缓冲区大小 | 用来设置生产者内存缓冲区(消息收集器)的大小生产者用它缓冲要发送到服务器的消息。如果程序发送消息的速度超过了发送到服务器的速度,会导致生产者缓冲区空间不足,这时候调用send()方法要么被阻塞,要么抛出异常 | 32M |
compression.type | 消息压缩类型 | 指定了消息被发送给broker之前使用哪一种压缩算法进行压缩 | 可选值有 snappy(占用CPU少,关注性能和网络带宽时选用),gzip(占用CPU多,更高压缩比,网络带宽有限时选用),lz4 |
retries | 重试次数 | 0 | |
linger.ms | ProducerBatch发送前等待时间 | 生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。 | 0 |
max.in.flight.requests.per.connection | 指定了生产者在收到服务器响应之前可以发送多少个消息 | 把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试 | |
batch.size | 单个ProducerBatch大 |