from __future__ import division
%reload_ext autotime
from blaze import by, compute, transform, Data, sin, cos, atan2, sqrt, radians, summary, greatest, symbol
from odo import odo, drop, resource, Temp, CSV, S3
connect_args = dict(sslmode='verify-ca')
d = Data('redshift://cio@localhost:15439/dev::trip', connect_args=connect_args)
d.count()
def haversine_distance(start, stop, R=3959):
"""Compute the distance between two sets of `start` and `stop` lat, lon points
"""
# http://andrew.hedges.name/experiments/haversine/
lat1, lon1 = start
lat2, lon2 = stop
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(radians(dlat) / 2.0) ** 2 + cos(lat1) * cos(lat2) * sin(radians(dlon) / 2.0) ** 2
return R * 2 * atan2(sqrt(greatest(a, 0.0)), sqrt(greatest(1.0 - a, 0.0)))
max_distance = haversine_distance(start=[40.477399, -74.259090], stop=[40.917577, -73.700272])
max_distance
# http://hafen.github.io/taxi/#reading-in-to-r
min_lat, max_lat = 40.477399, 40.917577
min_lon, max_lon = -74.259090, -73.700272
valid = d[
(d.trip_distance > 0) &
(d.trip_distance <= max_distance) &
(d.trip_time_in_secs > 0) &
(d.passenger_count > 0) & (d.passenger_count <= 5) &
(d.pickup_latitude >= min_lat) & (d.pickup_latitude <= max_lat) &
(d.dropoff_latitude >= min_lat) & (d.dropoff_latitude <= max_lat) &
(d.pickup_longitude >= min_lon) & (d.pickup_longitude <= max_lon) &
(d.dropoff_longitude >= min_lon) & (d.dropoff_longitude <= max_lon)
]
valid
computed_distance = haversine_distance(start=[valid.pickup_latitude, valid.pickup_longitude],
stop=[valid.dropoff_latitude, valid.dropoff_longitude])
trip_time_in_hours = valid.trip_time_in_secs.coerce('float64') / 3600.0
calcd = transform(valid,
avg_speed_in_mph=valid.trip_distance / trip_time_in_hours,
trip_time_in_hours=trip_time_in_hours)
calcd
data = calcd[(calcd.avg_speed_in_mph > 5) & (calcd.avg_speed_in_mph <= 120)][
[
'avg_speed_in_mph',
'pickup_latitude',
'pickup_longitude',
'dropoff_latitude',
'dropoff_longitude'
]
]
data
print(compute(data))
odo
¶import requests as r
resp = r.get(
'https://data.cityofnewyork.us/api/geospatial/tqmj-j8zm',
params=dict(method='export', format='KML')
)
import lxml
import lxml.etree
lxml
¶kml = lxml.etree.fromstring(resp.text.encode('utf8'))
print(resp.text)
Placemark
elements delineate boroughs¶placemarks = kml.xpath("//*[local-name()='Placemark']")
from toolz.curried import map
from toolz import pipe, concat
import io
def find_borough_name(text):
return re.search(r'.*(Brooklyn|Manhattan|Bronx|Staten Island|Queens).*', text).group(1)
def parse_coords(p, name):
coords = p.xpath('.//*[local-name()="coordinates"]/text()')
return pipe(coords,
map(str.split),
map(lambda x: '\n'.join(map(lambda y: '%s,%s' % (y, name), x))),
'\n'.join)
def parse_kml(placemarks):
result = []
for p in placemarks:
desc, = p.xpath(".//*[local-name()='description']")
name = find_borough_name(desc.text)
result.append(parse_coords(p, name))
return '\n'.join(result)
geodata = parse_kml(placemarks)
geodata[:geodata.find('\n')]
df = pd.read_csv(io.StringIO(geodata), names=['lon', 'lat', 'name']).drop_duplicates().reset_index(drop=True)
df.head()
df.describe()
boros = odo(
odo(df, Temp(S3(CSV))),
'redshift://cio@localhost:15439/dev::boros',
connect_args=connect_args
)
len(df)
bounds = sa.select([
boros.c.name,
sa.func.min(boros.c.lon).label('min_lon'),
sa.func.min(boros.c.lat).label('min_lat'),
sa.func.max(boros.c.lon).label('max_lon'),
sa.func.max(boros.c.lat).label('max_lat')
]).group_by(boros.c.name).alias()
print(bounds)
data
nyc = compute(data).alias()
joined = nyc.join(
bounds,
onclause=(
(nyc.c.pickup_longitude >= bounds.c.min_lon) &
(nyc.c.pickup_longitude <= bounds.c.max_lon) &
(nyc.c.pickup_latitude >= bounds.c.min_lat) &
(nyc.c.pickup_latitude <= bounds.c.max_lat) &
(nyc.c.dropoff_longitude >= bounds.c.min_lon) &
(nyc.c.dropoff_longitude <= bounds.c.max_lon) &
(nyc.c.dropoff_latitude >= bounds.c.min_lat) &
(nyc.c.dropoff_latitude <= bounds.c.max_lat)
)
)
sel = sa.select([
bounds.c.name,
nyc.c.avg_speed_in_mph,
nyc.c.pickup_latitude,
nyc.c.pickup_longitude,
nyc.c.dropoff_latitude,
nyc.c.dropoff_longitude
]).select_from(joined).alias()
per_boro_speeds = sa.select([
sel.c.name,
sa.func.avg(sel.c.avg_speed_in_mph).label('avg_speed'),
sa.func.count(sel.c.avg_speed_in_mph).label('nsamples')
]).group_by(sel.c.name)
import sqlparse
print(sqlparse.format(str(per_boro_speeds), reindent=True))
speeds = odo(
per_boro_speeds.order_by(sa.desc(per_boro_speeds.c.avg_speed)),
pd.DataFrame,
connect_args=connect_args
)
speeds