
RocketMQ Source Code Analysis: Message Storage

Views: 103   Published: 2023-10-21 12:42:48.0

Abstract: original source http://www.iocoder.cn/RocketMQ/message-store/

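1. Overview

This article follows "RocketMQ Source Code Analysis: Message Sending and Receiving" and focuses on how CommitLog stores messages.

2. CommitLog Structure

The relationship among CommitLog, MappedFileQueue, and MappedFile is:

CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N.

On the file system this looks as follows: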


     
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd
/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l
total 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296


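CommitLog, MappedFileQueue, and MappedFile are defined as follows:

  • MappedFile: the files 00000000000000000000, 00000000001073741824, 00000000002147483648, and so on.
  • MappedFileQueue: the directory that holds the MappedFile files. It wraps them into a file queue and presents the upper layer with seemingly unlimited file capacity.
    • Every MappedFile has the same file size.
    • File naming: fileName[n] = fileName[n - 1] + mappedFileSize. In CommitLog, mappedFileSize defaults to 1 GB.
  • CommitLog: a wrapper built on top of MappedFileQueue.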
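The naming rule can be checked against the directory listing shown earlier. The following is a minimal sketch, not RocketMQ's own code: the class and method names are made up for illustration, and it assumes that each file name is the file's starting offset, zero-padded to 20 digits, as the listing suggests.

```java
import java.util.Locale;

public class CommitLogFileNames {
    /** Size of one MappedFile; 1 GB is the CommitLog default mentioned in the text. */
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    /** Name of the n-th file (0-based): its starting offset, zero-padded to 20 digits. */
    static String fileName(int n, long mappedFileSize) {
        return String.format(Locale.ROOT, "%020d", n * mappedFileSize);
    }

    /** Index of the file that contains the given physical offset. */
    static int fileIndex(long physicalOffset, long mappedFileSize) {
        return (int) (physicalOffset / mappedFileSize);
    }

    public static void main(String[] args) {
        // Prints the five names seen in the commitlog directory listing.
        for (int n = 0; n < 5; n++) {
            System.out.println(fileName(n, MAPPED_FILE_SIZE));
        }
    }
}
```

Because every file has the same size, locating the file for a physical offset is a single division, which is one practical benefit of naming files by their starting offset.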

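CommitLog currently stores two types of content in a MappedFile:

  1. MESSAGE: a message.
  2. BLANK: blank padding written when the file does not have enough space left to store a message.

Structure of CommitLog content stored in a MappedFile:

Storage structure of MESSAGE in CommitLog: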

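| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | MsgLen | Total message length | Int | 4 |
| 2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
| 3 | BodyCRC | CRC of the message body | Int | 4 |
| 4 | QueueId | Message queue ID | Int | 4 |
| 5 | Flag | flag | Int | 4 |
| 6 | QueueOffset | Position in the message queue | Long | 8 |
| 7 | PhysicalOffset | Physical position, i.e. the sequential storage position in CommitLog | Long | 8 |
| 8 | SysFlag | MessageSysFlag | Int | 4 |
| 9 | BornTimestamp | Timestamp at which the message was produced | Long | 8 |
| 10 | BornHost | Address + port of the host that produced the message | Long | 8 |
| 11 | StoreTimestamp | Timestamp at which the message was stored | Long | 8 |
| 12 | StoreHost | Address + port of the host that stored the message | Long | 8 |
| 13 | ReconsumeTimes | Number of times the message has been reconsumed | Int | 4 |
| 14 | PreparedTransactionOffset | | Long | 8 |
| 15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
| 16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
| 17 | PropertiesLength + Properties | Properties length + properties | Short + Bytes | 2 + propertiesLength |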
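Adding up the byte counts in the MESSAGE layout gives the value that MsgLen has to hold. The sketch below simply sums the table row by row; the class and method names are illustrative, not taken from the RocketMQ source.

```java
public class MessageLength {
    /** Total stored size of one MESSAGE record, summed row by row from the layout table. */
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                      // 1  MsgLen
             + 4                      // 2  MagicCode
             + 4                      // 3  BodyCRC
             + 4                      // 4  QueueId
             + 4                      // 5  Flag
             + 8                      // 6  QueueOffset
             + 8                      // 7  PhysicalOffset
             + 4                      // 8  SysFlag
             + 8                      // 9  BornTimestamp
             + 8                      // 10 BornHost
             + 8                      // 11 StoreTimestamp
             + 8                      // 12 StoreHost
             + 4                      // 13 ReconsumeTimes
             + 8                      // 14 PreparedTransactionOffset
             + 4 + bodyLength         // 15 BodyLength + Body
             + 1 + topicLength        // 16 TopicLength + Topic
             + 2 + propertiesLength;  // 17 PropertiesLength + Properties
    }

    public static void main(String[] args) {
        // Fixed overhead of a record with empty body, topic, and properties.
        System.out.println(calMsgLength(0, 0, 0));
    }
}
```

The fixed part of the layout comes to 91 bytes, so a message with a 100-byte body, a 9-byte topic, and 20 bytes of properties occupies 220 bytes.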

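Storage structure of BLANK in CommitLog:

| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | maxBlank | Length of the blank area | Int | 4 |
| 2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |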
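As an illustration of the BLANK layout, here is a rough sketch of padding the tail of a file with java.nio.ByteBuffer. It is not the RocketMQ implementation: the magic value is a placeholder chosen for this example (the real constant is defined in CommitLog), and the sketch assumes the tail always has at least 8 bytes left for the two header fields.

```java
import java.nio.ByteBuffer;

public class BlankRecord {
    /** Placeholder value for illustration only; not the real BLANK_MAGIC_CODE. */
    static final int BLANK_MAGIC_CODE = 0xB1A4C0DE;
    /** maxBlank (4 bytes) + MagicCode (4 bytes). */
    static final int BLANK_HEADER_SIZE = 4 + 4;

    /** Marks everything from the buffer's position to its limit as one BLANK record. */
    static void writeBlank(ByteBuffer fileTail) {
        int maxBlank = fileTail.remaining();
        if (maxBlank < BLANK_HEADER_SIZE) {
            throw new IllegalArgumentException("tail too small for a BLANK header: " + maxBlank);
        }
        fileTail.putInt(maxBlank);           // 1 maxBlank
        fileTail.putInt(BLANK_MAGIC_CODE);   // 2 MagicCode
        fileTail.position(fileTail.limit()); // the rest of the tail is padding
    }

    /** Reads the magic code at the record start without moving the buffer position. */
    static boolean isBlank(ByteBuffer buffer, int recordStart) {
        return buffer.getInt(recordStart + 4) == BLANK_MAGIC_CODE;
    }

    public static void main(String[] args) {
        ByteBuffer tail = ByteBuffer.allocate(32);
        writeBlank(tail);
        System.out.println("maxBlank=" + tail.getInt(0) + ", blank=" + isBlank(tail, 0));
    }
}
```

A reader scanning the file can use the magic code to tell a BLANK record from a MESSAGE record and then skip maxBlank bytes to reach the next file.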

3. How CommitLog Stores a Message

CommitLog#putMessage(...)


     
  1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
  2:     // Set the storage time
  3:     msg.setStoreTimestamp(System.currentTimeMillis());
  4:     // Set the message body BODY CRC (consider the most appropriate setting
  5:     // on the client)
  6:     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  7:     // Back to Results
  8:     AppendMessageResult result = null;
  9:
 10:     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
 11:
 12:     String topic = msg.getTopic();
 13:     int queueId = msg.getQueueId();
 14:
 15:     // Transaction related. TODO: transaction handling still to be read
 16:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 17:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE //
 18:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 19:         // Delay Delivery
 20:         if (msg.getDelayTimeLevel() > 0) {
 21:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 22:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 23:             }
 24:
 25:             topic = ScheduleMessageService.SCHEDULE_TOPIC;
 26:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 27:
 28:             // Backup real topic, queueId
 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 30:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 31:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 32:
 33:             msg.setTopic(topic);
 34:             msg.setQueueId(queueId);
 35:         }
 36:     }
 37:
 38:     long eclipseTimeInLock = 0;
 39:
 40:     // Get the mapped file to write to
 41:     MappedFile unlockMappedFile = null;
 42:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 43:
 44:     // Acquire the write lock
 45:     lockForPutMessage(); //spin...
 46:     try {
 47:         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
 48:         this.beginTimeInLock = beginLockTimestamp;
 49:
 50:         // Here settings are stored timestamp, in order to ensure an orderly
 51:         // global
 52:         msg.setStoreTimestamp(beginLockTimestamp);
 53:
 54:         // Create a mapped file if none exists or the last one is full
 55:         if (null == mappedFile || mappedFile.isFull()) {
 56:             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
 57:         }
 58:         if (null == mappedFile) {
 59:             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 60:             beginTimeInLock = 0;
 61:             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
 62:         }
 63:
 64:         // Store the message
 65:         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 66:         switch (result.getStatus()) {
 67:             case PUT_OK:
 68:                 break;
 69:             case END_OF_FILE: // End of file reached: get a new mapped file and append again
 70:                 unlockMappedFile = mappedFile;
 71:                 // Create a new file, re-write the message
 72:                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 73:                 if (null == mappedFile) {
 74:                     // XXX: warn and notify me
 75:                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 76:                     beginTimeInLock = 0;
 77:                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
 78:                 }
 79:                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 80:                 break;
 81:             case MESSAGE_SIZE_EXCEEDED:
 82:             case PROPERTIES_SIZE_EXCEEDED:
 83:                 beginTimeInLock = 0;
 84:                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
 85:             case UNKNOWN_ERROR:
 86:                 beginTimeInLock = 0;
 87:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
 88:             default:
 89:                 beginTimeInLock = 0;
 90:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
 91:         }
 92:
 93:         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
 94:         beginTimeInLock = 0;
 95:     } finally {
 96:         // Release the write lock
 97:         releasePutMessageLock();
 98:     }
 99:
100:     if (eclipseTimeInLock > 500) {
101:         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
102:     }
103:
104:     //
105:     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
106:         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
107:     }
108:
109:     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
110:
111:     // Statistics
112:     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
113:     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
114:
115:     // Synchronous or asynchronous flush / commit
116:     GroupCommitRequest request = null;
117:     // Synchronization flush
118:     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
119:         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
120:         if (msg.isWaitStoreMsgOK()) {
121:             request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
122:             service.putRequest(request);
123:             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
124:             if (!flushOK) {
125:                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
126:                     + " client address: " + msg.getBornHostString());
127:                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
128:             }
129:         } else {
130:             service.wakeup();
131:         }
132:     }
133:     // Asynchronous flush
134:     else {
135:         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
136:             flushCommitLogService.wakeup(); // important: wake up the commitLog thread to flush
137:         } else {
138:             commitLogService.wakeup();
139:         }
140:     }
141:
142:     // Synchronous write double: if this is a SYNC_MASTER, replicate to the slave. TODO: data replication still to be read
143:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
144:         HAService service = this.defaultMessageStore.getHaService();
145:         if (msg.isWaitStoreMsgOK()) {
146:             // Determine whether to wait
147:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
148:                 if (null == request) {
149:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
150:                 }
151:                 service.putRequest(request);
152:
153:                 service.getWaitNotifyObject().wakeupAll();
154:
155:                 boolean flushOK =
156:                     // TODO
157:                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
158:                 if (!flushOK) {
159:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
160:                         + msg.getTags() + " client address: " + msg.getBornHostString());
161:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
162:                 }
163:             }
164:             // Slave problem
165:             else {
166:                 // Tell the producer, slave not available
167:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
168:             }
169:         }
170:     }
171:
172:     return putMessageResult;
173: }

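  • Summary: stores the message and returns the storage result.
  • Lines 2 to 6: set the store timestamp and the body CRC.
  • Lines 16 to 36: check the transaction type and handle delayed delivery. For a non-transactional or committed message with a delay level, the topic and queue ID are replaced with the schedule topic and the queue for that delay level, and the real topic and queue ID are backed up in the message properties. The transaction logic itself is not covered here.
  • Lines 45 and 97: acquire and release the write lock.
  • Line 52: set the store timestamp again, this time inside the lock. The store timestamp is currently set in more than one place.
  • Lines 55 to 62: get the MappedFile; if none exists or the last one is full, create a new one. For details see MappedFileQueue#getLastMappedFile(...).
  • Line 65: append the message to the MappedFile. For details see MappedFile#appendMessage(...).
  • Lines 69 to 80: the MappedFile has no room left for the message, so a new one is created and the message is appended again.
  • Lines 116 to 140: flush the message to disk, i.e. persist it to the file. The append above has not yet written the message to disk. What happens here depends on the configured flush strategy. For details see FlushCommitLogService.
  • Lines 143 to 170: Broker master/slave synchronization, which a later article covers in detail.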
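The append-and-roll-over path of lines 55 to 80 can be modeled in isolation. The sketch below is a toy model under stated assumptions, not RocketMQ code: ToyFile and ToyQueue are hypothetical stand-ins for MappedFile and MappedFileQueue that track only byte counts, locking is omitted, and every message is assumed to fit into an empty file.

```java
import java.util.ArrayList;
import java.util.List;

public class AppendWithRollover {
    enum Status { PUT_OK, END_OF_FILE }

    /** Hypothetical stand-in for MappedFile: tracks only how many bytes are used. */
    static final class ToyFile {
        final int capacity;
        int wrotePosition;

        ToyFile(int capacity) { this.capacity = capacity; }

        boolean isFull() { return wrotePosition == capacity; }

        Status append(int msgLen) {
            if (msgLen > capacity - wrotePosition) {
                wrotePosition = capacity; // the remaining space becomes BLANK padding
                return Status.END_OF_FILE;
            }
            wrotePosition += msgLen;
            return Status.PUT_OK;
        }
    }

    /** Hypothetical stand-in for MappedFileQueue: an ordered list of equally sized files. */
    static final class ToyQueue {
        final int fileSize;
        final List<ToyFile> files = new ArrayList<>();

        ToyQueue(int fileSize) { this.fileSize = fileSize; }

        ToyFile last() { return files.isEmpty() ? null : files.get(files.size() - 1); }

        ToyFile createNext() {
            ToyFile file = new ToyFile(fileSize);
            files.add(file);
            return file;
        }
    }

    /** Mirrors the control flow of lines 55 to 80: create if needed, append, retry once on END_OF_FILE. */
    static Status putMessage(ToyQueue queue, int msgLen) {
        ToyFile file = queue.last();
        if (file == null || file.isFull()) {
            file = queue.createNext();
        }
        Status status = file.append(msgLen);
        if (status == Status.END_OF_FILE) {
            file = queue.createNext();
            status = file.append(msgLen);
        }
        return status;
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue(100);
        System.out.println(putMessage(queue, 60) + " files=" + queue.files.size());
        System.out.println(putMessage(queue, 60) + " files=" + queue.files.size());
    }
}
```

With a file size of 100, the second 60-byte message does not fit into the first file, so that file is padded to its end and the message lands at the start of a second file. A message is never split across two files.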