使用CyclicBarrier控制Kafka多线程消费消息的位移提交问题

By | 2021年3月12日

使用CyclicBarrier控制Kafka多线程消费消息的位移提交问题

Kafka中消费者是线程不安全的,一个topic只能被一个消费组中的消费者消费,想要提高数据消费能力,可以增加分区数,因为消费者数可以和分区数进行对应,当消费者数大于分区数时,多余的消费者将处于空闲状态,或者也可以在每个线程中创建一个消费者实例,这样也可以对数据来处理,但创建多个消费者实例必然会造成资源的浪费。通过线程池来对数据进行消费,就会存在位移提交的问题,从而引发数据丢失或重复,所以对位移的提交要格外处理,消费者默认是定时提交位移信息的,如果需要手动提交,要先修改配置参数关闭自动提交,再通过代码里调用commitSync()方法。

由于多线程的不可控性,如果让每个线程单独来获取数据再提交位移,很有可能就会造成位移错位等问题,如何合理的控制线程之间任务处理和位移提交问题,这里采用CyclicBarrier工具类,它的本质是一种比较特别的锁,通过配置线程数,当到达指定线程数后再统一执行某些操作,这些特性很适合用来控制位移的提交,我们可以将拉取到的数据分配到线程池中,当所有线程都处理完成后,触发CyclicBarrier中的提交任务,进行一次提交,接着再分配下一轮的数据。

以下便是根据这一策略编写的代码,仅供参考,因为尚未在生产环境中使用,只是对这种思路提高一种实现方式,可能还存在些问题,如当这一批拉取的数据小于线程池中线程数该如何等待,会不会存在短板效应,导致其他线程完成后一直等待某个线程执行,造成短时间位移提交阻塞。

总体思路有点类似于滑动窗口,每次一批一批的处理,等待最后一个处理完成,再向前滑动。

发表评论

电子邮件地址不会被公开。 必填项已用*标注