In the last 2 posts, we reviewed all of the interesting fields (largely using Spark and Spark SQL, which were very handy)… all of them except latitude and longitude. I ended the last post puzzled about how to actually plot this many points (5 million points!). Spark didn't have anything to do this, so I had to look elsewhere. The problem is that my whole exercise is to leverage distributed computing, but up to this point, my definition of distributed computing has been… exclusively Spark. So I lied a bit, but that's only because I'm stupid, lazy, and don't want to set up some distributed graphing software and manually configure a cluster across machines.
So, if Spark won’t work… what can we do? After a few google searches, I’ve come across a library called datashader. Datashader claims to be able to make plots such as:
Lo and behold… that's pretty much what I'm looking for… A map of NYC! In the map above, the folks at datashader have mapped out the most prominent races according to the NYC census data. I feel like that's pretty much what I want, except the most prominent race becomes the most prominent type of crime, or the most prominent level of offense (violation, misdemeanor, felony).
Man… now I feel like a super basic data geek who thinks they’re cutting edge because they’re working with NYC data…
The worst part? It took me 12 posts to realize it. Oh well, onward we move.
Here’s a tutorial of datashader that honestly outputs some breathtaking visualizations.
Watching this video actually gives an interesting perspective on big data plotting as well. They approached datashader with a big data motivation, but not in the compute sense… more in the art-of-visualization sense. Their reasoning is that simply plotting something like a million points in a small area (a common example they use is thinking of how many data points are packed into each pixel on screen) will yield perception and accuracy problems.
This post by the folks at Continuum (who created datashader) shows some of the pitfalls of plotting a ton of data. Their argument is that, if any of these 6 pitfalls occur, the visualization may literally be lying to you.
The first one they talk about is overplotting. This is something I run into all the time and have used the alpha / opacity plotting parameter to solve in the past. Overplotting is when we have 2 classes identified by different colors plotted on the same chart. Let's say one group is plotted in blue, and the other is plotted in red:
We can see that the order in which we plot the points actually matters. If we plot red first, then blue, we get image C, which masks many of the red points… we simply don't know if the red is there because we cannot see the bottom layer of the plot! Image D is just the opposite. We can infer that there are blue points (and we may already be biased looking at this set of images because we know the true state of the data), but if we were looking at D for the first time, we may not have been able to tell whether the blue dots really exist!
This is when I’d play around with the alpha, or opacity of the dots to make the visualization a bit more truthful:
Even here, we see that it’s not perfect as there still is a difference between C and D. The datashader documentation then goes on to state that having to tweak the opacity and the dot sizes (see below with same opacity, but smaller dots)
makes the visualization process much slower and takes away the focus from what we're actually trying to do! It goes on to describe 4 more pitfalls that could occur when visualizing the data, and continues to make the point that
For big data, you don’t know when the viz is lying
This comes from datashader’s definition of big data that is
Data is big when you can’t make any inferences from any single point of data
This implies that the data is so granular (a single incident at a single lat long) that you’re not meant to be able to infer any overall conclusions from a single data point, and going through every single point is an inefficient (and in all likelihood impossible) task because of pure volume. In a case like this, all you can rely on is the visualization, and if the visualization is incorrect, you don’t have any QA tools to notice, so it’s best to use the right theory from the get go.
When I tried to build a heatmap from the Edmonton Property Assessment data, the method I tried was to build a regression model that predicts an assessment value for each lat / long value or range. With K-NN, we can infer values for lat / longs which do not have a property assessment tied to them by taking a look at those around them. In essence, we build a gradient across the lats and longs to communicate the predicted assessment value. After doing that, my visualization consisted of a set of predictions on a grid of inputs across Edmonton to ensure I had every area covered.
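The K-NN averaging idea above can be sketched in a few lines of numpy. This is a toy illustration, not the actual model from the Edmonton post: the coordinates and assessment values below are made up, and real usage would lean on something like scikit-learn's KNeighborsRegressor.

```python
import numpy as np

def knn_predict(train_pts, train_vals, query_pts, k=3):
    """Predict a value at each query point as the mean of its k nearest training points."""
    preds = []
    for q in query_pts:
        dists = np.linalg.norm(train_pts - q, axis=1)   # distance to every known point
        nearest = np.argsort(dists)[:k]                 # indices of the k closest
        preds.append(train_vals[nearest].mean())        # average their values
    return np.array(preds)

# Toy "properties" at lat/long-like coordinates, with assessed values
train_pts = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]])
train_vals = np.array([100.0, 200.0, 300.0, 400.0])

# A grid of query points covering the area, like the grid of inputs across Edmonton
grid = np.array([[0.5, 0.5], [0.1, 0.1]])
print(knn_predict(train_pts, train_vals, grid, k=4))
```

With k equal to the full training set, every query point averages all four values, which is the degenerate case; smaller k is what produces the smooth local gradient across the map.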
In this example, what I’d like to visualize is the volume of incidents, which I have inherently in the data, so I don’t have to do anything fancy with predictive models. However, I still face all the problems that datashader has laid out. What datashader does is similar to what I had done in the Edmonton Property Assessment post in that
- Each pixel of the screen is represented by a value, not solely by the data points contributing to that pixel
- #1 is true because the value comes from an overall model that has the context of the entire dataset, not just the pixel in question
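The core of that per-pixel aggregation step can be sketched with numpy's histogram2d. This is a simplification of what datashader's Canvas.points does, with made-up coordinates for illustration: each "pixel" ends up holding a count derived from the whole dataset rather than raw overlapping dots.

```python
import numpy as np

# Fake lon/lat points: 3 clustered together, 1 off on its own
lons = np.array([0.55, 0.56, 0.54, 0.9])
lats = np.array([0.55, 0.55, 0.56, 0.1])

# Aggregate into a 10x10 "screen": each cell holds a count, not the raw points
counts, _, _ = np.histogram2d(lons, lats, bins=10, range=[[0, 1], [0, 1]])

print(counts.max())  # the densest pixel holds 3 points
print(counts.sum())  # all 4 points are accounted for somewhere
```

Once every pixel is a number, a colormap (linear, log, or histogram-equalized) can be applied to the grid as a whole, which is what sidesteps the overplotting problem.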
In datashader’s example, if we set an alpha to 0.1, we are indicating that 10 points lying on top of each other will achieve full saturation. If, in our data, we only have points that are either not overlapping (singular points), or they overlap by 50 points at a time, it would be very difficult to create a plot that can show off this contrast well because anything over 10 points at a time will achieve full saturation! Anything between 10 and 50 points overlapping will look the same to the user. That’s what happens when we don’t generalize our data to some type of heatmap or model. This is where datashader’s real advantage comes in, because datashader allows you to easily map your own model without having to go through the trouble that I went through in the Edmonton Property Assessment project. Granted, I don’t know if datashader can perform regression, but it’s got a lot of great gradients (e.g. using log instead of linear) for our specific purpose, analyzing volumes of points in a region where each point is weighted the same!
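The saturation argument above can be checked with a couple of lines. This uses the simple additive model of opacity the way the text describes it (real alpha compositing is slightly different), just to show where the contrast gets lost:

```python
def stacked_opacity(n_points, alpha=0.1):
    """Additive model from the text: opacity accumulates until it clamps at full saturation."""
    return min(n_points * alpha, 1.0)

for n in [1, 5, 10, 50]:
    print(n, stacked_opacity(n))
# 1 -> 0.1, 5 -> 0.5, 10 -> 1.0, 50 -> 1.0
# 10 and 50 overlapping points render identically: the contrast is clamped away.
```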
But wait… what happened to the whole thing about not being able to use Spark anymore? I thought we were trying to leverage distributed computing for this project? I agree, I am being a bit lazy here in dropping distributed computing altogether for this plot, but in the datashader video above, the presenter plots ONE BILLION POINTS, and he does it on his Mac, which I assume has no more than 32 GB of RAM (I'm probably wrong). If I think about it objectively, the inputs of a map are simply latitudes and longitudes. The dataset is 1.3 GB, but the lats and longs probably only account for something like 1/50th of the entire file size! Lat / longs are bounded by however many significant digits the value carries. A latitude like 53.631611 will always take only 9 bytes to represent as text, whereas a description field is often much longer and variable. Given that there are about 35 columns in our parquet dataframe, lat / longs accounting for 1/50th of the file size isn't so farfetched to me! That brings the relevant data down to roughly 30 MB. If our own laptop had 32 GB of memory, we could handle the lat / longs from well over 1 TB of raw data in memory.
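A quick back-of-envelope check of that argument, using the numbers from this post (1.3 GB file, ~1/50th of it lat / long, ~5 million rows; the 1/50 figure is the rough guess above, not a measurement):

```python
# Back-of-envelope check of the memory argument
file_size_gb = 1.3
lat_long_fraction = 1.0 / 50        # guessed share of the file taken by lat/long
rows = 5 * 10**6

# Rough on-disk footprint of just the lat/long columns
on_disk_mb = file_size_gb * 1024 * lat_long_fraction
print(round(on_disk_mb, 1))          # ~27 MB, close to the ~30 MB estimate

# In memory as float64, two columns cost 16 bytes per row
in_memory_mb = rows * 2 * 8 / 1024.0 / 1024.0
print(round(in_memory_mb, 1))        # ~76 MB -- trivial next to 32 GB of RAM
```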
There are different solutions for everything, and perhaps this one is the path of least resistance for mapping functionality!
Back To Datashader
Let’s try to import the dataset into Jupyter. I’ve found this package pyarrow which can apparently read parquet files into a Pandas dataframe.
import os
os.system("sudo pip install pyarrow")
So it looks like there’s a pretty nice python package pyarrow which seems to be able to load up parquets in a quick and simple manner.
# Import pyarrow
import pyarrow.parquet as pq
table2 = pq.read_table("s3n://2017edmfasatb/nypd_complaints/data/df_filtered.parquet")
---------------------------------------------------------------------------
ArrowIOError                              Traceback (most recent call last)
<ipython-input-4-44807fc45e46> in <module>()
----> 1 table2 = pq.read_table("s3n://2017edmfasatb/nypd_complaints/data/df_filtered.parquet")

/usr/local/lib64/python2.7/site-packages/pyarrow/parquet.pyc in read_table(source, columns, nthreads, metadata, use_pandas_metadata)
--> 724     pf = ParquetFile(source, metadata=metadata)

/usr/local/lib64/python2.7/site-packages/pyarrow/parquet.pyc in __init__(self, source, metadata, common_metadata)
---> 54     self.reader.open(source, metadata=metadata)

pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()
pyarrow/io.pxi in pyarrow.lib.get_reader()
pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowIOError: Failed to open local file: s3n://2017edmfasatb/nypd_complaints/data/df_filtered.parquet
Okay, apparently it's not as straightforward to read a parquet file into a Pandas dataframe as I thought… It looks like, at the time of writing this, pyarrow does not support reading partitioned parquet files from S3:
I’ve used the same path string as when I was using Spark in the last post, but I guess Spark, in this case, was spun up from an Amazon EMR cluster which had partitioned S3 integration built in. Pyarrow doesn’t seem to have this (yet?). I guess that’s why we pay Amazon an additional hourly cost for the EMR service. Obviously not only because of this ability to read parquets, but this capability among other features (like setting up the entire goddamn cluster for us).
The stackoverflow post above suggests another library which, at this moment in time, can read parquets from S3 (at least the poster has got it working). Let's give that a shot?
import os
os.system("sudo pip install fastparquet s3fs python-snappy")
import s3fs
import fastparquet as fp

s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

# Set up s3fs path object
s3_path = "2017edmfasatb/nypd_complaints/data/df_filtered.parquet"
all_paths_from_s3 = fs.glob(path = s3_path)

# Load file from S3
myopen = s3.open

# Use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with = myopen)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-1-b4d1ef57c126> in <module>()
---> 13 fp_obj = fp.ParquetFile(all_paths_from_s3, open_with = myopen)

/usr/local/lib64/python2.7/site-packages/fastparquet/api.pyc in __init__(self, fn, verify, open_with, sep, root)
---> 85     basepath, fmd = metadata_from_many(fn, verify_schema=verify, open_with=open_with, root=root)

/usr/local/lib64/python2.7/site-packages/fastparquet/util.pyc in metadata_from_many(file_list, verify_schema, open_with, root)
--> 133     pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]

/usr/local/lib64/python2.7/site-packages/fastparquet/api.pyc in _parse_header(self, f, verify)
--> 115     f.seek(-8, 2)

/usr/local/lib/python2.7/site-packages/s3fs/core.pyc in seek(self, loc, whence)
-> 1004     raise ValueError('Seek before start of file')

ValueError: Seek before start of file
Wow, I just can’t get this parquet file to load… What a lesson in data management. I was working so fluidly in my analysis and, SURPRISE, more technical issues. Well, I shouldn’t say more technical issues… I should say my own lack of knowledge lol.
So, what I think is happening here is that Spark seems to be loading and saving parquets in a partitioned manner. Again, this is what my parquet file looks like on EMRFS (which is then abstracted on top of the actual file systems in the underlying clusters):
First of all, the .parquet "file" is actually a folder, and the image above shows the partitioned EMRFS pieces within that .parquet folder. The files inside are themselves .parquet files, too. Snappy appears to be a compression codec. I might just have to go back to Spark, use it to load up this dataframe, then convert it to Pandas to be used with datashader. That just seems too convoluted though, there must be an easier way.
— 5 minutes later —
Wow, I’m an idiot.
The answer was right there in the stackoverflow post.
In the answer, the user writes
# mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
I had actually pointed the path at my top-level .parquet file, which, again, was really a .parquet folder, when I needed to point it at the multiple, individual .snappy.parquet files within that folder. There were a lot of new concepts for me here… Spark, partitioned storage, parquet… I'm glad it's somewhat coming together now. Let's try this one more time.
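The same wildcard idea can be demonstrated locally with the stdlib glob module (file names below are made up to mimic the layout Spark writes; s3fs's fs.glob behaves analogously against S3 keys):

```python
import glob
import os
import tempfile

# Mock the layout Spark writes: a ".parquet" directory containing part files
root = tempfile.mkdtemp()
folder = os.path.join(root, "df_filtered.parquet")
os.makedirs(folder)
for name in ["part-00000.snappy.parquet", "part-00001.snappy.parquet"]:
    open(os.path.join(folder, name), "w").close()

# Pointing at the folder itself matches a directory, not parquet files...
print(glob.glob(folder))

# ...but a wildcard inside it matches the actual part files
matches = glob.glob(os.path.join(folder, "*.parquet"))
print(sorted(os.path.basename(m) for m in matches))
```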
# Set up s3fs path object
s3_path = "2017edmfasatb/nypd_complaints/data/df_filtered.parquet/*.parquet"
all_paths_from_s3 = fs.glob(path = s3_path)

# Load file from S3
myopen = s3.open

# Use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with = myopen)
It seems to have worked.
Now that I look at the fastparquet documentation, they literally say:
read and write Parquet files, in single- or multiple-file format. The latter is commonly found in hive/Spark usage.
It’s literally the first feature they list. In addition, they also allow us to load specific columns (nice feature of parquet I’ll finally get to see in action) to a Pandas dataframe! Let’s try it out.
%%time
# Load only lat and long to pandas dataframe
df = fp_obj.to_pandas(['LAT', 'LON'])
CPU times: user 2.86 s, sys: 64 ms, total: 2.92 s
Wall time: 5.19 s
5 seconds to load 5M+ latitudes and longitudes. Not quite sure how to benchmark that, but it doesn't put a damper on my day as of yet, so I'll take it.
# View dataframe columns
df.head()
Yup, pretty much what I’m looking for! We’re so far into this post already (it seems), and I haven’t even touched datashader. Finally, we are here, but I’m almost expecting it to take like another 3 posts until I see an actual map like the one above haha.
Datashader… ACTUALLY this time…
The first thing we have to do is install datashader because the Amazon Deep Learning AMI doesn’t come with datashader right off the bat. Datashader is not currently hosted on the pypi repo, so we have to install it using conda.
Since Anaconda2 and Anaconda3 are installed on the Amazon Deep Learning AMI, we have to specify to use Anaconda2.
# Install datashader via conda, we use the -y flag so we don't have to reply to any prompts
import os
os.system("sudo /home/ec2-user/src/anaconda2/bin/conda install -y bokeh datashader")
os.system("sudo /home/ec2-user/src/anaconda2/bin/conda remove --force datashader")
os.system("git clone https://github.com/bokeh/datashader.git")
os.system("sudo pip install -e datashader/")
os.system("sudo pip install dask[complete]")
import datashader as ds
from datashader import transfer_functions as tf
from datashader.colors import Greys9
Greys9_r = list(reversed(Greys9))[:-2]
plot_width = int(750)
plot_height = int(plot_width//1.2)
%%time
cvs = ds.Canvas()
agg = cvs.points(df, 'LON', 'LAT')
img = tf.shade(agg, cmap=["white", 'green'], how='eq_hist')
CPU times: user 108 ms, sys: 0 ns, total: 108 ms
Wall time: 107 ms
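The how='eq_hist' option passed to tf.shade is histogram equalization: each pixel's count is mapped by its rank among all the counts rather than by its raw magnitude, so a heavily skewed distribution still uses the full color range. A rough numpy sketch of the idea (not datashader's actual implementation):

```python
import numpy as np

def eq_hist(counts):
    """Rough histogram-equalization sketch: map each count to its rank
    among all nonzero counts, scaled to [0, 1]."""
    flat = counts.ravel()
    nz = np.sort(flat[flat > 0])
    ranks = np.searchsorted(nz, flat, side="right")   # rank of each count
    return (ranks / float(len(nz))).reshape(counts.shape)

# Skewed counts: a linear colormap would spend nearly its whole range on 1000
counts = np.array([[0, 1],
                   [10, 1000]])
print(eq_hist(counts))   # ranks are evenly spaced regardless of magnitude
```

This is why the image above shows street-level detail in quiet areas instead of a single bright blob over Manhattan.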
This is awesome, pretty much exactly what I was looking for! With this much data (similar to datashader's NYC Taxi and 2010 US Census visualizations), you can basically make out the geographical region without even plotting a map! You can even make out the street blocks, parks, and rivers / lakes. It looks like the most crime occurs in Manhattan, the Bronx, and East Brooklyn. I don't know enough about NYC to infer whether this is because of population, demographics, landmarks… etc, but this is definitely pointing me in the right direction. There are a few more things I want to do in datashader before I dive any deeper into the analysis, but this notebook is already getting pretty convoluted with the parquet loading and datashader installation, so let's start a fresh notebook for our datashader deep dive.