Intro
Okay, I’m starting to get the hang of Spark. I’ve totally abused the Spark SQL capabilities so far, and doing all of this in Jupyter has been extremely user friendly. Honestly, it’s not that large of a data set, so I haven’t had too much complexity navigating the fields. There aren’t many features to build, but going through the date and time fields, I can think of a few that we should pull out for deeper analysis.
Let’s set up our environment and load the data as we had it last time again:
import os
os.system("sudo pip install findspark sql_magic")
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')
# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession
# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf, count, isnan, lit, sum, coalesce, concat, to_date, to_timestamp, when, date_format, unix_timestamp
from pyspark.sql.types import DateType
from functools import reduce
import pandas as pd
# Initiate SparkSession as "spark"
spark = SparkSession\
.builder\
.getOrCreate()
# Load sql_magic and connect to Spark
%load_ext sql_magic
%config SQL.conn_name = 'spark'
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
"s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv",
header = True,
inferSchema = True
)
oldColumns = df.schema.names
newColumns = [
'COMPLAINT_NUMBER',
'COMPLAINT_START_DATE',
'COMPLAINT_START_TIME',
'COMPLAINT_END_DATE',
'COMPLAINT_END_TIME',
'REPORTED_DATE',
'OFFENSE_ID',
'OFFENSE_DESCRIPTION',
'OFFENSE_INTERNAL_CODE',
'OFFENSE_INTERNAL_DESCRIPTION',
'OFFENSE_RESULT',
'OFFENSE_LEVEL',
'JURISDICTION',
'BOROUGH',
'PRECINCT',
'SPECIFIC_LOCATION',
'PREMISE_DESCRIPTION',
'PARK_NAME',
'HOUSING_NAME',
'X_COORD_NYC',
'Y_COORD_NYC',
'LAT',
'LON',
'LAT_LON'
]
df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
df.printSchema()
# Drop rows with any NA values in the specified columns
df_na_drop = df.na.drop(subset=[
'COMPLAINT_START_DATE',
'COMPLAINT_START_TIME',
'OFFENSE_DESCRIPTION',
'OFFENSE_RESULT',
'BOROUGH',
'PRECINCT',
'LAT',
'LON'
])
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_DATE', coalesce(df_na_drop['COMPLAINT_END_DATE'], df_na_drop['COMPLAINT_START_DATE']))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIME', coalesce(df_na_drop['COMPLAINT_END_TIME'], df_na_drop['COMPLAINT_START_TIME']))
# Combine date and time fields and create new timestamp field for COMPLAINT fields
df_na_drop = df_na_drop.withColumn(
'COMPLAINT_START_TIMESTAMP',
to_timestamp(
concat(df_na_drop['COMPLAINT_START_DATE'], lit(' '), df_na_drop['COMPLAINT_START_TIME']),
'MM/dd/yyyy HH:mm:ss'
)
)
df_na_drop = df_na_drop.withColumn(
'COMPLAINT_END_TIMESTAMP',
to_timestamp(
concat(df_na_drop['COMPLAINT_END_DATE'], lit(' '), df_na_drop['COMPLAINT_END_TIME']),
'MM/dd/yyyy HH:mm:ss'
)
)
# Convert REPORTED_DATE
df_na_drop = df_na_drop.withColumn(
'REPORTED_DATE_TIMESTAMP',
to_timestamp(
df_na_drop['REPORTED_DATE'],
'MM/dd/yyyy'
)
)
# List of crimes to keep
crimes_to_keep = [
'PETIT LARCENY',
'HARRASSMENT 2',
'ASSAULT 3 & RELATED OFFENSES',
'CRIMINAL MISCHIEF & RELATED OF',
'GRAND LARCENY',
'OFF. AGNST PUB ORD SENSBLTY &',
'DANGEROUS DRUGS',
'ROBBERY',
'BURGLARY',
'FELONY ASSAULT'
]
# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
'OFFENSE_DESCRIPTION',
when(df_na_drop['OFFENSE_DESCRIPTION'].isin(crimes_to_keep), df_na_drop['OFFENSE_DESCRIPTION']).otherwise('OTHER')
)
# List of premises to keep
premises_to_keep = [
'STREET',
'RESIDENCE - APT. HOUSE',
'RESIDENCE-HOUSE',
'RESIDENCE - PUBLIC HOUSING',
'COMMERCIAL BUILDING',
'DEPARTMENT STORE',
'TRANSIT - NYC SUBWAY',
'CHAIN STORE',
'PUBLIC SCHOOL',
'GROCERY/BODEGA',
'RESTAURANT/DINER',
'BAR/NIGHT CLUB',
'PARK/PLAYGROUND'
]
# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
'PREMISE_DESCRIPTION',
when(df_na_drop['PREMISE_DESCRIPTION'].isin(premises_to_keep), df_na_drop['PREMISE_DESCRIPTION']).otherwise('OTHER')
)
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT * FROM df_na_drop LIMIT 10;
Timestamp Columns
Remember, in the last post we cleaned up the COMPLAINT_START, COMPLAINT_END, and REPORTED_DATE columns. From these 3 fields alone, I can think of several features off the top of my head that we should build:
- Year, Month, Day, Day Of Week, and Hour of each of these dates (no hour for REPORTED_DATE)
- Flag of whether incident spanned any amount of time (vs a one-time incident where COMPLAINT_START = COMPLAINT_END)
- Length of the incident, if not one-time
- How far after the incident started and ended was the event reported
That will actually give us 18 new features (5 date parts for the start, 5 for the end, 4 for the reported date, plus the flag, the length, and the two lags) haha… I’ll take it.
Date / Time Fields For COMPLAINT_START, COMPLAINT_END & REPORTED_DATE
# Set UDFs to extract specific parts of date and time
extract_year = udf(lambda x: x.year)
extract_month = udf(lambda x: x.month)
extract_day = udf(lambda x: x.day)
extract_hour = udf(lambda x: x.hour)
# Perform transformation
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_DAY', extract_day(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_START_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_DAY', extract_day(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_END_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_YEAR', extract_year(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_MONTH', extract_month(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_DAY', extract_day(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_WEEKDAY', date_format(col('REPORTED_DATE_TIMESTAMP'), 'E'))
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT
COMPLAINT_START_TIMESTAMP,
COMPLAINT_START_TIMESTAMP_YEAR,
COMPLAINT_START_TIMESTAMP_MONTH,
COMPLAINT_START_TIMESTAMP_DAY,
COMPLAINT_START_TIMESTAMP_WEEKDAY,
COMPLAINT_START_TIMESTAMP_HOUR,
COMPLAINT_END_TIMESTAMP,
COMPLAINT_END_TIMESTAMP_YEAR,
COMPLAINT_END_TIMESTAMP_MONTH,
COMPLAINT_END_TIMESTAMP_DAY,
COMPLAINT_END_TIMESTAMP_WEEKDAY,
COMPLAINT_END_TIMESTAMP_HOUR,
REPORTED_DATE_TIMESTAMP,
REPORTED_DATE_TIMESTAMP_YEAR,
REPORTED_DATE_TIMESTAMP_MONTH,
REPORTED_DATE_TIMESTAMP_DAY,
REPORTED_DATE_TIMESTAMP_WEEKDAY
FROM df_na_drop
LIMIT 10;
Perfect. One quick aside before we move on to the next task.
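The same date parts can also be pulled with Spark’s built-in column functions (year, month, dayofmonth, hour), which skip the Python UDF round trip and return proper integer types. I didn’t run this above, but a minimal sketch for the COMPLAINT_START fields would look something like this (df_builtin is just an illustrative name):
# Hedged alternative: built-in date-part functions instead of Python UDFs
from pyspark.sql.functions import year, month, dayofmonth, hour, date_format
df_builtin = df_na_drop\
    .withColumn('COMPLAINT_START_TIMESTAMP_YEAR', year(col('COMPLAINT_START_TIMESTAMP')))\
    .withColumn('COMPLAINT_START_TIMESTAMP_MONTH', month(col('COMPLAINT_START_TIMESTAMP')))\
    .withColumn('COMPLAINT_START_TIMESTAMP_DAY', dayofmonth(col('COMPLAINT_START_TIMESTAMP')))\
    .withColumn('COMPLAINT_START_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_START_TIMESTAMP'), 'E'))\
    .withColumn('COMPLAINT_START_TIMESTAMP_HOUR', hour(col('COMPLAINT_START_TIMESTAMP')))
One difference worth noting: these come back as integers, whereas the UDF versions above come back as strings because I never specified a returnType. Either way works for the kind of analysis I have in mind.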
Incident Length Flag & Incident Length
Very simple here. I want to:
- Calculate how long the incident lasted
- Flag, in a separate column, incidents that were a single instant in time (i.e. COMPLAINT_START = COMPLAINT_END)
# Take the difference between start and end, expressed in minutes
df_na_drop = df_na_drop.withColumn(
'COMPLAINT_LENGTH',
(unix_timestamp(df_na_drop['COMPLAINT_END_TIMESTAMP']) - unix_timestamp(df_na_drop['COMPLAINT_START_TIMESTAMP']))/60
)
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT
COMPLAINT_START_TIMESTAMP,
COMPLAINT_END_TIMESTAMP,
COMPLAINT_LENGTH
FROM df_na_drop
LIMIT 10;
# If COMPLAINT_LENGTH = 0, we flag with a new boolean column COMPLAINT_LENGTH_ZERO_TIME
df_na_drop = df_na_drop.withColumn(
'COMPLAINT_LENGTH_ZERO_TIME', when(df_na_drop['COMPLAINT_LENGTH'] == 0, True).otherwise(False)
)
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT
COMPLAINT_LENGTH,
COMPLAINT_LENGTH_ZERO_TIME
FROM df_na_drop
LIMIT 10;
Lag Time Between Incident And Report Date
This one is kinda weird because the report date doesn’t have a time component in this dataset, only a date. We’ll take the difference between the REPORTED_DATE and each of the COMPLAINT_START and COMPLAINT_END dates. My assumption here is that the incident always happens first and gets reported once it’s completely finished. That may be a wrong assumption, but I’ll make it for the sake of the subtraction, and we’ll allow negative values in case a complaint was reported before the incident actually finished.
# Take the difference between start and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
'COMPLAINT_START_REPORTED_LAG',
(unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_START_TIMESTAMP'])))/60/60/24
)
# Take the difference between end and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
'COMPLAINT_END_REPORTED_LAG',
(unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_END_TIMESTAMP'])))/60/60/24
)
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT
COMPLAINT_START_TIMESTAMP,
COMPLAINT_END_TIMESTAMP,
REPORTED_DATE_TIMESTAMP,
COMPLAINT_START_REPORTED_LAG,
COMPLAINT_END_REPORTED_LAG
FROM df_na_drop
LIMIT 10;
Okay. I think we have our final dataframe! I’m not really sure what other features I can engineer at this point without getting deeper into actual analysis, so let’s save this dataframe back out to S3 as a temporary store.
df_na_drop.printSchema()
df_clean = df_na_drop[[
'COMPLAINT_NUMBER',
'COMPLAINT_START_TIMESTAMP',
'COMPLAINT_END_TIMESTAMP',
'REPORTED_DATE_TIMESTAMP',
'COMPLAINT_START_TIMESTAMP_YEAR',
'COMPLAINT_START_TIMESTAMP_MONTH',
'COMPLAINT_START_TIMESTAMP_DAY',
'COMPLAINT_START_TIMESTAMP_WEEKDAY',
'COMPLAINT_START_TIMESTAMP_HOUR',
'COMPLAINT_END_TIMESTAMP_YEAR',
'COMPLAINT_END_TIMESTAMP_MONTH',
'COMPLAINT_END_TIMESTAMP_DAY',
'COMPLAINT_END_TIMESTAMP_WEEKDAY',
'COMPLAINT_END_TIMESTAMP_HOUR',
'REPORTED_DATE_TIMESTAMP_YEAR',
'REPORTED_DATE_TIMESTAMP_MONTH',
'REPORTED_DATE_TIMESTAMP_DAY',
'REPORTED_DATE_TIMESTAMP_WEEKDAY',
'COMPLAINT_LENGTH',
'COMPLAINT_LENGTH_ZERO_TIME',
'COMPLAINT_START_REPORTED_LAG',
'COMPLAINT_END_REPORTED_LAG',
'OFFENSE_DESCRIPTION',
'OFFENSE_RESULT',
'OFFENSE_LEVEL',
'JURISDICTION',
'BOROUGH',
'PRECINCT',
'SPECIFIC_LOCATION',
'PREMISE_DESCRIPTION',
'LAT',
'LON'
]]
# Save dataframe back to S3 in parquet format
df_clean.write.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean.parquet')
Okay – this is pretty interesting. Scrolling through that whole error dump, we eventually find the error message:
‘NoneType’ object has no attribute ‘year’
Well, the only place we use the year attribute of anything is where we defined the UDF:
extract_year = udf(lambda x: x.year)
And we used this UDF when extracting the year of the COMPLAINT_START date. What’s weird is that we never got this error before, and we’ve not only used that part of the code, we’ve built on top of it with many more modifications to the dataframe since then. My hypothesis about what’s happening here is that Spark is lazy in its execution. So far in our analysis, we’ve never really looked at more than 10 rows when dealing with the datetime and timestamp fields; we always take a small sample of the data to make sure the operations we’ve built are doing what they should. Spark has actually never had to run that calculation on the entire dataframe… until now!
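To make that concrete, here’s a toy sketch of the lazy behaviour (YEAR_TEST and the output path are made up purely for illustration):
# Nothing is computed here -- withColumn just adds a step to the query plan
df_lazy = df_na_drop.withColumn('YEAR_TEST', extract_year(col('COMPLAINT_START_TIMESTAMP')))
# A small sample only runs the UDF on the handful of rows it actually pulls back,
# so a few bad rows buried deep in the data can slip through unnoticed
df_lazy.limit(10).toPandas()
# A full action (like the parquet write above) finally runs the plan against every
# row, and that's when the NoneType error surfaces
df_lazy.write.parquet('s3n://some-bucket/some-temp-path.parquet')  # hypothetical path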
I guess even after I dropped and replaced NAs earlier, there are actually "None" values hanging around (maybe not the same thing as NaN, which may be what my check was searching for). Let’s rework our NA-detecting logic to see what’s happening with None.
%%read_sql
SELECT COUNT(*) FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL
Remember, in our last post, after dropping the NAs, we used the following function (found on Stack Overflow) to check for NAs in our dataframe, and in fact, we were seeing zero missing values.
def count_not_null(c, nan_as_null=False):
    # Boolean predicate: the value is not NULL (and, optionally, not NaN)
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    # Cast True / False to 1 / 0 and sum up to get a non-null count for the column
    return sum(pred.cast("integer")).alias(c)
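For reference, this is roughly how that check gets applied, aggregating over every column in one pass (a sketch of the pattern, not a cell copied from the last notebook; col, isnan, lit, and sum are the pyspark.sql.functions versions imported at the top):
# Compare per-column non-null counts against the total row count
total_rows = df_na_drop.count()
not_null_counts = df_na_drop.agg(*[count_not_null(c) for c in df_na_drop.columns]).toPandas()
print(total_rows)
print(not_null_counts)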
Clearly, there are intricacies about Spark’s null types that I haven’t fully grasped yet, and I’m also starting to explore how the different APIs interpret them. In ANSI SQL, the only null type I know of is literally called NULL, and the query above shows that NULLs do in fact exist in the SQL world. I’d guess these correspond to the Nones in the PySpark world, but how exactly do we get rid of them if the na.drop() function on the PySpark dataframe wasn’t able to catch them in the first place?
df_na_drop.where(col("COMPLAINT_START_TIMESTAMP").isNull()).count()
Alright, there you go: the Nones do register as nulls through isNull() in PySpark as well, which is the same check na.drop() relies on, so the original COMPLAINT_START_DATE and COMPLAINT_START_TIME fields shouldn’t be the ones that are null…
%%read_sql
SELECT COMPLAINT_START_DATE, COMPLAINT_START_TIME, COMPLAINT_START_TIMESTAMP FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL LIMIT 20
Okay, I think I see the problem now. It wasn’t a Null sitting in the original dataset; it’s a Null produced when the concatenated DATE and TIME strings are parsed into the TIMESTAMP column. Further, I think the root cause is that midnight is being expressed as 24:00:00 rather than 00:00:00, as that’s the most obvious commonality across all of these rows with Null TIMESTAMPs: the HH pattern only accepts hours 0–23, so to_timestamp() quietly returns null instead of a parsed value.
%%read_sql
SELECT DISTINCT COMPLAINT_START_TIME FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL
Confirmed. That’s all the correlation I need. I think it’s actually easier at this point to start another notebook and re-run the code from the beginning with this change, because the fix belongs smack dab in the middle of this notebook. If I integrated it back in here, I’d have to re-run half the notebook and my commentary would no longer match up with the code and results, so let’s call it here and continue in the next post.
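For reference, the change I have in mind is just normalizing the time strings before they get parsed, something along these lines (an untested sketch; technically 24:00:00 means midnight of the following day, but for this analysis treating it as 00:00:00 of the same day is close enough):
# Sketch for the next notebook: swap the non-standard '24:00:00' for '00:00:00'
# in the raw TIME columns *before* they get concatenated and fed to to_timestamp()
from pyspark.sql.functions import regexp_replace
df_fixed = df_na_drop\
    .withColumn('COMPLAINT_START_TIME', regexp_replace(col('COMPLAINT_START_TIME'), '^24:00:00$', '00:00:00'))\
    .withColumn('COMPLAINT_END_TIME', regexp_replace(col('COMPLAINT_END_TIME'), '^24:00:00$', '00:00:00'))
# Re-building COMPLAINT_START_TIMESTAMP with the same to_timestamp() call as before
# should then parse every row instead of silently returning null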