IPython’s multiengine interface

The multiengine interface represents one possible way of working with a set of IPython engines. The basic idea behind the multiengine interface is that the capabilities of each engine are directly and explicitly exposed to the user. Thus, in the multiengine interface, each engine is given an id that is used to identify the engine and give it work to do. The interface is designed with interactive usage in mind, which makes it the best place for new users of IPython to begin.

Starting the IPython controller and engines

To follow along with this tutorial, you will need to start the IPython controller and four IPython engines. The simplest way of doing this is to use the ipcluster command:

$ ipcluster local -n 4

For more detailed information about starting the controller and engines, see our introduction to using IPython for parallel computing.

Creating a MultiEngineClient instance

The first step is to import the IPython.kernel.client module and then create a MultiEngineClient instance:

In [1]: from IPython.kernel import client

In [2]: mec = client.MultiEngineClient()

This form assumes that the ipcontroller-mec.furl file is in the ~/.ipython/security directory on the client’s host. If it is not, the location of the FURL file must be given as an argument to the constructor:

In [2]: mec = client.MultiEngineClient('/path/to/my/ipcontroller-mec.furl')

To make sure there are engines connected to the controller, you can get a list of engine ids:

In [3]: mec.get_ids()
Out[3]: [0, 1, 2, 3]

Here we see that there are four engines ready to do work for us.

Quick and easy parallelism

In many cases, you simply want to apply a Python function to a sequence of objects, but in parallel. The multiengine interface provides two simple ways of accomplishing this: a parallel version of map() and a @parallel function decorator.

Parallel map

Python’s builtin map() function allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the multiengine interface in IPython already has a parallel version of map() that works just like its serial counterpart:

In [63]: serial_result = map(lambda x:x**10, range(32))

In [64]: parallel_result = mec.map(lambda x:x**10, range(32))

In [65]: serial_result==parallel_result
Out[65]: True

Note

The multiengine interface version of map() does not do any load balancing. For a load balanced version, see the task interface.
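To make the note about load balancing concrete, here is a minimal standalone sketch of a statically partitioned map. It is written for Python 3 with only the standard library and does not use IPython.kernel; the name static_parallel_map and its n_workers parameter are hypothetical, and whether mec.map() splits its input in exactly this way is an assumption:

```python
# Standalone Python 3 sketch; it does not import IPython.kernel.
from concurrent.futures import ThreadPoolExecutor


def static_parallel_map(func, seq, n_workers=4):
    """Apply func to seq by handing each worker one contiguous chunk.

    Hypothetical helper: the chunks are fixed up front, so a worker that
    finishes early cannot take over work from a slower one.
    """
    seq = list(seq)
    # Ceiling division so that every element lands in some chunk.
    chunk = max(1, -(-len(seq) // n_workers))
    chunks = [seq[i:i + chunk] for i in range(0, len(seq), chunk)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Executor.map yields results in the order the chunks were given.
        partial_results = pool.map(lambda c: [func(x) for x in c], chunks)
        return [y for part in partial_results for y in part]


serial_result = list(map(lambda x: x**10, range(32)))
parallel_result = static_parallel_map(lambda x: x**10, range(32))
print(serial_result == parallel_result)
```

Because the partitioning is decided before any work starts, one slow chunk delays the whole call, which is the situation a load-balanced scheduler is meant to avoid.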

See also

The map() method has a number of options that can be controlled by the mapper() method. See its docstring for more information.

Parallel function decorator

Parallel functions are just like normal functions, but they can be called on sequences and in parallel. The multiengine interface provides a decorator that turns any Python function into a parallel function:

In [10]: @mec.parallel()
   ....: def f(x):
   ....:     return 10.0*x**4
   ....:

In [11]: f(range(32))    # this is done in parallel
Out[11]:
[0.0,10.0,160.0,...]

See the docstring for the parallel() decorator for options.
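The general shape of such a decorator can be sketched without a running cluster. The following standalone Python 3 example uses a thread pool in place of the engines; the parallel() function defined here is a hypothetical stand-in and makes no claim about how mec.parallel() is implemented or which options it accepts:

```python
# Standalone Python 3 sketch; a thread pool stands in for the engines.
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


def parallel(n_workers=4):
    """Hypothetical stand-in for mec.parallel(): returns a decorator."""
    def decorator(func):
        @wraps(func)
        def mapped(seq):
            # The decorated function now takes a sequence and applies the
            # original function to each element.
            with ThreadPoolExecutor(max_workers=n_workers) as pool:
                return list(pool.map(func, seq))
        return mapped
    return decorator


@parallel()
def f(x):
    return 10.0 * x**4


result = f(range(32))
print(result[:3])
```

Note that the decorator is called (with parentheses) before being applied, which leaves room for options.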

Running Python commands

The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is the default) using the execute() method.

Blocking execution

In blocking mode, the MultiEngineClient object (called mec in these examples) submits the command to the controller, which places the command in the engines’ queues for execution. The execute() call then blocks until the engines are done executing the command:

# The default is to run on all engines
In [4]: mec.execute('a=5')
Out[4]:
<Results List>
[0] In [1]: a=5
[1] In [1]: a=5
[2] In [1]: a=5
[3] In [1]: a=5

In [5]: mec.execute('b=10')
Out[5]:
<Results List>
[0] In [2]: b=10
[1] In [2]: b=10
[2] In [2]: b=10
[3] In [2]: b=10

Python commands can be executed on specific engines by passing the targets keyword argument to execute():

In [6]: mec.execute('c=a+b',targets=[0,2])
Out[6]:
<Results List>
[0] In [3]: c=a+b
[2] In [3]: c=a+b


In [7]: mec.execute('c=a-b',targets=[1,3])
Out[7]:
<Results List>
[1] In [3]: c=a-b
[3] In [3]: c=a-b


In [8]: mec.execute('print c')
Out[8]:
<Results List>
[0] In [4]: print c
[0] Out[4]: 15

[1] In [4]: print c
[1] Out[4]: -5

[2] In [4]: print c
[2] Out[4]: 15

[3] In [4]: print c
[3] Out[4]: -5

This example also shows one of the most important things about the IPython engines: they have persistent user namespaces. The execute() method returns a result list containing one Python dict per engine, each holding useful information:

In [9]: result_dict = mec.execute('d=10; print d')

In [10]: for r in result_dict:
   ....:     print r
   ....:
   ....:
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 0, 'stdout': '10\n'}
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 1, 'stdout': '10\n'}
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 2, 'stdout': '10\n'}
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 3, 'stdout': '10\n'}
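The idea of a persistent namespace can be illustrated with a toy engine that runs every command in the same dictionary. This is a standalone Python 3 sketch (hence print() as a function); the ToyEngine class is hypothetical, and its result dict is a simplified version of the one shown above:

```python
# Standalone Python 3 sketch; ToyEngine is a hypothetical illustration.
import contextlib
import io


class ToyEngine:
    """Runs code strings in one namespace that survives between calls."""

    def __init__(self, engine_id):
        self.id = engine_id
        self.namespace = {}   # persists across execute() calls
        self.number = 0       # command counter, like the engine's In [n]

    def execute(self, source):
        self.number += 1
        buffer = io.StringIO()
        with contextlib.redirect_stdout(buffer):
            exec(source, self.namespace)
        return {"input": source, "number": self.number,
                "id": self.id, "stdout": buffer.getvalue()}


engines = [ToyEngine(i) for i in range(4)]
for engine in engines:
    engine.execute("a = 5")
    engine.execute("b = 10")
for engine in engines:
    # Different commands on different engines, as with targets=[0,2] and [1,3].
    engine.execute("c = a + b" if engine.id in (0, 2) else "c = a - b")
results = [engine.execute("print(c)") for engine in engines]
for r in results:
    print(r)
```

Each engine remembers a and b from the earlier commands, so the later commands can use them even though they arrive as separate strings.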

Non-blocking execution

In non-blocking mode, execute() submits the command to be executed and then returns a PendingResult object immediately. The PendingResult object gives you a way of getting a result at a later time through its get_result() method or r attribute. This allows you to quickly submit long-running commands without blocking your local Python/IPython session:

# In blocking mode
In [6]: mec.execute('import time')
Out[6]:
<Results List>
[0] In [1]: import time
[1] In [1]: import time
[2] In [1]: import time
[3] In [1]: import time

# In non-blocking mode
In [7]: pr = mec.execute('time.sleep(10)',block=False)

# Now block for the result
In [8]: pr.get_result()
Out[8]:
<Results List>
[0] In [2]: time.sleep(10)
[1] In [2]: time.sleep(10)
[2] In [2]: time.sleep(10)
[3] In [2]: time.sleep(10)

# Again in non-blocking mode
In [9]: pr = mec.execute('time.sleep(10)',block=False)

# Poll to see if the result is ready
In [10]: pr.get_result(block=False)

# A shorthand for get_result(block=True)
In [11]: pr.r
Out[11]:
<Results List>
[0] In [3]: time.sleep(10)
[1] In [3]: time.sleep(10)
[2] In [3]: time.sleep(10)
[3] In [3]: time.sleep(10)

Often, it is desirable to wait until a set of PendingResult objects are done. For this, there is the barrier() method. It takes a sequence of PendingResult objects (a tuple or, as in the example here, a list) and blocks until all of the associated results are ready:

In [72]: mec.block=False

# A trivial list of PendingResult objects
In [73]: pr_list = [mec.execute('time.sleep(3)') for i in range(10)]

# Wait until all of them are done
In [74]: mec.barrier(pr_list)

# Then, their results are ready using get_result or the r attribute
In [75]: pr_list[0].r
Out[75]:
<Results List>
[0] In [20]: time.sleep(3)
[1] In [19]: time.sleep(3)
[2] In [20]: time.sleep(3)
[3] In [19]: time.sleep(3)
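The submit, poll and barrier pattern is not specific to IPython. As a rough analogy only, the standard library's concurrent.futures module offers Future objects that play a role similar to PendingResult. The sketch below is standalone Python 3; the mapping from Future methods to PendingResult methods in the comments is an informal comparison, not a statement about IPython's implementation:

```python
# Standalone Python 3 sketch using concurrent.futures as an analogy.
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_square(x):
    time.sleep(0.1)
    return x * x


pool = ThreadPoolExecutor(max_workers=4)

# submit() returns at once with a Future, much as execute(block=False)
# returns a PendingResult.
pending = [pool.submit(slow_square, i) for i in range(4)]

# done() never blocks (compare get_result(block=False)).
polled = [p.done() for p in pending]

# wait() blocks until every future has finished (compare barrier()).
done, not_done = wait(pending)

# The results can now be collected without further waiting.
values = [p.result() for p in pending]
pool.shutdown()
print(values)
```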

The block and targets keyword arguments and attributes

Most methods in the multiengine interface (like execute()) accept block and targets as keyword arguments. As we have seen above, these keyword arguments control the blocking mode and which engines the command is applied to. The MultiEngineClient class also has block and targets attributes that control the default behavior when the keyword arguments are not provided. Thus the following logic is used for block and targets:

  • If no keyword argument is provided, the instance attributes are used.
  • Keyword arguments, if provided, override the instance attributes.

The following examples demonstrate how to use the instance attributes:

In [16]: mec.targets = [0,2]

In [17]: mec.block = False

In [18]: pr = mec.execute('a=5')

In [19]: pr.r
Out[19]:
<Results List>
[0] In [6]: a=5
[2] In [6]: a=5

# Note targets='all' means all engines
In [20]: mec.targets = 'all'

In [21]: mec.block = True

In [22]: mec.execute('b=10; print b')
Out[22]:
<Results List>
[0] In [7]: b=10; print b
[0] Out[7]: 10

[1] In [6]: b=10; print b
[1] Out[6]: 10

[2] In [7]: b=10; print b
[2] Out[7]: 10

[3] In [6]: b=10; print b
[3] Out[6]: 10
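The fallback rule for block and targets described earlier in this section can be written down in a few lines. This standalone Python 3 sketch uses a hypothetical ToyClient class with a hypothetical resolve() method; treating None as "keyword not given" is an assumption made for the illustration:

```python
# Standalone Python 3 sketch; ToyClient and resolve() are hypothetical.
class ToyClient:
    """Resolves block/targets from keyword arguments or instance attributes."""

    def __init__(self, engine_ids):
        self.engine_ids = list(engine_ids)
        self.block = True       # instance default
        self.targets = "all"    # instance default

    def resolve(self, targets=None, block=None):
        # None means the keyword was not given: use the instance attribute.
        if targets is None:
            targets = self.targets
        if block is None:
            block = self.block
        # Normalize 'all' and a single id to an explicit list of ids.
        if targets == "all":
            targets = self.engine_ids
        elif isinstance(targets, int):
            targets = [targets]
        return list(targets), block


client = ToyClient(range(4))
print(client.resolve())                             # instance defaults
client.targets = [0, 2]
client.block = False
print(client.resolve())                             # new instance defaults
print(client.resolve(targets=[1, 3], block=True))   # keywords win
```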

The block and targets instance attributes also determine the behavior of the parallel magic commands.

Parallel magic commands

We provide a few IPython magic commands (%px, %autopx and %result) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to execute() and get_result(). The %px magic executes a single Python command on the engines specified by the targets attribute of the MultiEngineClient instance (by default this is 'all'):

# Make this MultiEngineClient active for parallel magic commands
In [23]: mec.activate()

In [24]: mec.block=True

In [25]: import numpy

In [26]: %px import numpy
Executing command on Controller
Out[26]:
<Results List>
[0] In [8]: import numpy
[1] In [7]: import numpy
[2] In [8]: import numpy
[3] In [7]: import numpy


In [27]: %px a = numpy.random.rand(2,2)
Executing command on Controller
Out[27]:
<Results List>
[0] In [9]: a = numpy.random.rand(2,2)
[1] In [8]: a = numpy.random.rand(2,2)
[2] In [9]: a = numpy.random.rand(2,2)
[3] In [8]: a = numpy.random.rand(2,2)


In [28]: %px print numpy.linalg.eigvals(a)
Executing command on Controller
Out[28]:
<Results List>
[0] In [10]: print numpy.linalg.eigvals(a)
[0] Out[10]: [ 1.28167017  0.14197338]

[1] In [9]: print numpy.linalg.eigvals(a)
[1] Out[9]: [-0.14093616  1.27877273]

[2] In [10]: print numpy.linalg.eigvals(a)
[2] Out[10]: [-0.37023573  1.06779409]

[3] In [9]: print numpy.linalg.eigvals(a)
[3] Out[9]: [ 0.83664764 -0.25602658]

The %result magic gets and prints the stdin/stdout/stderr of the last command executed on each engine. It is simply a shortcut to the get_result() method:

In [29]: %result
Out[29]:
<Results List>
[0] In [10]: print numpy.linalg.eigvals(a)
[0] Out[10]: [ 1.28167017  0.14197338]

[1] In [9]: print numpy.linalg.eigvals(a)
[1] Out[9]: [-0.14093616  1.27877273]

[2] In [10]: print numpy.linalg.eigvals(a)
[2] Out[10]: [-0.37023573  1.06779409]

[3] In [9]: print numpy.linalg.eigvals(a)
[3] Out[9]: [ 0.83664764 -0.25602658]

The %autopx magic switches to a mode where everything you type is executed on the engines given by the targets attribute:

In [30]: mec.block=False

In [31]: %autopx
Auto Parallel Enabled
Type %autopx to disable

In [32]: max_evals = []
<IPython.kernel.multiengineclient.PendingResult object at 0x17b8a70>

In [33]: for i in range(100):
   ....:     a = numpy.random.rand(10,10)
   ....:     a = a+a.transpose()
   ....:     evals = numpy.linalg.eigvals(a)
   ....:     max_evals.append(evals[0].real)
   ....:
   ....:
<IPython.kernel.multiengineclient.PendingResult object at 0x17af8f0>

In [34]: %autopx
Auto Parallel Disabled

In [35]: mec.block=True

In [36]: px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
Executing command on Controller
Out[36]:
<Results List>
[0] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[0] Out[13]: Average max eigenvalue is:  10.1387247332

[1] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[1] Out[12]: Average max eigenvalue is:  10.2076902286

[2] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[2] Out[13]: Average max eigenvalue is:  10.1891484655

[3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[3] Out[12]: Average max eigenvalue is:  10.1158837784

Moving Python objects around

In addition to executing code on engines, you can transfer Python objects to and from your IPython session and the engines. In IPython, these operations are called push() (sending an object to the engines) and pull() (getting an object from the engines).

Basic push and pull

Here are some examples of how you use push() and pull():

In [38]: mec.push(dict(a=1.03234,b=3453))
Out[38]: [None, None, None, None]

In [39]: mec.pull('a')
Out[39]: [1.03234, 1.03234, 1.03234, 1.03234]

In [40]: mec.pull('b',targets=0)
Out[40]: [3453]

In [41]: mec.pull(('a','b'))
Out[41]: [[1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453]]

In [42]: mec.zip_pull(('a','b'))
Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]

In [43]: mec.push(dict(c='speed'))
Out[43]: [None, None, None, None]

In [44]: %px print c
Executing command on Controller
Out[44]:
<Results List>
[0] In [14]: print c
[0] Out[14]: speed

[1] In [13]: print c
[1] Out[13]: speed

[2] In [14]: print c
[2] Out[14]: speed

[3] In [13]: print c
[3] Out[13]: speed

In non-blocking mode push() and pull() also return PendingResult objects:

In [47]: mec.block=False

In [48]: pr = mec.pull('a')

In [49]: pr.r
Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
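The difference between the shapes returned by pull() and zip_pull() is easy to miss in the session above. The following standalone Python 3 sketch reproduces just the shapes, using a plain list of dictionaries as a stand-in for the four engine namespaces; the pull() and zip_pull() functions here are hypothetical re-creations, not the library's code:

```python
# Standalone Python 3 sketch; plain dicts stand in for engine namespaces.
namespaces = [{"a": 1.03234, "b": 3453} for _ in range(4)]


def pull(keys):
    """Return one entry per engine; several keys give a list per engine."""
    if isinstance(keys, str):
        return [ns[keys] for ns in namespaces]
    return [[ns[k] for k in keys] for ns in namespaces]


def zip_pull(keys):
    """Same data as pull(), regrouped so that there is one tuple per key."""
    return list(zip(*pull(keys)))


print(pull("a"))
print(pull(("a", "b")))
print(zip_pull(("a", "b")))
```

In other words, pull() groups the values by engine, while zip_pull() groups them by variable name.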

Push and pull for functions

Functions can also be pushed and pulled using push_function() and pull_function():

In [52]: mec.block=True

In [53]: def f(x):
   ....:     return 2.0*x**4
   ....:

In [54]: mec.push_function(dict(f=f))
Out[54]: [None, None, None, None]

In [55]: mec.execute('y = f(4.0)')
Out[55]:
<Results List>
[0] In [15]: y = f(4.0)
[1] In [14]: y = f(4.0)
[2] In [15]: y = f(4.0)
[3] In [14]: y = f(4.0)


In [56]: px print y
Executing command on Controller
Out[56]:
<Results List>
[0] In [16]: print y
[0] Out[16]: 512.0

[1] In [15]: print y
[1] Out[15]: 512.0

[2] In [16]: print y
[2] Out[16]: 512.0

[3] In [15]: print y
[3] Out[15]: 512.0

Dictionary interface

As a shorthand for push() and pull(), the MultiEngineClient class implements some of the Python dictionary interface. This makes the remote namespaces of the engines appear as a local dictionary. Underneath, this uses push() and pull():

In [50]: mec.block=True

In [51]: mec['a']=['foo','bar']

In [52]: mec['a']
Out[52]: [['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar']]
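A dictionary-style front end over push() and pull() needs only the two special methods __setitem__ and __getitem__. The standalone Python 3 sketch below shows the pattern with a hypothetical DictLikeClient class; the deep copy stands in for the serialization that would happen when an object really travels to an engine, which is an assumption of this illustration:

```python
# Standalone Python 3 sketch; DictLikeClient is a hypothetical illustration.
import copy


class DictLikeClient:
    """Makes a set of per-engine namespaces look like one dictionary."""

    def __init__(self, n_engines):
        self._namespaces = [{} for _ in range(n_engines)]

    def push(self, mapping):
        for ns in self._namespaces:
            # Each engine gets its own copy, as it would after transfer.
            ns.update(copy.deepcopy(mapping))
        return [None] * len(self._namespaces)

    def pull(self, key):
        return [ns[key] for ns in self._namespaces]

    def __setitem__(self, key, value):
        self.push({key: value})      # client[key] = value

    def __getitem__(self, key):
        return self.pull(key)        # client[key]


client = DictLikeClient(4)
client["a"] = ["foo", "bar"]
print(client["a"])
```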

Scatter and gather

Sometimes it is useful to partition a sequence and push the partitions to different engines. In MPI language, this is known as scatter/gather and we follow that terminology. However, it is important to remember that in IPython’s MultiEngineClient class, scatter() is from the interactive IPython session to the engines and gather() is from the engines back to the interactive IPython session. For scatter/gather operations between engines, MPI should be used:

In [58]: mec.scatter('a',range(16))
Out[58]: [None, None, None, None]

In [59]: px print a
Executing command on Controller
Out[59]:
<Results List>
[0] In [17]: print a
[0] Out[17]: [0, 1, 2, 3]

[1] In [16]: print a
[1] Out[16]: [4, 5, 6, 7]

[2] In [17]: print a
[2] Out[17]: [8, 9, 10, 11]

[3] In [16]: print a
[3] Out[16]: [12, 13, 14, 15]


In [60]: mec.gather('a')
Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
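The partitioning behind scatter and gather can be sketched in plain Python. This standalone Python 3 example defines hypothetical scatter() and gather() functions that operate on local lists only. For 16 elements and four partitions it reproduces the split shown above; how uneven lengths are handled (earlier partitions receive the extra elements) is an assumption and may differ from the library:

```python
# Standalone Python 3 sketch; these functions only work on local lists.
def scatter(seq, n_engines):
    """Split seq into n_engines contiguous partitions of near-equal size."""
    seq = list(seq)
    base, extra = divmod(len(seq), n_engines)
    parts, start = [], 0
    for i in range(n_engines):
        # Assumption: the first `extra` partitions get one more element.
        stop = start + base + (1 if i < extra else 0)
        parts.append(seq[start:stop])
        start = stop
    return parts


def gather(parts):
    """Concatenate the partitions back into one list, in engine order."""
    return [x for part in parts for x in part]


parts = scatter(range(16), 4)
print(parts)
print(gather(parts))

# Per-partition work followed by a gather gives the same result as doing
# the work serially on the whole sequence.
doubled = gather([[2 * x for x in part] for part in parts])
print(doubled == [2 * x for x in range(16)])
```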

Other things to look at

How to do parallel list comprehensions

In many cases list comprehensions are nicer than using the map function. While we don’t have fully parallel list comprehensions, it is simple to get the basic effect using scatter() and gather():

In [66]: mec.scatter('x',range(64))
Out[66]: [None, None, None, None]

In [67]: px y = [i**10 for i in x]
Executing command on Controller
Out[67]:
<Results List>
[0] In [19]: y = [i**10 for i in x]
[1] In [18]: y = [i**10 for i in x]
[2] In [19]: y = [i**10 for i in x]
[3] In [18]: y = [i**10 for i in x]


In [68]: y = mec.gather('y')

In [69]: print y
[0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]

Parallel exceptions

In the multiengine interface, parallel commands can raise Python exceptions, just like serial commands. There is a subtlety, however: a single parallel command can raise multiple exceptions (one for each engine the command was run on). To express this, the multiengine interface has a CompositeError exception class that is raised in most cases. CompositeError is a special type of exception that wraps one or more other exceptions. Here is how it works:

In [76]: mec.block=True

In [77]: mec.execute('1/0')
---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<ipython console> in <module>()

/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
    432         targets, block = self._findTargetsAndBlock(targets, block)
    433         result = blockingCallFromThread(self.smultiengine.execute, lines,
--> 434             targets=targets, block=block)
    435         if block:
    436             result = ResultList(result)

/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
     72             result.raiseException()
     73         except Exception, e:
---> 74             raise e
     75     return result
     76

CompositeError: one or more exceptions from call to method: execute
[0:execute]: ZeroDivisionError: integer division or modulo by zero
[1:execute]: ZeroDivisionError: integer division or modulo by zero
[2:execute]: ZeroDivisionError: integer division or modulo by zero
[3:execute]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when CompositeError is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions:

In [80]: try:
   ....:     mec.execute('1/0')
   ....: except client.CompositeError, e:
   ....:     e.raise_exception()
   ....:
   ....:
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<ipython console> in <module>()

/ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
    156             raise IndexError("an exception with index %i does not exist"%excid)
    157         else:
--> 158             raise et, ev, etb
    159
    160 def collect_exceptions(rlist, method):

ZeroDivisionError: integer division or modulo by zero
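The wrapping pattern itself is simple to sketch. The standalone Python 3 example below defines a hypothetical ToyCompositeError that collects one exception per engine and can re-raise any of them. It mirrors the behavior visible in the tracebacks above but is not the library's class, and the error message text under Python 3 differs from the Python 2 output shown:

```python
# Standalone Python 3 sketch; ToyCompositeError is a hypothetical analogue.
class ToyCompositeError(Exception):
    """Wraps the exceptions raised on several engines by one command."""

    def __init__(self, method, elist):
        super().__init__(
            "one or more exceptions from call to method: %s" % method)
        self.method = method
        self.elist = elist    # list of (engine_id, exception) pairs

    def __str__(self):
        lines = [self.args[0]]
        for engine_id, exc in self.elist:
            lines.append("[%i:%s]: %s: %s" % (
                engine_id, self.method, type(exc).__name__, exc))
        return "\n".join(lines)

    def raise_exception(self, excid=0):
        """Re-raise one of the wrapped exceptions (the first by default)."""
        try:
            _, exc = self.elist[excid]
        except IndexError:
            raise IndexError(
                "an exception with index %i does not exist" % excid)
        raise exc


def execute_everywhere(source, engine_ids):
    """Run source once per engine id and collect every failure."""
    errors = []
    for engine_id in engine_ids:
        try:
            exec(source, {})
        except Exception as exc:
            errors.append((engine_id, exc))
    if errors:
        raise ToyCompositeError("execute", errors)


try:
    execute_everywhere("1/0", range(4))
except ToyCompositeError as e:
    caught = e
    print(e)
```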

If you are working in IPython, you can simply type %debug after one of these CompositeError exceptions is raised, and inspect the exception instance:

In [81]: mec.execute('1/0')
---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<ipython console> in <module>()

/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
    432         targets, block = self._findTargetsAndBlock(targets, block)
    433         result = blockingCallFromThread(self.smultiengine.execute, lines,
--> 434             targets=targets, block=block)
    435         if block:
    436             result = ResultList(result)

/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
     72             result.raiseException()
     73         except Exception, e:
---> 74             raise e
     75     return result
     76

CompositeError: one or more exceptions from call to method: execute
[0:execute]: ZeroDivisionError: integer division or modulo by zero
[1:execute]: ZeroDivisionError: integer division or modulo by zero
[2:execute]: ZeroDivisionError: integer division or modulo by zero
[3:execute]: ZeroDivisionError: integer division or modulo by zero

In [82]: %debug
>

/ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
     73         except Exception, e:
---> 74             raise e
     75     return result

ipdb> e.
e.__class__         e.__getitem__       e.__new__           e.__setstate__      e.args
e.__delattr__       e.__getslice__      e.__reduce__        e.__str__           e.elist
e.__dict__          e.__hash__          e.__reduce_ex__     e.__weakref__       e.message
e.__doc__           e.__init__          e.__repr__          e._get_engine_str   e.print_tracebacks
e.__getattribute__  e.__module__        e.__setattr__       e._get_traceback    e.raise_exception
ipdb> e.print_tracebacks()
[0:execute]:
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<string> in <module>()

ZeroDivisionError: integer division or modulo by zero

[1:execute]:
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<string> in <module>()

ZeroDivisionError: integer division or modulo by zero

[2:execute]:
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<string> in <module>()

ZeroDivisionError: integer division or modulo by zero

[3:execute]:
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<string> in <module>()

ZeroDivisionError: integer division or modulo by zero

Note

The above example appears to be broken right now because of a change in how we are using Twisted.

All of this same error handling magic even works in non-blocking mode:

In [83]: mec.block=False

In [84]: pr = mec.execute('1/0')

In [85]: pr.r
---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)

/ipython1-client-r3021/docs/examples/<ipython console> in <module>()

/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
    170
    171     def _get_r(self):
--> 172         return self.get_result(block=True)
    173
    174     r = property(_get_r)

/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
    131                 return self.result
    132         try:
--> 133             result = self.client.get_pending_deferred(self.result_id, block)
    134         except error.ResultNotCompleted:
    135             return default

/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
    385
    386     def get_pending_deferred(self, deferredID, block):
--> 387         return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
    388
    389     def barrier(self, pendingResults):

/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
     72             result.raiseException()
     73         except Exception, e:
---> 74             raise e
     75     return result
     76

CompositeError: one or more exceptions from call to method: execute
[0:execute]: ZeroDivisionError: integer division or modulo by zero
[1:execute]: ZeroDivisionError: integer division or modulo by zero
[2:execute]: ZeroDivisionError: integer division or modulo by zero
[3:execute]: ZeroDivisionError: integer division or modulo by zero
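The key point in non-blocking mode is that submitting the command does not raise; the exception surfaces only when the result is requested. The same deferred behavior can be observed with the standard library's Future objects, shown here as an analogy in a standalone Python 3 sketch that does not involve IPython.kernel:

```python
# Standalone Python 3 sketch using concurrent.futures as an analogy.
from concurrent.futures import ThreadPoolExecutor


def divide_by_zero():
    return 1 / 0


with ThreadPoolExecutor(max_workers=1) as pool:
    # No exception is raised here, even though the call will fail.
    pending = pool.submit(divide_by_zero)

# exception() waits for completion and returns the stored exception.
stored = pending.exception()

try:
    pending.result()    # the stored exception is raised on retrieval
    outcome = "no error"
except ZeroDivisionError as exc:
    outcome = "raised on retrieval: %s" % type(exc).__name__

print(outcome)
```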