DataNode物理结构


简单总结下DataNode的物理结构

分两个方向来讲述DataNode的目录结构

一、DataNode磁盘目录结构

  1. 比如datanode上有12个盘,同一个块池目录比如BP-1007908154-10.10.10.10-1533290355162存在于所有的盘上。
  1. 在HDFS-6482之前,是通过LDir这个类来存放具体的数据块的目录位置。所以存在一个很大的问题就是当由于数据块的数量非常大的时候,由于需要在内存中记录数据块的具体位置,还需要记录对应的subdirs,会对DN造成很大的内存开销。

    所以这个Patch的思想是通过两层subdir的方式来存放不同的数据块。具体实现是DatanodeUtil.idToBlockDir()分别根据块池id,取第二和第三个字节位来得到两层的subdir的id。这样只需要在用到的时候计算出相应目录即可。

二、副本状态

1.NameNode副本状态

1
2
3
4
5
6
7
8
9
10
static public enum BlockUCState {
//commit状态的块收到DN的块汇报后
COMPLETE,
//正在写入的数据块,大部分的数据块读可见,对应Replica的状态为RBW
UNDER_CONSTRUCTION,
//如果客户端写文件超过租约后,如果最后一个数据块处于UNDER_CONSTRUCTION状态,当block恢复开始时,UnderConstruction变为UnderRecovery状态,对应Replica的状态为RUR
UNDER_RECOVERY,
//客户端每次请求新的数据块时候,比如写文件,都会对上一个数据块进行提交。这个表示客户端已经收到这个数据块的请求了,只是还没有收到DN的块汇报
COMMITTED;
}

2.DataNode副本状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//所有副本状态类的父类
//子类有 FinalizedReplica,ReplicaBeingWritten,ReplicaUnderRecovery,ReplicaWaitingToBeRecovered,ReplicaInPipeline
abstract public class ReplicaInfo extends Block implements Replica {
}
//分别对应于ReplicaState
static public enum ReplicaState {
/** Replica is finalized. The state when replica is not modified. */
FINALIZED(0),
/** Replica is being written to. */
RBW(1),
/** Replica is waiting to be recovered. */
RWR(2),
/** Replica is under recovery. */
RUR(3),
/** Temporary replica: created for replication and relocation only. */
TEMPORARY(4);
}

RWR: ReplicaWaitingToBeRecovered

  1. 如果client挂了之后,RWR状态下的replica将会过期,或者将出现在租约恢复的过程中(将RBW状态的Replica转为RWR)
    1
  2. 在DN重启加载块的时候,会将所有RWR目录下的数据块标记为ReplicaWaitingToBeRecovered,并将其及副本信息一起添加到ReplicaMap中
1
2
3
4
class ReplicaMap{
//池id->(块id->副本信息)
private final Map<String, Map<Long, ReplicaInfo>> map = new HashMap<String, Map<Long, ReplicaInfo>>();
}

FINALIZED: FinalizedReplica

  1. 表示这个块已经写入完成了,对所有用户都是可见的,最终DU等统计都是以FinalizedReplica的副本为准

RBW: ReplicaBeingWritten

  1. 通常表示一个文件中最后一个正在写入的副本

  2. bytesAcked:表示接收到下游ack的bytes数,bytesReserved,block已经接收到的bytes数,包括写入磁盘的和在dn内存中的数据

  3. 当datanode向下一个datanode发送的数据块写成功了,并且接收到了下一个datanode的ack时,会通过finalizeBlock()提交这个数据块

  4. 在DN重启加载块的时候,会将所有RBW目录下的数据块标记为ReplicaWaitingToBeRecovered

RUR: ReplicaUnderRecovery

  1. 租约恢复时候,任意一个非Temporary状态的副本都有可能转换为RUR状态。
  2. 比如当客户端写文件中途退出时候,为了保证最后一个数据块的数据一致性,NN会通过下发恢复指令,选择一个主节点,最终通过FsDatasetImpl.initReplicaRecovery()对这个数据块进行恢复

TEMPORARY:

  1. 数据对客户端不可见
  2. 当DataNode成功接收了其他DataNode的数据块之后,通过DataSetImpl.createTemporary()创建tmp目录,会将这个状态的副本转换为RBW状态

ReplicaInPipeline:

  1. 通过DataSetImpl.createTemporary()创建的副本类型为ReplicaInPipeline类型

    各个目录文件的简单总结

  • finalized:客户端已经完成写入并提交的数据块
  • rbw:客户端正在写入的
  • tmp:比如当这个副本是Datanode在接收其他Datanode写数据块的请求时在构造BlockReceiver时调用的,即写数据块拷贝的时候,最终调用的入口为DataSetImpl.createTemporary()
  • in_used.lock:在初始化块池时,要根据当前的目录分析当前的状态时,会对这个目录加锁。

三、目录结构对应的逻辑结构

1.目录维度

FsDatasetImpl: 实现了FsDatasetSpi接口,管理DataNode上所有的数据块,一些对数据块的各种操作最终都是要访问这个类

FSVolumeImpl: 管理单个存储目录保存的所有数据块,内部通过CHM维护当前目录下块池ID对应的BlockPoolSlice的映射

FSVolumeList: 维护所有FSVolumeImpl的引用,通过FsVolumeImpl[]的AtomicReference来管理

BlockPoolSlice: 管理一个块池的所有数据块,所有的数据块是通过ReplicaMap这个类来进行管理的,ReplicaMap中通过HashMap<String, Map<Long, ReplicaInfo>>来记录,块池id->(块id->副本信息)

BlockPoolSliceStorage: 一个BlockPoolSliceStorage用来管理名字相同的所有的BlockPoolSlice

2.功能维度

每个块池对应于每个BPOfferService,目前一共有41个块池,对应线上41个namespace,BPOfferService内部维护了BPServiceActor的列表,实际和NN进行交互的逻辑都是在BPServiceActor中。

BPOfferService内部还维护了NamespaceInfo的信息,只有当它向NN注册之后才会获取这个信息。

四、DN启动时两个维度间的交互

1.重启时主要过程

  • 首先DN通过配置文件获取NN的命名空间和对应的通信地址
  • DN根据命名空间的个数创建对应的BPOfferService,并且在每个BPOfferService中创建数量相同的BPServiceActor来维持通信
  • 通过调用BPOfferService.start()方法启动BPOfferService下的所有BPServiceActor
  • BPServerActor和NN进行握手
  • rpc获取NamespaceInfo信息,包含了块池的ID,即BlockPoolID,BP-1007908154-10.10.10.10-1533290355162,还有代码的版本,还有ClusterID,如果失败会sleep一段时间后继续重试。拿到信息后还要进行版本的校验。因为同一组BPActor最终是在BPofferService中的,所以BPofferService只要通过加锁BPOfferService.NamespaceInfo判断是否为空已经初始化过了,来保证一个每个BPofferService中只会进行一次尝试DN初始化操作。
  • 如果这是当前BPofferService第一个启动的Actor,还会进行初始化块池,如果块池已经初始化完成了则会跳过。
  • 向NN进行注册,注册的时候会不断尝试直到成功。
  • 一直执行BPServiceActor.offerService()直到退出-
  • 发送心跳包
  • 计算时间定时发送心跳,首先构造StorageReport
  • StorageReport初始化的时候会通过DataSetImpl获取每块盘下的存储信息
  • 发送心跳包时,除了storageReport还包含DN的DU等情况的一些信息
  • NN接受心跳包
  • Rpc调用sendHeartbeat()
  • 通过datanodeManger来处理心跳,根据dataNode的id获取dataNodeMap中的DatanodeDescriptor
  • 最后通过DatanodeDescriptor.updateHeartbeatState()来更新心跳
  • 每次心跳都会根据report数组来更新这个map,并且把错误的DatanodeStorage从map中移除
  • 更新失败块的状态,normal–>failed 最后对这个DatanodeDescriptor中的map进行修剪
  • NN上还有一个异步的线程定时来检查DatanodeDescriptor坏掉的卷上是否有数据块,如果有则通过blockManager将这些块都移除
  • 处理从NN发送回来的信息
  • 调用BPOfferService.processCommand()方法对命令数组进行处理,根据NN返回的cmds数组执行对应的操作

2.块池初始化逻辑

主要流程:盘目录的初始化,块池的初始化,数据块的初始化。

具体的过程:

  1. DataStorage初始化
  2. DN上的盘目录的初始化
1
2
3
4
5
6
7
8
//具体的盘的信息存放在DataNode.DataStorage中
//DataStorage extends Storage
public abstract class Storage extends StorageInfo {
//存在已经加载完成的盘的列表
protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
//块池的ID到其对应的BlockPoolSliceStorage的映射
private final Map<String, BlockPoolSliceStorage> bpStorageMap = Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>());
}
  • 初始化DN上的存储目录,即之前通过data.dir配置项获取的

  • 遍历保存所有的储存的StorageDirectory方法,调用每个StorageDirectory.analyzeStorage() 进行分析

  • 加载某个盘的目录时候会通过系统调用判断这个盘目录是否可存在,是否可写,来得到这个盘的一个状态,同时还会判断hasFinalizedTmp,hasRemovedTmp目录的是否存在来判断当前的是否处于升级状态的某个阶段。通常情况下在加载完这个盘后就是Normal状态,最后会持久化Version文件到本地。当这个目录初始化完成后最终会将其添加到上述的storageDirs中。

1
2
3
4
5
6
boolean hasPrevious = getPreviousDir().exists();
boolean hasPreviousTmp = getPreviousTmp().exists();
boolean hasRemovedTmp = getRemovedTmp().exists();
//...
if (hasCurrent)
return StorageState.NORMAL;
  • DN中定义的存储目录下对应的块池的初始化
  • 上面加载的都是盘的目录,由于这个块池是存在于所有盘的。所以BlockPoolSliceStorage.recoverTransitionRead()要在每个盘上对应的块池目录调用一次这个方法,(由于BlockPoolSliceStorage也是继承于Storage所以也是有如上两个列表来记录加载完成的目录的信息),只不过这里对应的盘的列表就变成了对应的块池目录的列表
1
2
3
4
5
6
7
8
9
for (StorageLocation dataDir : dataDirs) {
//....
BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
if (bpStorage == null) {
bpStorage = new BlockPoolSliceStorage( );
}
bpStorage.recoverTransitionRead( )
addBlockPoolStorage(bpid, bpStorage);
}
  • 构造BlockPoolSliceStorage对象,调用BlockPoolSliceStorage.recoverTransitionRead()对每个块池初始化
  • 块池的初始化和上面的过程类似先analyzeStorage()分析状态,然后根据状态进行恢复,通常重启是normal状态,根据不同的状态执行对应的操作
  • 所以最终块池目录加载完毕后DataStorage.bpStorageMap中会存在所有块池id及其对应BlockPoolSliceStorage的唯一映射。
  • DataSetImpl初始化,通过工厂模式创建
  • 初始化volumes

    1
    2
    //FsVolumeLis用来存放FsVolumeImpl的结构,好处是checkDirs(), getAvailable()就不需要加锁了
    private final AtomicReference<FsVolumeImpl[]> volumes
  • 根据DN的存储目录初始化FsVolumeImpl

  • cas并发的添加到AtomicReference<FsVolumeImpl[]> volumes,checkDirs(), getAvailable()
  • 添加对blockScanner的引用,blockScanner在DN初始化后就已经完成
  • 在volumes添加FsVolumeReference时候会在blockScanner中也添加FsVolumeReference
  • 初始化完毕后开始在FsVolumeLis创建多个线程并发的添加块池
  • 遍历volumes列表调用 FsVolumeImpl.addBlockPool()方法
  • FsVolumeImpl构造BlockPoolSlice 并将其添加到bpSlices 的Map中
  • 首先启动和盘的数量相同的线程并行的加载每个盘下的块池目录,比如初始化每个盘下对应的块池id->BlockPoolSliceStorage的映射。
  • 构造BlockPoolSlice时会创建current,rbw,tmp,等目录
  • 数据块的初始化操作
  • 块池目录加载完成后,启动多个线程对每个盘下的数据块副本进行加。
1
2
3
4
5
6
7
8
9
10
11
List<Thread> replicaAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
//...
}
replicaAddingThreads.add(t);
t.start();
}
for (Thread t : replicaAddingThreads) {
t.join();
}
  1. 最终通过BlockPoolSlice.addToReplicasMap()在每个传入的ReplicaMap上添加各个目录下的数据块,比如Finalize和rbwDir的数据块。
  2. 最终初始化之后的结果是FsVolumeList这个类中的volumes包含了所有的盘的目录,并且每个FsVolumeImpl中记录了每个目录下的各个块的实例,最终存放在ReplicaMap.Map<String, Map<Long, ReplicaInfo>> map这个map中块池id->(块id->副本信息)
文章目录
  1. 1. 一、DataNode磁盘目录结构
  2. 2. 二、副本状态
    1. 2.1. 各个目录文件的简单总结
  3. 3. 三、目录结构对应的逻辑结构
    1. 3.1. 1.目录维度
    2. 3.2. 2.功能维度
  4. 4. 四、DN启动时两个维度间的交互
    1. 4.1. 1.重启时主要过程
    2. 4.2. 2.块池初始化逻辑