Collect data from https://archive.analytics.mybinder.org and aggregate into fewer, more efficient .feather files, for easier loading and plotting of ALL THE DATA.
Rather than loading all the original event data, the ref
column is dropped,
counting builds per spec (not quite the same as repo, since spec includes the unresolved ref)
on a given time interval (hourly or daily).
This reduces the storage size by ~120x without losing any data I'm interested in.
It's not that big, but file load times are long with the original JSON, and it all fits in a pretty small (~100MB) feather file.
which | files | total size |
---|---|---|
original .jsonl |
1199 | 6.3GB |
daily files, counts by hour .feather |
1199 | 147MB |
monthly files, counts by day .feather |
40 | 56MB |
import datetime
import pandas as pd
import altair as alt
from pathlib import Path
from functools import lru_cache
from urllib.request import urlretrieve
from concurrent.futures import ThreadPoolExecutor
from tqdm.notebook import tqdm
category_columns = ["schema", "version", "provider", "status", "origin"]
def categoricalize(df):
"""Ensure categorical columns are categorical
For more efficient storage, processing
"""
dtypes = {}
for col in category_columns:
if col in df.columns:
dtypes[col] = "category"
if dtypes:
return df.astype(dtypes)
else:
return df
def uncategoricalize(df):
"""revert categories
groupby is _super_ slow on categoricals
https://github.com/pandas-dev/pandas/issues/32976
"""
dtypes = {}
for col, dtype in df.dtypes.items():
if isinstance(dtype, pd.CategoricalDtype):
dtypes[col] = dtype.categories.dtype
if dtypes:
return df.astype(dtypes)
else:
return df
@lru_cache(maxsize=60)
def _read_df(path):
df = pd.read_json(path, lines=True)
return categoricalize(df)
@lru_cache(maxsize=10)
def _read_dfs(*paths):
dfs = [_read_df(path) for path in paths]
if len(dfs) == 1:
return dfs[0]
else:
# concatenate, preserve categoricals with new values
return (
categoricalize(pd.concat(dfs, ignore_index=True))
.sort_values(["timestamp"])
.reset_index(drop=True)
)
Quickest way to get the whole archive (~6.5G):
gsutil -m rsync -r gs://binder-events-archive/ ./events/
events_dir = Path("events")
agg_dir = Path("aggregated")
agg_dir.mkdir(exist_ok=True)
daily_dir = agg_dir.joinpath("daily")
daily_dir.mkdir(exist_ok=True)
weekly_dir = agg_dir.joinpath("weekly")
weekly_dir.mkdir(exist_ok=True)
monthly_dir = agg_dir.joinpath("monthly")
monthly_dir.mkdir(exist_ok=True)
for parent in (daily_dir, weekly_dir, monthly_dir):
parent.joinpath("by-hour").mkdir(exist_ok=True)
parent.joinpath("by-day").mkdir(exist_ok=True)
# daily-by-hour
# weekly-by-day
jsonl_fmt = f"{events_dir}/events-%Y-%m-%d.jsonl"
daily_fmt = f"{daily_dir}/daily-%Y-%m-%d.feather"
weekly_fmt = f"{weekly_dir}/weekly-%Y-%m-%d"
def rounded_count(df, freq="H"):
# copy to avoid leaving our new column in the df
df = df.copy()
# add counting column
df["n"] = 1
df["timestamp"] = df.timestamp.dt.round(freq)
# exclude ref from aggregations
groupby = list(set(df.columns).difference({"ref", "n"}))
# uncategoricalize because groupby is crazy slow with categoricals
# must faster to copy the whole df multiple times!
return uncategoricalize(df).groupby(groupby).n.count().reset_index()
def _agg_and_save(src_list, window, date_str, debug=False):
for src in src_list:
if not os.path.exists(src):
print(f"Missing file to aggregate by {window} for {date_str}: {src}")
return
dest_fmt = str(agg_dir.joinpath("{window}/{agg}/{window}-{agg}-{date_str}.feather"))
dest_hourly = dest_fmt.format(window=window, agg="by-hour", date_str=date_str)
dest_daily = dest_fmt.format(window=window, agg="by-day", date_str=date_str)
if os.path.exists(dest_hourly) and os.path.exists(dest_daily):
if debug:
print(f"already have {dest_hourly} and {dest_daily}")
return
df = _read_dfs(*src_list)
if debug:
print(f"Aggregating {len(df)} rows to {dest_hourly} and {dest_hourly}")
h = rounded_count(df, freq="H")
h.to_feather(dest_hourly)
if debug:
print(
f"Wrote {len(h)}/{len(df)} ({len(h) / len(df):.0%}) rows to {dest_hourly}"
)
d = rounded_count(df, freq="D")
d.to_feather(dest_daily)
if debug:
print(f"Wrote {len(d)}/{len(df)} ({len(d) / len(df):.0%}) rows to {dest_daily}")
def aggregate_day(day):
# hourly counts by r
src = day.strftime(jsonl_fmt)
date_str = day.strftime("%Y-%m-%d")
_agg_and_save([src], "daily", date_str)
def aggregate_week(day):
iso_day = day.isocalendar()
week_start = day - datetime.timedelta(days=iso_day.weekday - 1)
date_str = day.strftime("%Y-w%W")
src_list = []
for i in range(7):
day = week_start + datetime.timedelta(days=i)
assert day.isocalendar().week == iso_day.week
src = day.strftime(jsonl_fmt)
src_list.append(src)
_agg_and_save(src_list, "weekly", date_str)
def aggregate_month(day):
src_list = []
month = day.month
day = datetime.date(year=day.year, month=day.month, day=1)
date_str = day.strftime("%Y-%m")
while day.month == month:
src_list.append(day.strftime(jsonl_fmt))
day = day + datetime.timedelta(days=1)
_agg_and_save(src_list, "monthly", date_str)
def aggregate(start_date=datetime.date(2019, 1, 1), end_date=datetime.date.today()):
day = start_date
total_days = int((end_date - start_date).total_seconds() // (24 * 3600))
days = tqdm(unit="day", desc="days", total=total_days)
weeks = tqdm(unit="week", desc="weeks", total = total_days // 7)
months = tqdm(unit="month", desc="months", total = total_days // 30)
while day < end_date:
aggregate_day(day)
if day.isocalendar().weekday == 7:
aggregate_week(day)
weeks.update(1)
if (day + datetime.timedelta(days=1)).month != day.month:
aggregate_month(day)
months.update(1)
day += datetime.timedelta(days=1)
days.update(1)
days.close()
weeks.close()
months.close()
aggregate()
days: 0%| | 0/1202 [00:00<?, ?day/s]
weeks: 0%| | 0/171 [00:00<?, ?week/s]
months: 0%| | 0/40 [00:00<?, ?month/s]
Missing file to aggregate by daily for 2022-04-13: events/events-2022-04-13.jsonl Missing file to aggregate by daily for 2022-04-14: events/events-2022-04-14.jsonl Missing file to aggregate by daily for 2022-04-15: events/events-2022-04-15.jsonl Missing file to aggregate by daily for 2022-04-16: events/events-2022-04-16.jsonl
import matplotlib.pyplot as plt
import glob
def get_monthly_data(by="day"):
frames = [pd.read_feather(f) for f in monthly_dir.glob(f"by-{by}/*.feather")]
return categoricalize(pd.concat(frames).sort_values("timestamp").reset_index(drop=True))
def get_weekly_data(by="day"):
frames = [pd.read_feather(f) for f in weekly_dir.glob(f"by-{by}/*.feather")]
return categoricalize(pd.concat(frames).sort_values("timestamp").reset_index(drop=True))
df = get_monthly_data()
df.origin.fillna("gke.mybinder.org", inplace=True)
len(df)
1830117
origins = {
'binder.mybinder.turing.ac.uk': "turing.mybinder.org",
"binder.mybinder.ovh": "ovh.mybinder.org",
"notebooks.gesis.org": "gesis.mybinder.org",
"gke2.mybinder.org": "gke.mybinder.org",
"gke1.mybinder.org": "gke.mybinder.org",
}
df["federation"] = df.origin.apply(lambda x: origins.get(x, x)).str.split(".").str[0]
# df.origin.unique()
df2021 = df[df.timestamp.dt.year==2021].copy()
len(df2021)
650306
print(f"Total launches in 2021: {df2021.n.sum():,d}")
Total launches in 2021: 10,295,058
Create derivative 'repo' column, stripping unresolved ref from spec
# start with spec as-is, most providers don't include ref
df2021["repo"] = df2021.spec
strip_spec = df2021.provider.isin({"Git", "GitLab"})
df2021.loc[strip_spec, ["repo"]] = df2021[strip_spec].spec.str.split("/", 1).str[0]
github = df2021.provider.isin({"GitHub", "Gist"})
df2021.loc[github, ["repo"]] = df2021[github].spec.str.split("/", 2).str[:2].str.join("/")
print(f"Total unique repos in 2021: {len(df2021.repo.unique()):,d}")
Total unique repos in 2021: 54,825
len(df2021[["provider", "repo"]].apply(lambda row: f"{row.provider}:{row.repo}", axis=1).unique())
54825
df.provider.value_counts()
GitHub 1687411 Git 87951 GitLab 28324 Gist 24225 Zenodo 1575 Hydroshare 266 Dataverse 202 Figshare 163 Name: provider, dtype: int64
df2021.repo.value_counts().head(10)
ipython/ipython-in-depth 3172 jupyterlab/jupyterlab-demo 3122 fonsp/pluto-on-binder 2860 explosion/spacy-io-binder 2089 binder-examples/r 1944 scikit-learn/scikit-learn 1913 binder-examples/requirements 1859 qiskit-community/qiskit-textbook 1508 scikit-image/scikit-image 1472 apache/spark 1406 Name: repo, dtype: int64
counts = uncategoricalize(df).groupby(["timestamp", "federation"]).n.sum()
counts = counts.unstack() # .fillna(0)
seven_day_counts = counts.rolling("7D").mean()
seven_day_counts.plot()
plt.title("Daily user sessions (7-day average)")
# counts.plot(kind="area", stacked=True)
Text(0.5, 1.0, 'Daily user sessions (7-day average)')
seven_day_counts.plot(kind="area", stacked=True)
plt.title("Daily user sessions (cumulative)")
Text(0.5, 1.0, 'Daily user sessions (cumulative)')