当前位置: 代码迷 >> 综合 >> Kafka消费者高级API
  详细解决方案

Kafka消费者高级API

热度:94   发布时间:2023-09-14 15:46:49.0
优点:
  • 高级API 写起来简单
  • 不需要自行去管理offset,系统通过zookeeper自行管理
  • 不需要管理分区,副本等情况,系统自动管理
  • 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据;可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
缺点:
  • 不能自行控制offset(对于某些特殊需求来说)
  • 不能细化控制如分区、副本、zk等

实操案例:
1.创建控制台生产者

bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first

2.1创建消费者(①过时API)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;public class CustomConsumer {@SuppressWarnings("deprecation")public static void main(String[] args) {Properties properties = new Properties();properties.put("zookeeper.connect", "hadoop102:2181");properties.put("group.id", "g1");properties.put("zookeeper.session.timeout.ms", "500");properties.put("zookeeper.sync.time.ms", "250");properties.put("auto.commit.interval.ms", "1000");// 创建消费者连接器ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));HashMap<String, Integer> topicCount = new HashMap<>();topicCount.put("first", 1);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext()) {System.out.println(new String(it.next().message()));}}
}

2.2创建消费者(②新API)
自动维护offset

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {public static void main(String[] args) {Properties props = new Properties();// 定义kakfa 服务的地址,不需要将所有broker指定上 props.put("bootstrap.servers", "hadoop102:9092");// 制定consumer group props.put("group.id", "test");// 是否自动确认offset props.put("enable.auto.commit", "true");// 自动确认offset的时间间隔 props.put("auto.commit.interval.ms", "1000");// key的序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 定义consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Arrays.asList("first", "second","third"));while (true) {// 读取数据,读取超时时间为100ms ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}
}