python执行地图reduce

python执行mapreduce

执行mapreduce有两种方式,一种是原生Java写的mr,一种是直接使用Streaming方式,这种方式是在Java上面做了一个封装,可以通过其它语言调用Java原生的mr接口。

  • 优点
    • 可以使用自己喜欢的语言来编写MapReduce程序(换句话说,不必写Java XD)
    • 不需要像写Java的MR程序那样import一大堆库,在代码里做一大堆配置,很多东西都抽象到了stdio上,代码量显著减少
    • 因为没有库的依赖,调试方便,并且可以脱离Hadoop先在本地用管道模拟调试
  • 缺点
    • 只能通过命令行参数来控制MapReduce框架,不像Java的程序那样可以在代码里使用API,控制力比较弱,有些东西鞭长莫及
    • 因为中间隔着一层处理,效率会比较慢
下面是一个例子:

第一个文件test.dat:

0067011990999991950051507004...9999999N9+00001+99999999999... 
0043011990999991950051512004...9999999N9+00221+99999999999... 
0043011990999991950051518004...9999999N9-00111+99999999999... 
0043012650999991949032412004...0500001N9+01111+99999999999... 
0043012650999991949032418004...0500001N9+01121+99999999999...
0043012650999991949032418004...0500001N9+01221+99999999999...


第二个文件:map.py

#!usr/bin/python

import re
import sys
for line in sys.stdin:
    val=line.strip()
    (year,temp)=(val[15:19],val[40:45])
    print "%s\t%s" % (year,temp)




第三个文件:red.py

#!usr/bin/python

import sys
(last_key,max_val)=(None,0)
for line in sys.stdin:
  (key,val)=line.strip().split('\t')
  if last_key and last_key!=key:
    print '%s\t%s' % (last_key, max_val)
    (last_key, max_val)=(key,int(val))
  else:
    (last_key, max_val)=(key,max(max_val,int(val)))
if last_key:
  print '%s\t%s' % (last_key, max_val)


执行的mr命令:

hadoop jar /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/jars/hadoop-streaming-2.5.0-cdh5.3.2.jar 
	-mapper 'python map.py' -file /usr/local/tables/map.py 
	-reducer 'python red.py' -file /usr/local/tables/red.py 
	-input /tmp/logs/test.dat 
	-output /tmp/test/


为了让Hadoop将程序分发给其他机器,需要再加一个-file参数用于指明要分发的程序放在哪里。