Skip to main content

Hadoop Notes

Hadoop Intro

Hadoop is a open-source framework for reliable, scalable, distributed computing

Modules 模块

  • Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data. 分布式文件系统
  • YARN: A framework for job scheduling and cluster resource management.
  • MapReduce: A YARN-based system for parallel processing of large data sets. 分布式计算框架

HDFS

简介 Intro

HDFS (Hadoop Distributed File System) 是 Hadoop 中的分布式文件系统,具有高容错、高吞吐量等特性,可以部署在低成本的硬件上。

HDFS 是一个分布式的文件系统

  • commodity hardware
  • fault-tolerant 容错
  • high throughput 高吞吐
  • large data sets
  • Streaming Data Access 流式数据

架构 Architecture

HDFS 遵循主/从架构,由单个 NameNode(NN) 和多个 DataNode(DN) 组成:

  • NameNode : 负责执行有关 The File System Namespace 的操作,例如打开,关闭、重命名文件和目录等。它同时还负责集群元数据的存储,记录着文件中各个数据块的位置信息。
  • DataNode:负责提供来自文件系统客户端的读写请求,执行块的创建,删除等操作。

运行机制

用户文件会被切块后存储在多台 DataNode 服务器当中,并且每个文件在整个集群中存放多个副本,可以提高数据的安全性

  • 对用户提供统一的目录,存储时会把文件切分为若干个块存储在不同 DataNode
  • 用户可以设置多个数据副本,以增强数据的安全性
  • 用户存储的信息放在 NameNode 中,可以理解为目录

NameNode

整个文件系统的管理节点,接收用户的请求,保存这文件/目录的元数据信息和每个文件对应的 block 映射表。即目录。 不会存储任何的用户数据

  • fsimage:元数据镜像文件,存储一段时间内 NameNode 的元数据信息
  • edits:保存操作日志文件
  • fstime:保存最近一次 checkpoint 的时间
  • seen_txid:最后一个 edites 的数字
  • VERSION

DataNode

  • 提供正式文件的数据存储服务.
  • block 是最基础的存储单元. HDFS 默认的block大小是 128M

数据复制 Data Replication

由于 Hadoop 被设计运行在廉价的机器上,这意味着硬件是不可靠的,为了保证容错性,HDFS 提供了数据复制机制。HDFS 将每一个文件存储为一系列block,每个块由多个副本来保证容错,块的大小和复制因子可以自行配置 (默认情况下,块大小是 128M,默认复制因子是 3).

大型的 HDFS 实例在通常分布在多个机架的多台服务器上,不同机架上的两台服务器之间通过交换机进行通讯。在大多数情况下,同一机架中的服务器间的网络带宽大于不同机架中的服务器之间的带宽。因此 HDFS 采用 rack-aware replica placement policy 机架感知副本放置策略,对于常见情况,当复制因子为 3 时,HDFS 的放置策略是:

  • 在写入程序位于 DataNode 上时,就优先将写入文件的一个副本放置在该 DataNode 上,否则放在随机 DataNode 上。
  • 之后在另一个远程机架上的任意一个节点上放置另一个副本,并在该机架上的另一个节点上放置最后一个副本。
  • This policy cuts the inter-rack write traffic which generally improves write performance. 此策略可以减少机架间的写入流量,从而提高写入性能。

如果复制因子大于 3,则随机确定第 4 个和之后副本的放置位置,同时保持每个机架的副本数量低于上限,上限值通常为 (复制系数 - 1)/机架数量 + 2,需要注意的是不允许同一个 DataNode 上具有同一个块的多个副本。

健壮性 Robustness

1. 心跳机制和重新复制 Heartbeats and Re-Replication

每个 DataNode 定期向 NameNode 发送心跳消息,如果超过指定时间没有收到心跳消息,则将 DataNode 标记为死亡。NameNode 不会将任何新的 I/O 请求转发给标记为死亡的 DataNode, 也不会再使用这些 DataNode 上的数据。 由于数据不再可用,可能会导致某些块的复制因子小于其指定值,NameNode 会跟踪这些块,并在必要的时候进行重新复制。

2. 数据的完整性 Data Integrity

由于存储设备故障等原因,存储在 DataNode 上的数据块也会发生损坏。为了避免读取到已经损坏的数据而导致错误,HDFS 提供了数据完整性校验机制来保证数据的完整性,具体操作如下:

当客户端创建 HDFS 文件时,它会计算文件的每个块的 checksum 校验和,并将 checksum 存储在同一 HDFS namespace 下的单独的隐藏文件中。当客户端检索文件内容时,它会验证从每个 DataNode 接收的数据是否与存储在关联校验和文件中的 checksum 匹配。如果匹配失败,则证明数据已经损坏,此时客户端会选择从其他 DataNode 获取该块的其他可用副本。

3.元数据的磁盘故障 Metadata Disk Failure

FsImageEditLog 是 HDFS 的核心数据,这些数据的意外丢失可能会导致整个 HDFS 服务不可用。为了避免这个问题,可以配置 NameNode 使其支持 FsImageEditLog 多副本同步,这样 FsImageEditLog 的任何改变都会引起每个副本 FsImageEditLog 的同步更新。

4.支持快照 Snapshots

  • Snapshots support storing a copy of data at a particular instant of time. One usage of the snapshot feature may be to roll back a corrupted HDFS instance to a previously known good point in time.
  • 快照支持在特定时刻存储数据副本,在数据意外损坏时,可以通过回滚操作恢复到健康的数据状态。

MapReduce

MapReduce 解决了什么问题?

2004 年谷歌提出了 MapReduce, 在此之前谷歌程序员面对的大规模数据集,常常需要编程实现:

  • 统计某个关键词的现的频率,计算 pageRank
  • 对大规模数据按词频排序
  • 对多台机器上的文件进行 grep 等

这些工作不可能在一台机器上完成(否则也不能称之为大规模),因此谷歌的程序员每次编写代码都需要处理,多机并行协同,网络通信,处理错误,提高执行效率等问题。

这些问题使得开发效率严重降低,因此为了治理这一现象导致的复杂度,Jeff Dean,设计了一种新的编程模型 MapReduce。

所以 MapReduce 就是为了编写在普通机器上运行的大规模并行数据处理程序而抽象出来的编程模型,为解决 多机并行协同、网络通信、处理错误、提高执行效率 等通用性问题的一个编程框架。

简介 Intro

Hadoop MapReduce 是一个分布式计算框架,用于编写批处理应用程序。编写好的程序可以提交到 Hadoop 集群上用于并行处理大规模的数据集。

MapReduce 作业通过将输入的数据集拆分为独立的块,这些块由 map 以并行的方式处理,框架对 map 的输出进行排序,然后输入到 reduce 中。MapReduce 框架专门用于 <key,value> 键值对处理,它将作业的输入视为一组 <key,value> 对,并生成一组 <key,value> 对作为输出。输入和输出的 keyvalue 都必须实现Writable 接口。

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

局限与不足

  • 抽象层次低,需要手工编写代码来完成,使用上难以上手。
  • 只提供两个操作,Map 和 Reduce,表达力欠缺。
  • 一个 Job 只有 Map 和 Reduce 两个阶段(Phase),复杂的计算需要大量的 Job 完成,Job 之间的依赖关系是由开发者自己管理的。
  • 处理逻辑隐藏在代码细节中,没有整体逻辑。
  • 中间结果也放在 HDFS 文件系统中
  • ReduceTask 需要等待所有 MapTask 都完成后才可以开始
  • 时延高,只适用 Batch 数据处理,对于交互式数据处理,实时数据处理的支持不够。
  • 对于迭代式数据处理性能比较差。

计算框架对比

  • MapReduce: 它是一种离线计算框架,将一个算法抽象成 Map 和 Reduce 两个阶段进行处理,非常适合数据密集型计算。
  • Spark: MapReduce 计算框架不适合(不是不能做,是不适合,效率太低)迭代计算(常见于 machine learning 领域,比如 PageRank)和交互式计算(data mining 领域,比如 SQL 查询),MapReduce 是一种磁盘计算框架,而 Spark 则是一种内存计算框架,它将数据尽可能放到内存中以提高迭代应用和交互式应用的计算效率。
  • Storm: MapReduce 也不适合进行流式计算、实时分析,比如广告点击计算等,而 Storm 则更擅长这种计算、它在实时性要远远好于 MapReduce 计算框架。

Programming Model 编程模型

MapReduce 计算模型主要由三个阶段构成: Map, Shuffle, Reduce.

  • Map: 数据输入,做初步的处理,输出形式的中间结果
  • Shuffle: 按照 partition, key 对中间结果进行排序合并,输出给 reduce 线程
  • Reduce: 对相同 key 的输入进行最终的处理, 并将结果写入到文件中

Map 和 Reduce 操作需要我们自己定义相应 Map 类和 Reduce 类,以完成我们所需要的化简、合并操作,而 shuffle 则是系统自动帮我们实现的。Shuffle 过程有一部分是在 Map 端,有一部分是在 Reduce 端。

Shuffle

Shuffle 的使用地点:发生在 map task 输出结果传送到 reduce task 输入的阶段。

  • Shuffle 过程中的几个名词:
    • Shuffle 洗牌
    • spill 溢写
    • combiner
    • merge
    • copy
  • 使用 Shuffle 的好处:
    • 在从 map task 端拉取数据到 reduce task 端时,减少宽带的消耗
    • 将数据完整的从 map task 端拉取数据到 reduce task 端
    • 减少磁盘 IO 对 task 的影响

Eg. 词频统计

这里以词频统计为例进行说明,MapReduce 处理的流程如下:

  1. input: 读取文本文件;
  2. splitting: 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping: 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling:
    • 由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
    • The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged. 同时会执行排序阶段
  5. Reducing: 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

执行过程

1. InputFormat & RecordReaders

InputFormat 将输出文件拆分为多个 InputSplit,并由 RecordReadersInputSplit 转换为标准的<key,value>键值对,作为 map 的输出。这一步的意义在于只有先进行逻辑拆分并转为标准的键值对格式后,才能为多个 map 提供输入,以便进行并行处理。

2. Map Worker

  • 某个 TaskTracker 领取 map 任务
  • TaskTracker 启动单独的 JVM 运行这个 map 任务
  • Workers are assigned tasks to perform actions.

3. Partitioner

  • 可理解为 Hash 函数,用于分配任务给 reducer
  • map 的输出按照 key 值的不同分别分给对应的 reducer,支持自定义实现
  • 后将数据写入到内存缓冲区,缓冲区的作用是批量收集 map 结果,减少磁盘 IO 的影响。key/value 对以及 Partition 的结果都会被写入缓冲区。在写入之前,key 与 value 值都会被序列化成字节数组。

4. Spill: Sort & Combiner

在 map task 端将输出的数据写入缓冲区直到到达阈值(默认缓冲区大小 100M,阈值是 80%)。达到阈值启动溢写线程 Spill 即用于 把内存缓冲区中的数据写入到本地磁盘,在写入本地磁盘时先按照 partition、再按照 key 进行排序 quick sort

  • Spill 是由另外单独的线程来完成,不影响往缓冲区写 map 结果的线程.
  • 在将数据写入磁盘之前,先要对要写入磁盘的数据进行一次排序操作,先按 <key, value, partition> 中的 partition 分区号排序,然后再按 key 排序,这个就是 Sort 操作,最后溢出的小文件是分区的,且同一个分区内是保证 key 有序的.

Combiner

combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作,它主要是在 map 计算出中间文件后做一个简单的合并重复 key 值的操作。这里以词频统计为例:

map 在遇到一个 hadoop 的单词时就会记录为 1,但是这篇文章里 hadoop 可能会出现 n 多次,那么 map 输出文件冗余就会很多,因此在 reduce 计算前对相同的 key 做一个合并操作,那么需要传输的数据量就会减少,传输效率就可以得到提升。

但并非所有场景都适合使用 combiner,使用它的原则是 combiner 的输出不会影响到 reduce 计算的最终输入,例如:求总数,最大值,最小值时都可以使用 combiner,但是做平均值计算则不能使用 combiner

不使用 combiner 的情况:

使用 combiner 的情况:

可以看到使用 combiner 的时候,需要传输到 reducer 中的数据由 12keys,降低到 10keys。降低的幅度取决于你 keys 的重复率.

5. Merge

如果数据很大,会发生多次溢写既会有多个溢写文件,待 Map Task 任务的所有数据都处理完后,会对任务产生的所有中间数据文件做一次合并操作,以确保一个 Map Task 最终只生成一个中间数据文件

6. Reduce

  • 在部分 map 任务执行完后 (不用等到所有 map 任务结束) JobTracker 开始分配 reduce 任务到 TaskTracker;
  • TaskTracker 启动单独的 JVM 运行这个 reduce 任务;
  • TaskTracker 从远地下载中间结果文件到本地 (指 partition 文件、一个 partition 对应一个 reduce), 为 reduce 任务真正开展做准备,但不会开始执行 reduce() 函数. 待所有的 map 任务都完成以后,JobTracker 通知所有的 TaskTracker 开始做 reduce 任务.
  • TaskTracker 和 JobTracker 定期通信,报告进度.

YARN

简介 Intro

Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.0 引入的集群资源管理系统。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。

YARN is essentially a system for managing distributed applications. It consists of a central ResourceManager, which manages all available cluster resources, and a per-node NodeManager, which takes direction from the ResourceManager and is responsible for managing resources available on a single node.

架构 Architecture

1. ResourceManager

  • ResourceManager is the scheduler for the resource among all the applications in the system.
  • ResourceManager 通常在独立的机器上以后台进程的形式运行,它是整个集群资源的主要协调者和管理者。ResourceManager 负责给用户提交的所有应用程序分配资源,它根据应用程序优先级、队列容量、ACLs、数据位置等信息,做出决策,然后以共享的、安全的、多租户的方式制定分配策略,调度集群资源。
  • The ResourceManager has two main components:
    • Scheduler
      • FIFO scheduler - This places applications in a queue and runs them in the order of submission (first in, first out). It is not desirable, as a long-running application might block the small running applications
      • Capacity scheduler - A separate dedicated queue allows the small job to start as soon as it is submitted. The large job finishes later compared to using the FIFO scheduler
      • Fair scheduler - There is no need to reserve a set amount of capacity since it will dynamically balance resources between all the running jobs
    • ApplicationsManager

2. NodeManager

  • The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.
  • NodeManager 是 YARN 集群中的每个具体节点的管理者。主要负责该节点内所有容器的生命周期的管理,监视资源和跟踪节点健康。具体如下:
    • 启动时向 ResourceManager 注册并定时发送心跳消息,等待 ResourceManager 的指令;
    • 维护 Container 的生命周期,监控 Container 的资源使用情况;
    • 管理任务运行时的相关依赖,根据 ApplicationMaster 的需要,在启动 Container 之前将需要的程序及其依赖拷贝到本地。

3. ApplicationMaster

  • The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.
  • 在用户提交一个应用程序时,YARN 会启动一个轻量级的进程 ApplicationMasterApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器内资源的使用情况,同时还负责任务的监控与容错。具体如下:
    • 根据应用的运行状态来决定动态计算资源需求;
    • ResourceManager 申请资源,监控申请的资源的使用情况;
    • 跟踪任务状态和进度,报告资源的使用情况和应用的进度信息;
    • 负责任务的容错。

4. Container

  • Essentially, the Container is the resource allocation, which is the successful result of the ResourceManager granting a specific ResourceRequest. A Container grants rights to an application to use a specific amount of resources (memory, cpu etc.) on a specific host. 本质上是资源分配,运行应用程序在某个主机上使用 memory, cpu etc.
  • Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当 AM 向 RM 申请资源时,RM 为 AM 返回的资源是用 Container 表示的。
  • YARN 会为每个任务分配一个 Container,该任务只能使用该 Container 中描述的资源。ApplicationMaster 可在 Container 内运行任何类型的任务。
    • 例如,MapReduce ApplicationMaster 请求一个容器来启动 map 或 reduce 任务,而 Giraph ApplicationMaster 请求一个容器来运行 Giraph 任务。

工作原理 How to work

  1. Client 提交作业到 YARN 上.
  2. Resource Manager 选择一个 Node Manager,启动一个 Container 并运行 Application Master 实例.
  3. Application Master 根据实际需要向 Resource Manager 请求更多的 Container 资源 (如果作业很小, 应用管理器会选择在其自己的 JVM 中运行任务).
  4. Application Master 通过获取到的 Container 资源执行分布式计算.

环境配置

安装

本机连接 ECS 服务器

# 我的服务器ip 登录名@root
ssh root@116.62.21.206

# 输入密码

scp 上传安装包

# hadoop
scp hadoop-2.6.0-cdh5.15.1.tar.gz root@116.62.21.206:/tmp/
# hive
scp hive-1.1.0-cdh5.15.1.tar.gz root@116.62.21.206:/tmp/

解压

# 创建 app/ 目录
mkdir app

# 解压到 //app/ 目录下面
tar -zxvf hadoop-2.6.0-cdh5.15.1.tar.gz -C //app/

# 解压完成

修改配置文件

配置 /etc/hadoop/hadoop_env.sh

# 进入hadoop目录
cd etc/hadoop

vi hadoop-env.sh

# insert
export JAVA_HOME=/root/jdk1.8
# 保存退出

配置 /etc/hadoop/core-site.xml

vi core-site.xml

# insert
# 本机名@root, 8020 port
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:8020</value>
</property>

配置 /etc/hadoop/hdfs-site.xml

vi hdfs-site.xml

<property>
<name>dfs.replication</name>
<value>1</value>
</property>

配置临时文件存放目录

mkdir tmp
# /app/tmp 目录存放与hadoop相关的东西

vi hdfs-site.xml

<property>
<name>hadoop.tmp.dir</name>
<value>/app/tmp</value>
</property>

配置 ~/.bash_profile 环境变量

# hadoop pwd
# /app/hadoop-2.6.0-cdh5.15.1

vi ~/.bash_profile

# insert
export HADOOP_HOME=/app/hadoop-2.6.0-cdh5.15.1
export PATH=$HADOOP_HOME/bin:$PATH

# 退出
source ~/.bash_profile

启动 HDFS

第一次启动一定要格式化

cd $HADOOP_HOME/bin

hdfs namenode -format

# Storage directory /app/tmp/dfs/name has been successfully formatted.

正式启动服务

$HADOOP_HOME/sbin/start-dfs.sh

# 验证
jps

# 3552 DataNode
# ...
# 4487 SecondaryNameNode
# 4284 NameNode

启动服务后,尝试访问 http://116.62.21.206:50070/

若打不开,在阿里云中配置安全组端口号=50070

停止 HDFS

$HADOOP_HOME/sbin/stop-dfs.sh

# localhost: stopping namenode
# localhost: stopping datanode
# Stopping secondary namenodes [0.0.0.0]