Big Data Series 3: Writing MapReduce in Python
vi mapper.py
Enter the following:
#!/usr/bin/env python
import sys

# Emit "word<TAB>1" for every whitespace-separated word read from stdin
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('%s\t%s' % (word, 1))
chmod +x mapper.py
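To see what mapper.py emits without piping anything through stdin, its loop body can be wrapped in a function. This is a minimal sketch; `map_line` is a hypothetical helper name, not part of the original scripts.

```python
def map_line(line):
    """Return the "word<TAB>1" records mapper.py would print for one input line."""
    # Same logic as mapper.py: strip, split on whitespace, pair each word with 1
    return ['%s\t%s' % (word, 1) for word in line.strip().split()]


for record in map_line("foo foo quux labs foo bar quux"):
    print(record)
```

Each word becomes its own record, so the sample line yields seven `word<TAB>1` lines, duplicates included; combining them is left entirely to the reducer.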
vi reducer.py
Enter the following:
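#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# Input arrives sorted by key, so all counts for one word are consecutive
for line in sys.stdin:
    line = line.strip()
    try:
        # Split "word<TAB>count" into its two fields
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        # Skip malformed lines (no tab, or a non-numeric count)
        continue
    if current_word == word:
        current_count += count
    else:
        # A new word begins: emit the total for the previous one
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Emit the total for the last word
if current_word:
    print('%s\t%s' % (current_word, current_count))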
chmod +x reducer.py
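reducer.py keeps a running total and emits it whenever the word changes, which only works because its input is sorted by key. The same idea can be expressed with `itertools.groupby`, which groups consecutive records with equal keys. This is an alternative sketch, not the original code; `parse`, `reduce_sorted`, and the `sample` data are hypothetical names.

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs from "word<TAB>count" lines, skipping malformed ones."""
    for line in lines:
        try:
            word, count = line.strip().split('\t', 1)
            yield word, int(count)
        except ValueError:
            continue


def reduce_sorted(pairs):
    """Sum the counts of each run of identical words in key-sorted input."""
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


# Hypothetical sorted mapper output, as `sort` would deliver it
sample = ['bar\t1', 'foo\t1', 'foo\t1', 'foo\t1', 'labs\t1', 'quux\t1', 'quux\t1']
for word, total in reduce_sorted(parse(sample)):
    print('%s\t%s' % (word, total))
```

To use it as a streaming reducer, add `import sys` and pass `sys.stdin` to `parse` instead of `sample`. Like reducer.py, it produces split totals on unsorted input, because `groupby` only merges adjacent records.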
Test on the local operating system:
echo "foo foo quux labs foo bar quux" | ./mapper.py | sort | ./reducer.py
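The command above should print `bar 1`, `foo 3`, `labs 1`, and `quux 2`, one tab-separated pair per line. The whole pipeline can also be emulated in a single Python process, which is handy for checking the logic. This is a sketch: the hypothetical `run_pipeline` function stands in for the three commands, and Python's `list.sort` stands in for the Unix `sort` (both order these lowercase ASCII words the same way, although `sort` is locale-dependent in general).

```python
def run_pipeline(text):
    """Emulate `mapper.py | sort | reducer.py` on a string; return (word, count) pairs."""
    # Map: one "word<TAB>1" record per word, as in mapper.py
    mapped = ['%s\t%s' % (w, 1) for line in text.splitlines() for w in line.split()]
    # Sort: stands in for Unix `sort` locally (the shuffle on a Hadoop cluster)
    mapped.sort()
    # Reduce: the same running-total logic as reducer.py
    results = []
    current_word, current_count = None, 0
    for record in mapped:
        word, count = record.split('\t', 1)
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        results.append((current_word, current_count))
    return results


for word, count in run_pipeline("foo foo quux labs foo bar quux"):
    print('%s\t%s' % (word, count))
```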
Submit the job to the Hadoop cluster:
hadoop jar hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar -input input -output output-streaming-python -mapper /home/ysc/mapper.py -reducer /home/ysc/reducer.py
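On the cluster, the `sort` step from the local test is performed by the framework's shuffle: by default each key is assigned to a reduce task by hashing it (Hadoop's `HashPartitioner`), and each reduce task receives its records sorted by key. For streaming jobs the key is, by default, the text before the first tab, so every `foo<TAB>1` line reaches the same reducer in one contiguous run. The sketch below only illustrates that idea; `partition` is a hypothetical helper, and `zlib.crc32` is an assumed stand-in for the Java `hashCode()` Hadoop actually uses, so the reducer numbers will not match a real job.

```python
import zlib


def partition(records, num_reducers):
    """Assign each "word<TAB>1" record to a reducer by key hash, then sort each share."""
    partitions = [[] for _ in range(num_reducers)]
    for record in records:
        key = record.split('\t', 1)[0]  # streaming default: key is the text before the first tab
        # Assumption: crc32 replaces Java's key.hashCode(), so indices differ from Hadoop's
        index = zlib.crc32(key.encode('utf-8')) % num_reducers
        partitions[index].append(record)
    for share in partitions:
        share.sort()  # each reduce task sees its keys in sorted order
    return partitions


mapped = ['%s\t%s' % (word, 1) for word in "foo foo quux labs foo bar quux".split()]
for number, share in enumerate(partition(mapped, 2)):
    print(number, share)
```

Whatever the hash, all records for one word land in the same share, which is the property reducer.py depends on.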