Exploring Taxi Trip Data¶
В этом посте мы будем использовать spark
для раздедовательного анализа поездак такси для данных их города Чикаго. Когда данных становится слишком много, pandas
работает медленее чем spark
, поэтому будем использовать этот инструмент для разведовательного анализа!
Введение¶
Spark over Pandas¶
pyspark
может быть быстрее pandas
благодаря использованию распределенных вычислений и оптимизации выполнения запросов. В отличие от pandas
, который работает на одной машине и обрабатывает данные в памяти, spark
может распределять задачи на кластере из сотен или тысяч узлов, что позволяет обрабатывать большие объемы данных и ускорять выполнение запросов. Кроме того, pyspark
использует оптимизацию выполнения запросов, которая позволяет минимизировать время выполнения запросов и ускорять обработку данных.
spark
имеет несколько преимуществ перед pandas
:
- Масштабируемость: Spark может обрабатывать данные в масштабах, которые Pandas не может обработать. Spark может работать с данными, которые не помещаются в память одной машины, используя распределенные вычисления.
- Распределенные вычисления: Spark может работать на кластерах из сотен или тысяч узлов, что позволяет распределять задачи и ускорять обработку данных.
- Быстродействие: Spark может обрабатывать данные быстрее, чем Pandas, благодаря оптимизации выполнения запросов и использованию распределенных вычислений.
Будем использовать python
, в этой среде можно установть spark
с помощью pip install pyspark
Цель задачи¶
В этом посте мы проведем разведовательный анализ данных; зададим себе ряд вопросов на которые мы найдем ответ в наших данных
Данные¶
В компании есть ряд таксистов которые регистриуют поедки, и если в ней работают много таксистов, то данные о поездках очень стремительно набираются. Соответсвенно для разведовательного анализа больших данных будем использовать spark
.
Данные поездок можно найти по ссылке, они в формате parquet
(бинарный формат), нам так же понадобится еще одна таблица из базы данных, ее можно найти сдесь
Этот датасет собранный на основе данных Chicago Taxi Rides 2016
Схема данны:
|--
|-- trip_start_timestamp = время начала поездки
|-- trip_end_timestamp = время окончания поездки
|-- trip_seconds = время длительности поездки в секундах
|-- trip_miles = мили проиденные во время поездки
|-- fare = транспортные расходы
|-- tips = назначенные чаевые
|-- trip_total = общая стоимость поездки (Итоговая с учетом чаевых и расходов)
|-- payment_type = тип оплаты
SparkSession¶
Начинаем spark
сессию на локальном компе:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder \
.appName('Task') \
.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "GMT+3")
Подгружаем данные c помощью read.parquet
taxi = spark.read.parquet('taxi_data.parquet')
taxi.show(5)
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+
Количество Партиции¶
В spark
мы можем распараллеливать нашу коллекцию и паралельно выполнять задачи, и выполнять задачи на мощьных кластерах
Мы загрузили данные которые находяться в одном parquet
, но мы так же можем работать и с папками в которых хранятся в отдельных файлах (частями)
Чтобы показать количество действующих партиции; сдесь 2 партиции
Мы можем перераспределить количество партиции для нашей задачи
Если мы сейчас сохраним parquet
, то наш файл сохранился бы в папке частями. Воспользуемся repartition
для увеличения количество партиции, а coalesce
для уменьшения
# df.repartition(16) -> increase split
# df.rdd.getNumPartitions() = 2
# df.coalesce(1) -> decrease split (eg. assemble into one)
Разведовательный Анализ¶
Все готово, давайте теперь будем задавать себе вопросы которые нас интересуют!
Посчитайте количество загруженных строк¶
Мы загрузили данные без схемы типов, посмотрим все ли нас устраивает, если нет то мы можем добавить schema
при чтении данных
taxi.printSchema()
root
# |-- taxi_id: integer (nullable = true)
# |-- trip_start_timestamp: timestamp (nullable = true)
# |-- trip_end_timestamp: timestamp (nullable = true)
# |-- trip_seconds: integer (nullable = true)
# |-- trip_miles: double (nullable = true)
# |-- fare: double (nullable = true)
# |-- tips: double (nullable = true)
# |-- trip_total: double (nullable = true)
# |-- payment_type: string (nullable = true)
Чему равна корреляция и ковариация между длиной маршрута и ценой за поездку?¶
У dataframe
есть методы cov
и corr
, подробнее corr & cov
- Длина маршрута
trip_miles
, цена поездкиtrip_total
print(f"correlation: {round(taxi.corr('trip_miles','trip_total'),5)}")
print(f"covariance: {round(taxi.cov('trip_miles','trip_total'),5)}")
# correlation: 0.44816
# covariance: 71.96914
Найдите количество, среднее, cреднеквадратическое отклонение, минимум и максимум для длины маршрута и цены за поездку?¶
№3 Найдите количество, среднее, cреднеквадратическое отклонение, минимум и максимум для длины маршрута и цены за поездку? Подробнее describe
taxi.describe().show()
#+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------+
#|summary| taxi_id| trip_seconds| trip_miles| fare| tips| trip_total|payment_type|
#+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------+
#| count| 2539547| 2540178| 2540677| 2540672| 2540672| 2540672| 2540712|
#| mean|4370.670382946249| 801.015424509621|3.0005873828090266|13.248720862039738|1.5209281087837443|15.913560215564042| null|
#| stddev|2513.977996552665|1199.4924572375417| 5.25716922943536|22.579448541941893|2.7990862329785924|30.546699217618237| null|
#| min| 0| 0| 0.0| 0.0| 0.0| 0.0| Cash|
#| max| 8760| 86399| 900.0| 9276.62| 422.0| 9276.69| Way2ride|
#+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------+
Найдите самый непопулярный вид оплаты¶
taxi.groupBy('payment_type').count().orderBy('count', ascending=True).show()
# +------------+-------+
# |payment_type| count|
# +------------+-------+
# | Way2ride| 3|
# | Pcard| 878|
# | Prcard| 968|
# | Dispute| 1842|
# | Unknown| 5180|
# | No Charge| 12843|
# | Credit Card|1108843|
# | Cash|1410155|
# +------------+-------+
Найдите идентификатор taxi_id
таксиста выполнившего наибольшее число заказов¶
taxi.groupby('taxi_id').count().orderBy('count',ascending=False).show(5)
# +-------+-----+
# |taxi_id|count|
# +-------+-----+
# | 316| 2225|
# | 6591| 2083|
# | 5071| 2080|
# | 8740| 2067|
# | 6008| 2033|
# +-------+-----+
Чему равна средняя цена среди поездок, оплаченных наличными?¶
Подробней where
- Наличные (
Cash
) filter
orwhere
для выбора подвыборки
cash = taxi.filter(f.col('payment_type') == 'Cash').show(1)
# +-------+--------------------+-------------------+------------+----------+----+----+----------+------------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles|fare|tips|trip_total|payment_type|
# +-------+--------------------+-------------------+------------+----------+----+----+----------+------------+
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7|31.0| 0.0| 31.0| Cash|
# +-------+--------------------+-------------------+------------+----------+----+----+----------+------------+
cash = taxi.filter(f.col('payment_type') == 'Cash')
taxi.groupBy(['payment_type']).agg(f.mean('trip_total')).show()
# cash.groupBy(['payment_type']).agg(f.mean('trip_total')).show()
# +------------+------------------+
# |payment_type| avg(trip_total)|
# +------------+------------------+
# | Credit Card| 20.88679166806002|
# | No Charge| 14.35603753017208|
# | Unknown|13.079387794515267|
# | Prcard|11.893749999999997|
# | Cash|12.035261470840307|
# | Dispute|14.686368078175896|
# | Way2ride| 4.793333333333333|
# | Pcard|10.450512528473803|
# +------------+------------------+
Сколько таксистов проехало больше 1000 миль за все время выполнения заказов?¶
driver_distances = taxi.groupby(['taxi_id']).agg(f.sum('trip_miles').alias('distance')).orderBy('distance',ascending=False)
driver_distances.filter(f.col('distance') > 1000).count()
# 2860
Сколько миль проехал пассажир в самой долгой поездке?¶
taxi2 = taxi.dropna()
taxi2.orderBy(['trip_seconds'],ascending=False).show(5)
# +-------+--------------------+-------------------+------------+----------+----+----+----------+------------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles|fare|tips|trip_total|payment_type|
# +-------+--------------------+-------------------+------------+----------+----+----+----------+------------+
# | 4161| 2016-11-14 16:00:00|2016-11-15 16:00:00| 86399| 0.0|3.25| 0.0| 3.25| Cash|
# | 5667| 2016-11-04 21:30:00|2016-11-05 21:30:00| 86399| 0.0|3.25| 0.0| 4.75| Cash|
# | 1954| 2016-11-03 00:15:00|2016-11-04 00:15:00| 86399| 0.0|3.25| 0.0| 3.25| Cash|
# | 4219| 2016-11-08 16:00:00|2016-11-09 16:00:00| 86392| 0.0|3.25| 0.0| 3.25| Cash|
# | 4551| 2016-11-03 16:15:00|2016-11-04 16:15:00| 86389| 0.0|3.25| 0.0| 3.25| Cash|
# +-------+--------------------+-------------------+------------+----------+----+----+----------+------------+
Каков средний заработок всех таксистов?¶
Отсеките неизвестные машины (не определенный taxi_id).
# we have null taxi_id
taxi.select('taxi_id').distinct().orderBy('taxi_id').show(4)
# +-------+
# |taxi_id|
# +-------+
# | null|
# | 0|
# | 3|
# | 5|
# +-------+
# drop rows in taxi_id that are null
taxi_cleaned = taxi.dropna(how='all',subset=('taxi_id'))
taxi_cleaned.show()
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+
Удаляем все строки в taxi_id
в которых у нас есть пробелы
# mean total income of all drivers
driver_total.select('total_driver').agg(f.mean('total_driver')).show()
# +-----------------+
# |avg(total_driver)|
# +-----------------+
# |8218.856265256327|
# +-----------------+
# driver total for each taxi_id
driver_total = taxi_cleaned.groupby('taxi_id').agg(f.sum('trip_total').alias('total_driver'))
driver_total.show()
# +-------+------------------+
# |taxi_id| total_driver|
# +-------+------------------+
# | 3997|10372.800000000003|
# | 6620|13823.139999999994|
# | 4900| 9867.160000000002|
# | 7833| 8439.289999999999|
# | 1829| 9979.389999999998|
# +-------+------------------+
Сколько поездок начиналось в самый загруженный час?¶
Используйте функцию hour
# extact hour from date, like datetime in pandas
# dont' use select and then join; use withColumn('name',operation)
col_hour = taxi.withColumn('hour',f.hour('trip_start_timestamp'))
col_hour.show(5)
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type|hour|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card| 23|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card| 7|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash| 16|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card| 8|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash| 18|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----+
col_hour.groupby('hour').count().orderBy('count',ascending=False).show(5)
# +----+------+
# |hour| count|
# +----+------+
# | 18|181127|
# | 19|173779|
# | 17|169886|
# | 16|156519|
# | 20|146602|
# +----+------+
Сколько поездок началось во второй четверти суток?¶
taxi_add = taxi.withColumn('start_hour',f.hour('trip_start_timestamp'))
taxi_add.show(5)
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type|start_hour|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card| 23|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card| 7|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash| 16|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card| 8|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash| 18|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+
after_six = taxi_add.filter((f.col('start_hour') >= 6) & (f.col('start_hour') <= 11))
after_six.count()
# df.groupBy((hour('trip_start_timestamp')).alias('day')).count()
# .where(col('day').between(6,11)).groupBy().sum().show()
# 538737
Найдите топ три даты, в которые было суммарно больше всего чаевых?¶
Вам может понадобится конвертация типов cast
from pyspark.sql.types import DateType
# extract date only from type [timestamp] -> [date]
# we use f.to_date ( f.col('column), 'order' )
taxi_add = taxi.withColumn("date",f.to_date(f.col("trip_end_timestamp"),"yyyy-MM-dd"))
taxi_add.show(5)
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type| date|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card|2016-12-16|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card|2016-12-12|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash|2016-12-16|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card|2016-12-16|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash|2016-12-03|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+
# convert/case data type date to string
# taxi_add.select(taxi_add.date.cast("string").alias('date_str')).show() # don't do this
taxi_add = taxi_add.withColumn('date_str',f.col('date').cast("string"))
taxi_add.show(5)
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+----------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type| date| date_str|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+----------+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card|2016-12-16|2016-12-16|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card|2016-12-12|2016-12-12|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash|2016-12-16|2016-12-16|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card|2016-12-16|2016-12-16|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash|2016-12-03|2016-12-03|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+----------+----------+
# now use groupby date (which is a string) & find aggregate sum of all trips
taxi_add.groupby(['date_str']).agg(f.sum('tips').alias('total')).orderBy('total',ascending=False).show(3)
# +----------+------------------+
# | date_str| total|
# +----------+------------------+
# |2016-11-03|110102.37000000013|
# |2016-11-09|106187.87999999986|
# |2016-11-16| 99993.77000000038|
# +----------+------------------+
Сколько было заказов в дату с наибольшим спросом?¶
# we need to use starting time
taxi_add = taxi.withColumn("date",f.to_date(f.col("trip_start_timestamp"),"yyyy-MM-dd"))
taxi_add = taxi_add.withColumn('date_str',f.col('date').cast("string"))
taxi_add.groupby(['date_str']).count().orderBy('count',ascending=False).show(5)
# +----------+-----+
# | date_str|count|
# +----------+-----+
# |2016-11-03|61259|
# |2016-12-16|59137|
# |2016-12-09|58583|
# |2016-12-15|57084|
# |2016-11-04|56800|
# +----------+-----+
Подгрузите данные о марках машин из датасета taxi_cars_data.parquet
# load taxi_id car_modl type
df_car = spark.read.parquet('taxi_cars_data.parquet')
df_car.show(5)
# +-------+-------------------+
# |taxi_id| car_model|
# +-------+-------------------+
# | 1159| Toyota Prius|
# | 7273|Ford Crown Victoria|
# | 2904| Honda Civic|
# | 3210| Ford Fusion|
# | 2088| Toyota Camry|
# +-------+-------------------+
Какая марка машины самая распрастранненая среди таксистов?¶
Подробнее split
Мы можем разделить колонку по некому критерии (delimiter), добавим данные в новою колонку newest
# we are given the name and model; we need to extract the maker
# like in pandas (.str.split())
df_car.withColumn('newest',f.split(f.col('car_model')," "))
df_car.show(5)
# +-------+-------------------+--------------------+
# |taxi_id| car_model| newest|
# +-------+-------------------+--------------------+
# | 1159| Toyota Prius| [Toyota, Prius]|
# | 7273|Ford Crown Victoria|[Ford, Crown, Vic...|
# | 2904| Honda Civic| [Honda, Civic]|
# | 3210| Ford Fusion| [Ford, Fusion]|
# | 2088| Toyota Camry| [Toyota, Camry]|
# +-------+-------------------+--------------------+
# add columns with each split
df_car_add = df_car.withColumn('maker', f.split(f.col('car_model'), ' ').getItem(0)) \
.withColumn('model', f.split(f.col('car_model'), ' ').getItem(1))
df_car_add.show(5)
# +-------+-------------------+------+------+
# |taxi_id| car_model| maker| model|
# +-------+-------------------+------+------+
# | 1159| Toyota Prius|Toyota| Prius|
# | 7273|Ford Crown Victoria| Ford| Crown|
# | 2904| Honda Civic| Honda| Civic|
# | 3210| Ford Fusion| Ford|Fusion|
# | 2088| Toyota Camry|Toyota| Camry|
# +-------+-------------------+------+------+
df_car_add.groupby('maker').agg(f.count('maker').alias('total')).orderBy('total',ascending=False).show(5)
# +---------+-----+
# | maker|total|
# +---------+-----+
# | Ford| 1484|
# | Hyundai| 792|
# | Toyota| 691|
# |Chevrolet| 473|
# | Kia| 265|
# +---------+-----+
Сколько раз и какая модель машин чаще всего встречается в поездках?¶
Подробнее join
taxi_add = taxi.join(df_car_add,taxi.taxi_id == df_car_add.taxi_id)
taxi_add.show(5)
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+-------+-------------------+-------+-------+
# |taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare|tips|trip_total|payment_type|taxi_id| car_model| maker| model|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+-------+-------------------+-------+-------+
# | 5240| 2016-12-15 23:45:00|2016-12-16 00:00:00| 900| 2.5|10.75|2.45| 14.7| Credit Card| 5240| Toyota Corolla| Toyota|Corolla|
# | 1215| 2016-12-12 07:15:00|2016-12-12 07:15:00| 240| 0.4| 5.0| 3.0| 9.5| Credit Card| 1215|Ford Crown Victoria| Ford| Crown|
# | 3673| 2016-12-16 16:30:00|2016-12-16 17:00:00| 2400| 10.7| 31.0| 0.0| 31.0| Cash| 3673| Ford Taurus| Ford| Taurus|
# | 5400| 2016-12-16 08:45:00|2016-12-16 09:00:00| 300| 0.0| 5.25| 2.0| 7.25| Credit Card| 5400| Toyota Corolla| Toyota|Corolla|
# | 1257| 2016-12-03 18:45:00|2016-12-03 18:45:00| 360| 0.3| 5.0| 0.0| 5.0| Cash| 1257| Hyundai Elantra|Hyundai|Elantra|
# +-------+--------------------+-------------------+------------+----------+-----+----+----------+------------+-------+-------------------+-------+-------+
taxi_add.groupBy('car_model').agg(f.count('car_model').alias('orders')).orderBy('orders',ascending=False).show(5)
# +-------------------+------+
# | car_model|orders|
# +-------------------+------+
# |Ford Crown Victoria|388682|
# | Hyundai Accent|150764|
# | Kia Ceed|143649|
# | Hyundai Sonata|141570|
# | Ford Mondeo|135466|
# +-------------------+------+
Выводы¶
Кратко о том что мы использовали для разведовательного анализа
- Сорреляция и ковариация
.corr('column A','column B')
- Как и в pandas есть метод
.descibe()
- Группировать данные можно с
.groupBy
и сортировать.orderBy
- taxi.groupBy('payment_type').count().orderBy('count', ascending=True).show()
- Фильтрация данных (подвыборка) с
.filter
или.where
(как в SQL) - cash = taxi.filter(f.col('payment_type') == 'Cash').show(1)
- Так же можно отметить выбор колонок с
f.col('column')
- functions так же имеет функции для агрегации (eg. f.mean, f.max, f.min, f.sum)
- taxi.groupBy(['payment_type']).agg(f.mean('trip_total')).show()
- добавляем alias для новой колонке
.agg(f.sum('trip_miles').alias('name'))
- Есть taxi.dropna() как и в pandas
- taxi_cleaned = taxi.dropna(how='all',subset=('taxi_id'))
- Добавляем колонку в действующий dataframe
- taxi.withColumn('hour',f.hour('trip_start_timestamp'))
- f.hour из datetime выводит отдельно количество частов
- f.to_date из datetime выводит отдельно дату f.to_date(f.col("trip_end_timestamp"),"yyyy-MM-dd")
- Меняем тип данных в колонке с
f.cast
- taxi_add.withColumn('date_str',f.col('date').cast("string"))
- Разбиваем string с
f.split
- df_car.withColumn('newest',f.split(f.col('car_model')," ")).show(5)
join
для таблиц- df1.join(df2,df1.taxi_id == df2.taxi_id)