鉴于在hdfs客户端读取hdfs文件过程中,其在获取到数据块所在的DataNode之后,会构造blockReader对象用来从指定数据节点上读取数据块;其中RemoteBlockReader2是使用socket连接从datanode读取数据块的实现类,其reader.read()方法用于从socket stream中读取对应的数据包。接下来将分析一下该处对应的DataNode节点上是如何响应该请求的:
在介绍如何响应之前,先简单介绍一下DataNode进程在启动中所开启的一些基本服务如下(其启动源码在DataNode.main()方法中,不过多赘述,后续会写一篇blog来介绍具体的启动过程):
- DataNode.startDataNode():
- 初始化DataStorage对象
- 初始化DataXceiverServer对象
- 启动HttpInfoServer对象
- 初始化DataNode的IPC Server对象
- 创建BlockPoolManager对象
- DataNode.runDatanodeDaemon():
- 启动BlockPoolManager所管理的所有线程
- 启动dataXceiverServer线程
- 启动DataNode的IPC Server
其中主要用于响应客户端流式接口请求的服务就是DataXceiverServer服务线程;其基本逻辑接口图如下:
接下来一步步来看DataXceiverServer是如何启动并对外提供服务的:
1、DataXceiverServer的初始化
在数据节点DataNode进程启动的startDataNode()方法中,会调用initDataXceiver()方法,完成DataXceiverServer的初始化;首先其会创建tcpPeerServer对象(对ServerSocket的封装),其能够通过accept()方法返回Peer对象(对Socket的封装)用于提供输入输出流。并针对短路读提供domainPeerServer对于本地短路读请求;其源码如下:
private void initDataXceiver(Configuration conf) throws IOException {// find free port or use privileged port provided// 对ServerSocket的封装TcpPeerServer tcpPeerServer;if (secureResources != null) {tcpPeerServer = new TcpPeerServer(secureResources);} // .........// 设置tcp接受缓冲区 并绑定对应InetSocketAddresstcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);streamingAddr = tcpPeerServer.getStreamingAddr();LOG.info("Opened streaming server at " + streamingAddr);this.threadGroup = new ThreadGroup("dataXceiverServer");// 构造DataXceiverServer对象xserver = new DataXceiverServer(tcpPeerServer, conf, this);this.dataXceiverServer = new Daemon(threadGroup, xserver);this.threadGroup.setDaemon(true); // auto destroy when empty// 针对短路读情况,构造localDataXceiverServer 用于本地进程通信if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {DomainPeerServer domainPeerServer =getDomainPeerServer(conf, streamingAddr.getPort());if (domainPeerServer != null) {this.localDataXceiverServer = new Daemon(threadGroup,new DataXceiverServer(domainPeerServer, conf, this));LOG.info("Listening on UNIX domain socket: " +domainPeerServer.getBindPath());}}this.shortCircuitRegistry = new ShortCircuitRegistry(conf);}
之后其便会通过DataNode.runDatanodeDaemon();启动dataXceiverServer线程;其run()方法的基本逻辑和源码如下:
- while循环,等待阻塞TcpPeerServer(也就是ServerSocket)的accept()方法,直到接收到客户端或者其他DataNode的连接请求;
- 获得peer,即Socket的封装;
- 判断当前DataNode上DataXceiver线程数量是否超过阈值,如果超过的话,直接抛出IOException,利用IOUtils的cleanup()方法关闭peer后继续循环,否则继续4;
- 创建一个后台线程DataXceiver,并将其加入到datanode的线程组threadGroup中,并启动该线程,响应数据读写请求;
通过该方法可以知道dataXceiverServer只负责连接的建立以及构造并启动DataXceiver,流式接口的请求则是由DataXceiver响应的,所有的输入输出流的操作都是由DataXceiver来执行的;(这种连接建立和响应分离的设计方式在hadoop rpc处也出现过)
@Override// 核心方法public void run() {Peer peer = null;// 如果标志位shouldRun为true,且没有为升级而执行shutdownwhile (datanode.shouldRun && !datanode.shutdownForUpgrade) {try {// 阻塞,直到接收到客户端或者其他DataNode的连接请求peer = peerServer.accept();// Make sure the xceiver count is not exceeded// 确保DataXceiver数目没有超过最大限制/*** DataNode的getXceiverCount方法计算得到,返回线程组的活跃线程数目* threadGroup == null ? 0 : threadGroup.activeCount();*/int curXceiverCount = datanode.getXceiverCount();if (curXceiverCount > maxXceiverCount) {throw new IOException("Xceiver count " + curXceiverCount+ " exceeds the limit of concurrent xcievers: "+ maxXceiverCount);}// 创建一个后台线程,DataXceiver,并加入到线程组datanode.threadGroupnew Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start();} catch (SocketTimeoutException ignored) {// wake up to see if should continue to run// 等待唤醒看看是否能够继续运行} catch (AsynchronousCloseException ace) {// 异步的关闭异常// 正如我们所预料的,只有在关机的过程中,通过其他线程关闭我们的侦听套接字,其他情况下则不会发生if (datanode.shouldRun && !datanode.shutdownForUpgrade) {LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);}} catch (IOException ie) {IOUtils.cleanup(null, peer);LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);} catch (OutOfMemoryError ie) {IOUtils.cleanup(null, peer);// 数据节点可能由于存在太多的数据传输导致内存溢出,记录该事件,并等待30秒,其他的数据传输可能到时就完成了LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);try {Thread.sleep(30 * 1000);} catch (InterruptedException e) {// ignore}} catch (Throwable te) {LOG.error(datanode.getDisplayName()+ ":DataXceiverServer: Exiting due to: ", te);datanode.shouldRun = false;}}// Close the server to stop reception of more requests.// 关闭服务器停止接收更多请求try {peerServer.close();closed = true;} catch (IOException ie) {LOG.warn(datanode.getDisplayName()+ " :DataXceiverServer: close exception", ie);}// if in restart prep stage, notify peers before closing them.// 如果在重新启动前准备阶段,在关闭前通知peersif (datanode.shutdownForUpgrade) {restartNotifyPeers();LOG.info("Shutting down DataXceiverServer before restart");// Allow roughly up to 2 seconds.for (int i = 0; getNumPeers() > 0 && i < 10; i++) {try {Thread.sleep(200);} catch (InterruptedException e) {// ignore}}}// Close all peers.// 关闭所有的peerscloseAllPeers();}
2、DataXceiver流式请求的读写
DataXceiver是一个线程类,其运行的run方法的主要功能是读取请求的类型,根据类型执行相应的操作,我们主要分析读和写数据块相关的方法:
/*** Read/write data from/to the DataXceiverServer.*/@Overridepublic void run() {int opsProcessed = 0;Op op = null;try {// 先根据传入的peer对象获取对应的输入/输出流,并对流进行装饰 dataXceiverServer.addPeer(peer, Thread.currentThread(), this);peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);InputStream input = socketIn;try {IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,socketIn, datanode.getXferAddress().getPort(),datanode.getDatanodeId());input = new BufferedInputStream(saslStreams.in,HdfsConstants.SMALL_BUFFER_SIZE);socketOut = saslStreams.out;} catch (InvalidMagicNumberException imne) {// ......return;}super.initialize(new DataInputStream(input));// We process requests in a loop, and stay around for a short timeout.// This optimistic behaviour allows the other end to reuse connections.// Setting keepalive timeout to 0 disable this behavior.// DataXceiver主体循环do {updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));try {if (opsProcessed != 0) {assert dnConf.socketKeepaliveTimeout > 0;peer.setReadTimeout(dnConf.socketKeepaliveTimeout);} else {peer.setReadTimeout(dnConf.socketTimeout);}// 调用Receiver.readOp()从输入流中解析操作符op = readOp();} catch (InterruptedIOException ignored) {// ......}// ......opStartTime = now();processOp(op); // 调用processOp()处理该流式请求操作++opsProcessed;} while ((peer != null) &&(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));} catch (Throwable t) {// ......}}
其主要的工作流程是:从IO中读取流式接口请求并解析处对应的操作符,并调用processOp()处理该流式请求操作,该方法会根据Op调用DataXceiver对应的处理方法:
/** Read an Op. It also checks protocol version. */protected final Op readOp() throws IOException {final short version = in.readShort();if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {throw new IOException( "Version Mismatch (Expected: " +DataTransferProtocol.DATA_TRANSFER_VERSION +", Received: " + version + " )");}return Op.read(in);}/** Process op by the corresponding method. */protected final void processOp(Op op) throws IOException {switch(op) {case READ_BLOCK:opReadBlock();break;case WRITE_BLOCK:opWriteBlock(in);break;case REPLACE_BLOCK:opReplaceBlock(in);break;case COPY_BLOCK:opCopyBlock(in);break;case BLOCK_CHECKSUM:opBlockChecksum(in);break;case TRANSFER_BLOCK:opTransferBlock(in);break;case REQUEST_SHORT_CIRCUIT_FDS:opRequestShortCircuitFds(in);break;case RELEASE_SHORT_CIRCUIT_FDS:opReleaseShortCircuitFds(in);break;case REQUEST_SHORT_CIRCUIT_SHM:opRequestShortCircuitShm(in);break;default:throw new IOException("Unknown op " + op + " in data stream");}}
3、数据块读取readBlock():
我们知道在client读取hdfs文件数据块的时候,会在构造blockReader的时候调用
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);
向目标DataNode发送对应数据块的Op.READ_BLOCK操作码,DataReceiver在接收到来自客户端的Op.READ_BLOCK读数据块操作码后,会调用DataXceiver.readBlock()响应这个读请求。其调用流程如下:
DataXceiver.readBlock()首先会向客户端回复一个BlockOpResponseProto响应,表示当前请求DataXceiver已经成功接收,并通过BlockOpResponseProto告知Client客户端当前DataNode所使用的校验方式。接下来便会将数据块block切分成若干个数据包packet,然后依次将数据包发送至客户端。客户端在接收到每个数据包packet时会进行校验,并将校验结果发送给DataNode;其基本的读取流程如下:
DataXceiver.readBlock()方法的基本流程如下:
- 创建BlockSender对象,首先调用getOutputStream()获取DataNode连接到客户端的IO流,并创建构造BlockSender对象;
- 之后便调用writeSuccessWithChecksumInfo()向客户端发送BlockOpResponseProto响应,告知客户端读请求已经接收,并告知客户端当前节点的校验信息;
- 之后便调用blockSender.sendBlock()方法将数据块按照数据包packet的形式发送给客户端;
- 当blockSender完成发送数据块数据包后,客户端会响应一个ReadStatus状态码告知DataNode;
blockSender.sendBlock()方法会将数据块按照一定的组织格式发送到接收方,其发送数据的格式如下:
BlockSender发送的数据格式包括两部分:校验信息头(ChecksumHeader)和数据包序列(packets)
- ChecksumHeader:用于描述当前DataNode使用的校验方式等信息。一个校验头信息也包括2个部分:
- CHECKSUM_TYPE:数据校验类型:包括三种校验—空校验,CRC32以及CRC32C,在这里使用1 byte描述数据校验类型,空校验,CRC32以及CRC32C,分别对应着0,1,2
- BYTES_PER_CHECKSUM:校验块大小:也就是多少字节数据产生一个校验值。在这里CRC32为例,一把情况下是512字节数据产生一个4字节的checksum,我们把这512字节的数据叫做一个校验块(Chunk),chunk是HDFS读写数据块操作的最小单元
- 数据包序列(packets):BlockSender会将数据块切分成若干数据包对外发送,当数据发送完毕,会以一个空的数据包作为结束。每一个数据包包括一个变长的包头,校验数据和若干字节的实际数据
- 数据包头:用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度;其数据包信息如下:
- 当前数据包在整个数据块中的位置
- 数据包在管道中的序列号
- 当前数据包是不是数据块中的最后一个数据包
- 当前数据包数据部分的长度
- 是否需要DN同步
- 校验数据:校验数据是对实际数据做校验操作产生的,它将实际数据以校验块为单位,每一个校验块产生一个checksum,校验数据中包含了所有校验块的checksum.校验数据的大小=(实际数据长度+校验块大小)/ 校验块大小 *校验和长度
- 实际数据:数据包中的实际数据就是数据块文件中保存的数据,实际数据的传输是以校验块为单位的,一个校验块对应产生一个checksum的实际数据。在数据包中会将校验块和校验数据分开发送,首先将所有校验块的校验数据发送出去,然后再发所有的校验块
- 数据包头:用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度;其数据包信息如下:
BlockSender中的数据块发送过程主要包括:1、发送准备;2、发送数据块;3、清理工作
1、发送准备:主要是根据参数进行 是否需要验证校验数据、是否开启transferTo模式、从Meta文件中获取当前数据块的校验算法、校验和长度,以及多少字节产生一个校验值、寻找正确的offset等判断;其源码主要在BlockSender的构造函数当中;
2、发送数据块:BlockSender.sendBlock()用读取数据以及校验和,并将它们发送到接收方。整体流程步骤如下:
- 在开始读文件时,会触发一次预读取,也就是将数据缓存到操作系统缓冲区中;
- 构造pktBuf缓冲区,也即是能容纳一个数据包的缓冲区;
- 循环调用sendPacket()发送数据包序列,直到整个数据块发送完毕;
- 发送一个空的数据包来标识数据块的结束;
- 完成数据包发送过程之后,调用close()方法关闭数据块以及校验文件;
sendBlock()以及sendPacket()源码如下:
long sendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException {// Trigger readahead of beginning of file if configured.// 数据预读取至缓存manageOsCache();final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;// 构造存放数据包的缓冲区try {int maxChunksPerPacket;int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;boolean transferTo = transferToAllowed && !verifyChecksum&& baseStream instanceof SocketOutputStream&& blockIn instanceof FileInputStream;if (transferTo) {FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();blockInPosition = fileChannel.position();streamForSendChunks = baseStream;maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);// Smaller packet size to only hold checksum when doing transferTopktBufSize += checksumSize * maxChunksPerPacket;} else {maxChunksPerPacket = Math.max(1,numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));// Packet size includes both checksum and data// 缓冲区存放checksum 和 datapktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;}// 构造缓冲区pktBuf ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);// 循环调用sendPacket()发送数据包packetwhile (endOffset > offset && !Thread.currentThread().isInterrupted()) {manageOsCache();long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,transferTo, throttler);offset += len;totalRead += len + (numberOfChunks(len) * checksumSize);seqno++;}// If this thread was interrupted, then it did not send the full block.if (!Thread.currentThread().isInterrupted()) {try {// send an empty packet to mark the end of the block// 发送一个空的数据包来标识数据块的结束sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,throttler);out.flush();} catch (IOException e) { //socket errorthrow ioeToSocketException(e);}sentEntireByteRange = true;}} finally {if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {final long endTime = System.nanoTime();ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,initialOffset, endTime - startTime));}close();}return totalRead;}
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,boolean transferTo, DataTransferThrottler throttler) throws IOException {int dataLen = (int) Math.min(endOffset - offset, (chunkSize * (long) maxChunks));// 数据包中包含多少校验块int numChunks = numberOfChunks(dataLen);// 校验数据长度int checksumDataLen = numChunks * checksumSize;// 数据包长度int packetLen = dataLen + checksumDataLen + 4;boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;// 将数据包头写入缓存int headerLen = writePacketHeader(pkt, dataLen, packetLen);// 数据包头在缓存中的位置int headerOff = pkt.position() - headerLen;// 校验数据在缓存中的位置int checksumOff = pkt.position();byte[] buf = pkt.array();if (checksumSize > 0 && checksumIn != null) {// 校验数据写入缓存readChecksum(buf, checksumOff, checksumDataLen);// write in progress that we need to use to get lastchecksuif (lastDataPacket && lastChunkChecksum != null) int start = checksumOff + checksumDataLen - checksumSize;byte[] updatedChecksum = lastChunkChecksum.getChecksum();if (updatedChecksum != null) {System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);}}}int dataOff = checksumOff + checksumDataLen;// 在普通模式下下将数据写入缓存if (!transferTo) { // normal transferIOUtils.readFully(blockIn, buf, dataOff, dataLen);// 确认校验和数据if (verifyChecksum) {verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);}}try {if (transferTo) {SocketOutputStream sockOut = (SocketOutputStream) out;// 首先将头和校验和数据写入缓存sockOut.write(buf, headerOff, dataOff - headerOff);// 使用transfer方式,将数据通过0拷贝的方式写入IO流FileChannel fileCh = ((FileInputStream) blockIn).getChannel();LongWritable waitTime = new LongWritable();LongWritable transferTime = new LongWritable();sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());blockInPosition += dataLen;} else {// 普通模式下数据写入IO流out.write(buf, headerOff, dataOff + dataLen - headerOff);}} catch (IOException e) {if (e instanceof SocketTimeoutException) } else {String ioem = e.getMessage();if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connectionreset")) {LOG.error("BlockSender.sendChunks()exception: ", e);}datanode.getBlockScanner().markSuspectBlock(volumeRef.getVolume().getStorageID(),block);}throw ioeToSocketException(e);}if (throttler != null) { // rebalancing so throttlethrottler.throttle(packetLen);}return dataLen;
}
3、清理工作:主要是关闭对应的数据流:
checksumIn.close(); // close checksum fileblockIn.close(); // close data file