PySpark Pipelines¶
Today's post covers the following:
- Missing data treatment classification pipeline
- Feature scaling using StandardScaler classification pipeline
- TF-IDF corpus classification pipeline
- PCA dimensionality reduction classification pipeline
Missing Data Treatment Pipeline¶
Simple data imputation treatment pipeline in pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Reuse the active SparkSession (notebook / pyspark shell) or create one, so
# the example also runs as a standalone script where `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()

# Sample data with missing values (None or null)
data = [
    (1, "red", 10.0, None, 0),
    (2, "blue", None, 20.0, 1),
    (3, "green", 30.0, 30.0, 0),
    (4, None, 40.0, 40.0, 1),
    (5, "blue", 50.0, 50.0, 0),
]
df = spark.createDataFrame(data, ["id", "color", "feature1", "feature2", "label"])

# Step 1: Impute missing numeric values with the column mean.
# The imputed values go to new columns; the original columns are left as-is.
imputer = Imputer(
    inputCols=["feature1", "feature2"],
    outputCols=["feature1_imputed", "feature2_imputed"],
).setStrategy("mean")

# Step 2: Handle missing categorical values by filling with a placeholder.
# NOTE: this is applied directly to the DataFrame and is NOT a pipeline stage,
# so the same fillna must be applied to any new data before model.transform().
df = df.fillna({"color": "missing"})

# Step 3: StringIndexer for the categorical column
indexer = StringIndexer(inputCol="color", outputCol="color_index")

# Step 4: OneHotEncoder for the indexed categorical feature
encoder = OneHotEncoder(inputCols=["color_index"], outputCols=["color_ohe"])

# Step 5: Assemble all features into a single vector
# (uses the imputed numeric columns, not the raw ones that still contain nulls)
assembler = VectorAssembler(
    inputCols=["color_ohe", "feature1_imputed", "feature2_imputed"],
    outputCol="features",
)

# Step 6: Scale features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True,
)

# Step 7: Logistic Regression model trained on the scaled feature vector
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

# Build pipeline with all stages, in the order they must run
pipeline = Pipeline(stages=[imputer, indexer, encoder, assembler, scaler, lr])

# Fit pipeline, then score the same (training) data
model = pipeline.fit(df)
predictions = model.transform(df)

# Show results
predictions.select(
    "id", "color", "feature1", "feature2", "label", "prediction", "probability"
).show(truncate=True)
+---+-------+--------+--------+-----+----------+--------------------+
| id| color|feature1|feature2|label|prediction| probability|
+---+-------+--------+--------+-----+----------+--------------------+
| 1| red| 10.0| NULL| 0| 0.0|[0.99999999167350...|
| 2| blue| NULL| 20.0| 1| 1.0|[1.11267794732560...|
| 3| green| 30.0| 30.0| 0| 0.0|[0.99999999391649...|
| 4|missing| 40.0| 40.0| 1| 1.0|[8.02598294965153...|
| 5| blue| 50.0| 50.0| 0| 0.0|[0.99999999102672...|
+---+-------+--------+--------+-----+----------+--------------------+
Feature Scaling Pipeline¶
Once we have our assembler, we can scale our features
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Reuse the active SparkSession (notebook / pyspark shell) or create one, so
# the example also runs as a standalone script where `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()

# Sample data with features and label
data = [
    (1, 25.0, 50000.0, 0),
    (2, 35.0, 60000.0, 1),
    (3, 45.0, 70000.0, 0),
    (4, 20.0, 40000.0, 1),
]
df = spark.createDataFrame(data, ["id", "age", "income", "label"])

# Step 1: Assemble features into a vector column
assembler = VectorAssembler(
    inputCols=["age", "income"],
    outputCol="features",
)

# Step 2: StandardScaler to standardize features
# (age and income are on very different scales in the sample data)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True,
)

# Step 3: Logistic Regression using the scaled features
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

# Create pipeline with stages, in the order they must run
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Fit pipeline on training data & predict on the same data
model = pipeline.fit(df)
predictions = model.transform(df)

# Show predictions
predictions.select("id", "label", "prediction", "scaled_features").show()
+---+-----+----------+--------------------+
| id|label|prediction| scaled_features|
+---+-----+----------+--------------------+
| 1| 0| 0.0|[-0.5637345210021...|
| 2| 1| 0.0|[0.33824071260127...|
| 3| 0| 0.0|[1.24021594620466...|
| 4| 1| 1.0|[-1.0147221378038...|
+---+-----+----------+--------------------+
TF-IDF Pipeline¶
TF-IDF machine learning pipeline using pyspark
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.pipeline import Pipeline
from pyspark.sql import SparkSession

# Reuse the active SparkSession (notebook / pyspark shell) or create one, so
# the example also runs as a standalone script where `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()

# Sample data: (id, text, label)
data = [
    (0, "spark is great for big data processing", "positive"),
    (1, "hadoop is an old technology", "negative"),
    (2, "spark and hadoop are big data tools", "positive"),
    (3, "I dislike slow processing", "negative"),
]
df = spark.createDataFrame(data, ["id", "text", "label"])

# Step 1: Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Step 2: Remove stopwords (optional but recommended)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Step 3: TF feature extraction
# numFeatures=20 keeps the hashed vector tiny for this toy corpus.
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)

# Step 4: IDF to get TF-IDF features
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Step 5: Convert string labels to numeric indices
labelIndexer = StringIndexer(inputCol="label", outputCol="labelIndex")

# Step 6: Define classifier (e.g., Logistic Regression)
# It trains against the indexed label, not the original string column.
lr = LogisticRegression(featuresCol="features", labelCol="labelIndex")

# Build pipeline, in the order the stages must run
pipeline = Pipeline(
    stages=[
        tokenizer,
        remover,
        hashingTF,
        idf,
        labelIndexer,
        lr,
    ]
)

# Train model
model = pipeline.fit(df)

# Predict on training data (or new data)
predictions = model.transform(df)
predictions.select("text", "label", "prediction").show()
+--------------------+--------+----------+
| text| label|prediction|
+--------------------+--------+----------+
|spark is great fo...|positive| 1.0|
|hadoop is an old ...|negative| 0.0|
|spark and hadoop ...|positive| 1.0|
|I dislike slow pr...|negative| 0.0|
+--------------------+--------+----------+
PCA Pipeline¶
Dimensionality reduction machine learning classification pipeline example using pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PCA, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

# Reuse the active SparkSession (notebook / pyspark shell) or create one, so
# the example also runs as a standalone script where `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()

# Sample data loading (replace with actual data loading)
data = [
    (7, 1.1, 0.3, 1.5, 0),
    (8, 2.4, 1.7, 0.3, 1),
    (9, 0.3, 2.5, 2.3, 0),
    (10, 1.3, 0.4, 1.7, 1),
    (11, 2.5, 1.8, 0.4, 0),
    (12, 0.4, 2.6, 2.4, 1),
    (13, 1.4, 0.5, 1.6, 0),
    (14, 2.6, 1.6, 0.5, 1),
    (15, 0.5, 2.7, 2.5, 0),
    (16, 1.6, 0.6, 1.4, 1),
    (17, 2.7, 1.5, 0.6, 0),
    (18, 0.6, 2.8, 2.6, 1),
    (19, 1.7, 0.7, 1.3, 0),
    (20, 2.8, 1.4, 0.7, 1),
]
df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])

# Assemble features into a vector
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="raw_features",
)

# Optional: Scale features (StandardScaler with its default settings)
scaler = StandardScaler(inputCol="raw_features", outputCol="features")

# PCA to reduce dimensionality: 3 input features -> 2 principal components
pca = PCA(
    k=2,
    inputCol="features",
    outputCol="pca_features",
)

# Use PCA features for classification
lr = LogisticRegression(featuresCol="pca_features", labelCol="label")

# Create pipeline, in the order the stages must run
pipeline = Pipeline(stages=[assembler, scaler, pca, lr])

# Train-test split (fixed seed for a repeatable split)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Fit pipeline on the training split only
model = pipeline.fit(train_df)

# Predict on the held-out split
predictions = model.transform(test_df)

# Evaluate
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")