1、maven 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
spring:kafka:bootstrap-servers: 192.168.1.3:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: testenable-auto-commit: trueauto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerserver:port: 8081
3.代码
生产者
package com.example.bootkafka;import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;/*** kafka生产者测试** @author chenye*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BootKafkaProducerApplication.class})
public class ProducerTest {@Autowired(required = false)private KafkaTemplate defaultKafkaTemplate;@Testpublic void testDefaultKafkaTemplate() {defaultKafkaTemplate.send("test_topic", "I`m send msg to default topic");}@Beforepublic void testBefore() {System.out.println("before");}@Afterpublic void testAfter() {System.out.println("after");}
}
消费者
package com.example.bootkafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者测试** @author chenye*/
@Component
public class ConsumerTest {@KafkaListener(topics = "test_topic")public void listen(ConsumerRecord<?, ?> record) {System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());}
}