#!/usr/bin/env python # coding: utf-8 # # What borough has the fastest NYC taxi drivers? # In[ ]: from __future__ import division # In[ ]: get_ipython().run_line_magic('reload_ext', 'autotime') # In[ ]: from blaze import by, compute, transform, Data, sin, cos, atan2, sqrt, radians, summary, greatest, symbol from odo import odo, drop, resource, Temp, CSV, S3 # In[ ]: connect_args = dict(sslmode='verify-ca') # In[ ]: d = Data('redshift://cio@localhost:15439/dev::trip', connect_args=connect_args) # In[ ]: d.count() # In[ ]: def haversine_distance(start, stop, R=3959): """Compute the distance between two sets of `start` and `stop` lat, lon points """ # http://andrew.hedges.name/experiments/haversine/ lat1, lon1 = start lat2, lon2 = stop dlon = lon2 - lon1 dlat = lat2 - lat1 a = sin(radians(dlat) / 2.0) ** 2 + cos(lat1) * cos(lat2) * sin(radians(dlon) / 2.0) ** 2 return R * 2 * atan2(sqrt(greatest(a, 0.0)), sqrt(greatest(1.0 - a, 0.0))) # In[ ]: max_distance = haversine_distance(start=[40.477399, -74.259090], stop=[40.917577, -73.700272]) max_distance # ## Reduce the data to the area defined as NYC # In[ ]: # http://hafen.github.io/taxi/#reading-in-to-r min_lat, max_lat = 40.477399, 40.917577 min_lon, max_lon = -74.259090, -73.700272 valid = d[ (d.trip_distance > 0) & (d.trip_distance <= max_distance) & (d.trip_time_in_secs > 0) & (d.passenger_count > 0) & (d.passenger_count <= 5) & (d.pickup_latitude >= min_lat) & (d.pickup_latitude <= max_lat) & (d.dropoff_latitude >= min_lat) & (d.dropoff_latitude <= max_lat) & (d.pickup_longitude >= min_lon) & (d.pickup_longitude <= max_lon) & (d.dropoff_longitude >= min_lon) & (d.dropoff_longitude <= max_lon) ] # In[ ]: valid # In[ ]: computed_distance = haversine_distance(start=[valid.pickup_latitude, valid.pickup_longitude], stop=[valid.dropoff_latitude, valid.dropoff_longitude]) trip_time_in_hours = valid.trip_time_in_secs.coerce('float64') / 3600.0 calcd = transform(valid, avg_speed_in_mph=valid.trip_distance / trip_time_in_hours, trip_time_in_hours=trip_time_in_hours) # In[ ]: calcd # ### Assume that > 120 MPH is invalid # In[ ]: data = calcd[(calcd.avg_speed_in_mph > 5) & (calcd.avg_speed_in_mph <= 120)][ [ 'avg_speed_in_mph', 'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude' ] ] # In[ ]: data # In[ ]: print(compute(data)) # ## A small aside, with `odo` # ### Let's look at the distribution of rides by borough # # #### Q: Where do we get borough geolocation data? # #### A: From NYC's own API! # In[ ]: import requests as r # In[ ]: resp = r.get( 'https://data.cityofnewyork.us/api/geospatial/tqmj-j8zm', params=dict(method='export', format='KML') ) # In[ ]: import lxml import lxml.etree # #### Parse the KML with `lxml` # In[ ]: kml = lxml.etree.fromstring(resp.text.encode('utf8')) # In[ ]: print(resp.text) # #### `Placemark` elements delineate boroughs # In[ ]: placemarks = kml.xpath("//*[local-name()='Placemark']") # #### Create some functions to pull out the borough names and points # In[ ]: from toolz.curried import map from toolz import pipe, concat import io def find_borough_name(text): return re.search(r'.*(Brooklyn|Manhattan|Bronx|Staten Island|Queens).*', text).group(1) def parse_coords(p, name): coords = p.xpath('.//*[local-name()="coordinates"]/text()') return pipe(coords, map(str.split), map(lambda x: '\n'.join(map(lambda y: '%s,%s' % (y, name), x))), '\n'.join) def parse_kml(placemarks): result = [] for p in placemarks: desc, = p.xpath(".//*[local-name()='description']") name = find_borough_name(desc.text) result.append(parse_coords(p, name)) return '\n'.join(result) # #### Parse the location data # In[ ]: geodata = parse_kml(placemarks) geodata[:geodata.find('\n')] # #### Inspect our data with pandas # In[ ]: df = pd.read_csv(io.StringIO(geodata), names=['lon', 'lat', 'name']).drop_duplicates().reset_index(drop=True) # In[ ]: df.head() # In[ ]: df.describe() # #### After we're satisfied, let's throw everything into redshift for later analysis # In[ ]: boros = odo( odo(df, Temp(S3(CSV))), 'redshift://cio@localhost:15439/dev::boros', connect_args=connect_args ) # In[ ]: len(df) # #### Compute the bounding box for each borough # In[ ]: bounds = sa.select([ boros.c.name, sa.func.min(boros.c.lon).label('min_lon'), sa.func.min(boros.c.lat).label('min_lat'), sa.func.max(boros.c.lon).label('max_lon'), sa.func.max(boros.c.lat).label('max_lat') ]).group_by(boros.c.name).alias() # In[ ]: print(bounds) # #### Get out the SQLAlchemy table when blaze isn't enough # In[ ]: data # In[ ]: nyc = compute(data).alias() # In[ ]: joined = nyc.join( bounds, onclause=( (nyc.c.pickup_longitude >= bounds.c.min_lon) & (nyc.c.pickup_longitude <= bounds.c.max_lon) & (nyc.c.pickup_latitude >= bounds.c.min_lat) & (nyc.c.pickup_latitude <= bounds.c.max_lat) & (nyc.c.dropoff_longitude >= bounds.c.min_lon) & (nyc.c.dropoff_longitude <= bounds.c.max_lon) & (nyc.c.dropoff_latitude >= bounds.c.min_lat) & (nyc.c.dropoff_latitude <= bounds.c.max_lat) ) ) # In[ ]: sel = sa.select([ bounds.c.name, nyc.c.avg_speed_in_mph, nyc.c.pickup_latitude, nyc.c.pickup_longitude, nyc.c.dropoff_latitude, nyc.c.dropoff_longitude ]).select_from(joined).alias() # In[ ]: per_boro_speeds = sa.select([ sel.c.name, sa.func.avg(sel.c.avg_speed_in_mph).label('avg_speed'), sa.func.count(sel.c.avg_speed_in_mph).label('nsamples') ]).group_by(sel.c.name) # In[ ]: import sqlparse # In[ ]: print(sqlparse.format(str(per_boro_speeds), reindent=True)) # In[ ]: speeds = odo( per_boro_speeds.order_by(sa.desc(per_boro_speeds.c.avg_speed)), pd.DataFrame, connect_args=connect_args ) # In[ ]: speeds