NYPD Crime #18 – Clustering To Explore Neighbourhoods (Part III – Continued Because Spark Hates Me)

Review

To sum up the last post: our driver’s RAM was essentially the bottleneck, and it was what caused our Spark application and the underlying JVM to crash. Before, we were using 3 AWS m4.large (8GB RAM) boxes for our master + 2 workers. In this notebook, I’ve spawned a new cluster, keeping my workers the same but using an m4.xlarge (16GB RAM) for my master:

I have my doubts as to whether even this will be enough, but let’s try it out and see. I’ll quickly run all the code that gets me to the point where my Spark application crashed last time.

In [30]:
import os
os.system("sudo pip install findspark sql_magic pyspark_dist_explore seaborn")
Out[30]:
0
In [31]:
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')

# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession

# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()

# Load sql_magic and connect to Spark
%load_ext sql_magic
%config SQL.conn_name = 'spark'

# Load other libraries
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from functools import reduce
import pandas as pd
import numpy as np
import seaborn as sns

# Graphing with matplotlib
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
The sql_magic extension is already loaded. To reload it, use:
  %reload_ext sql_magic
In [32]:
# See spark config parameters
spark.sparkContext.getConf().getAll()
Out[32]:
[('spark.eventLog.enabled', 'true'),
 ('spark.executor.memory', '4771M'),
 ('spark.ui.proxyBase', '/proxy/application_1508722456237_0001'),
 ('spark.driver.extraLibraryPath',
  '/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'),
 ('spark.yarn.appMasterEnv.SPARK_HOME', '/usr/lib/spark'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'ip-10-0-0-188.ec2.internal'),
 ('spark.driver.port', '41298'),
 ('spark.executor.cores', '4'),
 ('spark.executor.extraJavaOptions',
  "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"),
 ('spark.eventLog.dir', 'hdfs:///var/log/spark/apps'),
 ('spark.executor.instances', '2'),
 ('spark.sql.hive.metastore.sharedPrefixes',
  'com.amazonaws.services.dynamodbv2'),
 ('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.appUIAddress', 'http://ip-10-0-0-188.ec2.internal:4040'),
 ('spark.driver.memory', '5585M'),
 ('spark.submit.deployMode', 'client'),
 ('spark.executorEnv.PYTHONPATH',
  '/home/ec2-user/src/cntk/bindings/python<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.4-src.zip'),
 ('spark.history.fs.logDirectory', 'hdfs:///var/log/spark/apps'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.driver.extraClassPath',
  '/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar'),
 ('spark.yarn.historyServer.address', 'ip-10-0-0-188.ec2.internal:18080'),
 ('spark.executor.extraClassPath',
  '/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar'),
 ('spark.history.ui.port', '18080'),
 ('spark.driver.host', '10.0.0.188'),
 ('spark.shuffle.service.enabled', 'true'),
 ('spark.hadoop.yarn.timeline-service.enabled', 'false'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://ip-10-0-0-188.ec2.internal:20888/proxy/application_1508722456237_0001'),
 ('spark.executor.id', 'driver'),
 ('spark.app.id', 'application_1508722456237_0001'),
 ('spark.driver.extraJavaOptions',
  "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.master', 'yarn'),
 ('spark.default.parallelism', '16'),
 ('spark.rdd.compress', 'True'),
 ('spark.executor.extraLibraryPath',
  '/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'),
 ('spark.yarn.isPython', 'true'),
 ('spark.dynamicAllocation.enabled', 'true')]

Load Data

In [33]:
%%time
# Read NYPD Complaint Data
df_filtered = spark.read.parquet("s3n://2017edmfasatb/nypd_complaints/data/df_filtered.parquet")
df_filtered.cache()
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 854 ms

K-Means

In [34]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Set seed for reproducibility; use 20 clusters
kmeans = KMeans(k = 20, seed = 1)

# Initiate and transform columns into vector
vecAssembler = VectorAssembler(inputCols = ['LAT', 'LON'], outputCol = "features")
k_means_input = vecAssembler.transform(df_filtered)
In [35]:
%%time
# Refit model
model = kmeans.fit(k_means_input[['features']])
CPU times: user 24 ms, sys: 4 ms, total: 28 ms
Wall time: 2min 18s
In [36]:
%%time
# Use model to assign the samples a cluster to belong to
prediction = model.transform(k_means_input[['features']])
print(prediction.head(5))
[Row(features=DenseVector([40.8288, -73.9167]), prediction=0), Row(features=DenseVector([40.6973, -73.7846]), prediction=10), Row(features=DenseVector([40.8026, -73.9451]), prediction=19), Row(features=DenseVector([40.6545, -73.7263]), prediction=10), Row(features=DenseVector([40.738, -73.9879]), prediction=6)]
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 191 ms
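
As a quick aside (a sketch I didn’t actually run here), the fitted KMeansModel also exposes its centroids directly, which could double as a sanity check on the per-cluster averages we compute later via Spark SQL:

# Hedged sketch: print the learned centroids ([LAT, LON] per cluster),
# indexed in the same order as the "prediction" labels
for i, centre in enumerate(model.clusterCenters()):
    print(i, centre)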

Join K-Means Back To Dataframe

In [37]:
# Since there is no common column between these two dataframes, add a row_index so they can be joined
df_filtered_indexed = df_filtered.withColumn('row_index', F.monotonically_increasing_id())
df_filtered.unpersist()

prediction_indexed = prediction.withColumn('row_index', F.monotonically_increasing_id())
prediction.unpersist()
Out[37]:
DataFrame[features: vector, prediction: int]
In [38]:
# Perform join on our generated ID row_index
df_predicted = df_filtered_indexed.join(prediction_indexed, on = ['row_index'], how = 'left').drop('row_index')
df_filtered_indexed.unpersist()
prediction_indexed.unpersist()
Out[38]:
DataFrame[features: vector, prediction: int, row_index: bigint]
In [39]:
# Preview results
df_predicted.head(2)
Out[39]:
[Row(COMPLAINT_NUMBER=101109527, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='BRONX', PRECINCT=44, SPECIFIC_LOCATION='INSIDE', PREMISE_DESCRIPTION='BAR/NIGHT CLUB', LAT=40.828848333, LON=-73.916661142, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.8288, -73.9167]), prediction=0),
 Row(COMPLAINT_NUMBER=153401121, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='QUEENS', PRECINCT=103, SPECIFIC_LOCATION='OUTSIDE', PREMISE_DESCRIPTION='OTHER', LAT=40.697338138, LON=-73.784556739, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.6973, -73.7846]), prediction=10)]
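
In hindsight (a hedged aside, not something I ran), the row_index join might not even have been necessary: model.transform() simply appends a prediction column and keeps every input column, so transforming the full assembled DataFrame would have attached the cluster labels to the original rows directly.

# Hedged alternative to the row_index join: transform the full assembled
# DataFrame so the "prediction" column is appended alongside the original
# columns (the model only reads the "features" column it was fit on)
df_predicted_alt = model.transform(k_means_input)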

[Ganglia chart: Master]

[Ganglia chart: Workers]

Alright… let’s do it again…

Observation 1: Let’s back up to my Spark config settings. If you were watching closely, you’d see that my driver memory is STILL AT 5.5GB despite my master node having 16GB RAM now… WHY SPARK WHY?? or, rather, WHY EMR WHY?? I thought setting the maximizeResourceAllocation EMR Spark config parameter would automatically set this to something like 11GB… guess not? Grr…

Observation 2: If we skip ahead to the Ganglia charts now, we see that the used memory on the master node is already past 5.5GB anyway… shouldn’t this have crashed already?

Observation 3: Only 1 worker seems active right now, but this is consistent with the last notebook as well. Screaming was fun though, so…

That 5.5GB driver memory setting concerns me a bit, but since we’re already past it without crashing, let’s continue and see whether the settings are actually configured differently than what the config output is telling me.
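
For future reference, here’s a hedged sketch of how the driver memory could be pinned explicitly at cluster launch instead of relying on maximizeResourceAllocation (the classification names are the standard EMR ones; the 11g value is just an assumption, not something I tested):

# Hedged sketch (untested here): EMR Configurations that would set the driver
# memory explicitly. This list would be passed as the Configurations parameter
# of boto3's EMR run_job_flow call, or pasted as JSON in the EMR console.
emr_configurations = [
    {"Classification": "spark",
     "Properties": {"maximizeResourceAllocation": "true"}},
    {"Classification": "spark-defaults",
     "Properties": {"spark.driver.memory": "11g"}},  # assumed value
]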

Viewing Cluster Centers

In [40]:
# Add table to SQL Context
df_predicted.createOrReplaceTempView("df_predicted")
In [41]:
cluster_stats_result = %read_sql \
SELECT \
    prediction, \
    COUNT(*) AS NUM_SAMPLES, \
    AVG(LAT) AS LAT_CENTRE, \
    AVG(LON) AS LON_CENTRE \
FROM df_predicted \
GROUP BY \
    prediction
Query started at 03:02:23 AM UTC; Query executed in 0.35 m
In [42]:
# See cluster centres
fig, ax = plt.subplots()
sns.regplot(
    x = "LON_CENTRE", 
    y = "LAT_CENTRE", 
    data = cluster_stats_result, 
    fit_reg = False, 
    scatter_kws = {'s': cluster_stats_result['NUM_SAMPLES'] / cluster_stats_result['NUM_SAMPLES'].max() * 150},
    ax = ax
)
cluster_stats_result[['LON_CENTRE','LAT_CENTRE','prediction']].apply(lambda x: ax.text(*x), axis=1);
[Figure nypd_crime_18_1: cluster centres plotted by LON_CENTRE / LAT_CENTRE, point size scaled by NUM_SAMPLES]

[Ganglia chart: Master]

[Ganglia chart: Workers]

We can see the executors working on the Spark SQL query, with the result stored back on the driver (only a small memory bump here).

One-Hot Encoding

This is where we crashed 2 notebooks ago, so let’s cross our fingers and hope that the extra driver memory helps. Let’s start with baby steps and just try to encode OFFENSE_LEVEL, although we’ll eventually aim to encode:

  • OFFENSE_DESCRIPTION
  • OFFENSE_RESULT
  • OFFENSE_LEVEL
  • PREMISE_DESCRIPTION
In [43]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# One Hot Encode
stringIndexer = StringIndexer(inputCol = "OFFENSE_LEVEL", outputCol = "OFFENSE_LEVEL_INDEX")
model = stringIndexer.fit(df_predicted)
indexed = model.transform(df_predicted)

encoder = OneHotEncoder(inputCol = "OFFENSE_LEVEL_INDEX", outputCol = "OFFENSE_LEVEL_ENC")
encoded = encoder.transform(indexed)

Hurrah…? I think it worked…

In [44]:
# Preview results
encoded.head(2)
Out[44]:
[Row(COMPLAINT_NUMBER=101109527, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='BRONX', PRECINCT=44, SPECIFIC_LOCATION='INSIDE', PREMISE_DESCRIPTION='BAR/NIGHT CLUB', LAT=40.828848333, LON=-73.916661142, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.8288, -73.9167]), prediction=0, OFFENSE_LEVEL_INDEX=1.0, OFFENSE_LEVEL_ENC=SparseVector(2, {1: 1.0})),
 Row(COMPLAINT_NUMBER=153401121, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='QUEENS', PRECINCT=103, SPECIFIC_LOCATION='OUTSIDE', PREMISE_DESCRIPTION='OTHER', LAT=40.697338138, LON=-73.784556739, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.6973, -73.7846]), prediction=10, OFFENSE_LEVEL_INDEX=1.0, OFFENSE_LEVEL_ENC=SparseVector(2, {1: 1.0}))]

There we go! It’s not the traditional python one-hot encoder that I’d expect from sklearn, but it is returning some result!

Let’s check out the memory usage real quick as well.

[Ganglia chart: Master]

[Ganglia chart: Workers]

More memory used on the master. It looks like that 5.5GB driver memory limit was a bit misleading… let’s circle back to this if we run into any more issues going forward.

Sparse Vectors

More importantly, at this point, what the heck is a Sparse Vector exactly? Databricks provides this useful image:

The image above doesn’t show one-hot encoding, but it does illustrate a sparse vector very well. A sparse vector essentially doesn’t store any zeroes, saving space, and can be consumed directly by Spark ML models. However, to achieve what we want (averages / totals of the various OFFENSE_LEVELs for each cluster), I think we’d rather have the data laid out as a dense vector, or better yet, as actual columns… While this is not the most memory-efficient option (it defeats the very purpose of a sparse vector), it’s the only way we’ll actually be able to make our calculations.

Let’s one-hot encode the other columns we’re interested in, and then figure out the sparse-vector-to-columns transformation afterwards.
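
To make the sparse-versus-dense distinction concrete, here’s a tiny standalone sketch (illustration only, not part of the pipeline) using Spark’s vector types:

# Minimal illustration: the same 4-slot vector stored sparsely vs densely
from pyspark.ml.linalg import Vectors

sparse_vec = Vectors.sparse(4, {1: 1.0})          # only the non-zero entry is stored
dense_vec = Vectors.dense([0.0, 1.0, 0.0, 0.0])   # every slot stored explicitly

print(sparse_vec)            # (4,[1],[1.0])
print(sparse_vec.toArray())  # expands to the dense layout [0, 1, 0, 0]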

In [45]:
# One Hot Encode
for column in ['OFFENSE_DESCRIPTION', 'OFFENSE_RESULT', 'PREMISE_DESCRIPTION']:
    indexed.unpersist()    
    stringIndexer = StringIndexer(inputCol = column, outputCol = column + "_INDEX")
    model = stringIndexer.fit(encoded)
    indexed = model.transform(encoded)

    encoded.unpersist()
    encoder = OneHotEncoder(inputCol = column + "_INDEX", outputCol = column + "_ENC")
    encoded = encoder.transform(indexed)
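
As an aside (a hedged sketch, not what I actually ran), the same chain of StringIndexer + OneHotEncoder stages could also be bundled into a single Pipeline, which avoids juggling the intermediate indexed/encoded variables by hand:

# Hedged Pipeline equivalent of the manual loop above (untested here)
from pyspark.ml import Pipeline

stages = []
for column in ['OFFENSE_LEVEL', 'OFFENSE_DESCRIPTION', 'OFFENSE_RESULT', 'PREMISE_DESCRIPTION']:
    stages.append(StringIndexer(inputCol=column, outputCol=column + "_INDEX"))
    stages.append(OneHotEncoder(inputCol=column + "_INDEX", outputCol=column + "_ENC"))

encoded_alt = Pipeline(stages=stages).fit(df_predicted).transform(df_predicted)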
In [46]:
# Preview results
encoded.head(2)
Out[46]:
[Row(COMPLAINT_NUMBER=101109527, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='BRONX', PRECINCT=44, SPECIFIC_LOCATION='INSIDE', PREMISE_DESCRIPTION='BAR/NIGHT CLUB', LAT=40.828848333, LON=-73.916661142, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.8288, -73.9167]), prediction=0, OFFENSE_LEVEL_INDEX=1.0, OFFENSE_LEVEL_ENC=SparseVector(2, {1: 1.0}), OFFENSE_DESCRIPTION_INDEX=0.0, OFFENSE_DESCRIPTION_ENC=SparseVector(9, {0: 1.0}), OFFENSE_RESULT_INDEX=0.0, OFFENSE_RESULT_ENC=SparseVector(1, {0: 1.0}), PREMISE_DESCRIPTION_INDEX=12.0, PREMISE_DESCRIPTION_ENC=SparseVector(13, {12: 1.0})),
 Row(COMPLAINT_NUMBER=153401121, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='QUEENS', PRECINCT=103, SPECIFIC_LOCATION='OUTSIDE', PREMISE_DESCRIPTION='OTHER', LAT=40.697338138, LON=-73.784556739, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.6973, -73.7846]), prediction=10, OFFENSE_LEVEL_INDEX=1.0, OFFENSE_LEVEL_ENC=SparseVector(2, {1: 1.0}), OFFENSE_DESCRIPTION_INDEX=0.0, OFFENSE_DESCRIPTION_ENC=SparseVector(9, {0: 1.0}), OFFENSE_RESULT_INDEX=0.0, OFFENSE_RESULT_ENC=SparseVector(1, {0: 1.0}), PREMISE_DESCRIPTION_INDEX=2.0, PREMISE_DESCRIPTION_ENC=SparseVector(13, {2: 1.0}))]

Perfect. It looks like our driver memory was absolutely the issue here and that 5.5GB driver memory limit means nothing. Well, not nothing, but either not what I think it means or it’s just straight up wrong. My trust is a little shaken, to be honest.

Anywho, in the spirit of the path of least resistance, let’s continue and take a closer look at how Spark’s one-hot encoding works. I’ll deal with memory issues when we run into them again.

In [47]:
# Add table to SQL Context
encoded.createOrReplaceTempView("encoded")
In [48]:
one_hot_encoding_preview = %read_sql \
SELECT \
    OFFENSE_LEVEL, \
    OFFENSE_LEVEL_INDEX, \
    OFFENSE_LEVEL_ENC \
FROM encoded \
LIMIT 100
Query started at 03:04:38 AM UTC; Query executed in 0.17 m
In [49]:
# View results
one_hot_encoding_preview.tail(5)
Out[49]:
OFFENSE_LEVEL OFFENSE_LEVEL_INDEX OFFENSE_LEVEL_ENC
95 VIOLATION 2.0 (0.0, 0.0)
96 FELONY 1.0 (0.0, 1.0)
97 VIOLATION 2.0 (0.0, 0.0)
98 MISDEMEANOR 0.0 (1.0, 0.0)
99 VIOLATION 2.0 (0.0, 0.0)

Here, we get a cleaner look at what Spark’s one-hot encoder is doing. Interestingly enough, the sparse vector in the OFFENSE_LEVEL_ENC column shows up as a dense vector array here. Fair enough, I’ll take what Spark gives me.

More importantly, we see the one-hot encoding in action, with

  • MISDEMEANOR being encoded to an index value of 0, represented by the vector (1, 0)
  • FELONY being encoded to an index value of 1, represented by the vector (0, 1)
  • VIOLATION being encoded to an index value of 2, represented by the vector (0, 0) (see the note below on why this one is all zeros)
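
A quick note on that all-zeros vector: Spark’s OneHotEncoder drops the last category by default via its dropLast parameter (to avoid collinearity), which is why VIOLATION ends up with no non-zero entry. A hedged sketch of keeping the full-length indicator vector instead (the output column name is just illustrative):

# Hedged sketch: disable dropLast to get a full indicator vector per category
full_encoder = OneHotEncoder(
    inputCol="OFFENSE_LEVEL_INDEX",
    outputCol="OFFENSE_LEVEL_ENC_FULL",  # hypothetical column name
    dropLast=False
)
encoded_full = full_encoder.transform(encoded)
# MISDEMEANOR -> (1.0, 0.0, 0.0), FELONY -> (0.0, 1.0, 0.0), VIOLATION -> (0.0, 0.0, 1.0)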

The bad news? I can’t really sum up metrics by cluster with this sparse vector representation. In the last post, I described the format I really want the data in: each category of each categorical variable as its own column, i.e. “OFFENSE_LEVEL_MISDEMEANOR”, “OFFENSE_LEVEL_FELONY”, and “OFFENSE_LEVEL_VIOLATION” should all be their own columns, with a flag of 1 if that specific crime falls into that category.
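
Just to pin down that target layout, here’s a hedged sketch (illustrative column names only; note it sidesteps the encoded vectors entirely by building flag columns straight from the original string column, which is not the conversion I’m actually after):

# Hedged sketch of the target layout, built from the raw string column
# rather than from the sparse vectors
df_flags = df_predicted
for level in ['MISDEMEANOR', 'FELONY', 'VIOLATION']:
    df_flags = df_flags.withColumn(
        'OFFENSE_LEVEL_' + level,
        F.when(F.col('OFFENSE_LEVEL') == level, 1).otherwise(0)
    )
# Per-cluster totals would then just be a GROUP BY prediction with SUMs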

— 2 days later —

Yes, literally 2 days of googling (in between work, cooking, etc., just to make myself sound a little less sad). It really is amazing how 2 days can pass and I have literally zero lines of code or markdown to show for it…

I simply cannot find a solution to perform this conversion. It doesn’t seem like something that should be so difficult to do, but I honestly can’t find one. I understand that Spark is innately designed to go easy on memory (again, the whole point of a sparse vector in the first place), but you’d think someone else would have been in my situation and needed to break the data out this way. It could also be a gap in my own fundamental understanding of data preparation and machine learning; there may be a method out there that does exactly what I want, and I just don’t know about it yet. Either way, I’m at a loss and think I might have to revert to another method…

 
