Notes on PySpark
init, stop
Common init setup for SparkSession
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import time
import cPickle as pickle
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = (SparkSession.builder.master('yarn').appName('linziheng:video')
         .config('spark.yarn.queue', 'userb')
         .config('spark.executor.memory', '8g')
         .config('spark.executor.instances', '40')
         # add this line (works around the Spark 2.0.1 warehouse-path issue, see below)
         .config('spark.sql.warehouse.dir', 'hdfs:///user/userb/linziheng')
         .getOrCreate())
An issue with SparkSession / Spark SQL
Known issue in Spark 2.0.1:
- https://issues.apache.org/jira/browse/SPARK-17810
- https://issues.apache.org/jira/browse/SPARK-15034
- https://docs.rapidminer.com/latest/radoop/troubleshooting/known-errors.html
We work around it by adding this config to the SparkSession builder:
.config('spark.sql.warehouse.dir','hdfs:///user/userb/linziheng')
Stop SparkSession
spark.stop()
Read data
RDD, DataFrame, Dataset
- https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
Convert to an RDD when you need finer-grained control over the data (a minimal sketch follows below).
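A minimal sketch of dropping down to the RDD and back; the toy DataFrame and column names here are made up for illustration, and it assumes the spark session from the init block above.
# toy DataFrame (illustrative only)
df_demo = spark.createDataFrame([('u1', 'web1', 1), ('u2', 'web2', 2)],
                                ['userid', 'webid', 'cnt'])
# drop to the underlying RDD of Row objects for arbitrary per-record logic
rdd = df_demo.rdd.map(lambda row: (row['userid'], row['cnt'] * 2))
# and back to a DataFrame once the low-level work is done
df_back = rdd.toDF(['userid', 'double_cnt'])
df_back.show()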
An issue when parsing the CSV data
The first row contains some unexpected data, so we drop it by filtering on the abnormally long timestamp field:
df.filter( length( df['timestamp'] ) < 100 )
df_sample = spark.read.option("header", "false").csv('hdfs:///user/userb/linziheng/original_data/result48.tar.gz')
Rename column names
oldColumns = df_sample.schema.names
newColumns = ["timestamp", "userid", "webid", "user_agent", "videoid"]
df_sample = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]),
                   xrange(len(oldColumns)), df_sample)
print df basic info
df.printSchema()
print "row number: " + str( df.count() )
df.show(5)
Output
output as csv
- coalesce(n) reduces the result to n partitions, so the output is written as n files
a3.coalesce(1).write.csv('/user/userb/linziheng/cikmToJournal/feature_value_all', header=True)
Convert column to list
l = df.select('colname').rdd.map(lambda row : row[0]).collect()
UDF, UDAF
UDF (user-defined function) test
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# wrap a plain Python function as a UDF, with an explicit return type
def func(i):
    return i.strip()

func_udf = udf(func, StringType())
a = df.withColumn("test", func_udf(df.videoid))
PySpark cannot use UDAFs (user-defined aggregate functions)
- Problem
- UDAF is not supported in PySpark
- Solution
- collect_list + UDF = UDAF
- https://stackoverflow.com/questions/46187630/how-to-write-pyspark-udaf-on-multiple-columns
- Use collect_set(col) and collect_list(col) first, then apply a UDF to the collected list
- Or write the UDAF in Scala or Java, then call it from Python
- collect_list + UDF = UDAF
# example usage
# first: groupby and collect_list
a = df.groupBy(df.userid).agg(count('videoid').alias('records_count'), \
countDistinct('webid').alias('webid_count'), \
collect_list(df.webid).alias('webid_list'), \
collect_list(df.videoid).alias('videoid_list'), \
collect_list(df.unix_timestamp).alias('timestamp_list'))
# then: apply UDFs to the collected lists (calcAvgViewingTime_udf / calcMigrateProb_udf are UDFs defined outside this snippet)
a2 = a.withColumn('avg_viewing_time_second', calcAvgViewingTime_udf(a.videoid_list, a.timestamp_list)) \
.withColumn('migrate_prob', calcMigrateProb_udf(a.webid_list, a.timestamp_list)) \
.select(['webid_count', 'records_count', 'avg_viewing_time_second', 'migrate_prob'])
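A self-contained sketch of the same collect_list + UDF pattern; the toy data and the avg_gap aggregation below are made up for illustration (the real pipeline above uses calcAvgViewingTime_udf / calcMigrateProb_udf instead).
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import DoubleType

# toy data: (userid, unix_timestamp), illustrative only
demo = spark.createDataFrame([('u1', 100), ('u1', 160), ('u1', 400), ('u2', 10), ('u2', 70)],
                             ['userid', 'unix_timestamp'])

# the "aggregate" logic lives in a plain UDF that receives the whole collected list
def avg_gap(ts_list):
    ts = sorted(ts_list)
    if len(ts) < 2:
        return None
    return float(sum(b - a for a, b in zip(ts, ts[1:]))) / (len(ts) - 1)

avg_gap_udf = udf(avg_gap, DoubleType())

# groupBy + collect_list gives one row per user, then the UDF plays the role of the UDAF
demo.groupBy('userid') \
    .agg(collect_list('unix_timestamp').alias('timestamp_list')) \
    .withColumn('avg_gap_second', avg_gap_udf('timestamp_list')) \
    .show()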
Basic pyspark.sql.functions
unix_timestamp
- http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.unix_timestamp
pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')
'''
Converts a time string with the given pattern ('yyyy-MM-dd HH:mm:ss' by default) to a Unix timestamp (in seconds), using the default timezone and the default locale; returns null if it fails.
If timestamp is None, it returns the current timestamp.
'''
df = df.withColumn('unix_timestamp', unix_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))
substring
- Note: in PySpark, substring is 1-indexed
pyspark.sql.functions.substring(str, pos, len)
'''
Substring starts at pos and is of length len when str is String type, or returns the slice of the byte array that starts at pos (in bytes) and is of length len when str is Binary type.
'''
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s=u'ab')]
df2 = spark.createDataFrame([('abcd',)], ['s',])
df2.show()
df2.select(substring(df2.s, 3, 1).alias('s')).collect()
orderBy
asc, desc
df = df.orderBy( [asc('colname1'), desc('colname2')] )
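A quick self-contained demo of mixed ascending/descending sorting; the toy DataFrame and column names are made up for illustration.
from pyspark.sql.functions import asc, desc

demo = spark.createDataFrame([('u1', 2, 30), ('u2', 1, 10), ('u3', 1, 20)],
                             ['userid', 'colname1', 'colname2'])
# sort by colname1 ascending, breaking ties by colname2 descending
demo.orderBy([asc('colname1'), desc('colname2')]).show()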