# Execute this cell to define the functions for calling the Fluxtream upload API
# for the credentials entered below
import json, subprocess, urllib, csv

# By default, the upload function will send data to the main server at fluxtream.org.
# If you want to have this use a different fluxtream server, change it here
# and make sure the username and password entered below are valid on that server.
global fluxtream_server
fluxtream_server = "fluxtream.org"


def setup_fluxtream_credentials():
    # Call the Fluxtream guest API, documented at
    # https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs#BodyTrackserverAPIs-GettheIDfortheguest
    # Make sure it works and harvest the Guest ID for future use.
    # Side effect: sets the module-level fluxtream_guest_id on success.
    global fluxtream_server, fluxtream_username, fluxtream_password, fluxtream_guest_id

    # Make sure we have fluxtream credentials set properly
    if not('fluxtream_server' in globals() and
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials before uploading data. See above.")

    # NOTE(review): passing the password via the curl command line exposes it to
    # other local users via the process list; acceptable for a personal notebook.
    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    cmd += ['https://%s/api/guest' % fluxtream_server]

    result_str = subprocess.check_output(cmd)
    #print ' Result=%s' % (result_str)
    try:
        response = json.loads(result_str)
        if 'id' in response:
            fluxtream_guest_id = int(response['id'])
        else:
            raise Exception('Received unexpected response %s while trying to check credentials for %s on %s' %
                            (response, fluxtream_username, fluxtream_server))
        print('Verified credentials for user %s on %s work.  Guest ID=%d' %
              (fluxtream_username, fluxtream_server, fluxtream_guest_id))
    except:
        # Deliberately broad: report the raw server response, then re-raise so
        # the caller sees the original failure.
        print("Attempt to check credentials of user %s failed" % (fluxtream_username))
        print("Server returned response of: %s" % (result_str))
        print("Check login to https://%s works and re-enter your Fluxtream credentials above" % (fluxtream_server))
        raise


def fluxtream_upload(dev_nickname, channel_names, data):
    # Upload rows of [timestamp, value, ...] to the given device/channels on the
    # Fluxtream account configured above. No return value; raises on failure.
    global fluxtream_server, fluxtream_username, fluxtream_password

    # Make sure we have some data to send
    # BUG FIX: was "data == None"; identity comparison is the correct idiom.
    if data is None or len(data) < 1:
        print('Nothing to upload to %s %s' % (dev_nickname, channel_names))
        return

    # Make sure we have fluxtream credentials set properly
    if not('fluxtream_server' in globals() and
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials before uploading data. See above.")

    # Send to BodyTrack upload API, documented at
    # https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs#BodyTrackserverAPIs-Storingdata
    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    cmd += ['-d', 'dev_nickname=%s' % dev_nickname]
    cmd += ['-d', 'channel_names=%s' % json.dumps(channel_names)]
    cmd += ['-d', 'data=%s' % json.dumps(data)]
    cmd += ['https://%s/api/bodytrack/upload' % fluxtream_server]

    print('Uploading %d data points to %s\'s account on server %s, device %s, channels %s' %
          (len(data), fluxtream_username, fluxtream_server, dev_nickname, channel_names))

    # If you're having trouble debugging this function, uncomment the following two
    # print statements to see the exact curl command and result string
    #print ' Cmd=%s' % (cmd)
    result_str = subprocess.check_output(cmd)
    #print ' Result=%s' % (result_str)
    try:
        response = json.loads(result_str)
        if response['result'] != 'OK':
            raise Exception('Received non-OK response %s while trying to upload to %s' % (response, dev_nickname))
        print('Upload to %s %s (%d rows, %d to %d) succeeded' %
              (dev_nickname, channel_names, len(data), data[0][0], data[-1][0]))
    except:
        print("Attempt to upload to %s as user %s failed. Check that your credentials are ok" %
              (fluxtream_server, fluxtream_username))
        print("Server returned response: %s" % (result_str))
        raise


# To get your own data, pass in the global fluxtream_guest_id which is computed
# in setup_fluxtream_credentials() when you execute the Fluxtream login cell.
# To get a buddy's data, you first need to figure out what their Guest ID is.
# This will show up in the Chrome developer console in tile requests when you
# look at their data in the timeline or BodyTrack app.
# For example, if the test account is my buddy, I would select
# 'View test test's data' from the upper right hand menu, turn on developer
# tools, and go to the Fluxtream timeline tab. In the developer tools' network
# tab I would see fetches that look like:
#   7.21370.json
#   /api/bodytrack/tiles/1/BodyMedia.activityType
# The value between 'tiles' and the device_name.channel_name is
# that account's Guest ID. In that case, I would call
# fluxtream_get_sources_list with an arg of 1.
def fluxtream_get_sources_list(guest_id):
    # Fetch and return the parsed JSON sources list for the given guest.
    global fluxtream_server, fluxtream_username, fluxtream_password

    # Make sure we have fluxtream credentials set properly.
    if not('fluxtream_server' in globals() and
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials. See above.")

    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    cmd += ['https://%s/api/bodytrack/users/%d/sources/list' % (fluxtream_server, guest_id)]

    result_str = subprocess.check_output(cmd)
    #print ' Result=%s' % (result_str)
    try:
        response = json.loads(result_str)
        print('Read of sources list for guest_id=%d succeeded' % (guest_id))
        return response
    except:
        # BUG FIX: the original message said "Attempt to upload to ..." — a
        # copy-paste from fluxtream_upload; this function performs a read.
        print("Attempt to read sources list from %s as user %s failed. Check that your credentials are ok" %
              (fluxtream_server, fluxtream_username))
        print("Server returned response: %s" % (result_str))
        raise


def fluxtream_get_device_names(sources_list):
    # Return the list of device names present in a sources_list response.
    device_names = []
    for dev in sources_list['sources']:
        device_names.append(dev['name'])
    return device_names


def fluxtream_get_device_info(device_name, sources_list):
    # Return the source entry for device_name, or None if not found.
    for dev in sources_list['sources']:
        if dev['name'] == device_name:
            return dev
    return None


def fluxtream_get_channel_names(device_name, sources_list):
    # Return the list of channel names for device_name, or None if the
    # device isn't in sources_list.
    dev_info = fluxtream_get_device_info(device_name, sources_list)
    # ROBUSTNESS FIX: the original indexed dev_info['channels'] without a None
    # check and crashed with a TypeError on an unknown device; return None for
    # consistency with fluxtream_get_channel_info below.
    if not dev_info:
        return None
    channel_names = []
    for channel in dev_info['channels']:
        channel_names.append(channel['name'])
    return channel_names


def fluxtream_get_channel_info(device_name, channel_name, sources_list):
    # Return the channel entry for device_name.channel_name, or None if either
    # the device or the channel isn't present.
    dev_info = fluxtream_get_device_info(device_name, sources_list)
    # Check to make sure that we found info for the requested device.
    # If not, return None
    if not dev_info:
        return None
    for channel_info in dev_info['channels']:
        if channel_info['name'] == channel_name:
            return channel_info
    return None


# Takes a guest_id, an array of device.channel strings, and a time range and
# returns a CSV reader.
# Iterate over the rows using next(reader), which returns a row array with
# entries corresponding to:
#   Epoch, [dev_ch_names]
# where Epoch is the epoch timestamp (aka unixtime) for the values in the row,
# and the i+1'th column of the row corresponds to the channel in dev_ch_names[i].
# See comment on fluxtream_get_sources_list for info about how to choose the
# value for guest_id.
def fluxtream_get_csv(guest_id, dev_ch_names, start_time, end_time):
    # Export a time range of the named channels as CSV and return a csv.reader
    # positioned after the header row. Raises if the header doesn't match
    # dev_ch_names (invalid channel names).
    global fluxtream_server, fluxtream_username, fluxtream_password

    # Make sure we have fluxtream credentials set properly.
    if not('fluxtream_server' in globals() and
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials. See above.")

    # Call the BodyTrack CSV export API, documented at
    # https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs
    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]

    # Need to convert the dev_ch_names array into json and URL encode it to
    # create the channels arg
    # TODO: how do we confirm that dev_ch_names is in fact an array?
    ch_spec_str = json.dumps(dev_ch_names)
    # NOTE(review): urllib.quote is Python 2 only; under Python 3 this would be
    # urllib.parse.quote — confirm before porting the notebook.
    ch_spec_str = urllib.quote(ch_spec_str)

    cmd += ['https://%s/api/bodytrack/exportCSV/%d/fluxtream-export-from-%d-to-%d.csv?channels=%s' %
            (fluxtream_server, guest_id, int(start_time), int(end_time), ch_spec_str)]
    #print ' cmd=%s' % (cmd)
    result_str = subprocess.check_output(cmd)
    #print ' Result=%s' % (result_str)

    # If the API call worked, result_str should be a CSV file
    # with the first line a header consisting of EpochTime, [dev_ch_names]
    # TODO: how do we check if it did work?
    # Create a CSV reader that iterates over the lines of the response
    csv_reader = csv.reader(result_str.splitlines(), delimiter=',')
    # FIX: use the builtin next() instead of the Python-2-only .next() method;
    # behavior is identical on Python 2.6+ and it is forward-compatible.
    header = next(csv_reader)

    # Do some checks to make sure we got something reasonable
    if len(header) != len(dev_ch_names)+1:
        raise Exception("Expected header for CSV export of %s to contain %d columns, but only found %d. Please double check that dev_ch_names are all valid" %
                        (dev_ch_names, len(dev_ch_names)+1, len(header)))

    # Check the columns are what we expect
    for i in range(0, len(dev_ch_names)):
        if dev_ch_names[i] != header[i+1]:
            raise Exception("Expected column %d of CSV header to be %s, but found %s instead. Please double check that dev_ch_names are all valid" %
                            (i+1, dev_ch_names[i], header[i+1]))

    # At this point, we can be confident that the columns map to
    # Epoch, [dev_ch_names] as expected. Return the csv reader.
    # Iterate over the rows using next(reader).
    return csv_reader


# Execute and fill in the fields below to set your Fluxtream credentials.
from IPython.html import widgets     # Widget definitions (deprecated API; modern notebooks use ipywidgets)
from IPython.display import display  # Used to display widgets in the notebook


def set_fluxtream_password(this):
    # Button callback: copy the widget values into the credential globals,
    # blank the password field, and verify the credentials against the server.
    global fluxtream_username, fluxtream_password
    fluxtream_username = fluxtream_username_widget.value
    fluxtream_password = fluxtream_password_widget.value
    fluxtream_password_widget.value = ''
    setup_fluxtream_credentials()
    print("To make persistent for future restarts, insert a cell, paste in:")
    print("")
    print("global fluxtream_username, fluxtream_password")
    print("fluxtream_username = \"%s\"" % (fluxtream_username))
    print("fluxtream_password = \"xxx\"")
    print("setup_fluxtream_credentials()")
    print("")
    print("replace xxx with your password, and execute that cell instead.")
    print("Only do this if you're keeping this copy of your iPython notebook private,")
    print("and remove that cell before sharing")


display(widgets.HTMLWidget(value='Fluxtream Username'))
fluxtream_username_widget = widgets.TextWidget()
display(fluxtream_username_widget)
display(widgets.HTMLWidget(value='Fluxtream Password'))
fluxtream_password_widget = widgets.TextWidget()
display(fluxtream_password_widget)
set_fluxtream_login_button = widgets.ButtonWidget(description='Set Fluxtream credentials')
set_fluxtream_login_button.on_click(set_fluxtream_password)
display(set_fluxtream_login_button)
# Enter Fluxtream username and password and click "Set Fluxtream credentials" button.
# Password field will blank afterwards, but variables will be set

# Execute to list the devices and channel names available in this Fluxtream account
# Note that you will need to re-execute this cell after new data is uploaded
# if you want to use up-to-date time bounds for a given channel
import pprint
global fluxtream_guest_id, guest_id, sources_list, dev_name_list
# Default to using the data of the currently logged in user.
# To use data for someone else, modify the line below to set
# guest_id to some other value. See comments above
# fluxtream_get_sources_list definition above for details.
guest_id = fluxtream_guest_id # Get the info about all the sources that this user has # in their account sources_list = fluxtream_get_sources_list(guest_id) # Uncomment this if you want to see more details about the # structure of sources_list #pprint.pprint(sources_list) # Get the list of devices and channel names for this guest dev_name_list = fluxtream_get_device_names(sources_list) for dev_name in dev_name_list: channel_names = fluxtream_get_channel_names(dev_name, sources_list) print "Device '%s', %d channels:" % (dev_name, len(channel_names)) print " %s" % (channel_names) # Modify the values below for setting up which source # channels you want to process, and where to put the resulting computed values. # The naming scheme is . to specify a given channel of a given device. # Change the keys of channel_proc_map to the channel names you want to use for input. # Change the values in channel_proc_map to the channel name you want to use for output # of the values computed for a given input channel. # Execute to setup module_info_map based on those settings. # The output of the cell above shows what the station and modules names are for the # Netatmo account you've bound the access_token to. global guest_id, sources_list, dev_name_list, channel_proc_map, channel_info_map channel_proc_map = {'Nonin3150.SpO2': 'Nonin3150.SpO2_thresh_93', 'Nonin3150.SpO2': 'Nonin3150.SpO2_thresh_90'} # Modify this cell to change the function computed for a given source channel. # This one computes how much an SpO2 reading drops below a threshold. # The input args are the epoch timestamp, the source value, the source channel name, # and the destination channel name. In this case, we don't care about the timestamp # and parse the threshold to use from the channel name. # Note that the type of source_value may be a string, since some sources of input, # such as CSV resaders, are unable to distinguish the type of a a value and always return # strings. 
# If you need to treat the value as a numeric type, you'll need to convert it
# yourself.
def compute_channel_fn(timestamp, source_value, source_ch_name, dest_ch_name):
    # Return how far source_value falls below a threshold parsed from
    # dest_ch_name (e.g. 'Dev.SpO2_thresh_90' -> 90.0; default 93.0), or
    # None when the reading is above the threshold (meaning "no data point").
    thresh = 93.0
    dest_ch_elts = dest_ch_name.split('.')[1].split('_')
    if len(dest_ch_elts) == 3 and dest_ch_elts[0] == "SpO2" and dest_ch_elts[1] == "thresh":
        thresh = float(dest_ch_elts[2])
    #print "Thresh = %d (dest channel = %s)" % (thresh, dest_ch_name)
    # source_value may arrive as a string (e.g. from a CSV reader); coerce it.
    source_f = float(source_value)
    if source_f > thresh:
        return None
    else:
        dest_f = thresh - source_f
        #print "Source = %f, dest = %f" % (source_f, dest_f)
        return dest_f


# Modify this cell to choose whether to just run compute_channel_fn for
# new data (timestamps from the source_ch that are > the max timestamp
# in dest_ch) (recompute_all=False), or force a recompute of everything
# (recompute_all=True).
# If compute_channel_fn is the same as it's been and you just want to run it
# on new data that's come in, do the former (recompute_all=False).
# If you've changed compute_channel_fn and want to run it on everything,
# do the latter (recompute_all=True)
recompute_all = False
#recompute_all=True

# Execute this cell to read in the selected range of data from all the
# source channels based on recompute_all, run compute_channel_fn on each
# data point, and upload the result to the dest channel
global guest_id, sources_list, dev_name_list, channel_proc_map, channel_info_map, csv_reader_map

# Update sources_list in case there's new data available on any of the relevant
# channels (current min_time and max_time for each channel are part of what
# gets returned by the Fluxtream get sources API call)
sources_list = fluxtream_get_sources_list(guest_id)

# Populate channel_info_map with keys for each source/dest channel name
# and values with the info on that channel as provided by sources_list
channel_info_map = {}
# FIX: wrap in list() — "dict.keys() + dict.values()" is Python-2-only; under
# Python 3 dict views don't support +. list() behaves identically on both.
for dev_ch_name in list(channel_proc_map.keys()) + list(channel_proc_map.values()):
    # Split the device and channel names
    dev_ch_name_elts = dev_ch_name.split('.')
    # Get the channel_info object for this channel
    channel_info = fluxtream_get_channel_info(dev_ch_name_elts[0], dev_ch_name_elts[1], sources_list)
    if channel_info is None:
        # This is ok for a destination channel, but not a source channel
        if dev_ch_name in channel_proc_map.keys():
            raise Exception("Can't find channel info for source channel %s; recheck device and channel names list" % dev_ch_name)
        else:
            print("Can't find channel info for dest channel %s; will create it" % dev_ch_name)
    # Store the channel info in channel_info_map
    channel_info_map[dev_ch_name] = channel_info

print("")
print("Successfully setup channel_info_map")
print("")

# Next, iterate over the source channels. For each, compute time ranges to process.
# If the dest channel doesn't exist, use the full time range of the source channel.
# If the dest channel does exist, by default just compute the data points that are
# later than the last timestamp in the dest channel.
# Read the CSV files for all the source channels.
# The keys of csv_reader_map are the source channels.
# The values are csv_reader objects. Each row from a reader is
#   t, source_ch[t]
# where t is the epoch timestamp (aka unixtime) for the sensor reading in that
# row, and source_ch[t] is the value of the source sensor channel at time t.
csv_reader_map = {}
for dev_ch_name in channel_proc_map.keys():
    source_ch = dev_ch_name
    source_info = channel_info_map[source_ch]
    dest_ch = channel_proc_map[dev_ch_name]
    dest_info = channel_info_map[dest_ch]

    # End time is always the max_time for the source channel
    end_time = source_info['max_time']
    # Start time is min_time for the source channel if the dest_ch doesn't
    # exist yet or if recompute_all is set. Otherwise it is max_time of the
    # dest channel so only new values are computed.
    start_time = source_info['min_time']
    if not recompute_all and dest_info is not None:
        start_time = dest_info['max_time']
        print("Processing only new data in %s that isn't in %s: %f to %f" % (source_ch, dest_ch, start_time, end_time))
    else:
        print("Processing all data for %s: %f to %f" % (source_ch, start_time, end_time))

    if end_time <= start_time:
        print("  No time range to process for guest %d, channel %s" % (guest_id, dev_ch_name))
    else:
        # Get a CSV for the source channel for the desired time range
        csv_reader_map[source_ch] = fluxtream_get_csv(guest_id, [source_ch], start_time, end_time)
        print("Successfully read data for guest %d, channel %s: %f to %f" % (guest_id, source_ch, start_time, end_time))

print("")
print("Done: Read CSV data for %d source channels: %s" % (len(csv_reader_map.keys()), csv_reader_map.keys()))
print("")

# Now process the data from all the source channels and upload the results to
# the corresponding destination channels. The csv_reader objects the previous
# cell created are used in the next cell to compute and upload data.
# Note that you need to execute the previous cell each time before executing
# this one, because iterating over a given csv_reader object in the loop below
# consumes its entries and they're not available to a subsequent run of the loop.

# Define a function for doing a partial upload of data to a given dest_ch
def partial_upload(dest_ch, upload_data):
    # Upload a batch of [timestamp, value] rows to dest_ch ('Device.Channel').
    # No-op (with a message) when upload_data is empty.
    if len(upload_data) > 0:
        # For upload, we need to split the device and channel name pieces of
        # dest_ch apart, and put the channel name part in an array
        dest_dev_nickname, dest_ch_name = dest_ch.split('.')
        # print "Uploading %d data points to dest %s" % (len(upload_data), dest_ch)
        fluxtream_upload(dest_dev_nickname, [dest_ch_name], upload_data)
    else:
        # No data
        print("No data to upload for dest %s" % (dest_ch))


# For each csv_reader returned, call
# compute_channel_fn(timestamp, source_value, source_ch_name, dest_ch_name),
# create an output data array for upload, and upload it to the Fluxtream server
# and account set up in the credentials entry section above
for source_ch in csv_reader_map.keys():
    # Get the name of the output channel from channel_proc_map
    dest_ch = channel_proc_map[source_ch]
    print("Processing %s -> %s" % (source_ch, dest_ch))

    # Retrieve the csv_reader object for this source channel from
    # csv_reader_map, which was set up in the previous cell
    csv_reader = csv_reader_map[source_ch]

    # Iterate over the lines in the CSV file for the source channel.
    # Call compute_channel_fn on each, and add each line that returns
    # non-null to upload_data for the given timestamp.
    # We may need to do this in multiple batches if there are too many rows
    # for a reasonable upload.
    upload_data = []
    for row in csv_reader:
        # Make sure the row has two entries (timestamp and source value) and
        # read them into local variables.
        # FIX: was a bare assert, which is stripped when Python runs with -O;
        # raise an explicit error instead so malformed rows are never ignored.
        if len(row) != 2:
            raise ValueError("Expected CSV row of [timestamp, value], got %s" % (row,))
        timestamp = float(row[0])
        source_value = row[1]
        comp_val = compute_channel_fn(timestamp, source_value, source_ch, dest_ch)
        # FIX: identity comparison ("is not None") instead of "!= None"
        if comp_val is not None:
            #print "%d (%f): %s -> %d" % (csv_reader.line_num, timestamp, source_value, comp_val)
            upload_data.append([timestamp, comp_val])
        # Check if upload_data is big enough that we should upload now,
        # and if so clear upload_data afterwards
        if len(upload_data) >= 1000:
            partial_upload(dest_ch, upload_data)
            upload_data = []
    # Upload any remaining rows in upload_data
    partial_upload(dest_ch, upload_data)

print("")
print("Done: Uploaded computed data for %d source channels: %s" % (len(csv_reader_map.keys()), csv_reader_map.keys()))