Spark User-guide Summary - Basic Programming
Use the pip command to install PySpark: pip install pyspark
Simple operations in Spark shell
Data abstraction: read a text file into a Spark DataFrame
from pyspark.sql.functions import col, explode, max, size, split  # functions used in the examples below

textFile = spark.read.text("README.md")
textFile.count()   # Number of rows in this DataFrame
textFile.first()   # First row in this DataFrame
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))  # Transform the DataFrame into a new one
textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
# select takes a Column as a parameter and creates a new column "numWords" (words per line)
# agg finds the maximum of "numWords", i.e. the largest number of words in a single line
Implementing MapReduce (word count):
wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
# explode turns each line into one row per word
# groupBy and count tally the number of occurrences of each word
wordCounts.collect()
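As a follow-up, a small sketch (not from the original notes) showing one way to inspect the most frequent words, assuming the wordCounts DataFrame from above:
from pyspark.sql.functions import desc

wordCounts.orderBy(desc("count")).show(10)  # display the 10 most frequent words, highest count first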
To hold the data lines in memory, use cache:
linesWithSpark.cache()
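A brief usage sketch, assuming the cached linesWithSpark from above: the first action materializes the cache, later actions reuse it.
linesWithSpark.count()  # first action reads the data and populates the cache
linesWithSpark.count()  # later actions reuse the in-memory data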
Use Python file directly
To build a packaged PySpark application or library, first add pyspark as a dependency in setup.py
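For illustration, a minimal setup.py sketch; the package name, version pin, and module name are placeholders rather than anything from the Spark docs:
# setup.py (hypothetical minimal example)
from setuptools import setup

setup(
    name="simple-app",          # placeholder package name
    version="0.1.0",
    py_modules=["SimpleApp"],
    install_requires=[
        "pyspark>=3.0",         # declare PySpark as a dependency; pin the version as needed
    ],
)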
Write an application with PySpark, named SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder().appName(appName).master(master).getOrCreate() logData = spark.read.text(logFile).cache() spark.stop()
To run this app, use:
$ YOUR_SPARK_HOME/bin/spark-submit --master local[4] SimpleApp.py
or, if pyspark is installed as a Python package (e.g. via pip), just run: python SimpleApp.py
Run Spark on Cluster
To access a cluster, use a SparkContext object, which is initialized with a SparkConf:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
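For instance, a sketch with concrete placeholder values (the app name "MyApp" and the local[2] master URL are illustrative choices, not from the original notes):
conf = SparkConf().setAppName("MyApp").setMaster("local[2]")  # run locally with 2 worker threads
sc = SparkContext(conf=conf)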
In the PySpark shell, a SparkContext object named sc is already created.
Example: create a parallelized collection holding the numbers 1 to 5:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
Then we can use distData.reduce(lambda a, b: a + b) as a reduce operation to sum the elements.
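A related detail from the RDD guide: parallelize accepts an optional number of partitions; a small sketch, where 10 is an arbitrary choice:
distData = sc.parallelize(data, 10)   # cut the collection into 10 partitions
distData.reduce(lambda a, b: a + b)   # still sums to 15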
To load an external dataset and apply map/reduce:
distFile = sc.textFile("data.txt")
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)  # total length of all lines
or use a named function for the map:
def func(s):
    return len(s)
distFile.map(func)
To change the file saving format, see http://spark.apache.org/docs/...
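As a simple saving sketch (the output path below is a placeholder), an RDD can be written back out as plain text with saveAsTextFile:
lineLengths = distFile.map(lambda s: len(s))
lineLengths.saveAsTextFile("output/line_lengths")  # writes one part file per partition into this directory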
To print the data on a single machine: rdd.foreach(print)
On a cluster, rdd.foreach(print) only prints to each executor's own stdout, not the driver's, so it cannot be used to inspect the data from the driver. rdd.collect() brings all the data back to the driver, which can run out of memory. The safe way is to print only a few elements:
for x in rdd.take(100): print(x)
Key-value pairs
To count how many times each line occurs in a file:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
Operations like reduceByKey and sortByKey only work on RDDs of key-value pairs.
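For example, a small follow-up based on the RDD guide, using the counts RDD from above:
sortedCounts = counts.sortByKey()   # sort the (line, count) pairs alphabetically by key
sortedCounts.collect()              # bring the results back to the driver as a list of tuples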
Shared variables
Create a broadcast variable:
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value  # [1, 2, 3]
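A short usage sketch (the map below is illustrative, not from the original notes); tasks read the broadcast value through .value instead of shipping the list with every closure:
sc.parallelize([0, 1, 2]).map(lambda i: broadcastVar.value[i]).collect()  # returns [1, 2, 3]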
Create an accumulator variable:
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value  # 10
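The RDD guide also notes that accumulators for custom types can be built by subclassing AccumulatorParam; a hedged sketch for element-wise list addition (class and variable names are illustrative):
from pyspark.accumulators import AccumulatorParam

class ListAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        # a "zero" list of the same length as the initial value
        return [0.0] * len(initialValue)
    def addInPlace(self, v1, v2):
        # combine two partial results element-wise
        return [a + b for a, b in zip(v1, v2)]

vecAccum = sc.accumulator([1.0, 2.0, 3.0], ListAccumulatorParam())
sc.parallelize([[1.0, 1.0, 1.0], [2.0, 2.0, 2.0]]).foreach(lambda v: vecAccum.add(v))
# vecAccum.value is now [4.0, 5.0, 6.0]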
Note: sc.parallelize() and sc.textFile() serve the same purpose (creating an RDD); the former is used for in-memory Python collections, the latter for external data files.