Intro
Alright, now we know the ins and outs of Spark!
Spark be like:
And, yes Spark, you would be absolutely correct. I won’t pretend to be a Spark expert after scouring the internet for random tutorial videos, but you know what? I feel like I have enough knowledge to get started. So, let’s do just that.
Now, having the data on S3 really helps us here, and I'm quickly starting to appreciate the integrated-ness (definitely not a word) of AWS' suite of services. In the case of EMR, S3 integrates directly as a distributed storage source (in place of HDFS). All of this is abstracted away from us, and I'll keep my mind naive for now until I actually need to deal with it.
First, I have to set up a few packages that will help me integrate Spark with Jupyter. By default, EMR comes with the Zeppelin notebook, which integrates with Spark out of the box, but I'm going with Jupyter because it's what I'm more familiar with (Zeppelin actually looks pretty cool though, with built-in interactive visualizations from SQL results!) and it translates better to a blog.
The findspark package will let us hook our Jupyter notebook up to Spark's SparkSession, and the sql_magic package will add SQL "magic" command capabilities, that is, the ability to write inline SQL statements that query our Spark dataframes through Spark SQL (back in the all-nba predict days, we used the R magic commands to pass dataframes back and forth between R and Python).
import os
# Install the helper packages on the master node, right from the notebook
os.system("sudo pip install findspark sql_magic")
Initiate Spark
Let's use the findspark package to connect Jupyter up to Spark. The first thing we do in any new Spark application, apparently, is initiate a SparkSession. When you start a Spark shell from the command line, or open up Zeppelin, a SparkSession is already instantiated for you as the spark object. With Jupyter, we could either configure the server side to recognize the Spark installation, or use findspark to make Spark available inside the plain Python shell that Jupyter runs. Since we're going the findspark route, we have to initiate this SparkSession ourselves. The SparkSession is an object that provides an entry point into all the different APIs (e.g. Spark SQL, which we'll be using later) and also tracks the application being spun up, so we can view it and tag it in the Spark History Server console.
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')
# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession
# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
import pandas as pd
# Initiate SparkSession as "spark"
spark = SparkSession \
    .builder \
    .getOrCreate()
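One small extra: the builder also lets you name the application, which should make it easier to pick out in the Spark History Server later. Something like this ought to work ("nypd_complaints" is just a name I made up for illustration):

# Optional: tag the application with a name so it's easy to spot in the
# Spark History Server ("nypd_complaints" is a made-up example name)
spark = SparkSession \
    .builder \
    .appName("nypd_complaints") \
    .getOrCreate()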
SQL Magic
Okay, now we have a SparkSession initiated as spark like we would have if we just opened up a Spark shell. Next up, we have to load the sql_magic package so we have Spark SQL capabilities later on when we play around with the data.
We load the "magic" package and configure it to hook up to Spark. Reading up on sql_magic a little bit, it's actually pretty cool because it provides an entry point into multiple technologies: Spark is one, but it also supports SQLAlchemy connections… We can pull from a database inline with SQL! Amazing!
I immediately thought of that, but NO, I WILL STICK TO MY WORD. IT’S GODDAMN AMAZING.
%load_ext sql_magic
%config SQL.conn_name = 'spark'
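As an aside (and something I haven't actually tried here), since sql_magic just points at whatever connection object you tell it about, hooking it up to a regular database via SQLAlchemy should look something like the sketch below. The connection string is completely made up:

# Hedged sketch: pointing sql_magic at a SQLAlchemy engine instead of Spark
# (the connection string below is a made-up example)
from sqlalchemy import create_engine
pg_engine = create_engine('postgresql://some_user:some_password@some-host:5432/some_db')
%config SQL.conn_name = 'pg_engine'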
Read CSV
Spark 2.0 and onward makes it relatively easy to read a CSV. Let's try it:
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
"s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv",
header = True,
inferSchema = True
)
45 seconds to load a ~1.5 GB dataset… a bit on the longer side, I would imagine? The dataset is not that big, but remember, I'm coming into this knowing that the data set is on the smaller side for working with Spark or EMR.
# Describe df
df.printSchema()
We see here that the dataframe has loaded successfully, and that the headers and column types have been inferred somewhat. Let’s try to do a head() on the dataframe.
%%time
# See df
df.head(10)
Alright, we’ve only issued like 2 commands, but a whirlwind just happened in the back end… Let’s try to break it down.
Initial state
First of all, when we spun up this EMR cluster, we got an architecture like this:
It's not the greatest diagram, but simple is better, right? As painful to the eyes as this diagram is, please bear with me haha. The master and 2 workers are the EC2 nodes that our EMR cluster spun up. The S3 stack is our storage, which acts as the distributed file system for our EMR cluster.
The master is where the EMR applications are installed, and whether you’re reaching Zeppelin, Spark History Server, or Hue, you would be connecting to the master node. Before I opened this Jupyter notebook, I SSH’ed into the master node and did a simple
sudo pip install jupyter
and Jupyter was available on the master node at port 8889 (8888 was used for Zeppelin by default on EMR). Hopefully everything makes sense so far! As I'm working in this notebook right now, I'm working on the master node. The master node is not only providing an interface for me to interact with Spark via Jupyter, but it's also acting as my Spark application's master, which generates the DAG, pushes individual tasks as defined by the DAG down to the appropriate workers, and collects the results back from the workers to present to me when I run an action.
spark.read.csv
When I read a CSV into a dataframe within Spark, this is the first time I'm interacting with any data on S3 (remember, this is where my raw CSV file is hosted). I'm not 100% sure about this, but I don't actually think any data is loaded into memory when you perform spark.read (Spark's dataframe reader). Because I've specified for spark.read to
- Look for headers
- Infer a schema (try to guess which fields are text or numbers etc)
It actually has to scan through the entire data set (or at least sample a sizeable portion of it) to confidently say that this column is a string and that column is an integer. I think that's why the spark.read function took 35 seconds. At this point, however, no data has actually been read into the memory of our workers at all; Spark has simply mapped out our data's structure, just enough that we can run a
df.printSchema()
command! Note that Spark doesn't actually have to show any data on this command, just the skeleton of the dataframe! This actually makes me wonder, because I'm pretty sure that if I loaded a Pandas dataframe of this 1.5 GB data set, it wouldn't take more than 35 seconds. I could just be talking out of my arse though; maybe that's a comparison to try another day! At this point, we'll say that 35 seconds is fair, as Spark does have to scan the data to infer the schema (and it probably does this in a distributed way).
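One thing I might try later: if we already knew the schema, we could pass it to the reader explicitly and skip the inference scan entirely. A rough sketch (only a couple of columns shown here; the real schema would have to list every column in the CSV):

from pyspark.sql.types import StructType, StructField, StringType

# Sketch of an explicit schema; the real one would list every column
schema = StructType([
    StructField('CMPLNT_FR_DT', StringType(), True),
    StructField('CMPLNT_FR_TM', StringType(), True)
    # ...remaining columns...
])

# With an explicit schema, spark.read shouldn't need to scan the file up front
df = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv",
    header = True,
    schema = schema
)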
head()
When we try a head() command, we actually start loading some data into memory. I've specified head(10), so we're only looking at the first 10 rows of the dataframe, and Spark only has to load a subset of the data into memory. We see that it took around 10 seconds to load the first 10 rows of the dataframe. Is this efficient? With the size of data we pulled? Probably not. I'd imagine a head(10) on a normal Pandas dataframe doesn't take more than a few milliseconds! However, we're seeing some of the distributed overhead at work here. It took 10 seconds to do, roughly, something like this:
I want to make it absolutely clear that I'm not saying these specific workers went and got these specific rows. I'm not quite sure which workers got which rows, but I wanted to illustrate one possibility of how the workers could have split up the task. The Master ultimately decides how the tasks are distributed among the workers, because the Master has access to a metadata store that quickly tells it which storage nodes / partitions each piece of data lives in.
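If I wanted to peek at how Spark actually chopped the file up, the dataframe's underlying RDD can at least tell us how many partitions it ended up with:

# Number of partitions Spark split the CSV into; each partition is a
# chunk of rows that can be handed to a worker as a task
df.rdd.getNumPartitions()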
Onwards
Okay! Our head() function provided a not-so-friendly output, but we actually see some data. Usually, with vanilla Python and Pandas, Jupyter will output your dataframe nicely:
pd.DataFrame({
'a': [1, 2],
'b': [3, 4]
})
One thing we can do here is actually introduce Spark SQL. Within the SQL Magic library, Spark SQL by default executes the distributed Spark DAG and collects the result into a Pandas dataframe, which we can then view.
We first have to register the Spark dataframe as a "temporary table" (a temporary view, in Spark 2.x terms), which exposes the dataframe to the Spark SQL API so we can run SQL queries against it.
Spark SQL
# Register temporary table
df.createOrReplaceTempView("df")
# Perform the SQL equivalent of "head"
result = %read_sql SELECT * FROM df LIMIT 10;
result
Another 2 commands… another time to stop and review haha. One of the most important things to realize here is that, underneath it all, Spark is a single core processing engine. It's got many APIs built on top, two of which I just used: PySpark (Python) and Spark SQL, but these are just that: APIs. Spark takes on the overhead of translating these user-friendly commands into a core Spark DAG. In essence, this is what Spark is! It's simply a translation service for my dumb brain to work in languages I'm familiar with, so I don't have to worry about how it all maps to a distributed world. The two commands I just used to pull the first 10 rows of the dataframe looked very different from each other (df.head() vs SELECT * … LIMIT 10), but in fact they are executed no differently in Spark! There are absolutely differences in overhead while translating these APIs to a DAG, but once the DAG is created, it should theoretically be the same every time.
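If you don't quite believe me (or, more accurately, if I don't quite believe myself), Spark will print out the plan it builds for a command via explain(). Since head(10) is essentially a limit(10) plus a collect under the hood, I'd expect these two plans to come out looking essentially the same:

# Compare the plans Spark builds for the two "different" commands
df.limit(10).explain()
spark.sql("SELECT * FROM df LIMIT 10").explain()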
In the case of Spark SQL, this is essentially what happened:
We see the major differences here all lie in the Spark Master node. We can start to appreciate the role that the Master plays in all of this.
In the first image, the Master:
- Hosts the Jupyter notebook
- Provides the contact point for the client to interact with
- Translates the input Spark SQL command to a DAG
- Decides how to distribute DAG components amongst the workers to get the job done
In the second image, the Master:
- Receives distributed results from workers
- Aggregates the results
- Converts the Spark results to a Pandas dataframe, which now lives in the context of the Python interpreter on the Master (Spark is no longer involved at this point; see the sketch after this list)
- Formats and presents this Pandas dataframe back to the user via Jupyter
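That Pandas conversion step, by the way, is (as far as I can tell) roughly what sql_magic is doing for us under the hood. We could do it ourselves directly:

# Roughly equivalent to the %read_sql call above: run the query, collect
# the results back to the master, and convert them to a Pandas dataframe
result = spark.sql("SELECT * FROM df LIMIT 10").toPandas()
result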
Quite a bit of responsibility on one guy’s shoulders! Now that we have a general sense of how Spark processes a task, we can start to do some serious data exploration and data cleansing!
# UDFs to parse the raw date / time strings into Spark date columns
date_func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
time_func = udf(lambda x: datetime.strptime(x, '%H:%M:%S'), DateType())

# Fill nulls with placeholder values so the UDFs don't choke on missing data
df = df.fillna('01/01/1900', subset = ['CMPLNT_FR_DT', 'CMPLNT_TO_DT', 'RPT_DT'])
df = df.fillna('00:00:00', subset = ['CMPLNT_FR_TM', 'CMPLNT_TO_TM'])

# Add parsed versions of each date / time column
df = df.withColumn('CMPLNT_FR_DT_FORMATTED', date_func(col('CMPLNT_FR_DT')))
df = df.withColumn('CMPLNT_TO_DT_FORMATTED', date_func(col('CMPLNT_TO_DT')))
df = df.withColumn('RPT_DT_FORMATTED', date_func(col('RPT_DT')))
df = df.withColumn('CMPLNT_FR_TM_FORMATTED', time_func(col('CMPLNT_FR_TM')))
df = df.withColumn('CMPLNT_TO_TM_FORMATTED', time_func(col('CMPLNT_TO_TM')))

# Check the new schema
df.printSchema()
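One more aside before moving on: Python UDFs carry some overhead because every row has to round-trip through the Python interpreter, and newer Spark versions (2.2+, if I remember correctly) have built-in to_date / to_timestamp functions that do this parsing natively. It's also worth noting that DateType only keeps the date portion, so the time columns parsed above lose their time-of-day. A sketch of the native approach for a couple of the columns:

from pyspark.sql.functions import to_date, to_timestamp

# Sketch: native date / time parsing without Python UDFs (assumes Spark 2.2+)
df = df.withColumn('CMPLNT_FR_DT_FORMATTED', to_date(col('CMPLNT_FR_DT'), 'MM/dd/yyyy'))
df = df.withColumn('CMPLNT_FR_TM_FORMATTED', to_timestamp(col('CMPLNT_FR_TM'), 'HH:mm:ss'))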