本地运行Pyspark的一次奇妙之旅
历史重演
import sys
import os
common_dir=r'D:\code\pysparkCode\modules\\'
#common_dir='F:\code\pysparkCode\modules\\'
os.environ['JAVA_HOME']=common_dir+"jdk1.7.0_80"
os.environ['PYTHONPATH']='C:\ProgramData\Anaconda3\envs\py35\python.exe'
sys.path.append(os.environ['PYTHONPATH'])
os.environ['HADOOP_HOME']=common_dir+'hadoop-2.5.0-cdh5.3.6'
os.environ['SPARK_HOME']=common_dir+'spark-1.6.1-bin-2.5.0-cdh5.3.6'
本地进行环境目录配置,看着没有任何毛病。
from core import config
import numpy as np
from pyspark import SparkConf, SparkContext
# 创建上下文
conf = SparkConf().setMaster("local[3]").setAppName("wordcount")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
rdd1=sc.parallelize([
"spark hadoop hbase spark hbase",
'spark hadoop spark hbase',
'm1 spark hbase dl',
'avc de'
])
'''
map API给定的处理函数f:
接受一个输入参数(即RDD中的一条数据),返回值直接就是RDD中的一条新数据
'''
print('-'*10+' map '+'-'*10)
rdd2=rdd1.map(f=lambda line:line.strip().split(' '))
print('map RDD value Size:{}'.format(rdd2.count()))
print(rdd2.collect())
'''
flatMap API给定的处理函数f:要求函数f的返回值必须是list、tuple或者数组类型
接受一个输入参数(即RDD中的一条数据),返回值为“集合类型”,最终RDD中的元素就是返回集中的元素
也就是一条数据输入,多条数据输出
'''
print('-'*10+' flatMap 1 '+'-'*10)
rdd3=rdd1.flatMap(f=lambda line:line.strip().split(' '))
print('flatMap RDD value Size:{}'.format(rdd3.count()))
print(rdd3.collect())
# 只会做一层flat操作,不会做错层
print('-'*10+' flatMap 2 '+'-'*10)
rdd4=rdd1.flatMap(f=lambda line:map(lambda word:[word,1],line.strip().split(' ')))
print('flatMap RDD value Size:{}'.format(rdd4.count()))
print(rdd4.collect())
'''
filter API给定的处理函数f:要求函数f的返回值必须是boolean类型
接受一个输入参数值(即RDD中的一条数据),返回值为boolean类型
返回为True表示输入的数据值保留,为False表示数据删除
'''
print('-'*10+' filter 1 '+'-'*10)
rdd5=rdd1.filter(f=lambda line:len(line)>=10)
print('filter RDD value Size:{}'.format(rdd5.count()))
print(rdd5.collect())
print('-'*10+' filter 2 '+'-'*10)
rdd6=rdd4.filter(f=lambda word_tuple:word_tuple[0]=='spark')
print('filter RDD value Size:{}'.format(rdd6.count()))
print(rdd6.collect())
'''
reduceByKey API:要求RDD中的数据类型必须是Key/Value的形式
功能:对RDD中的元素按照key进行分组,然后对每一组数据使用给定的函数f进行数据聚合操作
最终API返回的值是:key/聚合value值(函数f)所形成的RDD
要求:给定的函数返回的数据类型必须是原始RDD中的value的数据类型
'''
print('-'*10+' reduceByKey '+'-'*10)
rdd7=rdd4.reduceByKey(lambda v1,v2:v1+v2)
print('reduceByKey RDD value Size:{}'.format(rdd7.count()))
print(rdd7.collect())
'''
groupByKey API:要求RDD中的数据类型必须是Key/Value的形式
功能:对RDD中的元素按照key进行分组,将相同key的value放在一起,形成一个容器
最终API返回的值是:key/value值(value形成的迭代器)所形成的RDD
'''
print('-'*10+' groupByKey '+'-'*10)
rdd8=rdd4.groupByKey()
print('groupByKey RDD value Size:{}'.format(rdd8.count()))
rdd8_collect=rdd8.collect()
print(rdd8_collect)
for k,vs in rdd8_collect:
print('Key={},Value=('.format(k),end=' ')
for v in vs:
print(v,end=',')
print(')')
'''
aggregateByKey API:要求RDD中的数据类型必须是Key/Value的形式
功能:对RDD中的元素按照key进行分组,将相同key的value值进行聚合操作,区别是:reduceByKey要求聚合之后的值必须和原始的value值类型一直,aggregateByKey对于聚合之后的值数据类型不做要求
zeroValue:对于每个key而言,对应的聚合初始化值
seqFunc:给定聚合临时聚合值和value值的函数,输入两个参数,函数返回新的聚合值,eg:a就是当前key对应的临时聚合值,b是当前key对应的一个value值
combFunc:给定聚合两个临时聚合只的函数,输入两个参数,函数返回新的聚合值。eg:c和d均是一个临时的聚合值
'''
print('-'*10+' aggregateByKey '+'-'*10)
rdd9=rdd4.aggregateByKey(zeroValue=0,seqFunc=lambda a,b:a+b,combFunc=lambda c,d:c+d)
print('aggregateByKey RDD value Size:{}'.format(rdd9.count()))
print(rdd9.collect())
print('-'*10+' aggregateByKey '+'-'*10)
rdd10=rdd9.aggregateByKey(zeroValue=np.array([]),seqFunc=lambda a,b:np.append(a,b),combFunc=lambda c,d:np.append(c,d))
print('aggregateByKey RDD value Size:{}'.format(rdd10.count()))
print(rdd10.collect())
print('='*100)
def print_iter(iter):
for k,v in iter:
print('key={},value={}'.format(k,v))
rdd10.foreachPartition(lambda iter:print_iter(iter))
总结了一下,spark core常用的api,之前运行每一小段,都能出结果。但是hdfs上的结果不太理想。
hdfs
日志生成出来,但是内容是空,什么原因呢?这个Replication也不对,在hdfs-site.xml 中,配置Replication为1了。
hdfs-site.xml
甚是奇怪,一开始想法既然能运行出结果,就无所谓了,但是知道运行这个 常常的代码的时候,就不对劲了。
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /spark/history/local-1553782399667.inprogress could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget(BlockManager.java:1503)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3124)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:636)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:188)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:476)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
at org.apache.hadoop.ipc.Client.call(Client.java:1411)
at org.apache.hadoop.ipc.Client.call(Client.java:1364)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy12.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:391)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy13.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1473)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1290)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:536)
整个程序都能运行出代码结果,但是这个写日志这件事情,一直出错。
然后我想是不是服务器hdfs配置文件有问题?hdfs环境有问题,磁盘不够?尝试了重新生成hdfs环境,移动了hdfs本地目录,都没有解决。
困扰了好几天,经过友人的提醒,是不是这个本地的配置文件没有起作用。
我就抱着试一试的态度,在本地的hdfs-site.xml中写了点异常内容,然后执行程序,结果依旧可以运行,果然这个hdfs配置文件没有被读取到。
问题解决
问题分析:pyspark应该只读取spark目录下的conf的配置文件,并不会到hadoop下去读取配置文件。
解决方法:将本地的hadoop下的hdfs-site.xml文件移动到本地spark包下的conf中
程序运行,没有任何错误,完美~~~