In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
from os import listdir
from os.path import isfile, join
import re
import datetime
import numpy as np

import importlib
import detect_maneuver # put it in a .py so it can be used by the SOCRATES detection part also
importlib.reload(detect_maneuver);
In [2]:
satcat = pd.read_csv(f'../../playground/satcat_all.csv')

Let's identify the incident from SOCRATES first

In [3]:
path = '../../data/socrates/'
files = [ (match[0],match[1]) for f in listdir(path) if isfile(join(path, f))  if (match:=re.search('^socrates_([0-9]{14})\.csv(\.gz)?$', f))]
files

# Build single dataset
odf = pd.DataFrame()
for file,date in files:
    tmp_df = pd.read_csv(path + file)
    odf = pd.concat([odf,tmp_df])
In [4]:
socrates = odf.copy()
socrates = socrates.sort_values(by="tca_time", ascending=False)
socrates = socrates[socrates.max_prob != 1]
socrates = socrates[socrates.rel_velo_kms > 0.2]
socrates['tca_time'] = socrates['tca_time'].astype('datetime64[ns]')
socrates['extract_date'] = socrates['extract_date'].astype('datetime64[ns]')

# `tca_rounded` is to allow grouping of the same event with slightly different `tca_time`
socrates['tca_rounded'] = socrates['tca_time'].dt.round('30min')

# socrates
In [5]:
path = f'../../../siads591 data/filtered_raw/payload.pkl.gz' # path to the data file
df = pd.read_pickle(path, compression="gzip")

unique_active_satellites = df[df.index > "2020-11-01"].NORAD_CAT_ID.unique()
In [6]:
unique_active_satellites
Out[6]:
array([46818, 40053, 43518, ..., 47434, 47415, 47449], dtype=uint16)

Filtering for active satellies in LEO only resulted in the removal of only 1.35% of Socrates entries! That means most of the Socrates entries are related to active LEO satellites. (232204 -> 229079)

In [7]:
active_socrates = socrates.copy()

# each group represents a conjunction event
active_socrates = active_socrates.groupby(by=["sat1_norad","sat2_norad","tca_rounded"]).agg({'max_prob':'max', 'tca_time':'max', 'extract_date':['min','nunique']})
active_socrates.columns = active_socrates.columns.to_flat_index()
active_socrates = active_socrates.reset_index()
active_socrates.columns = ['sat1','sat2','tca_r','max_prob','tca','extract_date','nunique']

# we are evaluating individual satellite maneuvers, so we break up each pair into separate entries
active_socrates = active_socrates.melt(id_vars=['tca_r', 'max_prob', 'tca', 'extract_date', 'nunique'], value_vars=["sat1","sat2"])
active_socrates = active_socrates.drop(columns=['variable'])
active_socrates = active_socrates.rename(columns={'value':'norad_id'})

# filter only for satellites that could potentially move
active_socrates = active_socrates[(active_socrates.norad_id.isin(unique_active_satellites))]
active_socrates = active_socrates.sort_values(by="max_prob", ascending=False)
# active_socrates

base_socrates_prob = pd.DataFrame({'extract_date':socrates.extract_date.unique()})
base_socrates_prob["max_prob"] = 0
base_socrates_prob = base_socrates_prob.set_index("extract_date")
In [8]:
combined_maneuver_functions = {
    'INCLINATION': [
        ("rolling_10_neightor_diff", lambda x:x.rolling(10, min_periods=1).mean().shift(-9) - x.rolling(10, min_periods=1).mean(), [0.008]),
    ],
    'SEMIMAJOR_AXIS': [
        ("rolling_3_neightor_diff", lambda x:x.rolling(3, min_periods=1).mean().shift(-2) - x.rolling(3, min_periods=1).mean(), [0.025]),
    ],
}
In [9]:
# active_socrates contain all the interesting things that we want to look at
# each row represents an active satellite in an event, sorted by max max_prob
it = active_socrates.iterrows()
In [10]:
# generate_single_inline = True
generate_single_inline = False

# loop them 1 by 1 manually
while True:
    row = next(it)[1]
    if row["max_prob"] < 0.005:
        break
    norad_id = row['norad_id']
    tca_r = row['tca_r']

    soc_data = socrates[((socrates.sat1_norad == norad_id) | (socrates.sat2_norad == norad_id)) & (socrates.tca_rounded == tca_r)].drop_duplicates(subset=["extract_date"]).sort_values(by="extract_date").copy()
#     if len(soc_data) <= 1: # with only a single prediction nothing visible gets plotted
#         continue
    # display(soc_data)
    sat = df[df.NORAD_CAT_ID == norad_id].reset_index().copy()
    sat = sat.drop_duplicates(subset=["EPOCH"])
    sat = sat.set_index("EPOCH")[['SEMIMAJOR_AXIS_x1000','INCLINATION_x10000']]
    sat['INCLINATION'] = sat['INCLINATION_x10000'].astype(np.float64) / 10000
    sat['SEMIMAJOR_AXIS'] = sat['SEMIMAJOR_AXIS_x1000'].astype(np.float64) / 1000
    sat = sat[['INCLINATION','SEMIMAJOR_AXIS']]

    sat_short = sat.loc[(sat.index >= (soc_data.extract_date.min() - pd.Timedelta("14 days"))) & (sat.index <= (soc_data.tca_time.mean() + pd.Timedelta("5 days")))]
    
    probs = base_socrates_prob.copy()
    probs.update(soc_data[['max_prob','extract_date']].set_index("extract_date"))
    probs = probs.loc[((probs.index >= soc_data.extract_date.min()) & (probs.index <= soc_data.tca_time.mean())) | (probs.max_prob >0)]
    
    fixed = detect_maneuver.remove_strange_data(sat_short)
    maneuver_results, combined_results = detect_maneuver.find_maneuvers(fixed, combined_maneuver_functions)
    
    stuff = (fixed, combined_results, f'{satcat.loc[(satcat.NORAD_CAT_ID == norad_id),"SATNAME"].values[0]} ({norad_id}) ')
    
    fig = plt.figure(figsize=(10,6))
    ax1 = fig.add_subplot(2, 1, 1)
    ax2 = fig.add_subplot(2, 1, 2, sharex = ax1)
    
    
    detect_maneuver.plot_combined_maneuvers(fig, ax1, *stuff)
    fig.tight_layout(pad=1.5)
    fig.set_facecolor("white")
    detect_maneuver.plot_extra_lines(ax2,sat_short,probs,soc_data)
    fig.legend()

#     ax3 = fig.add_subplot(3, 1, 3)
#     detect_maneuver.plot_combined_maneuvers(fig, ax3, sat, None, "")
#     plt.setp(ax2.get_xticklabels(), visible=True)
    
    if generate_single_inline:
        break
    fig.savefig(f'../../images/socrates_rmm/{len(combined_results)}_{round(probs.max_prob.max()*100, 3)}%_{satcat.loc[(satcat.NORAD_CAT_ID == norad_id),"SATNAME"].values[0]}_{norad_id}_{soc_data.tca_time.mean().strftime("%Y-%m-%d_%H%M")}.png', facecolor='white', transparent=False)
    plt.close()
print("Done")
Done