使用CyclicBarrier控制Kafka多线程消费消息的位移提交问题
Kafka中消费者是线程不安全的,一个topic只能被一个消费组中的消费者消费,想要提高数据消费能力,可以增加分区数,因为消费者数可以和分区数进行对应,当消费者数大于分区数时,多余的消费者将处于空闲状态,或者也可以在每个线程中创建一个消费者实例,这样也可以对数据来处理,但创建多个消费者实例必然会造成资源的浪费。通过线程池来对数据进行消费,就会存在位移提交的问题,从而引发数据丢失或重复,所以对位移的提交要格外处理,消费者默认是定时提交位移信息的,如果需要手动提交,要先修改配置参数关闭自动提交,再通过代码里调用commitSync()方法。 由于多线程的不可控性,如果让每个线程单独来获取数据再提交位移,很有可能就会造成位移错位等问题,如何合理的控制线程之间任务处理和位移提交问题,这里采用CyclicBarrier工具类,它的本质是一种比较特别的锁,通过配置线程数,当到达指定线程数后再统一执行某些操作,这些特性很适合用来控制位移的提交,我们可以将拉取到的数据分配到线程池中,当所有线程都处理完成后,触发CyclicBarrier中的提交任务,进行一次提交,接着再分配下一轮的数据。 以下便是根据这一策略编写的代码,仅供参考,因为尚未在生产环境中使用,只是对这种思路提高一种实现方式,可能还存在些问题,如当这一批拉取的数据小于线程池中线程数该如何等待,会不会存在短板效应,导致其他线程完成后一直等待某个线程执行,造成短时间位移提交阻塞。
总体思路有点类似于滑动窗口,每次一批一批的处理,等待最后一个处理完成,再向前滑动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class KafkaMultiThread {
private static final Logger LOG = LoggerFactory .getLogger(KafkaMultiThread.class);
public static void main(String[] args) { //这里创建一个消费者,使用外部的配置文件信息 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(new Properties()); //订阅主题 kafkaConsumer.subscribe(Arrays.asList("topic-test")); //需要创建的线程数,这里线程池数和CyclicBarrier的数量要一致 int threadNumber = 10; ExecutorService pool = Executors.newFixedThreadPool(threadNumber); CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNumber,()->{ LOG.info("一批数据处理完毕,统一提交位移"); kafkaConsumer.commitSync(); }); //轮询获取消息 while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); //对拉取到一批数据分别放入线程池中 for (ConsumerRecord<String, String> record : records) { pool.execute(new KafkaHandle(record,cyclicBarrier)); } }
} }
/** * 单独处理的线程 */ class KafkaHandle implements Runnable{ private static final Logger LOG = LoggerFactory .getLogger(KafkaHandle.class);
ConsumerRecord<String, String> record; CyclicBarrier cyclicBarrier;
/** * 构造函数初始化 * @param record 消息内容 * @param cyclicBarrier 主线程中的CyclicBarrier */ public KafkaHandle(ConsumerRecord<String, String> record, CyclicBarrier cyclicBarrier) { this.record = record; this.cyclicBarrier = cyclicBarrier; }
@Override public void run() { //模拟处理已分配的消息 LOG.info("offset = %d, value = %s", record.offset(), record.value()); //这里假设需要将数据转换为json Map<String, Object> json = JSONUtil.parseObj(record.value()); LOG.info(String.valueOf(json.size())); try { //处理完毕后进入等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }
|