# versions
import IPython
print("pyspark version:" + str(sc.version))
print("Ipython version:" + str(IPython.__version__))
pyspark version:1.6.1
IPython version:4.2.0
# agg
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.agg({"amt":"avg"})
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------------------+
|           avg(amt)|
+-------------------+
|0.20000000000000004|
+-------------------+
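# note: agg also accepts Column expressions, so several aggregates can be computed at once
# (a minimal sketch, assuming the same x as above)
from pyspark.sql import functions as F
x.agg(F.min('amt'), F.max('amt'), F.avg('amt')).show()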
# alias
from pyspark.sql.functions import col
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.alias('transactions')
x.show()
y.show()
y.select(col("transactions.to")).show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+
|   to|
+-----+
|  Bob|
|Carol|
| Dave|
+-----+
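# note: a common use of alias is disambiguating column references in self-joins
# (a sketch reusing x and the col imported above)
xl = x.alias('l')
xr = x.alias('r')
xl.join(xr, col('l.to') == col('r.from')).select(col('l.from'), col('r.to')).show() # two-hop transfers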
# cache
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
print(x.count()) # first action materializes x in memory
print(x.count()) # later actions avoid IO overhead
3
3
# coalesce
x_rdd = sc.parallelize([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)],2)
x = sqlContext.createDataFrame(x_rdd, ['from','to','amt'])
y = x.coalesce(numPartitions=1)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())
2
1
# collect
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.collect() # creates list of rows on driver
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]
# columns
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.columns # creates a list of column names on the driver
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

['from', 'to', 'amt']
# corr
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.corr(col1="amt",col2="fee")
x.show()
print(y)
+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.866025403784
# count
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
print(x.count())
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

3
# cov
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.cov(col1="amt",col2="fee")
x.show()
print(y)
+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.00095
# crosstab
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.crosstab(col1='from',col2='to')
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+----+-----+---+
|from_to|Dave|Carol|Bob|
+-------+----+-----+---+
|    Bob|   0|    1|  0|
|  Alice|   0|    0|  1|
|  Carol|   1|    0|  0|
+-------+----+-----+---+
# cube
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Alice","Carol",0.2)], ['from','to','amt'])
y = x.cube('from','to')
x.show()
print(y) # y is a GroupedData object; aggregations will be applied over all numeric columns
y.sum().show()
y.max().show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc5383bcc50>

+-----+-----+-------------------+
| from|   to|           sum(amt)|
+-----+-----+-------------------+
|Alice|Carol|                0.2|
|Alice|  Bob|                0.1|
|Alice| null|0.30000000000000004|
| null|Carol|                0.2|
| null|  Bob|                0.1|
| null| null|0.30000000000000004|
+-----+-----+-------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|Carol|     0.2|
|Alice|  Bob|     0.1|
|Alice| null|     0.2|
| null|Carol|     0.2|
| null|  Bob|     0.1|
| null| null|     0.2|
+-----+-----+--------+
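# note: instead of aggregating every numeric column, the grouped object also supports agg
# with explicit functions (a sketch, reusing y from above)
from pyspark.sql import functions as F
y.agg(F.sum('amt'), F.count('amt')).show()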
# describe
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.describe().show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+-------------------+
|summary|                amt|
+-------+-------------------+
|  count|                  3|
|   mean|0.20000000000000004|
| stddev|0.09999999999999998|
|    min|                0.1|
|    max|                0.3|
+-------+-------------------+
# distinct
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.distinct()
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|Alice|  Bob|0.1|
+-----+-----+---+
# drop
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.drop('amt')
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+
| from|   to|
+-----+-----+
|Alice|  Bob|
|  Bob|Carol|
|Carol| Dave|
+-----+-----+
# dropDuplicates / drop_duplicates
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Bob","Carol",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropDuplicates(subset=['from','to'])
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+
# dropna
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropna(how='any',subset=['from','to'])
x.show()
y.show()
+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

+----+-----+----+
|from|   to| amt|
+----+-----+----+
| Bob|Carol|null|
| Bob|Carol| 0.2|
+----+-----+----+
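# note: dropna also accepts a thresh parameter; a row is kept if it has at least thresh
# non-null values (a sketch, reusing x)
x.dropna(thresh=2).show()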
# dtypes
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.dtypes
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[('from', 'string'), ('to', 'string'), ('amt', 'double')]
# explain
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.agg({"amt":"avg"}).explain(extended = True)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

== Parsed Logical Plan ==
'Aggregate ['avg(amt#296) AS avg(amt)#297]
+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Analyzed Logical Plan ==
avg(amt): double
Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]
+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Optimized Logical Plan ==
Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]
+- Project [amt#296]
   +- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Physical Plan ==
TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Final,isDistinct=false)], output=[avg(amt)#297])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Partial,isDistinct=false)], output=[sum#301,count#302L])
      +- Project [amt#296]
         +- Scan ExistingRDD[from#294,to#295,amt#296]
# fillna
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3)], ['from','to','amt'])
y = x.fillna(value='unknown',subset=['from','to'])
x.show()
y.show()
+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
+-----+-----+----+

+-------+-------+----+
|   from|     to| amt|
+-------+-------+----+
|unknown|    Bob| 0.1|
|    Bob|  Carol|null|
|  Carol|unknown| 0.3|
+-------+-------+----+
# filter
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.filter("amt > 0.1")
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
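# note: filter also accepts Column expressions, which compose with & (and), | (or), ~ (not)
# (a sketch, reusing x)
x.filter((x.amt > 0.1) & (x.to != 'Dave')).show()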
# first
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.first()
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

Row(from=u'Alice', to=u'Bob', amt=0.1)
# flatMap
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.flatMap(lambda row: (row[0], row[2]))
print(y) # flatMap implicitly converts the DataFrame to an RDD
y.collect()
PythonRDD[227] at RDD at PythonRDD.scala:43
[u'Alice', 0.1, u'Bob', 0.2, u'Carol', 0.3]
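# note: DataFrame.flatMap (and the map/mapPartitions methods shown below) were removed from
# DataFrame in Spark 2.x; there the equivalent goes through the underlying RDD (untested sketch)
# y = x.rdd.flatMap(lambda row: (row[0], row[2]))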
# foreach
from __future__ import print_function
# setup
fn = './foreachExampleDataFrames.txt'
open(fn, 'w').close() # clear the file
def fappend(el, f):
    '''appends el to file f'''
    print(el, file=open(f, 'a+'))
# example
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.foreach(lambda row: fappend(row, fn)) # writes into foreachExampleDataFrames.txt
x.show() # original dataframe
print(y) # foreach returns None
# print the contents of the file
with open(fn, "r") as foreachExample:
print (foreachExample.read())
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

None
Row(from=u'Carol', to=u'Dave', amt=0.3)
Row(from=u'Bob', to=u'Carol', amt=0.2)
Row(from=u'Alice', to=u'Bob', amt=0.1)
# foreachPartition
from __future__ import print_function
# setup
fn = './foreachPartitionExampleDataFrames.txt'
open(fn, 'w').close() # clear the file
def fappend(partition, f):
    '''appends all elements in partition to file f'''
    print([el for el in partition], file=open(f, 'a+'))
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
y = x.foreachPartition(lambda part: fappend(part, fn)) # writes into foreachPartitionExampleDataFrames.txt
x.show() # original dataframe
print(y) # foreachPartition returns None
# print the contents of the file
with open(fn, "r") as foreachExample:
print (foreachExample.read())
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

None
[]
[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]
# freqItems
x = sqlContext.createDataFrame([("Bob","Carol",0.1), \
("Alice","Dave",0.1), \
("Alice","Bob",0.1), \
("Alice","Bob",0.5), \
("Carol","Bob",0.1)], \
['from','to','amt'])
y = x.freqItems(cols=['from','amt'],support=0.8)
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.1|
|Alice| Dave|0.1|
|Alice|  Bob|0.1|
|Alice|  Bob|0.5|
|Carol|  Bob|0.1|
+-----+-----+---+

+--------------+-------------+
|from_freqItems|amt_freqItems|
+--------------+-------------+
|       [Alice]|        [0.1]|
+--------------+-------------+
# groupBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from')
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc53831f5d0>
# groupBy(col1).avg(col2)
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from').avg('amt')
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-------------------+
| from|           avg(amt)|
+-----+-------------------+
|Carol|                0.3|
|Alice|0.15000000000000002|
+-----+-------------------+
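# note: groupBy composes with agg to compute several aggregates per group (a sketch, reusing x)
from pyspark.sql import functions as F
x.groupBy('from').agg(F.avg('amt'), F.max('amt'), F.count('to')).show()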
# head
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.head(2)
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]
# intersect
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Alice",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.intersect(y)
x.show()
y.show()
z.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Alice|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+---+---+
| from| to|amt|
+-----+---+---+
|Alice|Bob|0.1|
+-----+---+---+
# isLocal
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.isLocal()
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

False
# join
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',20),("Bob",40),("Dave",80)], ['name','age'])
z = x.join(y,x.to == y.name,'inner').select('from','to','amt','age')
x.show()
y.show()
z.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 40|
| Dave| 80|
+-----+---+

+-----+----+---+---+
| from|  to|amt|age|
+-----+----+---+---+
|Carol|Dave|0.3| 80|
|Alice| Bob|0.1| 40|
+-----+----+---+---+
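# note: the third argument selects the join type; e.g. 'left_outer' keeps unmatched rows
# from x with nulls on the right (a sketch, reusing x and y)
x.join(y, x.to == y.name, 'left_outer').show()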
# limit
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.limit(2)
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+
# map
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.map(lambda row: row.amt + 1)
x.show()
print(y.collect()) # output is RDD
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[1.1, 1.2, 1.3]
# mapPartitions
def amt_sum(partition):
    '''sums the values of field amt within a partition'''
    yield sum([el.amt for el in partition])
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
y = x.mapPartitions(lambda p: amt_sum(p))
x.show()
print(x.rdd.glom().collect()) # glom() collects the elements of each partition into a list
print(y.collect())
print(y.glom().collect())
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)], []]
[0.6000000000000001, 0]
[[0.6000000000000001], [0]]
# na
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.na # returns an object for handling missing values, supports drop, fill, and replace methods
x.show()
print(y)
y.drop().show()
y.fill({'from':'unknown','to':'unknown','amt':0}).show()
y.fill(0).show()
+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

<pyspark.sql.dataframe.DataFrameNaFunctions object at 0x7fc538392b90>

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+

+-------+-------+---+
|   from|     to|amt|
+-------+-------+---+
|unknown|    Bob|0.1|
|    Bob|  Carol|0.0|
|  Carol|unknown|0.3|
|    Bob|  Carol|0.2|
+-------+-------+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
| null|  Bob|0.1|
|  Bob|Carol|0.0|
|Carol| null|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+
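# note: the na object also supports replace (a sketch, reusing y from above)
y.replace('Bob', 'Robert', ['from', 'to']).show()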
# orderBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.orderBy(['from'],ascending=[False])
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+
# persist
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
from pyspark import StorageLevel
x.persist(storageLevel=StorageLevel(True,True,False,True,1)) # StorageLevel(useDisk,useMemory,useOffHeap,deserialized,replication=1)
x.show()
x.is_cached
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
True
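# note: the predefined levels can be used instead of spelling out the flags, e.g.
# x.persist(StorageLevel.MEMORY_AND_DISK) # untested sketch; a DataFrame's level can only be set once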
# printSchema
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.printSchema()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

root
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- amt: double (nullable = true)
# randomSplit
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.randomSplit([0.5,0.5])
x.show()
y[0].show()
y[1].show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----+---+---+
|from| to|amt|
+----+---+---+
+----+---+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
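# note: randomSplit accepts a seed for reproducible splits (a sketch, reusing x)
train, test = x.randomSplit([0.5, 0.5], seed=42)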
# rdd
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rdd
x.show()
print(y.collect())
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]
# registerTempTable
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.registerTempTable(name="TRANSACTIONS")
y = sqlContext.sql('SELECT * FROM TRANSACTIONS WHERE amt > 0.1')
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
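# note: in Spark 2.x registerTempTable is deprecated in favor of createOrReplaceTempView
# (untested sketch)
# x.createOrReplaceTempView("TRANSACTIONS")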
# repartition
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.repartition(3)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())
4
3
# replace
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.replace('Dave','David',['from','to'])
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|David|0.3|
+-----+-----+---+
# rollup
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rollup(['from','to'])
x.show()
print(y) # y is a GroupedData object; aggregations will be applied over all numeric columns
y.sum().show()
y.max().show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc5382f7850>

+-----+-----+------------------+
| from|   to|          sum(amt)|
+-----+-----+------------------+
|Alice|  Bob|               0.1|
|  Bob|Carol|               0.2|
|Alice| null|               0.1|
|Carol| Dave|               0.3|
|  Bob| null|               0.2|
|Carol| null|               0.3|
| null| null|0.6000000000000001|
+-----+-----+------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|  Bob|     0.1|
|  Bob|Carol|     0.2|
|Alice| null|     0.1|
|Carol| Dave|     0.3|
|  Bob| null|     0.2|
|Carol| null|     0.3|
| null| null|     0.3|
+-----+-----+--------+
# sample
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.sample(False,0.5)
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+
# sampleBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Alice","Alice",0.3), \
('Alice',"Dave",0.4),("Bob","Bob",0.5),("Bob","Carol",0.6)], \
['from','to','amt'])
y = x.sampleBy(col='from',fractions={'Alice':0.1,'Bob':0.9})
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Alice|Alice|0.3|
|Alice| Dave|0.4|
|  Bob|  Bob|0.5|
|  Bob|Carol|0.6|
+-----+-----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|  Bob|0.5|
| Bob|Carol|0.6|
+----+-----+---+
# schema
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.schema
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

StructType(List(StructField(from,StringType,true),StructField(to,StringType,true),StructField(amt,DoubleType,true)))
# select
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.select(['from','amt'])
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| from|amt|
+-----+---+
|Alice|0.1|
|  Bob|0.2|
|Carol|0.3|
+-----+---+
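# note: select also takes Column expressions, handy for derived columns (a sketch, reusing x;
# the pct name is just an example)
x.select(x['from'], (x.amt * 100).alias('pct')).show()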
# selectExpr
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.selectExpr(['substr(from,1,1)','amt+10'])
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----------------+----------+
|substr(from,1,1)|(amt + 10)|
+----------------+----------+
|               A|      10.1|
|               B|      10.2|
|               C|      10.3|
+----------------+----------+
# show
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
# sort
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.sort(['to'])
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol|Alice|0.3|
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+
# sortWithinPartitions
x = sqlContext.createDataFrame([('Alice',"Bob",0.1,1),("Bob","Carol",0.2,2),("Carol","Alice",0.3,2)], \
['from','to','amt','p_id']).repartition(2,'p_id')
y = x.sortWithinPartitions(['to'])
x.show()
y.show()
print(x.rdd.glom().collect()) # glom() collects the elements of each partition into a list
print(y.rdd.glom().collect())
+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|  Bob|Carol|0.2|   2|
|Carol|Alice|0.3|   2|
+-----+-----+---+----+

+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|Carol|Alice|0.3|   2|
|  Bob|Carol|0.2|   2|
+-----+-----+---+----+

[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2), Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2)]]
[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2), Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2)]]
# stat
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.stat
x.show()
print(y)
print(y.corr(col1="amt",col2="fee"))
+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7fc5382f7a50>
0.866025403784
# subtract
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.subtract(y)
x.show()
y.show()
z.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+----+---+
| from|  to|amt|
+-----+----+---+
|Carol|Dave|0.3|
+-----+----+---+
# take
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.take(num=2)
x.show()
print(y)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]
# toDF
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toDF("seller","buyer","amt")
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+------+-----+---+
|seller|buyer|amt|
+------+-----+---+
| Alice|  Bob|0.1|
|   Bob|Carol|0.2|
| Carol| Dave|0.3|
+------+-----+---+
# toJSON
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.toJSON()
x.show()
print(y.collect())
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

[u'{"from":"Alice","to":"Bob","amt":0.1}', u'{"from":"Bob","to":"Carol","amt":0.2}', u'{"from":"Carol","to":"Alice","amt":0.3}']
# toPandas
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toPandas()
x.show()
print(type(y))
y
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<class 'pandas.core.frame.DataFrame'>

|   | from  | to    | amt |
|---|-------|-------|-----|
| 0 | Alice | Bob   | 0.1 |
| 1 | Bob   | Carol | 0.2 |
| 2 | Carol | Dave  | 0.3 |
# unionAll
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2)], ['from','to','amt'])
y = sqlContext.createDataFrame([("Bob","Carol",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.unionAll(y)
x.show()
y.show()
z.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+
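# note: unionAll keeps duplicates (bag semantics); chain distinct() for a set-style union,
# and in Spark 2.x prefer union/unionByName (a sketch, reusing x and y)
x.unionAll(y).distinct().show()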
# unpersist
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
x.count()
x.show()
print(x.is_cached)
x.unpersist()
print(x.is_cached)
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

True
False
# where (filter)
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.where("amt > 0.1")
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
# withColumn
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",None),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumn('conf',x.amt.isNotNull())
x.show()
y.show()
+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
|Alice|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| Dave| 0.3|
+-----+-----+----+

+-----+-----+----+-----+
| from|   to| amt| conf|
+-----+-----+----+-----+
|Alice|  Bob| 0.1| true|
|  Bob|Carol|null|false|
|Carol| Dave| 0.3| true|
+-----+-----+----+-----+
# withColumnRenamed
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumnRenamed('amt','amount')
x.show()
y.show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+------+
| from|   to|amount|
+-----+-----+------+
|Alice|  Bob|   0.1|
|  Bob|Carol|   0.2|
|Carol| Dave|   0.3|
+-----+-----+------+
# write
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.write.mode('overwrite').json('./dataframeWriteExample.json')
x.show()
# read the dataframe back in from file
sqlContext.read.json('./dataframeWriteExample.json').show()
+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+---+-----+-----+
|amt| from|   to|
+---+-----+-----+
|0.1|Alice|  Bob|
|0.2|  Bob|Carol|
|0.3|Carol| Dave|
+---+-----+-----+
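# note: the writer supports other formats in the same fluent style (a sketch; the paths are examples)
x.write.mode('overwrite').parquet('./dataframeWriteExample.parquet')
sqlContext.read.parquet('./dataframeWriteExample.parquet').show()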