Warning

This documentation is for an old version of IPython.

Module: parallel.client.client

A semi-synchronous Client for the ZMQ cluster

Authors:

  • MinRK

3 Classes

class IPython.parallel.client.client.ExecuteReply(msg_id, content, metadata)

Bases: IPython.utils.capture.RichOutput

wrapper for finished Execute results

__init__(msg_id, content, metadata)
class IPython.parallel.client.client.Metadata(*args, **kwargs)

Bases: dict

Subclass of dict for initializing metadata values.

Attribute access works on keys.

These objects have a strict set of keys - errors will raise if you try to add new keys.

__init__(*args, **kwargs)
class IPython.parallel.client.client.Client(url_file=None, profile=None, profile_dir=None, ipython_dir=None, context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, cluster_id=None, **extra_args)

Bases: IPython.utils.traitlets.HasTraits

A semi-synchronous client to the IPython ZMQ cluster

Parameters:

url_file : str/unicode; path to ipcontroller-client.json

This JSON file should contain all the information needed to connect to a cluster, and is likely the only argument needed. It provides the connection information for the Hub’s registration. If a JSON connector file is given, then likely no further configuration is necessary. [Default: use profile]

profile : bytes

The name of the Cluster profile to be used to find connector information. If run from an IPython application, the default profile will be the same as the running application, otherwise it will be ‘default’.

cluster_id : str

String id added to runtime files, to prevent name collisions when using multiple clusters with a single profile simultaneously. When set, the client will look for files named like: ‘ipcontroller-<cluster_id>-client.json’. Since this is text inserted into filenames, typical recommendations apply: simple character strings are ideal, and spaces are not recommended (but should generally work).

context : zmq.Context

Pass an existing zmq.Context instance, otherwise the client will create its own.

debug : bool

flag for lots of message printing for debug purposes

timeout : int/float

time (in seconds) to wait for connection replies from the Hub [Default: 10]

#-------------- session related args ----------------

config : Config object

If specified, this will be relayed to the Session for configuration

username : str

set username for the session object

#-------------- ssh related args ----------------

# These are args for configuring the ssh tunnel to be used

# credentials are used to forward connections over ssh to the Controller

# Note that the ip given in `addr` needs to be relative to sshserver

# The most basic case is to leave addr as pointing to localhost (127.0.0.1),

# and set sshserver as the same machine the Controller is on. However,

# the only requirement is that sshserver is able to see the Controller

# (i.e. is within the same trusted network).

sshserver : str

A string of the form passed to ssh, i.e. ‘server.tld’ or ‘user@server.tld:port’. If keyfile or password is specified, and this is not, it will default to the ip given in addr.

sshkey : str; path to ssh private key file

This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.

password : str

Your ssh password to sshserver. Note that if this is left None, you will be prompted for it if passwordless key based login is unavailable.

paramiko : bool

flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False else]
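The parameters above can be combined in a few common ways. The following is a minimal sketch (not the library's own example): it assumes a cluster has already been started, e.g. with `ipcluster start -n 4`, and the profile and server names are placeholders. The `connector_file_name` helper is purely illustrative, showing the file-naming rule described under `cluster_id`.

```python
def connector_file_name(cluster_id=None):
    # Illustrative helper (not part of the Client API): the connection-file
    # name the Client looks for, per the cluster_id rules described above.
    if cluster_id:
        return "ipcontroller-%s-client.json" % cluster_id
    return "ipcontroller-client.json"

if __name__ == "__main__":
    # Requires IPython.parallel and a running cluster.
    from IPython.parallel import Client

    rc = Client()                              # default profile's connector file
    rc = Client(profile="mycluster")           # named profile (placeholder name)
    rc = Client(sshserver="user@server.tld")   # tunnel to a remote Controller
    print(rc.ids)                              # engine ids currently registered
```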

Attributes

ids: Always up-to-date ids property.
history (list of msg_ids): a list of msg_ids, keeping track of all the execution messages you have submitted, in order.
outstanding (set of msg_ids): a set of msg_ids that have been submitted, but whose results have not yet been received.
results (dict): a dict of all our results, keyed by msg_id.
block (bool): determines default behavior when block is not specified in execution methods.

Methods

spin(): Flush any registration notifications and execution results waiting in the ZMQ queue.
wait([jobs, timeout]): waits on one or more jobs, for up to timeout seconds.
execution methods: apply; legacy: execute, run
data movement: push, pull, scatter, gather
query methods: queue_status, get_result, purge, result_status
control methods: abort, shutdown
__init__(url_file=None, profile=None, profile_dir=None, ipython_dir=None, context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, cluster_id=None, **extra_args)
abort(jobs=None, targets=None, block=None)

Abort specific jobs from the execution queues of target(s).

This is a mechanism to prevent jobs that have already been submitted from executing.

Parameters:

jobs : msg_id, list of msg_ids, or AsyncResult

The jobs to be aborted

If unspecified/None: abort all outstanding jobs.
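As a sketch of the argument forms `abort()` accepts, the helper below normalizes a jobs argument the way the description above implies (a single msg_id, a list, or AsyncResult objects contributing their `msg_ids`). The helper is illustrative only, not the library's internal logic; the `__main__` section assumes a running cluster.

```python
def as_msg_id_list(jobs):
    # Illustrative only: normalize the accepted argument forms to a flat
    # list of msg_id strings; None means "all outstanding jobs".
    if jobs is None:
        return None
    if isinstance(jobs, str):
        return [jobs]
    msg_ids = []
    for j in jobs:
        # AsyncResult-like objects carry a .msg_ids list; plain strings pass through.
        msg_ids.extend(getattr(j, "msg_ids", [j]))
    return msg_ids

if __name__ == "__main__":
    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(sum, range(10))  # some submitted work
    rc.abort(jobs=ar)                       # abort just this AsyncResult's tasks
    rc.abort()                              # or: abort everything still outstanding
```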

activate(targets='all', suffix='')

Create a DirectView and register it with IPython magics

Defines the magics %px, %autopx, %pxresult, %%px

Parameters:

targets: int, list of ints, or ‘all’

The engines on which the view’s magics will run

suffix: str [default: ‘’]

The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.

e.g. rc.activate(targets=0, suffix='0') will give you the magics %px0, %pxresult0, etc. for running magics just on engine 0.

clear(targets=None, block=None)

Clear the namespace in target(s).

close(linger=None)

Close my zmq Sockets

If linger, set the zmq LINGER socket option, which allows discarding of messages.

db_query(query, keys=None)

Query the Hub’s TaskRecord database

This will return a list of task record dicts that match query

Parameters:

query : mongodb query dict

The search dict. See mongodb query docs for details.

keys : list of strs [optional]

The subset of keys to be returned. The default is to fetch everything but buffers. ‘msg_id’ will always be included.

direct_view(targets='all')

construct a DirectView object.

If no targets are specified, create a DirectView using all engines.

rc.direct_view(‘all’) is distinguished from rc[:] in that ‘all’ will evaluate the target engines at each execution, whereas rc[:] will connect to all current engines, and that list will not change.

That is, ‘all’ will always use all engines, whereas rc[:] will not use engines added after the DirectView is constructed.

Parameters:

targets: list,slice,int,etc. [default: use all engines]

The engines to use for the View
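The lazy-'all' versus snapshot distinction above can be modeled without a cluster. This toy (not the library's implementation) shows why a view built with 'all' picks up engines that register later, while a concrete id list does not:

```python
def resolve_targets(spec, current_engine_ids):
    # 'all' means: whatever engines exist right now; an explicit list is
    # the snapshot captured when the view was built.
    if spec == "all":
        return list(current_engine_ids)
    return list(spec)

engines = [0, 1]
lazy = "all"              # like rc.direct_view('all')
snapshot = list(engines)  # like rc[:] at construction time

engines.append(2)         # a new engine registers later

print(resolve_targets(lazy, engines))      # [0, 1, 2]
print(resolve_targets(snapshot, engines))  # [0, 1]
```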

get_result(indices_or_msg_ids=None, block=None)

Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

If the client already has the results, no request to the Hub will be made.

This is a convenient way to construct AsyncResult objects, which are wrappers that include metadata about execution, and allow for awaiting results that were not submitted by this Client.

It can also be a convenient way to retrieve the metadata associated with blocking execution, since it always retrieves the full result record.

Parameters:

indices_or_msg_ids : integer history index, str msg_id, or list of either

The indices or msg_ids of the results to be retrieved

block : bool

Whether to wait for the result to be done

Returns:

AsyncResult

A single AsyncResult object will always be returned.

AsyncHubResult

A subclass of AsyncResult that retrieves results from the Hub

Examples

In [10]: ar = client[:].apply_async(lambda: 'hello')

In [11]: r = client.get_result(ar.msg_ids[0])

In [12]: r.get()
hub_history()

Get the Hub’s history

Just like the Client, the Hub has a history, which is a list of msg_ids. This will contain the history of all clients, and, depending on configuration, may contain history across multiple cluster sessions.

Any msg_id returned here is a valid argument to get_result.

Returns:

msg_ids : list of strs

list of all msg_ids, ordered by task submission time.

ids

Always up-to-date ids property.

load_balanced_view(targets=None)

construct a LoadBalancedView object.

If no arguments are specified, create a LoadBalancedView using all engines.

Parameters:

targets: list,slice,int,etc. [default: use all engines]

The subset of engines across which to load-balance
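A load-balanced view is typically used to map a function over a sequence, letting the scheduler assign each call to whichever engine is free. A minimal sketch, assuming a running cluster; `slow_square` is a stand-in task:

```python
def slow_square(x):
    # Toy task to distribute; pure, so it can run on any engine.
    return x * x

if __name__ == "__main__":
    from IPython.parallel import Client
    rc = Client()
    lview = rc.load_balanced_view()           # balance across all engines
    ar = lview.map_async(slow_square, range(8))
    print(ar.get())                           # [0, 1, 4, 9, 16, 25, 36, 49]
```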

purge_everything()

Clears all content from previous Tasks from both the hub and the local client

In addition to calling purge_results(“all”) it also deletes the history and other bookkeeping lists.

purge_hub_results(jobs=[], targets=[])

Tell the Hub to forget results.

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_results(‘all’) to scrub everything from the Hub’s db.

Parameters:

jobs : str or list of str or AsyncResult objects

the msg_ids whose results should be forgotten.

targets : int/str/list of ints/strs

The targets, by int_id, whose entire history is to be purged.

default : None

purge_local_results(jobs=[], targets=[])

Clears the client caches of results and their metadata.

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_local_results(‘all’) to scrub everything from the Client’s results and metadata caches.

After this call all AsyncResults are invalid and should be discarded.

If you must “reget” the results, you can still do so by using client.get_result(msg_id) or client.get_result(asyncresult). This will redownload the results from the hub if they are still available (i.e. client.purge_hub_results(...) has not been called).

Parameters:

jobs : str or list of str or AsyncResult objects

the msg_ids whose results should be purged.

targets : int/list of ints

The engines, by integer ID, whose entire result histories are to be purged.

Raises:

RuntimeError : if any of the tasks to be purged are still outstanding.

purge_results(jobs=[], targets=[])

Clears the cached results from both the hub and the local client

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_results(‘all’) to scrub every cached result from both the Hub’s and the Client’s db.

Equivalent to calling both purge_hub_results() and purge_local_results() with the same arguments.

Parameters:

jobs : str or list of str or AsyncResult objects

the msg_ids whose results should be forgotten.

targets : int/str/list of ints/strs

The targets, by int_id, whose entire history is to be purged.

default : None

queue_status(targets='all', verbose=False)

Fetch the status of engine queues.

Parameters:

targets : int/str/list of ints/strs

the engines whose states are to be queried. default : all

verbose : bool

Whether to return lengths only, or lists of ids for each element
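The helper below totals the work still pending from a queue_status()-style dict. The return shape assumed here (integer engine ids mapping to ‘queue’/‘completed’/‘tasks’ counts, plus an ‘unassigned’ count, with verbose=False) is an assumption for illustration, not stated above:

```python
def total_outstanding(status):
    # Sum queued and in-flight task counts from a queue_status()-style dict.
    # Assumed shape: {0: {'queue': n, 'completed': n, 'tasks': n}, ...,
    # 'unassigned': n} -- treat this as an assumption, not a guarantee.
    total = status.get("unassigned", 0)
    for key, val in status.items():
        if isinstance(key, int):
            total += val.get("queue", 0) + val.get("tasks", 0)
    return total

sample = {0: {"queue": 2, "completed": 5, "tasks": 1},
          1: {"queue": 0, "completed": 7, "tasks": 0},
          "unassigned": 3}
print(total_outstanding(sample))  # 6
```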

resubmit(indices_or_msg_ids=None, metadata=None, block=None)

Resubmit one or more tasks.

In-flight tasks may not be resubmitted.

Parameters:

indices_or_msg_ids : integer history index, str msg_id, or list of either

The indices or msg_ids of the tasks to be resubmitted

block : bool

Whether to wait for the result to be done

Returns:

AsyncHubResult

A subclass of AsyncResult that retrieves results from the Hub

result_status(msg_ids, status_only=True)

Check on the status of the result(s) of the apply request with msg_ids.

If status_only is False, then the actual results will be retrieved, else only the status of the results will be checked.

Parameters:

msg_ids : list of msg_ids

if int:

Passed as index to self.history for convenience.

status_only : bool (default: True)

if False:

Retrieve the actual results of completed tasks.

Returns:

results : dict

There will always be the keys ‘pending’ and ‘completed’, which will be lists of msg_ids that are incomplete or complete. If status_only is False, then completed results will be keyed by their msg_id.

send_apply_request(socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None)

construct and send an apply message via a socket.

This is the principal method with which all engine execution is performed by views.

send_execute_request(socket, code, silent=True, metadata=None, ident=None)

construct and send an execute request via a socket.

shutdown(targets='all', restart=False, hub=False, block=None)

Terminates one or more engine processes, optionally including the hub.

Parameters:

targets: list of ints or ‘all’ [default: all]

Which engines to shutdown.

hub: bool [default: False]

Whether to include the Hub. hub=True implies targets=’all’.

block: bool [default: self.block]

Whether to wait for clean shutdown replies or not.

restart: bool [default: False]

NOT IMPLEMENTED whether to restart engines after shutting them down.

spin()

Flush any registration notifications and execution results waiting in the ZMQ queue.

spin_thread(interval=1)

call Client.spin() in a background thread on some regular interval

This helps ensure that messages don’t pile up too much in the zmq queue while you are working on other things, or just leaving an idle terminal.

It also helps limit potential padding of the received timestamp on AsyncResult objects, used for timings.

Parameters:

interval : float, optional

The interval on which to spin the client in the background thread (simply passed to time.sleep).

Notes

For precision timing, you may want to use this method to put a bound on the jitter (in seconds) in received timestamps used in AsyncResult.wall_time.

stop_spin_thread()

stop background spin_thread, if any

wait(jobs=None, timeout=-1)

waits on one or more jobs, for up to timeout seconds.

Parameters:

jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects

ints are indices to self.history; strs are msg_ids. default: wait on all outstanding messages

timeout : float

a time in seconds, after which to give up. default is -1, which means no timeout

Returns:

True : when all msg_ids are done

False : timeout reached, some msg_ids still outstanding
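Because wait() returns False on timeout, a common pattern is to poll in short slices so other work can happen between waits. A sketch assuming a running cluster; the `remaining` deadline helper is illustrative:

```python
def remaining(deadline, now):
    # Seconds left before the deadline, floored at zero.
    return max(0.0, deadline - now)

if __name__ == "__main__":
    import time
    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(sum, range(100))
    deadline = time.time() + 30.0
    # Poll in one-second slices; wait() returns False while jobs are pending.
    while rc.outstanding and remaining(deadline, time.time()) > 0:
        rc.wait(timeout=1)
    print(len(rc.outstanding) == 0)
```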

1 Function

IPython.parallel.client.client.spin_first(f)

Call spin() to sync state prior to calling the method.