You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
7.1 KiB

2 years ago
kafka中存在消费组的概念每个消费者程序都有一个对应的消费组。
如图3-1所示存在2个消费组组A的消费者数量与topic分区数量一致因此每个消费者都能接受到1个分区的数据组B的消费者数量刚好是分区一半因此每个消费者可以获取2个分区的数据。
![[Pasted image 20231124171015.png]]
如图3-2所示单一消费者可以获取topic内所有分区数据
![[Pasted image 20231124171026.png]]
如图3-5所示消费者数量大于分区数量出现无法获取数据的消费者
![[Pasted image 20231124171046.png]]
消费者数量和分区数量关系如下:
1. 当分区数量等于同一消费者组内消费者数量时一个分区对应一个消费者
2. 当分区数量小于同一消费者组内消费者数量时多余的消费者将接收不到数据
3. 当分区数量大于同一消费者组内消费者数量时所有分区都将被消费
4. 特殊情况存在多个消费者组订阅同一个topic每个消费者组都能消费到数据
# 一、消费者demo
最低配置
```java
public final class KafkaConsumerConf {
private static final Properties PROP = new Properties();
static {
PROP.put("bootstrap.servers", "127.0.0.1:9092");
//key序列化器
PROP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value序列化器
PROP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//消费者组ID
PROP.put("group.id", "consumer-01");
}
public static Properties getConf() {
return PROP;
}
}
```
轮询+自动提交偏移量
```java
public class KafkaConsumerDemo01 {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerConf.getConf());
//订阅主题
consumer.subscribe(Collections.singleton("test-01"));
//指定分区
consumer.assign(Arrays.asList(new TopicPartition("test-01", 0)));
try {
while (true) {
ConsumerRecords<String, String> rt = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : rt) {
System.out.println("topic=" + record.topic() + "##partition=" + record.partition() + "##offset=" + record.offset() + "##key=" + record.key() + "##value=" + record.value());
}
}
} catch (Exception e) {
} finally {
consumer.close();
}
}
}
```
从返回的ConsumerRecords中获取ConsumerRecord,与生产者ProducerRecord相对应可以从中读取相关信息。
```java
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;\\主题
private final int partition;\\分区
private final long offset;\\偏移量
private final long timestamp;\\时间戳
private final TimestampType timestampType;\\消息创建时间|消息追加到日志时间
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;\\key
private final V value;\\val
private final Optional<Integer> leaderEpoch;
private volatile Long checksum;
```
# 二、偏移量
默认情况下自动提交每隔5秒提交一次也可以设置手动提交。手动提交会出现消息丢失或者重复消费的情况。
![[Pasted image 20231124181323.png]]
+ 消息丢失
例:当前拉取的消息集为[x+2,x+7]当x+5时出现了异常当前提交的偏移量为x+8,故障恢复后重新拉取消息也是从x+8开始因此x+5到x+7之间的消息未消费。
+ 重复消费
当消费x+5时出现异常故障恢复后重新从x+2开始也就是x+2到x+4的数据重新消费了一遍。
# 三、rebalance
再均衡是指消费者数量发生变化时,分区所属权从一个消费者转移到另一个消费者的行为。再均衡发生期间,消费组内的消费者是无法读取消息的。一般情况下,尽量避免不必要的再均衡的发生。
# 四、多线程消费者
KafkaConsumer对象不是线程安全的其中定义了一个acquire()方法用于检测当前是否只有一个线程在操作。
## 4.1 线程封闭
每个线程实例话一个KafkaConsumer消费者对象
![[Pasted image 20231124183943.png]]
这种方式的问题在于**消费线程数必须小于等于分区数**。多消费者的问题在于会浪费tcp连接并且在有事务的情况下会很慢。
## 4.2 将数据消费和数据处理拆分
![[Pasted image 20231126105610.png]]
### 4.2.1 自动提交方式实现:
```java
public class KafkaConsumerDemo02 {
public static void main(String[] args) {
KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(KafkaConsumerConf.getConf(), "test-01", 50, 1000, new ThreadPoolExecutor.CallerRunsPolicy());
kafkaConsumerThread.start();
}
/**
* 消费者线程
*/
private static class KafkaConsumerThread extends Thread {
private final KafkaConsumer<String, String> consumer;
private final ThreadPoolExecutor handlerPool;
/**
* 消费者线程初始化
*
* @param conf
* @param topic
* @param threadNum
* @param queueSize
* @param handler
*/
public KafkaConsumerThread(Properties conf, String topic, int threadNum, int queueSize, RejectedExecutionHandler handler) {
consumer = new KafkaConsumer<>(conf);
consumer.subscribe(Collections.singleton(topic));
//消息处理线程池
handlerPool = new ThreadPoolExecutor(0, threadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(queueSize), handler);
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> rt = consumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
if (rt != null) {
handlerPool.submit(new MessageHandler(rt));
}
}
} catch (Exception e) {
System.err.println("消费者异常,e=" + e.getMessage());
} finally {
consumer.close();
}
}
}
/**
* 消息处理线程
*/
private static class MessageHandler extends Thread {
public ConsumerRecords<String, String> records;
public MessageHandler(ConsumerRecords<String, String> rt) {
records = rt;
}
@Override
public void run() {
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic=" + record.topic() + "##partition=" + record.partition() + "##offset=" + record.offset() + "##key=" + record.key() + "##value=" + record.value());
//手动GC
record = null;
}
}
}
}
```
### 4.2.2 窗口滑动方式
# 五、消费者参数