当地提交MapReduce作业至集群,大额学习day_五

By admin in 美高梅手机版4858 on 2019年4月21日

 

Hadoop-2.4.1学习之Mapper和Reducer

MapReduce允许技师可以轻巧地编写并行运转在分布集群上拍卖大量数据的次序,确定保障程序的运维稳固可信赖和兼具容错处理本领。程序员编写的运作在MapReduce上的应用程序称为作业(job),Hadoop既扶助用Java编写的job,也支撑任何语言编写的学业,比方Hadoop
Streaming(shell、python)和Hadoop
Pipes(c++)。Hadoop-二.X不再保留Hadoop-一.X版本中的JobTracker和TaskTracker组件,但那并不代表Hadoop-二.X不再帮衬MapReduce作业,相反Hadoop-贰.X通过唯壹的主ResourceManager、每一种节点2个的从NodeManager和种种应用程序多少个的MRAppMaster保留了对MapReduce作业的向后11分。在新本子中MapReduce作业照旧由Map和Reduce职责组成,Map依旧接收由MapReduce框架将输入数据分割为数据块,然后Map义务以完全并行的形式管理那个数据块,接着MapReduce框架对Map义务的输出举办排序,并将结果做为Reduce职务的输入,最终由Reduce任务输出最终的结果,在整个实行进程中MapReduce框架肩负职分的调解,监察和控制和重新推行倒闭的职分等。

一般总计节点和仓库储存节点是平等的,MapReduce框架会立见功用地将任务陈设在仓库储存数据的节点上,有助于下跌传输数据时的带宽使用量。MapReduce应用程序通过得以落成或然接续合适的接口或类提供了map和reduce函数,那多个函数担任Map职分和Reduce职分。作业客户端将编写制定好的课业提交给ResourceManager,而不再是JobTracker,ResourceManager担当将作业布满到从节点上,调解和监察作业,为作业客户端提供景况和会诊音讯。

MapReduce框架只管理<key,
value>键值对,也正是将作业的输入视为一些键值对并出口键值对。做为键值的类必须能够被MapReduce框架连串化,因而需求实现Writable接口,常用的IntWritable,LongWritable和Text都以贯彻该接口的类。做为键的类除却要落到实处Writable接口外,还索要达成WritableComparable接口,落成该接口首要为了促进排序,上边提到的三个类也都落成了该接口。

在简约介绍了MapReduce框架后,上边深远学习框架中的四个第二概念:Mapper和Reducer,正如上文提到了,它们构成了MapReduce作业并担任完结实际的事体逻辑管理。

Mapper是独自的任务,将输入记录转变为中等记录,即对输入的键值对进展拍卖,并出口为一组中间键值对,输出的键值对选用context.write(WritableComparable,
Writable)方法收集起来,中间记录的键值类型不必与输入记录的键值类型一样,实际上也1再是见仁见智的。一条输入记录经由Mapper管理后只怕输出为0条恐怕多条个中记录。比方,假使输入记录不满意专门的工作须要(未有包括特定的值只怕隐含了一定的值)的话,能够一贯回到,则会输出0条记下,此时Mapper起了过滤器的意义。

继之MapReduce框架将与给定键相关联的有所中等值分组,然后传递给Reducer。用户能够经过Job.setGroupingComparatorClass(Class)方法钦赐Comparator来调整分组。Mapper的输出被排序然后根据Reducer分区,总的分区数与作业运行的Reducer任务数一致,程序员可以透过落到实处自定义的Partitioner调节输出的笔录由哪些Reducer管理,默许使用的是HashPartitioner。程序猿还足以因此Job.setCombinerClass(Class)钦点2个combiner来施行中间输出的本地聚合,那促进减弱Mapper到Reducer的数额传输。Mapper的中游输出经过排序后一而再保存为(key-len,
key,value-len,
value)的格式,应用程序能够由此Configuration调控是不是将中等输出进行压缩,以及使用何种压缩方式,相关的多少个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序员通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable,
Writable,
Context)管理该任务的价值对,应用程序能够覆盖cleanup(Context)方法达成任何索要的清总管业。

MapReduce框架为种种由作业的InputFormat生成的InputSplit运行三个map职责,因而总的map职责数量由输入数据大小决定,更标准说是由输入文件总的块数决定。即使可以为较少使用CPU的map职分在节点上安装300个map职责,但各样节点更符合并行运营十-玖1伍个map职责。由于职责的启航需求开销一些光阴,所以义务的运行最佳至少须求一分钟,因为只要任务运行的时日很少,整个作业的日子将超越54%消耗在职分的树立方面。

Reducer将享有同样键的1组中间值下降为1组越来越小数码的值,举个例子合并单词的数目等。3个功课运维的Reducer数量能够由此Job.setNumReduceTasks(int)恐怕mapred-site.xml中的参数mapreduce.job.reduces设置,可是更推荐前者,因为可以由技士决定运维多少个reducer,而后者更加多的是提供了一种暗许值。工程师使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key,
(list of values)>调用reduce(WritableComparable,
Iterable<Writable>,
Context)方法,同Mapper同样,技师也得以覆盖cleanup(Context)方法钦命需求的清理专门的职业。

Reducer的管理进度首要回顾四个级次:shuffle(洗牌)、sort(分类)和reduce。在shuffle阶段,MapReduce框架通过HTTP获取具备Mapper输出的连锁分区。在Sort阶段,框架依照键分组Reducer的输入(分裂的mapper或者输出一样的键)。Shuffle和sort是同时开始展览的,获取Mapper的出口后然后统1它们。在reduce阶段,调用reduce(WritableComparable,
Iterable<Writable>管理<key, (list of
values)>对。Reducer的输出平常通过Context.write(WritableComparable,Writable)写入文件系统,举例HDFS,当然也能够透过动用DBOutputFormat将出口写入数据库。Reducer的出口是未经排序的。

假如不必要Reducer,能够使用Job.setNumReduceTasks(int)将Reducer的数目设置为0(假若不选择该办法设置Reducer的数码,由于mapreduce.job.reduces暗许为一,会运行3个Reducer),在那种状态下,Mapper的出口将向来写入FileOutputFormat.setOutput帕特h(Job,Path)钦定的门道中,并且MapReduce框架不会对Mapper的出口进行排序。

1经在拓展reduce之前想使用与分组中间键时分歧的可比规则,能够经过Job.setSortComparatorClass(Class)内定不相同的Comparator。也便是Job.setGroupingComparatorClass(Class)调节了何等对中级输出分组,而Job.setSortComparatorClass(Class)调节了在将数据传入reduce在此之前开始展览的第3回分组。

分化于Mapper的数额由输入文件的大大小小鲜明,Reducer的多少能够由技士鲜明设置,那么设置有个别Reducer能够达到规定的标准较好地功效呢?Reducer的数据限制为:(0.95
~1.75 ) * 节点数量 *
各样节点上最大的器皿数。参数yarn.scheduler.minimum-allocation-mb设置了各类容器可伸手的细微内部存款和储蓄器,那么最大容器数可依据总的内部存款和储蓄器除以该参数总括得出。当使用0.75时,全体的Reducer会被立刻加载,并当Mapper实现时先河传输Mapper的输出。使用一.7伍时,极快的节点将成功它们第二轮的天职,然后加载第二波职责,这样对负载平衡具备越来越好的功能。扩展Reducer的数据即便增添了框架费用,但扩充了负载平衡和消沉了停业的财力。上边的百分比因子比总的Reducer数量稍微一些些,感觉预测实施的职分和倒闭的天职保留一点点的Reducer槽,也正是事实上的Reducer数量为地点公式得出的多少增加保留的Reducer数量。

CentOS安装和配置Hadoop二.二.0 

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 1二.十 +Hadoop 壹.2.一本子集群配置

Ubuntu上搭建Hadoop情况(单机形式+伪布满情势)

Ubuntu下Hadoop情状的安顿

单机版搭建Hadoop情形图像和文字教程详解

搭建Hadoop意况(在Winodws意况下用编造机虚拟五个Ubuntu系统开展搭建)

MapReduce允许程序猿能够轻便地编写并行运营在科学普及集群上拍卖大批量多少的先后,确定保障程序的周转平稳可信和…

考虑难点

MapReduce允许工程师能够轻便地编写并行运营在大面积集群上管理大批量数目标次第,确定保证程序的运行稳固可信赖和富有容错管理技能。技师编写的运转在MapReduce上的应用程序称为作业(job),Hadoop既协理用Java编写的job,也支撑任何语言编写的课业,比方Hadoop
Streaming(shell、python)和Hadoop
Pipes(c++)。Hadoop-2.X不再保留Hadoop-壹.X版本中的JobTracker和TaskTracker组件,但那并不意味Hadoop-2.X不再帮助MapReduce作业,相反Hadoop-二.X通过唯1的主ResourceManager、各类节点3个的从NodeManager和每种应用程序三个的MRAppMaster保留了对MapReduce作业的向后卓殊。在新本子中MapReduce作业照旧由Map和Reduce职责组成,Map照旧接收由MapReduce框架将输入数据分割为数据块,然后Map职责以完全并行的办法管理这么些数据块,接着MapReduce框架对Map任务的出口举办排序,并将结果做为Reduce职责的输入,最终由Reduce职责输出最后的结果,在漫天实践进度中MapReduce框架负担职责的调节,监察和控制和重复实践停业的天职等。

               
如故那句话,看旁人写的的三番五次感觉心累,代码1贴,①打包,扔到Hadoop上跑一次就完了了????写个测试样例程序(MapReduce中的Hello
World)还要那样麻烦!!!?,还本地打Jar包,传到Linux上,最后再用jar命令运维jar包敲三回in和out参数,作者去,作者是受不了了,作者很捉急,美高梅手机版4858 1

MapReduce总结

万般计算节点和积攒节点是一样的,MapReduce框架会有效地将职责计划在累积数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过达成也许接续合适的接口或类提供了map和reduce函数,那三个函数肩负Map职分和Reduce职务。作业客户端将编制好的课业提交给ResourceManager,而不再是JobTracker,ResourceManager担负将作业遍布到从节点上,调解和监察和控制作业,为作业客户端提供景况和检查判断消息。

                笔者就想驾驭MapReduce的干活原理,而知晓原理后,笔者就想在本地用Java程序跑3回整个MapReduce的计量进程,这几个很难吗?
搜遍全网,没察觉多少个是友善想要的(也有希望漏掉了),都以能够参照的,不过零零散散不对胃口,不相符入门级“游戏用户”像本身一样的人,最终,下定狠心,看录像搜集素材,从头到尾的捋一下MapReduce的规律,以及怎么着在地点,通过编写制定map和reduce函数对多个文本文件中的单词进行出现次数的总结,并将结果输出到HDFS文件系统上

MapReduce

  1. MapReduce的定义
    MapReduce是壹种编制程序模型,
    用于大规模数据集(大于一TB)的并行运算。它将布满式磁盘读写的主题材料举办抽象,并转变为对二个数据集(由键/值对构成)的计量,该总括由
    mapreduce 两有个别构成。

  2. MapReduce编制程序模型流行的四个技术上面的原委:

  • MapReduce选拔无共享大规模集群系统。集群系统有着能够的性价比和可伸缩性。
  • MapReduce模型简单、易于了解、易于使用。它不但用于拍卖大规模数据,而且能够将洋洋累赘的细节隐藏起来(比如,自动化并行、负载均衡和灾备管理等),十分大地简化了程序猿的付出专门的学问,而且多量多少管理难点,包含不少机械学习和数目开掘算法,都可以行使MapReduce实现。
  • 固然基本的MapReduce模型只提供三个进程性的编制程序接口,但在海量数据情况、供给确定保证可伸缩性的前提下,通过运用方便的询问优化和目录技术,MapReduce还可以够提供很好的数额管理品质。
  1. MapReduce和关系型数据库的可比
MapReduce和关系型数据库之间的另一个区别在于他们所操作的数据集的结构化的程度,这在第一篇的文章已经讨论过。  
需要强调的是,MapReduce输入的键和值并不是数据固有的属性,而是由分析数据的人员来选择的。
  1. MapReduce的特点:
  • 软件框架
  • 并行管理
  • 保障且容错
  • 大规模集群
  • 海量数据集
  1. Map 和 Reduce
    Hadoop框架使用Mapper将数据管理成二个个的<key,value>键值对,在互连网节点间对其张开整理(shuffle),然后利用Reducer管理数据并展开末段输出。
  • Map
    映射的情趣,轻便的话就是把二个输入映射为1组(四个)斩新的多寡,而不去改动原来的数量。
    Mapper担当的是。把纷纭的天义务解为多少个“轻巧的职务”来拍卖。“轻松的职责”包涵三层含义:

    1. 数量或计算的范围相对原职务要大大压缩
    2. 左右总结原则,职分会分配到存放着所需数据的节点上海展览中心开测算
    3. 那么些小职责能够并行总结互相间大概从不借助关系
  • Reduce
    化简的情趣,便是把经过Map得到的一组数据通过有些方法(化简)归十分之一想要的输出值。

    所以说,MapReduce的思想正是:分而治之

MapReduce框架只管理<key,
value>键值对,也正是将作业的输入视为一些键值对并出口键值对。做为键值的类必须能够被MapReduce框架类别化,因而须要贯彻Writable接口,常用的IntWritable,LongWritable和Text都以兑现该接口的类。做为键的类除了那一个之外要贯彻Writable接口外,还亟需实现WritableComparable接口,落成该接口首要为了推进排序,上边提到的多个类也都完结了该接口。

 

MapReduce的专门的学业机制

MapReduce运维图解1

MapReduce运维图解二

上边,我们来分析MapReduce作业的运转搭飞机制

  1. 作业的付出:客户端通过JobClient.runJob()来交付二个学业到jobtracker,JobClient程序逻辑如下:
  • 向Jobtracker请求二个新的job id (JobTracker.getNewJobId());
  • 自己冲突作业的输出表达,如已存在抛错误给客户端;计算作业的输入分片;
  • 将运维作业所须要的能源(包蕴作业jar文件,配置文件和计算机才具切磋所得的输入分片)复制到jobtracker的文件系统中以jobid命名的目录下。作业jar别本较多(mapred.submit.replication
    = 十);
  • 报告jobtracker作业希图实施 (submit job)。

粗略来讲吧,就是

  • 经过JobClient提交,与JobTracker通讯获得3个jar的积存路线和JobId
  • 反省输入输出的门道
  • 测算分片的新闻
  • 将作于所急需的财富(jar,配置文件,计算机技能商讨所得的输入分片)赋值到以作业ID命名的HDFS上
  • 告知JobTracker作业计划试行
  1. 作业的初叶化
  • job
    tracker接收到对其submitJob()方法的调用后,将其放入当中队列,交由job
    scheduler实行调节,并对其进张开首化,包罗成立1个正值运作作业的目的(封装职分和笔录信息)。
  • 为了成立职务运转列表,job
    scheduler首先从共享文件系统中获得JobClient已总结好的输入分片音信,然后为每一个分片创设2个map职务;创制的reduce职责数量由JobConf的mapred.reduce.task属性决定,schedule成立相应数据的reduce职责。职务此时被钦赐ID。

简易的话呢,正是

  • JobTracker配置好Job须要的财富后,JobTracker就会开端化作业
  • 伊始化首要做的是将Job放入贰个内部的队列,让配置好的作业调解器能调治到那么些作业,作业调解器会初阶化那些job。
  • 开首化就是开创贰个正在运维的job对象(封装职分和著录音讯),以便JobTracker追踪job的景色和进程。
  • 开首化达成后,作业调整器会博得输入分片音讯(input
    split),每一个分片成立三个map任务。
  1. 职务的分红
  • jobtacker应该先选用哪个job来运营?这么些由job scheduler来决定
  • jobtracker怎么着选取tasktracker来运营选中作业的天职吗?看下边
  • 每种tasktracker定时发送心跳给jobtracker,告知本身还活着,是还是不是足以承受新的任务。
    jobtracker以此来支配将职务分配给何人(如故选拔心跳的重临值与tasktracker通讯)。
    种种tasktracker会有定位数量的天职槽来处理map和reduce(举个例子二,表示tasktracker能够而且运行多少个map和reduce),由机械内核的数额和内部存储器大小来决定。job
    tracker会先将tasktracker的map槽填满,然后分配reduce任务到tasktracker。
  • jobtracker选用哪个tasktracker来运作map职责急需思量互连网地方,它会挑选叁个离输入分片较近的tasktracker,优先级是数码本地化(data-local)–>机架本地化(rack-local)。
  • 对于reduce职分,未有啥正儿捌经来摘取哪位tasktracker,因为不能思量数据的本地化。map的输出始终是急需经过整治(切分排序合并)后透过网络传输到reduce的,大概四个map的出口会切分出部分送给3个reduce,所以reduce职责大可不必选拔和map同样或近来的机械上。
  1. 职分的实践
  • tasktracker分配到1个任务后,首先从HDFS中把作业的jar文件复制到tasktracker所在的地头文件系统(jar本地化用来运转JVM)。同时将应用程序所须要的百分百文件从布满式缓存复制到本地磁盘。
  • 接下去tasktracker为任务新建三个本土职业目录,并把jar文件的始末解压到这些文件夹下。
  • tasktracker新建多少个taskRunner实例来运行该任务。
    TaskRunner运转一个新的JVM来运转每种职分,以便客户的map/reduce不会潜移默化tasktracker守护进程。但在分歧任务之间重用JVM依旧或然的。
    子进度经过umbilical接口与父进度张开通讯。任务的子进度每隔几秒便告诉父进度的速度,直到职分完毕。
  1. 进度和情景的翻新
  • 一个学业和各样职分都有贰个情景新闻,包蕴:作业或职分的运作状态(running,
    successful, failed),map和reduce的速度,计数器值,状态音信或描述。
  • task会定期向tasktracker汇报执市价况,tasktracker会定期收集所在集群上的有所task新闻,并想JobTracker汇报.JobTracker会依照全部tasktracker汇报上来的音讯举办聚焦。
  • 这一个音信透过自然的日子间隔由child JVM –> task tracker –> job
    tracker汇集。job
    tracker将产生三个标明全体运转作业及其任务状态的大局视图。你能够透过Web
    UI查看。同时JobClient通过每秒查询jobtracker来得到最新气象。
  1. 学业的做到
  • JobTracker是在吸收到最终3个职务到位后,才将职分标识为”成功”。并将数据结果写入到HDFS上。
  1. 学业的失利
  • JobTracker失败:JobTracker战败那是极致惨重的1种职务败北,退步机制–它是二个单节点故障,由此,作业注定退步。(hadoop二.0缓和了)。
  • tasktracker失利:tasktracker战败崩溃了会甘休向jobt发送心跳消息,并且JobTracker会将tasktracker从等待的职务池中移除,将该职分转移到其余的地点实践.obTracker会将tasktracker加入到黑名单。
  • task战败:map或reduce运转失利,会向tasktracker抛出13分,职务挂起.

在简约介绍了MapReduce框架后,下边深远学习框架中的三个重差不离念:Mapper和Reducer,正如上文提到了,它们组成了MapReduce作业并担负完毕实际的事情逻辑管理。

               注:
本文最终,还会附着一套自身写的HDFS的公文操作Java
API,使用起来也很有利,API还在不停的完善..

MapReduce职业关系到的陆个对象

  1. 客户端(client):编写mapreduce程序,配置作业,提交作业,那正是技师完毕的行事。
  2. JobTracker:早先化作业,分配作业,与TaskTracker通信,和睦度个作业的执行。
  3. TaskTracker:保持与JobTracker的通信,在分配的数据片段上实施Map或Reduce职责,TaskTracker和JobTracker的两样有个很重大的方面,正是在施行职责时候TaskTracker可以有n八个,JobTracker则只会有三个(JobTracker只好有3个就和hdfs里namenode一样存在单点故障,小编会在前边的mapreduce的相干难题里讲到那个难点的)。
  4. HDFS:保存作业的多少、配置音信等等,最终的结果也是保存在hdfs下面
    <small>jobtracker的单点故障:
    jobtracker和hdfs的namenode同样也设有单点故障,
    单点故障一向是hadoop被人诟病的大难题,
    为何hadoop的做的文件系统和mapreduce计算框架都以高容错的,可是最注重的管制节点的故障机制却这样不好,小编认为注重是namenode和jobtracker在骨子里运维中都以在内部存款和储蓄器操作,而做到内部存储器的容错就比较复杂了,惟有当内部存款和储蓄器数据被持久化后容错才好做,namenode和jobtracker都得以备份本人持久化的文书,但是那个持久化都会有延迟,由此真正出故障,任然不能够全部回复,别的hadoop框架里含有zookeeper框架,zookeeper能够整合jobtracker,用几台机械同时安顿jobtracker,保险一台出故障,有壹台立即能填补上,然而这种办法也迫于苏醒正在跑的mapreduce任务。
    </small>

Mapper是独立的职分,将输入记录转变为中等记录,即对输入的键值对举行拍卖,并出口为一组中间键值对,输出的键值对应用context.write(WritableComparable,
Writable)方法收罗起来,中间记录的键值类型不必与输入记录的键值类型同样,实际上也多次是不一致的。一条输入记录经由Mapper管理后只怕输出为0条要么多条当中记录。比方,即使输入记录不满足专门的学问供给(未有包罗特定的值恐怕隐含了特定的值)的话,可以一直回到,则会输出0条记下,此时Mapper起了过滤器的效果。

 

MapRedece作业的管理流程

MapRedece作业的拍卖流程图解

依照时间顺序蕴涵:

  • 输入分片(input split)
  • map阶段
  • combiner阶段
  • shuffle阶段和
  • reduce阶段。
  1. 输入分片(input split):
    在进行map计算在此以前,mapreduce会根据输入文件总括输入分片(input
    split),每种输入分片(input split)针对贰个map职务
    输入分片(input
    split)存款和储蓄的决不数据笔者,而是四个分片长度和一个记录数据的职位的数组,输入分片(input
    split)往往和hdfs的block(块)关系异常的细致
    <small>设若我们设定hdfs的块的大大小小是6四mb,即使大家输入有三个文件,大小分别是三mb、65mb和1二七mb,那么mapreduce会把三mb文件分为二个输入分片(input
    split),陆5mb则是三个输入分片(input
    split)而1二七mb也是八个输入分片(input split)
    即咱们假如在map总计前做输入分片调解,举个例子合并小文件,那么就会有陆个map任务将实行,而且每一种map试行的数量大小不均,这么些也是mapreduce优化总结的二个关键点。

    </small>
  2. Map阶段:
    技术员编写好的map函数了,因而map函数效用相对好调整,而且貌似map操作都以本地化操作也等于在多少存款和储蓄节点上张开。
  3. Combiner阶段:
    combiner阶段是技士能够采用的,combiner其实也是壹种reduce操作。
    Combiner是多个本地化的reduce操作,它是map运算的持续操作,首即使在map计算出中间文件前做一个简易的统一重复key值的操作。
    <small>譬如我们对文本里的单词频率做总结,map计算时候若是碰到四个hadoop的单词就会记录为一,不过那篇作品里hadoop恐怕会现出n数十次,那么map输出文件冗余就会众多,由此在reduce计算前对一样的key做二个合并操作,那么文件会变小,那样就巩固了宽带的传输作用,终归hadoop总计力宽带能源往往是一个钱打二十三个结的瓶颈也是Infiniti珍重的能源,可是combiner操作是有高危机的,使用它的标准化是combiner的输入不会影响到reduce计算的末尾输入,
    比如:借使计算只是求总的数量,最大值,最小值能够使用combiner,但是做平均值计算使用combiner的话,最终的reduce总计结果就会出错。
    </small>
  4. shuffle阶段:
    将map的出口作为reduce的输入的长河即是shuffle了。
  5. reduce阶段:
    和map函数同样也是程序猿编写的,最后结出是积累在hdfs上的。

随后MapReduce框架将与给定键相关联的装有中等值分组,然后传递给Reducer。用户能够因此Job.setGroupingComparatorClass(Class)方法钦赐Comparator来支配分组。Mapper的出口被排序然后依据Reducer分区,总的分区数与作业运行的Reducer任务数同样,程序员能够通过兑现自定义的Partitioner调整输出的笔录由哪个Reducer管理,私下认可使用的是HashPartitioner。程序猿还足以由此Job.setCombinerClass(Class)内定五个combiner来推行中间输出的地点聚合,那有助于削减Mapper到Reducer的数码传输。Mapper的中间输出经过排序后总是保存为(key-len,
key,value-len,
value)的格式,应用程序能够经过Configuration调整是或不是将中间输出举办压缩,以及采纳何种压缩方式,相关的多少个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序员通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable,
Writable,
Context)管理该职责的市场股票总值对,应用程序能够覆盖cleanup(Context)方法落成其余供给的清理专门的学业。


Combiner深刻了然

MapReduce流程

在上述进度中,我们来看至少几个属性瓶颈:

(一)假如我们有10亿个数据,Mapper会生成10亿个键值对在网络间打开传输,但即使大家只是对数据求最大值,那么很显著的Mapper只必要输出它所知晓的最大值就能够。那样做不仅能够缓慢解决网络压力,同样也能够小幅提升程序效能。
  总结:网络带宽严重被占下落程序功效

(贰)借使使用美利坚联邦合众国专利数量聚焦的国家①项来论述数据倾斜这几个概念,那样的多寡远远不是壹致性的大概说平衡布满的,由于大多专利的国度都属于U.S.,那样不但Mapper中的键值对、中间阶段(shuffle)的键值对等等,大繁多的键值对终极集集中于3个10足的Reducer之上,总结任务分配不够均衡,从而大大降低程序的性质。
  总结:单一节点承载过重下跌程序品质

在MapReduce编制程序模型中,在Mapper和Reducer之间有3个卓殊首要的机件,它消除了上述的天性瓶颈难点,它就是Combiner

壹与mapper和reducer不一致的是,combiner没有暗许的贯彻,必要显式的装置在conf中才有功力。
②并不是持有的job都适用combiner,唯有操作满意结合律的才可安装combiner。
combine操作看似于:opt(opt(一, 2, 3), opt(4, 5,
陆))。假如opt为求和、求最大值的话,能够选拔,然则要是是求平均值的话,则不适用。

因为每1个map都可能会发生大批量的本地输出,Combiner的效劳就是对map端的出口先做3回联合,以收缩在map和reduce节点之间的数额传输量,以提升网络IO品质。是MapReduce的一种优化花招。

Combiner总结:
在事实上的Hadoop集群操作中,大家是由多台主机一齐开始展览MapReduce的,
纵然参预规约(Combiner)操作,每1台主机会在reduce从前进行3回对本机数据的清规戒律,
接下来在经过集群开始展览reduce操作,这样就会大大节省reduce的小运,
就此加速MapReduce的管理速度

MapReduce框架为各类由作业的InputFormat生成的InputSplit运行3个map职分,由此总的map职分数量由输入数据大小决定,更可信说是由输入文件总的块数决定。就算可感到较少使用CPU的map职责在节点上设置300个map职务,但各种节点更符合并行运转十-一百个map任务。由于职责的起步需求开销一些日子,所以职分的运作最棒至少要求一分钟,因为只要职责运维的时光很少,整个作业的年华将大多数消耗在职务的树立方面。

 

Partitioner理解

Map阶段

<small>step一.三正是3个分区操作。通过前边的读书大家知晓Mapper最后管理的键值对<key,
value>,是亟需送到Reducer去联合的,合并的时候,有同等key的键/值对会送到同多个Reducer节点中展开归并。哪个key到哪些Reducer的分红进程,是由Partitioner规定的。在有个别集群应用中,比如布满式缓存集群中,缓存的数额差不离都以靠哈希函数来张开数量的均匀布满的,在Hadoop中也不例外。
</small>

MapReduce的使用者日常会内定Reduce任务和Reduce职务输出文件的多少(Tucson)。
用户在当中key上利用分区函数来对数码举办分区,之后在输入到后续职务试行进度。四个暗中认可的分区函数式使用hash方法(比方大规模的:hash(key)
mod Odyssey)进行分区。hash方法能够发出十分平衡的分区。
暗许的分区函数HashPartitioner

Partitioner小结:分区Partitioner重要意义在于以下两点

  • 依附业务需求,爆发多个出口文件
  • 多少个reduce职务并发运行,进步全部job的运维功能

当地提交MapReduce作业至集群,大额学习day_五。Reducer将持有一样键的壹组中间值下落为1组更加小数目的值,举个例子合并单词的数目等。3个功课运营的Reducer数量能够由此Job.setNumReduceTasks(int)大概mapred-site.xml中的参数mapreduce.job.reduces设置,但是更推荐前者,因为可以由程序员决定运行多少个reducer,而后人越来越多的是提供了一种默许值。程序员使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key,
(list of values)>调用reduce(WritableComparable,
Iterable<Writable>,
Context)方法,同Mapper同样,程序猿也足以覆盖cleanup(Context)方法钦赐须要的清管事人业。

 

Shuffle理解

Reduce阶段

Shuffle是什么
本着八个map职责的输出遵照分化的分区(Partition)通过互联网复制到分歧的reduce职务节点上,那个进度就称作为Shuffle。

Hadoop的shuffle进度就算从map端输出到reduce端输入之间的进度,那1段应该是Hadoop中最基本的有个别,因为涉及到Hadoop中最华贵的网络能源,所以shuffle进度中会有过多方可调和的参数,也有不少政策能够商讨

map进度的出口是写入本地球磁性盘而不是HDFS,可是一发轫数据并不是一贯写入磁盘而是缓冲在内部存款和储蓄器中,缓存的裨益正是减弱磁盘I/O的开销,提升合并和排序的进度。又因为暗许的内部存款和储蓄器缓冲大小是100M(当然那个是足以配备的),所以在编制map函数的时候要尽量裁减内部存款和储蓄器的施用,为shuffle进程预留越多的内部存款和储蓄器,因为该进程是最耗费时间的进程

再来详细梳理下全体工艺流程

Map端

  1. 美高梅手机版4858 ,在map端首先是InputSplit,在InputSplit中包蕴DataNode中的数据,每1个InputSplit都会分配3个Mapper职务,Mapper职分完成后发生<K二,V贰>的出口,那几个输出先存放在缓存中,各样map有八个环形内部存款和储蓄器缓冲区,用于存储职务的出口。暗中认可大小100MB(io.sort.mb属性),一旦达到阀值0.捌(io.sort.spil
    l.percent),三个后台线程就把内容写到(spill)Linux本地球磁性盘中的钦赐目录(mapred.local.dir)下的新建的一个溢出写文件。
  2. 写磁盘前,要进行partition、sort和combine等操作。通过分区,将分裂品种的多寡分开处理,之后对分裂分区的数据开始展览排序,假使有Combiner,还要对排序后的数量进行combine。等末梢记录写完,将全体溢出文件合并为1个分区且排序的文本。
  3. 最终将磁盘中的数据送到Reduce中,图中Map输出有多个分区,有1个分区数据被送到图示的Reduce职务中,剩下的五个分区被送到其余Reducer职务中。而图示的Reducer职责的任何的八个输入则出自其余节点的Map输出。

Reduce端

  1. Copy阶段:Reducer通过Http格局得到输出文件的分区。
      reduce端大概从n个map的结果中获取数据,而那几个map的施行进程不尽同样,当在那之中贰个map运转甘休时,reduce就会从JobTracker中获得该新闻。map运行截至后TaskTracker会获得音信,进而将音讯汇报给JobTracker,reduce定期从JobTracker获取该新闻,reduce端私下认可有七个数据复制线程从map端复制数据
  2. Merge阶段:若是产生三个磁盘文件会开始展览合并。
      从map端复制来的数目首先写到reduce端的缓存中,同样缓存占用达到一定阈值后会将数据写到磁盘中,同样会进展partition、combine、排序等经过。假设产生了多少个磁盘文件还会展开合并,最后一遍联合的结果作为reduce的输入而不是写入到磁盘中
    三.Reducer的参数:最后将合并后的结果作为输入传入Reduce职务中

总计:当Reducer的输入文件规定后,整个Shuffle操作才最后截至。之后正是Reducer的进行了,最终Reducer会把结果存到HDFS上。

Reducer的管理进程首要归纳八个阶段:shuffle(洗牌)、sort(分类)和reduce。在shuffle阶段,MapReduce框架通过HTTP获取具有Mapper输出的连带分区。在Sort阶段,框架遵照键分组Reducer的输入(不一致的mapper恐怕输出相同的键)。Shuffle和sort是还要拓展的,获取Mapper的出口后然后统1它们。在reduce阶段,调用reduce(WritableComparable,
Iterable<Writable>管理<key, (list of
values)>对。Reducer的输出平常通过Context.write(WritableComparable,Writable)写入文件系统,比方HDFS,当然也足以因此运用DBOutputFormat将出口写入数据库。Reducer的出口是未经排序的。

效果:

Hadoop的压缩

Shuffle进程中来看,map端在写磁盘的时候使用压缩的点子将map的出口结果开始展览削减是贰个回落互联网开辟很得力的诀窍

在Java中设置输出压缩

reduce端输出压缩使用了Codec中的Gzip算法,也足以行使bzip2算法

若是不必要Reducer,能够利用Job.setNumReduceTasks(int)将Reducer的多寡设置为0(如若不使用该方法设置Reducer的数据,由于mapreduce.job.reduces默以为一,会运行2个Reducer),在那种气象下,Mapper的输出将一直写入FileOutputFormat.setOutputPath(Job,Path)钦赐的不二等秘书技中,并且MapReduce框架不会对Mapper的输出举行排序。

 

MapReduce排序分组

排序: 在Hadoop默许的排序算法中,只会针对key值进行排序

自定义排序:

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

自定义类型MyNewKey完成了WritableComparable的接口,
该接口中有1个compareTo()措施,当对key举行相比时会调用该措施,而大家将其改为了大家友好定义的可比规则,从而完毕大家想要的职能

分组:在Hadoop中的暗中认可分组规则中,也是基于Key实行的,会将同1key的value放到一个集结中去

自定义分组:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

自定义了一个分组比较器MyGroupingComparator,该类达成了RawComparator接口,而RawComparator接口又落成了Comparator接口,那五个接口的概念:

设若在拓展reduce在此之前想利用与分组中间键时分裂的可比规则,能够透过Job.setSortComparatorClass(Class)内定区别的Comparator。也正是Job.setGroupingComparatorClass(Class)调控了如何对中等输出分组,而Job.setSortComparatorClass(Class)调节了在将数据传入reduce在此以前开始展览的第二回分组。

1、  HelloWorld文本

Hadoop数据类型

Hadoop
MapReduce操作的是键值对,但那些键值对并不是Integer、String等专门的工作的Java类型。为了让键值对能够在集群上移动,Hadoop提供了一些兑现了WritableComparable接口的大旨数据类型,以便用这么些类型定义的数据能够被系列化举办互联网传输、文件存款和储蓄与大小相比。

  • 值:仅会被归纳的传递,必须兑现Writable或WritableComparable接口。
  • 键:在Reduce阶段排序时索要开始展览相比较,故只能兑现WritableComparable接口。
描述
BooleanWritable 标准布尔变量的封装
ByteWritable 单字节数的封装
DoubleWritable 双字节数的封装
FloatWritable 浮点数的封装
IntWritable 整数的封装
LongWritable Long的封装
NullWritable 无键值时的占位符
Text 使用UTF-8格式的文本封装

分化于Mapper的数据由输入文件的大小分明,Reducer的数量能够由程序猿分明设置,那么设置有些Reducer能够高达较好地作用啊?Reducer的数额限制为:(0.玖伍
~1.75 ) * 节点数量 *
各类节点上最大的容器数。参数yarn.scheduler.minimum-allocation-mb设置了种种容器可伸手的小不点儿内部存款和储蓄器,那么最大容器数可依赖总的内部存款和储蓄器除以该参数总计得出。当使用0.75时,全部的Reducer会被立马加载,并当Mapper实现时起首传输Mapper的输出。使用一.7伍时,非常快的节点将做到它们第三轮的天职,然后加载第1波职责,那样对负载平衡具有越来越好的效应。扩张Reducer的数目就算扩展了框架花费,但扩展了负载平衡和下落了倒闭的本金。上边的比例因子比总的Reducer数量稍微一丢丢,感到预测实施的职务和波折的职分保留少些的Reducer槽,也正是实在的Reducer数量为地点公式得出的数据增加保留的Reducer数量。

 

CentOS设置和布置Hadoop二.二.0 
http://www.linuxidc.com/Linux/2014-01/94685.htm

a b c d
e f a c
a c b f

Ubuntu
13.04上搭建Hadoop环境
http://www.linuxidc.com/Linux/2013-06/86106.htm

 

Ubuntu 12.10 +Hadoop 一.2.1本子集群配置
http://www.linuxidc.com/Linux/2013-09/90600.htm

 

Ubuntu上搭建Hadoop情形(单机格局+伪布满格局)
http://www.linuxidc.com/Linux/2013-01/77681.htm

美高梅手机版4858 2

Ubuntu下Hadoop情况的布局
http://www.linuxidc.com/Linux/2012-11/74539.htm

 

单机版搭建Hadoop景况图像和文字化教育程详解
http://www.linuxidc.com/Linux/2012-02/53927.htm

 

搭建Hadoop意况(在Winodws环境下用编造机虚拟三个Ubuntu系统开始展览搭建)
http://www.linuxidc.com/Linux/2011-12/48894.htm

 

愈多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13


本文永远更新链接地址:http://www.linuxidc.com/Linux/2014-11/109286.htm

 

美高梅手机版4858 3

 

 

2、上传至HDFS的intput目录下

 

 

美高梅手机版4858 4

 

 

 


 

 

三、客户端提交Job,推行MapReduce

 

 

A、map的出口结果

 

 

美高梅手机版4858 5

 

 

 


B、reduce的输入结果(上一步map的输出)

 

 

美高梅手机版4858 6

 

 

 


 

 

C、reduce的计量结果(代码完毕细节先权且忽略,文章中会讲到)

 

 

 美高梅手机版4858 7

 

 

 


 

 

D、最终,待职务总体实现,交由HDFS进行理并了结果的文书写入

 

 

美高梅手机版4858 8

 

 

美高梅手机版4858 9

 

 


 

 

美高梅手机版4858 10

 

 

 

MapReduce中的分区暗中认可是哈希分区,可是大家也足以团结写demo来重写Partitioner类的getPartiton方法,如下:

 

 

 

美高梅手机版4858 11

 

 

分区规则定后,大家需求钦赐客户端Job的map
task的分区类并设置reducer的个数,如下

 

 

美高梅手机版4858 12

 

 

 

最终,提交Job跑二回MapReduce的效应如下:

 

 

美高梅手机版4858 13

 

 

 

咱们分别下载文件*.*-00000和*.*-00001至本土,并进行结果证实,效果如下:

 

 

分区0对应的reduce结果文件如下:

 

 

美高梅手机版4858 14

 

 

 

分区壹对应的reduce结果文件如下:

 

 

 

美高梅手机版4858 15

 

 

 

时至明日,作者用成效图的秘诀,给我们演示了瞬间MapReduce的前前后后到底是怎样进展map和reduce的,个中爆发了如何,最后又生出了怎么,很直观,可是,注意,效果图只好补助你掌握MapReduce最后能干什么,具体MapReduce内部的做事规律如何,作者上边会三番五次讲到,而且,博文的尾声,作者会附上本篇博文演示要用到的全demo。

 

 

 


 

 

 

 

一、什么是MapReduce

 

 

大家要数体育场合里面包车型大巴有着书,你数1号书架,作者数二号书架,他数三号书架…那就叫Map

明天大家把全体人总括的书籍数加在(归并)一齐,那就叫“Reduce”

合起来正是MapReduce【布满式大数量管理功效】!!!

详尽的请查看:MapReduce是一种编制程序模型,用于大规模数据集(大于一TB)的相互运算

 

 

 

二、MapReduce执行进度

 

 

 

 

美高梅手机版4858 16

 

补充:

 

1、JobTracker  对应于
NameNode,TaskTracker 对应于 DataNode。

贰、JobTracker是三个master服务,软件运转以往JobTracker接收Job,担当调整Job的每2个子职责task运营于TaskTracker上,并监督它们,要是开掘有波折的task就再一次运维它。一般景况相应把JobTracker陈设在单身的机器上。

 

 

Client        
:客户端提交3个Job给JobTracker

JobTracker
:调治Job的每一个mapper和reducer,并运营在TaskTracker上,并监督它们

Mapper    
 :得到符合自个儿的InputSplit(输入内容),实行map后,输出MapOutPut(map的输出内容)

Reducer     :
得到MapOutPut作为和煦的ReduceInput,实行reduce计算,最后输出OutPut结果

 

 

 

 

 

叁、MapReduce专门的学问规律

 

 

 

 

 美高梅手机版4858 17

 

 

 

注:reducer发轫reduce以前,一定要等待mapper达成map后才具初阶

 

首先步:Job读取存在于
HDFS文件系统上的文书作为Mapper的输入内容(key,value)并透过特殊的处理(map编制程序落成)交由Mapper
Task

第叁步:Mapper
Task对map后的数据开始展览shuffle(洗牌,重组),蕴含分区、排序或合并(combine)

其三步:Mapper把shuffle后的输出结果(key,values)提须求相应的Reducer
Task,由Reducer取走

第六步:Reducer得到上一步Mapper的出口结果,进行reduce(客户端编制程序落成)

第6步:Reducer落成reduce计算后,将结果写入HDFS文件系统,差异的reducer对应不一致的结果文件

 

 

强行的图(自身画的)如下

 

 

美高梅手机版4858 18

 

 

 

 

4、Shuffle进度简要介绍

 

 

 

 

 美高梅手机版4858 19

 

 

 

     
上海体育场面必要专注的就是shuffle进程中的分区,图中很直观的辨证了,mapper最后会把key-value键值对数码(输入数据)从缓存中拿出(缓存数据溢出)并基于分区规则举办磁盘文件的写入(注意:那里的磁盘文件不是HDFS文件系统上的文件,且写入文件的剧情早已是排序过的了),同时会对两样分区的key-value数据文件举办一个集结,最后分给分化的Reduce职务进展reduce处理,要是有八个Mapper,则Reducer从Map端获取的剧情要求再行张开联合(把属于不一样的Mapper但属于同二个分区的输出的结果开始展览联合,并在reduce端也举办shuffle进度,写入磁盘文件,最终举行reduce总计,reduce总计的结果最终以文件的款型出口到HDFS文件系统中)

 

 

 

 

 

五、Map端的Shuffle过程

 

 

 

 

 

 美高梅手机版4858 20

    

 须要领悟的是:

 

1、缓存的轻重是足以安装的(mapreduce.task.io.sort.mb,暗中认可100M)

二、溢出比(缓存使用率有叁个软阈值
== mapreduce.map.sort.spill.percent,暗中同意0.80),当超过阈值时,溢出作为会在后台起3个线程执行从而使Map职责不会因为缓存的溢出而被堵塞。但纵然达到硬限制,Map职务会被卡住,直到溢骑行为终止

 

 

 

陆、如何编写map和reduce函数

 

 

 

 

        到这一步,如若您对MapReduce的做事规律已经调整了,那么接下去,编写客户端程序,利用MapReduce的测算作用,落成公文文件中单词的出现次数的总结,将会是轻便的。

       

     
 首先,大家要求二个mapper(职分),其次是reducer(职务),有了多少个职分后,大家需求创制3个Job(作业),将mapper和reducer关联起来,并交付至Hadoop集群,由集群中的JobTracker进行mapper和reducer职分的调治,并最后成功数据的图谋工作。

     
 因而,简单发掘,光有mapper和reducer职分,是无力回天进展MapReduce(布满式大数目总括)的,那里大家须求写多个类,贰个是兑现Map的类,四个是贯彻Reduce的类,还有叁个正是付诸作业的主类(Client
Main Class)

      

     
 由于博主的Hadoop版本是3.壹.0的,因而,为了兼顾三.X之下的Hadoop集群意况能够在下边提供的demo中能够跑起来,特将本文中关系到的Hadoop正视换来了二.7.X的版本,如下:

 

 

 

小心:不要选用老式的hadoop-core(1.二.一)注重,否则会现出各个意料之外的的标题

       

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- common 依赖 tools.jar包 -->
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.8</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

 

 

 

(1)编写Mapper

 

 

美高梅手机版4858 21

 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper 原型 : Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * 
 * KEYIN    : 默认情况下,是mr框架所读到的一行文本内容的起始偏移量,Long,
 *            但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
 * VALUEIN  : 默认情况下,是mr框架所读到的一行文本的内容(Java String 对应 Hadoop中的Text)
 * KEYOUT   : 用户自定义逻辑处理完成之后输出数据中的key,在此处是单词(String),同上用Text
 * VALUEOUT : 用户自定义逻辑处理完成之后输出数据中的value,在这里是单词的次数:Integer,对应Hadoop中的IntWritable
 * 
 * mapper的输入输出参数的类型必须和reducer的一致,且mapper的输出是reducer的输入
 * 
 * @blob   http://blog.csdn.net/appleyk
 * @date   2018年7月3日15:41:13
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{


    /**
     * map实现数据拆分的操作
     * 本操作主要进行Map的数据处理
     * 在Mapper父类里面接受的内容如下:
     * LongWritable:文本内容的起始偏移量
     * Text:每行的文本数据(行内容)
     * Text:每个单词分解后的统计结果
     * IntWritable:输出记录的结果
     */
     @Override
     protected void map(LongWritable key, Text value,Context context)
             throws IOException, InterruptedException {

         System.out.println("文本内容的起始偏移量:"+key);
         String line = value.toString()    ;//取出每行的数据
         String[] result  = line.split(" ");//按照空格进行数据拆分
         //循环单词
         for (int i = 0 ;i <result.length ; i++){

           //针对每一个单词,构造一个key-value
            System.out.println("key-value : <"+new Text(result[i])+","+new IntWritable(1)+">");


           /**
            * 将每个单词的key-value写入到输入输出上下文对象中
            * 并传递给mapper进行shuffle过程,待所有mapper task完成后交由reducer进行对号取走
            */
             context.write(new Text(result[i]), new IntWritable(1));
         }

         /**        map端的shuffle过程(大致简单的描述一下)
          *                       |
          *                       |  放缓存(默认100M,溢出比是0.8,即80M满进行磁盘写入并清空,
          *                       |  剩余20M继续写入缓存,二者结合完美实现边写缓存边溢写(写磁盘))
          *                       V
          *               <b,1>,<c,1>,<a,1>,<a,1>
          *                         
          *                       |
          *                       | 缓存写满了,开始shuffle(洗牌、重组)  == 包括分区,排序,以及可进行自定的合并(combine)
          *                       V     
          * 写入磁盘文件(not hdfs)并进行文件归并,成一个个的大文件 <a,<1,1>>,<b,1>,<c,1>   
          * 
          *                         |
          *                         |
          *                         V
          *   每一个大文件都有各自的分区,有几个分区就对应几个reducer,随后文件被各自的reducer领走
          *   
          *           !!! 这就是所谓的mapper的输入即是reducer的输出 !!!
          */
     }
}

 

 

 

 

 

 

(2)编写Reducer

 

 

 

美高梅手机版4858 22

 

  

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 进行合并后数据的最终统计
 * 本次要使用的类型信息如下:
 * Text:Map输出的文本内容
 * IntWritable:Map处理的个数
 * Text:Reduce输出文本
 * IntWritable:Reduce的输出个数
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {

    //mapper的输出是reducer的输入,因此,这里打一下reducer的接收内容            
    List<Integer> list = new ArrayList<>();

    int sum = 0;//记录每个单词(key)出现的次数
        for (IntWritable value : values) {
            //从values集合里面取出key的单个频率数量(其实就是1)进行叠加
            int num = value.get();
            sum += num;
            list.add(num);

        }

       /**
    * mapper会把一堆key-value进行shuffle操作,其中涉及分区、排序以及合并(combine)
    * 注:上述shuffle中的的合并(combine)区别于map最终的的合(归)并(merge)
    * 比如有三个键值对:<a,1>,<b,1>,<a,1>
    * combine的结果:<a,2>,<b,1>      == 被reducer取走,数据小
    * merage 的结果;<a,<1,1>>,<b,1>  == 被reducer取走,数据较大(相比较上述combine来说)
    * 注:默认combiner是需要用户自定义进行开启的,所以,最终mapper的输出其实是归并(merage)后的的结果
    * 
    * 所以,下面的打印其实就是想看一下mapper在shuffle这个过程后的merage结果(一堆key-values)
    */
    System.out.println("key-values :<"+key+","+list.toString().replace("[", "<")
                .replace("]", ">")+">");

    //打印一下reduce的结果
    System.out.println("reduce计算结果 == key-value :<"+key+","+new IntWritable(sum)+">");
    //最后写入到输入输出上下文对象里面,传递给reducer进行shuffle,待所有reducer task完成后交由HDFS进行文件写入
    context.write(key, new IntWritable(sum));


    }
}

 

 

(3)编写Partition分区类(要是急需修改Map暗中认可的哈希分区规则的话)

 

 

美高梅手机版4858 23

 

 

 

 1 import org.apache.hadoop.io.IntWritable;
 2 import org.apache.hadoop.io.Text;
 3 import org.apache.hadoop.mapreduce.Partitioner;
 4 
 5 public class PartitionTest extends Partitioner<Text, IntWritable> {
 6 
 7     /**
 8      * key          : map的输出key 
 9      * value        : map的输出value 
10      * numReduceTask: reduce的task数量
11      * 返回值,指定reduce,从0开始
12      * 比如,分区0交由reducer0拿走
13      */
14     @Override
15     public int getPartition(Text key, IntWritable value, int numReduceTask) {
16         
17         if (key.toString().equals("a")) {
18             //如果key的值等于a,则将其分区指定为0,对应第一个reducer拿走进行reduce
19             return 0;
20         } else {
21             return 1;
22         }
23     }
24 }

 

 

 

 

(4)编写Job类(Main Class)

 

 

美高梅手机版4858 24

 

 

 

 

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Job;
 6 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 7 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 8 
 9 import com.appleyk.hdfs.mapper.WordCountMapper;
10 import com.appleyk.hdfs.part.PartitionTest;
11 import com.appleyk.hdfs.reducer.WordCountReducer;
12 
13 /**
14  * Client端,提交作业
15  * @author yukun24@126.com
16  * @blob   http://blog.csdn.net/appleyk
17  * @date   2018年7月3日-上午9:51:49
18  */
19 public class WordCountApp {
20 
21     public static void main(String[] args) throws Exception{
22         
23         Configuration conf = new Configuration();
24         //配置uri
25         conf.set("fs.defaultFS", "hdfs://192.168.142.138:9000");
26     
27         //创建一个作业,作业名"wordCount",作用在Hadoop集群上(remote)
28         Job job = Job.getInstance(conf, "wordCount");
29         
30         /**
31          * 设置jar包的主类(如果样例demo打成Jar包扔在Linux下跑任务,
32          * 需要指定jar包的Main Class,也就是指定jar包运行的主入口main函数)
33          */
34         job.setJarByClass(WordCountApp.class);
35         
36         //设置Mapper 任务的类(自己写demo实现map)
37         job.setMapperClass(WordCountMapper.class);
38         //设置Reducer任务的类(自己写demo实现reduce)
39         job.setReducerClass(WordCountReducer.class);
40 
41         //指定mapper的分区类
42         //job.setPartitionerClass(PartitionTest.class);
43         
44         //设置reducer(reduce task)的数量(从0开始)
45         //job.setNumReduceTasks(2);
46         
47         
48         //设置映射输出数据的键(key)  类(型)
49         job.setMapOutputKeyClass(Text.class);
50         //设置映射输出数据的值(value)类(型)
51         job.setMapOutputValueClass(IntWritable.class);
52 
53         //设置作业(Job)输出数据的键(key)  类(型)   == 最后要写入到输出文件里面
54         job.setOutputKeyClass(Text.class);
55         //设置作业(Job)输出数据的值(value)类(型)   == 最后要写入到输出文件里面
56         job.setOutputValueClass(IntWritable.class);
57 
58         //设置输入的Path列表(可以是单个文件也可以是多个文件(目录表示即可))
59         FileInputFormat.setInputPaths (job, new Path("hdfs://192.168.142.138:9000/input" ));
60         //设置输出的目录Path(确认输出Path不存在,如存在,请先进行目录删除)
61         FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.142.138:9000/output"));
62 
63         //将作业提交到集群并等待它完成。
64         boolean bb =job.waitForCompletion(true);
65         
66         if (!bb) {
67             System.out.println("Job作业执行失败!");
68         } else {
69             System.out.println("Job作业执行成功!");
70         }
71     }
72 
73 }

 

 

 

 

 

(伍)运营main方法,提交作业

 

 

出现极度:

 

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

 

 

 

出于作业是在本土(Windows)跑的,因而,那里遇到五个地面IO读写权限的标题,具体代码能够见NativeIO这一个Java类的源码,在那里:

 

 

美高梅手机版4858 25

 

 

 

 

将源代码全体拷贝出来,在品种下新建1个同包名同类名称的公文如下:

 

 

美高梅手机版4858 26

 

 

 

 

 

开垦后,修改代码如下(去掉验证):

 

 

美高梅手机版4858 27

 

 

 

 

(六)再度运维main方法,提交作业

 

 

重现格外:

 

 

org.apache.hadoop.security.AccessControlException:

Permission denied: user=Administrator, access=WRITE,inode="/":root:supergroup:drwxr-xr-x

 

 

 

     
 意思是再说,我日前应用的user是Windows下的Administrator,可是在Hadoop的HDFS文件系统中,未有那个用户,因而,我想用Administrator那些用户向HDFS文件系统Write的时候出现权力不足的异常,因为HDFS文件系统根目录下的文本对别的用户来讲,不抱有w和r的权柄

     
 原本把mapreduce程序打包放在集群中跑是无须操心用户的hdfs权限难点的,然而,笔者一同先说了,作者不想那么麻烦,无非正是Hadoop开启了HDFS文件系统的权柄验证效率,笔者给它关了(开放)不就行了,由此,笔者决定直接在hdfs-site.xml配置文件里进行权力验证的修改,加多内容如下:

 

<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

 

 

 

美高梅手机版4858 28

 

 

保存后,重启Hadoop集群

 

先stop,再start

 

 

美高梅手机版4858 29

 

 

 

 

(柒)再度运转main方法,提交作业

 

 

16:55:44.385 [main] INFO org.apache.hadoop.mapreduce.Job -  map 100% reduce 100%
16:55:44.385 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getTaskCompletionEvents(Job.java:670)
16:55:44.385 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getTaskCompletionEvents(Job.java:670)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.387 [main] INFO org.apache.hadoop.mapreduce.Job - Job job_local539916280_0001 completed successfully
16:55:44.388 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:758)
16:55:44.407 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 35
 File System Counters
  FILE: Number of bytes read=560
  FILE: Number of bytes written=573902
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=46
  HDFS: Number of bytes written=24
  HDFS: Number of read operations=13
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=4
 Map-Reduce Framework
  Map input records=3
  Map output records=12
  Map output bytes=72
  Map output materialized bytes=102
  Input split bytes=112
  Combine input records=0
  Combine output records=0
  Reduce input groups=6
  Reduce shuffle bytes=102
  Reduce input records=12
  Reduce output records=6
  Spilled Records=24
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=3
  Total committed heap usage (bytes)=605028352
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=23
 File Output Format Counters 
  Bytes Written=24
16:55:44.407 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
Job作业执行成功!

 

 

 

ok,至此,本地实施mapreduce作业已经完成,接下去就是查看我们要的结果了

 

 

 

 

(捌)利用Java HDFS
API,张开/output/part-r-00000文件内容,输出到调控台

 

 

 

美高梅手机版4858 30

 

 

 

a b c d
e f a c
a c b f

 

 

 

美高梅手机版4858 31

 

 

 

 

 

 

七、GitHub项目地址

 

 

 

Java HDFS API ,达成文件的贮存和访问
并顺便MapReduce作业,本地提交作业至集群落成Word Count的计量

 

 

美高梅手机版4858 32

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图
Copyright @ 2010-2019 美高梅手机版4858 版权所有