Kafka has the notion of consumer groups: every consumer program belongs to a consumer group.
As shown in Figure 3-1, there are two consumer groups. Group A has as many consumers as the topic has partitions, so each consumer receives data from exactly one partition; Group B has half as many consumers as partitions, so each consumer receives data from two partitions.
![[Pasted image 20231124171015.png]]
As shown in Figure 3-2, a single consumer receives data from all partitions of the topic.
![[Pasted image 20231124171026.png]]
As shown in Figure 3-5, when there are more consumers than partitions, some consumers receive no data at all.
![[Pasted image 20231124171046.png]]
The relationship between the number of consumers and the number of partitions:
1. When the partition count equals the number of consumers in a group, each partition is assigned to exactly one consumer.
2. When the partition count is less than the number of consumers in a group, the surplus consumers receive no data.
3. When the partition count is greater than the number of consumers in a group, every partition is still consumed; some consumers handle more than one partition.
4. Special case: when multiple consumer groups subscribe to the same topic, each group independently receives the full stream of data.
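The four cases above can be sketched in a few lines. This is not the Kafka client API: `AssignmentSketch` is a hypothetical helper, and its simple round-robin dealing only approximates Kafka's real partition assignors (range, round-robin, sticky), but it shows which consumer ends up with which partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (NOT the Kafka client API): a simplified round-robin
// assignment covering the partition/consumer count cases listed above.
public class AssignmentSketch {

    static Map<String, List<Integer>> assign(int partitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        // deal partitions out round-robin across the group's consumers
        for (int p = 0; p < partitions; p++) {
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 partitions, 4 consumers: one partition each
        System.out.println(assign(4, List.of("c0", "c1", "c2", "c3")));
        // 4 partitions, 2 consumers: two partitions each
        System.out.println(assign(4, List.of("c0", "c1")));
        // 2 partitions, 3 consumers: one consumer is left idle
        System.out.println(assign(2, List.of("c0", "c1", "c2")));
    }
}
```

Running the third call shows the surplus consumer `c2` receiving an empty partition list, matching case 2 above.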
# 1. Consumer Demo
Minimal configuration:
```java
import java.util.Properties;

public final class KafkaConsumerConf {

    private static final Properties PROP = new Properties();

    static {
        PROP.put("bootstrap.servers", "127.0.0.1:9092");
        // key deserializer
        PROP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        PROP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // consumer group ID
        PROP.put("group.id", "consumer-01");
    }

    public static Properties getConf() {
        return PROP;
    }
}
```
Polling with automatic offset commits:
```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo01 {

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerConf.getConf());
        // Subscribe to a topic. Note: subscribe() and assign() are mutually
        // exclusive; calling both on one consumer throws IllegalStateException.
        consumer.subscribe(Collections.singleton("test-01"));
        // Alternatively, assign a specific partition instead of subscribing:
        // consumer.assign(Collections.singleton(new TopicPartition("test-01", 0)));
        try {
            while (true) {
                ConsumerRecords<String, String> rt = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : rt) {
                    System.out.println("topic=" + record.topic()
                            + "##partition=" + record.partition()
                            + "##offset=" + record.offset()
                            + "##key=" + record.key()
                            + "##value=" + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
```
Iterating over the returned ConsumerRecords yields individual ConsumerRecord objects (the consumer-side counterpart of the producer's ProducerRecord), from which message metadata can be read.
```java
public class ConsumerRecord<K, V> {

    public static final long NO_TIMESTAMP = -1L;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;                 // topic
    private final int partition;                // partition
    private final long offset;                  // offset
    private final long timestamp;               // timestamp
    private final TimestampType timestampType;  // message creation time | log append time
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;                        // key
    private final V value;                      // value
    private final Optional<Integer> leaderEpoch;
    private volatile Long checksum;

    // ... constructors and accessors omitted
}
```
# 2. Offsets
By default, offsets are committed automatically (once every 5 seconds); manual commits can be configured instead. Either way, depending on when the offset is committed relative to message processing, messages can be lost or consumed more than once.
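As a sketch of the settings involved (the property names come from the standard consumer configuration; the `CommitConfig` helper class is illustrative only):

```java
import java.util.Properties;

// Illustrative helper: builds the commit-related consumer settings.
public class CommitConfig {

    public static Properties commitSettings(boolean auto) {
        Properties prop = new Properties();
        // auto-commit is on by default and fires every 5 seconds
        prop.put("enable.auto.commit", String.valueOf(auto));
        prop.put("auto.commit.interval.ms", "5000");
        return prop;
    }
}
```

Disabling auto-commit shifts responsibility to the application, which then calls `commitSync()` or `commitAsync()` at a point of its own choosing.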
![[Pasted image 20231124181323.png]]
+ Message loss
Example: the current fetched batch is [x+2, x+7] and the offset x+8 has already been committed. If a failure occurs while processing x+5, consumption resumes from x+8 after recovery, so messages x+5 through x+7 are never consumed.
+ Duplicate consumption
Example: if a failure occurs while processing x+5 and the last committed offset is x+2, consumption resumes from x+2 after recovery, so messages x+2 through x+4 are consumed a second time.
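Both examples follow the same rule: messages between the committed offset and the failure point are either skipped or replayed. A small hypothetical simulation (not the Kafka API; offsets are written without the `x` base) makes this concrete:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: given a fetched batch [start, end], a failure
// while processing `failed`, and a committed offset, compute which
// messages are lost or re-consumed after recovery.
public class CommitTimingSketch {

    static List<String> outcome(int start, int end, int failed, int committed) {
        List<String> effects = new ArrayList<>();
        // lost: never processed, yet the committed offset skips past them
        for (int o = failed; o < committed && o <= end; o++) {
            effects.add("lost:" + o);
        }
        // duplicated: already processed, but re-read after the restart
        for (int o = committed; o < failed; o++) {
            effects.add("dup:" + o);
        }
        return effects;
    }

    public static void main(String[] args) {
        // commit ahead of processing (committed = 8): offsets 5..7 are lost
        System.out.println(outcome(2, 7, 5, 8));
        // commit behind processing (committed = 2): offsets 2..4 re-consumed
        System.out.println(outcome(2, 7, 5, 2));
    }
}
```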
# 3. Rebalance
A rebalance is the transfer of partition ownership from one consumer to another, triggered when the number of consumers in a group changes. While a rebalance is in progress, the consumers in the group cannot read messages, so unnecessary rebalances should be avoided where possible.
# 4. Multi-threaded Consumers
The KafkaConsumer object is not thread-safe. Internally it defines an acquire() method that checks whether only one thread is operating on the consumer, and throws a ConcurrentModificationException if another thread intrudes.
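A simplified sketch of that guard, assuming the widely documented acquire/release pattern (the class and field names here are illustrative, not the actual client internals):

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a single-thread access guard in the style of
// KafkaConsumer's acquire()/release() (illustrative, not the real code).
public class SingleThreadGuard {

    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicLong refCount = new AtomicLong(0);

    // called on entry to every public method
    public void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "guard is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    // called on exit; frees the guard once all nested calls return
    public void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
```

The guard is a cheap detector, not a lock: a second thread fails fast instead of blocking, which is why multi-threaded designs wrap the consumer rather than share it.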
## 4.1 Thread confinement
Each thread instantiates its own KafkaConsumer object.
![[Pasted image 20231124183943.png]]
The drawback of this approach is that **the number of consumer threads must not exceed the number of partitions**. Running many consumers also wastes TCP connections, and throughput can suffer when transactions are involved.
## 4.2 Separating consumption from processing
![[Pasted image 20231126105610.png]]
### 4.2.1 Implementation with automatic commits
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo02 {

    public static void main(String[] args) {
        KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(
                KafkaConsumerConf.getConf(), "test-01", 50, 1000,
                new ThreadPoolExecutor.CallerRunsPolicy());
        kafkaConsumerThread.start();
    }

    /**
     * Consumer thread: polls batches and hands them to a worker pool.
     */
    private static class KafkaConsumerThread extends Thread {

        private final KafkaConsumer<String, String> consumer;

        private final ThreadPoolExecutor handlerPool;

        /**
         * @param conf      consumer configuration
         * @param topic     topic to subscribe to
         * @param threadNum maximum number of handler threads
         * @param queueSize capacity of the handler queue
         * @param handler   rejection policy applied when the queue is full
         */
        public KafkaConsumerThread(Properties conf, String topic, int threadNum,
                                   int queueSize, RejectedExecutionHandler handler) {
            consumer = new KafkaConsumer<>(conf);
            consumer.subscribe(Collections.singleton(topic));
            // thread pool for message processing
            handlerPool = new ThreadPoolExecutor(0, threadNum, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(queueSize), handler);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // poll() never returns null; an empty result means no records
                    ConsumerRecords<String, String> rt =
                            consumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
                    if (!rt.isEmpty()) {
                        // NOTE: with auto-commit enabled, offsets may be committed
                        // before the pool finishes processing this batch
                        handlerPool.submit(new MessageHandler(rt));
                    }
                }
            } catch (Exception e) {
                System.err.println("consumer error, e=" + e.getMessage());
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * Message-processing task. It implements Runnable rather than extending
     * Thread, since instances are submitted to a thread pool.
     */
    private static class MessageHandler implements Runnable {

        private final ConsumerRecords<String, String> records;

        public MessageHandler(ConsumerRecords<String, String> rt) {
            records = rt;
        }

        @Override
        public void run() {
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("topic=" + record.topic()
                        + "##partition=" + record.partition()
                        + "##offset=" + record.offset()
                        + "##key=" + record.key()
                        + "##value=" + record.value());
            }
        }
    }
}
```
### 4.2.2 Sliding-window approach
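This section is only a heading in the original notes. One common reading of the sliding-window idea is to track a completion flag per offset and advance the committable offset only past a contiguous prefix of completed offsets, so out-of-order workers never cause a gap to be committed. The class below is a hypothetical illustration of that bookkeeping, not the Kafka API:

```java
import java.util.TreeMap;

// Hypothetical sketch of sliding-window offset tracking: workers complete
// offsets out of order, but the committable offset only slides forward
// past a contiguous prefix of completed offsets.
public class OffsetWindow {

    private final TreeMap<Long, Boolean> window = new TreeMap<>();
    private long committable;

    public OffsetWindow(long start) {
        committable = start;
    }

    // register an offset that has been dispatched for processing
    public synchronized void track(long offset) {
        window.put(offset, false);
    }

    // mark an offset done and slide the window forward if possible
    public synchronized void complete(long offset) {
        window.put(offset, true);
        while (!window.isEmpty()
                && window.firstKey() == committable
                && window.firstEntry().getValue()) {
            window.pollFirstEntry();
            committable++;
        }
    }

    // the offset that would be safe to commit
    public synchronized long committable() {
        return committable;
    }
}
```

In a real consumer, the poll loop would call `track` per record, handler threads would call `complete`, and a periodic `commitSync` would use `committable()`.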
# 5. Consumer Parameters