配置文件
<!-- 定义consumer的参数 --><bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="127.0.0。1" /><!--关闭自动提交,使用spring实现的提交方案--><entry key="enable.auto.commit" value="false" /><entry key="max.poll.records" value="10" /><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /><entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /></map></constructor-arg></bean><!-- 创建consumerFactory bean --><bean id="consumerFactory"class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties" /></constructor-arg></bean><!-- 消息监听器,实际执行消息消费的类 --><bean id="myMsgListener" class="com.xiao.zhen.MyMsgListener" /><!-- 消费者容器配置参数 --><bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"><constructor-arg value="topic.name"/><!-- 每接收到一条消息ack一次,可靠性最高 --><property name="ackMode" value="RECORD" /><property name="groupId" value="groupId" /><!-- 消息监听器 --><property name="messageListener" ref="myMsgListener" /></bean><!-- 消费者容器 --><bean id="kafkaMessageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"init-method="doStart"><constructor-arg ref="consumerFactory" /><constructor-arg ref="containerProperties" /><!-- 多线程消费 --><property name="concurrency" value="6" /></bean>
执行消费的类
@Service
public class myMsgListener implements MessageListerner<String,String> {@Overridepublic void omMessage(ConsumerRecord<String, String> consumerRecord) {try {自定义类 = JSON.parseObject(consumerRecord.value(), 自定义类.class);// 业务逻辑处理 ......} catch(Exception e) {}}}