Examples

Transportation

Introduction

New York City released an extremely detailed data set of over 1 billion taxi rides taken in the city - this data has become a popular target for analysis by tech bloggers and has been very well studied. For this reason, we thought it would be interesting to import this data to Pilosa in order to compare with other data stores and techniques on the exact same data set.

Transportation in general is a compelling use case for Pilosa, as it often involves multiple disparate data sources, as well as high-rate, real-time, and extremely large volumes of data (particularly if one wants to draw reasonable conclusions).

We’ve written a tool to help import the NYC taxi data into Pilosa - this tool is part of the PDK (Pilosa Development Kit), and takes advantage of a number of reusable modules that may help you import other data as well. Follow along and we’ll explain the whole process step by step.

After initial setup, the PDK import tool does everything we need to define a Pilosa schema, map data to bitmaps accordingly, and import it into Pilosa.

Data Model

The NYC taxi data is composed of a number of CSV files listed here: http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml. These data files have around 20 columns, about half of which are relevant to the benchmark queries we’re looking at.

We import these columns, creating one or more Pilosa fields from each of them:

field                 mapping
cab_type              direct map of enum int → row ID
dist_miles            round(dist) → row ID
total_amount_dollars  round(total_amount) → row ID
passenger_count       direct map of integer value → row ID
drop_grid_id          (lat, lon) → 100x100 rectangular grid → cell ID
drop_year             year(timestamp) → row ID
drop_month            month(timestamp) → row ID
drop_day              day(timestamp) → row ID
drop_time             time of day mapped to one of 48 half-hour buckets → row ID
pickup_grid_id        (lat, lon) → 100x100 rectangular grid → cell ID
pickup_year           year(timestamp) → row ID
pickup_month          month(timestamp) → row ID
pickup_day            day(timestamp) → row ID
pickup_time           time of day mapped to one of 48 half-hour buckets → row ID

We also created two extra fields that represent the duration and average speed of each ride:

field             mapping
duration_minutes  round((drop_timestamp - pickup_timestamp).minutes) → row ID
speed_mph         round(dist_miles / (drop_timestamp - pickup_timestamp).hours) → row ID

Mapping

Each column that we want to use must be mapped to a combination of fields and row IDs according to some rule. There are many ways to approach this mapping, and the taxi dataset gives us a good overview of possibilities.

0 columns → 1 field

cab_type: contains one row for each type of cab. Each column, representing one ride, has a bit set in exactly one row of this field. The mapping is a simple enumeration, for example yellow=0, green=1, etc. The values of the bits in this field are determined by the source of the data. That is, we’re importing data from several disparate sources: NYC yellow taxi cabs, NYC green taxi cabs, and Uber cars. For each source, the single row to be set in the cab_type field is constant.
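
For example, the per-source constant can be pictured as a small lookup table. This is an illustration only, not PDK code, and the Uber row ID is an assumption since the text only fixes yellow=0 and green=1:

// Illustration only (not part of the PDK): each data source corresponds to
// one constant row in the cab_type field, so every ride imported from that
// source sets a bit in exactly that row.
var cabTypeRow = map[string]uint64{
    "yellow": 0, // NYC yellow taxi trip records
    "green":  1, // NYC green taxi trip records
    "uber":   2, // Uber records; this row ID is an assumption for the example
}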

1 column → 1 field

The following three fields are mapped in a simple direct way from single columns of the original data.

dist_miles: each row represents rides of a certain distance. The mapping is simple: as an example, row 1 represents rides with a distance in the interval [0.5, 1.5). That is, we round the floating point value of distance to an integer, and use that as the row ID directly. Generally, the mapping from a floating point value to a row ID could be arbitrary. The rounding mapping is concise to implement, which simplifies importing and analysis. As an added bonus, it’s human-readable. We’ll see this pattern used several times.

In PDK parlance, we define a Mapper, which is simply a function that returns integer row IDs. PDK has a number of predefined mappers that can be described with a few parameters. One of these is LinearFloatMapper, which applies a linear function to the input, and casts it to an integer, so the rounding is handled implicitly. In code:

lfm := pdk.LinearFloatMapper{
    Min: -0.5,
    Max: 3600.5,
    Res: 3601,
}

Min and Max define the linear function, and Res determines the maximum allowed value for the output row ID - we chose these values to produce a “round to nearest integer” behavior. Other predefined mappers have their own specific parameters, usually two or three.
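
To see why these particular values produce rounding, here is a minimal Go sketch of the scale-then-truncate behavior we assume LinearFloatMapper implements; this is our illustration, not the PDK source:

// Sketch of the behavior we assume LinearFloatMapper implements: scale the
// input linearly from [Min, Max] onto [0, Res], then truncate to an integer.
// With Min=-0.5, Max=3600.5, Res=3601 the scale factor is 1, so this reduces
// to int64(x + 0.5), i.e. round-to-nearest for the non-negative inputs here.
func linearRowID(x, min, max float64, res int64) int64 {
    return int64((x - min) / (max - min) * float64(res))
}

// e.g. linearRowID(1.49, -0.5, 3600.5, 3601) == 1
//      linearRowID(2.75, -0.5, 3600.5, 3601) == 3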

This mapper function is the core operation, but we need a few other pieces to define the overall process, which is encapsulated in the BitMapper object. This object defines which field(s) of the input data source to use (Fields), how to parse them (Parsers), what mapping to use (Mapper), and the name of the Pilosa field to set bits in (Frame - the attribute keeps the older “frame” terminology).

pdk.BitMapper{
    Frame:   "dist_miles",
    Mapper:  lfm,
    Parsers: []pdk.Parser{pdk.FloatParser{}},
    Fields:  []int{fields["trip_distance"]},
},

These same objects are represented in the JSON definition file:

{
    "Fields": {
        "Trip_distance": 10
    },
    "Mappers": [
        {
            "Name": "lfm0",
            "Min": -0.5,
            "Max": 3600.5,
            "Res": 3600
        }
    ],
    "BitMappers": [
        {
            "Frame": "dist_miles",
            "Mapper": {
                "Name": "lfm0"
            },
            "Parsers": [
                {"Name": "FloatParser"}
            ],
            "Fields": "Trip_distance"
        }
    ]
}

Here, we define a list of Mappers, each including a name, which we use to refer to the mapper later, in the list of BitMappers. We can also do this with Parsers, but a few simple Parsers that need no configuration are available by default. We also have a list of Fields, which is simply a map of field names to their column indices in the source data. We use these names in the BitMapper definitions to keep things human-readable.

total_amount_dollars: Here we use the rounding mapping again, so each row represents rides with a total cost that rounds to the row’s ID. The BitMapper definition is very similar to the previous one.

passenger_count: This column contains small integers, so we use one of the simplest possible mappings: the column value is the row ID.

1 column → multiple fields

When working with a composite data type like a timestamp, there are plenty of mapping options. In this case, we expect to see interesting periodic trends, so we want to encode the cyclic components of time in a way that allows us to look at them independently during analysis.

We do this by storing time data in four separate fields for each timestamp: one each for the year, month, day, and time of day. The first three are mapped directly. For example, a ride with a date of 2015/06/24 will have a bit set in row 2015 of field “year”, row 6 of field “month”, and row 24 of field “day”.

We might continue this pattern with hours, minutes, and seconds, but we don’t have much use for that level of precision here, so instead we use a “bucketing” approach. That is, we pick a resolution (30 minutes), divide the day into buckets of that size, and create a row for each one. So a ride with a time of 6:45AM has a bit set in row 13 of field “time_of_day”.
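
A minimal Go sketch of that fan-out (our own illustration, not PDK code, assuming the standard library’s time.Time):

// Illustration of how one timestamp fans out into four rows: year, month,
// and day map directly, and the time of day falls into one of 48 half-hour
// buckets. For 2015-06-24 06:45 this yields 2015, 6, 24, 13, matching the
// examples in the text.
func timestampRows(t time.Time) (year, month, day, bucket int) {
    return t.Year(), int(t.Month()), t.Day(), t.Hour()*2 + t.Minute()/30
}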

We do all of this for each timestamp of interest, one for pickup time and one for dropoff time. That gives us eight total fields for two timestamps: pickup_year, pickup_month, pickup_day, pickup_time, drop_year, drop_month, drop_day, drop_time.

Multiple columns → 1 field

The ride data also contains geolocation data: latitude and longitude for both pickup and dropoff. We just want to be able to produce a rough overview heatmap of ride locations, so we use a grid mapping. We divide the area of interest into a 100x100 grid in latitude-longitude space, label each cell in this grid with a single integer, and use that integer as the row ID.
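
As an illustration of the computation (not PDK code), a Go sketch might look like the following; the bounding box roughly covering NYC and the row-major cell numbering are assumptions made for the example:

// gridID sketches the (lat, lon) → cell ID computation: overlay a 100x100
// grid on a bounding box and number the cells row-major, giving IDs 0-9999.
// The NYC bounding box below is an assumption for illustration; real import
// code would also clamp or discard points that fall outside it.
func gridID(lat, lon float64) uint64 {
    const (
        minLat, maxLat = 40.5, 40.9
        minLon, maxLon = -74.3, -73.7
        n              = 100
    )
    row := int((lat - minLat) / (maxLat - minLat) * n)
    col := int((lon - minLon) / (maxLon - minLon) * n)
    return uint64(row*n + col)
}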

We do all of this for each location of interest, one for pickup and one for dropoff. That gives us two fields for two locations: pickup_grid_id, drop_grid_id.

Again, there are many mapping options for location data. For example, we might convert to a different coordinate system, apply a projection, or aggregate locations into real-world regions such as neighborhoods. Here, the simple approach is sufficient.

Complex mappings

We also anticipate looking for trends in ride duration and speed, so we want to capture this information during the import process. For the field duration_minutes, we compute a row ID as round((drop_timestamp - pickup_timestamp).minutes). For the field speed_mph, we compute a row ID as round(dist_miles / (drop_timestamp - pickup_timestamp).hours). These mapping calculations are straightforward, but because they require arithmetic operations on multiple columns, they are a bit too complex to capture in the basic mappers available in PDK. Instead, we define custom mappers to do the work:

durm := pdk.CustomMapper{
    // Compute the ride duration in minutes from the pickup and dropoff
    // timestamps; the resulting float is then rounded to a row ID by the
    // LinearFloatMapper defined earlier.
    Func: func(fields ...interface{}) interface{} {
        start := fields[0].(time.Time)
        end := fields[1].(time.Time)
        return end.Sub(start).Minutes()
    },
    Mapper: lfm,
}
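
A mapper for speed_mph can be sketched along the same lines. This is our illustration of the pattern, not the PDK’s actual configuration; it assumes the trip distance arrives as a float64 in the third field and that a LinearFloatMapper like lfm, configured for a sensible speed range, does the rounding:

speedm := pdk.CustomMapper{
    // Compute the average speed in miles per hour from the pickup time,
    // dropoff time, and trip distance, then hand the float to a rounding
    // mapper as above.
    Func: func(fields ...interface{}) interface{} {
        start := fields[0].(time.Time)
        end := fields[1].(time.Time)
        dist := fields[2].(float64)
        return dist / end.Sub(start).Hours()
    },
    Mapper: lfm, // assumption: a LinearFloatMapper covering the speed range
}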

Import process

After designing this schema and mapping, we capture it in a JSON definition file that can be read by the PDK import tool. Running pdk taxi runs the import based on the information in this file. For more details, see the PDK section, or check out the code itself.

Queries

Now we can run some example queries.

Count per cab type can be retrieved, sorted, with a single PQL call.

TopN(cab_type)
{"results":[[{"id":1,"count":1992943},{"id":0,"count":7057}]]}

High traffic location IDs can be retrieved with a similar call. These IDs correspond to latitude, longitude pairs, which can be recovered from the mapping that generates the IDs.

TopN(pickup_grid_id)
{"results":[[{"id":5060,"count":40620},{"id":4861,"count":38145},{"id":4962,"count":35268},...]]}

Average total_amount per passenger_count can be computed with some postprocessing. We use a small number of TopN calls to retrieve, for each passenger_count, the counts of rides in each total_amount_dollars row, then use those counts to compute a weighted average.

import requests

# Pilosa query endpoint; the index name "taxi" is assumed here - adjust the
# host and index for your setup.
qurl = 'http://localhost:10101/index/taxi/query'

# Build one TopN query per passenger count; each returns ride counts grouped
# by total_amount_dollars row for that passenger count.
queries = ''
pcounts = range(10)
for i in pcounts:
    queries += "TopN(Row(passenger_count=%d), total_amount_dollars)" % i
resp = requests.post(qurl, data=queries)

# For each passenger count, compute the weighted average of the dollar rows,
# weighting each row ID (a rounded dollar amount) by its ride count.
average_amounts = []
for pcount, topn in zip(pcounts, resp.json()['results']):
    wsum = sum([r['count'] * r['id'] for r in topn])
    count = sum([r['count'] for r in topn])
    average_amounts.append(float(wsum)/count)
Note that the BSI-powered Sum query now provides an alternative approach to this kind of query.

For more examples and details, see this ipython notebook.

