Data Preprocessing with PySpark¶
In this post, we will introduce ourselves to pyspark
, a framework that allows us to work with big data. Similar to how we did in my first machine learning project post
Introduction¶
In this post, we'll introduce ourselves to pyspark
, working on a commonly used classification problem; the titanic. Our focus will be to learn the basics of how to work with pyspark
when we work on machine learning projects. We'll split this little project into two parts;
Spark over Pandas¶
In the previous post, we used pandas
for working with tabular data. pandas
is indeed quite convenient to use as it has a very rich functionality to work with tabular data, pyspark
in comparison is much simplier, however it offers the user to work with big data
, which pandas
tends to strugle with.
Some advantages of pyspark
over pandas
:
- Scalability: PySpark is designed to handle large datasets that cannot be processed on a single machine. It can distribute the processing of data across a cluster of machines, which makes it suitable for big data applications. Pandas, on the other hand, is limited by the memory of a single machine.
- Speed: PySpark is faster than pandas when dealing with large datasets. This is because PySpark uses distributed computing and can process data in parallel across multiple machines. Pandas, on the other hand, is limited by the processing power of a single machine.
- Machine learning: PySpark has built-in machine learning libraries such as MLlib and MLflow, which makes it easy to perform machine learning tasks on large datasets. Pandas does not have built-in machine learning libraries.
Overall, pyspark
is a better choice for big data applications that require distributed computing and machine learning capabilities. Pandas is suitable for smaller datasets that can be processed on a single machine.
Start Spark Session¶
First of all, we create a spark
session, importing SparkSession
from pyspark.sql
and creating an instance, to which we will refence to when reading our data
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
# create spark session
spark = SparkSession.builder\
.appName('titanic')\
.getOrCreate()
Loading Data¶
The dataset can be found from this source, which we can simply download using wget
in Jupyter
pyspark
supports a variety of input formats, to load a csv
file, we can call spark.read
. When reading our table, we should ideally specify the data types. We'll see below, which types are loaded when we don't specify them.
- Another alternative is to automaticaly detect suitable types for each column, we can do this by writing inferSchema=True
in either .csv
or .options
- Or we can specify our own schema using .schema(schema)
, like something shown below:
from pyspark.sql.types import StructTypes
schema = StructType() \
.add("PassengerId",IntegerType(),True) \
.add("Name",StringType(),True) \
.add("Fare",DoubleType(),True) \
.add("Decommisioned",BooleanType(),True)
Let's read our dataset, that contains a header, which requires header=True
, like in pandas
df = spark.read.csv('train.csv',header=True)
df.show(1,vertical=True)
# -RECORD 0---------------------------
# PassengerId | 1
# Survived | 0
# Pclass | 3
# Name | Braund, Mr. Owen ...
# Sex | male
# Age | 22.0
# SibSp | 1
# Parch | 0
# Ticket | A/5 21171
# Fare | 7.25
# Cabin | null
# Embarked | S
# only showing top 1 row
Data Preprocessing¶
DataFrame Column Types¶
Like in pandas
, we can call the method .dtypes
, to show the column types. Default column type interpretations aren't always ideal, so its useful to load your own schema
df.dtypes
# [('PassengerId', 'string'),
# ('Survived', 'string'),
# ('Pclass', 'string'),
# ('Name', 'string'),
# ('Sex', 'string'),
# ('Age', 'string'),
# ('SibSp', 'string'),
# ('Parch', 'string'),
# ('Ticket', 'string'),
# ('Fare', 'string'),
# ('Cabin', 'string'),
# ('Embarked', 'string')]
DataFrame Statistics¶
Like in pandas
, we can utilise method describe
, in order to show column statistics.
df.describe(['Sex','Age']).show()
# +-------+------+------------------+
# |summary| Sex| Age|
# +-------+------+------------------+
# | count| 891| 714|
# | mean| null| 29.69911764705882|
# | stddev| null|14.526497332334035|
# | min|female| 0.42|
# | max| male| 9|
# +-------+------+------------------+
Show Missing Data¶
If we want to count the missing data in all our columns we can do the following:
df.select([f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# +-----------+--------+------+---+---+-----+-----+----+--------+-----------+-----+
# |PassengerId|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|Family_Size|Alone|
# +-----------+--------+------+---+---+-----+-----+----+--------+-----------+-----+
# | 0| 0| 0| 0| 0| 0| 0| 0| 2| 0| 0|
# +-----------+--------+------+---+---+-----+-----+----+--------+-----------+-----+
We can show rows with missing data using .where
and .f.col('column').isNull()
age_miss = df.where(f.col('Age').isNull())
age_miss.show(5)
# +-----------+--------+------+--------------------+------+----+-----+-----+------+------+-----+--------+
# |PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch|Ticket| Fare|Cabin|Embarked|
# +-----------+--------+------+--------------------+------+----+-----+-----+------+------+-----+--------+
# | 6| 0| 3| Moran, Mr. James| male|null| 0| 0|330877|8.4583| null| Q|
# | 18| 1| 2|Williams, Mr. Cha...| male|null| 0| 0|244373| 13.0| null| S|
# | 20| 1| 3|Masselmani, Mrs. ...|female|null| 0| 0| 2649| 7.225| null| C|
# | 27| 0| 3|Emir, Mr. Farred ...| male|null| 0| 0| 2631| 7.225| null| C|
# | 29| 1| 3|"O'Dwyer, Miss. E...|female|null| 0| 0|330959|7.8792| null| Q|
# +-----------+--------+------+--------------------+------+----+-----+-----+------+------+-----+--------+
Dropping Irrelovant Columns¶
We can decide to remove columns that we won't be needing in our project by calling .drop
, which is the same in pandas
df = df.drop('Ticket','Name','Fare','Cabin')
df.show(5)
# +-----------+--------+------+------+---+-----+-----+--------+
# |PassengerId|Survived|Pclass| Sex|Age|SibSp|Parch|Embarked|
# +-----------+--------+------+------+---+-----+-----+--------+
# | 1| 0| 3| male| 22| 1| 0| S|
# | 2| 1| 1|female| 38| 1| 0| C|
# | 3| 1| 3|female| 26| 0| 0| S|
# | 4| 1| 1|female| 35| 1| 0| S|
# | 5| 0| 3| male| 35| 0| 0| S|
# +-----------+--------+------+------+---+-----+-----+--------+
Adding Columns to DataFrame¶
Column additions do however work a little differently, to add a column we add .withColumn
df = df.withColumn('FamilySize',f.col('SibSp') + f.col('Parch') + 1)
df.show(5)
# +-----------+--------+------+------+---+-----+-----+--------+----------+
# |PassengerId|Survived|Pclass| Sex|Age|SibSp|Parch|Embarked|FamilySize|
# +-----------+--------+------+------+---+-----+-----+--------+----------+
# | 1| 0| 3| male| 22| 1| 0| S| 2.0|
# | 2| 1| 1|female| 38| 1| 0| C| 2.0|
# | 3| 1| 3|female| 26| 0| 0| S| 1.0|
# | 4| 1| 1|female| 35| 1| 0| S| 2.0|
# | 5| 0| 3| male| 35| 0| 0| S| 1.0|
# +-----------+--------+------+------+---+-----+-----+--------+----------+
We can also define a condition, based on which we'll create a unique feature
ndf = ndf.withColumn('M',f.col('Sex') == 'male')
ndf = ndf.withColumn('F',f.col('Sex') == 'female')
ndf = ndf.drop('sex')
Data Imputation¶
Data imputation can be done via fillna
, we pass a dictionary containing key,value pair for column name and value respectively
av_age = df.select(f.avg(f.col('age')))
av_age.show()
# +-----------------+
# | avg(age)|
# +-----------------+
# |29.69911764705882|
# +-----------------+
# To convert to python types, we can write:
av_age.collect()[0][0] # 29.69911764705882
ndf = df.fillna({'age':av_age.collect()[0][0]})
ndf.show(5)
# +-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+-----+--------+
# |PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
# +-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+-----+--------+
# | 1| 0| 3|Braund, Mr. Owen ...| male| 22.0| 1| 0| A/5 21171| 7.25| null| S|
# | 2| 1| 1|Cumings, Mrs. Joh...|female| 38.0| 1| 0| PC 17599|71.2833| C85| C|
# | 3| 1| 3|Heikkinen, Miss. ...|female| 26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
# | 4| 1| 1|Futrelle, Mrs. Ja...|female| 35.0| 1| 0| 113803| 53.1| C123| S|
# | 5| 0| 3|Allen, Mr. Willia...| male| 35.0| 0| 0| 373450| 8.05| null| S|
# +-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+-----+--------+
Conclusion¶
Let's review what we have covered in this post:
- We learned how to drop columns, using .drop
- We learned how to extract statistical data from our dataframe, using .select
and functions f.avg('column')
- We known how to fill missing data in different columns using a single value with a dictionary; f.fillna({'column':'value'})
- Add or replace a column, using f.withColumn
- StringIndexer(inputCol,outputCol).fit(data)
- convert categorical into a numerical representation
- Once we are done with our feature matrix, we can convert all the relevant features into a single feature that will be used as input into the model using VectorAssembler(inputCols,outputCol).transform(data)