NYPD Crime #6 – Data Exploration (Part I – Spark Basics)

Intro

Alright, now we know the ins and outs of Spark!

Spark be like:

And, yes Spark, you would be absolutely correct. I won’t pretend to be a Spark expert after scouring the internet for random tutorial videos, but you know what? I feel like I have enough knowledge to get started. So, let’s do just that.

Now, having the data on S3 really helps us here, and I’m quickly starting to appreciate the integrated-ness (definitely not a word) of AWS’ suite of services. In the case of EMR, S3 actually integrates directly with the cluster as a distributed storage source (replacing HDFS). All of this is abstracted away from us, and I’ll just keep my mind naive about the details until I actually need to deal with them.

First, I have to set up a few packages that will help me integrate Spark with Jupyter. By default, EMR comes with the Zeppelin notebook, which integrates directly with Spark out of the box, but I’m going with Jupyter because it’s what I’m more familiar with (Zeppelin actually looks pretty cool though, with built-in interactive visualizations from SQL results!) and it translates better to a blog.

The findspark package will let us hook a SparkSession up to our Jupyter notebook, and the sql_magic package will add SQL “magic” command capabilities, that is, the ability to write in-line SQL statements that query our Spark dataframes through Spark SQL (back in the all-nba predict days, we used the R magic commands to pass dataframes back and forth between R and Python).

In [1]:
import os
os.system("sudo pip install findspark sql_magic")
Out[1]:
0

Initiate Spark

Let’s use the findspark package to connect Jupyter up to the Spark shell. The first thing we do in any new Spark application, apparently, is initiate a SparkSession. When you launch a Spark shell from the command line, or open up Zeppelin, a SparkSession is already instantiated for you as the spark object. With Jupyter, we could either configure Jupyter on the server side to recognize the Spark shell, or use findspark to make Spark available within the plain Python shell that Jupyter is running on. Since we’re going the findspark route, we have to initiate the SparkSession ourselves. The SparkSession is an object that provides an entry point into all the different APIs (e.g. Spark SQL, which we’ll be using later) and also tracks the application being spun up, so we can view it and tag it in the Spark History Server console.

In [1]:
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')
In [2]:
# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession

# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
import pandas as pd
In [3]:
# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()
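For the record, if I wanted this application to show up under a friendlier name in the Spark History Server console, I believe I could have tagged it right here in the builder. A quick sketch (the name below is just something I made up):

# Hypothetical sketch: tagging the application with a name so it's easier to
# spot in the Spark History Server (the appName string is arbitrary)
spark = SparkSession\
    .builder\
    .appName("nypd_crime_exploration")\
    .getOrCreate()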

SQL Magic

Okay, now we have a SparkSession initiated as spark like we would have if we just opened up a Spark shell. Next up, we have to load the sql_magic package so we have Spark SQL capabilities later on when we play around with the data.

We load the “Magic” package and configure it to be hooked up to Spark. Reading up on sql_magic a little bit, it’s actually pretty cool because it provides an entry point into multiple technologies: Spark is one, but it also supports things like sqlalchemy connections… We can pull from a database inline with SQL! Amazing!

I immediately thought of that, but NO, I WILL STICK TO MY WORD. IT’S GODDAMN AMAZING.

In [4]:
%load_ext sql_magic
%config SQL.conn_name = 'spark'
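Just to illustrate that last point, I believe hooking sql_magic up to a regular database would look something like this instead (the engine name and connection string below are completely made up):

# Hypothetical sketch: pointing sql_magic at a SQLAlchemy engine instead of Spark
# (the connection string is made up purely for illustration)
from sqlalchemy import create_engine
pg_engine = create_engine('postgresql://user:password@some-host:5432/some_db')
%config SQL.conn_name = 'pg_engine'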

Read CSV

Spark 2.0 and on makes it relatively easy to read a CSV. Let’s try it:

In [5]:
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv", 
    header = True, 
    inferSchema = True
)
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 53.1 s

Around 53 seconds to load a ~1.5G dataset… a bit on the longer side, I would imagine? The dataset is not that big, but remember, I’m kind of coming into this knowing that the data set size is on the smaller side to be working with Spark or EMR.

In [6]:
# Describe df
df.printSchema()
root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- X_COORD_CD: integer (nullable = true)
 |-- Y_COORD_CD: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Lat_Lon: string (nullable = true)

We see here that the dataframe has loaded successfully, and that the headers and column types have been inferred somewhat. Let’s try to do a head() on the dataframe.

In [7]:
%%time
# See df
df.head(10)
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 625 ms
Out[7]:
[Row(CMPLNT_NUM=101109527, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:45:00', CMPLNT_TO_DT=None, CMPLNT_TO_TM=None, RPT_DT='12/31/2015', KY_CD=113, OFNS_DESC='FORGERY', PD_CD=729, PD_DESC='FORGERY,ETC.,UNCLASSIFIED-FELO', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='FELONY', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='BRONX', ADDR_PCT_CD=44, LOC_OF_OCCUR_DESC='INSIDE', PREM_TYP_DESC='BAR/NIGHT CLUB', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=1007314, Y_COORD_CD=241257, Latitude=40.828848333, Longitude=-73.916661142, Lat_Lon='(40.828848333, -73.916661142)'),
 Row(CMPLNT_NUM=153401121, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:36:00', CMPLNT_TO_DT=None, CMPLNT_TO_TM=None, RPT_DT='12/31/2015', KY_CD=101, OFNS_DESC='MURDER & NON-NEGL. MANSLAUGHTER', PD_CD=None, PD_DESC=None, CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='FELONY', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='QUEENS', ADDR_PCT_CD=103, LOC_OF_OCCUR_DESC='OUTSIDE', PREM_TYP_DESC=None, PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=1043991, Y_COORD_CD=193406, Latitude=40.697338138, Longitude=-73.784556739, Lat_Lon='(40.697338138, -73.784556739)'),
 Row(CMPLNT_NUM=569369778, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:30:00', CMPLNT_TO_DT=None, CMPLNT_TO_TM=None, RPT_DT='12/31/2015', KY_CD=117, OFNS_DESC='DANGEROUS DRUGS', PD_CD=503, PD_DESC='CONTROLLED SUBSTANCE,INTENT TO', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='FELONY', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='MANHATTAN', ADDR_PCT_CD=28, LOC_OF_OCCUR_DESC=None, PREM_TYP_DESC='OTHER', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=999463, Y_COORD_CD=231690, Latitude=40.802606608, Longitude=-73.945051911, Lat_Lon='(40.802606608, -73.945051911)'),
 Row(CMPLNT_NUM=968417082, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:30:00', CMPLNT_TO_DT=None, CMPLNT_TO_TM=None, RPT_DT='12/31/2015', KY_CD=344, OFNS_DESC='ASSAULT 3 & RELATED OFFENSES', PD_CD=101, PD_DESC='ASSAULT 3', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='MISDEMEANOR', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='QUEENS', ADDR_PCT_CD=105, LOC_OF_OCCUR_DESC='INSIDE', PREM_TYP_DESC='RESIDENCE-HOUSE', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=1060183, Y_COORD_CD=177862, Latitude=40.654549444, Longitude=-73.726338791, Lat_Lon='(40.654549444, -73.726338791)'),
 Row(CMPLNT_NUM=641637920, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:25:00', CMPLNT_TO_DT='12/31/2015', CMPLNT_TO_TM='23:30:00', RPT_DT='12/31/2015', KY_CD=344, OFNS_DESC='ASSAULT 3 & RELATED OFFENSES', PD_CD=101, PD_DESC='ASSAULT 3', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='MISDEMEANOR', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='MANHATTAN', ADDR_PCT_CD=13, LOC_OF_OCCUR_DESC='FRONT OF', PREM_TYP_DESC='OTHER', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=987606, Y_COORD_CD=208148, Latitude=40.7380024, Longitude=-73.98789129, Lat_Lon='(40.7380024, -73.98789129)'),
 Row(CMPLNT_NUM=365661343, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:18:00', CMPLNT_TO_DT='12/31/2015', CMPLNT_TO_TM='23:25:00', RPT_DT='12/31/2015', KY_CD=106, OFNS_DESC='FELONY ASSAULT', PD_CD=109, PD_DESC='ASSAULT 2,1,UNCLASSIFIED', CRM_ATPT_CPTD_CD='ATTEMPTED', LAW_CAT_CD='FELONY', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='BROOKLYN', ADDR_PCT_CD=71, LOC_OF_OCCUR_DESC='FRONT OF', PREM_TYP_DESC='DRUG STORE', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=996149, Y_COORD_CD=181562, Latitude=40.665022689, Longitude=-73.957110763, Lat_Lon='(40.665022689, -73.957110763)'),
 Row(CMPLNT_NUM=608231454, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:15:00', CMPLNT_TO_DT=None, CMPLNT_TO_TM=None, RPT_DT='12/31/2015', KY_CD=235, OFNS_DESC='DANGEROUS DRUGS', PD_CD=511, PD_DESC='CONTROLLED SUBSTANCE, POSSESSI', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='MISDEMEANOR', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='MANHATTAN', ADDR_PCT_CD=7, LOC_OF_OCCUR_DESC='OPPOSITE OF', PREM_TYP_DESC='STREET', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=987373, Y_COORD_CD=201662, Latitude=40.720199996, Longitude=-73.988735082, Lat_Lon='(40.720199996, -73.988735082)'),
 Row(CMPLNT_NUM=265023856, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:15:00', CMPLNT_TO_DT='12/31/2015', CMPLNT_TO_TM='23:15:00', RPT_DT='12/31/2015', KY_CD=118, OFNS_DESC='DANGEROUS WEAPONS', PD_CD=792, PD_DESC='WEAPONS POSSESSION 1 & 2', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='FELONY', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='BRONX', ADDR_PCT_CD=46, LOC_OF_OCCUR_DESC='FRONT OF', PREM_TYP_DESC='STREET', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=1009041, Y_COORD_CD=247401, Latitude=40.845707148, Longitude=-73.910398033, Lat_Lon='(40.845707148, -73.910398033)'),
 Row(CMPLNT_NUM=989238731, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:15:00', CMPLNT_TO_DT='12/31/2015', CMPLNT_TO_TM='23:30:00', RPT_DT='12/31/2015', KY_CD=344, OFNS_DESC='ASSAULT 3 & RELATED OFFENSES', PD_CD=101, PD_DESC='ASSAULT 3', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='MISDEMEANOR', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='BRONX', ADDR_PCT_CD=48, LOC_OF_OCCUR_DESC='INSIDE', PREM_TYP_DESC='RESIDENCE - APT. HOUSE', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=1014154, Y_COORD_CD=251416, Latitude=40.856711291, Longitude=-73.891899956, Lat_Lon='(40.856711291, -73.891899956)'),
 Row(CMPLNT_NUM=415095955, CMPLNT_FR_DT='12/31/2015', CMPLNT_FR_TM='23:10:00', CMPLNT_TO_DT='12/31/2015', CMPLNT_TO_TM='23:10:00', RPT_DT='12/31/2015', KY_CD=341, OFNS_DESC='PETIT LARCENY', PD_CD=338, PD_DESC='LARCENY,PETIT FROM BUILDING,UN', CRM_ATPT_CPTD_CD='COMPLETED', LAW_CAT_CD='MISDEMEANOR', JURIS_DESC='N.Y. POLICE DEPT', BORO_NM='MANHATTAN', ADDR_PCT_CD=19, LOC_OF_OCCUR_DESC='INSIDE', PREM_TYP_DESC='DRUG STORE', PARKS_NM=None, HADEVELOPT=None, X_COORD_CD=994327, Y_COORD_CD=218211, Latitude=40.765617688, Longitude=-73.96362342, Lat_Lon='(40.765617688, -73.96362342)')]

Alright, we’ve only issued like 2 commands, but a whirlwind just happened in the back end… Let’s try to break it down.

Initial state

First of all, when we spun up this EMR cluster, we get an architecture like this:

It’s not the greatest diagram, but simple is better right? As painful to the eyes as this diagram is, please bear with me haha. The master and 2 workers are the EC2 nodes that our EMR cluster spun up. The S3 stack is our storage, which acts as our distributed file system for our EMR cluster.

The master is where the EMR applications are installed, and whether you’re reaching Zeppelin, Spark History Server, or Hue, you would be connecting to the master node. Before I opened this Jupyter notebook, I SSH’ed into the master node and did a simple

sudo pip install jupyter

and Jupyter was available on the master node at port 8889 (8888 was used for Zeppelin by default on EMR). Hopefully everything makes sense so far! As I’m working in this notebook right now, I’m working on the master node. The master node is not only providing an interface for me to interact with Spark via Jupyter, but it’s also acting as my Spark application’s master: generating the DAG, pushing individual tasks as defined by the DAG down to the appropriate workers, and collecting the results back from the workers to present to me when I run an action.

spark.read.csv

When I read a csv into a dataframe within Spark, this is the first time I’m interacting with any data on S3 (remember, this is where my raw CSV file is hosted). I’m not 100% sure about this, but I don’t actually think any data is loaded when you perform spark.read (Spark’s dataframe reader). Because I’ve specified for spark.read to

  1. Look for headers
  2. Infer a schema (try to guess which fields are text or numbers etc)

it actually has to scan through the entire data set (or at least sample a sizeable portion of it) to confidently say that this column is a string and that column is an integer. I think that’s why the spark.read call took ~53 seconds. At this point, however, no data has actually been read into the memory of our workers at all; Spark has simply mapped out our data structure, enough that we can run a

df.printSchema()

command! Note that Spark doesn’t actually have to show any data for this command, just the skeleton of the dataframe! This actually makes me wonder, because I’m pretty sure that if I loaded this 1.5G data set into a Pandas dataframe, it wouldn’t take more than 53 seconds. I could just be talking out of my arse though, maybe a comparison to try for another day! For now, we’ll say that ~53 seconds is fair, as Spark does have to scan the data to infer the schema (and it presumably does this in a distributed way).
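As an aside, if I wanted to skip that inference scan entirely, I believe spark.read will also accept an explicit schema up front. A rough sketch with only a handful of the 24 columns filled in (not something I actually ran here):

# Hypothetical sketch: handing spark.read a schema so it doesn't have to scan
# the file to infer column types (only a few of the columns are shown)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

manual_schema = StructType([
    StructField('CMPLNT_NUM', IntegerType(), True),
    StructField('CMPLNT_FR_DT', StringType(), True),
    # ...the remaining columns would be declared here...
    StructField('Latitude', DoubleType(), True),
    StructField('Longitude', DoubleType(), True)
])

df_explicit = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv",
    header = True,
    schema = manual_schema
)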

head()

When we try a head() command, now we are starting to load some data into memory. I’ve specified head(10), so we’re only looking at the first 10 rows of the dataframe, so it only has to load a subset of the data into memory. We see that it took around 600 ms to load the first 10 rows of the dataframe. Is this efficient? With the size of data we pulled? Probably not. I could imagine a head(10) on a normal Pandas dataframe already sitting in memory doesn’t take more than a few milliseconds! However, we see some of the distributed overhead at work here. It took over half a second to roughly do something like this:


I want to make it absolutely clear that I’m not saying these specific workers went and got these specific rows. I’m not quite sure which workers got which rows, but I wanted to illustrate one possibility of how the workers could have split up the task. The Master ultimately decides how the tasks are distributed among the workers, because the Master has access to a metadata store that quickly tells it which storage nodes / partitions each piece of data lives in.
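One thing that helps me keep this straight (a sketch of what I’d expect, not a cell I actually ran above): transformations like limit() come back instantly because Spark just tacks another step onto the DAG, and it’s only the action at the end that makes the Master ship tasks out to the workers.

# Hypothetical sketch: transformations are lazy, actions kick off the real work
first_ten = df.limit(10)   # returns immediately - just another node in the DAG
first_ten.collect()        # action - tasks actually get distributed to the workers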

Onwards

Okay! Our head() function provided a not-so-friendly output, but we actually see some data. Usually, with vanilla Python and Pandas, Jupyter will output your dataframe nicely:

In [8]:
pd.DataFrame({
    'a': [1, 2],
    'b': [3, 4]
})
Out[8]:
   a  b
0  1  3
1  2  4

One thing we can do is actually introduce Spark SQL here. Spark SQL, by default within the sql_magic library, executes the distributed Spark DAG and collects the result into a Pandas dataframe, which we can then view.

We first have to register the Spark dataframe as a “temporary table”, which opens up the dataframe to SQLContext for the Spark SQL API so we can perform SQL queries on it.

Spark SQL

In [9]:
# Register temporary table
df.createOrReplaceTempView("df")
In [10]:
# Perform the SQL equivalent of "head"
result = %read_sql SELECT * FROM df LIMIT 10;
result
Query started at 07:24:56 PM UTC; Query executed in 0.01 m
Out[10]:
CMPLNT_NUM CMPLNT_FR_DT CMPLNT_FR_TM CMPLNT_TO_DT CMPLNT_TO_TM RPT_DT KY_CD OFNS_DESC PD_CD PD_DESC ADDR_PCT_CD LOC_OF_OCCUR_DESC PREM_TYP_DESC PARKS_NM HADEVELOPT X_COORD_CD Y_COORD_CD Latitude Longitude Lat_Lon
0 101109527 12/31/2015 23:45:00 None None 12/31/2015 113 FORGERY 729.0 FORGERY,ETC.,UNCLASSIFIED-FELO 44 INSIDE BAR/NIGHT CLUB None None 1007314 241257 40.828848 -73.916661 (40.828848333, -73.916661142)
1 153401121 12/31/2015 23:36:00 None None 12/31/2015 101 MURDER & NON-NEGL. MANSLAUGHTER NaN None 103 OUTSIDE None None None 1043991 193406 40.697338 -73.784557 (40.697338138, -73.784556739)
2 569369778 12/31/2015 23:30:00 None None 12/31/2015 117 DANGEROUS DRUGS 503.0 CONTROLLED SUBSTANCE,INTENT TO 28 None OTHER None None 999463 231690 40.802607 -73.945052 (40.802606608, -73.945051911)
3 968417082 12/31/2015 23:30:00 None None 12/31/2015 344 ASSAULT 3 & RELATED OFFENSES 101.0 ASSAULT 3 105 INSIDE RESIDENCE-HOUSE None None 1060183 177862 40.654549 -73.726339 (40.654549444, -73.726338791)
4 641637920 12/31/2015 23:25:00 12/31/2015 23:30:00 12/31/2015 344 ASSAULT 3 & RELATED OFFENSES 101.0 ASSAULT 3 13 FRONT OF OTHER None None 987606 208148 40.738002 -73.987891 (40.7380024, -73.98789129)
5 365661343 12/31/2015 23:18:00 12/31/2015 23:25:00 12/31/2015 106 FELONY ASSAULT 109.0 ASSAULT 2,1,UNCLASSIFIED 71 FRONT OF DRUG STORE None None 996149 181562 40.665023 -73.957111 (40.665022689, -73.957110763)
6 608231454 12/31/2015 23:15:00 None None 12/31/2015 235 DANGEROUS DRUGS 511.0 CONTROLLED SUBSTANCE, POSSESSI 7 OPPOSITE OF STREET None None 987373 201662 40.720200 -73.988735 (40.720199996, -73.988735082)
7 265023856 12/31/2015 23:15:00 12/31/2015 23:15:00 12/31/2015 118 DANGEROUS WEAPONS 792.0 WEAPONS POSSESSION 1 & 2 46 FRONT OF STREET None None 1009041 247401 40.845707 -73.910398 (40.845707148, -73.910398033)
8 989238731 12/31/2015 23:15:00 12/31/2015 23:30:00 12/31/2015 344 ASSAULT 3 & RELATED OFFENSES 101.0 ASSAULT 3 48 INSIDE RESIDENCE - APT. HOUSE None None 1014154 251416 40.856711 -73.891900 (40.856711291, -73.891899956)
9 415095955 12/31/2015 23:10:00 12/31/2015 23:10:00 12/31/2015 341 PETIT LARCENY 338.0 LARCENY,PETIT FROM BUILDING,UN 19 INSIDE DRUG STORE None None 994327 218211 40.765618 -73.963623 (40.765617688, -73.96362342)

10 rows × 24 columns

Another 2 commands… another time to stop and review haha. One of the most important things to realize here is that Spark itself is one core processing engine. It’s got many APIs built on top, two of which I just used: PySpark (Python) and Spark SQL, but these are just that: APIs. Spark takes on the overhead of translating these user-friendly commands into a core Spark DAG. In essence, this is what Spark is! It’s simply a translating service for my dumb brain, letting me work in languages I’m familiar with so I don’t have to worry about how it all maps to a distributed world. The two commands I just used to pull the first 10 rows of the dataframe looked very different from each other (df.head() vs SELECT * … LIMIT 10), but in fact they are executed no differently by Spark! There are definitely differences in overhead when translating each API into a DAG, but once the DAG is created, it is theoretically the same every time.
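A quick way to sanity check this (again, a sketch rather than a cell I ran above) is to ask Spark for the plan behind each API call. If the two really do compile down to the same thing, the physical plans should look essentially identical:

# Hypothetical sketch: comparing the plans Spark generates for the two APIs
df.limit(10).explain()
spark.sql("SELECT * FROM df LIMIT 10").explain()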

In the case of Spark SQL, this is essentially what happened:


We see the major differences here all lie in the Spark Master node. We can start to appreciate the role that the Master plays in all of this.

In the first image, the Master:

  • Hosts the Jupyter notebook
  • Provides the contact point for the client to interact with
  • Translates the input Spark SQL command to a DAG
  • Decides how to distribute DAG components amongst the workers to get the job done

In the second image, the Master:

  • Receives distributed results from workers
  • Aggregates the results
  • Converts the Spark results to a Pandas dataframe, which has now been transferred to the context of the Python interpreter on the Master, where Spark is no longer involved (see the sketch after this list)
  • Formats and presents this Pandas dataframe back to the user via Jupyter
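Under the hood, I’m fairly sure sql_magic is doing something roughly equivalent to this for a Spark connection (a sketch of my understanding, not a cell from above):

# Hypothetical sketch: run the query through Spark SQL, then collect the
# distributed result back to the Master as a Pandas dataframe
result = spark.sql("SELECT * FROM df LIMIT 10").toPandas()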

Quite a bit of responsibility on one guy’s shoulders! Now that we have a general sense of how Spark processes a task, we can start to do some serious data exploration and data cleansing!

In [7]:
# UDFs to parse the date strings (MM/DD/YYYY) and time strings (HH:MM:SS)
date_func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
time_func = udf(lambda x: datetime.strptime(x, '%H:%M:%S'), DateType())

# Fill nulls with placeholder values first so the UDFs don't choke on None
df = df.fillna('01/01/1900', subset = ['CMPLNT_FR_DT', 'CMPLNT_TO_DT', 'RPT_DT'])
df = df.fillna('00:00:00', subset = ['CMPLNT_FR_TM', 'CMPLNT_TO_TM'])

# Add parsed versions of the date / time columns alongside the original strings
df = df.withColumn('CMPLNT_FR_DT_FORMATTED', date_func(col('CMPLNT_FR_DT')))
df = df.withColumn('CMPLNT_TO_DT_FORMATTED', date_func(col('CMPLNT_TO_DT')))
df = df.withColumn('RPT_DT_FORMATTED', date_func(col('RPT_DT')))
df = df.withColumn('CMPLNT_FR_TM_FORMATTED', time_func(col('CMPLNT_FR_TM')))
df = df.withColumn('CMPLNT_TO_TM_FORMATTED', time_func(col('CMPLNT_TO_TM')))

df.printSchema()
root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = false)
 |-- CMPLNT_FR_TM: string (nullable = false)
 |-- CMPLNT_TO_DT: string (nullable = false)
 |-- CMPLNT_TO_TM: string (nullable = false)
 |-- RPT_DT: string (nullable = false)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- X_COORD_CD: integer (nullable = true)
 |-- Y_COORD_CD: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Lat_Lon: string (nullable = true)
 |-- CMPLNT_FR_DT_FORMATTED: date (nullable = true)
 |-- CMPLNT_TO_DT_FORMATTED: date (nullable = true)
 |-- RPT_DT_FORMATTED: date (nullable = true)
 |-- CMPLNT_FR_TM_FORMATTED: date (nullable = true)
 |-- CMPLNT_TO_TM_FORMATTED: date (nullable = true)
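One last aside before wrapping up this part: those Python UDFs mean every row takes a round trip through the Python interpreter, and I suspect Spark’s built-in date functions would keep the same work inside the JVM. A rough sketch of the idea (assuming a Spark version where to_date / to_timestamp accept a format string, i.e. 2.2+):

# Hypothetical sketch: the same conversions using built-in functions instead of
# Python UDFs (assumes Spark 2.2+, where to_date / to_timestamp take a format)
from pyspark.sql.functions import to_date, to_timestamp

df_builtin = df.withColumn('CMPLNT_FR_DT_FORMATTED', to_date(col('CMPLNT_FR_DT'), 'MM/dd/yyyy'))
df_builtin = df_builtin.withColumn('CMPLNT_FR_TM_FORMATTED', to_timestamp(col('CMPLNT_FR_TM'), 'HH:mm:ss'))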

 
