4 Loading data into DataFrames


This chapter covers

  • Creating DataFrames from delimited text files and defining data schemas
  • Extracting data from a SQL relational database and manipulating it using Dask
  • Reading data from distributed filesystems (S3 and HDFS)
  • Working with data stored in Parquet format

I’ve given you a lot of concepts to chew on over the course of the previous three chapters, all of which will serve you well along your journey to becoming a Dask expert. But we’re now ready to roll up our sleeves and get into working with some data. As a reminder, figure 4.1 shows the data science workflow we’ll be following as we work through the functionality of Dask.

Figure 4.1 The Data Science with Python and Dask workflow


In this chapter, we remain at the very first steps of our workflow: Problem Definition and Data Gathering. Over the next few chapters, we’ll be working with the NYC Parking Ticket data to answer the following question:

What patterns can we find in the data that are correlated with increases or decreases in the number of parking tickets issued by the New York City parking authority?

Perhaps we might find that older vehicles are more likely to receive tickets, or perhaps a particular color attracts more attention from the parking authority than other colors. Using this guiding question, we’ll gather, clean, and explore the relevant data with Dask DataFrames. With that in mind, we’ll begin by learning how to read data into Dask DataFrames.

One of the unique challenges that data scientists face is our tendency to study data at rest, or data that wasn’t specifically collected for the purpose of predictive modeling and analysis. This is quite different from a traditional academic study in which data is carefully and thoughtfully collected. Consequently, you’re likely to come across a wide variety of storage media and data formats throughout your career. We will cover reading data in some of the most popular formats and storage systems in this chapter, but by no means does this chapter cover the full extent of Dask’s abilities. Dask is very flexible in many ways, and the DataFrame API’s ability to interface with a very large number of data collection and storage systems is a shining example of that.

As we work through reading data into DataFrames, keep what you learned in previous chapters about Dask’s components in mind: the Dask DataFrames we will create are made up of many small Pandas DataFrames that have been logically divided into partitions. All operations performed on the Dask DataFrame result in the generation of a DAG (directed acyclic graph) of Delayed objects which can be distributed to many processes or physical machines. And the task scheduler controls the distribution and execution of the task graph. Now on to the data!


4.1 Reading data from text files

We’ll start with the simplest and most common format you’re likely to come across: delimited text files. Delimited text files come in many flavors, but all share the common concept of using special characters, called delimiters, to divide data into logical rows and columns.

Every delimited text file format has two types of delimiters: row delimiters and column delimiters. A row delimiter is a special character that indicates that you’ve reached the end of a row, and any additional data to the right of it should be considered part of the next row. The most common row delimiter is simply a newline character (\n) or a carriage return followed by a newline character (\r\n). Delimiting rows by line is a standard choice because it provides the additional benefit of breaking up the raw data visually and reflects the layout of a spreadsheet.

Likewise, a column delimiter indicates the end of a column, and any data to the right of it should be treated as part of the next column. Of all the popular column delimiters out there, the comma (,) is the most frequently used. In fact, delimited text files that use commas as column delimiters have their own file format name: comma-separated values, or CSV for short. Other common options include pipe (|), tab, space, and semicolon.
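To make the two kinds of delimiters concrete, here is a minimal sketch that parses a pipe-delimited string with Python’s standard csv module. The sample records are invented for illustration; only the idea of a row delimiter (\n) and a column delimiter (|) comes from the discussion above.

```python
import csv
import io

# Hypothetical pipe-delimited data: "\n" ends each row, "|" ends each column
raw_text = "Person ID|Last Name|First Name\n1|Smith|Jane\n2|Jones|Sam\n"

# csv.reader splits rows on line endings and columns on the given delimiter
reader = csv.reader(io.StringIO(raw_text), delimiter='|')
rows = list(reader)

for row in rows:
    print(row)
```

Changing the delimiter argument is all it takes to read tab-, space-, or semicolon-delimited text the same way.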

In figure 4.2, you can see the general structure of a delimited text file. This one in particular is a CSV file because we’re using commas as the column delimiter. Also, since we’re using the newline as the row delimiter, you can see that each row is on its own line.

Figure 4.2 The structure of a delimited text file


Two additional attributes of a delimited text file that we haven’t discussed yet are an optional header row and text qualifiers. A header row is simply the use of the first row to specify names of columns. Here, Person ID, Last Name, and First Name aren’t descriptions of a person; they are metadata that describe the data structure. While not required, a header row can be helpful for communicating what your data structure is supposed to hold.

Text qualifiers are yet another type of special character used to denote that the contents of the column are a text string. They can be very useful in instances where the actual data is allowed to contain characters that are also being used as row or column delimiters. This is a fairly common issue when working with CSV files that contain text data, because commas commonly show up in text. Surrounding these columns with text qualifiers indicates that any instances of the column or row delimiters inside the text qualifiers should be ignored.
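The effect of a text qualifier is easiest to see by parsing the same line with and without respecting it. The sketch below uses Python’s standard csv module and an invented address column; the double quote is assumed to be the text qualifier, which is the csv module’s default.

```python
import csv
import io

# Hypothetical CSV: the second column contains a comma, so it is wrapped in
# double quotes (the text qualifier)
raw_text = 'Person ID,Address\n1,"12 Main St, Apt 4"\n'

# The csv module honors the qualifier and keeps the address in one field
qualified = list(csv.reader(io.StringIO(raw_text), quotechar='"'))

# A naive split on commas ignores the qualifier and breaks the address apart
naive = [line.split(',') for line in raw_text.splitlines()]

print(qualified[1])  # two fields
print(naive[1])      # three fields: the address was split at its comma
```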

Now that you’ve had a look at the structure of delimited text files, let’s have a look at how to apply this knowledge by importing some delimited text files into Dask. The NYC Parking Ticket data we briefly looked at in chapter 2 comes as a set of CSV files, so this will be a perfect dataset to work with for this example. If you haven’t downloaded the data already, you can do so by visiting www.kaggle.com/new-york-city/nyc-parking-tickets. As I mentioned before, I’ve unzipped the data into the same folder as the Jupyter notebook I’m working in for convenience’s sake. If you’ve put your data elsewhere, you’ll need to change the file path to match the location where you saved the data.

Listing 4.1 Importing CSV files using Dask defaults
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

fy14 = dd.read_csv('nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv')
fy15 = dd.read_csv('nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2015.csv')
fy16 = dd.read_csv('nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2016.csv')
fy17 = dd.read_csv('nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

fy17

In listing 4.1, the first two lines should look familiar: we’re simply importing the DataFrame library and the ProgressBar context. In the next four lines of code, we’re reading in the four CSV files that come with the NYC Parking Ticket dataset. For now, we’ll read each file into its own separate DataFrame. Let’s have a look at what happened by inspecting the fy17 DataFrame.

Figure 4.3 The metadata of the fy17 DataFrame


In figure 4.3, we see the metadata of the fy17 DataFrame. Using the default 64 MB blocksize, the data was split into 33 partitions. You might recall this from chapter 3. You can also see the column names at the top, but where did those come from? By default, Dask assumes that your CSV files will have a header row, and our file indeed has a header row. If you look at the raw CSV file in your favorite text editor, you will see the column names on the first line of the file. If you want to see all the column names, you can inspect the columns attribute of the DataFrame.

Listing 4.2 Inspecting the columns of a DataFrame
fy17.columns

'''
Produces the output:

Index([u'Summons Number', u'Plate ID', u'Registration State', u'Plate Type', u'Issue Date', u'Violation Code', u'Vehicle Body Type', u'Vehicle Make', u'Issuing Agency', u'Street Code1', u'Street Code2', u'Street Code3',u'Vehicle Expiration Date', u'Violation Location',
       u'Violation Precinct', u'Issuer Precinct', u'Issuer Code',
       u'Issuer Command', u'Issuer Squad', u'Violation Time',
       u'Time First Observed', u'Violation County',
       u'Violation In Front Of Or Opposite', u'House Number', u'Street Name', u'Intersecting Street', u'Date First Observed', u'Law Section',
       u'Sub Division', u'Violation Legal Code', u'Days Parking In Effect    ', u'From Hours In Effect', u'To Hours In Effect', u'Vehicle Color',
       u'Unregistered Vehicle?', u'Vehicle Year', u'Meter Number',
       u'Feet From Curb', u'Violation Post Code', u'Violation Description',
       u'No Standing or Stopping Violation', u'Hydrant Violation',
       u'Double Parking Violation'],
      dtype='object')
'''

If you happen to take a look at the columns of any other DataFrame, such as fy14 (Parking Tickets for 2014), you’ll notice that the columns are different from the fy17 (Parking Tickets for 2017) DataFrame. It looks as though the NYC government changed what data it collects about parking violations in 2017. For example, the latitude and longitude of the violation were not recorded prior to 2017, so these columns won’t be useful for analyzing year-over-year trends (such as how parking violation “hotspots” migrate throughout the city). If we simply concatenated the datasets together as is, we would get a resulting DataFrame with an awful lot of missing values. Before we combine the datasets, we should find the columns that all four of the DataFrames have in common. Then we should be able to simply union the DataFrames together to produce a new DataFrame that contains all four years of data.
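To see why mismatched columns lead to missing values, consider a minimal sketch with two tiny Pandas DataFrames standing in for two yearly files. The rows and the Extra Column name are invented; the point is only how concatenation behaves when the column sets differ.

```python
import pandas as pd

# Hypothetical miniature versions of two yearly files with different columns
older = pd.DataFrame({'Plate ID': ['ABC123'], 'Vehicle Year': [2010]})
newer = pd.DataFrame({'Plate ID': ['XYZ789'], 'Vehicle Year': [2015],
                      'Extra Column': ['only in newer']})

# Default (outer) concatenation keeps every column and fills gaps with NaN
combined = pd.concat([older, newer], ignore_index=True)
print(combined)

# Restricting to the shared columns avoids the artificial missing values
shared = pd.concat([older, newer], join='inner', ignore_index=True)
print(shared)
```

In the combined result, every row from the older file has a missing value in Extra Column, which is the problem that restricting to common columns avoids.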

We could manually look at each DataFrame’s columns and deduce which columns overlap, but that would be terribly inefficient. Instead, we’ll automate the process by taking advantage of the DataFrames’ columns attribute and Python’s set operations. The following listing shows you how to do this.

Listing 4.3 Finding the common columns between the four DataFrames
# Import for Python 3.x
from functools import reduce

columns = [set(fy14.columns),
    set(fy15.columns),
    set(fy16.columns),
    set(fy17.columns)]
common_columns = list(reduce(lambda a, i: a.intersection(i), columns))

On the first line, we create a list that contains four set objects, respectively representing each DataFrame’s columns. On the next line, we take advantage of the intersection method of set objects that returns a set containing the items that exist in both of the sets it’s comparing. Wrapping this in a reduce function, we’re able to walk through each DataFrame’s metadata, pull out the columns that are common to all four DataFrames, and discard any columns that aren’t found in all four DataFrames. What we’re left with is the following abbreviated list of columns:

['House Number',
 'No Standing or Stopping Violation',
 'Sub Division',
 'Violation County',
 'Hydrant Violation',
 'Plate ID',
 'Plate Type',
 'Vehicle Year',
 'Street Name',
 'Vehicle Make',
 'Issuing Agency',
...
 'Issue Date']
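The same intersection can be tried on small, made-up sets to see how it behaves. This sketch uses hypothetical column names and shows an equivalent form without reduce, since set.intersection accepts several sets at once. Because sets are unordered, the resulting list has no guaranteed order unless you sort it, which is why the list above is not alphabetical.

```python
# Hypothetical column sets standing in for the four DataFrames' columns
columns = [
    {'Plate ID', 'Issue Date', 'Vehicle Year', 'Column A'},
    {'Plate ID', 'Issue Date', 'Vehicle Year', 'Column B'},
    {'Plate ID', 'Issue Date', 'Vehicle Year'},
    {'Plate ID', 'Issue Date', 'Vehicle Year', 'Column A'},
]

# set.intersection accepts any number of sets, so reduce is optional here
common = set.intersection(*columns)

# Sets are unordered; sorting gives a stable, reproducible column order
common_columns = sorted(common)
print(common_columns)
```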

Now that we have a set of common columns shared by all four of the DataFrames, let’s take a look at the first couple of rows of the fy17 DataFrame.

Listing 4.4 Looking at the head of the fy17 DataFrame
fy17[common_columns].head()

Figure 4.4 The first five rows of the fy17 DataFrame using the common column set


Two important things are happening in listing 4.4: the column filtering operation and the top collecting operation. Specifying one or more columns in square brackets to the right of the DataFrame name is the primary way you can select/filter columns in the DataFrame. Since common_columns is a list of column names, we can pass that in to the column selector and get a result containing the columns contained in the list. We’ve also chained a call to the head method, which allows you to view the top n rows of a DataFrame. As shown in figure 4.4, by default, it will return the first five rows of the DataFrame, but you can specify the number of rows you wish to retrieve as an argument. For example, fy17.head(10) will return the first 10 rows of the DataFrame. Keep in mind that when you get rows back from Dask, they’re being loaded into your computer’s RAM. So, if you try to return too many rows of data, you will receive an out-of-memory error. Now let’s try the same call on the fy14 DataFrame.

Listing 4.5 Looking at the head of the fy14 DataFrame
fy14[common_columns].head()

'''
Produces the following output:

Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+-----------------------+---------+----------+
| Column                | Found   | Expected |
+-----------------------+---------+----------+
| Issuer Squad          | object  | int64    |
| Unregistered Vehicle? | float64 | int64    |
| Violation Description | object  | float64  |
| Violation Legal Code  | object  | float64  |
| Violation Post Code   | object  | float64  |
+-----------------------+---------+----------+

The following columns also raised exceptions on conversion:

- Issuer Squad
  ValueError('cannot convert float NaN to integer',)
- Violation Description
  ValueError('invalid literal for float(): 42-Exp. Muni-Mtr (Com. Mtr. Z)',)
- Violation Legal Code
  ValueError('could not convert string to float: T',)
- Violation Post Code
  ValueError('invalid literal for float(): 05 -',)

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually
'''

Looks like Dask ran into trouble when trying to read the fy14 data! Thankfully, the Dask development team has given us some pretty detailed information in this error message about what happened. Five columns—Issuer Squad, Unregistered Vehicle?, Violation Description, Violation Legal Code, and Violation Post Code—failed to be read correctly because their datatypes were not what Dask expected. As we learned in chapter 2, Dask uses random sampling to infer datatypes to avoid scanning the entire (potentially massive) DataFrame. Although this usually works well, it can break down when a large number of values are missing in a column or the vast majority of data can be classified as one datatype (such as an integer), but a small number of edge cases break that assumption (such as a random string or two). When that happens, Dask will throw an exception once it begins to work on a computation. In order to help Dask read our dataset correctly, we’ll need to manually define a schema for our data instead of relying on type inference. Before we get around to doing that, let’s review what datatypes are available in Dask so we can create an appropriate schema for our data.
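The failure mode can be reproduced on a small scale with plain Pandas. The sketch below is an illustration of sample-based inference in general, not of Dask’s internal implementation: it infers a type from only the first few rows of an invented column, then shows that the full column contains a value that does not fit.

```python
import io
import pandas as pd

# Hypothetical column: mostly integers, with one stray string near the end
raw_text = 'Issuer Squad\n' + '\n'.join(['0'] * 20) + '\nA\n'

# Inferring from only the first few rows suggests an integer column...
sample = pd.read_csv(io.StringIO(raw_text), nrows=5)
print(sample['Issuer Squad'].dtype)

# ...but the full column holds a value that cannot be parsed as an integer
full = pd.read_csv(io.StringIO(raw_text))
print(full['Issuer Squad'].dtype)

# Forcing the sampled dtype onto the full data fails with a conversion error
failed = False
try:
    pd.read_csv(io.StringIO(raw_text), dtype={'Issuer Squad': 'int64'})
except ValueError as err:
    failed = True
    print('conversion failed:', err)
```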

4.1.1 Using Dask datatypes

Similar to relational database systems, column datatypes play an important role in Dask DataFrames. They control what kind of operations can be performed on a column, how overloaded operators (+, -, and so on) behave, and how memory is allocated to store and access the column’s values. Unlike most collections and objects in Python, Dask DataFrames use explicit typing rather than duck typing. This means that all values contained in a column must conform to the same datatype. As we saw already, Dask will throw errors if values in a column are found that violate the column’s datatype.

Since Dask DataFrames consist of partitions made up of Pandas DataFrames, which in turn are complex collections of NumPy arrays, Dask sources its datatypes from NumPy. The NumPy library is a powerful and important mathematics library for Python. It enables users to perform advanced operations from linear algebra, calculus, and trigonometry. This library is important for the needs of data science because it provides the cornerstone mathematics for many statistical analysis methods and machine learning algorithms in Python. Let’s take a look at NumPy’s datatypes, which can be seen in figure 4.5.

Figure 4.5 NumPy datatypes used by Dask


As you can see, many of these reflect the primitive types in Python. The biggest difference is that NumPy datatypes can be explicitly sized with a specified bit-width. For example, the int32 datatype is a 32-bit integer that allows any integer between −2,147,483,648 and 2,147,483,647. Python’s built-in int, by comparison, is not a fixed-width type: it grows as needed to hold any value, and every Python integer is a full object that carries memory overhead beyond the number itself. The advantage of using compact, fixed-width datatypes where appropriate is that you can hold more data in RAM and the CPU’s cache at one time, leading to faster, more efficient computations. This means that when creating a schema for your data, you should always choose the smallest datatype that can hold your data. The risk, however, is that if a value exceeds the maximum size allowed by the particular datatype, you will experience overflow errors, so you should think carefully about the range and domain of your data.

For example, consider house prices in the United States: home prices are typically above $32,767 and are unlikely to exceed $2,147,483,647 for quite some time if historical inflation rates prevail. Therefore, if you were to store house prices rounded to the nearest whole dollar, the int32 datatype would be most appropriate. While the int64 type is wide enough to hold this range of numbers, it would be inefficient to use more than 32 bits of memory to store each value. Likewise, int8 and int16 would not be large enough to hold the data, resulting in an overflow error.
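You can check these ranges and the memory savings yourself with NumPy. The following sketch prints the limits of the signed integer types and compares the memory used by one million invented house prices stored at two different widths.

```python
import numpy as np

# Range of values each signed integer type can hold
for dtype in (np.int8, np.int16, np.int32, np.int64):
    info = np.iinfo(dtype)
    print(dtype.__name__, info.min, info.max)

# One million hypothetical house prices stored at two different widths
prices_64 = np.full(1_000_000, 350_000, dtype=np.int64)
prices_32 = prices_64.astype(np.int32)

print(prices_64.nbytes)  # bytes used by the 64-bit array
print(prices_32.nbytes)  # half as many bytes for the same values
```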

If none of the NumPy datatypes are appropriate for the kind of data you have, a column can be stored as an object type, which represents any Python object. This is also the datatype that Dask will default to when its type inference comes across a column that has a mix of numbers and strings, or when type inference cannot determine an appropriate datatype to use. However, one common exception to this rule happens when you have a column with a high percentage of missing data. Take a look at figure 4.6, which shows part of the output of that last error message again.

Figure 4.6 A Dask error showing mismatched datatypes


Would you really believe that a column called Violation Description should be a floating-point number? Probably not! Typically, we can expect description columns to be text, and therefore Dask should use an object datatype. Then why did Dask’s type inference think the column holds 64-bit floating-point numbers? It turns out that a large majority of records in this DataFrame have missing violation descriptions. In the raw data, they are simply blank. Dask treats blank records as null values when parsing files, and by default fills in missing values with NumPy’s NaN (not a number) object called np.nan. If you use Python’s built-in type function to inspect the datatype of an object, it reports that np.nan is a float type. So, since Dask’s type inference randomly selected a bunch of np.nan objects when trying to infer the type of the Violation Description column, it assumed that the column must contain floating-point numbers. Now let’s fix the problem so we can read in our DataFrame with the appropriate datatypes.
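Both halves of that explanation can be checked in a few lines. This sketch uses plain Pandas (the library behind each Dask partition) and an invented file in which every description is blank; it shows that np.nan is a float and that an all-blank column is consequently read as a floating-point column.

```python
import io
import numpy as np
import pandas as pd

# np.nan is an ordinary Python float
print(type(np.nan))

# Hypothetical file in which every Violation Description is blank
raw_text = 'Summons Number,Violation Description\n1,\n2,\n3,\n'
df = pd.read_csv(io.StringIO(raw_text))

# Every value is missing, so the column is inferred as floating point
print(df['Violation Description'].isnull().all())
print(df['Violation Description'].dtype)
```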

4.1.2 Creating schemas for Dask DataFrames

Oftentimes when working with a dataset, you’ll know each column’s datatype, whether it can contain missing values, and its valid range of values ahead of time. This information is collectively known as the data’s schema. You’re especially likely to know the schema for a dataset if it came from a relational database. Each column in a database table must have a well-known datatype. If you have this information ahead of time, using it with Dask is as easy as writing up the schema and applying it to the read_csv method. You’ll see how to do that at the end of this section. However, sometimes you might not know what the schema is ahead of time, and you’ll need to figure it out on your own. Perhaps you’re pulling data from a web API which hasn’t been properly documented or you’re analyzing a data extract and you don’t have access to the data source. In those cases, here are two methods you can try. Neither is ideal because both can be tedious and time consuming, but sometimes you may really have no other option:

  • Guess-and-check
  • Manually sample the data

The guess-and-check method isn’t complicated. If you have well-named columns, such as Product Description, Sales Amount, and so on, you can try to infer what kind of data each column contains using the names. If you run into a datatype error while running a computation like the ones we’ve seen, simply update the schema and start over again. The advantage of this method is that you can quickly and easily try different schemas, but the downside is that it may become tedious to constantly restart your computations if they continue to fail due to datatype issues.

The manual sampling method aims to be a bit more sophisticated but can take more time up front since it involves scanning through some of the data to profile it. However, if you’re planning to analyze the dataset anyways, it’s not “wasted” time in the sense that you will be familiarizing yourself with the data while creating the schema. Let’s look at how we can do this.

Listing 4.6 Building a generic schema
import numpy as np
import pandas as pd

# np.str was an alias for Python's built-in str and was removed in NumPy 1.24
dtype_tuples = [(x, str) for x in common_columns]
dtypes = dict(dtype_tuples)
dtypes

'''
Displays the following output:
{'Date First Observed': str,
 'Days Parking In Effect    ': str,
 'Double Parking Violation': str,
 'Feet From Curb': str,
 'From Hours In Effect': str,
 ...
}
'''

First we need to build a dictionary that maps column names to datatypes. This must be done because the dtype argument that we’ll feed this object into later expects a dictionary type. To do that, in listing 4.6, we first walk through the common_columns list that we made earlier to hold all of the column names that can be found in all four DataFrames. We transform each column name into a tuple containing the column name and the str datatype, which represents strings (older code often wrote this as np.str, which was simply an alias for Python’s built-in str). On the next line, we take the list of tuples and convert them into a dict, the partial contents of which are displayed. Now that we’ve constructed a generic schema, we can apply it to the read_csv function to use the schema to load the fy14 data into a DataFrame.

Listing 4.7 Creating a DataFrame with an explicit schema
fy14 = dd.read_csv('nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv', dtype=dtypes)

with ProgressBar():
    display(fy14[common_columns].head())

Listing 4.7 looks largely the same as the first time we read in the 2014 data file. However, this time we specified the dtype argument and passed in our schema dictionary. What happens under the hood is Dask will disable type inference for the columns that have matching keys in the dtype dictionary and use the explicitly specified types instead. While it’s perfectly reasonable to include only the columns you want to change, it’s best to not rely on Dask’s type inference at all whenever possible. Here I’ve shown you how to create an explicit schema for all columns in a DataFrame, and I encourage you to make this a regular practice when working with big datasets. With this particular schema, we’re telling Dask to just assume that all of the columns are strings. Now if we try to view the first five rows of the DataFrame again, using fy14[common_columns].head(), Dask doesn’t throw an error message! But we’re not done yet. We now need to have a look at each column and pick a more appropriate datatype (if possible) to maximize efficiency. Let’s have a look at the Vehicle Year column.

Listing 4.8 Inspecting the Vehicle Year column
with ProgressBar():
    print(fy14['Vehicle Year'].unique().head(10))

# Produces the following output:

0    2013
1    2012
2       0
3    2010
4    2011
5    2001
6    2005
7    1998
8    1995
9    2003
Name: Vehicle Year, dtype: object

In listing 4.8, we’re simply looking at 10 of the unique values contained in the Vehicle Year column. It looks like they are all integers that would fit comfortably in the uint16 datatype. uint16 is the most appropriate because years can’t be negative values, and these years are too large to be stored in uint8 (which has a maximum value of 255). If we had seen any letters or special characters, we would not need to proceed any further with analyzing this column. The string datatype we had already selected would be the only datatype suitable for the column.

One thing to be careful about is that a sample of 10 unique values might not be a sufficiently large sample to determine that there aren’t any edge cases you need to consider. You could use .compute() instead of .head() to bring back all the unique values, but this might not be a good idea if the particular column you’re looking at has a high degree of uniqueness to it (such as a primary key or a high-dimensional category). The range of 10–50 unique samples has served me well in most cases, but sometimes you will still run into edge cases where you will need to go back and tweak your datatypes.

Since we’re thinking an integer datatype might be appropriate for this column, we need to check one more thing: Are there any missing values in this column? As you learned earlier, Dask represents missing values with np.nan, which is considered to be a float type object. Unfortunately, np.nan cannot be cast or coerced to an integer uint16 datatype. In the next chapter we will learn how to deal with missing values, but for now if we come across a column with missing values, we will need to ensure that the column will use a datatype that can support the np.nan object. This means that if the Vehicle Year column contains any missing values, we’ll be required to use a float32 datatype and not the uint16 datatype we originally thought appropriate because uint16 is unable to store np.nan.
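The restriction is easy to confirm on a small scale. The following sketch uses a Pandas Series with invented year values as a stand-in for one partition of the column: casting to uint16 fails as soon as a missing value is present, while float32 preserves it.

```python
import numpy as np
import pandas as pd

# Hypothetical Vehicle Year values with one missing entry
years = pd.Series([2013.0, 2012.0, np.nan], name='Vehicle Year')

# Casting to an unsigned integer type fails because NaN has no integer form
try:
    years.astype(np.uint16)
    cast_ok = True
except ValueError as err:
    cast_ok = False
    print('uint16 cast failed:', err)

# A floating-point type keeps the missing value intact
as_float = years.astype(np.float32)
print(as_float)
```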

Listing 4.9 Checking the Vehicle Year column for missing values
with ProgressBar():
    print(fy14['Vehicle Year'].isnull().values.any().compute())

# Produces the following output:
True

In listing 4.9, we’re using the isnull method, which checks each value in the specified column for existence of np.nan. It returns True if np.nan is found and False if it’s not, and then aggregates the checks for all rows into a Boolean Series. Chaining with .values.any() reduces the Boolean Series to a single True if at least one row is True, and False if no rows are True. This means that if the code in listing 4.9 returns True, at least one row in the Vehicle Year column is missing. If it returned False, it would indicate that no rows in the Vehicle Year column are missing data. Since we have missing values in the Vehicle Year column, we must use the float32 datatype for the column instead of uint16.
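The two steps of that chain can be inspected separately on a small Pandas Series, used here as a stand-in for the Dask column with invented values. In Pandas the result is available immediately; with Dask the same chain is lazy, which is why listing 4.9 ends with .compute().

```python
import numpy as np
import pandas as pd

# Hypothetical column with one missing value
years = pd.Series([2013.0, np.nan, 2001.0], name='Vehicle Year')

missing_mask = years.isnull()            # Boolean Series, one entry per row
has_missing = missing_mask.values.any()  # single True/False for the column

print(missing_mask.tolist())
print(has_missing)
```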

Now, we should repeat the process for the remaining 42 columns. For brevity’s sake, I’ve gone ahead and done this for you. In this particular instance, we could also use the data dictionary posted on the Kaggle webpage (at https://www.kaggle.com/new-york-city/nyc-parking-tickets/data) to help speed along this process.

Listing 4.10 The final schema for the NYC Parking Ticket Data
# str replaces the np.str alias, which newer NumPy releases no longer provide
dtypes = {
 'Date First Observed': str,
 'Days Parking In Effect    ': str,
 'Double Parking Violation': str,
 'Feet From Curb': np.float32,
 'From Hours In Effect': str,
 'House Number': str,
 'Hydrant Violation': str,
 'Intersecting Street': str,
 'Issue Date': str,
 'Issuer Code': np.float32,
 'Issuer Command': str,
 'Issuer Precinct': np.float32,
 'Issuer Squad': str,
 'Issuing Agency': str,
 'Law Section': np.float32,
 'Meter Number': str,
 'No Standing or Stopping Violation': str,
 'Plate ID': str,
 'Plate Type': str,
 'Registration State': str,
 'Street Code1': np.uint32,
 'Street Code2': np.uint32,
 'Street Code3': np.uint32,
 'Street Name': str,
 'Sub Division': str,
 'Summons Number': np.uint32,
 'Time First Observed': str,
 'To Hours In Effect': str,
 'Unregistered Vehicle?': str,
 'Vehicle Body Type': str,
 'Vehicle Color': str,
 'Vehicle Expiration Date': str,
 'Vehicle Make': str,
 'Vehicle Year': np.float32,
 'Violation Code': np.uint16,
 'Violation County': str,
 'Violation Description': str,
 'Violation In Front Of Or Opposite': str,
 'Violation Legal Code': str,
 'Violation Location': str,
 'Violation Post Code': str,
 'Violation Precinct': np.float32,
 'Violation Time': str
}

Listing 4.10 contains the final schema for the NYC Parking Ticket data. Let’s use it to reload all four of the DataFrames, then union all four years of data together into a final DataFrame.

Listing 4.11 Applying the schema to all four DataFrames
data = dd.read_csv('nyc-parking-tickets/*.csv', dtype=dtypes, usecols=common_columns)

In listing 4.11, we reload the data and apply the schema we created. Notice that instead of loading four separate files into four separate DataFrames, we’re now loading all CSV files contained in the nyc-parking-tickets folder into a single DataFrame by using the * wildcard. Dask provides this for convenience since it’s common to split large datasets into multiple files, especially on distributed filesystems. As before, we’re passing the final schema into the dtype argument, and we’re now also passing the list of columns we want to keep into the usecols argument. usecols takes a list of column names and drops any columns from the resulting DataFrame that aren’t specified in the list. Since we only care about analyzing the data we have available for all four years, we’ll choose to simply ignore the columns that aren’t shared across all four years.

usecols is an interesting argument because if you look at the Dask API documentation, it’s not listed. It might not be immediately obvious why this is, but it’s because the argument comes from Pandas. Since each partition of a Dask DataFrame is a Pandas DataFrame, you can pass along any Pandas arguments through the *args and **kwargs interfaces and they will control the underlying Pandas DataFrames that make up each partition. This interface is also how you can control things like which column delimiter should be used, whether the data has a header or not, and so on. The Pandas API documentation for read_csv and its many arguments can be found at http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html.
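Because these are Pandas arguments, you can experiment with them on a small in-memory file using Pandas alone before handing them to Dask. The sketch below uses invented data and shows sep, header, names, usecols, and dtype working together; all five are keyword arguments of pandas.read_csv.

```python
import io
import pandas as pd

# Hypothetical pipe-delimited file with no header row
raw_text = '1|ABC123|2010\n2|XYZ789|2015\n'

df = pd.read_csv(
    io.StringIO(raw_text),
    sep='|',                                   # column delimiter
    header=None,                               # the data has no header row
    names=['Summons Number', 'Plate ID', 'Vehicle Year'],
    usecols=['Plate ID', 'Vehicle Year'],      # keep only these columns
    dtype={'Vehicle Year': 'float32'},         # explicit schema for one column
)
print(df)
```

Once the arguments behave as expected on a sample, the same keywords can be passed along in the Dask call, as described above.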

We’ve now read in the data and we are ready to clean and analyze this DataFrame. If you count the rows, we have over 42.3 million parking violations to explore! However, before we get into that, we will look at interfacing with a few other storage systems as well as writing data. We’ll now look at reading data from relational database systems.


4.2 Reading data from relational databases

Reading data from a relational database system (RDBMS) into Dask is fairly easy. In fact, you’re likely to find that the most tedious part of interfacing with RDBMSs is setting up and configuring your Dask environment to do so. Because of the wide variety of RDBMSs used in production environments, we can’t cover the specifics for each one here. But a substantial amount of documentation and support is available online for the specific RDBMS you’re working with. The most important thing to be aware of is that when using Dask in a multi-node cluster, your client machine is not the only machine that will need access to the database. Each worker node needs to be able to access the database server, so it’s important to install the correct software and configure each node in the cluster to be able to do so.

Dask uses the SQLAlchemy library to interface with RDBMSs, and I recommend using the pyodbc library to manage your ODBC drivers. This means you will need to install and configure SQLAlchemy, pyodbc, and the ODBC drivers for your specific RDBMS on each machine in your cluster for Dask to work correctly. To learn more about SQLAlchemy, you can check out www.sqlalchemy.org/library.html. Likewise, you can learn more about pyodbc at https://github.com/mkleehammer/pyodbc/wiki.

Listing 4.12 Reading a SQL table into a Dask DataFrame
username = 'jesse'
password = 'DataScienceRulez'
hostname = 'localhost'
database_name = 'DSAS'
odbc_driver = 'ODBC+Driver+13+for+SQL+Server'

connection_string = 'mssql+pyodbc://{0}:{1}@{2}/{3}?driver={4}'.format(username, password, hostname, database_name, odbc_driver)

data = dd.read_sql_table('violations', connection_string, index_col='Summons Number')

In listing 4.12, we first set up a connection to our database server by building a connection string. For this particular example, I’m using SQL Server on Linux from the official SQL Server Docker container on a Mac. Your connection string might look different based on the database server and operating system you’re running on. The last line demonstrates how to use the read_sql_table function to connect to the database and create the DataFrame. The first argument is the name of the database table you want to query, the second argument is the connection string, and the third argument is the column to use as the DataFrame’s index. These are the three required arguments for this function to work. However, you should be aware of a few important assumptions.
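One practical wrinkle: the connection string is a URL, so a password containing characters such as @ or / has to be percent-encoded before it is embedded. The sketch below uses quote_plus from Python’s standard library for that. build_connection_string is a hypothetical helper name, the password shown is made up, and the URL layout simply mirrors listing 4.12.

```python
from urllib.parse import quote_plus


def build_connection_string(username, password, hostname, database_name, odbc_driver):
    """Assemble a connection URL, percent-encoding the parts that may hold special characters."""
    return 'mssql+pyodbc://{0}:{1}@{2}/{3}?driver={4}'.format(
        quote_plus(username),
        quote_plus(password),
        hostname,
        database_name,
        quote_plus(odbc_driver),  # spaces in the driver name become '+'
    )


# Hypothetical credentials; the '@' and '/' in the password would otherwise break the URL.
connection_string = build_connection_string(
    'jesse', 'p@ss/word', 'localhost', 'DSAS', 'ODBC Driver 13 for SQL Server')
print(connection_string)
```

With this approach the driver name can be written with ordinary spaces, and the encoding produces the ODBC+Driver+13+for+SQL+Server form used in listing 4.12.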

First, concerning datatypes, you might think that Dask gets datatype information directly from the database server since the database has a defined schema already. Instead, Dask samples the data and infers datatypes just like it does when reading a delimited text file. However, Dask sequentially reads the first five rows from the table instead of randomly sampling data across the dataset. Because databases do have a well-defined schema, Dask’s type inference is much more reliable when reading data from an RDBMS than when reading a delimited text file. However, it’s still not perfect. Because of the way data might be sorted, edge cases can come up that cause Dask to choose incorrect datatypes. For example, a string column might have some rows where the strings contain only numbers (“1456,” “2986,” and so on). If the data is sorted in such a way that only these numeric-like strings appear in the sample Dask takes when inferring datatypes, it may incorrectly assume the column should be an integer datatype instead of a string datatype. In these situations, you may still have to do some manual schema tweaking as you learned in the previous section.
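The pitfall is easy to reproduce in miniature. The sketch below is a toy illustration only: infer_type is a hypothetical, deliberately naive rule rather than the inference Dask or Pandas actually performs, and the column values are made up. It shows how a sample taken from the top of a sorted column can point to the wrong datatype.

```python
def infer_type(sample):
    """Toy inference rule: call the column an integer only if every sampled string is all digits."""
    return 'int64' if all(value.isdigit() for value in sample) else 'object'


# Hypothetical string column, ordered so that the digit-only values come first.
column_values = ['1456', '2986', '3310', '4021', '5177', 'A-19', '6602']

first_five = column_values[:5]
print(infer_type(first_five))     # inference from the first five rows only
print(infer_type(column_values))  # inference from the whole column
```

The first call sees only digit-like strings and settles on an integer type, while the full column clearly needs a string type.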

The second assumption is how the data should be partitioned. If the index_col (currently set to 'Summons Number') is a numeric or date/time datatype, Dask will automatically infer boundaries and partition the data based on a 256 MB block size (which is larger than read_csv’s 64 MB block size). However, if the index_col is not a numeric or date/time datatype, you must either specify the number of partitions or the boundaries to partition the data by.

Listing 4.13 Even partitioning on a non-numeric or date/time index
data = dd.read_sql_table('violations', connection_string, index_col='Vehicle Color', npartitions=200)

In listing 4.13, we chose to index the DataFrame by the Vehicle Color column, which is a string column. Therefore, we have to specify how the DataFrame should be partitioned. Here, using the npartitions argument, we are telling Dask to split the DataFrame into 200 evenly sized pieces. Alternatively, we can manually specify boundaries for the partitions.

Listing 4.14 Custom partitioning on a non-numeric or date/time index
partition_boundaries = sorted(['Red', 'Blue', 'White', 'Black', 'Silver', 'Yellow'])

data = dd.read_sql_table('violations', connection_string, index_col='Vehicle Color', divisions=partition_boundaries)

Listing 4.14 shows how to manually define partition boundaries. The important thing to note is that Dask treats these boundaries as alphabetically sorted half-open intervals: each partition includes its lower boundary and excludes its upper boundary, except for the last partition, which includes both. This means that you won’t have partitions that contain only the color defined by their boundary. For example, because green is alphabetically between blue and red, green cars will fall into the partition that starts at blue. That “blue partition” actually holds all colors that are alphabetically greater than or equal to blue and alphabetically less than red. This isn’t really intuitive at first and can take some getting used to.
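To get a feel for where a given color lands, it can help to look up the pair of boundaries it sorts between. The sketch below uses only Python’s standard bisect module; bounding_boundaries is a hypothetical helper, not part of Dask, and it deliberately refuses values that are themselves boundaries, because where those land depends on which end of each interval is closed.

```python
from bisect import bisect_left


def bounding_boundaries(value, boundaries):
    """Return the (lower, upper) pair of boundaries that a value falls strictly between.

    Returns None when the value sorts before the first or after the last boundary.
    """
    if value in boundaries:
        raise ValueError('boundary values are not modeled in this sketch')
    position = bisect_left(boundaries, value)
    if position == 0 or position == len(boundaries):
        return None
    return boundaries[position - 1], boundaries[position]


partition_boundaries = sorted(['Red', 'Blue', 'White', 'Black', 'Silver', 'Yellow'])

for color in ['Green', 'Orange', 'Tan', 'Aqua']:
    print(color, bounding_boundaries(color, partition_boundaries))
```

Green and Orange both sort between Blue and Red, Tan sorts between Silver and White, and Aqua sorts before every boundary in the list. Python compares these strings by code point, so the comparison is case-sensitive; a database may sort with a different collation.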

The third assumption that Dask makes when you pass only the minimum required parameters is that you want to select all columns from the table. You can limit the columns you get back using the columns argument, which behaves similarly to the usecols argument in read_csv. While you are allowed to use SQLAlchemy expressions in the argument, I recommend that you avoid offloading any computations to the database server, because you lose the parallelism that Dask would otherwise bring to that computation.

Listing 4.15 Selecting a subset of columns
# Equivalent to:
# SELECT [Summons Number], [Plate ID], [Vehicle Color] FROM dbo.violations
column_filter = ['Summons Number', 'Plate ID', 'Vehicle Color']
data = dd.read_sql_table('violations', connection_string, index_col='Summons Number', columns=column_filter)

Listing 4.15 shows how to add a column filter to the connection query. Here we’ve created a list of column names that exist in the table; then we pass them to the columns argument. You can use the column filter even if you are querying a view instead of a table.

The fourth and final assumption made by providing the minimum arguments is the schema selection. When I say “schema” here, I’m not referring to the datatypes used by the DataFrame; I’m referring to the database schema object that RDBMSs use to group tables into logical clusters (such as dim/fact in a data warehouse or sales, hr, and so on, in a transactional database). If you don’t provide a schema, the database driver will use the default for the platform. For SQL Server, this results in Dask looking for the violations table in the dbo schema. If we had put the table in a different schema, perhaps one called chapterFour, we would receive a “table not found” error.

Listing 4.16 Specifying a database schema
# Equivalent to:
# SELECT * FROM chapterFour.violations
data = dd.read_sql_table('violations', connection_string, index_col='Summons Number', schema='chapterFour')

Listing 4.16 shows you how to select a specific schema from Dask. Passing the schema name into the schema argument will cause Dask to use the provided database schema rather than the default.

Like read_csv, read_sql_table allows you to forward arguments to the underlying calls to the Pandas read_sql function, which is used at the partition level to create the Pandas DataFrames. We’ve covered the most important arguments here, but if you need an extra degree of customization, have a look at the API documentation for the Pandas read_sql function. All its arguments can be manipulated using the *args and **kwargs interfaces provided by Dask DataFrames. Now we’ll look at how Dask deals with distributed filesystems.

4.3 Reading data from HDFS and S3

While it’s very likely that many datasets you’ll come across throughout your work will be stored in relational databases, powerful alternatives are rapidly growing in popularity. Most notable are the developments in distributed filesystem technologies from 2006 onward. Powered by technologies like Apache Hadoop and Amazon’s Simple Storage Service (or S3 for short), distributed filesystems bring the same benefits to file storage that distributed computing brings to data processing: increased throughput, scalability, and robustness. Using a distributed computing framework alongside a distributed filesystem technology is a harmonious combination: in the most advanced distributed filesystems, such as the Hadoop Distributed File System (HDFS), nodes are aware of data locality, allowing computations to be shipped to the data rather than the data shipped to the compute resources. This saves a lot of time and back-and-forth communication over the network. Figure 4.7 demonstrates why keeping data isolated to a single node can have some performance consequences.

Figure 4.7 Running a distributed computation without a distributed filesystem

c04_07.eps

A significant bottleneck is caused by the need to chunk up and ship data to the other nodes in the cluster. Under this configuration, when Dask reads in the data, it will partition the DataFrame as usual, but the other worker nodes can’t do any work until a partition of data is sent to them. Because it takes some time to transfer these 64 MB chunks over the network, the total computation time will be increased by the time it takes to ship data back and forth between the node that has the data and the other workers. This becomes even more problematic if the size of the cluster grows by any significant amount. If we had several hundred (or more) worker nodes vying for chunks of data all at once, the networking stack on the data node could easily get saturated with requests and slow to a crawl. Both of these problems can be mitigated by using a distributed filesystem. Figure 4.8 shows how distributing the data across worker nodes makes the process more efficient.

Figure 4.8 Running a distributed computation on a distributed filesystem

c04_08.eps

Instead of creating a bottleneck by holding data on only one node, the distributed filesystem chunks up data ahead of time and spreads it across multiple machines. It’s standard practice in many distributed filesystems to store redundant copies of chunks/partitions both for reliability and performance. From the perspective of reliability, storing each partition in triplicate (which is a common default configuration) means that the data survives the failure of any two machines; all three machines holding a copy would have to fail before any data loss occurs. The probability of several machines failing in a short amount of time is much lower than the probability of one machine failing, so it adds an extra layer of safety at a nominal cost of additional storage.
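The reliability argument comes down to simple arithmetic. As a rough sketch, assume each machine fails independently with the same probability during some window of time (a simplification; real failures are often correlated). A partition is lost only if every machine holding a copy fails. The function name and the 1% figure below are hypothetical, chosen purely for illustration.

```python
def loss_probability(machine_failure_probability, copies):
    """Probability that every machine holding a copy fails, assuming independent failures."""
    return machine_failure_probability ** copies


p = 0.01  # hypothetical chance that a single machine fails in a given window

for copies in (1, 2, 3):
    print(copies, loss_probability(p, copies))
```

Each additional copy multiplies the chance of losing the partition by p again, which is why a few replicas buy a large amount of safety for a modest storage cost.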

From the performance perspective, spreading the data out across the cluster makes it more likely that a node containing the data will be available to run a computation when requested. Or, in the event that all worker nodes that hold that partition are already busy, one of them can ship the data to another worker node. In this case, spreading out the data avoids any single node getting saturated by requests for data. If one node is busy serving up a bunch of data, it can offload some of those requests to other nodes that hold the requested data. Figure 4.9 demonstrates why data-local distributed filesystems are even more advantageous.

Figure 4.9 Shipping computations to the data

c04_09.eps

The node controlling the orchestration of the distributed computation (called the driver) knows that the data it wants to process is available in a few locations because the distributed filesystem maintains a catalogue of the data held within the system. It will first ask the machines that have the data locally whether they’re busy or not. If one of the nodes is not busy, the driver will instruct the worker node to perform the computation. If all the nodes are busy, the driver can either choose to wait until one of the worker nodes is free, or instruct another free worker node to get the data remotely and run the computation.

HDFS and S3 are two of the most popular distributed filesystems, but they have one key difference for our purposes: HDFS is designed to allow computations to run on the same nodes that serve up data, and S3 is not. Amazon designed S3 as a web service dedicated solely to file storage and retrieval. There’s absolutely no way to execute application code on S3 servers. This means that when you work with data stored in S3, you will always have to transmit partitions from S3 to a Dask worker node in order to process them. Let’s now take a look at how we can use Dask to read data from these systems.

Listing 4.17 Reading data from HDFS
data = dd.read_csv('hdfs://localhost/nyc-parking-tickets/*.csv', dtype=dtypes, usecols=common_columns)

In listing 4.17, we have a read_csv call that should look very familiar by now. In fact, the only thing that’s changed is the file path. Prefixing the file path with hdfs:// tells Dask to look for the files on an HDFS cluster instead of the local filesystem, and localhost indicates that Dask should query the local HDFS NameNode for information on the whereabouts of the file.

All the arguments for read_csv that you learned before can still be used here. In this way, Dask makes it extremely easy to work with HDFS. The only additional requirement is that you install the hdfs3 library on each of your Dask workers. This library allows Dask to communicate with HDFS; therefore, this functionality won’t work if you haven’t installed the package. You can simply install the package with pip or conda (hdfs3 is on the conda-forge channel).

Listing 4.18 Reading data from S3
data = dd.read_csv('s3://my-bucket/nyc-parking-tickets/*.csv', dtype=dtypes, usecols=common_columns)

In listing 4.18, our read_csv call is (again) almost exactly the same as listing 4.17. This time, however, we’ve prefixed the file path with s3:// to tell Dask that the data is located on an S3 filesystem, and my-bucket tells Dask to look for the files in the S3 bucket named “my-bucket” that is associated with your AWS account.

In order to use the S3 functionality, you must have the s3fs library installed on each Dask worker. Like hdfs3, this library can be installed simply through pip or conda (from the conda-forge channel). The final requirement is that each Dask worker is properly configured for authenticating with S3. s3fs uses the boto library to communicate with S3. You can learn more about configuring boto at http://boto.cloudhackers.com/en/latest/getting_started.html. The most common S3 authentication configuration consists of using the AWS Access Key and AWS Secret Access Key. Rather than injecting these keys in your code, it’s a better idea to set these values using environment variables or a configuration file. Boto will check both the environment variables and the default configuration paths automatically, so there’s no need to pass authentication credentials directly to Dask. Otherwise, as with using HDFS, the call to read_csv allows you to do all the same things as if you were operating on a local filesystem. Dask really makes it easy to work with distributed filesystems!
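If you go the environment-variable route, a quick check before starting a worker can save some head-scratching. The sketch below uses the standard AWS variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; missing_aws_variables is a hypothetical helper, and it only inspects environment variables, so credentials supplied through a configuration file would not be detected by it.

```python
import os

REQUIRED_VARIABLES = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')


def missing_aws_variables(environ=None):
    """Return the names of required AWS credential variables that are unset or empty."""
    if environ is None:
        environ = os.environ
    return [name for name in REQUIRED_VARIABLES if not environ.get(name)]


# Check the real environment of the current process.
missing = missing_aws_variables()
if missing:
    print('Not set in the environment:', ', '.join(missing))
else:
    print('Both credential variables are set.')
```

Passing a plain dictionary as environ makes the helper easy to try out without touching your real environment.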

Now that you have some experience working with a few different storage systems, we’ll round out the “reading data” part of this chapter by talking about a special file format that is very useful for fast computations.

4.4 Reading data in Parquet format

CSV and other delimited text files are great for their simplicity and portability, but they aren’t really optimized for the best performance, especially when performing complex data operations such as sorts, merges, and aggregations. While a wide variety of file formats attempt to increase efficiency in many different ways, with mixed results, one of the more recent high-profile file formats is Apache Parquet. Parquet is a high-performance columnar storage format jointly developed by Twitter and Cloudera that was designed with use on distributed filesystems in mind. Its design brings several key advantages to the table over text-based formats: more efficient use of IO, better compression, and strict typing. Figure 4.10 shows the difference in how data is stored in Parquet format versus a row-oriented storage scheme like CSV.

Figure 4.10 The structure of Parquet compared with delimited text files

c04_10.eps

With row-oriented formats, values are stored on disk and in memory sequentially based on the row position of the data. Consider what we’d have to do if we wanted to perform an aggregate function over x, such as finding the mean. To collect all the values of x, we’d have to scan over 10 values in order to get the 4 values we want. This means we spend more time waiting for IO completion just to throw away over half of the values read from disk. Compare that with the columnar format: in that format, we’d simply grab the sequential chunk of x values and have all four values we want. This seeking operation is much faster and more efficient.
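The difference in work can be counted directly with a toy table. The sketch below is not how Parquet or CSV readers are implemented, and the table (four records with columns id, x, and y) is made up for illustration rather than copied from the figure. It simply counts how many stored values each layout has to touch to compute the mean of x.

```python
from statistics import mean

column_names = ('id', 'x', 'y')
records = [(1, 2.0, 'a'), (2, 4.0, 'b'), (3, 6.0, 'c'), (4, 8.0, 'd')]

# Row-oriented layout: all values of one record sit next to each other.
row_layout = [value for record in records for value in record]

# Column-oriented layout: all values of one column sit next to each other.
column_layout = {name: [record[i] for record in records]
                 for i, name in enumerate(column_names)}


def mean_from_rows(flat_values, width, position):
    """Scan every stored value, keeping only those at the wanted column position."""
    values_read = 0
    wanted = []
    for offset, value in enumerate(flat_values):
        values_read += 1
        if offset % width == position:
            wanted.append(value)
    return mean(wanted), values_read


def mean_from_columns(columns, name):
    """Read only the contiguous run of values belonging to one column."""
    wanted = columns[name]
    return mean(wanted), len(wanted)


print(mean_from_rows(row_layout, width=3, position=1))
print(mean_from_columns(column_layout, 'x'))
```

Both calls return the same mean, but the row-oriented scan touches every value in the table while the column-oriented read touches only the x values.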

Another significant advantage of applying column-oriented chunking of the data is that the data can now be partitioned and distributed by column. This leads to much faster and more efficient shuffle operations, since only the columns that are necessary for an operation can be transmitted over the network instead of entire rows.

Finally, efficient compression is also a major advantage of Parquet. With column-oriented data, it’s possible to apply different compression schemes to individual columns so the data becomes compressed in the most efficient way possible. Python’s Parquet library supports many of the popular compression algorithms such as gzip, lzo, and snappy.

To use Parquet with Dask, you need to make sure you have the fastparquet or pyarrow library installed, both of which can be installed either via pip or conda (conda-forge). I would generally recommend using pyarrow over fastparquet, because it has better support for serializing complex nested data structures. You can also install the compression libraries you want to use, such as python-snappy or python-lzo, which are also available via pip or conda (conda-forge). Now let’s take a look at reading the NYC Parking Ticket dataset one more time in Parquet format. As a side note, we will be using Parquet format extensively throughout the book, and in the next chapter you will write some of the NYC Parking Ticket dataset to Parquet format. Therefore, you will see the read_parquet method many more times! This discussion is here simply to give you a first look at how to use the method. Now, without further ado, here’s how to use the read_parquet method.

Listing 4.19 Reading in Parquet data
data = dd.read_parquet('nyc-parking-tickets-prq')

Listing 4.19 is about as simple as it gets! The read_parquet method is used to create a Dask DataFrame from one or more Parquet files, and the only required argument is the path. One thing to notice about this call that might look strange: nyc-parking-tickets-prq is a directory, not a file. That’s because datasets stored as Parquet are typically written to disk pre-partitioned, resulting in potentially hundreds or thousands of individual files. Dask provides this method for convenience so you don’t have to manually create a long list of filenames to pass in. You can specify a single Parquet file in the path if you want to, but it’s much more typical to see Parquet datasets referenced as a directory of files rather than individual files.

Listing 4.20 Reading Parquet files from distributed filesystems
data = dd.read_parquet('hdfs://localhost/nyc-parking-tickets-prq')

# OR

data = dd.read_parquet('s3://my-bucket/nyc-parking-tickets-prq')

Listing 4.20 shows how to read Parquet from distributed filesystems. Just as with delimited text files, the only difference is specifying a distributed filesystem protocol, such as hdfs or s3, and specifying the relevant path to the data.

Parquet is stored with a predefined schema, so there are no options to mess with datatypes. The only real relevant options that Dask gives you to control importing Parquet data are column filters and index selection. These work the same way as with the other file formats. By default, they will be inferred from the schema stored alongside the data, but you can override that selection by manually passing in values to the relevant arguments.

Listing 4.21 Specifying Parquet read options
columns = ['Summons Number', 'Plate ID', 'Vehicle Color']

data = dd.read_parquet('nyc-parking-tickets-prq', columns=columns, index='Plate ID')

In listing 4.21, we pick a few columns that we want to read from the dataset and put them in a list called columns. We then pass in the list to the columns argument, and we specify Plate ID to be used as the index by passing it in to the index argument. The result of this will be a Dask DataFrame containing only the three columns shown here and sorted/indexed by the Plate ID column.

We’ve now covered a number of ways to get data into Dask from a wide array of systems and formats. As you can see, the DataFrame API offers a great deal of flexibility to ingest structured data in fairly simple ways. In the next chapter, we’ll cover fundamental data transformations and, naturally, writing data back out in a number of different ways.

Summary

  • Inspecting the columns of a DataFrame can be done with the columns attribute.
  • Dask’s datatype inference shouldn’t be relied on for large datasets. Instead, you should define your own schemas based on common NumPy datatypes.
  • Parquet format offers good performance because it’s a column-oriented format and highly compressible. Whenever possible, try to get your dataset in Parquet format.