PySpark Daily December Summary I
I decided it would be fun to write PySpark code every day and post about it. The main motivation is that I don't use PySpark as often as I would like. If you want to join in, just fork the notebook (on Kaggle) and practice some PySpark every day! Visit my telegram channel if you have any questions, or just post them here!
Here I will be posting summaries that each cover roughly 10 days' worth of the posts I make on Kaggle, which works out to three summaries a month.
30/11/2023 ❯❯❯ Setting data types via custom schema
Today's post is about schemas. PySpark mimics many aspects of SQL databases. It's standard practice to define a table schema for a dataframe, either when creating it or when reading files.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# Create a Spark session
spark = SparkSession.builder.appName("schema").getOrCreate()
# Define the schema using StructType and StructField
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("salary", FloatType(), True)
])
# Create a DataFrame with the defined schema
data = [("Alice", 28, 100000.0),
("Bob", 35, 120000.0)]
df = spark.createDataFrame(data, schema)
# Show the DataFrame with the defined schema
df.show()
# +-----+---+--------+
# | name|age| salary|
# +-----+---+--------+
# |Alice| 28|100000.0|
# | Bob| 35|120000.0|
# +-----+---+--------+
01/12/2023 ❯❯❯ Creating Table View for Spark DataFrame
Using SQL requests via spark.sql, you can work with the data the same way you would with a database. This is convenient for people who are more used to SQL-based notation when working with data.
# Create a Spark session
spark = SparkSession.builder.appName("schema").getOrCreate()
# Create a Pyspark DataFrame from a list of tuples
data = [
("2020-01-01", 10),
("2020-01-02", 20),
("2020-01-03", 30),
("2020-01-04", 40),
("2020-01-05", 50)
]
df = spark.createDataFrame(data, ["date", "value"])
# Register the DataFrame as a temporary table
df.createOrReplaceTempView("date_table")
# lets preview our table
spark.sql('select * from date_table').show()
# +----------+-----+
# | date|value|
# +----------+-----+
# |2020-01-01| 10|
# |2020-01-02| 20|
# |2020-01-03| 30|
# |2020-01-04| 40|
# |2020-01-05| 50|
# +----------+-----+
# Perform the rolling mean calculation using SQL notation
request = """
SELECT date,
value,
AVG(value) OVER (ORDER BY date ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS rolling_mean
FROM date_table
ORDER BY date
"""
result = spark.sql(request)
result.show()
# +----------+-----+------------+
# | date|value|rolling_mean|
# +----------+-----+------------+
# |2020-01-01| 10| 15.0|
# |2020-01-02| 20| 20.0|
# |2020-01-03| 30| 30.0|
# |2020-01-04| 40| 40.0|
# |2020-01-05| 50| 45.0|
# +----------+-----+------------+
- To replicate the same request using PySpark functions, you need to know which functionality to import
- For this problem we need imports from pyspark.sql.functions and pyspark.sql.window, so SQL notation is definitely convenient
- This is a big plus for PySpark: you can analyse big data without needing to know the library's imports (such as from pyspark.sql.window import Window)
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
# Reuses the DataFrame 'df' from above, with columns 'date' and 'value'
windowSpec = Window.orderBy("date").rowsBetween(-1, 1)
rollingMean = avg(df["value"]).over(windowSpec)
result = df.select(df["date"], df["value"], rollingMean.alias("rolling_mean"))
result.show()
# +----------+-----+------------+
# | date|value|rolling_mean|
# +----------+-----+------------+
# |2020-01-01| 10| 15.0|
# |2020-01-02| 20| 20.0|
# |2020-01-03| 30| 30.0|
# |2020-01-04| 40| 40.0|
# |2020-01-05| 50| 45.0|
# +----------+-----+------------+
02/12/2023 ❯❯❯ Reading Simple CSV files
By default, PySpark assigns StringType to every column when reading a csv file. Having played with spark.read.csv and pandas' read_csv, I would definitely say that pandas offers many more options when importing. I actually tend to prefer reading the data with default settings and making adjustments after import.
spark = SparkSession.builder.getOrCreate()
spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv') # defaults to string types
# DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string]
Some useful things to note when reading a csv file:
- If your data contains a header, set header=True
- If you want to automatically determine column types and set them, set inferSchema=True
- To add an option to .csv, add it before .csv by using .option; this is where we can set different settings for reading csv files
- Set the delimiter, e.g. via .option('delimiter',';') if your data is separated by ';'
# a header is present in the data
spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv',header=True).show(1)
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# |_c0| transaction_id| timestamp| product_id|category|customer_type|unit_price|quantity|total|payment_type|
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# | 0|a1c82654-c52c-45b...|2022-03-02 09:51:38|3bc6c1ea-0198-46d...| fruit| gold| 3.99| 2| 7.98| e-wallet|
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# automatically assign data types to columns
spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv',header=True,inferSchema=True)
# slightly different format, specify the delimiter that splits columns
spark.read.option('delimiter',',')\
.option('header',True)\
.option('inferSchema',True)\
.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv').show(5)
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# |_c0| transaction_id| timestamp| product_id|category|customer_type|unit_price|quantity|total|payment_type|
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# | 0|a1c82654-c52c-45b...|2022-03-02 09:51:38|3bc6c1ea-0198-46d...| fruit| gold| 3.99| 2| 7.98| e-wallet|
# | 1|931ad550-09e8-4da...|2022-03-06 10:33:59|ad81b46c-bf38-41c...| fruit| standard| 3.99| 1| 3.99| e-wallet|
# | 2|ae133534-6f61-4cd...|2022-03-04 17:20:21|7c55cbd4-f306-4c0...| fruit| premium| 0.19| 2| 0.38| e-wallet|
# | 3|157cebd9-aaf0-475...|2022-03-02 17:23:58|80da8348-1707-403...| fruit| gold| 0.19| 4| 0.76| e-wallet|
# | 4|a81a6cd3-5e0c-44a...|2022-03-05 14:32:43|7f5e86e6-f06f-45f...| fruit| basic| 4.49| 2| 8.98| debit card|
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# only showing top 5 rows
# limit the number of loaded rows of data
spark.read.option('delimiter',',')\
.option('header',True)\
.option('inferSchema',True)\
.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv')\
.limit(10).show()
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# |_c0| transaction_id| timestamp| product_id|category|customer_type|unit_price|quantity|total|payment_type|
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
# | 0|a1c82654-c52c-45b...|2022-03-02 09:51:38|3bc6c1ea-0198-46d...| fruit| gold| 3.99| 2| 7.98| e-wallet|
# | 1|931ad550-09e8-4da...|2022-03-06 10:33:59|ad81b46c-bf38-41c...| fruit| standard| 3.99| 1| 3.99| e-wallet|
# | 2|ae133534-6f61-4cd...|2022-03-04 17:20:21|7c55cbd4-f306-4c0...| fruit|
# ...
# +---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
from pyspark.sql.types import StructType, StructField, DateType, StringType, FloatType, IntegerType, TimestampType
# Define the schema using StructType and StructField
schema = StructType([
StructField("_c0", IntegerType(), True), # unnamed index column: the header row starts with a ','
StructField("transaction_id", StringType(), True),
StructField("timestamp", DateType(), True), # read the column as a DateType, not TimestampType
StructField("product_id", StringType(), True),
StructField("category", StringType(), True),
StructField("customer_type", StringType(), True),
StructField("unit_price", FloatType(), True),
StructField("quantity", IntegerType(), True),
StructField("total", FloatType(), True),
StructField("payment_type", StringType(), True)
])
df = spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv',header=True,inferSchema=False,schema=schema)
df.show()
# +---+--------------------+----------+--------------------+--------+-------------+----------+--------+-----+------------+
# |_c0| transaction_id| timestamp| product_id|category|customer_type|unit_price|quantity|total|payment_type|
# +---+--------------------+----------+--------------------+--------+-------------+----------+--------+-----+------------+
# | 0|a1c82654-c52c-45b...|2022-03-02|3bc6c1ea-0198-46d...| fruit| gold| 3.99| 2| 7.98| e-wallet|
# | 1|931ad550-09e8-4da...|2022-03-06|ad81b46c-bf38-41c...| fruit| standard| 3.99| 1| 3.99| e-wallet|
# | 2|ae133534-6f61-4cd...|2022-03-04|7c55cbd4-f306-4c0...| fruit| premium| 0.19| 2| 0.38| e-wallet|
# ...
# +---+--------------------+----------+--------------------+--------+-------------+----------+--------+-----+------------+
03/12/2023 ❯❯❯ Knowing your PySpark Types
To set StructFields and define a type, we should know which types are available to us in PySpark:
- StringType: Represents string values.
- IntegerType: Represents integer values.
- LongType: Represents long integer values.
- FloatType: Represents float values.
- DoubleType: Represents double values.
- BooleanType: Represents boolean values.
- DateType: Represents date values.
- TimestampType: Represents timestamp values.
- ArrayType: Represents arrays of elements with a specific data type.
- MapType: Represents key-value pairs with specific data types for keys and values.
- StructType: Represents a structure or record with multiple fields.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import MapType, StringType
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame with a column of MapType
data = [(1, {"name": "John", "age": "30"}),
(2, {"name": "Jane", "age": "25"})]
df = spark.createDataFrame(data, ["id", "info"])
# DataFrame[id: bigint, info: map<string,string>]
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [("Alice", ["apple", "banana", "orange"]),
("Bob", ["grape", "kiwi"]),
("Charlie", ["watermelon"])]
# Create the DataFrame; the array<string> type of 'fruits' is inferred from the data
spark.createDataFrame(data, ["name", "fruits"])
# DataFrame[name: string, fruits: array<string>]
04/12/2023 ❯❯❯ Timestamp Zone Consideration
If your column holds datetimes (TimestampType), here's how you can convert it between timezones and make adjustments where needed.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame with a timestamp column (UTC time)
data = [("2022-01-01 12:00:00",)]
df = spark.createDataFrame(data, ["timestamp"])
# if timestamp is UTC
df_tz = df.withColumn("timestamp_with_ny", from_utc_timestamp(df.timestamp, "America/New_York"))
df_tz = df_tz.withColumn("timestamp_with_moscow", from_utc_timestamp(df.timestamp, "Europe/Moscow"))
df_tz.show()
# if timestamp is local
df_utc = df_tz.withColumn("timestamp_utc_ny", to_utc_timestamp(df_tz.timestamp_with_ny, "America/New_York"))
# build on df_utc (not df_tz) so the New York column is not dropped
df_utc = df_utc.withColumn("timestamp_utc_moscow", to_utc_timestamp(df_utc.timestamp_with_moscow, "Europe/Moscow"))
df_utc.show()
# +-------------------+-------------------+---------------------+
# | timestamp| timestamp_with_ny|timestamp_with_moscow|
# +-------------------+-------------------+---------------------+
# |2022-01-01 12:00:00|2022-01-01 07:00:00| 2022-01-01 15:00:00|
# +-------------------+-------------------+---------------------+
# +-------------------+-------------------+---------------------+-------------------+--------------------+
# | timestamp| timestamp_with_ny|timestamp_with_moscow| timestamp_utc_ny|timestamp_utc_moscow|
# +-------------------+-------------------+---------------------+-------------------+--------------------+
# |2022-01-01 12:00:00|2022-01-01 07:00:00| 2022-01-01 15:00:00|2022-01-01 12:00:00| 2022-01-01 12:00:00|
# +-------------------+-------------------+---------------------+-------------------+--------------------+
05/12/2023 ❯❯❯ Making a linear model
The process of creating models differs a little from how one would go about it in sklearn. Once we have a dataframe df that contains all our features & the target variable, we need to assemble the features into a vectorised format using VectorAssembler. To do so, we define inputCols (the feature columns) and outputCol (the single vector column that will hold all our input feature data). After loading the relevant model from pyspark.ml, we then define its featuresCol (which is the output column of the VectorAssembler) and labelCol (the target column) arguments.
Having initialised the model (LinearRegression), we call the method fit and assign the result to a variable (which is different from sklearn, where fit updates the estimator in place). To use the model for prediction, we transform the new data into the same vectorised format using the assembler to create new_data, and call model.transform(new_data) to make the prediction.
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
# Create a SparkSession
spark = SparkSession.builder.appName("LinearRegressionExample").getOrCreate()
# Sample dataset (two features & target variable)
data = [(1, 2, 3),
(2, 4, 6),
(3, 6, 9),
(4, 8,12),
(5,10,15)]
df = spark.createDataFrame(data, ["feature1", "feature2", "target"])
# Prepare the data for modeling
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
df = assembler.transform(df)
df
# Create and fit the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="target")
model = lr.fit(df)
# Make predictions on new data
new_data = spark.createDataFrame([(6, 12)], ["feature1", "feature2"])
new_data = assembler.transform(new_data)
predictions = model.transform(new_data)
predictions.show()
# +--------+--------+----------+----------+
# |feature1|feature2| features|prediction|
# +--------+--------+----------+----------+
# | 6| 12|[6.0,12.0]| 18.0|
# +--------+--------+----------+----------+
06/12/2023 ❯❯❯ Filter rows that contain item in array column
PySpark provides the function array_contains, which lets you check whether a specified value exists in an array column. It returns a boolean value for each row, so it can be used directly as a filter condition.
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_contains
# Create a SparkSession
spark = SparkSession.builder.appName("filter_rows").getOrCreate()
# Example of ArrayType
data = [("Alice", ["apple", "banana", "orange"]),
("Bob", ["grape", "kiwi"]),
("Charlie", ["watermelon"])]
# Create the DataFrame; the array<string> type of 'fruits' is inferred from the data
df = spark.createDataFrame(data, ["name", "fruits"])
# DataFrame[name: string, fruits: array<string>]
# Filter rows where the array column contains a specific element
filtered_df = df.where(array_contains(df.fruits, "orange"))
# Show the filtered DataFrame
filtered_df.show()
# +-----+--------------------+
# | name| fruits|
# +-----+--------------------+
# |Alice|[apple, banana, o...|
# +-----+--------------------+
We can add a new column to confirm what array_contains returns for each row
df.withColumn('contains', array_contains(df.fruits, "orange")).show()
# +-------+--------------------+--------+
# | name| fruits|contains|
# +-------+--------------------+--------+
# | Alice|[apple, banana, o...| true|
# | Bob| [grape, kiwi]| false|
# |Charlie| [watermelon]| false|
# +-------+--------------------+--------+
07/12/2023 ❯❯❯ SQL like functions (SELECT)
Select columns from a PySpark DataFrame, similar to SELECT in SQL
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("Alice", 25, "New York"),
("Bob", 30, "Los Angeles"),
("Charlie", 35, "San Francisco")]
df = spark.createDataFrame(data, ["Name", "Age", "City"])
df.show()
# +-------+---+-------------+
# | Name|Age| City|
# +-------+---+-------------+
# | Alice| 25| New York|
# | Bob| 30| Los Angeles|
# |Charlie| 35|San Francisco|
# +-------+---+-------------+
# Select specific columns from the DataFrame
selected_df = df.select("Name", "City")
selected_df.show()
# +-------+-------------+
# | Name| City|
# +-------+-------------+
# | Alice| New York|
# | Bob| Los Angeles|
# |Charlie|San Francisco|
# +-------+-------------+
08/12/2023 ❯❯❯ SQL like functions (WHERE)
Filter rows in a PySpark DataFrame, similar to WHERE in SQL
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("Alice", 25, "New York"),
("Bob", 30, "Los Angeles"),
("Charlie", 35, "San Francisco")]
df = spark.createDataFrame(data, ["Name", "Age", "City"])
df.show()
# +-------+---+-------------+
# | Name|Age| City|
# +-------+---+-------------+
# | Alice| 25| New York|
# | Bob| 30| Los Angeles|
# |Charlie| 35|San Francisco|
# +-------+---+-------------+
from pyspark.sql import functions as f
# Filter rows based on a condition (any of the following notations)
# filtered_df = df.filter(df.Age > 30)
# filtered_df = df.filter(df['Age'] > 30)
# filtered_df = df.filter(f.col('Age') > 30)
filtered_df = df.filter((f.col('Age') > 30) | (df.Name == 'Charlie'))
filtered_df.show()
# +-------+---+-------------+
# | Name|Age| City|
# +-------+---+-------------+
# |Charlie| 35|San Francisco|
# +-------+---+-------------+
09/12/2023 ❯❯❯ SQL like functions (GROUP BY)
Simple single-column group by operations, with several notations for specifying the aggregation via agg
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("Alice", 25, "New York"),
("Bob", 30, "London"),
("Charlie", 35, "New York"),
("Dave", 40, "London")]
df = spark.createDataFrame(data, ["name", "age", "city"])
df.show()
# +-------+---+--------+
# | name|age| city|
# +-------+---+--------+
# | Alice| 25|New York|
# | Bob| 30| London|
# |Charlie| 35|New York|
# | Dave| 40| London|
# +-------+---+--------+
from pyspark.sql.functions import avg, count, expr
# Group the DataFrame by the 'city' column
grouped_df = df.groupBy("city")
# Perform aggregation on the grouped DataFrame
# result = grouped_df.agg({"age": "avg"}) # dictionary format (single)
# result = grouped_df.agg({"age": "avg", "name": "count"}) # dictionary format (multiple)
# result = grouped_df.agg(avg(df.age), count(df.name)) # column object format
# result = grouped_df.agg(expr("avg(age)"), expr("count(name)")) # sql expression format
# giving alias
result = grouped_df.agg(avg(df.age).alias("average_age"),
count(df.name).alias("name_count"))
# Show the result
result.show()
# +--------+-----------+----------+
# | city|average_age|name_count|
# +--------+-----------+----------+
# |New York| 30.0| 2|
# | London| 35.0| 2|
# +--------+-----------+----------+
Summary | PySpark Daily December I
Let's try to summarise everything important from these posts
- Reading a CSV file using spark.read.options(X).csv('data.csv')
- Define a custom schema with spark.read.csv('data.csv',schema=schema)
- Schema format: schema = StructType([StructField("name", Type(), True)])
- Some important types, which can be imported from pyspark.sql.types
- StringType: Represents string values
- IntegerType: Represents integer values
- FloatType: Represents float values
- BooleanType: Represents boolean values
- DateType: Represents date values.
- TimestampType: Represents timestamp values.
- StructType: Represents a structure or record with multiple fields
- Automatically define column types using spark.read.csv('data.csv',inferSchema=True)
- Create SQL table view using df.createOrReplaceTempView('name')
- Interact with data using SQL via spark.sql(query)
- Selecting columns in dataframe df.select(X), works with aggregations like in SQL (see window functions)
- Filtering dataframe based on column condition df.where(X) | df.filter(X)
- Adding a new column: df.withColumn('name',X)
- Renaming columns: df.withColumnRenamed('A','B')
- Grouping and aggregating, with a dictionary notation similar to pandas: df.groupBy("col").agg({"col": "aggregation"})
Training models:
- Create a vectorised assembly of features
  - assembler = VectorAssembler(inputCols=[columns], outputCol="output")
  - df = assembler.transform(df)
- Train Model
  - lr = LinearRegression(featuresCol="output", labelCol="target")
  - model = lr.fit(df)
- Use model for prediction
  - new_data = assembler.transform(new_data)
  - predictions = model.transform(new_data)
Window Functions:
- Import from pyspark.sql.window import Window
- Define an ordered window: windowSpec = Window.orderBy("date").rowsBetween(-1, 1)
- Aggregation over a window: rollingMean = avg(df["value"]).over(windowSpec)
- Use aggregation with select: df.select(df["date"], df["value"], rollingMean) or withColumn etc
Thank you for reading!
Any questions or comments about the above post can be addressed on the mldsai-info channel, to me directly (shtrauss2 or shtrausslearning), or simply below!