PySpark Time Series Pipelines¶
Today's post covers the following:
- Basic pipeline: conversion of timestamps to unix time
- Lag/rolling feature pipelines
- Aggregation-based statistics pipelines
Basic Pipeline¶
Convert timestamp data to unix time format and use it as the only feature in the VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import pyspark.sql.functions as f
spark = SparkSession\
.builder\
.getOrCreate()
# Example DataFrame with timestamp and target value
data = spark.createDataFrame([
("2025-07-21 10:00:00", 100.0),
("2025-07-21 11:00:00", 110.0),
("2025-07-21 12:00:00", 115.0),
("2025-07-21 13:00:00", 120.0)
], ["timestamp", "value"])
data.show(5)
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|2025-07-21 10:00:00|100.0|
|2025-07-21 11:00:00|110.0|
|2025-07-21 12:00:00|115.0|
|2025-07-21 13:00:00|120.0|
+-------------------+-----+
# convert timestamp to unix time & convert integer to double format
data = data.withColumn('timestamp_num',f.unix_timestamp('timestamp').cast('double'))
data.show()
+-------------------+-----+-------------+
| timestamp|value|timestamp_num|
+-------------------+-----+-------------+
|2025-07-21 10:00:00|100.0| 1.7530812E9|
|2025-07-21 11:00:00|110.0| 1.7530848E9|
|2025-07-21 12:00:00|115.0| 1.7530884E9|
|2025-07-21 13:00:00|120.0| 1.753092E9|
+-------------------+-----+-------------+
# assemble features
assembler = VectorAssembler(inputCols=['timestamp_num'],outputCol='features')
# Define Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="value")
# Build pipeline
pipeline = Pipeline(stages=[assembler, lr])
# Train model
model = pipeline.fit(data)
# Make predictions
predictions = model.transform(data)
predictions.select("timestamp", "value", "prediction").show()
+-------------------+-----+------------------+
| timestamp|value| prediction|
+-------------------+-----+------------------+
|2025-07-21 10:00:00|100.0| 101.4994629053399|
|2025-07-21 11:00:00|110.0|107.99982096813619|
|2025-07-21 12:00:00|115.0|114.50017903093249|
|2025-07-21 13:00:00|120.0|121.00053709419444|
+-------------------+-----+------------------+
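Since the fitted pipeline only needs timestamp_num, it can also score unseen timestamps. A minimal sketch (the future times below are made up purely for illustration):
# hypothetical future timestamps we want a forecast for
future = spark.createDataFrame([
    ("2025-07-21 14:00:00",),
    ("2025-07-21 15:00:00",)
], ["timestamp"])
# apply the same unix-time conversion used during training
future = future.withColumn('timestamp_num', f.unix_timestamp('timestamp').cast('double'))
# the fitted pipeline handles assembling the feature vector and predicting
model.transform(future).select("timestamp", "prediction").show()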
Lag/Rolling Pipeline¶
Here we incorporate lag features into the pipeline before defining the VectorAssembler, along with rolling statistics (mean and maximum) and differences between shifted values of the series, all computed with Window. A window specification generally takes the form:
windowSpec = Window.partitionBy("user").orderBy("event_time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
from pyspark.sql.functions import col, lag, avg, max as smax
from pyspark.sql.window import Window
# Sample data with timestamp and target value
data = [
("2025-07-21 10:00:00", 100.0),
("2025-07-21 11:00:00", 110.0),
("2025-07-21 12:00:00", 115.0),
("2025-07-21 13:00:00", 120.0),
("2025-07-21 14:00:00", 130.0),
("2025-07-21 15:00:00", 125.0),
("2025-07-21 16:00:00", 135.0),
("2025-07-21 17:00:00", 140.0),
]
# create & convert string to timestamp
df = spark.createDataFrame(data, ["timestamp", "value"]) \
.withColumn("timestamp", col("timestamp").cast("timestamp"))
df.show(5)
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|2025-07-21 10:00:00|100.0|
|2025-07-21 11:00:00|110.0|
|2025-07-21 12:00:00|115.0|
|2025-07-21 13:00:00|120.0|
|2025-07-21 14:00:00|130.0|
+-------------------+-----+
# define an ordered window, i.e. over ( ... order by timestamp asc)
fwindow = Window.orderBy("timestamp")
# define a rolling window over the current row and the two preceding rows
rwindow = Window.orderBy("timestamp").rowsBetween(-2, 0)
df = df.withColumn('lag_1',f.lag('value',1).over(fwindow))\
.withColumn('lag_2',f.lag('value',2).over(fwindow))\
.withColumn('lag_3',f.lag('value',3).over(fwindow))
df.show()
+-------------------+-----+-----+-----+-----+
|          timestamp|value|lag_1|lag_2|lag_3|
+-------------------+-----+-----+-----+-----+
|2025-07-21 10:00:00|100.0| NULL| NULL| NULL|
|2025-07-21 11:00:00|110.0|100.0| NULL| NULL|
|2025-07-21 12:00:00|115.0|110.0|100.0| NULL|
|2025-07-21 13:00:00|120.0|115.0|110.0|100.0|
|2025-07-21 14:00:00|130.0|120.0|115.0|110.0|
|2025-07-21 15:00:00|125.0|130.0|120.0|115.0|
|2025-07-21 16:00:00|135.0|125.0|130.0|120.0|
|2025-07-21 17:00:00|140.0|135.0|125.0|130.0|
+-------------------+-----+-----+-----+-----+
# Delta features: current value minus lag_1
df = df.withColumn("delta_1", col("value") - col("lag_1"))
df.show(5)
+-------------------+-----+-----+-----+-----+-------+
|          timestamp|value|lag_1|lag_2|lag_3|delta_1|
+-------------------+-----+-----+-----+-----+-------+
|2025-07-21 10:00:00|100.0| NULL| NULL| NULL|   NULL|
|2025-07-21 11:00:00|110.0|100.0| NULL| NULL|   10.0|
|2025-07-21 12:00:00|115.0|110.0|100.0| NULL|    5.0|
|2025-07-21 13:00:00|120.0|115.0|110.0|100.0|    5.0|
|2025-07-21 14:00:00|130.0|120.0|115.0|110.0|   10.0|
+-------------------+-----+-----+-----+-----+-------+
# rolling mean and max
df = df.withColumn("rolling_mean_3", avg("value").over(rwindow)) \
.withColumn("rolling_max_3", smax("value").over(rwindow))
df.show(5)
+-------------------+-----+-----+-----+-----+-------+------------------+-------------+
|          timestamp|value|lag_1|lag_2|lag_3|delta_1|    rolling_mean_3|rolling_max_3|
+-------------------+-----+-----+-----+-----+-------+------------------+-------------+
|2025-07-21 10:00:00|100.0| NULL| NULL| NULL|   NULL|             100.0|        100.0|
|2025-07-21 11:00:00|110.0|100.0| NULL| NULL|   10.0|             105.0|        110.0|
|2025-07-21 12:00:00|115.0|110.0|100.0| NULL|    5.0|108.33333333333333|        115.0|
|2025-07-21 13:00:00|120.0|115.0|110.0|100.0|    5.0|             115.0|        120.0|
|2025-07-21 14:00:00|130.0|120.0|115.0|110.0|   10.0|121.66666666666667|        130.0|
+-------------------+-----+-----+-----+-----+-------+------------------+-------------+
# Drop any rows with nulls introduced by lagging
df = df.na.drop()
# Assemble features into a vector for ML regression
assembler = VectorAssembler(
inputCols=["lag_1", "lag_2", "lag_3", "delta_1", "rolling_mean_3", "rolling_max_3"],
outputCol="features"
)
# Use linear regression model
lr = LinearRegression(featuresCol="features", labelCol="value")
# Build pipeline with assembler and model
pipeline = Pipeline(stages=[assembler, lr])
# Fit the pipeline model
model = pipeline.fit(df)
# Show predictions on training data
predictions = model.transform(df)
predictions.select("timestamp", "value", "prediction").show(truncate=False)
+-------------------+-----+------------------+
|timestamp |value|prediction |
+-------------------+-----+------------------+
|2025-07-21 13:00:00|120.0|120.0000001508051 |
|2025-07-21 14:00:00|130.0|130.00000023988125|
|2025-07-21 15:00:00|125.0|124.9999995468485 |
|2025-07-21 16:00:00|135.0|134.99999985059745|
|2025-07-21 17:00:00|140.0|140.00000021186773|
+-------------------+-----+------------------+
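To put a number on the (in-sample) fit, we could add an evaluation step; a minimal sketch using RegressionEvaluator:
from pyspark.ml.evaluation import RegressionEvaluator
# RMSE of the predictions on the training rows (in-sample only)
evaluator = RegressionEvaluator(labelCol="value", predictionCol="prediction", metricName="rmse")
print(evaluator.evaluate(predictions))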
Window Aggregation Group Statistics¶
Another approach is, instead of pyspark.sql.Window, to utilise pyspark.sql.functions's f.window, which lets us compute aggregation statistics over time intervals rather than just row-shifted statistics:
f.window("column", "interval")
from pyspark.sql.functions import window, avg
# Sample data with timestamp and target value
data = [
("2025-07-21 10:00:00", 100.0),
("2025-07-21 11:00:00", 110.0),
("2025-07-21 12:00:00", 115.0),
("2025-07-21 13:00:00", 120.0),
("2025-07-21 14:00:00", 130.0),
("2025-07-21 15:00:00", 125.0),
("2025-07-21 16:00:00", 135.0),
("2025-07-21 17:00:00", 140.0),
]
# create & convert string to timestamp
df = spark.createDataFrame(data, ["timestamp", "value"]) \
.withColumn("timestamp", col("timestamp").cast("timestamp"))
# Windowed aggregation (e.g. average value over 30-minute intervals)
agg_df = df.groupBy(window("timestamp", "30 minutes")).agg(avg("value").alias("avg_value"))
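The window column produced by f.window is a struct with start and end fields, which can be pulled out as ordinary columns if needed; a minimal sketch:
# extract the struct fields of the generated window column
agg_df.select(col("window.start").alias("window_start"), col("window.end").alias("window_end"), "avg_value").show(truncate=False)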
# Feature assembler for ML
assembler = VectorAssembler(inputCols=["avg_value"], outputCol="features")
# Linear regression model for forecasting
lr = LinearRegression(featuresCol="features", labelCol="avg_value")
# Pipeline assembling
pipeline = Pipeline(stages=[assembler, lr])
# Fit the model
model = pipeline.fit(agg_df)
# Predict and show results
predictions = model.transform(agg_df)
predictions.show(truncate=False)
+------------------------------------------+---------+--------+------------------+
|window |avg_value|features|prediction |
+------------------------------------------+---------+--------+------------------+
|{2025-07-21 10:00:00, 2025-07-21 10:30:00}|100.0 |[100.0] |99.99999999999962 |
|{2025-07-21 11:00:00, 2025-07-21 11:30:00}|110.0 |[110.0] |109.9999999999998 |
|{2025-07-21 12:00:00, 2025-07-21 12:30:00}|115.0 |[115.0] |114.99999999999989|
|{2025-07-21 13:00:00, 2025-07-21 13:30:00}|120.0 |[120.0] |119.99999999999997|
|{2025-07-21 14:00:00, 2025-07-21 14:30:00}|130.0 |[130.0] |130.00000000000014|
|{2025-07-21 15:00:00, 2025-07-21 15:30:00}|125.0 |[125.0] |125.00000000000006|
|{2025-07-21 16:00:00, 2025-07-21 16:30:00}|135.0 |[135.0] |135.00000000000023|
|{2025-07-21 17:00:00, 2025-07-21 17:30:00}|140.0 |[140.0] |140.0000000000003 |
+------------------------------------------+---------+--------+------------------+
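f.window also accepts a slide duration, producing overlapping (sliding) windows instead of the tumbling ones above; a minimal sketch (the 1 hour / 30 minutes values are arbitrary):
# 1-hour windows that start every 30 minutes (overlapping)
sliding = df.groupBy(window("timestamp", "1 hour", "30 minutes")).agg(avg("value").alias("avg_value"))
sliding.orderBy("window").show(truncate=False)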
Thank you for reading!
Any questions or comments about the post can be addressed to the mldsai-info channel or to me directly, shtrauss2, on shtrausslearning