Intro
In the last post, we wanted to dive into data exploration, but got caught up looking at the basic workflow of simple Spark processes. In fact, we really did no exploration whatsoever other than look at the top 10 rows of the data set haha. It’s okay though, that deeper dive was necessary and will help with troubleshooting in the future. Let’s load the dataset from S3 into a Spark dataframe again.
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')
# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession
# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf, count, isnan, lit, sum, coalesce, concat, to_timestamp, when
from pyspark.sql.types import DateType
from functools import reduce
import pandas as pd
# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()
# Load sql_magic and connect to Spark
%load_ext sql_magic
%config SQL.conn_name = 'spark'
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv",
    header=True,
    inferSchema=True
)
# Describe df
df.printSchema()
# Register temporary table
df.createOrReplaceTempView("df")
# Perform the SQL equivalent of "head"
result = %read_sql SELECT * FROM df LIMIT 10;
result
Data Cleansing
Removing NAs
Okay, so let’s actually start cleaning up some of this data. The first thing I want to see is how many NAs exist in each column.
# Find number of rows in dataframe
df_num_rows = df.count()
df_num_rows
# Function copied from https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark
# Counts the non-null entries in column c (optionally treating NaN as null too)
def count_not_null(c, nan_as_null=False):
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)
# Summarize non-null / null counts and ratios per column in a small pandas dataframe
def show_null_count(df, df_num_rows):
    df_na_pandas = df.agg(*[count_not_null(c) for c in df.columns]).toPandas().T
    df_na_pandas.columns = ['non_na_rows']
    df_na_pandas['total_rows'] = df_num_rows
    df_na_pandas['na_rows'] = df_na_pandas['total_rows'] - df_na_pandas['non_na_rows']
    df_na_pandas['na_rows_ratio'] = df_na_pandas['na_rows'] / df_na_pandas['total_rows']
    return df_na_pandas
# Show nulls
show_null_count(df, df_num_rows)
I actually am not a big fan of the column names. I’m going to clean them up a bit based on the data dictionary.
oldColumns = df.schema.names
newColumns = [
    'COMPLAINT_NUMBER',
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'COMPLAINT_END_DATE',
    'COMPLAINT_END_TIME',
    'REPORTED_DATE',
    'OFFENSE_ID',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_INTERNAL_CODE',
    'OFFENSE_INTERNAL_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'PARK_NAME',
    'HOUSING_NAME',
    'X_COORD_NYC',
    'Y_COORD_NYC',
    'LAT',
    'LON',
    'LAT_LON'
]
# Rename every column by folding withColumnRenamed over the list of new names
df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
df.printSchema()
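As an aside, I’m fairly sure the same positional rename could have been done in a single call with toDF() instead of the reduce() trick. Here’s a toy sketch (the toy dataframe and names are just for illustration):
# Toy sketch: toDF() swaps in new column names positionally in one call
toy_df = spark.createDataFrame([(1, 2)], ['old_a', 'old_b'])
toy_df = toy_df.toDF('new_a', 'new_b')
toy_df.printSchema()
In our case that would just be df.toDF(*newColumns), but the reduce() version gets us to the same place.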
# Show nulls
show_null_count(df, df_num_rows)
A few observations:
- Overall, the data quality looks great! Most fields I’m interested in don’t have more than 5% of their data missing
- According to the data dictionary, the COMPLAINT_START/END_DATE/TIME fields are when the incident itself took place, and the COMPLAINT_END fields are only populated if they differ from the COMPLAINT_START fields
- Fields like PARK_NAME and HOUSING_NAME are largely missing, and can likely be excluded
- About 3.5% of rows do not have a specific location; maybe I can throw these out for now, as I’d prefer not to deal with missing data
Let’s remove all rows in which the following columns are NA:
- COMPLAINT_START_DATE
- COMPLAINT_START_TIME
- OFFENSE_DESCRIPTION
- OFFENSE_RESULT
- BOROUGH
- PRECINCT
- LAT / LON
# Drop rows with any NA values in the specified columns
df_na_drop = df.na.drop(subset=[
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'BOROUGH',
    'PRECINCT',
    'LAT',
    'LON'
])
Okay, let’s see how many rows remain.
# Count number of rows remaining
df_num_rows_na = df_na_drop.count()
print('{} out of {} rows remain after dropping NAs ({:.1%})'.format(df_num_rows_na, df_num_rows, df_num_rows_na / df_num_rows))
We ended up dropping not more than 4% of our total number of rows. That’s a number I can live with!
# Show nulls
show_null_count(df_na_drop, df_num_rows_na)
Formatting Dates & Times
The second thing that jumped out at me is that the dates and times are still stored as strings. It looks like the inferSchema option of Spark’s read.csv() wasn’t able to detect the date and time formats.
Even before that, however, we mentioned earlier that the COMPLAINT_END fields are only populated if the end date differs from the start date. This means that, when the COMPLAINT_END fields are empty, they are in fact the same as the COMPLAINT_START fields. Let’s make that change now.
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_DATE', coalesce(df_na_drop['COMPLAINT_END_DATE'], df_na_drop['COMPLAINT_START_DATE']))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIME', coalesce(df_na_drop['COMPLAINT_END_TIME'], df_na_drop['COMPLAINT_START_TIME']))
# Show nulls
show_null_count(df_na_drop, df_num_rows_na)
PySpark has some interesting notation… the withColumn() function gets used a lot and takes the place of a straight assignment with =. This makes more sense knowing that Spark dataframes are immutable: withColumn() doesn’t modify the dataframe in place, it returns a new dataframe with the added (or replaced) column, which is why the result keeps getting reassigned back to df_na_drop.
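To see what I mean, here’s a throwaway sketch (with made-up column names) showing that calling withColumn() without reassigning the result leaves the original dataframe untouched:
# Throwaway sketch: withColumn() returns a new dataframe, it never mutates the original
demo_df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'letter'])
demo_df.withColumn('id_plus_one', demo_df['id'] + 1)  # returned dataframe is discarded
print(demo_df.columns)  # ['id', 'letter'] -- unchanged
demo_df = demo_df.withColumn('id_plus_one', demo_df['id'] + 1)
print(demo_df.columns)  # ['id', 'letter', 'id_plus_one']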
Alright, now we can try to format our dates. Let’s start by concatenating the COMPLAINT dates and times together, in hopes of combining them into a single field of TimestampType. REPORTED_DATE is also a date field, but contains no time component.
# Combine date and time fields and create new timestamp field for COMPLAINT fields
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_TIMESTAMP',
    to_timestamp(
        concat(df_na_drop['COMPLAINT_START_DATE'], lit(' '), df_na_drop['COMPLAINT_START_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_TIMESTAMP',
    to_timestamp(
        concat(df_na_drop['COMPLAINT_END_DATE'], lit(' '), df_na_drop['COMPLAINT_END_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)
# Convert REPORTED_DATE
df_na_drop = df_na_drop.withColumn(
    'REPORTED_DATE_TIMESTAMP',
    to_timestamp(
        df_na_drop['REPORTED_DATE'],
        'MM/dd/yyyy'
    )
)
df_na_drop.printSchema()
# View timestamp columns
df_na_drop.createOrReplaceTempView("df_na_drop")
%read_sql SELECT COMPLAINT_START_DATE, COMPLAINT_START_TIME, COMPLAINT_END_DATE, COMPLAINT_END_TIME, REPORTED_DATE, COMPLAINT_START_TIMESTAMP, COMPLAINT_END_TIMESTAMP, REPORTED_DATE_TIMESTAMP FROM df_na_drop LIMIT 10;
Looks good to me! Let’s take a look at some other fields as well.
Offenses
The next most interesting thing to me is probably the offenses themselves. What are the offenses? How standardized are they? How many are there? I would imagine there to be quite a few. Should I keep all of them? Cluster them? Lots of questions to be answered, obviously. Let’s see how many there are first.
%%read_sql
SELECT
COUNT(DISTINCT OFFENSE_ID),
COUNT(DISTINCT OFFENSE_DESCRIPTION),
COUNT(DISTINCT OFFENSE_INTERNAL_CODE),
COUNT(DISTINCT OFFENSE_INTERNAL_DESCRIPTION),
COUNT(DISTINCT OFFENSE_RESULT),
COUNT(DISTINCT OFFENSE_LEVEL)
FROM df_na_drop;
Okay, so it looks like the offense descriptions may cause us some trouble. 69 distinct offense types actually sounds quite reasonable to me – I’d imagined there would be many more, but of course that’s where the internal descriptions come in. According to the data dictionary, the internal description is where the more detailed descriptions are kept.
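Just to sanity-check that hierarchy, here’s a quick illustrative roll-up (my own aside) counting how many distinct internal descriptions sit under each offense type:
%%read_sql
SELECT
OFFENSE_DESCRIPTION,
COUNT(DISTINCT OFFENSE_INTERNAL_DESCRIPTION) AS NUM_INTERNAL_DESCRIPTIONS
FROM df_na_drop
GROUP BY
OFFENSE_DESCRIPTION
ORDER BY
NUM_INTERNAL_DESCRIPTIONS DESC
LIMIT 10;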
Let’s just take a look at the 1st level offense descriptions… perhaps the top 25 or so.
%%read_sql
SELECT
OFFENSE_ID,
OFFENSE_DESCRIPTION,
COUNT(*) AS NUM_RECORDS,
(COUNT(*) / (SELECT COUNT(*) FROM df_na_drop))*100 AS NUM_REC_PERC
FROM df_na_drop
GROUP BY
OFFENSE_ID,
OFFENSE_DESCRIPTION
ORDER BY
COUNT(*) DESC
LIMIT 25;
Alrighty. Petit Larceny. I’ll admit I’ve heard of Grand Larceny before, but I never knew what Larceny ever meant… You learn something new every day, I guess:
Larceny – A crime involving the unlawful taking of the personal property of another person or business.
Hmm, that sounds… a lot like Theft? But wait there is Robbery as a category too… and Burglary. What’s the difference between them?
The basic distinction between robbery and larceny is that robbery involves the use of force, whereas larceny doesn’t
Ok fair enough. How about Burglary?
The crime of burglary, though most often equated with theft, doesn’t actually require that a theft occur, or even be intended. Burglary is simply the unlawful entry into a structure, such as a home or business, with the intent to commit a crime inside. Although many burglaries involve theft, the crime intended can be any crime from theft, to murder, to making pot brownies.
Wow, never had a clue about those distinctions. I’ll have to add this to my domain knowledge learnings lol. How about some other ones… oh wait, Petit Larceny and Grand Larceny… what’s the difference? In New York, it mostly comes down to value: $1,000! Anything at or below that is Petit and anything above is Grand.
How about Harassment 2 and Assault 3? It looks like the trailing number in this notation means the offense in the nth degree – so Harassment 2 is harassment in the second degree.
- 2nd Degree Harassment: Intent to harass, annoy or alarm another person.
- 3rd Degree Assault: Intentionally or recklessly causes injury to another person, or if he is criminally negligent with a weapon.
How about… Criminal Mischief? Looks like it’s when you damage or vandalize someone else’s property.
OFF. AGNST PUB ORD SENSBLTY &? I’m going to go ahead and assume that it means Offenses Against Public Order, which are violations that interfere with the normal operations of society. These crimes go against publicly shared values, norms, or customs. One example given is public drunkenness – fair enough!
There’s also Felony Assault, which just seems like a more severe Assault.
I think those are the major ones. It’s important to point out that the first 3, Petit Larceny, Harassment, and Assault make up almost 40% of all crime! Including Mischief, Grand Larceny, Public Order, and Dangerous Drugs, we’re almost up to 70%. No other complaints make up more than 4% of the total calls. Let’s keep the top 10 categories here and lump the rest into ‘Other’.
# List of crimes to keep
crimes_to_keep = [
    'PETIT LARCENY',
    'HARRASSMENT 2',  # note: matches the dataset's own spelling
    'ASSAULT 3 & RELATED OFFENSES',
    'CRIMINAL MISCHIEF & RELATED OF',
    'GRAND LARCENY',
    'OFF. AGNST PUB ORD SENSBLTY &',
    'DANGEROUS DRUGS',
    'ROBBERY',
    'BURGLARY',
    'FELONY ASSAULT'
]
# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'OFFENSE_DESCRIPTION',
    when(df_na_drop['OFFENSE_DESCRIPTION'].isin(crimes_to_keep), df_na_drop['OFFENSE_DESCRIPTION']).otherwise('OTHER')
)
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT
OFFENSE_DESCRIPTION,
COUNT(*) AS NUM_RECORDS,
(COUNT(*) / (SELECT COUNT(*) FROM df_na_drop))*100 AS NUM_REC_PERC
FROM df_na_drop
GROUP BY
OFFENSE_DESCRIPTION
ORDER BY
COUNT(*) DESC
LIMIT 25;
Perfect – Other is now our largest category, but it has something like 50-60 categories lumped into it. The top 10 make up around 80%. I might just leave it at this without diving too deep into the internal offense codes for now. This is about as detailed as I was picturing in my head anyway.
Let’s take a look at OFFENSE_RESULT and OFFENSE_LEVEL before we wrap up offenses, just to be aware of what’s there.
%%read_sql
SELECT DISTINCT OFFENSE_RESULT FROM df_na_drop
%%read_sql
SELECT DISTINCT OFFENSE_LEVEL FROM df_na_drop
Ok… more education time… what’s the difference between these 3 haha.
- VIOLATION – Punishable by up to 15 days in jail
- MISDEMEANOR – Punishable by between 15 days and 1 year in jail
- FELONY – Punishable by over 1 year in jail
Location (Environment)
The next area I want to clean up is the location, and I’m talking specifically about the SPECIFIC_LOCATION and PREMISE_DESCRIPTION fields. Note that these are not the lat / lon fields, but text descriptions of the location and environment of the incident.
%%read_sql
SELECT
COUNT(DISTINCT SPECIFIC_LOCATION),
COUNT(DISTINCT PREMISE_DESCRIPTION)
FROM df_na_drop;
Let’s check out SPECIFIC_LOCATION first:
%%read_sql
SELECT DISTINCT SPECIFIC_LOCATION FROM df_na_drop;
Not too many descriptors, which will be great for us. I’m honestly not even sure how useful this column will be at all, so I won’t spend too much time cleaning or standardizing. There are the None values and the blank values, which to me, are the same… but maybe there’s a bit of context that I’m missing. Maybe “None” indicates that the premise doesn’t have descriptors that apply to it, whereas blank encapsulates “other” or something like that… who knows. Let’s leave this for now and maybe take a look at it in tandem with PREMISE_DESCRIPTION before we make a call.
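Before moving on, here’s a quick illustrative check (assuming the “None” in the output is a true null and the blank is an empty string) of how often each of those two values actually shows up:
%%read_sql
SELECT
SUM(CASE WHEN SPECIFIC_LOCATION IS NULL THEN 1 ELSE 0 END) AS NULL_ROWS,
SUM(CASE WHEN TRIM(SPECIFIC_LOCATION) = '' THEN 1 ELSE 0 END) AS BLANK_ROWS,
COUNT(*) AS TOTAL_ROWS
FROM df_na_drop;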
Now PREMISE_DESCRIPTION (let’s take the top 25 again here).
%%read_sql
SELECT
PREMISE_DESCRIPTION,
COUNT(*) AS NUM_RECORDS,
(COUNT(*) / (SELECT COUNT(*) FROM df_na_drop))*100 AS NUM_REC_PERC
FROM df_na_drop
GROUP BY
PREMISE_DESCRIPTION
ORDER BY
COUNT(*) DESC
LIMIT 25;
Let’s see the most popular combinations of the 2 fields:
%%read_sql
SELECT
PREMISE_DESCRIPTION,
SPECIFIC_LOCATION,
COUNT(*) AS NUM_RECORDS,
(COUNT(*) / (SELECT COUNT(*) FROM df_na_drop))*100 AS NUM_REC_PERC
FROM df_na_drop
GROUP BY
PREMISE_DESCRIPTION,
SPECIFIC_LOCATION
ORDER BY
COUNT(*) DESC
LIMIT 25;
It looks like the “None” value implies “on” the premise: on a street, on an NYC subway, or on the grounds of a park. I guess in some cases it can also mean “in”, but there is also the “Inside” descriptor, which is explicitly stated as a value. In any event, I’ll leave None as-is for now because it genuinely applies; no single word would be able to replace it. I’m still not sure what the blank value means, though, but it seems to occur in only a very small portion of the data anyway.
Let’s just do what we did last time and take the top 10 PREMISE_DESCRIPTION.
# List of premises to keep
premises_to_keep = [
    'STREET',
    'RESIDENCE - APT. HOUSE',
    'RESIDENCE-HOUSE',
    'RESIDENCE - PUBLIC HOUSING',
    'COMMERCIAL BUILDING',
    'DEPARTMENT STORE',
    'TRANSIT - NYC SUBWAY',
    'CHAIN STORE',
    'PUBLIC SCHOOL',
    'GROCERY/BODEGA',
    'RESTAURANT/DINER',
    'BAR/NIGHT CLUB',
    'PARK/PLAYGROUND'
]
# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'PREMISE_DESCRIPTION',
    when(df_na_drop['PREMISE_DESCRIPTION'].isin(premises_to_keep), df_na_drop['PREMISE_DESCRIPTION']).otherwise('OTHER')
)
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
%%read_sql
SELECT
PREMISE_DESCRIPTION,
COUNT(*) AS NUM_RECORDS,
(COUNT(*) / (SELECT COUNT(*) FROM df_na_drop))*100 AS NUM_REC_PERC
FROM df_na_drop
GROUP BY
PREMISE_DESCRIPTION
ORDER BY
COUNT(*) DESC
LIMIT 25;
I lied, I kept the top 13 PREMISE_DESCRIPTION values because I thought restaurants, bars, and parks would be interesting to explore later on… sly… I know…
“OTHER” falls in third this time, with more crime happening in streets and apartments. Awesome. Well, I mean… not awesome I guess, but… interesting. Yes, let’s go with that!
With that, I could argue that I’m actually done here! Let’s feature build in the next post.
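One practical note before signing off: to avoid re-running all of this cleaning next time, I could persist the cleaned dataframe back to S3 as Parquet, something like the sketch below (the output path is hypothetical):
# Sketch only: persist the cleaned dataframe as Parquet (hypothetical output path)
df_na_drop.write.parquet(
    "s3n://2017edmfasatb/nypd_complaints/data/nypd_complaints_clean.parquet",
    mode="overwrite"
)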