Notes on PySpark

init, stop

Common init setup for SparkSession

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import time
import cPickle as pickle
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = (SparkSession.builder.master('yarn').appName('linziheng:video')
             .config('spark.yarn.queue', 'userb')
             .config('spark.executor.memory', '8g')
             .config('spark.executor.instances', '40')
             # add this line (workaround for the warehouse-dir issue described below)
             .config('spark.sql.warehouse.dir', 'hdfs:///user/userb/linziheng')
             .getOrCreate())

Some issues with SparkSession / Spark SQL

Known issue in Spark 2.0.1:

* https://issues.apache.org/jira/browse/SPARK-17810
* https://issues.apache.org/jira/browse/SPARK-15034
* https://docs.rapidminer.com/latest/radoop/troubleshooting/known-errors.html

We work around it by adding this config to the SparkSession builder:

.config('spark.sql.warehouse.dir','hdfs:///user/userb/linziheng')

Stop SparkSession

spark.stop()

Read data

RDD, DataFrame, Dataset

Convert a DataFrame to an RDD when you need lower-level control over the rows; a minimal sketch follows.
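
A minimal sketch (the column names userid and videoid are only illustrative; they match the renamed schema further down):

# DataFrame -> RDD of Row objects; fields can be accessed by name or index
rdd = df.rdd
pairs = rdd.map(lambda row: (row['userid'], row['videoid']))
# ...arbitrary low-level transformations, then back to a DataFrame if needed
df_pairs = spark.createDataFrame(pairs, ['userid', 'videoid'])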

Some issues when parsing CSV data

The first row contains some unexpected data; we drop it by filtering on the length of the timestamp column (filter returns a new DataFrame, so assign the result):

df = df.filter( length( df['timestamp'] ) < 100 )
df_sample = spark.read.option("header", "false").csv('hdfs:///user/userb/linziheng/original_data/result48.tar.gz')

Rename column names

oldColumns = df_sample.schema.names
newColumns = ["timestamp", "userid", "webid", "user_agent", "videoid"]

# rename each column in turn; note that the initial value of reduce must be df_sample itself
df_sample = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]),
                   xrange(len(oldColumns)), df_sample)
df_sample.printSchema()
print "row number: " + str( df_sample.count() )
df_sample.show(5)
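
A shorter alternative, assuming the new names line up with the existing column order, is to pass them to toDF:

df_sample = df_sample.toDF(*newColumns)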

Output

output as csv

# coalesce(1) merges all partitions so that a single CSV file is written
a3.coalesce(1).write.csv('/user/userb/linziheng/cikmToJournal/feature_value_all', header=True)

Convert column to list

# collect() pulls the whole column to the driver, so keep this to reasonably small results
l = df.select('colname').rdd.map(lambda row : row[0]).collect()

UDF, UDAF

UDF (user-defined function) test

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def func(i):
    return i.strip()

# wrap the Python function and declare its return type
func_udf = udf(func, StringType())
a = df.withColumn("test", func_udf(df.videoid))

PySpark cannot define a UDAF (user-defined aggregate function) directly; the workaround is to groupBy, gather the values with collect_list, and then apply an ordinary UDF to the collected lists.

# example usage
# first: groupby and collect_list
a = df.groupBy(df.userid).agg(count('videoid').alias('records_count'), \
                              countDistinct('webid').alias('webid_count'), \
                              collect_list(df.webid).alias('webid_list'), \
                              collect_list(df.videoid).alias('videoid_list'), \
                              collect_list(df.unix_timestamp).alias('timestamp_list'))
# then: apply udf to collected list
a2 = a.withColumn('avg_viewing_time_second', calcAvgViewingTime_udf(a.videoid_list, a.timestamp_list)) \
      .withColumn('migrate_prob', calcMigrateProb_udf(a.webid_list, a.timestamp_list)) \
      .select(['webid_count', 'records_count', 'avg_viewing_time_second', 'migrate_prob'])
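
calcAvgViewingTime_udf and calcMigrateProb_udf are UDFs defined elsewhere in the notebook. As a hypothetical sketch, a list-based UDF such as calcAvgViewingTime_udf could look like the following; the logic (average gap in seconds between consecutive timestamps) is an assumption, not the original implementation:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def calcAvgViewingTime(videoid_list, timestamp_list):
    # hypothetical logic: use the gap between consecutive timestamps as a proxy for viewing time
    # (videoid_list is kept only to match the call signature above)
    ts = sorted(int(t) for t in timestamp_list)
    if len(ts) < 2:
        return 0.0
    gaps = [b - a for a, b in zip(ts[:-1], ts[1:])]
    return float(sum(gaps)) / len(gaps)

calcAvgViewingTime_udf = udf(calcAvgViewingTime, DoubleType())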

Basic pyspark.sql.functions

unix_timestamp

pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')
''' 
Convert time string with given pattern (‘yyyy-MM-dd HH:mm:ss’, by default) to Unix time stamp (in seconds), using the default timezone and the default locale, return null if fail.

if timestamp is None, then it returns current timestamp.
'''
df = df.withColumn('unix_timestamp', unix_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))
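
For the reverse direction, pyspark.sql.functions.from_unixtime converts the seconds back to a formatted time string; a quick sanity check of the conversion:

df.select('timestamp', 'unix_timestamp',
          from_unixtime(df.unix_timestamp, 'yyyy-MM-dd HH:mm:ss').alias('roundtrip')).show(5)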

substring

pyspark.sql.functions.substring(str, pos, len)
'''
Substring starts at pos and is of length len when str is String type or returns the slice of byte array that starts at pos in byte and is of length len when str is Binary type
'''
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s=u'ab')]

df2 = spark.createDataFrame([('abcd',)], ['s',])
df2.show()
df2.select(substring(df2.s, 3, 1).alias('s')).collect()
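
Note that pos is 1-based, so substring(df2.s, 3, 1) returns 'c'.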

orderBy

asc, desc

df = df.orderBy( [asc('colname1'), desc('colname2')] )