Hadoop系列之DistributedCache用法

By | 03月08日
Advertisement

DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义;用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象;按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点。本文将介绍DistributedCache在Hadoop
1.0和2.0中的使用方法及实现原理。

Hadoop DistributedCache有以下几种典型的应用场景:1)分发字典文件,一些情况下Mapper或者Reducer需要用到一些外部字典,比如黑白名单、词表等;2)map-side join:当多表连接时,一种场景是一个表很大,一个表很小,小到足以加载到内存中,这时可以使用DistributedCache将小表分发到各个节点上,以供Mapper加载使用;3)自动化软件部署:有些情况下,MapReduce需依赖于特定版本的库,比如依赖于某个版本的PHP解释器,一种做法是让集群管理员把这个版本的PHP装到各个机器上,这通常比较麻烦,另一种方法是使用DistributedCache分发到各个节点上,程序运行完后,Hadoop自动将其删除。

Hadoop提供了两种DistributedCache使用方式,一种是通过API,在程序中设置文件路径,另外一种是通过命令行(-files,-archives或-libjars)参数告诉Hadoop,个人建议使用第二种方式,该方式可使用以下三个参数设置文件:

(1)-files:将指定的本地/hdfs文件分发到各个Task的工作目录下,不对文件进行任何处理;

(2)-archives:将指定文件分发到各个Task的工作目录下,并对名称后缀为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中,比如压缩包为dict.zip,则解压后内容存放到目录dict.zip中。为此,你可以给文件起个别名/软链接,比如dict.zip#dict,这样,压缩包会被解压到目录dict中。

(3)-libjars:指定待分发的jar包,Hadoop将这些jar包分发到各个节点上后,会将其自动添加到任务的CLASSPATH环境变量中。

前面提到,DistributedCache分发的文件是有可见范围的,有的文件可以只对当前程序可见,程序运行完后,直接删除;有的文件只对当前用户可见(该用户所有程序都可以访问);有的文件对所有用户可见。DistributedCache会为每种资源(文件)计算一个唯一ID,以识别每个资源,从而防止资源重复下载,举个例子,如果文件可见范围是所有用户,则在每个节点上,第一个使用该文件的用户负责缓存该文件,之后的用户直接使用即可,无需重复下载。那么,Hadoop是怎样区分文件可见范围的呢?

在Hadoop 1.0版本中,Hadoop是以HDFS文件的属性作为标识判断文件可见性的,需要注意的是,待缓存的文件即使是在Hadoop提交作业的客户端上,也会首先上传到HDFS的某一目录下,再分发到各个节点上的,因此,HDFS是缓存文件的必经之路。对于经常使用的文件或者字典,建议放到HDFS上,这样可以防止每次重复下载,做法如下:

比如将数据保存在HDFS的/dict/public目录下,并将/dict和/dict/public两层目录的可执行权限全部打开(在Hadoop中,可执行权限的含义与linux中的不同,该权限只对目录有意义,表示可以查看该目录中的子目录),这样,里面所有的资源(文件)便是所有用户可用的,并且第一个用到的应用程序会将之缓存到各个节点上,之后所有的应用程序无需重复下载,可以在提交作业时通过以下命令指定:

-files hdfs:///dict/public/blacklist.txt, hdfs:///dict/public/whilelist.txt

如果有多个HDFS集群可以指定namenode的对外rpc地址:

-files hdfs://host:port/dict/public/blacklist.txt, hdfs://host:port/dict/public/whilelist.txt

DistributedCache会将blacklist.txt和whilelist.txt两个文件缓存到各个节点的一个公共目录下,并在需要时,在任务的工作目录下建立一个指向这两个文件的软连接。

如果可执行权限没有打开,则默认只对该应用程序的拥有者可见,该用户所有应用程序可共享这些文件。

一旦你对/dict/public下的某个文件进行了修改,则下次有作业用到对应文件时,会发现文件被修改过了,进而自动重新缓存文件。

对于一些频繁使用的字典,不建议存放在客户端,每次通过-files指定,这样的文件,每次都要经历以下流程:上传到HDFS上—》缓存到各个节点上—》之后不再使用这些文件,直到被清除,也就是说,这样的文件,只会被这次运行的应用程序使用,如果再次运行同样的应用程序,即使文件没有被修改,也会重新经历以上流程,非常耗费时间,尤其是字典非常多,非常大时。

DistributedCache内置缓存置换算法,一旦缓存(文件数目达到一定上限或者文件总大小超过某一上限)满了之后,会踢除最久没有使用的文件。

在Hadopo 2.0中,自带的MapReduce框架仍支持1.0的这种DistributedCache使用方式,但DistributedCache本身是由YARN实现的,不再集成到MapReduce中。YARN还提供了很多相关编程接口供用户调用,有兴趣的可以阅读源代码。

下面介绍Hadoop 2.0中,DistributedCache通过命令行分发文件的基本使用方式:

(1)运行Hadoop自带的example例子, dict.txt会被缓存到各个Task的工作目录下,因此,直接像读取本地文件一样,在Mapper和Reducer中,读取dict.txt即可:


1

2

3

4

5

6

bin/Hadoop

jar \

share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar
\

wordcount
\

-files
hdfs:
///dict/public/dict.txt
\

/test/input

\

/test/output

(2)Hadoop Streaming例子,需要通过-files指定mapper和reducer可执行文件或者脚本文件,这些文件就是通过DistributedCache分发到各个节点上的。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

#!/bin/bash

HADOOP_HOME=/opt/yarn-client

INPUT_PATH=/test/input/data

OUTPUT_PATH=/test/output/data

echo

"Clearing output path: $OUTPUT_PATH"

$HADOOP_HOME/bin/hadoop

fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop

jar\

   ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\

  -D
mapred.reduce.tasks=2\

  -files
mapper,reducer\

  -input
$INPUT_PATH\

  -output
$OUTPUT_PATH\

  -mapper
mapper\

  -reducer
reducer

(3)接下给出一个缓存压缩文件的例子,假设压缩文件为dict.zip,里面存的数据为:


1

2

3

4

data/1.txt

data/2.txt

mapper.list

reducer.list

通过-archives参数指定dict.zip后,该文件被解压后,将被缓存(实际上是软连接)到各个Task的工作目录下的dict.zip目录下,组织结构如下:


1

2

3

4

5

6

dict.zip/

    data/

        1.txt

        2.txt

    mapper.list

    reducer.list

你可以在Mapper或Reducer程序中,使用类似下面的代码读取解压后的文件:


1

2

3

File
file2 = read(“dict.zip/data/1.txt”, “r”);

…….

File
file3 = read(“dict.zip/mapper.list”, “r”);

如果你想直接将内容解压到Task工作目录下,而不是子目录dict.zip中,可以用“-files”(注意,不要使用-archives,“-files”指定的文件不会被解压)指定dict.zip,并自己在程序中实现解压缩:


1

2

3

4

#include
<cstdlib>

…….

system(“unzip
–q dict.zip”);
//C++代码

……

总之,Hadoop DistributedCache是一个非常好用的工具,合理的使用它能够解决很多非常困难的问题。

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce-nextgen/hadoop-distributedcache-details/

作者:Dong,作者介绍:http://dongxicheng.org/about/

本博客的文章集合:http://dongxicheng.org/recommend/

Similar Posts:

  • C/C++系列: static用法总结

    C/C++系列: static用法总结 <!-- [if gte mso 9]><xml> <w:WordDocument> <w:View>Normal</w:View> <w:Zoom>0</w:Zoom> <w:TrackMoves/> <w:TrackFormatting/> <w:PunctuationKerning/> <w:DrawingGridVerticalSpa

  • 小丸子学Hadoop系列之——部署Hbase集群

    0.集群规划 主机名 ip地址 安装的软件 运行的进程 AI-OPT-HBS01 10.46.52.30 hadoop,hbase namenode,zkfc,resourcemanager AI-OPT-HBS02 10.46.52.31 hadoop namenode,zkfc,resourcemanager AI-OPT-HBS03 10.46.52.32 hadoop,hbase datanode AI-OPT-HBS04 10.46.52.33 hadoop,zookeeper,hba

  • Python中dictionary items()系列函数的用法实例

    本文实例讲述了Python中dictionary items()系列函数的用法,对Python程序设计有很好的参考借鉴价值.具体分析如下: 先来看一个示例: import html # available only in Python 3.x def make_elements(name, value, **attrs): keyvals = [' %s="%s"' % item for item in attrs.items()] attr_str = ''.join(keyvals

  • shell基础系列:awk 用法

    shell基础系列:awk 用法 awk ' pattern {action} ' 变量名 含义 argc 命令行变元个数 argv 命令行变元数组 filename 当前输入文件名 fnr 当前文件中的记录号 fs 输入域分隔符,默认为一个空格 rs 输入记录分隔符 nf 当前记录里域个数 NR 到目前为止记录数 OFS 输出域分隔符 ORS 输出记录分隔符 1.awk '/101/' file 显示文件file中包含101的匹配行. IXDBA.NET技术社区 awk '/101/,/105

  • Hadoop系列之初始Hadoop

    前言 工作后很少主动再学习其他新的技术了,这次终于鼓足勇气开始了新的篇章--Hadoop,作为一个如今最火的技术之一,我也来瞧瞧它的庐山真面目.不过本次要准备写的Hadoop系列基本是个学习笔记了,中间会掺杂些自己的理解,以自己理解的方式展现出来. 正题 Hadoop历史 万事从头说起,我们先来看下Hadoop的由来: 始于2002年的apache项目Nutch 2003年Google发表了关于GFS的论文 2004年Nutch的开发者开发了NDFS 2004年Google发表了关于MapRed

  • hadoop中的DistributedCache 2

    hadoop中的DistributedCache 2 WordCount.javaHadoop的分布式缓存机制使得一个job的所有map或reduce可以访问同一份文件.在任务提交后,hadoop将由-files和-archive选项指定的文件复制到HDFS上(JobTracker的文件系统).在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这样任务就可以访问这些文件.对于job来说,它并不关心文件是从哪儿来的.在使用DistributedCache

  • spring hadoop系列二(MapReduce and Distributed cache)

    关于MapReduce and Distributed Cache 一.创建Hadoop Job 在前面的系列一里面我们已经知道如何配置hadoop了,在完成配置工作之后,我们如何提交job,并运行这些job将是接下来我们讲述的 使用SHDP创建job是相当简单的 <hdp:job id="mr-job" 指定jod id input-path="/input/" output-path="/ouput/" 指定任务input和output

  • Hadoop 中使用DistributedCache遇到的问题

    自己在写MAR/REDUCE代码时,遇到了一个问题,一个大数据文件和一个小数据文件匹配计算,但是小数据文件太小,所以想采用HIVE的MAP JOIN的方式,把小数据文件放到直接大数据文件map的datanode的内存中,这样少了MR代码的1对N的数据文件关联. 实现这个的最佳方案就是利用distributed cache.HIVE的MAP JOIN也是利用这个技术. 首先简要介绍一下distributed cache是如何使用的,然后总结下自己在使用distributed cache遇到的问题,

  • Hadoop系列之七:分布式文件系统HDFS(2)

    1.访问HDFS文件系统 HDFS是工作于用户空间的文件系统,它的树状文件系统是独立的,不能像传统上工作于内核空间的文件系统一样挂载至当前操作系统的目录树上对HDFS进行访问,传统上实现文件或目录管理的命令如ls.cat等此处也无法正常使用.对HDFS文件系统上的文件进行访问,需要通过HDFS的API或者由hadoop提供的命令行工具进行. 1.1 HDFS用户接口 (1) hadoop dfs命令行接口: (2) hadoop dfsadmin命令行接口: (3) web接口: (4) HDF

  • Hadoop系列之HBASE(分布式数据库)安装配置

    1.hbase安装 cd /root/soft tar zxvf hbase-0.98.5-hadoop2-bin.tar.gz mv hbase-0.98.5-hadoop2 /usr/local/hadoop/hbase 2.添加环境变量(所有节点都增加) #vim /etc/profile export HBASE_HOME=/usr/local/hadoop/hbase export PATH=$PATH:/usr/local/hadoop/hbase/bin #source /etc/

Tags: