Tasker is a Python package for organizing and processing scientific data. It is a framework for constructing your own organization scheme and processing code. Let's dive into an example:
We're going to do a trivial, multi-step computation, first as a simple script, and then with Tasker.
First, let's set up a directory to hold our computation, and put a token data file in it.
!mkdir -p 'taskerdemo_dir'
%%file taskerdemo_dir/anumber.txt
1.5
Writing taskerdemo_dir/anumber.txt
import time
import numpy as np
import pandas
scale = 10
# Make some random data
raw_random_nums = np.random.normal(size=(100,))
random_nums = pandas.Series(raw_random_nums * scale)
# Read the parameter we stored earlier
shift = float(open('taskerdemo_dir/anumber.txt').read().strip())
# Transform the data
nums = random_nums + shift
# Compute some statistics
summary = {'mean': nums.mean(), 'std': nums.std(), 'count': nums.count()}
std_err = summary['std'] / np.sqrt(summary['count'])
summary['std_err'] = std_err
# Write summary to a text file
open('taskerdemo_dir/oldstats.txt', 'w').write(''.join(
    ['%s: %.2g\n' % (k, v) for k, v in summary.items()]))
Great! That was easy. But there are a few drawbacks to this method:
- Imagine that these computations actually take a long time. If we change something, or have to restart this Python session, we'll need to recompute everything, instead of reusing past results where possible.
- This code works fine for taskerdemo_dir, but when it's time to run it on taskerdemo_dir2, taskerdemo_dir3, etc., there is extra work involved in managing it all.
- We're sick of always waiting for the computation to finish, so we write code to save the results to disk. When we want to use these results in other notebooks or scripts, we write more code to load them there. When we want to revisit this notebook, we'll change it to load the past results instead of re-computing. And when we someday do need to re-compute, we'll write yet more code to switch this notebook between the two behaviors.
Now, let's look at the same computation, written using some of the features of Tasker. Again, imagine each step in this computation being non-trivial and taking a long time.
import tasker
# Make a Tasker instance for this directory.
# Omit the argument if you just want to use the current directory.
mydir = tasker.Tasker('taskerdemo_dir')
First, let's set some parameters. Purely as a convenience, each Tasker instance provides a conf attribute.
# You can set and get parameters as attributes...
mydir.conf.scale = 10
print mydir.conf.scale
# Or, equivalently, use "conf" as a dictionary:
print mydir.conf['scale']
10
10
Now, let's define some tasks. Tasker uses Python decorators (the things with the "@" sign) to turn ordinary functions into tasks within mydir. The stores decorator means that the values returned by the function will be stored to files for later reuse.
# Store this result using Python's "pickle" module.
@mydir.stores(tasker.Pickle('random_nums.pickle'))
def raw_random(tsk):
"""Random numbers in normal distribution"""
return np.random.normal(size=(100,))
# Store as an HDF5 file.
@mydir.stores(tasker.Pandas('random.h5'))
# The output of raw_random() is automatically read from random_nums.pickle and
# passed to the function.
def random(tsk, rawnums=mydir.raw_random):
"""Scaled Pandas series of random numbers"""
return pandas.Series(rawnums * mydir.conf.scale)
# This task doesn't store anything, and is always re-computed.
@mydir
# You can name arbitrary files as inputs, and then read them yourself.
# Note that we just give the name of anumber.txt --- it's implicit that
# it will be in mydir.
def random_shift(tsk, nums=mydir.random, shiftfile='anumber.txt'):
"""Random numbers with shifted mean"""
shift = float(open(shiftfile).read().strip())
return nums + shift
# Store as a JSON file (faithfully stores basic Python data types,
# but is also human-readable).
@mydir.stores(tasker.JSON('summary.json'))
def summary(tsk, nums=mydir.random_shift):
"""Descriptive statistics about the random numbers"""
return {'mean': nums.mean(), 'std': nums.std(), 'count': nums.count()}
@mydir
def std_err(tsk, summary=mydir.summary):
    return summary['std'] / np.sqrt(summary['count'])
# Multiple outputs are OK too!
# You can name arbitrary files as outputs, and write them yourself.
@mydir.stores(tasker.JSON('stats.json'), 'stats.txt')
def report_everything(tsk, std_err=mydir.std_err, summary=mydir.summary):
"""Summary of everything we computed"""
summary['std_err'] = std_err
open('stats.txt', 'w').write(''.join(
['%s: %.2g\n' % (k, v) for k, v in summary.items()]))
return summary, None # None is placeholder for stats.txt
# Clear results of any previous computations.
# Ordinarily, we would NOT want to do this.
mydir.clear()
# List defined tasks
mydir.menu()
Tasks for /Users/nkeim/python-notebooks/taskerdemo_dir:
Done?  Name               Description
--------------------------------------------------------------------------------
       raw_random         Random numbers in normal distribution
       random             Scaled Pandas series of random numbers
       random_shift       Random numbers with shifted mean
       summary            Descriptive statistics about the random numbers
       std_err
       report_everything  Summary of everything we computed
These tasks have become attributes of our Tasker instance. To get the value(s) computed by a task, simply call it like a function:
mydir.std_err()
1.0208596469324347
mydir.menu()
Tasks for /Users/nkeim/python-notebooks/taskerdemo_dir:
Done?  Name               Description
--------------------------------------------------------------------------------
  +    raw_random         Random numbers in normal distribution
  +    random             Scaled Pandas series of random numbers
  +    random_shift       Random numbers with shifted mean
  +    summary            Descriptive statistics about the random numbers
  +    std_err
       report_everything  Summary of everything we computed
Whoa! The "+" next to each task means that to compute std_err, Tasker computed everything it depends on. Those values are available too:
print mydir.summary()
print mydir.summary()['mean']
{u'std': 10.208596469324347, u'count': 100, u'mean': 3.4315143202916225}
3.43151432029
The files we specified earlier now contain the results.
!ls taskerdemo_dir
anumber.txt   random.h5            summary.json
oldstats.txt  random_nums.pickle
Of course, the files contain the same data you get from the Tasker instance:
!cat taskerdemo_dir/summary.json
{ "std": 10.208596469324347, "count": 100, "mean": 3.4315143202916225 }
Recall that report_everything() writes to both a JSON file (courtesy of Tasker) and to stats.txt in some non-standard format. When we ask for the value of report_everything(), we get the contents of the JSON file, but we're just told where the other one is:
mydir.report_everything()
[{u'count': 100, u'mean': 3.4315143202916225, u'std': 10.208596469324347, u'std_err': 1.0208596469324347}, path(u'/Users/nkeim/python-notebooks/taskerdemo_dir/stats.txt')]
Let's change the contents of anumber.txt, and see what happens.
%%file taskerdemo_dir/anumber.txt
-10
Overwriting taskerdemo_dir/anumber.txt
mydir.menu()
Tasks for /Users/nkeim/python-notebooks/taskerdemo_dir:
Done?  Name               Description
--------------------------------------------------------------------------------
  +    raw_random         Random numbers in normal distribution
  +    random             Scaled Pandas series of random numbers
  +    random_shift       Random numbers with shifted mean
       summary            Descriptive statistics about the random numbers
       std_err
       report_everything  Summary of everything we computed
Look! The stored results that depended on anumber.txt — summary, std_err, and report_everything — are now invalid. They'll be re-computed the next time they are needed. (Since random_shift does not store its results, it was not invalidated.)
Users of the make tool for compiling C code will be familiar with this behavior. Like make, Tasker works by comparing the modification times of the input and output files.
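For example, you can check for staleness yourself, and trigger just the necessary re-computation (a sketch; this is not output from the session above):

# A sketch of the behavior just described: summary's stored output is now
# older than anumber.txt, so it is reported as stale, and asking for the
# value re-runs only the invalidated tasks.
print mydir.summary.is_current()  # False: anumber.txt is newer than summary.json
mydir.summary()  # re-runs summary (and random_shift), reusing raw_random and random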
Each Tasker instance has a p attribute: a path object representing the instance's directory.
mydir.p
path(u'/Users/nkeim/python-notebooks/taskerdemo_dir')
mydir.p.glob('*.txt')
[path(u'/Users/nkeim/python-notebooks/taskerdemo_dir/anumber.txt'), path(u'/Users/nkeim/python-notebooks/taskerdemo_dir/oldstats.txt'), path(u'/Users/nkeim/python-notebooks/taskerdemo_dir/stats.txt')]
mydir.p / 'testing'
path(u'/Users/nkeim/python-notebooks/taskerdemo_dir/testing')
Tasks are always executed in the directory of the parent Tasker instance. In the definition of report_everything() reproduced here, notice how we write open('stats.txt', 'w'), not open('mydir/stats.txt', 'w').
@mydir.stores(tasker.JSON('stats.json'), 'stats.txt')
def report_everything(tsk, std_err=mydir.std_err, summary=mydir.summary):
"""Summary of everything we computed"""
summary['std_err'] = std_err
open('stats.txt', 'w').write(''.join(
['%s: %.2g\n' % (k, v) for k, v in summary.items()]))
return summary, None # None is placeholder for stats.txt
You can borrow this directory-changing feature for your own purposes by using the Tasker instance as a context manager:
import os
print os.getcwd()
with mydir:
    print os.getcwd()
print os.getcwd()
/Users/nkeim/python-notebooks
/Users/nkeim/python-notebooks/taskerdemo_dir
/Users/nkeim/python-notebooks
All this can get a little confusing. Just remember that the "p" attribute of your Tasker instance is an absolute path (it starts with "/", or "C:\", or whatever), so it works the same everywhere.
print mydir.p.glob('*.txt')[0]
with mydir:
    print mydir.p.glob('*.txt')[0]  # Does exactly the same thing.
/Users/nkeim/python-notebooks/taskerdemo_dir/anumber.txt
/Users/nkeim/python-notebooks/taskerdemo_dir/anumber.txt
The tsk argument

The first argument to a task (conventionally called tsk) is the task object itself. One use for tsk is that its output_files attribute is a list of the task's output files. This lets you get all your filenames from the header information at the top of the task.
The report_everything() task can therefore be rewritten as
@mydir.stores(tasker.JSON('stats.json'), 'stats.txt')
def report_everything(tsk, std_err=mydir.std_err, summary=mydir.summary):
"""Summary of everything we computed"""
summary['std_err'] = std_err
open(tsk.output_files[1], 'w').write(''.join(
['%s: %.2g\n' % (k, v) for k, v in summary.items()]))
return summary, None # None is placeholder for stats.txt
Useful methods of a task (e.g. mydir.summary) include

- clear(): Delete the task's output files.
- force(): Delete the output files and return the re-computed value.
- is_current(): True if nothing needs to be re-computed.
- report(): Return this task, and any tasks it depends on, that need to be re-computed.

Useful methods of the Tasker instance (e.g. mydir) include the following (a short sketch of both sets of methods follows this list):

- clear(): Delete the output files of all defined tasks.
- menu(): See above.
- which(filename): Find out which task(s) create the output file filename.
- unlock(): Re-enable tasks after a hard crash. (See the section on locking, below.)
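Here is a brief sketch of these methods in action (illustrative only; not run in the session above):

print mydir.summary.is_current()  # True if summary needs no re-computation
mydir.summary.report()            # tasks that would have to be re-computed
mydir.summary.force()             # discard summary's output and re-compute it
mydir.which('stats.txt')          # which task(s) write stats.txt?
mydir.clear()                     # careful: deletes the outputs of ALL tasks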
It would be nice to let multiple notebooks, scripts, etc. use a common set of task definitions. Using any text editor, you can create a taskfile.py file in your data directory. For example:
%%file taskerdemo_dir/taskfile.py
import numpy as np
import pandas
import tasker

def use(dirname):
    mydir = tasker.Tasker(dirname)  # Create the Tasker instance
    mydir.conf.scale = 10           # Configure it

    # Create 2 example tasks (cribbed from above):

    # Store this result using Python's "pickle" module.
    @mydir.stores(tasker.Pickle('random_nums.pickle'))
    def raw_random(tsk):
        """Random numbers in normal distribution"""
        return np.random.normal(size=(100,))

    # Store as an HDF5 file.
    @mydir.stores(tasker.Pandas('random.h5'))
    # The output of raw_random() is automatically read from random_nums.pickle and
    # passed to the function.
    def random(tsk, rawnums=mydir.raw_random):
        """Scaled Pandas series of random numbers"""
        return pandas.Series(rawnums * mydir.conf.scale)

    return mydir  # Very important!
Writing taskerdemo_dir/taskfile.py
Now, whenever you call

tasker.use('mydir')

Tasker looks for taskfile.py in mydir, finds the use() function defined within it, and calls it with the path to the directory:
mydir_fromfile = tasker.use('taskerdemo_dir')
mydir_fromfile.menu()
Tasks for /Users/nkeim/python-notebooks/taskerdemo_dir:
Done?  Name               Description
--------------------------------------------------------------------------------
  +    raw_random         Random numbers in normal distribution
  +    random             Scaled Pandas series of random numbers
Often you will have a bunch of subdirectories, each containing a single movie or the results of a single experiment, that all need to be processed in a similar way. Rather than putting an identical taskfile.py in each directory, you can place a single taskfile_sub.py in the parent directory, and tasker.use() will find it.

In a situation like this, you may want to also put a taskfile.py in the parent directory, to help you manage and organize all the subdirectories. Tasker includes a special SetTasker class to get you started.
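For example, a taskfile_sub.py might look like the following. This is a hypothetical sketch: it assumes taskfile_sub.py follows the same use(dirname) convention as taskfile.py, and the summary task is a stand-in for real processing.

# taskfile_sub.py, placed in the PARENT directory of the experiment subdirectories.
import tasker

def use(dirname):
    expt = tasker.Tasker(dirname)

    @expt.stores(tasker.JSON('summary.json'))
    def summary(tsk):
        """Per-experiment summary (stand-in for real processing)"""
        return {'name': expt.name}

    return expt  # Very important!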
Tasker objects are Python objects

Lastly, as you're writing and editing your taskfile, remember that you can judiciously add lots of other goodies to your Tasker instances. For example:

- If each directory's name encodes a parameter, your use() function could parse that name (available as mydir.name) and give the object a new attribute (e.g. mydir.voltage).
- mydir.params = tasker.JSON(mydir.p / 'params.json') lets you do
mydir.params.save({'voltage': 3.4, 'objective': '10x', 'good': True})
mydir.params() # Read from disk
{u'good': True, u'objective': u'10x', u'voltage': 3.4}
For ultimate flexibility, you can subclass Tasker as you would any other Python class. Then you can customize __init__(), or add properties and methods that are not tasks at all. For example, you may already have a function that takes lots of arguments, a few of which you'd like to specify each time, but most of which depend on the experiment you happen to be analyzing. By defining a method, you can make the latter parameters implicit, potentially improving the readability of your notebooks, as sketched below.
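Here is a hypothetical sketch (analyze_movie(), movie.h5, and the threshold parameter are all made up for illustration):

import tasker

def analyze_movie(path, threshold, smoothing, invert=False):
    """Stand-in for an existing function with many experiment-dependent arguments."""
    pass  # imagine lots of real work here

class MovieTasker(tasker.Tasker):
    """Tasker subclass with a convenience method that is not a task."""
    def analyze(self, smoothing=1.0):
        # The filename is implied by the directory, and threshold is set
        # per-experiment in conf, so callers pass only what actually varies.
        return analyze_movie(self.p / 'movie.h5',
                             self.conf.threshold, smoothing)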
When you have many similar directories to process (see the bit about subdirectory taskfiles above), Tasker is beautifully suited to processing them in parallel. It's beyond the scope of this tutorial, but actually not that hard — each parallel "job" for a worker consists of something like
import sys, tasker
dirpath = sys.argv[1] # ...or however you tell workers where to work
t = tasker.use(dirpath)
t.the_task_I_want()
The results of the_task_I_want will be stored on disk and ready for you to use later. Because you're using Tasker, if something goes wrong with your massive parallel computation, only the parts that failed will have to be re-run.
That said, a proper tutorial on this feature is missing. Especially important are Tasker's facilities for monitoring the progress of your jobs, including estimated time to completion.
Locking

To preserve a modicum of sanity in this world (especially when parallel computing is involved), no two Tasker instances can ever run the same task in the same directory at the same time. A lock file is created when a task is started, and deleted when it finishes (or aborts). The downside is that a catastrophe like a hard crash or sudden power loss can leave the lock in place. To recover, use the unlock() method of a Tasker instance.
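For example, after a hard crash you might run (assuming the mydir instance from above):

mydir.unlock()  # removes the stale lock so tasks can run again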
With the exception of parallel computing, we've basically covered everything. Tasker was designed to be minimal. But of course there are details, and for those, the docstrings, the source code, and the author are all helpful.