NYPD Crime #17 – Clustering To Explore Neighbourhoods (Part II – Continued)

Review

I’m so confused I don’t even know what to review. In the last post, I used Spark to load a file, tried to cluster, and ran into a memory issue. Made some changes, clustered, did some SQL, tried to transform the data with a one-hot encoder, and ran into more memory issues… Again, we’re talking about 16GB OF WORKING MEMORY FOR A 1.3GB RAW CSV. I’m so confused, but clearly I took a larger step than I was ready for. Let’s get back to basics here and pay closer attention to memory usage within the cluster.

Ganglia

Let’s turn to Ganglia for some deeper cluster memory monitoring… Ganglia… what a fun name to say haha.

After I write this paragraph, I’m going to tear down my cluster, start a new one, and have nothing running… I’d expect close to 24GB of free memory right now, maybe minus a bit of overhead for the OS and the Spark daemons, but I would assume no more than 1GB or so across the whole cluster, maybe?

Well, I’m glad I didn’t make any bets on that, because that could not be further from the truth. Sitting idle, HALF OF MY CLUSTER’S MEMORY IS ALREADY USED UP? Let’s take a deeper look at the nodes. Note that the node with LAN IP 10.0.0.160 is my master node.

Master Node

Right off the bat, this already looks whatever the opposite of “promising” is. Good ol’ thesaurus.com suggests:

unpromising, dull, hopeless, unhappy

I would agree that I feel like a mix of pretty much all of those right now. What makes it even worse is that the Spark JVM on my master node doesn’t even have access to the full 8GB of RAM! Let’s see how much it actually gets:

In [1]:
import os
os.system("sudo pip install findspark sql_magic pyspark_dist_explore seaborn")
Out[1]:
0
In [3]:
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')

# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession

# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()

I realize that I’m loading even more into RAM on my master node right now, but I just want to look at the config via code so that I can run it again seamlessly if I ever use this on another EMR cluster (I’ll eventually have to shut this one down when I’m done with it).

In [5]:
# See spark config parameters
spark.sparkContext.getConf().getAll()
Out[5]:
[('spark.eventLog.enabled', 'true'),
 ('spark.executor.memory', '4771M'),
 ('spark.driver.host', '10.0.0.47'),
 ('spark.driver.extraLibraryPath',
  '/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'),
 ('spark.yarn.appMasterEnv.SPARK_HOME', '/usr/lib/spark'),
 ('spark.executor.cores', '4'),
 ('spark.executor.extraJavaOptions',
  "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"),
 ('spark.eventLog.dir', 'hdfs:///var/log/spark/apps'),
 ('spark.executor.instances', '2'),
 ('spark.sql.hive.metastore.sharedPrefixes',
  'com.amazonaws.services.dynamodbv2'),
 ('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'application_1507603908635_0001'),
 ('spark.driver.memory', '5585M'),
 ('spark.submit.deployMode', 'client'),
 ('spark.executorEnv.PYTHONPATH',
  '/home/ec2-user/src/cntk/bindings/python<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.4-src.zip'),
 ('spark.history.fs.logDirectory', 'hdfs:///var/log/spark/apps'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.driver.port', '44515'),
 ('spark.driver.extraClassPath',
  '/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar'),
 ('spark.executor.extraClassPath',
  '/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://ip-10-0-0-47.ec2.internal:20888/proxy/application_1507603908635_0001'),
 ('spark.history.ui.port', '18080'),
 ('spark.shuffle.service.enabled', 'true'),
 ('spark.hadoop.yarn.timeline-service.enabled', 'false'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.appUIAddress', 'http://ip-10-0-0-47.ec2.internal:4040'),
 ('spark.yarn.historyServer.address', 'ip-10-0-0-47.ec2.internal:18080'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.extraJavaOptions',
  "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'ip-10-0-0-47.ec2.internal'),
 ('spark.master', 'yarn'),
 ('spark.default.parallelism', '16'),
 ('spark.rdd.compress', 'True'),
 ('spark.executor.extraLibraryPath',
  '/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'),
 ('spark.yarn.isPython', 'true'),
 ('spark.dynamicAllocation.enabled', 'true'),
 ('spark.ui.proxyBase', '/proxy/application_1507603908635_0001')]

The max memory allocated to the Spark driver on my master node is indicated by the spark.driver.memory parameter, which is about 5.6G. Seeing as how my driver already seems to be sitting at 5G of RAM usage before anything is loaded up, “hopeless” seems about right, and I’m surprised I was even able to get as far as I did in the previous notebook.
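(Side note, mostly for future me: rather than scanning the whole getAll() dump by eye, it can be filtered down to just the memory-related settings. This is a small sketch of my own, it’s the same getAll() call used above, just filtered.)

# Filter the full config dump down to just the memory-related settings
memory_conf = [
    (k, v) for k, v in spark.sparkContext.getConf().getAll()
    if 'memory' in k.lower()
]
for key, value in memory_conf:
    print('{} = {}'.format(key, value))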

But wait… does this mean I only had 0.5G of working memory left? That can’t be right, as my dataset is 1.3G of raw data. Even loading it as Parquet and with all the processing I did (group-bys and such), 0.5G just doesn’t seem like enough.

This makes me question whether I’m reading the graph right, or whether I even fundamentally understand what it’s showing. This feels a bit Players’ Tribune-ish (if you’re not a sports fan, please forgive this waste of your time), but I was never one of those kids who built computers for fun, you know? I begrudgingly learned about transistors and FPGAs in school, but nothing at the OS level that would help me do what I do today. Ok. I’m done… Sorry.

Even with that unnecessary ramble, though, I think it’s important to understand the kind of background I come from, because this is simply my path to data science. There are probably Comp Sci / Comp Eng folks out there laughing at my inability to read a Ganglia chart (and, to be honest, I didn’t even know what Ganglia was before I got into EMR). But who got put on this earth, went through a public school system and a decent university, and understood math, stats, software development, scripting, computing, networking, compression, visualization… I could go on forever… right off the bat? I sure hope the answer is NOBODY, because otherwise this blog is obsolete.

BACK TO GANGLIA… What I’m starting to think now is that the cached memory in Ganglia actually refers to memory the OS is using for caching, not memory dedicated to Spark. This would make more sense because, again, 0.5G for the rest of my Spark program just doesn’t add up… After reading up a bit on cached memory on Red Hat, it seems that that may in fact be what it means. I know Stack Overflow isn’t the most official source, but this answer explains what I’m wondering pretty concisely (citing Stack Overflow is pretty much the equivalent of citing Wikipedia in a research paper these days haha). Anyways, what Ganglia is showing us is the system memory. What our Spark config shows us is how much of that system memory is being given to Spark specifically.

7.8G Total RAM - 2.8G Cache & Buffer Memory = 5G Free Memory

With 5.6G being allocated to Spark, we can perhaps start to see where that number is coming from. We could probably test this theory by loading up some data and watching how the used and cached memory react to the activity, but let’s take a look at the worker nodes first.
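(Quick aside before moving on: I can also ask the OS for these numbers directly from the notebook instead of squinting at Ganglia. This is just a sanity check, and it assumes the notebook kernel is running on the master node, which it is here, so it only reports on that one machine.)

import subprocess

# 'free -m' reports total, used, free, buffers, and cached memory in MB,
# which is the same breakdown Ganglia is charting for this node
print(subprocess.check_output(['free', '-m']).decode())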

Worker Nodes

We see a similar story in both cases here. Something like:

7.8G Total RAM - 1.9G Cache & Buffer Memory = 5.9G Free Memory

I’m not quite sure why only 4.8G is being allocated to the executors, given our logic for the driver memory.
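My best guess, and it is only a guess, is YARN container overhead: on YARN, each executor container is sized as spark.executor.memory plus spark.yarn.executor.memoryOverhead, which in Spark 2.x defaults to the larger of 384MB or 10% of the executor memory. I haven’t verified EMR’s exact sizing rules, but a rough back-of-the-envelope with the numbers from the config above looks like this:

# Rough back-of-the-envelope, assuming the default Spark-on-YARN overhead
# formula (the larger of 384MB or 10% of executor memory); EMR's exact
# sizing rules may differ
executor_memory_mb = 4771
overhead_mb = max(384, int(0.10 * executor_memory_mb))  # ~477MB
container_mb = executor_memory_mb + overhead_mb
print(container_mb)  # ~5248MB requested from YARN per executor container

If YARN itself only gets a slice of the node’s 8GB (it has to leave room for the OS and the Hadoop daemons), a ~5.2G container already accounts for most of what’s available, which would explain why the executor heap can’t be any bigger.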

Loading Data

Let’s load up some data and see what happens to the cluster. I don’t anticipate too many changes, but I’ve already proven how little I know, so let’s take it step by step.

In [6]:
%%time
# Read NYPD Complaint Data
df_filtered = spark.read.parquet("s3n://2017edmfasatb/nypd_complaints/data/df_filtered.parquet")
df_filtered.cache()
CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 14.7 s

Let’s see what our memory looks like now:

Master

Workers

Easily the first thing I’m noticing: My driver is pretty much out of RAM already… I have like 1G left.

Second thing I’m noticing: My driver has been like that for a while now. I guess installing and loading packages took up more memory than I thought it would… goddamn. That alone added about 1G to the system’s RAM usage.

Third thing I’m noticing: My driver memory is slowly increasing as time goes on. I’m wondering if this is because of the notebook I’m writing in, but maybe I’ll put this to the side for now and figure out the large spikes first.

Fourth (and last) thing: The executors seem to be doing okay. They experience a small spike, but not as much as the driver. This makes sense to me because I’m installing regular Python (not PySpark) packages here, which only affect the driver anyway. Creating something like a SparkSession would probably strain the driver more than the executors as well.

Fifth (ok, I lied, THIS is the last) thing: The loading and caching of the data set to memory really didn’t take much of a toll. Perhaps it’s the parquet storage.
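One more thing worth flagging on that last point: cache() is lazy, so nothing actually lands in executor storage memory until an action runs against the DataFrame, which could also explain the lack of a spike. If I wanted to confirm what has really been materialized, a quick check with standard PySpark calls (assuming Spark 2.1+ for the storageLevel property) would look something like this:

# cache() only marks the DataFrame for caching; an action is what actually
# materializes it in executor storage memory
print(df_filtered.storageLevel)  # the storage level the DataFrame is marked with
df_filtered.count()              # full pass over the data, which populates the cache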

At this point, I can already see how my Spark application ran out of memory… Let’s continue to see what happens. The next thing we did in the last notebook was kick off K-Means.

In [8]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Set seed for ability to reproduce results, 20 clusters
kmeans = KMeans(k = 20, seed = 1)

# Initiate and transform columns into vector
vecAssembler = VectorAssembler(inputCols = ['LAT', 'LON'], outputCol = "features")
k_means_input = vecAssembler.transform(df_filtered)
In [9]:
%%time
# Refit model
model = kmeans.fit(k_means_input[['features']])
CPU times: user 32 ms, sys: 4 ms, total: 36 ms
Wall time: 2min 26s
In [10]:
%%time
# Use model to assign the samples a cluster to belong to
prediction = model.transform(k_means_input[['features']])
print(prediction.head(5))
[Row(features=DenseVector([40.8288, -73.9167]), prediction=0), Row(features=DenseVector([40.6973, -73.7846]), prediction=10), Row(features=DenseVector([40.8026, -73.9451]), prediction=19), Row(features=DenseVector([40.6545, -73.7263]), prediction=10), Row(features=DenseVector([40.738, -73.9879]), prediction=6)]
CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 548 ms

Let’s pause here and take another look.

Master

Workers

Observation 1: Master is still doing ok.

Observation 2: Executors are being hit here by these commands.

Observation 3: One executor is being hit more than the other.

Summary: I’m now at <800M free for my driver, and ~2-2.5G free for my executors.
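Ganglia only shows me node-level memory, but Spark itself exposes per-executor storage memory through the application UI’s REST API. A sketch of what that would look like from the notebook, with the host and port taken from spark.driver.appUIAddress in the config dump earlier (adjust for your own cluster, and this assumes the requests package is available):

import requests

# Query the running application's REST API for per-executor storage memory;
# 'memoryUsed' is storage memory in use, 'maxMemory' is storage memory available
base_url = 'http://ip-10-0-0-47.ec2.internal:4040/api/v1'
app_id = spark.sparkContext.applicationId
executors = requests.get('{}/applications/{}/executors'.format(base_url, app_id)).json()

for e in executors:
    print('{}: {} used / {} max'.format(e['id'], e['memoryUsed'], e['maxMemory']))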

Let’s run the next command… We end up performing a join, and that is where my application crashed the first time.

In [11]:
from pyspark.sql import functions as F

# Since there is no common column between these two dataframes, add a row_index so they can be joined
df_filtered_indexed = df_filtered.withColumn('row_index', F.monotonically_increasing_id())
df_filtered.unpersist()

prediction_indexed = prediction.withColumn('row_index', F.monotonically_increasing_id())
prediction.unpersist()
Out[11]:
DataFrame[features: vector, prediction: int]
In [12]:
# Perform join on our generated ID row_index
df_predicted = df_filtered_indexed.join(prediction_indexed, on = ['row_index'], how = 'left').drop('row_index')
df_filtered_indexed.unpersist()
prediction_indexed.unpersist()
Out[12]:
DataFrame[features: vector, prediction: int, row_index: bigint]
In [13]:
# Preview results
df_predicted.head(2)
Out[13]:
[Row(COMPLAINT_NUMBER=101109527, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 45), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='BRONX', PRECINCT=44, SPECIFIC_LOCATION='INSIDE', PREMISE_DESCRIPTION='BAR/NIGHT CLUB', LAT=40.828848333, LON=-73.916661142, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.8288, -73.9167]), prediction=0),
 Row(COMPLAINT_NUMBER=153401121, COMPLAINT_START_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), COMPLAINT_END_TIMESTAMP=datetime.datetime(2015, 12, 31, 23, 36), REPORTED_DATE_TIMESTAMP=datetime.datetime(2015, 12, 31, 0, 0), COMPLAINT_START_TIMESTAMP_YEAR='2015', COMPLAINT_START_TIMESTAMP_MONTH='12', COMPLAINT_START_TIMESTAMP_DAY='31', COMPLAINT_START_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_START_TIMESTAMP_HOUR='23', COMPLAINT_END_TIMESTAMP_YEAR='2015', COMPLAINT_END_TIMESTAMP_MONTH='12', COMPLAINT_END_TIMESTAMP_DAY='31', COMPLAINT_END_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_END_TIMESTAMP_HOUR='23', REPORTED_DATE_TIMESTAMP_YEAR='2015', REPORTED_DATE_TIMESTAMP_MONTH='12', REPORTED_DATE_TIMESTAMP_DAY='31', REPORTED_DATE_TIMESTAMP_WEEKDAY='Thu', COMPLAINT_LENGTH=0.0, COMPLAINT_LENGTH_ZERO_TIME=True, COMPLAINT_START_REPORTED_LAG=0.0, COMPLAINT_END_REPORTED_LAG=0.0, OFFENSE_DESCRIPTION='OTHER', OFFENSE_RESULT='COMPLETED', OFFENSE_LEVEL='FELONY', JURISDICTION='N.Y. POLICE DEPT', BOROUGH='QUEENS', PRECINCT=103, SPECIFIC_LOCATION='OUTSIDE', PREMISE_DESCRIPTION='OTHER', LAT=40.697338138, LON=-73.784556739, COMPLAINT_LENGTH_DAYS=0.0, COMPLAINT_LENGTH_UNDER_ONE_YEAR=True, features=DenseVector([40.6973, -73.7846]), prediction=10)]

Let’s check again… shall we?

Master

Workers

It just gets more interesting with every new series of commands eh… oh man.

Observation 1: My master node now has less than 300M of free memory, and my Spark memory is actually eating into my cached memory. I’m actually over the 5.6G memory limit allocated to my driver. The join we did caused almost a 3G jump in driver memory usage.

Observation 2: My executors’ memory usage has actually dropped back to roughly where it was before I even ran the K-Means…

Why is all this happening???? WHY DO YOU HAVE TO BE SO CONFUSING, SPARK?!?!?!?

Okay, calm down and think. It actually does make sense to some extent. In the last series of commands, I performed a join. When performing a join, the executors each take a chunk of the data and perform the join on their own subsets. The results are passed back to the master to be “reduced” back together. Perhaps that’s the spike of memory added to the driver there. This assumes that our driver is actually holding our entire joined dataframe in memory right now, which I can’t confirm is true here… The second thing that comes to mind is that I unpersisted my older dataframes, which clears them from memory, in this case the workers’ memory.
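If I wanted to see how Spark actually plans to run that join (and whether it shuffles both sides or broadcasts one of them), explain() prints the physical plan without touching the data. Just a diagnostic sketch, not something I ran here:

# Print the physical plan for the join: a SortMergeJoin means both sides get
# shuffled across the executors, while a BroadcastHashJoin would ship one
# side to every executor
df_predicted.explain()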

This leads me to believe… if I cache my joined dataframe back into memory, would that clear the driver memory and push the dataframe back out to the executors?

In [ ]:
df_predicted.cache()

I’ll spare you the 200 lines of error message: it crashed. Maybe the first thing I need to do is up my driver memory here. Again, it’s a little mind-blowing that I’m needing a 16GB RAM driver node alongside 8GB + 8GB worker nodes to analyze, again, a 1.3GB RAW CSV FILE… but let’s try it in the next post.
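For what it’s worth, spark.driver.memory isn’t something I can just change on the running session; in YARN client mode the driver JVM is already up by the time I could set it, so the reliable route is spark-defaults.conf, an EMR configuration classification, or the --driver-memory flag at launch. A purely illustrative sketch of the session-building side of it, assuming a fresh Python process where the driver hasn’t started yet (the 10g value is just a placeholder):

from pyspark.sql import SparkSession

# Ask for a bigger driver heap when (re)building the session; whether this
# actually takes effect depends on the driver JVM not having launched yet,
# so setting it in spark-defaults.conf or via --driver-memory is safer
spark = SparkSession.builder \
    .config('spark.driver.memory', '10g') \
    .getOrCreate()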

 
