NYPD Crime #9 – Data Exploration (Part III – Feature Building)

Let’s get right into it. If you’ve just landed on this post, please read the past few “Data Exploration” posts in this project to understand the context of what I’m trying to do here. Note that this post is almost identical to the last one, with the exception of replacing all 24:00:00 time values with 00:00:00.

In [2]:
import os
os.system("sudo pip install findspark sql_magic")
Out[2]:
0
In [3]:
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')

# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession

# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf, count, isnan, lit, sum, coalesce, concat, to_date, to_timestamp, when, date_format, unix_timestamp, regexp_replace
from pyspark.sql.types import DateType
from functools import reduce
import pandas as pd

# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()

# Load sql_magic and connect to Spark
%load_ext sql_magic
%config SQL.conn_name = 'spark'
In [37]:
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv", 
    header = True, 
    inferSchema = True
)
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 42.5 s
In [38]:
oldColumns = df.schema.names
newColumns = [
    'COMPLAINT_NUMBER',
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'COMPLAINT_END_DATE',
    'COMPLAINT_END_TIME',
    'REPORTED_DATE',
    'OFFENSE_ID',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_INTERNAL_CODE',
    'OFFENSE_INTERNAL_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'PARK_NAME',
    'HOUSING_NAME',
    'X_COORD_NYC',
    'Y_COORD_NYC',
    'LAT',
    'LON',
    'LAT_LON'
]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
df.printSchema()
root
 |-- COMPLAINT_NUMBER: integer (nullable = true)
 |-- COMPLAINT_START_DATE: string (nullable = true)
 |-- COMPLAINT_START_TIME: string (nullable = true)
 |-- COMPLAINT_END_DATE: string (nullable = true)
 |-- COMPLAINT_END_TIME: string (nullable = true)
 |-- REPORTED_DATE: string (nullable = true)
 |-- OFFENSE_ID: integer (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_INTERNAL_CODE: integer (nullable = true)
 |-- OFFENSE_INTERNAL_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_RESULT: string (nullable = true)
 |-- OFFENSE_LEVEL: string (nullable = true)
 |-- JURISDICTION: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- PRECINCT: integer (nullable = true)
 |-- SPECIFIC_LOCATION: string (nullable = true)
 |-- PREMISE_DESCRIPTION: string (nullable = true)
 |-- PARK_NAME: string (nullable = true)
 |-- HOUSING_NAME: string (nullable = true)
 |-- X_COORD_NYC: integer (nullable = true)
 |-- Y_COORD_NYC: integer (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- LAT_LON: string (nullable = true)

In [39]:
# Drop rows with any NA values in the specified columns
df_na_drop = df.na.drop(subset=[
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'BOROUGH',
    'PRECINCT',
    'LAT',
    'LON'
])

Let’s make the conversion of times here.

In [40]:
# Replace 24:00:00 with 00:00:00 in the time columns
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIME', regexp_replace('COMPLAINT_START_TIME', '24:00:00', '00:00:00'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIME', regexp_replace('COMPLAINT_END_TIME', '24:00:00', '00:00:00'))
In [41]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [42]:
%%read_sql
SELECT count(*) FROM df_na_drop WHERE COMPLAINT_START_TIME = '24:00:00'
Query started at 10:19:53 PM UTC; Query executed in 0.55 m
Out[42]:
count(1)
0 0
In [43]:
%%read_sql
SELECT count(*) FROM df_na_drop WHERE COMPLAINT_START_TIME = '00:00:00'
Query started at 10:20:27 PM UTC; Query executed in 0.53 m
Out[43]:
count(1)
0 30887

Looks like it worked to me. Let’s continue.

In [44]:
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_DATE', coalesce(df_na_drop['COMPLAINT_END_DATE'], df_na_drop['COMPLAINT_START_DATE']))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIME', coalesce(df_na_drop['COMPLAINT_END_TIME'], df_na_drop['COMPLAINT_START_TIME'])) 
In [45]:
# Combine date and time fields and create new timestamp field for COMPLAINT fields
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_TIMESTAMP', 
    to_timestamp(
        concat(df_na_drop['COMPLAINT_START_DATE'], lit(' '), df_na_drop['COMPLAINT_START_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)

df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_TIMESTAMP', 
    to_timestamp(
        concat(df_na_drop['COMPLAINT_END_DATE'], lit(' '), df_na_drop['COMPLAINT_END_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)

# Convert REPORTED_DATE
df_na_drop = df_na_drop.withColumn(
    'REPORTED_DATE_TIMESTAMP', 
    to_timestamp(
        df_na_drop['REPORTED_DATE'],
        'MM/dd/yyyy'
    )
)
In [46]:
# List of crimes to keep
crimes_to_keep = [
    'PETIT LARCENY',
    'HARRASSMENT 2',
    'ASSAULT 3 & RELATED OFFENSES',
    'CRIMINAL MISCHIEF & RELATED OF',
    'GRAND LARCENY',
    'OFF. AGNST PUB ORD SENSBLTY &',
    'DANGEROUS DRUGS',
    'ROBBERY',
    'BURGLARY',
    'FELONY ASSAULT'
]

# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'OFFENSE_DESCRIPTION', 
    when(df_na_drop['OFFENSE_DESCRIPTION'].isin(crimes_to_keep), df_na_drop['OFFENSE_DESCRIPTION']).otherwise('OTHER')
)
In [47]:
# List of premises to keep
premises_to_keep = [
    'STREET',
    'RESIDENCE - APT. HOUSE',
    'RESIDENCE-HOUSE',
    'RESIDENCE - PUBLIC HOUSING',
    'COMMERCIAL BUILDING',
    'DEPARTMENT STORE',
    'TRANSIT - NYC SUBWAY',
    'CHAIN STORE',
    'PUBLIC SCHOOL',
    'GROCERY/BODEGA',
    'RESTAURANT/DINER',
    'BAR/NIGHT CLUB',
    'PARK/PLAYGROUND'
]

# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'PREMISE_DESCRIPTION', 
    when(df_na_drop['PREMISE_DESCRIPTION'].isin(premises_to_keep), df_na_drop['PREMISE_DESCRIPTION']).otherwise('OTHER')
)
In [48]:
# Set UDFs to extract specific parts of date and time
extract_year =  udf(lambda x: x.year)
extract_month =  udf(lambda x: x.month)
extract_day =  udf(lambda x: x.day)
extract_hour =  udf(lambda x: x.hour)

# Perform transformation
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_DAY', extract_day(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_START_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_START_TIMESTAMP')))

df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_DAY', extract_day(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_END_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_END_TIMESTAMP')))

df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_YEAR', extract_year(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_MONTH', extract_month(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_DAY', extract_day(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_WEEKDAY', date_format(col('REPORTED_DATE_TIMESTAMP'), 'E'))
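As an aside, these Python UDFs return strings by default since I didn’t specify a return type. Spark also has built-in column functions (year, month, dayofmonth, hour) that keep the results as integers and avoid the Python UDF overhead; a minimal sketch of that alternative (not what I actually ran above) would look something like this:

# Alternative sketch using Spark's built-in functions instead of Python UDFs
# (keeps integer types and avoids shipping rows out to Python)
from pyspark.sql.functions import year, month, dayofmonth, hour

df_builtin = df_na_drop \
    .withColumn('COMPLAINT_START_TIMESTAMP_YEAR', year(col('COMPLAINT_START_TIMESTAMP'))) \
    .withColumn('COMPLAINT_START_TIMESTAMP_MONTH', month(col('COMPLAINT_START_TIMESTAMP'))) \
    .withColumn('COMPLAINT_START_TIMESTAMP_DAY', dayofmonth(col('COMPLAINT_START_TIMESTAMP'))) \
    .withColumn('COMPLAINT_START_TIMESTAMP_HOUR', hour(col('COMPLAINT_START_TIMESTAMP')))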
In [49]:
# Take the difference between start and end, expressed in minutes
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_LENGTH', 
    (unix_timestamp(df_na_drop['COMPLAINT_END_TIMESTAMP']) - unix_timestamp(df_na_drop['COMPLAINT_START_TIMESTAMP']))/60
)
In [50]:
# If COMPLAINT_LENGTH = 0, we flag with a new boolean column COMPLAINT_LENGTH_ZERO_TIME
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_LENGTH_ZERO_TIME', when(df_na_drop['COMPLAINT_LENGTH'] == 0, True).otherwise(False)
)
In [51]:
# Take the difference between start and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_REPORTED_LAG', 
    (unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_START_TIMESTAMP'])))/60/60/24
)

# Take the difference between end and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_REPORTED_LAG', 
    (unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_END_TIMESTAMP'])))/60/60/24
)

Okay, let’s check the dataframe for null timestamp values again.

In [52]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [53]:
%%read_sql
SELECT COUNT(*) FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL
Query started at 10:21:00 PM UTC; Query executed in 0.71 m
Out[53]:
count(1)
0 0

Okay, let’s try saving to S3 as a parquet again. Wait, we haven’t gone over parquets! Yet another term to learn, for better or for worse. Let’s pretend here like it’s for the better haha.

A parquet is a different method of storing a file. While a CSV is raw text delimited by commas, a parquet is a much more intricate data storage format.

Its advantage is that it’s a columnar storage format, which means that each column is stored separately rather than each row (think about when you open up a CSV: you generally read it by rows, and only when you load the data into a Python array or a Python / PySpark dataframe can you really start referring to it by columns). This gives us storage and speed advantages.

The storage advantages come from the fact that each column now has a specific context. Just like how a PySpark dataframe column can be designated as a string, float, or timestamp, parquets have overhead and metadata within the file that make this distinction as well, again because the data is being stored in a columnar format. Even further, this allows us to encode and compress columns in specific ways, gaining efficiencies where possible.
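For example, the parquet writer lets you pick the compression codec explicitly (I believe snappy is Spark’s default); a quick sketch, using a made-up output path just for illustration:

# Sketch: writing the dataframe out as parquet with gzip compression
# (the output path here is just illustrative)
df_na_drop.write.option('compression', 'gzip').parquet('s3n://2017edmfasatb/nypd_complaints/data/df_gzip_test.parquet')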

The speed advantages, again, stem from the columnar format and build on top of the storage advantages as well. When querying data, we often only want to select one or two columns, right? The columnar storage format allows us to scan only the specific columns we need, so we don’t have to scan the entire file like we would if we loaded up an entire CSV! The fact that the data is compressed also means we end up scanning less raw binary (with some overhead to uncompress, but compute is much faster than disk reads).
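To make that concrete, here’s a sketch of what that looks like from the Spark side; the select means only those column chunks get scanned from the parquet file (using the same path I write to further down):

# Sketch: reading back only a couple of columns from a parquet file
# Spark's parquet reader only scans the selected column chunks
df_subset = spark.read.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean.parquet') \
    .select('BOROUGH', 'OFFENSE_DESCRIPTION')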

A hidden advantage here is that my AWS storage & data transfer costs should go down as well!

This is all I know about parquet for now, and I won’t really be taking advantage of anything other than its compression abilities right now (I will still end up loading the entire parquet into Spark working memory and slicing and dicing from there), but it’s good to know for the future as AWS is starting to build out services standardized on parquet.

In [54]:
df_clean = df_na_drop[[
    'COMPLAINT_NUMBER', 
    'COMPLAINT_START_TIMESTAMP',
    'COMPLAINT_END_TIMESTAMP',
    'REPORTED_DATE_TIMESTAMP',
    'COMPLAINT_START_TIMESTAMP_YEAR',
    'COMPLAINT_START_TIMESTAMP_MONTH',
    'COMPLAINT_START_TIMESTAMP_DAY',
    'COMPLAINT_START_TIMESTAMP_WEEKDAY',
    'COMPLAINT_START_TIMESTAMP_HOUR',
    'COMPLAINT_END_TIMESTAMP_YEAR',
    'COMPLAINT_END_TIMESTAMP_MONTH',
    'COMPLAINT_END_TIMESTAMP_DAY',
    'COMPLAINT_END_TIMESTAMP_WEEKDAY',
    'COMPLAINT_END_TIMESTAMP_HOUR',
    'REPORTED_DATE_TIMESTAMP_YEAR',
    'REPORTED_DATE_TIMESTAMP_MONTH',
    'REPORTED_DATE_TIMESTAMP_DAY',
    'REPORTED_DATE_TIMESTAMP_WEEKDAY',
    'COMPLAINT_LENGTH',
    'COMPLAINT_LENGTH_ZERO_TIME',
    'COMPLAINT_START_REPORTED_LAG',
    'COMPLAINT_END_REPORTED_LAG',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'LAT',
    'LON'
]]
In [55]:
# Save dataframe back to S3 as a parquet
df_clean.write.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean.parquet')

Hurrah! It worked. Our file is now stored on S3. It’s pretty interesting: our file is actually a folder consisting of multiple parts. This is because Spark takes its partitioning into account when saving the file! Our partitions look like this:

This sums to a grand total of… 120MB. Good for a compression of 91%… This doesn’t even seem real, but I will absolutely not complain for now. Talk about storage gains… sheesh.
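As a side note, if I ever wanted a single output file instead of a folder of parts, I could coalesce the dataframe down to one partition before writing (giving up the parallel write); a quick sketch with a made-up path:

# Sketch: coalesce to one partition so the parquet folder contains a single part file
# (slower write since it loses parallelism; the path is just illustrative)
df_clean.coalesce(1).write.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean_single.parquet')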

Anyways, our file is finally stored. Let’s continue on to the actual analysis-oriented exploration!

 
