NYPD Crime #8 – Data Exploration (Part III – Feature Building)

Intro

Okay, I’m starting to get the hang of Spark. I’ve totally abused the Spark SQL capabilities so far, and it’s been extremely user-friendly doing all this in Jupyter. Honestly, it’s not that large of a data set, so I haven’t had too much complexity navigating the fields. There aren’t many features to build, but going through the date and time fields, I could think of a few that we should pull out for deeper analysis.

Let’s set up our environment and load the data as we had it last time:

In [1]:
import os
os.system("sudo pip install findspark sql_magic")
Out[1]:
0
In [2]:
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')

# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession

# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf, count, isnan, lit, sum, coalesce, concat, to_date, to_timestamp, when, date_format, unix_timestamp
from pyspark.sql.types import DateType
from functools import reduce
import pandas as pd

# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()

# Load sql_magic and connect to Spark
%load_ext sql_magic
%config SQL.conn_name = 'spark'
In [78]:
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv", 
    header = True, 
    inferSchema = True
)
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 43 s
In [79]:
oldColumns = df.schema.names
newColumns = [
    'COMPLAINT_NUMBER',
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'COMPLAINT_END_DATE',
    'COMPLAINT_END_TIME',
    'REPORTED_DATE',
    'OFFENSE_ID',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_INTERNAL_CODE',
    'OFFENSE_INTERNAL_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'PARK_NAME',
    'HOUSING_NAME',
    'X_COORD_NYC',
    'Y_COORD_NYC',
    'LAT',
    'LON',
    'LAT_LON'
]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
df.printSchema()
root
 |-- COMPLAINT_NUMBER: integer (nullable = true)
 |-- COMPLAINT_START_DATE: string (nullable = true)
 |-- COMPLAINT_START_TIME: string (nullable = true)
 |-- COMPLAINT_END_DATE: string (nullable = true)
 |-- COMPLAINT_END_TIME: string (nullable = true)
 |-- REPORTED_DATE: string (nullable = true)
 |-- OFFENSE_ID: integer (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_INTERNAL_CODE: integer (nullable = true)
 |-- OFFENSE_INTERNAL_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_RESULT: string (nullable = true)
 |-- OFFENSE_LEVEL: string (nullable = true)
 |-- JURISDICTION: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- PRECINCT: integer (nullable = true)
 |-- SPECIFIC_LOCATION: string (nullable = true)
 |-- PREMISE_DESCRIPTION: string (nullable = true)
 |-- PARK_NAME: string (nullable = true)
 |-- HOUSING_NAME: string (nullable = true)
 |-- X_COORD_NYC: integer (nullable = true)
 |-- Y_COORD_NYC: integer (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- LAT_LON: string (nullable = true)

In [80]:
# Drop rows with any NA values in the specified columns
df_na_drop = df.na.drop(subset=[
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'BOROUGH',
    'PRECINCT',
    'LAT',
    'LON'
])
In [81]:
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_DATE', coalesce(df_na_drop['COMPLAINT_END_DATE'], df_na_drop['COMPLAINT_START_DATE']))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIME', coalesce(df_na_drop['COMPLAINT_END_TIME'], df_na_drop['COMPLAINT_START_TIME'])) 
In [82]:
# Combine date and time fields and create new timestamp field for COMPLAINT fields
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_TIMESTAMP', 
    to_timestamp(
        concat(df_na_drop['COMPLAINT_START_DATE'], lit(' '), df_na_drop['COMPLAINT_START_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)

df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_TIMESTAMP', 
    to_timestamp(
        concat(df_na_drop['COMPLAINT_END_DATE'], lit(' '), df_na_drop['COMPLAINT_END_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)

# Convert REPORTED_DATE
df_na_drop = df_na_drop.withColumn(
    'REPORTED_DATE_TIMESTAMP', 
    to_timestamp(
        df_na_drop['REPORTED_DATE'],
        'MM/dd/yyyy'
    )
)
In [83]:
# List of crimes to keep
crimes_to_keep = [
    'PETIT LARCENY',
    'HARRASSMENT 2',
    'ASSAULT 3 & RELATED OFFENSES',
    'CRIMINAL MISCHIEF & RELATED OF',
    'GRAND LARCENY',
    'OFF. AGNST PUB ORD SENSBLTY &',
    'DANGEROUS DRUGS',
    'ROBBERY',
    'BURGLARY',
    'FELONY ASSAULT'
]

# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'OFFENSE_DESCRIPTION', 
    when(df_na_drop['OFFENSE_DESCRIPTION'].isin(crimes_to_keep), df_na_drop['OFFENSE_DESCRIPTION']).otherwise('OTHER')
)
In [84]:
# List of premises to keep
premises_to_keep = [
    'STREET',
    'RESIDENCE - APT. HOUSE',
    'RESIDENCE-HOUSE',
    'RESIDENCE - PUBLIC HOUSING',
    'COMMERCIAL BUILDING',
    'DEPARTMENT STORE',
    'TRANSIT - NYC SUBWAY',
    'CHAIN STORE',
    'PUBLIC SCHOOL',
    'GROCERY/BODEGA',
    'RESTAURANT/DINER',
    'BAR/NIGHT CLUB',
    'PARK/PLAYGROUND'
]

# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'PREMISE_DESCRIPTION', 
    when(df_na_drop['PREMISE_DESCRIPTION'].isin(premises_to_keep), df_na_drop['PREMISE_DESCRIPTION']).otherwise('OTHER')
)
In [85]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [86]:
%%read_sql
SELECT * FROM df_na_drop LIMIT 10;
Query started at 10:14:17 PM UTC; Query executed in 0.01 m
Out[86]:
COMPLAINT_NUMBER COMPLAINT_START_DATE COMPLAINT_START_TIME COMPLAINT_END_DATE COMPLAINT_END_TIME REPORTED_DATE OFFENSE_ID OFFENSE_DESCRIPTION OFFENSE_INTERNAL_CODE OFFENSE_INTERNAL_DESCRIPTION PARK_NAME HOUSING_NAME X_COORD_NYC Y_COORD_NYC LAT LON LAT_LON COMPLAINT_START_TIMESTAMP COMPLAINT_END_TIMESTAMP REPORTED_DATE_TIMESTAMP
0 101109527 12/31/2015 23:45:00 12/31/2015 23:45:00 12/31/2015 113 OTHER 729.0 FORGERY,ETC.,UNCLASSIFIED-FELO None None 1007314 241257 40.828848 -73.916661 (40.828848333, -73.916661142) 2015-12-31 23:45:00 2015-12-31 23:45:00 2015-12-31
1 153401121 12/31/2015 23:36:00 12/31/2015 23:36:00 12/31/2015 101 OTHER NaN None None None 1043991 193406 40.697338 -73.784557 (40.697338138, -73.784556739) 2015-12-31 23:36:00 2015-12-31 23:36:00 2015-12-31
2 569369778 12/31/2015 23:30:00 12/31/2015 23:30:00 12/31/2015 117 DANGEROUS DRUGS 503.0 CONTROLLED SUBSTANCE,INTENT TO None None 999463 231690 40.802607 -73.945052 (40.802606608, -73.945051911) 2015-12-31 23:30:00 2015-12-31 23:30:00 2015-12-31
3 968417082 12/31/2015 23:30:00 12/31/2015 23:30:00 12/31/2015 344 ASSAULT 3 & RELATED OFFENSES 101.0 ASSAULT 3 None None 1060183 177862 40.654549 -73.726339 (40.654549444, -73.726338791) 2015-12-31 23:30:00 2015-12-31 23:30:00 2015-12-31
4 641637920 12/31/2015 23:25:00 12/31/2015 23:30:00 12/31/2015 344 ASSAULT 3 & RELATED OFFENSES 101.0 ASSAULT 3 None None 987606 208148 40.738002 -73.987891 (40.7380024, -73.98789129) 2015-12-31 23:25:00 2015-12-31 23:30:00 2015-12-31
5 365661343 12/31/2015 23:18:00 12/31/2015 23:25:00 12/31/2015 106 FELONY ASSAULT 109.0 ASSAULT 2,1,UNCLASSIFIED None None 996149 181562 40.665023 -73.957111 (40.665022689, -73.957110763) 2015-12-31 23:18:00 2015-12-31 23:25:00 2015-12-31
6 608231454 12/31/2015 23:15:00 12/31/2015 23:15:00 12/31/2015 235 DANGEROUS DRUGS 511.0 CONTROLLED SUBSTANCE, POSSESSI None None 987373 201662 40.720200 -73.988735 (40.720199996, -73.988735082) 2015-12-31 23:15:00 2015-12-31 23:15:00 2015-12-31
7 265023856 12/31/2015 23:15:00 12/31/2015 23:15:00 12/31/2015 118 OTHER 792.0 WEAPONS POSSESSION 1 & 2 None None 1009041 247401 40.845707 -73.910398 (40.845707148, -73.910398033) 2015-12-31 23:15:00 2015-12-31 23:15:00 2015-12-31
8 989238731 12/31/2015 23:15:00 12/31/2015 23:30:00 12/31/2015 344 ASSAULT 3 & RELATED OFFENSES 101.0 ASSAULT 3 None None 1014154 251416 40.856711 -73.891900 (40.856711291, -73.891899956) 2015-12-31 23:15:00 2015-12-31 23:30:00 2015-12-31
9 415095955 12/31/2015 23:10:00 12/31/2015 23:10:00 12/31/2015 341 PETIT LARCENY 338.0 LARCENY,PETIT FROM BUILDING,UN None None 994327 218211 40.765618 -73.963623 (40.765617688, -73.96362342) 2015-12-31 23:10:00 2015-12-31 23:10:00 2015-12-31

10 rows × 27 columns

Timestamp Columns

Remember, in the last post, we cleaned up the COMPLAINT_START, COMPLAINT_END, and REPORTED_DATE columns. Through these 3 fields alone, I can think of a handful of features off the top of my head that we should build:

  • Year, Month, Day, Day Of Week, and Hour of each of these dates (no Hour for REPORTED_DATE)
  • A flag for whether the incident spanned any amount of time (vs a one-time incident where COMPLAINT_START = COMPLAINT_END)
  • The length of the incident, if not one-time
  • How long after the incident started and ended the event was reported

That will actually give us 18 new features haha… I’ll take it.
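
For reference, here’s a minimal sketch of how these date parts could be pulled out with Spark’s built-in column functions (year, month, dayofmonth, hour, date_format), which handle nulls gracefully. The cells below build the same features step by step, mostly through Python UDFs, so treat this as an alternative route rather than what this notebook actually runs (df_parts is just an illustrative name):

from pyspark.sql.functions import year, month, dayofmonth, date_format, hour

# Built-in extraction for the COMPLAINT_START timestamp; the same pattern
# applies to COMPLAINT_END and REPORTED_DATE
df_parts = df_na_drop \
    .withColumn('COMPLAINT_START_TIMESTAMP_YEAR', year('COMPLAINT_START_TIMESTAMP')) \
    .withColumn('COMPLAINT_START_TIMESTAMP_MONTH', month('COMPLAINT_START_TIMESTAMP')) \
    .withColumn('COMPLAINT_START_TIMESTAMP_DAY', dayofmonth('COMPLAINT_START_TIMESTAMP')) \
    .withColumn('COMPLAINT_START_TIMESTAMP_WEEKDAY', date_format('COMPLAINT_START_TIMESTAMP', 'E')) \
    .withColumn('COMPLAINT_START_TIMESTAMP_HOUR', hour('COMPLAINT_START_TIMESTAMP'))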

Date / Time Fields For COMPLAINT_START, COMPLAINT_END & REPORTED_DATE

In [87]:
# Set UDFs to extract specific parts of date and time
extract_year =  udf(lambda x: x.year)
extract_month =  udf(lambda x: x.month)
extract_day =  udf(lambda x: x.day)
extract_hour =  udf(lambda x: x.hour)
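# Note: no returnType is specified, so these UDFs return strings by default
# (which is why these columns show up as string in the schema later)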

# Perform transformation
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_DAY', extract_day(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_START_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_START_TIMESTAMP')))

df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_DAY', extract_day(col('COMPLAINT_END_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_END_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_END_TIMESTAMP')))

df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_YEAR', extract_year(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_MONTH', extract_month(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_DAY', extract_day(col('REPORTED_DATE_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('REPORTED_DATE_TIMESTAMP_WEEKDAY', date_format(col('REPORTED_DATE_TIMESTAMP'), 'E'))
In [88]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [89]:
%%read_sql
SELECT 
    COMPLAINT_START_TIMESTAMP,
    COMPLAINT_START_TIMESTAMP_YEAR,
    COMPLAINT_START_TIMESTAMP_MONTH,
    COMPLAINT_START_TIMESTAMP_DAY,
    COMPLAINT_START_TIMESTAMP_WEEKDAY,
    COMPLAINT_START_TIMESTAMP_HOUR,
    COMPLAINT_END_TIMESTAMP,
    COMPLAINT_END_TIMESTAMP_YEAR,
    COMPLAINT_END_TIMESTAMP_MONTH,
    COMPLAINT_END_TIMESTAMP_DAY,
    COMPLAINT_END_TIMESTAMP_WEEKDAY,
    COMPLAINT_END_TIMESTAMP_HOUR,
    REPORTED_DATE_TIMESTAMP,
    REPORTED_DATE_TIMESTAMP_YEAR,
    REPORTED_DATE_TIMESTAMP_MONTH,
    REPORTED_DATE_TIMESTAMP_DAY,
    REPORTED_DATE_TIMESTAMP_WEEKDAY
FROM df_na_drop 
LIMIT 10;
Query started at 10:14:18 PM UTC; Query executed in 0.04 m
Out[89]:
COMPLAINT_START_TIMESTAMP COMPLAINT_START_TIMESTAMP_YEAR COMPLAINT_START_TIMESTAMP_MONTH COMPLAINT_START_TIMESTAMP_DAY COMPLAINT_START_TIMESTAMP_WEEKDAY COMPLAINT_START_TIMESTAMP_HOUR COMPLAINT_END_TIMESTAMP COMPLAINT_END_TIMESTAMP_YEAR COMPLAINT_END_TIMESTAMP_MONTH COMPLAINT_END_TIMESTAMP_DAY COMPLAINT_END_TIMESTAMP_WEEKDAY COMPLAINT_END_TIMESTAMP_HOUR REPORTED_DATE_TIMESTAMP REPORTED_DATE_TIMESTAMP_YEAR REPORTED_DATE_TIMESTAMP_MONTH REPORTED_DATE_TIMESTAMP_DAY REPORTED_DATE_TIMESTAMP_WEEKDAY
0 2015-12-31 23:45:00 2015 12 31 Thu 23 2015-12-31 23:45:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
1 2015-12-31 23:36:00 2015 12 31 Thu 23 2015-12-31 23:36:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
2 2015-12-31 23:30:00 2015 12 31 Thu 23 2015-12-31 23:30:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
3 2015-12-31 23:30:00 2015 12 31 Thu 23 2015-12-31 23:30:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
4 2015-12-31 23:25:00 2015 12 31 Thu 23 2015-12-31 23:30:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
5 2015-12-31 23:18:00 2015 12 31 Thu 23 2015-12-31 23:25:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
6 2015-12-31 23:15:00 2015 12 31 Thu 23 2015-12-31 23:15:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
7 2015-12-31 23:15:00 2015 12 31 Thu 23 2015-12-31 23:15:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
8 2015-12-31 23:15:00 2015 12 31 Thu 23 2015-12-31 23:30:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu
9 2015-12-31 23:10:00 2015 12 31 Thu 23 2015-12-31 23:10:00 2015 12 31 Thu 23 2015-12-31 2015 12 31 Thu

Perfect. Let’s move on to the next task.

Incident Length Flag & Incident Length

Very simple here. I want to:

  1. Calculate how long the incident lasted
  2. Flag incidents that were a single instance in time in a separate boolean column
In [90]:
# Take the difference between start and end, expressed in minutes
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_LENGTH', 
    (unix_timestamp(df_na_drop['COMPLAINT_END_TIMESTAMP']) - unix_timestamp(df_na_drop['COMPLAINT_START_TIMESTAMP']))/60
)
In [91]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [92]:
%%read_sql
SELECT 
    COMPLAINT_START_TIMESTAMP,
    COMPLAINT_END_TIMESTAMP,
    COMPLAINT_LENGTH
FROM df_na_drop 
LIMIT 10;
Query started at 10:14:20 PM UTC; Query executed in 0.01 m
Out[92]:
COMPLAINT_START_TIMESTAMP COMPLAINT_END_TIMESTAMP COMPLAINT_LENGTH
0 2015-12-31 23:45:00 2015-12-31 23:45:00 0.0
1 2015-12-31 23:36:00 2015-12-31 23:36:00 0.0
2 2015-12-31 23:30:00 2015-12-31 23:30:00 0.0
3 2015-12-31 23:30:00 2015-12-31 23:30:00 0.0
4 2015-12-31 23:25:00 2015-12-31 23:30:00 5.0
5 2015-12-31 23:18:00 2015-12-31 23:25:00 7.0
6 2015-12-31 23:15:00 2015-12-31 23:15:00 0.0
7 2015-12-31 23:15:00 2015-12-31 23:15:00 0.0
8 2015-12-31 23:15:00 2015-12-31 23:30:00 15.0
9 2015-12-31 23:10:00 2015-12-31 23:10:00 0.0
In [93]:
# If COMPLAINT_LENGTH = 0, we flag with a new boolean column COMPLAINT_LENGTH_ZERO_TIME
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_LENGTH_ZERO_TIME', when(df_na_drop['COMPLAINT_LENGTH'] == 0, True).otherwise(False)
)
In [94]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [95]:
%%read_sql
SELECT 
    COMPLAINT_LENGTH,
    COMPLAINT_LENGTH_ZERO_TIME
FROM df_na_drop 
LIMIT 10;
Query started at 10:14:21 PM UTC; Query executed in 0.00 m
Out[95]:
COMPLAINT_LENGTH COMPLAINT_LENGTH_ZERO_TIME
0 0.0 True
1 0.0 True
2 0.0 True
3 0.0 True
4 5.0 False
5 7.0 False
6 0.0 True
7 0.0 True
8 15.0 False
9 0.0 True

Lag Time Between Incident And Report Date

This one is kinda weird because the report date doesn’t have a time component in this dataset, only a date. We’ll take the difference between the REPORTED_DATE and both the COMPLAINT_START and COMPLAINT_END dates. My assumption here is that the incident always happens first and is then reported once it’s completely finished. That may be a wrong assumption, but I’ll make it for the sake of the subtraction, and we’ll allow negative values for cases where the complaint was reported before the incident actually finished.
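
As an aside, the same whole-day lags could also be expressed with Spark’s built-in datediff function, which returns an integer day count and reads a bit more directly than the unix_timestamp arithmetic used below. A sketch of that alternative (df_lag is just an illustrative name):

from pyspark.sql.functions import datediff, to_date

# datediff(end, start) returns the difference in whole days
df_lag = df_na_drop \
    .withColumn('COMPLAINT_START_REPORTED_LAG',
                datediff('REPORTED_DATE_TIMESTAMP', to_date('COMPLAINT_START_TIMESTAMP'))) \
    .withColumn('COMPLAINT_END_REPORTED_LAG',
                datediff('REPORTED_DATE_TIMESTAMP', to_date('COMPLAINT_END_TIMESTAMP')))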

In [96]:
# Take the difference between start and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_REPORTED_LAG', 
    (unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_START_TIMESTAMP'])))/60/60/24
)

# Take the difference between end and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_REPORTED_LAG', 
    (unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_END_TIMESTAMP'])))/60/60/24
)
In [97]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")
In [98]:
%%read_sql
SELECT 
    COMPLAINT_START_TIMESTAMP,
    COMPLAINT_END_TIMESTAMP,
    REPORTED_DATE_TIMESTAMP,
    COMPLAINT_START_REPORTED_LAG,
    COMPLAINT_END_REPORTED_LAG
FROM df_na_drop 
LIMIT 10;
Query started at 10:14:21 PM UTC; Query executed in 0.01 m
Out[98]:
COMPLAINT_START_TIMESTAMP COMPLAINT_END_TIMESTAMP REPORTED_DATE_TIMESTAMP COMPLAINT_START_REPORTED_LAG COMPLAINT_END_REPORTED_LAG
0 2015-12-31 23:45:00 2015-12-31 23:45:00 2015-12-31 0.0 0.0
1 2015-12-31 23:36:00 2015-12-31 23:36:00 2015-12-31 0.0 0.0
2 2015-12-31 23:30:00 2015-12-31 23:30:00 2015-12-31 0.0 0.0
3 2015-12-31 23:30:00 2015-12-31 23:30:00 2015-12-31 0.0 0.0
4 2015-12-31 23:25:00 2015-12-31 23:30:00 2015-12-31 0.0 0.0
5 2015-12-31 23:18:00 2015-12-31 23:25:00 2015-12-31 0.0 0.0
6 2015-12-31 23:15:00 2015-12-31 23:15:00 2015-12-31 0.0 0.0
7 2015-12-31 23:15:00 2015-12-31 23:15:00 2015-12-31 0.0 0.0
8 2015-12-31 23:15:00 2015-12-31 23:30:00 2015-12-31 0.0 0.0
9 2015-12-31 23:10:00 2015-12-31 23:10:00 2015-12-31 0.0 0.0

Okay. I think we have our final dataframe! I’m not really sure what other features I can engineer at this point without getting deeper into actual analysis, so let’s save this dataframe back out to S3 as a temporary store.

In [99]:
df_na_drop.printSchema()
root
 |-- COMPLAINT_NUMBER: integer (nullable = true)
 |-- COMPLAINT_START_DATE: string (nullable = true)
 |-- COMPLAINT_START_TIME: string (nullable = true)
 |-- COMPLAINT_END_DATE: string (nullable = true)
 |-- COMPLAINT_END_TIME: string (nullable = true)
 |-- REPORTED_DATE: string (nullable = true)
 |-- OFFENSE_ID: integer (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_INTERNAL_CODE: integer (nullable = true)
 |-- OFFENSE_INTERNAL_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_RESULT: string (nullable = true)
 |-- OFFENSE_LEVEL: string (nullable = true)
 |-- JURISDICTION: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- PRECINCT: integer (nullable = true)
 |-- SPECIFIC_LOCATION: string (nullable = true)
 |-- PREMISE_DESCRIPTION: string (nullable = true)
 |-- PARK_NAME: string (nullable = true)
 |-- HOUSING_NAME: string (nullable = true)
 |-- X_COORD_NYC: integer (nullable = true)
 |-- Y_COORD_NYC: integer (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- LAT_LON: string (nullable = true)
 |-- COMPLAINT_START_TIMESTAMP: timestamp (nullable = true)
 |-- COMPLAINT_END_TIMESTAMP: timestamp (nullable = true)
 |-- REPORTED_DATE_TIMESTAMP: timestamp (nullable = true)
 |-- COMPLAINT_START_TIMESTAMP_YEAR: string (nullable = true)
 |-- COMPLAINT_START_TIMESTAMP_MONTH: string (nullable = true)
 |-- COMPLAINT_START_TIMESTAMP_DAY: string (nullable = true)
 |-- COMPLAINT_START_TIMESTAMP_WEEKDAY: string (nullable = true)
 |-- COMPLAINT_START_TIMESTAMP_HOUR: string (nullable = true)
 |-- COMPLAINT_END_TIMESTAMP_YEAR: string (nullable = true)
 |-- COMPLAINT_END_TIMESTAMP_MONTH: string (nullable = true)
 |-- COMPLAINT_END_TIMESTAMP_DAY: string (nullable = true)
 |-- COMPLAINT_END_TIMESTAMP_WEEKDAY: string (nullable = true)
 |-- COMPLAINT_END_TIMESTAMP_HOUR: string (nullable = true)
 |-- REPORTED_DATE_TIMESTAMP_YEAR: string (nullable = true)
 |-- REPORTED_DATE_TIMESTAMP_MONTH: string (nullable = true)
 |-- REPORTED_DATE_TIMESTAMP_DAY: string (nullable = true)
 |-- REPORTED_DATE_TIMESTAMP_WEEKDAY: string (nullable = true)
 |-- COMPLAINT_LENGTH: double (nullable = true)
 |-- COMPLAINT_LENGTH_ZERO_TIME: boolean (nullable = false)
 |-- COMPLAINT_START_REPORTED_LAG: double (nullable = true)
 |-- COMPLAINT_END_REPORTED_LAG: double (nullable = true)

In [100]:
df_clean = df_na_drop[[
    'COMPLAINT_NUMBER', 
    'COMPLAINT_START_TIMESTAMP',
    'COMPLAINT_END_TIMESTAMP',
    'REPORTED_DATE_TIMESTAMP',
    'COMPLAINT_START_TIMESTAMP_YEAR',
    'COMPLAINT_START_TIMESTAMP_MONTH',
    'COMPLAINT_START_TIMESTAMP_DAY',
    'COMPLAINT_START_TIMESTAMP_WEEKDAY',
    'COMPLAINT_START_TIMESTAMP_HOUR',
    'COMPLAINT_END_TIMESTAMP_YEAR',
    'COMPLAINT_END_TIMESTAMP_MONTH',
    'COMPLAINT_END_TIMESTAMP_DAY',
    'COMPLAINT_END_TIMESTAMP_WEEKDAY',
    'COMPLAINT_END_TIMESTAMP_HOUR',
    'REPORTED_DATE_TIMESTAMP_YEAR',
    'REPORTED_DATE_TIMESTAMP_MONTH',
    'REPORTED_DATE_TIMESTAMP_DAY',
    'REPORTED_DATE_TIMESTAMP_WEEKDAY',
    'COMPLAINT_LENGTH',
    'COMPLAINT_LENGTH_ZERO_TIME',
    'COMPLAINT_START_REPORTED_LAG',
    'COMPLAINT_END_REPORTED_LAG',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'LAT',
    'LON'
]]
In [26]:
# Save dataframe back to S3 as parquet
df_clean.write.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean.parquet')
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-26-3c8aba718f02> in <module>()
      1 # Save dataframe back to S3 as parquet
----> 2 df_clean.write.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean.parquet')

/usr/lib/spark/python/pyspark/sql/readwriter.py in parquet(self, path, mode, partitionBy, compression)
    689             self.partitionBy(partitionBy)
    690         self._set_opts(compression=compression)
--> 691         self._jwrite.parquet(path)
    692 
    693     @since(1.6)

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o266.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 8.0 failed 4 times, most recent failure: Lost task 8.3 in stage 8.0 (TID 33, ip-10-0-0-154.ec2.internal, executor 2): org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-11-ca9fdfbab274>", line 2, in <lambda>
AttributeError: 'NoneType' object has no attribute 'year'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
	... 8 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1569)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1557)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1556)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1556)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:815)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:815)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:815)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1784)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1739)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1728)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:631)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
	... 45 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-11-ca9fdfbab274>", line 2, in <lambda>
AttributeError: 'NoneType' object has no attribute 'year'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
	... 8 more

Okay – this is pretty interesting. Scrolling through that whole error dump, we eventually see an error message:

‘NoneType’ object has no attribute ‘year’

Well, the only place we use the year attribute of anything is where we defined the UDF:

extract_year =  udf(lambda x: x.year)

And we used this UDF when extracting the year of the COMPLAINT_START date. What’s weird is that we never got this error before, and we not only used, but built on top of, that part of the code already! We’ve made so many more modifications to the dataframe since then. My hypothesis about what’s happening here is that Spark is lazy in its execution. So far in our analysis, we’ve never really looked at more than 10 rows when dealing with the datetime and timestamp fields. We always just take a small sample of the data to make sure the operations we’ve built are doing what they should. Spark has actually never had to run that calculation on the entire dataframe… until now!
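
If that hypothesis is right, we shouldn’t need the full parquet write to trip the error: anything that forces Spark to evaluate the UDF on the rows with null timestamps should blow up the same way, while small samples sail through. A sketch of that idea (not actually run here):

from pyspark.sql.functions import col

# A small sample only forces the UDF to run on the handful of rows it returns,
# which is why our LIMIT 10 spot checks never touched the bad records
df_na_drop.select('COMPLAINT_START_TIMESTAMP_YEAR').show(10)

# Filtering down to the null timestamps and then touching the UDF-derived
# column forces the lambda to run on None, and should raise the same
# "'NoneType' object has no attribute 'year'" error
df_na_drop.filter(col('COMPLAINT_START_TIMESTAMP').isNull()) \
    .select('COMPLAINT_START_TIMESTAMP_YEAR') \
    .show(5)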

I guess even after handling the NAs earlier, there are actually “None” values in here (maybe not the same as NaN, which may be what my check was searching for). Let’s reframe our NA-detecting logic to see what’s happening with None.

In [29]:
%%read_sql
SELECT COUNT(*) FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL
Query started at 06:21:25 PM UTC; Query executed in 0.43 m
Out[29]:
count(1)
0 844

Remember, in our last post, after dropping the NAs, we used the following function we found on Stack Overflow to check for NAs in our dataframe, and in fact, we were seeing zero.

def count_not_null(c, nan_as_null=False):
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

Clearly, there are intricacies about Spark’s null types that I haven’t fully grasped yet, and I’m also starting to see how the different APIs interpret them. In ANSI SQL, the only null type I know of is literally called NULL, and the query above shows there are in fact NULLs in the SQL view of this dataframe. I assume these correspond to the Nones in the PySpark world, but how exactly do we get rid of them if our na.drop() call on the PySpark dataframe didn’t catch them in the first place?
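
Part of the confusion is that isnan() and isNull() check for different things: isnan() only flags floating-point NaN values (and only works on numeric columns), while isNull() catches SQL NULLs, i.e. the Nones we see through the Python API. A quick sketch of the difference (the column choices are just for illustration, not something run in this notebook):

from pyspark.sql.functions import col, count, isnan, when

# isnan() applies to numeric columns and only catches true NaN values;
# isNull() is what flags the Nones / SQL NULLs we just counted
df_na_drop.select(
    count(when(isnan(col('LAT')), True)).alias('LAT_NAN_COUNT'),
    count(when(col('COMPLAINT_START_TIMESTAMP').isNull(), True)).alias('START_TIMESTAMP_NULL_COUNT')
).show()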

In [30]:
df_na_drop.where(col("COMPLAINT_START_TIMESTAMP").isNull()).count()
Out[30]:
844

Alright, well, there you go: the Nones do register through isNull(), which I believe is exactly what na.drop() keys off of, so the original COMPLAINT_START fields shouldn’t be null… let’s look at the offending rows.

In [31]:
%%read_sql
SELECT COMPLAINT_START_DATE, COMPLAINT_START_TIME, COMPLAINT_START_TIMESTAMP FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL LIMIT 20
Query started at 06:46:01 PM UTC; Query executed in 0.71 m
Out[31]:
COMPLAINT_START_DATE COMPLAINT_START_TIME COMPLAINT_START_TIMESTAMP
0 08/31/2009 24:00:00 None
1 08/31/2009 24:00:00 None
2 08/18/2009 24:00:00 None
3 08/18/2009 24:00:00 None
4 01/09/2009 24:00:00 None
5 08/24/2009 24:00:00 None
6 08/16/2009 24:00:00 None
7 07/21/2009 24:00:00 None
8 08/15/2009 24:00:00 None
9 08/12/2009 24:00:00 None
10 08/04/2009 24:00:00 None
11 08/08/2009 24:00:00 None
12 08/07/2009 24:00:00 None
13 08/07/2009 24:00:00 None
14 06/01/2007 24:00:00 None
15 07/06/2009 24:00:00 None
16 08/05/2009 24:00:00 None
17 08/04/2009 24:00:00 None
18 06/27/2009 24:00:00 None
19 08/01/2009 24:00:00 None

Okay, I think I see the problem now. It wasn’t a null that was in the original dataset; it was a null that appeared when parsing the concatenated DATE and TIME columns into the TIMESTAMP column… Further, I think the problem is that the time is expressed as 24:00:00 rather than 00:00:00, as that’s the most obvious commonality across all these rows that have null TIMESTAMPs.

In [32]:
%%read_sql
SELECT DISTINCT COMPLAINT_START_TIME FROM df_na_drop WHERE COMPLAINT_START_TIMESTAMP IS NULL
Query started at 06:57:09 PM UTC; Query executed in 0.65 m
Out[32]:
COMPLAINT_START_TIME
0 24:00:00

Confirmed. That’s all the correlation I need. I think it’s actually easier at this point to start another notebook and re-run the code from the beginning with this change, because the offending cell is smack dab in the middle of this notebook. If I wanted to integrate the change back in here, I’d have to re-run half the notebook and my commentary would no longer match up with the code and results, so let’s call it here and continue in the next post.
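
For reference, here’s a minimal, hypothetical sketch of what that change could look like for the start fields (the actual fix happens in the next notebook): normalize the non-standard 24:00:00 to 00:00:00 before parsing. Note that this keeps the event on the same calendar date, whereas 24:00 technically means midnight at the end of that day, so you could also add a day after parsing if that distinction matters.

from pyspark.sql.functions import regexp_replace, concat, lit, to_timestamp

# Hypothetical fix: map '24:00:00' to '00:00:00' before building the timestamp
# (COMPLAINT_END_TIME would need the same treatment)
df_fixed = df_na_drop.withColumn(
    'COMPLAINT_START_TIME',
    regexp_replace(df_na_drop['COMPLAINT_START_TIME'], '^24:00:00$', '00:00:00')
)
df_fixed = df_fixed.withColumn(
    'COMPLAINT_START_TIMESTAMP',
    to_timestamp(
        concat(df_fixed['COMPLAINT_START_DATE'], lit(' '), df_fixed['COMPLAINT_START_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)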

 
