
RocketMQ Broker: Message Storage I/O

Published: 2023-11-14 22:23:25

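The foundation of RocketMQ file storage: MappedFile and MappedFileQueue

The MappedFile class in RocketMQ corresponds to the mmap mapping of a single file and is the basis of RocketMQ's efficient storage. This article first covers the background of zero copy, then describes how RocketMQ uses mmap, as a lead-in to a later discussion of MessageStore.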

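1. Zero copy

Zero copy means that the CPU does not take part in the actual copying when data is moved. The CPU is free to switch to other threads while the transfer proceeds asynchronously, usually carried out by DMA hardware. The two common zero-copy techniques are mmap and sendfile.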

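Sending data from disk to the network with traditional read and write calls takes two system calls (four switches between user mode and kernel mode in total), and the CPU performs one copy for the read and one for the write.

As we know, the operating system distinguishes user mode from kernel mode. To access a system resource such as the disk, a user-mode program must make a system call to enter kernel mode; the kernel obtains the resource and then switches back to user mode to return to the application. The kernel also maintains a kernel buffer: data being read is not placed directly in the application's buffer (app buffer) but is first read into the kernel buffer and then copied from there into the app buffer.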

In the traditional I/O read/write path, data is copied several times between user space and kernel space, with several context switches along the way. The steps are:

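The read call switches from user mode to kernel mode, and the first copy begins: the DMA engine reads the file (index.html in this example) from disk into the kernel buffer. DMA (Direct Memory Access) moves data into memory without the CPU doing the copy, which frees the CPU for other work.
The second copy moves the data from the kernel buffer to the user buffer, and read returns with a context switch from kernel mode to user mode.
The third copy happens when we call write: the system copies the data from the user buffer to the socket buffer, with another context switch from user mode to kernel mode.
The fourth copy moves the data asynchronously from the socket buffer to the network protocol engine via the DMA engine. This step needs no context switch.
write returns, switching from kernel mode back to user mode.

Clearly, the second and third copies only move the data into the app buffer and then back out unchanged. They cost two CPU copies and two context switches that are entirely unnecessary.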
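The steps above can be sketched in Java as a plain stream copy. This is a minimal illustration, not code from RocketMQ; the class name TraditionalCopy and its copy method are made up for the example. When the output stream belongs to a socket, each loop iteration corresponds to the second and third copies described above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: copies a file to an output stream through a user-space buffer.
final class TraditionalCopy {
    static long copy(Path source, OutputStream out) throws IOException {
        byte[] appBuffer = new byte[8192]; // the "app buffer" on the Java heap
        long total = 0;
        try (InputStream in = Files.newInputStream(source)) {
            int n;
            while ((n = in.read(appBuffer)) != -1) { // kernel buffer -> app buffer
                out.write(appBuffer, 0, n);          // app buffer -> destination buffer
                total += n;
            }
        }
        return total;
    }
}
```

Every byte passes through appBuffer even though the program never inspects it, which is exactly the overhead that sendfile and mmap aim to remove.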

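1.1 PageCache

The page cache is a cache at the file system level. Content read from disk is stored here, so later reads of the same content by a program are very fast. The page cache is managed in units of pages, and a page is typically 4 KB.

In Java, a write is considered logically persisted as soon as it reaches the page cache; the operating system flushes it to the disk file later. The force operation pushes the data in the page cache to disk ahead of time, so if you need synchronous persistence you must call force.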
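The difference between "written to the page cache" and "forced to disk" can be shown with FileChannel. This is a minimal sketch under the assumption of an ordinary local file; the class ForceDemo and the method writeDurably are illustrative names only.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: writes text and asks the OS to flush it to the device.
final class ForceDemo {
    static void writeDurably(Path path, String text) throws IOException {
        try (FileChannel ch = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf);   // data reaches the page cache here
            }
            ch.force(false);     // flush file content (not metadata) to storage
        }
    }
}
```

Without the force call the data is still readable by other processes through the page cache, but it may be lost if the machine loses power before the OS writes it back.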

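1.2. sendfile

Linux provides the sendfile system call. Java exposes it through the transferTo and transferFrom methods of FileChannel, which Kafka uses.
With scatter-gather DMA support, sending a file needs only 2 context switches and 2 copies:
The call switches from user mode to kernel mode. The DMA engine copies the file into the kernel buffer, and only a descriptor holding the offset and length is copied from the kernel buffer to the socket buffer.
The DMA engine copies the data straight from the kernel buffer to the network protocol engine, guided by the offset and length held in the socket buffer, and the call switches from kernel mode back to user mode.
The offset and length are so small that copying them is negligible.

Without scatter-gather support, the system call copies the data from the kernel buffer to the socket buffer inside the kernel. This still avoids extra context switches and the round trip through user space.

The copies involved then become:
Disk to kernel buffer (DMA copy)
Kernel buffer to socket buffer (CPU copy)
Socket buffer to the NIC buffer (DMA copy)
sendfile can only move data from a file to a socket, not the other way around. Using sendfile reduces both the number of copies and the number of context switches.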

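The Java class library supports zero copy through the transferTo method of java.nio.channels.FileChannel. Netty also achieves zero copy by wrapping NIO's FileChannel.transferTo() in FileRegion. RocketMQ uses this method as well where network transfer is involved.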
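A minimal sketch of calling transferTo follows. The class TransferToDemo and its send method are illustrative names; the target would normally be a SocketChannel, and whether the JVM actually uses sendfile underneath depends on the platform and the channel types involved.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: pushes a whole file into a writable channel.
final class TransferToDemo {
    static long send(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done
            while (position < size) {
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }
}
```

The application never allocates a buffer for the payload here; it only tells the kernel which byte range to move.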

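FileChannel itself is not based on zero copy; it is block-oriented. Working together with ByteBuffer, FileChannel buffers the data being read or written in memory and then reads or writes it in batches, which avoids the repeated intermediate work of unbatched operations and can noticeably improve efficiency on large files. FileChannel's write method treats data as persisted once it is in the page cache; the operating system still has to complete the final write from the page cache to disk. FileChannel's force method tells the operating system to flush promptly.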

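1.3. mmap

With mmap (memory mapping), the DMA engine copies the disk file's data into the kernel buffer, and the user buffer shares that mapped data with the kernel buffer. Once the mapping is established, no copy from the kernel buffer to the user buffer is needed, which saves one copy. Sending a file with mmap plus write takes 4 context switches and 3 copies in total.

void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);
mmap maps a file or another object into the address space of a process, establishing a one-to-one correspondence between the file's disk addresses and a range of virtual addresses in the process. Once this mapping exists, the process can read and write that memory through pointers, and the system automatically writes dirty pages back to the corresponding file on disk, so the file is modified without calling read, write, or similar system calls. Conversely, changes the kernel makes to this region are directly visible in user space, which makes it possible to share a file between processes.

Each process has its own address space, and virtual addresses are translated to physical addresses through the page table and the MMU. Because each process has its own page table, the same virtual address in two different processes can correspond to different physical addresses.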

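The mmap mapping process can be divided into three stages:

The process starts the mapping and a virtual mapping area is created for it in the virtual address space.
The kernel-space mmap function is called (this is different from the user-space function) to establish the one-to-one mapping between the file's physical addresses and the process's virtual addresses.
The process accesses the mapped region, which triggers a page fault and causes the file content to be copied into physical memory (main memory).
The first two stages only create the virtual area and complete the address mapping; no file data is copied to main memory yet. The file is actually read only when the process issues a read or write.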

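After these steps only the address mapping exists; the data on disk has not been copied into memory. When the process reads or writes an address in the mapped range, the page table lookup finds that the address is not backed by a physical page, so a page fault is raised. The page fault handler runs a series of checks and, once it confirms the access is legal, asks the kernel to page the data in.

The paging routine first looks for the required page in the swap cache. If it is not there, the nopage function loads the missing page from disk into main memory, after which the process can read or write it. If a write changes the content, the system automatically writes the dirty page back to the corresponding disk address after some time, which completes the write to the file.

Modified dirty pages are not written back to the file immediately but after a delay. Calling msync() forces synchronization so that the written content is saved to the file right away.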

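MappedByteBuffer in Java is the counterpart of mmap. A single mapping in Java is limited to just under 2 GB, because the size passed to FileChannel.map cannot exceed Integer.MAX_VALUE. The put method of MappedByteBuffer actually writes into the mapped virtual memory, which relies on the operating system's periodic flushing. You can control flushing manually by calling the force method of MappedByteBuffer.
With a mapping in place there is no need to call FileChannel's read and write.
Demo: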

package javaio;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Scanner;

public class MappedByteBufferTest {
    public static void main(String[] args) {
        File file = new File("D://data.txt");
        long len = file.length();
        byte[] ds = new byte[(int) len];

        // try-with-resources closes the file and channel even if mapping fails
        try (RandomAccessFile raf = new RandomAccessFile(file, "r");
             FileChannel channel = raf.getChannel()) {
            MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, len);
            // Copy the mapped region into the byte array
            mappedByteBuffer.get(ds);

            // Print the content token by token, split on spaces
            Scanner scan = new Scanner(new ByteArrayInputStream(ds)).useDelimiter(" ");
            while (scan.hasNext()) {
                System.out.print(scan.next() + " ");
            }
        } catch (IOException e) {
            // Report the failure instead of silently swallowing it
            e.printStackTrace();
        }
    }
}
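The demo above only reads. The put and force path described earlier can be sketched as follows; the class MmapWriteDemo and its write method are illustrative names, and the mapping size is assumed to be small enough to fit in a single mapping.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: writes bytes through a memory mapping and forces them to disk.
final class MmapWriteDemo {
    static void write(Path path, byte[] data) throws IOException {
        // READ_WRITE mappings need a channel opened for both reading and writing
        try (FileChannel ch = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE)) {
            MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
            map.put(data);  // plain memory write into the mapped pages
            map.force();    // ask the OS to write the dirty pages back to the file
        }
    }
}
```

Note that no write system call appears between put and force; the kernel writes back whatever pages were dirtied.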

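1.4. Differences between mmap and sendfile

mmap is used for sharing files and is rarely used for socket operations; sendfile is used for sending files.
mmap suits reads and writes of small amounts of data; sendfile suits large file transfers.
mmap needs 4 context switches and 3 data copies; sendfile needs 2 context switches and at least 2 data copies.
sendfile can leave all copying to DMA and avoid CPU copies; mmap cannot, because the data must still be copied from the kernel buffer to the socket buffer.
Difference between mmap and shared memory:
mmap shares a file, whereas shared memory shares a region of memory. mmap can also write changes back to the file.

Drawback of mmap:
mmap maps memory in whole pages (typically 4 KB), so mapping many pieces of data smaller than a page wastes memory. The same is true of read and write, which also go through whole pages.

mmap is suited to replacing read and write for file access.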

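Using mmap plus write
Pros: efficient even when called frequently with small blocks of data.
Cons: cannot make good use of DMA and consumes more CPU than sendfile; memory safety is complex to control, and JVM crashes must be guarded against.

Using sendfile
Pros: can use DMA, consumes little CPU, transfers large blocks efficiently, and has no memory safety issues.
Cons: less efficient than mmap for small blocks; transfers are blocking (BIO) only and cannot use NIO.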

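RocketMQ uses mmap when messages are consumed, because it handles small blocks of data better than sendfile. Kafka uses sendfile. The figure below showed results that someone else measured when comparing mmap with FileChannel.

[Figure: third-party benchmark of mmap versus FileChannel reads and writes; image not available]

Although FileChannel does not support zero copy for reads and writes, its ByteBuffer-based buffering gives it some advantage once the data size is moderately large.
Besides sharing memory and reducing the number of copies on reads, mmap can also write, which sendfile cannot.
Note that mmap is zero-copy only on the read side: no copy happens between the kernel and the application, although sending the mapped data on to a socket still takes 3 copies overall. Writes clearly cannot be zero-copy, because the data being written first lives in a buffer created by the application, either on the heap or off the heap.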
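Off-heap memory:
In Java, "JVM memory" here refers to heap memory.
The part of the machine's memory that does not belong to the heap is off-heap memory.
Off-heap memory is also called direct memory.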

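import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeTest {
    public static void main(String[] args) throws Exception {
        // Unsafe.getUnsafe() typically throws SecurityException for application code,
        // so the singleton is read reflectively from the theUnsafe field
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        Unsafe unsafe = (Unsafe) field.get(null);

        long address = unsafe.allocateMemory(1024);       // allocate 1 KB off heap
        address = unsafe.reallocateMemory(address, 2048); // resize; the address may change
        unsafe.freeMemory(address);                       // free by address, not by size
    }
}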

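Off-heap memory is still user-space memory; it simply lives outside the JVM heap. Using it saves one copy from the JVM heap to off-heap memory.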
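In everyday code, off-heap memory is usually obtained through ByteBuffer.allocateDirect rather than Unsafe. The sketch below contrasts the two kinds of buffer; the class DirectBufferDemo and the method newBuffer are illustrative names only.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: creates either a heap buffer or a direct (off-heap) buffer.
final class DirectBufferDemo {
    static ByteBuffer newBuffer(int size, boolean offHeap) {
        return offHeap
            ? ByteBuffer.allocateDirect(size) // native memory outside the Java heap
            : ByteBuffer.allocate(size);      // backed by a byte[] on the Java heap
    }

    public static void main(String[] args) {
        ByteBuffer heap = newBuffer(1024, false);
        ByteBuffer direct = newBuffer(1024, true);
        System.out.println("heap.isDirect() = " + heap.isDirect());
        System.out.println("direct.isDirect() = " + direct.isDirect());
    }
}
```

When a heap buffer is passed to a channel, the JDK generally has to stage the bytes in native memory first, which is the extra copy a direct buffer avoids.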

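2. MappedFile

The MappedFile class in RocketMQ corresponds to the mmap mapping of a single file and provides initialization, append, flush, cleanup, data warm-up, and related functions. MappedFile can operate on a file with mmap and FileChannel at the same time. When both are used, mmap serves reads and FileChannel serves writes; by default only mmap is used for both reads and writes.

It is defined as follows: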

public class MappedFile extends ReferenceResource {
    
}

ReferenceResource is an abstract class that implements reference counting, so MappedFile supports reference counting as well.
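The idea of reference counting used here can be illustrated with a small class. This is a simplified sketch written for this article, not the source of ReferenceResource; the class RefCounted and its members are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified reference-counted resource.
class RefCounted {
    private final AtomicLong refCount = new AtomicLong(1); // the owner holds one reference
    private volatile boolean available = true;
    private volatile boolean cleaned = false;

    // Try to take a reference; fails once shutdown has started.
    boolean hold() {
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;
            }
            refCount.getAndDecrement(); // undo: the resource was already released
        }
        return false;
    }

    // Drop a reference; the last release triggers cleanup.
    void release() {
        if (refCount.decrementAndGet() <= 0) {
            cleaned = true; // a real resource would unmap its buffer here
        }
    }

    // Stop handing out new references and drop the owner's reference.
    void shutdown() {
        if (available) {
            available = false;
            release();
        }
    }

    boolean isCleaned() {
        return cleaned;
    }
}
```

A reader calls hold before touching the mapped memory and release afterwards, so the mapping is only torn down once nobody is still reading it.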

2.1 Initialization (init)

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

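The method obtains the file's FileChannel through RandomAccessFile and then obtains a MappedByteBuffer, which corresponds to one mmap mapping. If a TransientStorePool is supplied, this effectively turns on writing through FileChannel. A write buffer (writeBuffer) is taken from off-heap memory, and data is written to writeBuffer before it is written to the FileChannel; once enough data has accumulated it is written to the FileChannel. The buffer exists to increase the amount of data per write, in line with the benchmark mentioned earlier. If no pool is supplied, all reads and writes on the file go through mmap.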

2.2. Append (append)

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

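The CommitLog, index, and ConsumeQueue files are all backed by MappedFile and are appended to through its write methods.

org.apache.rocketmq.store.MappedFile#appendMessage(byte[])
Direct physical append: writes a byte[] straight through FileChannel.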

public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + data.length) <= this.fileSize) {
        try {
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

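Callback append: writes through an AppendMessageCallback. What is passed to the callback is either the FileChannel write buffer or the MappedByteBuffer.
Appending is not thread-safe; the caller must provide the synchronization.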

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    int sysflag = msgInner.getSysFlag();

    int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
    ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

    this.resetByteBuffer(storeHostHolder, storeHostLength);
    String msgId;
    if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
        msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    } else {
        msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    }

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    /**
     * Serialize message
     */
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(bornHostHolder, bornHostLength);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(storeHostHolder, storeHostLength);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}

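The method first computes the write offset. The message is then assembled in this staging buffer:

// Store the message content
private final ByteBuffer msgStoreItemMemory;

Its initialization looks like this:

DefaultAppendMessageCallback(final int size) {
    this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
    this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
    this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
    this.maxMessageSize = size;
}

The buffer is sized to the maximum size of one message (4 MB by default) plus END_FILE_MIN_BLANK_LENGTH.
Each part of the message is written into msgStoreItemMemory.
Finally msgStoreItemMemory is written into byteBuffer: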

byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
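The pattern of assembling a length-prefixed record in a staging buffer and then copying it into the target buffer in one call can be shown with a much smaller record. The layout, the magic value, and the names RecordCodec and append below are all assumptions made for this sketch; they are not RocketMQ's actual format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical record layout: TOTALSIZE(4) | MAGIC(4) | BODYLEN(4) | BODY | TOPICLEN(1) | TOPIC
final class RecordCodec {
    static final int MAGIC = 0x1234ABCD; // made-up magic code for the example

    // Appends one record to target; returns the bytes written, or -1 if it does not fit.
    // Assumes the UTF-8 topic is at most 127 bytes so its length fits in one byte.
    static int append(ByteBuffer target, String topic, byte[] body) {
        byte[] topicData = topic.getBytes(StandardCharsets.UTF_8);
        int msgLen = 4 + 4 + 4 + body.length + 1 + topicData.length;
        if (msgLen > target.remaining()) {
            return -1;
        }
        ByteBuffer staging = ByteBuffer.allocate(msgLen);
        staging.putInt(msgLen);                // 1 TOTALSIZE
        staging.putInt(MAGIC);                 // 2 MAGIC
        staging.putInt(body.length);           // 3 BODY length, then the body itself
        staging.put(body);
        staging.put((byte) topicData.length);  // 4 TOPIC length, then the topic itself
        staging.put(topicData);
        target.put(staging.array(), 0, msgLen); // single bulk copy into the target
        return msgLen;
    }
}
```

Writing the total size first lets a reader skip from record to record without parsing the fields in between.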

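2.3. Committing buffered content (commit)

Commit writes the data in the FileChannel write buffer into the FileChannel. Once the file is full and everything has been committed, the buffer is returned to TransientStorePool and set to null, and later writes use mmap. The conditions are:

If the write buffer is null, return immediately.
If the write position equals the file size (the file is full but buffered data has not been synchronized), commit.
If a minimum number of pages to commit is specified (one page is 4 KB), commit when at least that many pages are uncommitted.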

protected boolean isAbleToCommit(final int commitLeastPages) {
    int flush = this.committedPosition.get();
    int write = this.wrotePosition.get();

    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}

Otherwise, the buffer content is committed if the write position is greater than the committed position.
org.apache.rocketmq.store.MappedFile#commit0

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
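The page-threshold check in isAbleToCommit is easier to follow with concrete numbers. The sketch below restates the same arithmetic as a pure function; the class CommitThreshold and the explicit parameters are assumptions for the example, and the page size is taken to be 4 KB as stated above.

```java
// Hypothetical stand-alone version of the commit condition.
final class CommitThreshold {
    static final int OS_PAGE_SIZE = 4 * 1024;

    static boolean isAbleToCommit(int committed, int wrote, int fileSize, int commitLeastPages) {
        if (wrote == fileSize) {
            return true; // file is full: always commit what is left
        }
        if (commitLeastPages > 0) {
            // compare whole pages, so partially filled pages are rounded down
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed;
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        // 16384 bytes written = 4 full pages, threshold 4 pages
        System.out.println(isAbleToCommit(0, 16384, fileSize, 4));
        // one byte short of 4 pages
        System.out.println(isAbleToCommit(0, 16383, fileSize, 4));
    }
}
```

With a threshold of 4 pages, 16384 written bytes qualify while 16383 do not, because integer division drops the partially filled page.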

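2.4. Flushing to disk (flush)

Flush forces data to disk by calling either FileChannel's force method or MappedByteBuffer's force method; only one of the two is called. The conditions are the same as those for committing buffered content.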

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

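2.5. The relationship between flush and commit

1. When reading and writing through FileChannel, commit writes the data into the FileChannel but does not guarantee that it reaches the disk; the fileChannel.force call in flush does.
2. When reading and writing through MappedByteBuffer, that is, through mmap, reads are zero-copy and avoid the copy between kernel space and user space, but writes unavoidably involve a copy.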
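The two-stage path in point 1 (stage in a buffer, commit to the channel, then flush to disk) can be sketched as a tiny writer. The class StagedWriter and its fields are hypothetical and deliberately ignore concurrency, file rollover, and buffer pooling.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical single-threaded writer with separate commit and flush steps.
final class StagedWriter implements AutoCloseable {
    private final FileChannel channel;
    private final ByteBuffer writeBuffer;   // off-heap staging buffer
    private int committedPosition = 0;      // bytes already handed to the channel

    StagedWriter(Path path, int capacity) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.writeBuffer = ByteBuffer.allocateDirect(capacity);
    }

    // Stage data in memory only.
    void append(byte[] data) {
        writeBuffer.put(data);
    }

    // Write the not-yet-committed range to the channel (page cache).
    void commit() throws IOException {
        int writePos = writeBuffer.position();
        if (writePos > committedPosition) {
            ByteBuffer view = writeBuffer.duplicate(); // shares content, own position/limit
            view.position(committedPosition);
            view.limit(writePos);
            channel.position(committedPosition);
            while (view.hasRemaining()) {
                channel.write(view);
            }
            committedPosition = writePos;
        }
    }

    // Force committed data from the page cache to disk.
    void flush() throws IOException {
        channel.force(false);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Data that has been appended but not committed exists only in the staging buffer, and data that has been committed but not flushed exists only in the page cache, which is why the two steps give different durability guarantees.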

2.6. Random read (selectMappedBuffer)

Using mmap, this returns the data of the given size starting at the given position, in the form of a ByteBuffer.
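Returning a range of a mapped buffer without copying can be sketched with duplicate and slice. This is an illustration of the idea only; the class BufferSelect, the method select, and the readPosition parameter are assumptions, and the real method also deals with reference counting.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: returns a view of [pos, pos + size) that shares the source content.
final class BufferSelect {
    static ByteBuffer select(ByteBuffer mapped, int pos, int size, int readPosition) {
        if (pos < 0 || size < 0 || pos + size > readPosition) {
            return null; // requested range is not fully readable yet
        }
        ByteBuffer view = mapped.duplicate(); // own position/limit, shared bytes
        view.clear();                         // reset position/limit only; content is untouched
        view.limit(pos + size);
        view.position(pos);
        return view.slice();                  // position 0, limit = size
    }
}
```

Because the view shares memory with the source buffer, no bytes are copied, and the caller's position changes do not disturb the source.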

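3. MappedFileQueue

As the name suggests, MappedFileQueue chains multiple MappedFile instances into a queue and presents them to callers as one logically unbounded file.
It is defined as follows: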

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    private final String storePath;
    private final int mappedFileSize;
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    private final AllocateMappedFileService allocateMappedFileService;

    private long flushedWhere = 0;
    private long committedWhere = 0;

    private volatile long storeTimestamp = 0;

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }

    public void checkSelf() {
        if (!this.mappedFiles.isEmpty()) {
            Iterator<MappedFile> iterator = mappedFiles.iterator();
            MappedFile pre = null;
            while (iterator.hasNext()) {
                MappedFile cur = iterator.next();
                if (pre != null) {
                    if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
                        LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
                            pre.getFileName(), cur.getFileName());
                    }
                }
                pre = cur;
            }
        }
    }

    public MappedFile getMappedFileByTime(final long timestamp) {
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return null;

        for (int i = 0; i < mfs.length; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }

        return (MappedFile) mfs[mfs.length - 1];
    }

    private Object[] copyMappedFiles(final int reservedMappedFiles) {
        Object[] mfs;

        if (this.mappedFiles.size() <= reservedMappedFiles) {
            return null;
        }

        mfs = this.mappedFiles.toArray();
        return mfs;
    }

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

    void deleteExpiredFile(List<MappedFile> files) {
        if (!files.isEmpty()) {
            Iterator<MappedFile> iterator = files.iterator();
            while (iterator.hasNext()) {
                MappedFile cur = iterator.next();
                if (!this.mappedFiles.contains(cur)) {
                    iterator.remove();
                    log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
                }
            }

            try {
                if (!this.mappedFiles.removeAll(files)) {
                    log.error("deleteExpiredFile remove failed.");
                }
            } catch (Exception e) {
                log.error("deleteExpiredFile has exception.", e);
            }
        }
    }

    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {
                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

    public long howMuchFallBehind() {
        if (this.mappedFiles.isEmpty())
            return 0;

        long committed = this.flushedWhere;
        if (committed != 0) {
            MappedFile mappedFile = this.getLastMappedFile(0, false);
            if (mappedFile != null) {
                return (mappedFile.getFileFromOffset() + mappedFile.getWrotePosition()) - committed;
            }
        }

        return 0;
    }

    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

    public MappedFile getLastMappedFile(final long startOffset) {
        return getLastMappedFile(startOffset, true);
    }

    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
    }

    public boolean resetOffset(long offset) {
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast != null) {
            long lastOffset = mappedFileLast.getFileFromOffset() +
                mappedFileLast.getWrotePosition();
            long diff = lastOffset - offset;

            final int maxDiff = this.mappedFileSize * 2;
            if (diff > maxDiff)
                return false;
        }

        ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();

        while (iterator.hasPrevious()) {
            mappedFileLast = iterator.previous();
            if (offset >= mappedFileLast.getFileFromOffset()) {
                int where = (int) (offset % mappedFileLast.getFileSize());
                mappedFileLast.setFlushedPosition(where);
                mappedFileLast.setWrotePosition(where);
                mappedFileLast.setCommittedPosition(where);
                break;
            } else {
                iterator.remove();
            }
        }
        return true;
    }

    public long getMinOffset() {
        if (!this.mappedFiles.isEmpty()) {
            try {
                return this.mappedFiles.get(0).getFileFromOffset();
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getMinOffset has exception.", e);
            }
        }
        return -1;
    }

    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }

    public long getMaxWrotePosition() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
        }
        return 0;
    }

    public long remainHowManyDataToCommit() {
        return getMaxWrotePosition() - committedWhere;
    }

    public long remainHowManyDataToFlush() {
        return getMaxOffset() - flushedWhere;
    }

    public void deleteLastMappedFile() {
        MappedFile lastMappedFile = getLastMappedFile();
        if (lastMappedFile != null) {
            lastMappedFile.destroy(1000);
            this.mappedFiles.remove(lastMappedFile);
            log.info("on recover, destroy a logic mapped file " + lastMappedFile.getFileName());
        }
    }

    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return 0;

        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;

                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }

                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }

        deleteExpiredFile(files);

        return deleteCount;
    }

    public int deleteExpiredFileByOffset(long offset, int unitSize) {
        Object[] mfs = this.copyMappedFiles(0);

        List<MappedFile> files = new ArrayList<MappedFile>();
        int deleteCount = 0;
        if (null != mfs) {
            int mfsLength = mfs.length - 1;

            for (int i = 0; i < mfsLength; i++) {
                boolean destroy;
                MappedFile mappedFile = (MappedFile) mfs[i];
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                if (result != null) {
                    long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                    result.release();
                    destroy = maxOffsetInLogicQueue < offset;
                    if (destroy) {
                        log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                            + maxOffsetInLogicQueue + ", delete it");
                    }
                } else if (!mappedFile.isAvailable()) {
                    // Handle hanged file.
                    log.warn("Found a hanged consume queue file, attempting to delete it.");
                    destroy = true;
                } else {
                    log.warn("this being not executed forever.");
                    break;
                }

                if (destroy && mappedFile.destroy(1000 * 60)) {
                    files.add(mappedFile);
                    deleteCount++;
                } else {
                    break;
                }
            }
        }

        deleteExpiredFile(files);

        return deleteCount;
    }

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

    /**
     * Finds a mapped file by offset.
     *
     * @param offset Offset.
     * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
     * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
     */
    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            MappedFile firstMappedFile = this.getFirstMappedFile();
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

    public MappedFile getFirstMappedFile() {
        MappedFile mappedFileFirst = null;

        if (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileFirst = this.mappedFiles.get(0);
            } catch (IndexOutOfBoundsException e) {
                //ignore
            } catch (Exception e) {
                log.error("getFirstMappedFile has exception.", e);
            }
        }

        return mappedFileFirst;
    }

    public MappedFile findMappedFileByOffset(final long offset) {
        return findMappedFileByOffset(offset, false);
    }

    public long getMappedMemorySize() {
        long size = 0;

        Object[] mfs = this.copyMappedFiles(0);
        if (mfs != null) {
            for (Object mf : mfs) {
                if (((ReferenceResource) mf).isAvailable()) {
                    size += this.mappedFileSize;
                }
            }
        }

        return size;
    }

    public boolean retryDeleteFirstFile(final long intervalForcibly) {
        MappedFile mappedFile = this.getFirstMappedFile();
        if (mappedFile != null) {
            if (!mappedFile.isAvailable()) {
                log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
                boolean result = mappedFile.destroy(intervalForcibly);
                if (result) {
                    log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
                    List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
                    tmpFiles.add(mappedFile);
                    this.deleteExpiredFile(tmpFiles);
                } else {
                    log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
                }

                return result;
            }
        }

        return false;
    }

    public void shutdown(final long intervalForcibly) {
        for (MappedFile mf : this.mappedFiles) {
            mf.shutdown(intervalForcibly);
        }
    }

    public void destroy() {
        for (MappedFile mf : this.mappedFiles) {
            mf.destroy(1000 * 3);
        }
        this.mappedFiles.clear();
        this.flushedWhere = 0;

        // delete parent directory
        File file = new File(storePath);
        if (file.isDirectory()) {
            file.delete();
        }
    }

    public long getFlushedWhere() {
        return flushedWhere;
    }

    public void setFlushedWhere(long flushedWhere) {
        this.flushedWhere = flushedWhere;
    }

    public long getStoreTimestamp() {
        return storeTimestamp;
    }

    public List<MappedFile> getMappedFiles() {
        return mappedFiles;
    }

    public int getMappedFileSize() {
        return mappedFileSize;
    }

    public long getCommittedWhere() {
        return committedWhere;
    }

    public void setCommittedWhere(final long committedWhere) {
        this.committedWhere = committedWhere;
    }
}


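3.1. Loading files from the local path: load

Each file under the path corresponds to one MappedFile, which is added to the internally maintained mappedFiles field:

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();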


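3.2. Finding a MappedFile by offset: findMappedFileByOffset

The method first computes the index of the given offset within mappedFiles, fetches the MappedFile at that index, and checks whether the offset falls inside it. If it does, that file is returned; otherwise the method iterates over mappedFiles and checks each MappedFile in turn.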

    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            MappedFile firstMappedFile = this.getFirstMappedFile();
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }
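
The index arithmetic used by findMappedFileByOffset can be tried in isolation. The following is a minimal sketch, not RocketMQ code: the class name OffsetIndexSketch and the sample values are assumptions chosen for illustration. Subtracting the first file's position matters because the oldest files may already have been deleted, so the first element of the list does not necessarily start at offset 0.

```java
final class OffsetIndexSketch {
    /** Index, within the list of segment files, of the segment that should contain the offset. */
    static int indexOf(long offset, long firstFileFromOffset, int mappedFileSize) {
        return (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
    }

    public static void main(String[] args) {
        int size = 1024 * 1024 * 1024;   // 1 GB segments (illustrative value)
        long first = 2L * size;          // assume the two oldest segments were already deleted
        long offset = 3L * size + 100;   // falls into the segment that starts at 3 GB
        System.out.println(indexOf(offset, first, size)); // prints 1
    }
}
```

If the computed index does not hold a matching file, the method above falls back to a linear scan, which is why the shortcut is safe to attempt first.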


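3.3. Deleting MappedFiles by expiration time

The method iterates over the files in mappedFiles and checks whether a file's last-modified time plus the expiration time is earlier than the current time. If so, the MappedFile is removed and its resources, such as the FileChannel, are released.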
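
A minimal sketch of that expiry check follows. It is not the RocketMQ implementation: the class ExpirySketch and its method are hypothetical, and two behaviours are assumptions made for the sketch, namely that the newest file is never a deletion candidate and that the scan stops at the first file that is still live.

```java
import java.util.ArrayList;
import java.util.List;

final class ExpirySketch {
    /**
     * Returns the indexes of files whose lastModified + expiredTime is not after now.
     * Assumptions: the newest file is skipped, and scanning stops at the first live file
     * so that no file in the middle of the sequence is removed.
     */
    static List<Integer> expiredIndexes(long[] lastModifiedMillis, long expiredTimeMillis, long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        for (int i = 0; i < lastModifiedMillis.length - 1; i++) {
            long liveMaxTimestamp = lastModifiedMillis[i] + expiredTimeMillis;
            if (nowMillis >= liveMaxTimestamp) {
                expired.add(i);
            } else {
                break;
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long now = 100 * hour;
        long[] lastModified = {10 * hour, 20 * hour, 90 * hour, 99 * hour};
        // With a 72-hour retention window, only the first two files are expired.
        System.out.println(expiredIndexes(lastModified, 72 * hour, now)); // prints [0, 1]
    }
}
```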

3.4. Deleting MappedFiles by offset

The method iterates over the files in mappedFiles; if a MappedFile's maximum offset is smaller than the given offset, that MappedFile is removed.

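3.5. Getting the last MappedFile

If the queue is empty or the last MappedFile is full, a new MappedFile is created. This is done either asynchronously through AllocateMappedFileService, which pre-allocates two files, or synchronously by creating one file directly.

AllocateMappedFileService extends ServiceThread (introduced earlier), whose run method executes in a loop on a background thread. AllocateMappedFileService pre-allocates MappedFiles: each request is wrapped in an AllocateRequest and placed in an internal priority queue, from which the service thread takes requests and performs the allocation, while the requester waits up to 5 seconds for the result. When warm-up is enabled, this step also calls the MappedFile's warmMappedFile method, which writes zeros to pre-fault the pages and then locks that memory so that it cannot be swapped out.

MappedFileQueue mainly supports CommitLog and ConsumeQueue. CommitLog stores the messages sent by Producers; ConsumeQueue receives the index entries dispatched from CommitLog, which Consumers then read to consume messages.

Operations on MappedFileQueue are all built on MappedFile: they generally locate the target MappedFile first and then operate on it.

As the foundation of RocketMQ message storage, MappedFile and MappedFileQueue support the CommitLog and ConsumeQueue layers above them.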

How a message is written

1. Asynchronous write

org.apache.rocketmq.store.CommitLog#asyncPutMessage

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // A few extra properties were added above; now switch the topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }

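Inside it, the call

    result = mappedFile.appendMessage(msg, this.appendMessageCallback);

writes the data into the corresponding byte buffer:

    byteBuffer.put(messagesByteBuff)

Asynchronous flushing has two cases, depending on whether the off-heap memory buffer pool is enabled. The configuration parameter is MessageStoreConfig#transientStorePoolEnable.

1) transientStorePoolEnable = true

When appended, a message is first placed in writeBuffer, then periodically committed to the FileChannel, and then periodically flushed.

2) transientStorePoolEnable = false (the default)

Messages are appended directly to the mappedByteBuffer, which is then periodically flushed. MappedFile#flush handles both cases:
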
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }
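
The two branches in flush correspond to two ways of getting bytes onto disk with plain Java NIO. The sketch below is only an illustration using standard JDK calls. The class FlushPathsSketch, its method names, and the temp-file setup are assumptions and not part of RocketMQ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

final class FlushPathsSketch {
    /** Like transientStorePoolEnable = false: write into the mapping, then force the mapping. */
    static void writeViaMmap(Path file, byte[] data) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
            mapped.put(data);   // lands in the page cache
            mapped.force();     // flush: page cache to disk
        }
    }

    /** Like transientStorePoolEnable = true: stage off-heap, commit to the channel, then force. */
    static void writeViaWriteBuffer(Path file, byte[] data) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
        writeBuffer.put(data);
        writeBuffer.flip();
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            while (writeBuffer.hasRemaining()) {
                channel.write(writeBuffer); // commit: off-heap buffer to page cache
            }
            channel.force(false);           // flush: page cache to disk
        }
    }

    /** Writes the same payload through both paths and checks that both files contain it. */
    static boolean bothPathsPersist(byte[] data) {
        try {
            Path a = Files.createTempFile("mmap", ".bin");
            Path b = Files.createTempFile("buffer", ".bin");
            a.toFile().deleteOnExit();
            b.toFile().deleteOnExit();
            writeViaMmap(a, data);
            writeViaWriteBuffer(b, data);
            return Arrays.equals(data, Files.readAllBytes(a))
                && Arrays.equals(data, Files.readAllBytes(b));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bothPathsPersist("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The second path costs one extra copy, from the staging buffer into the page cache, but keeps the append itself away from page-cache contention, which is the trade-off the transientStorePoolEnable switch exposes.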



Threads involved in asynchronous writes

FlushRealTimeService: the asynchronous flush thread

    class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0;
        private long printTimes = 0;

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return FlushRealTimeService.class.getSimpleName();
        }

        private void printFlushProgress() {
            // CommitLog.log.info("how much disk fall behind memory, "
            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
        }

        @Override
        public long getJointime() {
            return 1000 * 60 * 5;
        }
    }

    // MessageStoreConfig
    // CommitLog flush interval
    // flush data to disk
    @ImportantField
    private int flushIntervalCommitLog = 500;

    // Only used if TransientStorePool enabled
    // flush data to FileChannel
    @ImportantField
    private int commitIntervalCommitLog = 200;

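By default the flush interval is 500 milliseconds. If TransientStorePool is enabled, data is additionally committed from writeBuffer to the FileChannel every 200 milliseconds.

2. Synchronous write

The key method is org.apache.rocketmq.store.CommitLog#submitFlushRequest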

    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
                                                                  MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                service.putRequest(request);
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }

    // Back in asyncPutMessage, the flush and replication results are combined:
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });

After the message is appended, the caller has to wait for the future to complete before it can continue; until then it blocks.
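
How the combined future behaves can be seen with the JDK's CompletableFuture alone. This is a minimal sketch under stated assumptions: CombineSketch and its Status enum are hypothetical stand-ins that only mirror the status names used above, and the flush future is completed by hand where RocketMQ would complete it from the flush thread.

```java
import java.util.concurrent.CompletableFuture;

final class CombineSketch {
    // Hypothetical stand-in for PutMessageStatus, reduced to the values used here.
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    /** Combines the flush and replication results the same way the lambda above does. */
    static CompletableFuture<Status> combine(CompletableFuture<Status> flush,
                                             CompletableFuture<Status> replica) {
        return flush.thenCombine(replica, (flushStatus, replicaStatus) -> {
            Status result = Status.PUT_OK;
            if (flushStatus != Status.PUT_OK) {
                result = Status.FLUSH_DISK_TIMEOUT;
            }
            if (replicaStatus != Status.PUT_OK) {
                result = replicaStatus;
            }
            return result;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Status> flush = new CompletableFuture<>();
        CompletableFuture<Status> replica = CompletableFuture.completedFuture(Status.PUT_OK);
        CompletableFuture<Status> combined = combine(flush, replica);

        System.out.println(combined.isDone()); // prints false: still waiting for the flush
        flush.complete(Status.PUT_OK);         // what the flush thread does after forcing to disk
        System.out.println(combined.join());   // prints PUT_OK
    }
}
```

With asynchronous flush the flush future is already completed when it is returned, so the combined future depends only on replication; with synchronous flush it completes only once the group commit thread has flushed past the message's end offset.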

GroupCommitService: the synchronous flush thread

In org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit:

    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }

                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }

                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }

                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }

This goes into

    CommitLog.this.mappedFileQueue.flush(0);

Once the flush to disk has finished, the future is completed and success is returned:

    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);

    // MappedFileQueue#flush
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

    // MappedFile#flush
    /**
     * @return The current flushed position
     */
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

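Note that this calls force on the MappedByteBuffer, which forces the data to disk, so data that has been flushed is not lost even on power failure:

    this.mappedByteBuffer.force();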

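File structure and purpose

Directory structure

The store directory contains the following files and subdirectories:

  • commitlog: the message storage directory
  • config: configuration information recorded at runtime
  • consumequeue: storage directory for the message consume queues
  • index: storage directory for the message index files
  • abort: if an abort file exists, the Broker was shut down abnormally; by default the file is created at startup and deleted on normal exit
  • checkpoint: the checkpoint file. It stores the timestamp of the last commitlog flush, the last consumequeue flush, and the last index file flush.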

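2. File overview

2.1. commitlog files

commitlog files are stored at $HOME\store\commitlog\${fileName}. Each file is 1 GB by default (1024*1024*1024 bytes). Java's MappedByteBuffer is implemented on top of mmap, and a single mapping in Java is limited to roughly 1.5 to 2 GB of file, so the size should not be set too large. The file name fileName is 20 digits long: it is the file's starting offset, left-padded with zeros. For example, 00000000000000000000 is the first file, with starting offset 0 and size 1 GB = 1073741824 bytes. When that file is full, the second file is named 00000000001073741824, with starting offset 1073741824; likewise the third file is named 00000000002147483648, with starting offset 2147483648. Messages are written to a file sequentially, and when the file is full, writing moves on to the next file.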
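
The naming rule is easy to reproduce. The sketch below is an illustration only; CommitLogFileNameSketch and its two helper methods are hypothetical names, not RocketMQ APIs.

```java
import java.util.Locale;

final class CommitLogFileNameSketch {
    /** Formats a file's starting offset as a 20-digit, zero-padded file name. */
    static String offsetToFileName(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    /** Starting offset of the file that contains the given physical offset. */
    static long fileStartOffset(long physicalOffset, long fileSize) {
        return physicalOffset - (physicalOffset % fileSize);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024;
        System.out.println(offsetToFileName(0));        // prints 00000000000000000000
        System.out.println(offsetToFileName(fileSize)); // prints 00000000001073741824
        // An offset 42 bytes into the third file maps back to that file's name.
        System.out.println(offsetToFileName(fileStartOffset(2147483648L + 42, fileSize)));
        // prints 00000000002147483648
    }
}
```

Because the name encodes the starting offset, sorting the file names lexicographically also sorts the files by offset.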
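
Files under the commitlog directory mainly store messages. Messages vary in length; in the logical view of the storage, the first 4 bytes of each message hold that message's total length.

Detailed layout of a message storage unit in the file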

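| No. | Field | Size (bytes) | Meaning |
|-----|-------|--------------|---------|
| 1 | msgSize | 4 | Total size of this message |
| 2 | MAGICCODE | 4 | MAGICCODE = daa320a7 |
| 3 | BODY CRC | 4 | CRC of the message body; verified during recovery when the broker restarts |
| 4 | queueId | 4 | |
| 5 | flag | 4 | |
| 6 | QUEUEOFFSET | 8 | An auto-incremented value, not the real byte offset in the consume queue. It represents the number of messages in this consumeQueue or tranStateTable queue. For a non-transactional or committed transactional message, this value locates the data in the consumeQueue, and QUEUEOFFSET * 20 is the byte offset; for a PREPARED or Rollback transaction, this value is used to look up the data in the tranStateTable |
| 7 | PHYSICALOFFSET | 8 | Physical starting offset of the message in the commitLog |
| 8 | SYSFLAG | 4 | Message characteristics such as transaction state. Counting the four low-order bits from right to left: all four bits 0 (value 0) means a non-transactional message; bit 1 set (value 1) means the message is compressed (Compressed); bit 2 set (value 2) means multiple tags (MultiTags); bit 3 set (value 4) means a prepared message; bit 4 set (value 8) means a commit message; bits 3 and 4 both set (value 12) means a rollback message; bits 3 and 4 both 0 means a non-transactional message |
| 9 | BORNTIMESTAMP | 8 | Timestamp on the producer side |
| 10 | BORNHOST | 8 | Producer address (address:port) |
| 11 | STORETIMESTAMP | 8 | Time at which the broker stored the message |
| 12 | STOREHOSTADDRESS | 8 | Address of the broker that stored the message (address:port) |
| 13 | RECONSUMETIMES | 4 | Number of times the message has been re-consumed by a subscription group (counted independently per group). Retried messages are sent to queueId = 0 of the topic named %RETRY%groupName; a message consumed successfully the first time records 0 |
| 14 | PreparedTransaction Offset | 8 | Marks a transactional message in the prepared state |
| 15 | messagebodyLength | 4 | Size of the message body |
| 16 | messagebody | bodyLength | Content of the message body |
| 17 | topicLength | 1 | Size of the topic name |
| 18 | topic | topicLength | Content of the topic name |
| 19 | propertiesLength | 2 | Size of the properties |
| 20 | properties | propertiesLength | Property data, propertiesLength bytes long |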
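
Because every record starts with its own total size, a reader can walk a buffer of records without decoding the remaining fields. The following is a simplified sketch of that idea; LengthPrefixSketch is a hypothetical class, and treating a zero or truncated size as the end of the data is an assumption of the sketch rather than RocketMQ's exact end-of-file handling.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixSketch {
    /** Returns the start position of each record in a buffer of [4-byte total size][rest] records. */
    static List<Integer> recordStarts(ByteBuffer buffer) {
        List<Integer> starts = new ArrayList<>();
        int pos = 0;
        while (pos + 4 <= buffer.limit()) {
            int totalSize = buffer.getInt(pos);   // absolute read of the length prefix
            if (totalSize <= 4 || pos + totalSize > buffer.limit()) {
                break;                            // blank tail or truncated record
            }
            starts.add(pos);
            pos += totalSize;                     // jump straight to the next record
        }
        return starts;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putInt(12).put(new byte[8]);       // record 1: 12 bytes in total
        buffer.putInt(20).put(new byte[16]);      // record 2: 20 bytes in total
        buffer.flip();
        System.out.println(recordStarts(buffer)); // prints [0, 12]
    }
}
```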

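2.2. consumequeue

RocketMQ consumes messages on a topic-subscription model: a consumer cares about all the messages under a topic. But messages of different topics are interleaved in the commitlog files, so searching the commitlog directly would be very slow. To improve efficiency, an index file is built for each queue of a topic. To speed up lookup and save disk space, each consumequeue entry stores only the key information about a message: its offset in the commitlog file, its length, and the hashcode of its tag.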

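A single consumequeue file holds 300,000 entries by default, each 20 bytes long, so every file has a fixed size of 300,000 x 20 bytes. A single consumequeue file can be viewed as an array whose subscript is the logical offset; the offset stored for consumption progress is this logical offset.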
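
The 20-byte entry and the array-style addressing can be sketched as follows. ConsumeQueueEntrySketch and its methods are hypothetical names for illustration; only the 8 + 4 + 8 byte field layout is taken from the text and from putMessagePositionInfo.

```java
import java.nio.ByteBuffer;

final class ConsumeQueueEntrySketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (commitlog offset) + 4 (size) + 8 (tag hashcode)

    /** Encodes one entry in the order offset, size, tagsCode. */
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer entry = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        entry.putLong(commitLogOffset);
        entry.putInt(size);
        entry.putLong(tagsCode);
        return entry.array();
    }

    /** Byte position of the entry with the given logical offset (the array subscript). */
    static long bytePosition(long logicalOffset) {
        return logicalOffset * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        byte[] entry = encode(1073741824L, 256, 12345L);
        ByteBuffer view = ByteBuffer.wrap(entry);
        System.out.println(view.getLong(0) + " " + view.getInt(8) + " " + view.getLong(12));
        // prints 1073741824 256 12345
        System.out.println(bytePosition(300_000)); // prints 6000000, the size of one full file
    }
}
```

Fixed-size entries are what make the logical offset usable as a subscript: locating entry n needs one multiplication and no scanning.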
ReputMessageService
The consumequeue is built by org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput

    // Build the queue index for the commitlog
    private void doReput() {
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
            log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }

            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset();

                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest);

                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }

                                this.reputFromOffset += size;
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {

                            if (size > 0) {
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // If user open the dledger pattern or the broker is master node,
                                // it will not ignore the exception and fix the reputFromOffset variable
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                    DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }

    private boolean isCommitLogAvailable() {
        return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
    }

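While reputFromOffset is smaller than the maximum commitlog offset, the messages that have not been dispatched yet need to be synchronized to the consumequeue.
The call eventually reaches org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo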

    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset + size;
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

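This adds an index entry for the newly appended message to the corresponding ConsumeQueue.

2.3. IndexFile

IndexFile provides access to the generated index files, so that the actual message content can be looked up by message key. On disk, each file is named after the timestamp at which it was created. A single IndexFile has a fixed size of about 400 MB and can hold 20 million index entries.

2.3.1. IndexFile structure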

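IndexHead data:
  • beginTimestamp: the smallest store time among the messages covered by this index file
  • endTimestamp: the largest store time among the messages covered by this index file
  • beginPhyoffset: the smallest physical offset (commitlog file offset) among the messages covered by this index file
  • endPhyoffset: the largest physical offset (commitlog file offset) among the messages covered by this index file
  • hashSlotCount: the number of hash slots, not the number of slots in use; it has little significance here
  • indexCount: the number of Index entries in use

Hash slots: an IndexFile contains 5 million hash slots by default; each slot stores the index of the newest Index entry whose hashcode falls into that slot.

Index entry list:
  • hashcode: the hashcode of the key
  • phyoffset: the physical offset of the message
  • timedif: the difference between the message's store time and the timestamp of the first message; a value below 0 marks the entry as invalid
  • preIndexNo: the index of the previous entry for the same slot; on a hash collision, a linked list is built from this value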
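
The three regions lead directly to the position arithmetic used later by putKey and selectPhyOffset. The sketch below works through it with the sizes that appear in the code comments further down (40-byte header, 4-byte slots, 20-byte entries); IndexFileLayoutSketch and its method names are hypothetical.

```java
final class IndexFileLayoutSketch {
    static final int HEADER_SIZE = 40; // index header
    static final int SLOT_SIZE = 4;    // one hash slot
    static final int ENTRY_SIZE = 20;  // one index entry: 4 + 8 + 4 + 4 bytes

    /** Total file size for the given number of slots and entries. */
    static long fileSize(int slotNum, int indexNum) {
        return HEADER_SIZE + (long) slotNum * SLOT_SIZE + (long) indexNum * ENTRY_SIZE;
    }

    /** Absolute position of the slot for a non-negative key hash. */
    static long slotPosition(int keyHash, int slotNum) {
        return HEADER_SIZE + (long) (keyHash % slotNum) * SLOT_SIZE;
    }

    /** Absolute position of the entry with the given index number. */
    static long entryPosition(int slotNum, int indexNo) {
        return HEADER_SIZE + (long) slotNum * SLOT_SIZE + (long) indexNo * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        // 5 million slots and 20 million entries give the "about 400 MB" quoted above.
        System.out.println(fileSize(5_000_000, 20_000_000)); // prints 420000040
        System.out.println(slotPosition(7, 5));               // prints 48
        System.out.println(entryPosition(5, 3));              // prints 120
    }
}
```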

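2.3.2. Storing an IndexFile entry

RocketMQ writes the mapping from a message's index key to the message's offset into the IndexFile. The core method is public boolean putKey(final String key, final long phyOffset, final long storeTimestamp); its parameters are the message's index key, the message's physical offset, and the message's store time.
How index building is triggered
As with the ConsumeQueue, it is triggered from org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput,
which goes into org.apache.rocketmq.store.index.IndexService#buildIndex

The core method is putKey in IndexFile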

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // Only write while the entry count is below the maximum allowed
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            // Hash of the key (non-negative)
            int keyHash = indexKeyHashMethod(key);
            // Index of the hash slot
            int slotPos = keyHash % this.hashSlotNum;
            // Absolute position of the hash slot in the file
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // Value currently stored in the hash slot
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // Treat the slot as empty if the value is <= 0 or larger than the current entry count
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                // Difference between this message's store time and the first message's timestamp
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                // Convert to seconds
                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                // Entry position = header size (40 bytes) + slot size (4 bytes) * slot count (5 million)
                // + current entry count * entry size (20 bytes)
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                // Write, in order: key hash (4 bytes), physical offset (8 bytes),
                // time difference to the index file's begin timestamp (4 bytes), previous slot value (4 bytes)
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                // Store the current entry count in the hash slot, overwriting its previous value
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                // Update the header: slot count, entry count, last physical offset, last store time
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

The above walks through the logic for storing an IndexFile entry in detail.

2.3.3. Looking up messages by KEY

In the DefaultMessageStore class, the core of public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) is the call QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);, which obtains the physical storage addresses of the messages; the messages are then fetched from the commitLog by offset.
The core of public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) is in turn the IndexFile method public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock)

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            // Hash of the key
            int keyHash = indexKeyHashMethod(key);
            // Index of the hash slot
            int slotPos = keyHash % this.hashSlotNum;
            // Absolute position of the hash slot in the file
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }

                // Value stored in the hash slot
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                // Nothing to read if the value is <= 0 or larger than the current entry count
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }

                        // Entry position = header size (40 bytes) + slot size (4 bytes) * slot count (5 million)
                        // + entry number * entry size (20 bytes)
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        // Hash of the key stored in the entry
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        // Physical offset of the message
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        // Difference between the message's store time and the index file's begin timestamp
                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        // Previous entry in the chain (linked-list structure)
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        // Whether the message falls inside the queried time range
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        // The key hash must match and the time must be inside the queried range
                        if (keyHash == keyHashRead && timeMatched) {
                            // Add to the list of physical offsets
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        // Continue matching with the previous entry
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }
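  1. Take the hashcode of the queried key modulo slotNum to get the slot position (slotNum is the maximum number of slots in one index file, for example slotNum = 5000000).
  2. Use slotValue (the value stored at the slot position) to find the last item of the index entry list (the entries are chained in reverse order, and slotValue always points to the newest index entry).
  3. Traverse the index entry list and return the results that fall inside the queried time range (by default at most 32 records are returned per query).
  4. Hash collisions: locating the slot for a key effectively applies two hash functions, first the key's hash and then that hash value modulo slotNum, so two kinds of collision can occur. In the first, the keys' hashes differ but their modulo results are the same; the query then compares the key's hash once more (every index entry stores the key's hash) and filters out entries whose hash does not match. In the second, the hashes are equal but the keys differ; for performance reasons this check is left to the client (the original key is stored in the message file, and this avoids parsing the data file on the broker), which compares the key carried in the message once more.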
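
The slot-plus-chain structure in the steps above can be modelled in memory. The following is a simplified sketch, not the IndexFile implementation: HashSlotIndexSketch is a hypothetical class, arrays replace the mapped file, and the time-range filter is left out so that only the chaining and the first kind of collision filtering are shown.

```java
import java.util.ArrayList;
import java.util.List;

final class HashSlotIndexSketch {
    private final int[] slots;       // slot -> number of the newest entry, 0 means empty
    private final int[] keyHashes;   // entry number -> key hash (entries are numbered from 1)
    private final long[] phyOffsets; // entry number -> physical offset
    private final int[] prev;        // entry number -> previous entry in the same slot
    private int count = 0;

    HashSlotIndexSketch(int slotNum, int capacity) {
        this.slots = new int[slotNum];
        this.keyHashes = new int[capacity + 1];
        this.phyOffsets = new long[capacity + 1];
        this.prev = new int[capacity + 1];
    }

    /** Non-negative hash of the key. */
    static int hash(String key) {
        int positive = Math.abs(key.hashCode());
        return positive < 0 ? 0 : positive; // Math.abs(Integer.MIN_VALUE) is still negative
    }

    boolean put(String key, long phyOffset) {
        if (count >= keyHashes.length - 1) {
            return false;                   // index is full
        }
        int keyHash = hash(key);
        int slot = keyHash % slots.length;
        int entry = ++count;
        keyHashes[entry] = keyHash;
        phyOffsets[entry] = phyOffset;
        prev[entry] = slots[slot];          // chain to the entry the slot pointed to before
        slots[slot] = entry;                // the slot now points to the newest entry
        return true;
    }

    /** Walks the chain from newest to oldest, keeping entries whose stored hash matches. */
    List<Long> query(String key, int maxNum) {
        List<Long> result = new ArrayList<>();
        int keyHash = hash(key);
        for (int entry = slots[keyHash % slots.length];
             entry > 0 && result.size() < maxNum; entry = prev[entry]) {
            if (keyHashes[entry] == keyHash) {
                result.add(phyOffsets[entry]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        HashSlotIndexSketch index = new HashSlotIndexSketch(1, 8); // one slot forces collisions
        index.put("a", 10L);
        index.put("b", 20L);
        index.put("a", 30L);
        System.out.println(index.query("a", 32)); // prints [30, 10]
    }
}
```

Two different keys with the same hash would both be returned by query here, which is the second kind of collision from step 4; that case is the one left to the client to resolve by comparing the real key.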
