Module: parallel.client.client
A semi-synchronous Client for the ZMQ cluster
Authors:
- MinRK
Classes
class IPython.parallel.client.client.ExecuteReply(msg_id, content, metadata)
    Bases: IPython.utils.capture.RichOutput

    Wrapper for finished execute results.

    __init__(msg_id, content, metadata)
class IPython.parallel.client.client.Metadata(*args, **kwargs)
    Bases: dict

    Subclass of dict for initializing metadata values. Attribute access
    works on keys. These objects have a strict set of keys: errors will
    be raised if you try to add new keys.

    __init__(*args, **kwargs)
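The behavior described above (a dict with attribute access and a fixed key set) can be sketched in plain Python. This is an illustrative analogue, not the actual Metadata implementation; the key set shown is hypothetical, as the real class defines its own.

```python
# Illustrative analogue of the Metadata behavior: a dict subclass with
# attribute access on keys and a strict set of allowed keys.
# The allowed-key set here is hypothetical.

class StrictMetadata(dict):
    _allowed = {"msg_id", "engine_id", "status"}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        for key in self:
            if key not in self._allowed:
                raise KeyError("unknown metadata key: %r" % key)

    def __getattr__(self, name):
        # attribute access works on keys
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setitem__(self, key, value):
        if key not in self._allowed:  # strict key set
            raise KeyError("unknown metadata key: %r" % key)
        super().__setitem__(key, value)

md = StrictMetadata(msg_id="abc", status="ok")
print(md.msg_id)  # prints abc
```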
class IPython.parallel.client.client.Client(url_file=None, profile=None, profile_dir=None, ipython_dir=None, context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, cluster_id=None, **extra_args)
    Bases: IPython.utils.traitlets.HasTraits

    A semi-synchronous client to the IPython ZMQ cluster.
    Parameters:

    url_file : str/unicode; path to ipcontroller-client.json
        This JSON file contains the connection information for the Hub's
        registration, and is likely the only argument needed: if a JSON
        connector file is given, no further configuration is usually
        necessary. [Default: use profile]
    profile : bytes
        The name of the cluster profile used to find connector
        information. If run from an IPython application, the default
        profile will be the same as the running application; otherwise
        it will be 'default'.
    cluster_id : str
        String id added to runtime files, to prevent name collisions
        when using multiple clusters with a single profile
        simultaneously. When set, files named like
        'ipcontroller-<cluster_id>-client.json' will be used. Since this
        text is inserted into filenames, the usual recommendations
        apply: simple character strings are ideal, and spaces are not
        recommended (but should generally work).
    context : zmq.Context
        Pass an existing zmq.Context instance; otherwise the client
        will create its own.
    debug : bool
        Flag to enable verbose message printing for debugging purposes.
    timeout : int/float
        Time (in seconds) to wait for connection replies from the Hub.
        [Default: 10]
    Session-related arguments:

    config : Config object
        If specified, this will be relayed to the Session for
        configuration.
    username : str
        Set the username for the session object.
    SSH-related arguments (for configuring the ssh tunnel used to
    forward connections to the Controller):

    Note that the ip given in `addr` needs to be relative to sshserver.
    The most basic case is to leave addr pointing to localhost
    (127.0.0.1) and set sshserver to the machine the Controller is on.
    The only real requirement, however, is that sshserver is able to
    see the Controller (i.e. is within the same trusted network).
    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or
        'user@server.tld:port'. If sshkey or password is specified and
        this is not, it will default to the ip given in addr.
    sshkey : str; path to ssh private key file
        Specifies a key to be used for ssh login; default None. Without
        this argument, the regular default ssh keys will be used.
    password : str
        Your ssh password to sshserver. If this is left as None, you
        will be prompted for it only when passwordless key-based login
        is unavailable.
    paramiko : bool
        Flag for whether to use paramiko instead of shell ssh for
        tunneling. [Default: True on win32, False otherwise]
    Attributes

    ids
        Always up-to-date ids property.
    history : list of msg_ids
        A list of msg_ids, keeping track of all the execution messages
        you have submitted, in order.
    outstanding : set of msg_ids
        A set of msg_ids that have been submitted, but whose results
        have not yet been received.
    results : dict
        A dict of all our results, keyed by msg_id.
    block : bool
        Determines the default behavior when block is not specified in
        execution methods.

    Methods

    spin()
        Flush any registration notifications and execution results
        waiting in the ZMQ queue.
    wait([jobs, timeout])
        Wait on one or more jobs, for up to timeout seconds.

    execution methods: apply (legacy: execute, run)
    data movement: push, pull, scatter, gather
    query methods: queue_status, get_result, purge, result_status
    control methods: abort, shutdown
    __init__(url_file=None, profile=None, profile_dir=None, ipython_dir=None, context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, cluster_id=None, **extra_args)
    abort(jobs=None, targets=None, block=None)
        Abort specific jobs from the execution queues of target(s).

        This is a mechanism to prevent jobs that have already been
        submitted from executing.

        Parameters:

        jobs : msg_id, list of msg_ids, or AsyncResult
            The jobs to be aborted. If unspecified/None, abort all
            outstanding jobs.
    activate(targets='all', suffix='')
        Create a DirectView and register it with IPython magics.

        Defines the magics %px, %autopx, %pxresult, %%px.

        Parameters:

        targets : int, list of ints, or 'all'
            The engines on which the view's magics will run.
        suffix : str [default: '']
            The suffix, if any, for the magics. This allows you to have
            multiple views associated with parallel magics at the same
            time. For example:

                rc.activate(targets=0, suffix='0')

            will give you the magics %px0, %pxresult0, etc., for
            running magics on engine 0 only.
    clear(targets=None, block=None)
        Clear the namespace in target(s).
    close(linger=None)
        Close my zmq Sockets.

        If linger is specified, set the zmq LINGER socket option, which
        allows discarding of pending messages.
    db_query(query, keys=None)
        Query the Hub's TaskRecord database.

        This will return a list of task record dicts that match query.

        Parameters:

        query : mongodb query dict
            The search dict. See mongodb query docs for details.
        keys : list of strs [optional]
            The subset of keys to be returned. The default is to fetch
            everything but buffers. 'msg_id' will always be included.
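The query dict follows MongoDB's matching conventions: literal values match by equality, and operator sub-dicts such as {'$lt': x} express comparisons. The sketch below illustrates these matching semantics in plain Python; it is not the Hub's actual implementation, and the record fields shown are hypothetical.

```python
# Illustrative sketch of MongoDB-style query matching, as accepted by
# db_query. NOT the Hub's implementation; only the matching semantics.

OPERATORS = {
    "$lt": lambda a, b: a < b,
    "$gt": lambda a, b: a > b,
    "$eq": lambda a, b: a == b,
    "$ne": lambda a, b: a != b,
    "$in": lambda a, b: a in b,
}

def matches(record, query):
    """Return True if a task-record dict satisfies the query dict."""
    for key, condition in query.items():
        if isinstance(condition, dict):
            # operator form, e.g. {'completed': {'$ne': None}}
            if not all(OPERATORS[op](record.get(key), val)
                       for op, val in condition.items()):
                return False
        elif record.get(key) != condition:  # literal equality
            return False
    return True

records = [
    {"msg_id": "a", "engine_id": 0, "completed": True},
    {"msg_id": "b", "engine_id": 1, "completed": None},
]
hits = [r for r in records if matches(r, {"completed": {"$ne": None}})]
# hits keeps only the record whose 'completed' field is not None
```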
    direct_view(targets='all')
        Construct a DirectView object.

        If no targets are specified, create a DirectView using all
        engines.

        rc.direct_view('all') is distinguished from rc[:] in that 'all'
        will evaluate the target engines at each execution, whereas
        rc[:] will connect to all current engines, and that list will
        not change. That is, 'all' will always use all engines, whereas
        rc[:] will not use engines added after the DirectView is
        constructed.

        Parameters:

        targets : list, slice, int, etc. [default: use all engines]
            The engines to use for the View.
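The lazy-versus-eager distinction between 'all' and rc[:] can be sketched with a toy class. EngineView below is a hypothetical stand-in, not the real DirectView:

```python
# Sketch of lazy ('all') vs eager (rc[:]) target resolution.
# EngineView is a hypothetical stand-in for illustration only.

class EngineView:
    def __init__(self, registry, targets):
        self.registry = registry   # live list of engine ids
        self.targets = targets     # 'all' (lazy) or a fixed list

    def resolve(self):
        """Engines this view would use for the next execution."""
        if self.targets == "all":  # re-evaluated every time
            return list(self.registry)
        return list(self.targets)  # frozen at construction

engines = [0, 1]
lazy = EngineView(engines, "all")           # like rc.direct_view('all')
eager = EngineView(engines, list(engines))  # like rc[:]

engines.append(2)  # a new engine registers later
# lazy.resolve() now includes engine 2; eager.resolve() does not
```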
    get_result(indices_or_msg_ids=None, block=None)
        Retrieve a result by msg_id or history index, wrapped in an
        AsyncResult object.

        If the client already has the results, no request to the Hub
        will be made.

        This is a convenient way to construct AsyncResult objects,
        which are wrappers that include metadata about execution, and
        allow for awaiting results that were not submitted by this
        Client.

        It can also be a convenient way to retrieve the metadata
        associated with blocking execution, since it always retrieves
        the metadata as well.

        Parameters:

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of the results to be retrieved.
        block : bool
            Whether to wait for the result to be done.

        Returns:

        AsyncResult
            A single AsyncResult object will always be returned.
        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the
            Hub.

        Examples

            In [10]: ar = client.get_result(-1)  # most recent result, by history index
    hub_history()
        Get the Hub's history.

        Just like the Client, the Hub has a history, which is a list
        of msg_ids. This will contain the history of all clients, and,
        depending on configuration, may contain history across multiple
        cluster sessions.

        Any msg_id returned here is a valid argument to get_result.

        Returns:

        msg_ids : list of strs
            List of all msg_ids, ordered by task submission time.
    ids
        Always up-to-date ids property.
    load_balanced_view(targets=None)
        Construct a LoadBalancedView object.

        If no arguments are specified, create a LoadBalancedView using
        all engines.

        Parameters:

        targets : list, slice, int, etc. [default: use all engines]
            The subset of engines across which to load-balance.
    purge_everything()
        Clears all content from previous tasks from both the Hub and
        the local client.

        In addition to calling purge_results("all"), this also deletes
        the history and other bookkeeping lists.
    purge_hub_results(jobs=[], targets=[])
        Tell the Hub to forget results.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use purge_results('all') to scrub everything from the Hub's db.

        Parameters:

        jobs : str or list of str or AsyncResult objects
            The msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs [default: None]
            The targets, by int_id, whose entire history is to be
            purged.
    purge_local_results(jobs=[], targets=[])
        Clears the client caches of results and their metadata.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use purge_local_results('all') to scrub everything from the
        Client's results and metadata caches.

        After this call all AsyncResults are invalid and should be
        discarded.

        If you must "re-get" the results, you can still do so by using
        client.get_result(msg_id) or client.get_result(asyncresult).
        This will redownload the results from the Hub if they are still
        available (i.e. client.purge_hub_results(...) has not been
        called).

        Parameters:

        jobs : str or list of str or AsyncResult objects
            The msg_ids whose results should be purged.
        targets : int/list of ints
            The engines, by integer ID, whose entire result histories
            are to be purged.

        Raises:

        RuntimeError : if any of the tasks to be purged are still
            outstanding.
    purge_results(jobs=[], targets=[])
        Clears the cached results from both the Hub and the local
        client.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use purge_results('all') to scrub every cached result from both
        the Hub's and the Client's db.

        Equivalent to calling both purge_hub_results() and
        purge_local_results() with the same arguments.

        Parameters:

        jobs : str or list of str or AsyncResult objects
            The msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs [default: None]
            The targets, by int_id, whose entire history is to be
            purged.
    queue_status(targets='all', verbose=False)
        Fetch the status of engine queues.

        Parameters:

        targets : int/str/list of ints/strs [default: 'all']
            The engines whose states are to be queried.
        verbose : bool
            Whether to return lengths only, or lists of ids for each
            element.
    resubmit(indices_or_msg_ids=None, metadata=None, block=None)
        Resubmit one or more tasks.

        In-flight tasks may not be resubmitted.

        Parameters:

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of the tasks to be resubmitted.
        block : bool
            Whether to wait for the result to be done.

        Returns:

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the
            Hub.
    result_status(msg_ids, status_only=True)
        Check on the status of the result(s) of the apply request with
        msg_ids.

        If status_only is False, the actual results will be retrieved;
        otherwise only the status of the results will be checked.

        Parameters:

        msg_ids : list of msg_ids
            If an element is an int, it is passed as an index into
            self.history for convenience.
        status_only : bool [default: True]
            If False, retrieve the actual results of completed tasks.

        Returns:

        results : dict
            There will always be the keys 'pending' and 'completed',
            which are lists of msg_ids that are incomplete or complete.
            If status_only is False, completed results will also be
            present, keyed by their msg_id.
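The shape of the returned dict can be sketched in plain Python. This is a hypothetical stand-in for the real Hub round-trip; local_results stands in for the client's result cache:

```python
# Sketch of the result_status return shape: partition msg_ids into
# 'pending' and 'completed', optionally attaching completed results.
# `local_results` is a hypothetical stand-in for the client's cache.

def result_status_sketch(msg_ids, local_results, status_only=True):
    status = {"pending": [], "completed": []}
    for msg_id in msg_ids:
        if msg_id in local_results:
            status["completed"].append(msg_id)
            if not status_only:
                # completed results are keyed by their msg_id
                status[msg_id] = local_results[msg_id]
        else:
            status["pending"].append(msg_id)
    return status

cache = {"a": 42}
s = result_status_sketch(["a", "b"], cache, status_only=False)
# s == {'pending': ['b'], 'completed': ['a'], 'a': 42}
```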
    send_apply_request(socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None)
        Construct and send an apply message via a socket.

        This is the principal method with which all engine execution is
        performed by views.
    send_execute_request(socket, code, silent=True, metadata=None, ident=None)
        Construct and send an execute request via a socket.
    shutdown(targets='all', restart=False, hub=False, block=None)
        Terminates one or more engine processes, optionally including
        the hub.

        Parameters:

        targets : list of ints or 'all' [default: 'all']
            Which engines to shut down.
        hub : bool [default: False]
            Whether to include the Hub. hub=True implies targets='all'.
        block : bool [default: self.block]
            Whether to wait for clean shutdown replies or not.
        restart : bool [default: False]
            NOT IMPLEMENTED: whether to restart engines after shutting
            them down.
    spin()
        Flush any registration notifications and execution results
        waiting in the ZMQ queue.
    spin_thread(interval=1)
        Call Client.spin() in a background thread on some regular
        interval.

        This helps ensure that messages don't pile up too much in the
        zmq queue while you are working on other things, or just
        leaving an idle terminal.

        It also helps limit potential padding of the received timestamp
        on AsyncResult objects, used for timings.

        Parameters:

        interval : float, optional
            The interval on which to spin the client in the background
            thread (simply passed to time.sleep).

        Notes

        For precision timing, you may want to use this method to put a
        bound on the jitter (in seconds) in received timestamps used in
        AsyncResult.wall_time.
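The background-spin pattern can be sketched with the standard threading module: call a spin function on a regular interval from a daemon thread until asked to stop. This mirrors the idea of spin_thread/stop_spin_thread, not the actual Client implementation:

```python
# Sketch of the background-spin pattern: a daemon thread calls a spin
# function every `interval` seconds until stop() is called.
import threading
import time

class Spinner:
    def __init__(self, spin, interval=1.0):
        self._spin = spin
        self._interval = interval
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        def loop():
            while not self._stop.is_set():
                self._spin()
                time.sleep(self._interval)
        self._thread = threading.Thread(target=loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

calls = []
spinner = Spinner(lambda: calls.append(time.time()), interval=0.01)
spinner.start()
time.sleep(0.05)
spinner.stop()
# calls now holds the timestamps of several background spins
```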
    stop_spin_thread()
        Stop the background spin_thread, if any.
    wait(jobs=None, timeout=-1)
        Waits on one or more jobs, for up to timeout seconds.

        Parameters:

        jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            Ints are indices into self.history; strs are msg_ids.
            [default: wait on all outstanding messages]
        timeout : float
            A time in seconds, after which to give up. The default is
            -1, which means no timeout.

        Returns:

        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
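The wait semantics above can be sketched as a polling loop: flush incoming results, check whether everything is done, and give up once the timeout expires. outstanding and spin are hypothetical stand-ins for the Client's state, not its real internals:

```python
# Sketch of wait() semantics: poll until all jobs are done or the
# timeout expires. `outstanding`/`spin` are hypothetical stand-ins.
import time

def wait_sketch(outstanding, spin, timeout=-1, poll=0.001):
    """Return True when all jobs are done, False on timeout."""
    deadline = None if timeout < 0 else time.time() + timeout
    while outstanding:
        spin()  # flush incoming results; may shrink `outstanding`
        if not outstanding:
            return True
        if deadline is not None and time.time() >= deadline:
            return False
        time.sleep(poll)
    return True

jobs = {"a", "b"}
def fake_spin():
    if jobs:
        jobs.pop()  # pretend one result arrives per spin
```

With the fake spin above, wait_sketch(jobs, fake_spin) returns True once both jobs drain, while waiting on a job that never completes with a small timeout returns False.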