文章目录
- 简介
- kafka消费机制
- 消费时出现几种异常情况
- 自动提交
- 手动提交
- 精确一次消费实现
- 总结
简介
在使用kafka时,大多数场景对于数据少量的不一致(重复或者丢失)并不关注,比如日志,因为不会影响最终的使用或者分析,但是在某些应用场景(比如业务数据),需要对任何一条消息都要做到精确一次的消费,才能保证系统的正确性,kafka并不提供准确一致的消费API,需要我们在实际使用时借用外部的一些手段来保证消费的精确性,下面我们介绍如何实现
kafka消费机制
这篇文章KafkaConsumer使用介绍、参数配置介绍了如何kafka具有两种提交offset(消费偏移量)方式,我们在Kafka简介以及安装和使用可知每个分区具备一offset记录消费位置,如果消费者一直处于正常的运行转态,那么offset将没有什么用处,因为正常消费时,consumer记录了本次消费的offset和下一次将要进行poll数据的offset起始位置,但是如果消费者发生崩溃或者有新的消费者加入消费者组,就会触发再均衡Rebalance,Rebalance之后,每个消费者将会分配到新的分区,而消费者对于新的分区应该从哪里进行起始消费,这时候提交的offset信息就起作用了,提交的offset信息包括消费者组所有分区的消费进度,这时候消费者可以根据消费进度继续消费,提交offset提交自动提交是最不具确定性的,所以要使用手动提交来控制offset
消费时出现几种异常情况
自动提交
- 重复消费:当数据已经被处理,然后自动提交offset时消费者出现故障或者有新消费者加入组导致再均衡,这时候offset提交失败,导致这批已经处理的数据的信息没有记录,后续会重复消费一次
- 丢失数据:如果业务处理时间较长一点,这时候数据处理业务还未完成,offset信息已经提交了,但是在后续处理数据过程中程序发生了崩溃,导致这批数据未正常消费,这时候offset已经提交,消费者后续将不在消费这批数据,导致这批数据将会丢失
手动提交
- 重复消费(最少一次消费语义实现):消费数据处理业务完成后进行offset提交,可以保证数据最少一次消费,因为在提交offset的过程中可能出现提交失败的情况,导致数据重复消费
代码实现
package com.huawei.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;/*** @author: xuqiangnj@163.com* @date: 2019/5/3 13:13* @description:同步结合异步方式提交*/
public class SyncAndAsyncCommitConsumer {private static Properties props = new Properties();static {props.put("bootstrap.servers", "192.168.142.139:9092");props.put("group.id", "Test");props.put("enable.auto.commit", false);//注意这里设置为手动提交方式props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");}final KafkaConsumer<String, String> consumer;private volatile boolean isRunning = true;public SyncAndAsyncCommitConsumer(String topicName) {consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {//Rebalance之前执行@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {//确保在Rebalance的时候也能提交成功consumer.commitSync();}//重新分配分区时执行@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {}});}//这里使用异步提交和同步提交的组合方式public void receiveMsg() {try {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {/*System.out.println("TopicName: " + consumerRecord.topic() + " Partition:" +consumerRecord.partition() + " Offset:" + consumerRecord.offset() + "" +" Msg:" + consumerRecord.value());*///进行逻辑处理//handle(consumerRecord);}consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)-> {//通过回调函数,查看提交结果if (exception != null){consumer.commitSync(offsets);//如果发生异常,尝试重试}});//正常情况使用异步提交,提升性能,因为同步提交需要等待提交结果}}}catch (Exception e){//处理异常}finally {//在consumer关闭之前进行同步提交,保证所有offset在程序退出之前提交一次consumer.commitSync();close();}}public void close() {isRunning = false;if (consumer != null) {consumer.close();}}
}
丢失数据(最多一次消费语义实现):在消费数据业务处理前进行offset提交,可以保证最多一次消费,在后续数据业务处理程序出现故障,将导致数据丢失
代码实现
package com.huawei.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;/*** @author: xuqiangnj@163.com* @date: 2019/5/3 13:13* @description:异步方式提交*/
public class CommitBeforeHandleConsumer {private static Properties props = new Properties();static {props.put("bootstrap.servers", "192.168.142.139:9092");props.put("group.id", "Test");props.put("enable.auto.commit", false);//注意这里设置为手动提交方式props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");}final KafkaConsumer<String, String> consumer;private volatile boolean isRunning = true;public CommitBeforeHandleConsumer(String topicName) {consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topicName));}//这里使用异步提交和同步提交的组合方式public void receiveMsg() {try {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {consumer.commitAsync();//在业务处理前提交offsetfor (ConsumerRecord<String, String> consumerRecord : consumerRecords) {/*System.out.println("TopicName: " + consumerRecord.topic() + " Partition:" +consumerRecord.partition() + " Offset:" + consumerRecord.offset() + "" +" Msg:" + consumerRecord.value());*///进行逻辑处理//handle(consumerRecord);}}}}catch (Exception e){//处理异常}finally {close();}}public void close() {isRunning = false;if (consumer != null) {consumer.close();}}
}
精确一次消费实现
从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性
- 数据有状态:可以根据数据信息进行确认数据是否重复消费,这时候可以使用手动提交的最少一次消费语义实现,即使消费的数据有重复,可以通过状态进行数据去重,以达到幂等的效果
- 存储数据容器具备幂等性:在数据存入的容器具备天然的幂等(比如ElasticSearch的put操作具备幂等性,相同的数据多次执行Put操作和一次执行Put操作的结果是一致的),这样的场景也可以使用手动提交的最少一次消费语义实现,由存储数据端来进行数据去重
- 数据无状态,并且存储容器不具备幂等:这种场景需要自行控制offset的准确性,今天文章主要说明这种场景下的处理方式,这里数据不具备状态,存储使用关系型数据库,比如MySQL
这里简单说明一下实现思路
1)利用consumer api的seek方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费
2)关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行
3)使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑
记录kafka信息表设计
create table kafka_info(topic_group_partition varchar(32) primary key, //主题+组名+分区号 这里冗余设计方便通过这个主键进行更新提升效率 topic_group varchar(30), //主题和组名partition_num tinyint,//分区号offsets bigint default 0 //offset信息
);
package com.huawei.kafka.consumer;import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.*;/*** @author: xuqiangnj@163.com* @date: 2019/5/3 14:36* @description:精确一次消费实现*/
public class AccurateConsumer {private static final Properties props = new Properties();private static final String GROUP_ID = "Test";static {props.put("bootstrap.servers", "192.168.142.139:9092");props.put("group.id", GROUP_ID);props.put("enable.auto.commit", false);//注意这里设置为手动提交方式props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");}final KafkaConsumer<String, String> consumer;//用于记录每次消费时每个partition的最新offsetprivate Map<TopicPartition, Long> partitionOffsetMap;//用于缓存接受消息,然后进行批量入库private List<Message> list;private volatile boolean isRunning = true;private final String topicName;private final String topicNameAndGroupId;public AccurateConsumer(String topicName) {this.topicName = topicName;topicNameAndGroupId = topicName + "_" + GROUP_ID;consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topicName), new HandleRebalance());list = new ArrayList<>(100);partitionOffsetMap = new HashMap<>();}//这里使用异步提交和同步提交的组合方式public void receiveMsg() {try {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {for (TopicPartition topicPartition : consumerRecords.partitions()) {List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);for (ConsumerRecord<String, String> record : records) {//使用fastjson将记录中的值转换为Message对象,并添加到list中list.addAll(JSON.parseArray(record.value(), Message.class));}//将partition对应的offset信息添加到map中,入库时将offset-partition信息一起进行入库partitionOffsetMap.put(topicPartition, records.get(records.size() - 1).offset() + 1);//记住这里一定要加1,因为下次消费的位置就是从+1的位置开始}}//如果list中存在有数据,则进行入库操作if (list.size() > 0) {boolean isSuccess = insertIntoDB(list, partitionOffsetMap);if (isSuccess) {//将缓存数据清空,并将offset信息清空list.clear();partitionOffsetMap.clear();}}}} catch (Exception e) {//处理异常} finally {//offset信息由我们自己保存,提交offset其实没有什么必要//consumer.commitSync();close();}}private boolean insertIntoDB(List<Message> list, Map<TopicPartition, Long> partitionOffsetMap) {Connection connection = getConnection();//获取数据库连接 自行实现boolean flag = false;try {//设置手动提交,让插入数据和更新offset信息在一个事务中完成connection.setAutoCommit(false);insertMessage(list);//将数据进行入库 自行实现updateOffset(partitionOffsetMap);//更新offset信息 自行实现connection.commit();flag = true;} catch (SQLException e) {try {//出现异常则回滚事务connection.rollback();} catch (SQLException e1) {//处理异常}}return flag;}//获取数据库连接 自行实现private Connection getConnection() {return null;}public void close() {isRunning = false;if (consumer != null) {consumer.close();}}private class HandleRebalance implements ConsumerRebalanceListener {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {//发生Rebalance时,只需要将list中数据和记录offset信息清空即可//这里为什么要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库,//并且offset信息也没有更新,如果不清除,那么下一次还会重新poll一次这些数据,将会导致数据重复list.clear();partitionOffsetMap.clear();}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//获取对应Topic的分区数List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size());//在分配分区时指定消费位置for (TopicPartition partition : partitions) {//如果在数据库中有对应partition的信息则使用,否则将默认从offset=0开始消费if (partitionOffsetMapFromDB.get(partition) != null) {consumer.seek(partition, partitionOffsetMapFromDB.get(partition));} else {consumer.seek(partition, 0L);}}}}/*** 从数据库中查询分区和offset信息* @param size 分区数量* @return 分区号和offset信息*/private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) {Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>();//从数据库中查询出对应信息Connection connection = getConnection();//获取数据库连接 自行实现PreparedStatement preparedStatement = null;ResultSet resultSet = null;String querySql = "SELECT partition_num,offsets from kafka_info WHERE topic_group = ?";try {preparedStatement = connection.prepareStatement(querySql);preparedStatement.setString(1, topicNameAndGroupId);resultSet = preparedStatement.executeQuery();while (resultSet.next()) {partitionOffsetMapFromDB.put(new TopicPartition(topicName, resultSet.getInt(1)),resultSet.getLong(2));}//判断数据库是否存在所有的分区的信息,如果没有,则需要进行初始化if (partitionOffsetMapFromDB.size() < size) {connection.setAutoCommit(false);StringBuilder sqlBuilder = new StringBuilder();//partition分区号是从0开始,如果有10个分区,那么分区号就是0-9/*这里拼接插入数据 格式 INSERT INTO kafka_info(topic_group_partition,topic_group,partition_num) VALUES(topicNameAndGroupId_0,topicNameAndGroupId,0),(topicNameAndGroupId_1, topicNameAndGroupId,1)....*/for (int i = 0; i < size; i++) {sqlBuilder.append("(").append(topicNameAndGroupId).append("_").append(i).append(",").append(topicNameAndGroupId).append(",").append(i).append("),");}//将最后一个逗号去掉加上分号结束sqlBuilder.deleteCharAt(sqlBuilder.length() - 1).append(";");preparedStatement = connection.prepareStatement("INSERT INTO kafa_info" +"(topic_group_partition,topic_group,partition_num) VALUES " + sqlBuilder.toString());preparedStatement.execute();connection.commit();}} catch (SQLException e) {//处理异常 回滚事务 这里应该结束程序 排查错误try {connection.rollback();} catch (SQLException e1) {//打印日志 排查错误信息}} finally {try {if (resultSet != null) {resultSet.close();}if (preparedStatement != null) {preparedStatement.close();}if (connection != null) {connection.close();}} catch (SQLException e) {//处理异常 打印日志即可 关闭资源失败}}return partitionOffsetMapFromDB;}
}
package com.huawei.kafka.consumer;/*** @author: xuqiangnj@163.com* @date: 2019/5/3 14:07* @description: Message对象 这里模拟数据*/
public class Message {private String id;private String name;private Syring desc;private Date time;//get set toString 方法省略
}
这种实现方式对于以下故障场景测试通过,虽然不能说所有故障场景均可以保证精确一次消费,但目前基本覆盖大部分故障场景
- 一个消费者组中的某个消费者频繁加入组或离开组
- 直接kill消费应用程序
- 故障kafka集群中某个节点
- 故障客户端网络,使其不能连接kafka server端
- 直接重启应用程序所在虚拟机
总结
这里主要使用自己管理offset的方式来确保数据和offset信息是同时变化的,通过数据库事务的特性来保证一致性和原子性。如果有不清楚的地方可以留言随时交流,或者有更好的实现方式也请不吝赐教