Creating Machine Learning Models with PySpark¶
In this post, we will introduce ourselves to pyspark
, a framework that allows us to work with big data. Similar to how we did in my first machine learning project post. We are continuing on from the previous post PySpark Titanic Preprocessing, where we did some basic data preprocessing, here we will continue on with the modeling stage of our project.
Introduction¶
We'll continue on where we left of PySpark Titanic Preprocessing. In the last post, we focused on general preprocessing data, mostly data cleaning. In this post, we'll focus on finishing off data preprocessing, transformation steps that a required before passing the data to the model.
Preprocessing Summary¶
Let's summarise our preprocessing stages that we did last post:
- We learned how to drop columns that we won't be needing at all in our preprocessing using
.drop
- We learned how to extract statistical data from our dataframe, using
.select
and functionsf.avg('column')
- We known how to fill missing data in different columns using a single value with a dictionary;
f.fillna({'column':'value'})
- We know how to add or replace a column, using
f.withColumn
df = spark.read.csv('train.csv',header=True,inferSchema=True)
df = df.drop('Ticket','Name','Cabin')
av_age = df.select(f.avg(f.col('age')))
df = df.fillna({'age':round(av_age.collect()[0][0],2)})
df = df.withColumn('Family_Size',f.col('SibSp') + f.col('Parch')) # add values from two columns
df = df.withColumn('Alone',f.lit(0)) # fill all with 0
df = df.withColumn('Alone',f.when(df['Family_size'] == 0,1).otherwise(df['Alone'])) # conditional filling
df = df.drop('any')
df.show()
# +-----------+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
# |PassengerId|Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|Family_Size|Alone|
# +-----------+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
# | 1| 0| 3| male|22.0| 1| 0| 7.25| S| 1| 0|
# | 2| 1| 1|female|38.0| 1| 0|71.2833| C| 1| 0|
# | 3| 1| 3|female|26.0| 0| 0| 7.925| S| 0| 1|
# | 4| 1| 1|female|35.0| 1| 0| 53.1| S| 1| 0|
# | 5| 0| 3| male|35.0| 0| 0| 8.05| S| 0| 1|
# +-----------+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
String Indexing¶
We have left two columns which contain categorical (string) data, with which we want to work with in our modeling process; Sex
,Embarked
. As we saw in an exploratory data analysis from a previous post, these two features do contain data distributions, which allow us to distinguish between whether a passenger survived or not, which means it probably would help a model improve its accuracy. However these features will need to be modified in order for us to use them in our model.
In sklearn
there is a method called LabelEncoder. In pyspark, there is a method called StringIndexer, which work in a similar way.
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column,outputCol=column+'_index').fit(df) for column in ['Sex','Embarked']]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)
df = df.drop('PassengerId','Embarked','Sex')
df.show()
# +--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
# |Survived|Pclass| Age|SibSp|Parch| Fare|Family_Size|Alone|Sex_index|Embarked_index|
# +--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
# | 0| 3|22.0| 1| 0| 7.25| 1| 0| 0.0| 0.0|
# | 1| 1|38.0| 1| 0|71.2833| 1| 0| 1.0| 1.0|
# | 1| 3|26.0| 0| 0| 7.925| 0| 1| 1.0| 0.0|
# | 1| 1|35.0| 1| 0| 53.1| 1| 0| 1.0| 0.0|
# | 0| 3|35.0| 0| 0| 8.05| 0| 1| 0.0| 0.0|
# +--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
Once we are done indexing string columns, we need to remove them!
Combine Features¶
Once we are happy with all the features that we want to utilise in our model, we need to assemble them into a single column.
To do so we need to utilise method VectorAssembler
. We need to write the names of the input feature columns we want to use inputCols
and define the output feature name outputCol
, the resulting feature will be placed in the input dataframe.
from pyspark.ml.feature import VectorAssembler
feature = VectorAssembler(inputCols=df.columns[1:],
outputCol='features')
feature_vector = feature.transform(df)
feature_vector.show()
# +--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+--------------------+
# |Survived|Pclass| Age|SibSp|Parch| Fare|Family_Size|Alone|Sex_index|Embarked_index| features|
# +--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+--------------------+
# | 0| 3|22.0| 1| 0| 7.25| 1| 0| 0.0| 0.0|[3.0,22.0,1.0,0.0...|
# | 1| 1|38.0| 1| 0|71.2833| 1| 0| 1.0| 1.0|[1.0,38.0,1.0,0.0...|
# | 1| 3|26.0| 0| 0| 7.925| 0| 1| 1.0| 0.0|[3.0,26.0,0.0,0.0...|
# | 1| 1|35.0| 1| 0| 53.1| 1| 0| 1.0| 0.0|[1.0,35.0,1.0,0.0...|
# | 0| 3|35.0| 0| 0| 8.05| 0| 1| 0.0| 0.0|(9,[0,1,4,6],[3.0...|
# +--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+--------------------+
Train-Test Splitting¶
Once our data is ready, we should think of a strategy to confirm the accuracy of our model. Train-Test Splitting is a common strategy to verify how well a model generalises on data it wasn't trained on. In spark
, we can reference to the dataframe itself to split it using df.randomSplit
Training & Evaluation¶
Training & evaluation of different models follow the same template of actions, the only thing that changes is we load different models from spark.ml.classification
LogisticRegression¶
The first step is to load the relevant model from .ml.classification
, in this case we start with a simplistic LogisticRegression model, which is named the same as in sklearn. Inputs into the model instance require us to specify the vectorised feature columns featuresCol
and the target variable column, labelCol
The model should be fit
on training data and saved into varaible lrModel
, which is a little different to how you would do it in sklearn
.
from pyspark.ml.classification import LogisticRegression
# initialise model
lr = LogisticRegression(labelCol='Survived',
featuresCol='features')
# returns a transformer which is our model
lrModel = lr.fit(training_data)
Variable lrModel
can then be used to make a prediction on the test set, to get its generalisation score on new data,
we can see which rows of data matches by using .select
# make prediction on test set
lr_prediction = lrModel.transform(test_data)
lr_prediction.select(['prediction','Survived']).show(5)
# +----------+--------+
# |prediction|Survived|
# +----------+--------+
# | 1.0| 0|
# | 1.0| 0|
# | 1.0| 0|
# | 1.0| 0|
# | 0.0| 0|
# +----------+--------+
Finally, having the relevant prediction, we can evaluate the overall performance of the model using MulticlassClassificationEvaluator
One nuance that may seem odd is that we opted to use multiclass, even though our problem is a binary classification problem.
The reasoning can be explained by this post, which states that MulticlassClassificationEvaluator
utilises class weighting
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# evaluator
evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
predictionCol='prediction',
metricName='accuracy')
evaluator.evaluate(lr_prediction)
# 0.7586206896551724
DecisionTree¶
A powerful binary tree based algorith, which is used by both gradient boosting and random forest:
from pyspark.ml.classification import DecisionTreeClassifier
# initialise model
dt = DecisionTreeClassifier(labelCol='Survived',
featuresCol='features')
# returns a transformer which is our model
dtModel = dt.fit(training_data)
dt_prediction = dtModel.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
predictionCol='prediction',
metricName='accuracy')
evaluator.evaluate(dt_prediction)
# 0.7448275862068966
RandomForest¶
One ensemble approach based on randomised generation of DecisionTrees
we can try is RandomForest
, which even is named the same as in sklearn
from pyspark.ml.classification import RandomForestClassifier
# initialise model
rf = RandomForestClassifier(labelCol='Survived',
featuresCol='features')
# returns a transformer which is our model
rfModel = rf.fit(training_data)
rf_prediction = rfModel.transform(test_data)
# evaluator
evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
predictionCol='prediction',
metricName='accuracy')
evaluator.evaluate(rf_prediction)
# 0.7586206896551724
GradientBoosting¶
Another enseble method which uses DecisionTrees
is Gradient Boosting, its name varies from that of sklearn
from pyspark.ml.classification import GBTClassifier
# initialise model
gb = GBTClassifier(labelCol='Survived',
featuresCol='features')
# returns a transformer which is our model
gbModel = gb.fit(training_data)
gb_prediction = gbModel.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
predictionCol='prediction',
metricName='accuracy')
evaluator.evaluate(gb_prediction)
# 0.7517241379310344
Saving & Loading Model¶
We have tested different models and found the one which gives us the best metric, which in our case is accuracy
. To save a model we need to save model.fit
. The best performing model in our case was RandomForest, so let's save rfModel
To load the model, we need to load the relevant module from classification
; RandomForestClassificationModel
, which is different from RandomForestClassifier
, and call the method .load('folder')
from pyspark.ml.classification import RandomForestClassificationModel
RandomForestClassificationModel.load('rf_model')
# RandomForestClassificationModel: uid=RandomForestClassifier_f17b9c33fe1c, numTrees=20, numClasses=2, numFeatures=9
Summary¶
Let's review what we have covered in this post:
- We learned how to drop columns, using .drop
- We learned how to extract statistical data from our dataframe, using .select
and functions f.avg('column')
- We known how to fill missing data in different columns using a single value with a dictionary; f.fillna({'column':'value'})
- Add or replace a column, using f.withColumn
- StringIndexer(inputCol,outputCol).fit(data)
- convert categorical into a numerical representation
- Once we are done with our feature matrix, we can convert all the relevant features into a single feature that will be used as input into the model using VectorAssembler(inputCols,outputCol).transform(data)
- To split the data into a training & validation dataset, we can use the dataframe method df.randomSplit
Training a model requires identical steps for whichever model we choose:
- Import the model class from pyspark.ml.classification
- Instantiate the model by specifying labelCol
and featuresCol
- Train the model using trained_model = model.fit(data)
- Use the model to make predictions using y_pred = trained_model.transform(data)
- Once we have both a model prediction and training labels, we can make an evaluation using an evaluator MulticlassClassificationEvaluator
with evaluator.evaluate(data)
- And to finish off our modeling state, we can save our model that we will use in production by saving trained_model.save('name')
and load with the relevant XModel.load()