当前位置: 代码迷 >> 综合 >> 【RocketMQ】 (三)定时消息
  详细解决方案

【RocketMQ】 (三)定时消息

热度:82   发布时间:2023-11-17 12:48:07.0

定时消息:

        是指消息发到 Broker 后,不能立刻被Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

        如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。 RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

 

生产者代码: 

/*** @author lucifer* @date 2020/4/9 0:12* @description 延时队列  生产者*/
public class ScheduledMessageProducer {public static final String NAME_SERVER_ADDR = "192.168.160.131:9876";public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {//1.创建生产者对象DefaultMQProducer defaultMQProducer = new DefaultMQProducer("GROUP_TEST");//2.设置NameServer的地址defaultMQProducer.setNamesrvAddr(NAME_SERVER_ADDR);//3.启动生产者defaultMQProducer.start();for (int i = 0; i < 10; i++) {String content = "Hello scheduled message " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date());Message message = new Message("TopicTest", content.getBytes(RemotingHelper.DEFAULT_CHARSET));//4.设置延时等级,此消息将在10秒后传递给消费者// 可以在broker服务器端自行配置messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmessage.setDelayTimeLevel(3);//5.发送消息SendResult result = defaultMQProducer.send(message);System.out.println("发送结果:"+result+",发送时间:"+ LocalTime.now());}//6.停止生产者defaultMQProducer.shutdown();}
}

生产者启动,控制台打印:

发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74AE0000, offsetMsgId=C0A8A08300002A9F00000000000133A9, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=80],发送时间:16:28:15.932
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74BC0001, offsetMsgId=C0A8A08300002A9F00000000000134D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=9], queueOffset=81],发送时间:16:28:15.934
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74BE0002, offsetMsgId=C0A8A08300002A9F00000000000135F9, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=10], queueOffset=82],发送时间:16:28:15.936
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74C10003, offsetMsgId=C0A8A08300002A9F0000000000013722, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=11], queueOffset=83],发送时间:16:28:15.938
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74C30004, offsetMsgId=C0A8A08300002A9F000000000001384B, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=12], queueOffset=84],发送时间:16:28:15.940
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74C40005, offsetMsgId=C0A8A08300002A9F0000000000013974, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=13], queueOffset=85],发送时间:16:28:15.942
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74C60006, offsetMsgId=C0A8A08300002A9F0000000000013A9D, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=14], queueOffset=86],发送时间:16:28:15.944
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74C80007, offsetMsgId=C0A8A08300002A9F0000000000013BC6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=15], queueOffset=87],发送时间:16:28:15.947
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74CC0008, offsetMsgId=C0A8A08300002A9F0000000000013CEF, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=88],发送时间:16:28:15.950
发送结果:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2467B74CE0009, offsetMsgId=C0A8A08300002A9F0000000000013E17, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=89],发送时间:16:28:15.952
16:28:15.964 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.160.131:10911] result: true
16:28:15.967 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.160.131:9876] result: true

消费者代码: 

/*** @author lucifer* @date 2020/4/9 0:20* @description 延时队列 消费者*/
public class ScheduledMessageConsumer {public static final String NAME_SERVER_ADDR = "192.168.160.131:9876";public static void main(String[] args) throws MQClientException {//1.创建消费者(push)对象DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("GROUP_TEST");//2.设置NameServer的地址defaultMQPushConsumer.setNamesrvAddr(NAME_SERVER_ADDR);//3.订阅对应的主题和TagdefaultMQPushConsumer.subscribe("TopicTest","*");//4.注册消息接收到Broker消息后的处理接口defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);try {System.out.printf("%-25s 接收到新消息 --- %s %n", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者(必须在注册完消息监听器后启动,否则会报错)defaultMQPushConsumer.start();System.out.println("已启动消费者");}
}

 消费者启动,控制台打印:10秒后才打印出信息

2020-04-14 16:28:25.925   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:14.832 
2020-04-14 16:28:25.938   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.932 
2020-04-14 16:28:25.938   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.934 
2020-04-14 16:28:25.940   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.936 
2020-04-14 16:28:25.942   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.938 
2020-04-14 16:28:25.948   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.940 
2020-04-14 16:28:25.949   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.944 
2020-04-14 16:28:25.949   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.942 
2020-04-14 16:28:25.962   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.950 
2020-04-14 16:28:25.962   接收到新消息 --- Hello scheduled message 2020-04-14 16:28:15.948