KafkaProducer is thread-safe, but KafkaConsumer is not. KafkaConsumer defines an acquire() method that checks whether only a single thread is operating on the consumer; if another thread is already using it, the call throws a ConcurrentModificationException:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

The fact that KafkaConsumer is not thread-safe does not mean messages can only be consumed single-threaded. If the producer sends messages faster than the consumer can process them, more and more messages pile up unconsumed and lag builds up. Worse, because of Kafka's message retention mechanism, some messages may be cleaned up before they are ever consumed, resulting in message loss. We can therefore consume messages with multiple threads; the whole point of multi-threading here is to raise overall consumption throughput. There are several ways to implement it. The first and most common is thread confinement: instantiate one KafkaConsumer per thread, as shown in Figure 3-10.
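For reference, the guard behind that exception is simple. Below is a minimal, self-contained sketch modeled on the acquire()/release() pair inside the Kafka client; the class name SingleThreadGuard is made up for illustration, and the real field names and details vary by client version. The idea: every public consumer method CAS-es the calling thread's id into an atomic field and throws if a different thread already holds the consumer.

import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration of the single-thread guard KafkaConsumer uses internally
// (not the actual Kafka source; a sketch of the mechanism only).
public class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    // Called at the start of every public consumer method (poll, commitSync, close, ...).
    public void acquire() {
        long threadId = Thread.currentThread().getId();
        // A second thread fails the compare-and-set and triggers the exception.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
    }

    // Called in a finally block; frees the guard once all nested calls have returned.
    public void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}

With that in mind, the thread-per-consumer demo from Figure 3-10 looks like this: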
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * @Author: wcy
 * @Date: 2020/5/31
 */
public class FirstMultiConsumerThreadDemo {

    public static final String brokerList = "nas-cluster1:9092";
    public static final String topic = "test.topic";
    public static final String groupId = "group.demo";

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic) {
            // Each thread owns its own KafkaConsumer instance (thread confinement).
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // processing logic goes here
                        System.out.println(record.value());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}
The second approach keeps a single KafkaConsumer (and a single polling thread) and hands each batch of records returned by poll() to a pool of processing threads:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * @Author: wcy
 * @Date: 2020/5/31
 */
public class SecondMultiConsumerThreadDemo {

    public static final String brokerList = "nas-cluster1:9092";
    public static final String topic = "test.topic";
    public static final String groupId = "group.demo";

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumerThread consumerThread = new KafkaConsumerThread(properties, topic,
                Runtime.getRuntime().availableProcessors());
        consumerThread.start();
    }

    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;
        private ExecutorService executorService;
        private int threadNumber;

        public KafkaConsumerThread(Properties properties, String topic, int availableProcessors) {
            kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = availableProcessors;
            // CallerRunsPolicy makes the polling thread run the task itself when the queue is full,
            // which naturally throttles polling.
            executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L,
                    TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordsHandler(records));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler extends Thread {
        public final ConsumerRecords<String, String> records;

        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }

        @Override
        public void run() {
            for (ConsumerRecord<String, String> record : records) {
                // processing logic goes here
                System.out.println(record.value());
            }
        }
    }
}