值的<key, value>对传递给1 个reducer,在reduce 阶段,关系R 和关系S 中,所有tr[A]和ts[B] 相等的R 和S 的元组被包含在同一个reducer 的value 参数中,通过对该value 列表中所有元 组进行区分,找到等值连接结果所需的属性组,然后组合成所需的等值结果元组,再输出, 从而实现等值连接运算。下面将详细介绍基于MapReduce 的等值连接算法。通常情况下, 等值连接是维表和事实表之间的操作,这里假设关系R 为事实表,关系S 为维表。可以发 现,在reducer 被传递的参数中,一般只包含1 条维表的元组信息。找到这条元组后,与reducer 中的其他元组进行组合,即可完成关系S 与关系R 的等值连接运算。基于MapReduce 的等 值连接运算如下。 算法3 等值连接运算 输入:key 为元组所在文件的偏移地址,value 表示元组。 输出:key 为null,value 为实现等值连接的元组。 mapper(key, values) 1) 对value 进行处理,识别出所有的属性。 2) if values∈S then 3) key=values.tr[B]; 4) Context.write(ts[B],values); 5) else 6) key=values.tr[A]; 7) Context.write(tr[A],values); 8) end if reducer(key,list) 1) while(list.hasNext()) do 2) tmpTuple=list.next(); 3) if tmpTuple 属于维表S Then 4) S_Tuple=tmpTuple; 5) break 6) end if 7) end do 8) while(list.hasNext())do 9) Tuple=list.next(); 10) 将Tuple 与S_Tuple 的属性进行从新组合,生成value 11) Context.write(null, value)//将value 进行输出 12) end do 例1 实现如下功能,从用户浏览页面关系page_view 和用户关系user 中,将访问某网 页的用户年龄信息插入pv_users 表中,所对应的SQL 语句如下:INSERT INTO pv_users select pv.pageid, u.age FROM page_view pv, user u WHERE pv.userid=u.userid。 要实现的效果如图1 所示。 age gender 111 25 female 222 32 male page_id age 1 25 2 25 1 32 X = user page_view pv_users 1 9:08:01 2 9:08:13 1 9:08:14 pageid time 111 111 222 user_id user_id 图1 等值连接实例 Fig. 1 Instance of equijoin 对于该任务,根据上述等值连接算法,其MapReduce 的处理过程如图2 所示。 key value 111 111 222 1 9:08:01 2 111 9:08:13 1 222 9:08:14 age gender 111 female 222 32 male 25 user pv_users key value 111 <2,25> 222 < 2, 32> map key value 111 < 1,1> 111 < 1,2> 111 <2,25> key value 222 < 1, 1> 222 < 2, 32> shuffle sort age 1 25 2 25 age 1 32 reduce 111 page_id user_id time <1,1> <1,2> <1,1> page_id user_id page_id page_view 图2 一个等值连接实例的MapReduce 处理过程 Fig. 2 MapReduce process of an instance of equijoin 当从page_view 关系表中读取1 个元组时,map 将其转换为以user_id 为key,以page_id 和数字1(page_view 的代号)的组合为value 的<key,value>对;当从user 关系中读取1 个 元组时,map 将其转化为以user_id 为key,以数字2 和age 的组合为value 的<key,value>对, 其中的数字2 代表user 关系表;经过MapReduce 中的Shuffle /Sort 阶段后,相同key 的 <key,value>对被传送给同一个Reduce,可以发现,一个key 对应的value 列表中,只存在1 个值为<2, *>的value,这个value 包含了key 所需要的age 信息,所有<1, *>的value 包含 page_id 信息,将它们提取出来就可以得到最后的结果。 上文论述了关系表S 和R 的等值连接问题,下面将简要说明如何使用MapReduce 并行 计算模型实现多个关系的等值连接问题。假设有A、B、C3 个关系,有select a.key b.key c.key from A a,B b,C c where a.key=b.key and a.key=c.key 这样的任务,可以先计算出关系A 与关系 B 的连接结果,再将A 和B 的连接结果与关系C 进行连接,如图3 所示。 图3 基于MapReduce 的多关系等值连接实例 Fig. 3 MapReduce process of an instance of multiple relations equijoin 2 基于MapReduce 的关系型数据聚集运算 基于MapReduce 的关系型数据聚集运算主要是Group by 运算,在此基础上,有Count、 Sum、Average 等运算。 2.1 分组运算 分组运算主要是指将某关系的1 个或若干个属性值进行分组,值相同的为1 组,对分组 的元组进行信息的统计工作。 对查询结果分组的目的是为了找到聚集函数的作用对象。如果未对查询结果分组,聚集 函数将作用于整个查询数据集。分组后,聚集函数将作用于每个分组,即每个分组都有1 个函数值。 在MapReduce 运行机制中,如何实现SQL 中group by 相同的作用呢?首先从关系中读 取1 个元组,将其打包传递给mapper 函数,在mapper 函数中,通过提取分组的条件,即按 照那几个属性值进行分组,将这几个属性值进行组合作为key,value 与输入的value 相同, 由reduce 对相同key 的元组进行具体聚集运算。因为分组的过程主要在mapper 阶段完成, 这里给出相应的mapper 算法。 算法4 基于MapReduce 的分组运算 输入:key 为元组所在文件的偏移地址,value 表示元组。 输出:<key, value>,其中key 为任何值,value 为满足条件的元组。 函数mapper(key,values) 1) 对value 进行处理,识别出所有的属性。 2) 提取要分组的属性值,组合成新key,value 与输入不变。 3) Context.write(key, value);//将符合条件的元组存储到本地中间文件中。 2.2 计数运算 1) 计数(count)运算就是对关系表中符合条件的元组个数进行统计,如何使用 MapReduce 并行处理框架来实现它呢?首先由文件读取器读取1 条或者若干条元组,读取 操作完成之后,调用mapper 函数;然后统计出本次读取符合条件的元组的个数,将统计结 果传递给reducer 函数;最后由reducer 对全局的结果进行汇总。这里的技巧是使用了 combiner。首先由combiner 对本地的结果进行统计,可以减少网络数据的传输,combiner 算法的实现与reduce 算法相同,即由combiner 首先在本地mapper 算法输出<key, value>的 个数有统计的过程,当combiner 完成之后,将本地的求和的结果发送给reducer,最后由 reducer 计算出总个数。基于MapReduce 的统计元组个数算法如下。 算法5 基于MapReduce 的count 运算 输入:key 为元组所在文件的偏移地址,value 表示元组。 输出:<key, value>,其中key 为元组属性组,value 元组的个数。 mapper(key, value) 1) 对value 进行处理,识别出所有的属性。 2) 提取要分组的属性值,组合成新key。 3) Context.write(key,1)。 reducer(key, list) 1) Long count=0;//初始化计数器 2) while(list. hasNext())do 3) count ++ 4) Context.write(key,count)。 用户在使用上述算法进行运算提交MapReduce job 时,将job 的combiner 设置为reducer 后,可以有效地减少键值对在网络上的传输,从而提高整体的执行效率。 2.3 求和运算 对于基于MapReduce 的求总和(sum)运算,其执行的基本原理与count 相同,上文提高 运行效率的技巧在这里依然适用。基本原理仍然是首先由combiner 在本地进行求和,从而 减少键值对在网络上的传输,最后由reducer 将本地求和的结果再求和。基于MapReduce 的 求总和运算如下。 算法6 基于MapReduce 的求总和运算 输入:key 为元组所在文件的偏移地址,value 表示元组。 输出:<key, value>,其中key 为元组属性组,value 为key 对应属性之和。 mapper(key, value) 1) 对value 进行处理,识别出所有的属性。 2) 提取要分组的属性值,组合成新key,value 为待求和的属性Att。 3) Context.write(key, Att)。 reducer(key,list) 1) Long sum[ ]={0};//初始化属性和数组 2) while(list. hasNext())do 3) sum+=list.next().value; 4) Context.write(key,sum)。 对于基于MapReduce 的求均值(average)算法,将上文的2 个计数和求和算法合并即可。 3 实验 3.1 实验环境 集群系统由9 台PC 机组成,其中每台PC 机采用Intel 4 核2.0GHz 处理器、4GBRAM、 160GB7200 RPM IDE 硬盘,网络通信速率为1000Mbit/s。操作系统采用Redhat Linux,系统 内核版本2.6.18。Hadoop 版本为0.20.2。 实验采用的TPC-DS 版本为1.0.0d,用TPC-DS 提供的数据生成工具dsdgen 生成10、 30、50GB 的数据,每个数据集都有24 个数据表。 3.2 算法性能分析 统计关系表元组个数:任务为统计store_sales 的元组个数,通过增加计算节点,查看任 务完成的执行时间与增加节点的关系。其中store_sales 关系表的每个元组由137 个字符组成, 与实验任务相对应SQL 语句为select count(*) from store_sales。根据count 算法,将该任务 转化为MapReduce 程序以后,获得如图4 的实验结果。 图4 节点数与任务处理时间的关系 Fig. 4 Relationship between node numbers and running time 从图4 中容易发现,随着节点数的增加,任务完成的时间逐渐减少。随着节点的增加, 计算能力不断扩展,但是任务完成时间减少的幅度越来越小。造成这种现象的原因主要是 hadoop 的计算与数据的分布有关,即数据分布密集的节点,其计算能力才能被充分地利用。 当1 个数据节点上存放参与计算的数据比较少时,参与数据完成计算以后,通常情况下,该 节点的计算能力被闲置下来,这就造成了计算资源的浪费,所以应该极力避免这种情况。运 行任务之前,最好先执行start-balancer.sh。上文描述的这种情况,在后面的实验中也有不同 程度的反应。 1) 等值连接实验1。假设有如下查询任务,与其相对应的SQL 查询语句如下: SELECT i_item_id, avg(ss_quantity) , avg(ss_list_price), avg(ss_coupon_amt) , avg(ss_sales_price) FROM store_sales ss, customer_demographics cd, item i WHERE cd.cd_gender = "M" AND cd.cd_education_status = "College" AND ss.ss_cdemo_sk = cd.cd_demo_sk AND ss.ss_item_sk = i.i_item_sk GROUP BY i_item_id。 其中,store_sales 表有23 个属性,每个元组由137 个字符组成;item 表有22 个属性,每个 元组由333 个字符组成;customer_demographics 表有9 个属性,每个元组由30 个字符组成。 将该任务分解为2 个子任务,子任务1 完成customer_demographics 表符合条件的元组选择 和与store_sales 的连接任务,子任务2 完成子任务1 的连接结果与表item 的连接已经聚集 学术论文网Tag:代写代发论文 论文发表 职称论文发表 代写机械论文 |