#!/usr/bin/env python
# coding: utf-8

# [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/fonnesbeck/Bios8366/blob/master/notebooks/Section7_4-Parallel-Processing.ipynb)

# # Parallel Processing in Python
# 
# An obvious way to improve the performance of Python code is to make it run in parallel. Any relatively new computer will have multiple cores, which means that several processors can operate on the same data stored in memory. However, most of the code we have written in the course so far does not take advantage of more than one of them. In addition, there is now widespread availability of computing clusters, such as [those offered for use by Amazon](http://aws.amazon.com/ec2/) (Vanderbilt also has [its own cluster](http://www.accre.vanderbilt.edu)). Clusters allow several computers to work together by exchanging data over a network.
# 
# Parallel computing involves breaking a task into several independent sub-tasks, distributing these sub-tasks to available processors or computers, then coordinating the execution of these tasks and combining their outputs in an appropriate way.
# 
# There are several different models for parallel processing, including:
# 
# * **Message passing**: processes or other program components running in parallel communicate by sending and receiving messages, which allows for easy synchronization.
# * **Multi-threading**: within a single process, some architectures allow for the existence of several "threads", which execute independently, though they share the memory and state of the process in which they reside. Multi-threading is good for increasing *throughput* and reducing *latency*.
# * **Task farming**: a master process delegates independent calculations to available processors (the task farm), and collects their outputs when complete.
# * **Single program, multiple data (SPMD)**: probably the most common type of parallel processing, in which tasks are split up and run simultaneously on multiple processors with different inputs in order to obtain results faster. All tasks execute their copy of the same program simultaneously.
# * **Multiple program, multiple data (MPMD)**: like SPMD, except each task may be executing a different program.

# ## `multiprocessing`
# 
# The simplest way (though probably not the best) to perform parallel computing in Python is via the built-in process-based library for concurrent computing, called `multiprocessing`.

# In[ ]:

import multiprocessing
import os
import numpy as np

# The `multiprocessing` module parallelizes by launching multiple *processes*, each with a separate interpreter. You may have already heard about *threads*. Processes and threads are not the same:
# 
# * processes are independent of one another, each having their own state, memory and address spaces
# * threads share resources, and are therefore interdependent; they are subunits of the same process
# 
# Since processes are independent, each has its own Global Interpreter Lock (GIL), so it is best to run `multiprocessing` across multiple CPUs. You can check how many you have on your machine:

# In[ ]:

multiprocessing.cpu_count()

# ### `Process` class
# 
# The `Process` class encapsulates a task running in a process. It will usually have a `target` argument that is some callable (function/method) that is executed when the process runs, along with optional arguments that can be passed to the target.
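# As a quick, minimal sketch before the API details (the `greet` function and `worker` name below are just illustrative placeholders): hand a callable to `Process` via `target`, `start` it, and wait for it with `join`.

# In[ ]:

from multiprocessing import Process

def greet(name):
    # This runs in the child process
    print('Hello from', name)

# On platforms that use the "spawn" start method (e.g. Windows), scripts
# should guard this in an `if __name__ == '__main__':` block.
worker = Process(target=greet, args=('a child process',))
worker.start()  # launch the child process
worker.join()   # wait for it to finish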
# A `Process` has several methods, with some useful ones being:
# 
# * `is_alive`: Returns `True` if the process is running.
# * `join`: Waits for the process to finish its work and terminate. An optional `timeout` argument can be passed.
# * `run`: When the process starts, this method is called to invoke the `target`.
# * `terminate`: Kills the process forcibly, without any cleanup.
# 
# A `Process` also has several other non-callable attributes, such as `pid`, `name` and `authkey`.
# 
# Here is a trivial example of using the `Process` class, showing that each process has its own process ID.

# In[ ]:

import os

def job(n):
    print('I am working on job {0} running on PID {1}'.format(n, os.getpid()))

jobs = []

for i in range(5):
    p = multiprocessing.Process(target=job, args=(i,))
    jobs.append(p)
    p.start()

# In[ ]:

jobs

# We can easily subclass `Process` to our liking:

# In[ ]:

class FibProcess(multiprocessing.Process):

    def __init__(self, n):
        self.n = n
        multiprocessing.Process.__init__(self)

    def run(self):
        a, b = 0, 1
        for i in range(self.n):
            a, b = b, a + b

# In[ ]:

p = FibProcess(10000)
p.start()
print(p.pid)
p.join()
print(p.exitcode)

# ### `Queue` class
# 
# Of course, despite being independent, we would like our processes to communicate with one another, for example, to share data. One way to facilitate this in `multiprocessing` is via the `Queue` class, a thread/process safe, first-in-first-out (FIFO) data structure that can store any serializable Python object.
# 
# A `Queue`'s important methods include:
# 
# * `put`: Adds an item to the `Queue`.
# * `get`: Fetches the next item from the `Queue`.
# * `close`: Closes the `Queue`, preventing the addition of more data.
# * `empty`: Returns `True` if the `Queue` is empty.
# * `full`: Returns `True` if the `Queue` is full.
# * `qsize`: Returns the approximate current number of items in the `Queue`.
# 
# A subclass of `Queue` is the `JoinableQueue`, which has additional methods, notably `join`, which waits until all the items have been processed, blocking the addition of new items.

# In[ ]:

from multiprocessing import Queue

q = Queue()

q.put(-1)
q.put('foobar')
q.put(5)

print(q.get())
print(q.get())

# Here's a toy example of `Queue` usage, where a process is fed items from a function, until it receives a `None` object.
# 
# First, a function to execute a task with items from a `Queue`:

# In[ ]:

def consumer(q):
    while True:
        thing = q.get()
        if thing is None:
            break
        print('Consuming {}'.format(thing))
    print("\nFinished consuming")

# Complementing this is another function that provisions the `Queue` with items:

# In[ ]:

def producer(sequence, q):
    for thing in sequence:
        q.put(thing)

# Initialize the `Queue` and start the `consumer` process:

# In[ ]:

queue = multiprocessing.Queue()

consumer_process = multiprocessing.Process(target=consumer, args=[queue])
consumer_process.start()

# Feed the `Queue` and process until finished:

# In[ ]:

stuff = [42, 'foobar', True, range(5)]
producer(stuff, queue)

# In[ ]:

queue.put(None)
consumer_process.join()

# Two things to be aware of:
# 
# 1. if you `terminate` a process that is still accessing a queue, the queue may become corrupted
# 2. you should make sure that all items a process has put on a queue have been consumed before joining that process, or you may end up with a deadlock
# 
# ### `Pool` class
# 
# We often have a task that we want to split up among several worker processes in parallel. The `Pool` class creates a number of worker processes and provides methods for passing work to them.
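# Before walking through those methods, here is a minimal, hedged sketch of the typical `Pool` workflow (the `square` helper below is just a placeholder): create a pool, hand it work with `map`, and let the context manager shut the workers down on exit.

# In[ ]:

from multiprocessing import Pool

def square(x):
    return x * x

# The with-block terminates the worker processes automatically on exit.
# Note that on platforms using the "spawn" start method, worker functions
# are usually best defined in an importable module.
with Pool(processes=2) as pool:
    squares = pool.map(square, range(10))  # blocks until all chunks are processed

squares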
# A `Pool` has the following key methods:
# 
# * `apply`: Executes a passed function in a worker process and returns the result.
# * `apply_async`: Same as `apply`, but the result is returned asynchronously (optionally via a *callback*).
# * `map`: A parallel equivalent of the built-in `map`, which splits an iterable of data into chunks and farms the chunks out to worker processes.
# * `map_async`: Asynchronous `map`.

# ### Example: parallel bootstrap
# 
# As an example, we will choose a statistical computing task that is [*embarrassingly parallel*](http://en.wikipedia.org/wiki/Embarrassingly_parallel). This function generates statistics of bootstrapped samples from a dataset.

# In[ ]:

def bootstrap(data, nsamples, f):
    boot_samples = data[np.random.randint(len(data), size=(nsamples, len(data)))]
    return [f(s) for s in boot_samples]

# In[ ]:

pool = multiprocessing.Pool(processes=4)

# In[ ]:

some_data = np.random.poisson(4, 25)

# In[ ]:

result = pool.apply_async(bootstrap, (some_data, 1000, np.mean))

# The result is an `ApplyResult` object:

# In[ ]:

result

# We may then want to take the result and calculate a confidence interval based on the quantiles.

# In[ ]:

def bootstrap_ci(boot, alpha=0.05):
    lower_index = int(np.floor((0.5*alpha)*len(boot)))
    upper_index = int(np.floor((1.-0.5*alpha)*len(boot)))
    return boot[lower_index], boot[upper_index]

# In[ ]:

bootstrap_ci(np.sort(result.get()))

# In[ ]:

# Clean up
pool.close()
pool.join()

# But since we used `apply_async`, the whole bootstrap ran as a single task on one worker process, so the work itself was not parallelized. We need to use `map`.

# In[ ]:

def mapped_bootstrap(n):
    return bootstrap(some_data, n, np.mean)

# In[ ]:

pool = multiprocessing.Pool(processes=4)

# In[ ]:

map_result = pool.map_async(mapped_bootstrap, [250]*4)

# In[ ]:

map_result

# In[ ]:

parallel_results = map_result.get()
[len(p) for p in parallel_results]

# In[ ]:

bootstrap_ci(np.sort(np.ravel(parallel_results)))

# In[ ]:

pool.close()
pool.join()

# The multiprocessing package is very useful for highly parallel tasks that do not need to communicate with each other, other than when sending the initial data to the pool of processes and when collecting the results.

# ## Jupyter parallel
# 
# The IPython architecture consists of four components, which reside in the `ipyparallel` package:
# 
# 1. **Engine** The IPython engine is a Python instance that accepts Python commands over a network connection. When multiple engines are started, parallel and distributed computing becomes possible. An important property of an IPython engine is that it blocks while user code is being executed.
# 
# 2. **Hub** The hub keeps track of engine connections, schedulers and clients, as well as persisting all task requests and results in a database for later use.
# 
# 3. **Schedulers** All actions that can be performed on the engine go through a Scheduler. While the engines themselves block when user code is run, the schedulers hide that from the user to provide a fully asynchronous interface to a set of engines.
# 
# 4. **Client** The primary object for connecting to a cluster.
# 
# ![IPython architecture](images/ipython_architecture.png)
# (courtesy Min Ragan-Kelley)
# 
# This architecture is implemented using the ØMQ messaging library and the associated Python bindings in `pyzmq`.
# 
# ## Start your engines!
# 
# In order to use IPython for parallel computing, you will need to start the IPython controller and two or more IPython engines.
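# (An aside: recent versions of ipyparallel, 7 and later, can also start and connect to a local cluster programmatically. The cell below is only a sketch under that assumption; check the documentation for your installed version. The more traditional ways of starting engines follow.)

# In[ ]:

# Hedged sketch: assumes ipyparallel >= 7 is installed
import ipyparallel as ipp

cluster = ipp.Cluster(n=4)             # describe a local cluster with 4 engines
rc = cluster.start_and_connect_sync()  # start it and return a connected Client
rc.ids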
# The simplest way of doing this is with the "clusters" tab in Jupyter Notebook, the IPython Parallel tab in JupyterLab, or you can use the `ipcluster` command in a terminal:
# 
#     $ ipcluster start --n=4
# 
# This command will start 4 IPython engines on the current host, which is appropriate for many desktop multicore systems. You can also set up IPython clusters that span many nodes in a computing cluster; this is beyond the scope of this lecture, but you can get more information from [the IPython.parallel docs](http://ipython.org/ipython-doc/dev/parallel/parallel_process.html).
# 
# To use the IPython cluster in our Python programs or notebooks, we start by creating an instance of `ipyparallel.Client`:

# In[ ]:

from ipyparallel import Client

# In[ ]:

cli = Client()

# This creates a client using the default profile; you can pass an optional `profile="my_profile"` argument if you have a different one running.
# 
# Using the `ids` attribute we can retrieve a list of ids for the IPython engines in the cluster:

# In[ ]:

cli.ids

# We can use a `DirectView` object for execution of tasks, which can be accessed simply by indexing the client:

# In[ ]:

dv0 = cli[0]
dv0

# The above shows just a single engine, but we want all of them:

# In[ ]:

dview = cli[:]
dview

# We can get a view on whatever combination of engines we want:

# In[ ]:

cli[::2]

# In[ ]:

cli[1::2]

# The `block` flag specifies whether to wait for the result, or return an `AsyncResult` object immediately:

# In[ ]:

dview.block = True

# Finally, since we want to use IPython's parallel magic commands, we set the `DirectView` to be `active`:

# In[ ]:

dview.activate()

# Each of these engines is ready to execute tasks. We can selectively run code on individual engines. For example, we can simply use `os.getpid` to return the process ID that the engine is running on. Here is the notebook process:

# In[ ]:

import os
os.getpid()

# Here is a single engine's process ID:

# In[ ]:

dv0.apply_sync(os.getpid)

# And here are all the engines, run simultaneously:

# In[ ]:

dview.apply_sync(os.getpid)

# Let's now consider a useful function that we might want to run in parallel. Here is a version of the approximate Bayesian computing (ABC) algorithm that we have seen in previous lectures.

# In[ ]:

import numpy

def abc(y, N, epsilon=[0.2, 0.8]):

    trace = []

    while len(trace) < N:

        # Simulate from priors
        mu = numpy.random.normal(0, 10)
        sigma = numpy.random.uniform(0, 20)

        x = numpy.random.normal(mu, sigma, 50)

        #if (np.linalg.norm(y - x) < epsilon):
        if ((abs(x.mean() - y.mean()) < epsilon[0]) &
            (abs(x.std() - y.std()) < epsilon[1])):
            trace.append([mu, sigma])

    return trace

# In[ ]:

import numpy as np

y = np.random.normal(4, 2, 50)

# Let's try running this on one of the cluster engines:

# In[ ]:

dv0.block = True
dv0.apply(abc, y, 10)

# This fails with a `NameError` because NumPy has not been imported on the engine to which we sent the task. Each engine has its own namespace, so we need to import whatever modules we will need prior to running our code:

# In[ ]:

cli[0].execute("import numpy")

# In[ ]:

dv0.apply(abc, y, 10)

# A more efficient way is to import modules into the local and the engine namespaces simultaneously, using a context manager:

# In[ ]:

with dview.sync_imports():
    import numpy

# Easier yet, you can use the parallel cell magic to import everywhere:

# In[ ]:

get_ipython().run_cell_magic('px', '', 'import numpy\n')

# You can also use the `require` decorator for functions that you wish to use on engines.
# In[ ]:

from ipyparallel import require

@require("numpy")
def abc(y, N, epsilon=[0.2, 0.8]):

    trace = []

    while len(trace) < N:

        # Simulate from priors
        mu = numpy.random.normal(0, 10)
        sigma = numpy.random.uniform(0, 20)

        x = numpy.random.normal(mu, sigma, 50)

        #if (np.linalg.norm(y - x) < epsilon):
        if ((abs(x.mean() - y.mean()) < epsilon[0]) &
            (abs(x.std() - y.std()) < epsilon[1])):
            trace.append([mu, sigma])

    return trace

# A simple way to run code on an engine is via the `execute` method:

# In[ ]:

dv0.execute('x=3')

# In[ ]:

dv0['x']

# ### Data transfer
# 
# We will often want to send data to our engines, or retrieve objects from them. `DirectView` has `push` and `pull` methods for achieving this.
# 
# Recall that Python namespaces are just dictionaries. So, we can update an engine's namespace by pushing a dictionary:

# In[ ]:

dv0.push({'foo': -3, 'bar': np.arange(10)})

# In[ ]:

dv0.pull(('x', 'bar'))

# Additionally, `DirectView` objects have `scatter` and `gather` methods, to distribute data among engines. `scatter` accepts any container or NumPy `array` type, while `gather` assembles the respective return objects in the Client.

# In[ ]:

# Some Gaussian data
y

# In[ ]:

dview = cli[:2]

# In[ ]:

# Send to engines
dview.scatter('y', y)
dview['y']

# In[ ]:

# Remote execution of function
dview.execute('sum_y = sum(y)')

# In[ ]:

# Aggregation on client
sum(dview.gather('sum_y'))

# The `map` method essentially combines `scatter` and `gather` into a single call:

# In[ ]:

result = dview.map(lambda x: sum(x**2), np.split(y, 5))
sum(result)

# ### Load balancing
# 
# The `DirectView` objects we have used so far strictly allocate particular tasks to particular engines. This is often inefficient when tasks take variable amounts of time, leaving some engines idle while others are overworked. We can use a **load balanced** view to distribute work approximately equally among engines, to minimize idle time.

# In[ ]:

lb_view = cli.load_balanced_view()
lb_view

# A `LoadBalancedView`, though it works with all the engines (or specified subsets of engines), behaves as if it is working with a single engine.
# 
# If you do not specify the engines when the `LoadBalancedView` is created, it will use all the engines that are available when it assigns tasks.

# In[ ]:

for i in range(10):
    pid = lb_view.apply_sync(os.getpid)
    print('Task {0} ran on process {1}'.format(i, pid))

# In[ ]:

get_ipython().run_cell_magic('px', '', 'import numpy as np\n\ndef abc(y, N, epsilon=[0.2, 0.8]):\n\n    trace = []\n\n    while len(trace) < N:\n\n        # Simulate from priors\n        mu = numpy.random.normal(0, 10)\n        sigma = numpy.random.uniform(0, 20)\n\n        x = numpy.random.normal(mu, sigma, 50)\n\n        #if (np.linalg.norm(y - x) < epsilon):\n        if ((abs(x.mean() - y.mean()) < epsilon[0]) &\n            (abs(x.std() - y.std()) < epsilon[1])):\n            trace.append([mu, sigma])\n\n    return trace\n')

# In[ ]:

tasks = lb_view.map_async(lambda y, n: abc(y, n), y, [20]*5)

# In[ ]:

tasks.msg_ids

# In[ ]:

result = np.concatenate(tasks.get())
result[:10]

# Another way that you can dispatch tasks to engines is via the `parallel` decorator. This decorator is a method of the view that controls our engine pool (here, the `LoadBalancedView`). The decorated function is then dispatched to the engines using the `map` method that the decorator adds to the function.
# In[ ]:

@lb_view.parallel(block=True)
def abc(y, N, epsilon=[0.2, 0.8]):

    trace = []

    while len(trace) < N:

        # Simulate from priors
        mu = numpy.random.normal(0, 10)
        sigma = numpy.random.uniform(0, 20)

        x = numpy.random.normal(mu, sigma, 50)

        #if (np.linalg.norm(y - x) < epsilon):
        if ((abs(x.mean() - y.mean()) < epsilon[0]) &
            (abs(x.std() - y.std()) < epsilon[1])):
            trace.append([mu, sigma])

    return trace

# In[ ]:

abc.map([y]*4, [25]*4)

# ### Parallel magics
# 
# The `%px` cell magic is a "parallel execution" statement, which will run the code in that cell on all the engines.

# In[ ]:

get_ipython().run_cell_magic('px', '', 'import os\nprint(os.getpid())\n')

# In[ ]:

get_ipython().run_line_magic('px', 'b = numpy.random.random()')

# In[ ]:

get_ipython().run_line_magic('px', 'b')

# `%pxresult` displays the output of the last request:

# In[ ]:

get_ipython().run_line_magic('pxresult', '')

# The `%pxconfig` magic allows you to configure blocking for the parallel magics.

# In[ ]:

# Switch to blocking (synchronous) execution
get_ipython().run_line_magic('pxconfig', '--block')

# Remember that each engine is just another IPython instance, so anything you can do in an IPython session you can also do on an engine.

# In[ ]:

get_ipython().run_line_magic('px', '%matplotlib inline')

# In[ ]:

get_ipython().run_line_magic('px', 'y = np.random.normal(4, 2, 50)')

# In[ ]:

get_ipython().run_line_magic('px', 'samples = abc(y, 100)')

# In[ ]:

get_ipython().run_cell_magic('px', '', "import matplotlib.pyplot as plt\nimport os\ntsamples = numpy.transpose(samples)\nplt.hist(tsamples[0])\nplt.hist(tsamples[1])\n_ = plt.title('PID %i' % os.getpid())\n")

# For profiling, we can also use the `%timeit` magic to compare performance on the engines:

# In[ ]:

get_ipython().run_cell_magic('px', '', '%%timeit\ns = abc(y, 10)\n')

# ## Dask
# 
# Dask is a parallel computing library that scales the existing Python ecosystem. This tutorial will introduce Dask and parallel data analysis more generally.
# 
# Dask can scale down to your laptop and up to a cluster. Here, we'll use an environment you set up on your laptop to analyze medium-sized datasets in parallel locally.
# Dask provides multi-core and distributed parallel execution on larger-than-memory datasets.
# 
# We can think of Dask at a high and a low level:
# 
# * **High level collections:** Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don't fit into memory. Dask's high-level collections are alternatives to NumPy and Pandas for large datasets.
# * **Low level schedulers:** Dask provides dynamic task schedulers that execute task graphs in parallel. These execution engines power the high-level collections mentioned above but can also power custom, user-defined workloads. These schedulers are low-latency (around 1ms) and work hard to run computations in a small memory footprint. Dask's schedulers are an alternative to direct use of `threading` or `multiprocessing` libraries in complex cases or other task scheduling systems like `Luigi` or `IPython parallel`.
# 
# Different users operate at different levels, but it is useful to understand both.

# In[ ]:

get_ipython().system('conda install -y scikit-image holidays dask')

# A simple way to parallelize for-loop style code with Dask is with `dask.delayed`. Often, this is the only function you will need in order to convert existing functions for use with Dask.
# 
# We'll use the distributed scheduler by creating a `dask.distributed.Client`.
# For now, this will provide us with some nice diagnostics.

# In[ ]:

from dask.distributed import Client

client = Client(n_workers=4)

# First let's make some toy functions, `inc` and `add`, that sleep for a while to simulate work. We'll then time running these functions normally.

# In[ ]:

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

# We time the execution of this normal code using the `%%time` magic:

# In[ ]:

get_ipython().run_cell_magic('time', '', '\nx = inc(1)\ny = inc(2)\nz = add(x, y)\n')

# Those two increment calls *could* be called in parallel, because they are totally independent of one another.
# 
# We'll transform the `inc` and `add` functions using the `dask.delayed` function. When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet - which is why the cell execution finishes very quickly.
# Instead, a *delayed object* is made, which keeps track of the function to call and the arguments to pass to it.

# In[ ]:

from dask import delayed

# The following runs immediately because all it is doing is building a graph.

# In[ ]:

get_ipython().run_cell_magic('time', '', '\nx = delayed(inc)(1)\ny = delayed(inc)(2)\nz = delayed(add)(x, y)\n')

# To get the result, call `compute`. Notice that this runs faster than the original code.

# In[ ]:

get_ipython().run_line_magic('time', 'z.compute()')

# The `z` object is a lazy `Delayed` object. This object holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationships to one another. We can evaluate the result with `.compute()` as above or we can visualize the task graph for this value with `.visualize()`.

# In[ ]:

z

# In[ ]:

z.visualize()

# `for` loops are one of the most common things that we want to parallelize. We can use `dask.delayed` on `inc` and `sum` to parallelize the computation below:

# In[ ]:

data = [1, 2, 3, 4, 5, 6, 7, 8]

# In[ ]:

results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

# Often we want to delay only *some* functions, running a few of them immediately. This is especially helpful when those functions are fast and help us to determine what other slower functions we should call. This decision, to delay or not to delay, is usually where we need to be thoughtful when using `dask.delayed`.
# 
# In the example below we iterate through a list of inputs. If that input is even then we want to call `double`. If the input is odd then we want to call `inc`. This `is_even` decision to call `inc` or `double` has to be made immediately (not lazily) in order for our graph-building Python code to proceed.

# In[ ]:

def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# In[ ]:

results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:  # odd
        y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)

# In[ ]:

get_ipython().run_line_magic('time', 'total.compute()')

# In[ ]:

total.visualize()

# ### Dask DataFrames
# 
# Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM.
# The demo dataset we're working with is only about 200MB, so that you can download it in a reasonable time, but `dask.dataframe` will scale to datasets much larger than memory.
# 
# The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a large subset of the Pandas `DataFrame` API. One Dask `DataFrame` is composed of many in-memory pandas `DataFrames` separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.

# This downloads and extracts some historical flight data for flights out of NYC between 1990 and 2000. The data is originally from [here](http://stat-computing.org/dataexpo/2009/the-data.html).

# In[ ]:

get_ipython().run_line_magic('run', 'prep.py -d accounts')
get_ipython().run_line_magic('run', 'prep.py -d flights')

# In[ ]:

import os
import dask

filename = os.path.join('..', 'data', 'accounts.*.csv')
filename

# The filename includes a glob pattern `*`, so all files in the path matching that pattern will be read into the same Dask DataFrame.

# In[ ]:

import dask.dataframe as dd

df = dd.read_csv(filename)
df.head()

# In[ ]:

len(df)

# A few things happened here: Dask investigated the input path and found that there are three matching files. Then, a set of jobs was intelligently created for each chunk - one per original CSV file in this case. Each file was loaded into a pandas dataframe, had `len()` applied to it, and the subtotals were combined to give you the final grand total.

# Let's try this with an extract of flights in the USA across several years. This data is specific to flights out of the three airports in the New York City area.

# In[ ]:

get_ipython().system('tar -xvf ../data/nycflights.tar.gz')

df = dd.read_csv(os.path.join('nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]})

# Notice that the representation of the dataframe object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

# In[ ]:

df

# We can view a chunk of the data:

# In[ ]:

df.head()

# In[ ]:

df.tail()  # Fails!

# Unlike `pandas.read_csv`, which reads in the entire file before inferring datatypes, `dask.dataframe.read_csv` only reads in a sample from the beginning of the file (or first file if using a glob). These inferred datatypes are then enforced when reading all partitions.
# 
# In this case, the datatypes inferred in the sample are incorrect. The first `n` rows have no value for `CRSElapsedTime` (which pandas infers as a `float`), and later on turn out to be strings (`object` dtype). Note that Dask gives an informative error message about the mismatch. When this happens you have a few options:
# 
# - Specify dtypes directly using the `dtype` keyword. This is the recommended solution, as it's the least error prone (better to be explicit than implicit) and also the most performant.
# - Increase the size of the `sample` keyword (in bytes).
# - Use `assume_missing` to make `dask` assume that columns inferred to be `int` (which don't allow missing values) are actually floats (which do allow missing values). In our particular case this doesn't apply.
# 
# In our case we'll use the first option and directly specify the `dtypes` of the offending columns.
# In[ ]:

df = dd.read_csv(os.path.join('nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

# In[ ]:

df.tail()

# In[ ]:

holidays = dd.read_parquet(os.path.join('..', 'data', "holidays"))

# In[ ]:

holidays.head()

# We compute the maximum of the `DepDelay` column. With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums.
# 
# ```python
# maxes = []
# for fn in filenames:
#     df = pd.read_csv(fn)
#     maxes.append(df.DepDelay.max())
# 
# final_max = max(maxes)
# ```
# 
# We could wrap that `pd.read_csv` with `dask.delayed` so that it runs in parallel. Regardless, we're still having to think about loops, intermediate results (one per file) and the final reduction (`max` of the intermediate maxes). This is just noise around the real task, which pandas solves with
# 
# ```python
# df = pd.read_csv(filename, dtype=dtype)
# df.DepDelay.max()
# ```
# 
# `dask.dataframe` lets us write pandas-like code that operates on larger-than-memory datasets in parallel.

# In[ ]:

get_ipython().run_line_magic('time', 'df.DepDelay.max().compute()')

# This writes the delayed computation for us and then runs it.
# 
# Some things to note:
# 
# 1. As with `dask.delayed`, we need to call `.compute()` when we're done. Up until this point everything is lazy.
# 2. Dask will delete intermediate results (like the full pandas dataframe for each file) as soon as possible.
#     - This lets us handle datasets that are larger than memory
#     - This means that repeated computations will have to load all of the data in each time (run the code above again; is it faster or slower than you would expect?)
# 
# As with `Delayed` objects, you can view the underlying task graph using the `.visualize` method:

# In[ ]:

df.DepDelay.max().visualize()

# ## Exercises
# 
# In this section we do a few `dask.dataframe` computations. If you are comfortable with Pandas then these should be familiar. You will have to think about when to call `compute`.
# 
# 1. In total, how many non-cancelled flights were taken from each airport?
# 2. What was the average departure delay from each airport?

# In[ ]:

# Write your answer here

# When computing all of the above, we sometimes did the same operation more than once. For most operations, `dask.dataframe` hashes the arguments, allowing duplicate computations to be shared and only computed once.
# 
# For example, let's compute the mean and standard deviation for departure delay of all non-cancelled flights. Since dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.
# 
# If we compute them with two calls to compute, there is no sharing of intermediate computations.

# In[ ]:

non_cancelled = df[~df.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

# In[ ]:

get_ipython().run_cell_magic('time', '', '\nmean_delay_res = mean_delay.compute()\nstd_delay_res = std_delay.compute()\n')

# But let's try passing both to a single `compute` call.

# In[ ]:

get_ipython().run_cell_magic('time', '', '\nmean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)\n')

# Using `dask.compute` takes roughly half the time. This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations to only be done once instead of twice.
# In particular, using `dask.compute` only does the following once:
# 
# - the calls to `read_csv`
# - the filter (`df[~df.Cancelled]`)
# - some of the necessary reductions (`sum`, `count`)
# 
# ### How does this compare to Pandas?
# 
# Pandas is more mature and fully featured than `dask.dataframe`. If your data fits in memory then you should use Pandas. The `dask.dataframe` module gives you a limited `pandas` experience when you operate on datasets that don't fit comfortably in memory. Dask.dataframe only really becomes meaningful for datasets significantly larger than memory, when Pandas breaks with the dreaded
# 
#     MemoryError:  ...
# 
# Furthermore, the distributed scheduler allows the same dataframe expressions to be executed across a cluster. To enable massive "big data" processing, one could execute data ingestion functions such as `read_csv` against storage accessible to every worker node (e.g., Amazon's S3), and because most operations begin by selecting only some columns, transforming and filtering the data, only relatively small amounts of data need to be communicated between the machines.
# 
# Dask.dataframe operations use `pandas` operations internally. Generally they run at about the same speed except in the following two cases:
# 
# 1. Dask introduces a bit of overhead, around 1ms per task. This is usually negligible.
# 2. When Pandas releases the GIL, `dask.dataframe` can call several pandas operations in parallel within a process, increasing speed roughly in proportion to the number of cores. For operations which don't release the GIL, multiple processes would be needed to get the same speedup.

# ## Dask DataFrame Data Model
# 
# For the most part, a Dask DataFrame feels like a pandas DataFrame.
# So far, the biggest difference we've seen is that Dask operations are lazy; they build up a task graph instead of executing immediately (more details coming in [Schedulers](05_distributed.ipynb)).
# This lets Dask do operations in parallel and out of core.
# 
# A Dask DataFrame is composed of many pandas DataFrames. For `dask.dataframe` the chunking happens only along the index.
# 
# We call each chunk a *partition*, and the upper / lower bounds are *divisions*.
# Dask *can* store information about the divisions. For now, partitions come up when you write custom functions to apply to Dask DataFrames.

# # Parallel and Distributed Machine Learning
# 
# [Dask-ML](https://dask-ml.readthedocs.io) has resources for parallel and distributed machine learning.

# ## Types of Scaling
# 
# There are a couple of distinct scaling problems you might face.
# The scaling strategy depends on which problem you're facing.
# 
# 1. CPU-bound: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
# 2. Memory-bound: Data is larger than RAM, and sampling isn't an option.
# 
# ![](images/ml-dimensions.png)

# * For in-memory problems, just use scikit-learn (or your favorite ML library).
# * For large models, use `dask_ml.joblib` and your favorite scikit-learn estimator.
# * For large datasets, use `dask_ml` estimators.

# Let's generate some random data.

# In[ ]:

from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
X[:8]

# In[ ]:

y[:8]

# We'll fit a Support Vector Classifier.

# In[ ]:

from sklearn.svm import SVC

# Create the estimator and fit it.

# In[ ]:

estimator = SVC(random_state=0)
estimator.fit(X, y)

# Inspect the learned attributes.
# In[ ]:

estimator.support_vectors_[:4]

# Check the accuracy.

# In[ ]:

estimator.score(X, y)

# ## Hyperparameter Optimization
# 
# As we have seen, we learn the best hyperparameters in scikit-learn using `GridSearchCV`, which does a brute-force search over a grid of hyperparameter combinations.

# In[ ]:

from sklearn.model_selection import GridSearchCV

# In[ ]:

get_ipython().run_cell_magic('time', '', "estimator = SVC(gamma='auto', random_state=0, probability=True)\nparam_grid = {\n    'C': [0.001, 10.0],\n    'kernel': ['rbf', 'poly'],\n}\n\ngrid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)\ngrid_search.fit(X, y)\n")

# ## Single-machine parallelism with scikit-learn
# 
# ![](images/unmerged_grid_search_graph.svg)
# 
# Scikit-Learn has nice *single-machine* parallelism, via Joblib.
# Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword.
# This controls the number of CPU cores that will be used.

# In[ ]:

get_ipython().run_cell_magic('time', '', 'grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)\ngrid_search.fit(X, y)\n')

# ## Multi-machine parallelism with Dask
# 
# ![](images/merged_grid_search_graph.svg)
# 
# Dask can talk to scikit-learn (via joblib) so that your *cluster* is used to train a model.
# 
# If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a distributed cluster. That would mean passing the address of a scheduler to `Client`, something like
# 
# ```
# c = Client('tcp://my.scheduler.address:8786')
# ```
# 
# Details on the many ways to create a cluster can be found [here](https://docs.dask.org/en/latest/setup/single-distributed.html).

# Let's try it on a larger problem (more hyperparameters).

# In[ ]:

import joblib
import dask.distributed

c = dask.distributed.Client()

# In[ ]:

param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    # Uncomment this for larger grid searches on a cluster
    # 'kernel': ['rbf', 'poly', 'linear'],
    # 'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)

# In[ ]:

get_ipython().run_cell_magic('time', '', 'with joblib.parallel_backend("dask", scatter=[X, y]):\n    grid_search.fit(X, y)\n')

# In[ ]:

grid_search.best_params_, grid_search.best_score_

# ## Exercise
# 
# Run parallel chains of the `disaster_model` example from PyMC and return the resulting traces to your client, for plotting and summarization.

# In[ ]:

# Write your answer here

# ## References
# 
# [Scientific Python Lectures](http://github.com/jrjohansson/scientific-python-lectures) by Robert Johansson
# 
# [Using IPython for Parallel Computing](http://ipython.org/ipython-doc/dev/parallel/)