from __future__ import print_function, division, absolute_import

from collections import defaultdict
import datetime
import logging
import uuid

from tornado import gen
import tornado.locks

try:
    from cytoolz import merge
except ImportError:
    from toolz import merge

from .client import Future, _get_global_client, Client
from .metrics import time
from .utils import tokey, log_errors
from .worker import get_client
# Module-level logger, named after this module (``__name__``).
logger = logging.getLogger(__name__)
class VariableExtension(object):
    """ An extension for the scheduler to manage variables

    This registers the following handlers on the scheduler

    *  variable_set (request handler)
    *  variable_get (request handler)
    *  variable_delete (stream handler)
    *  variable-future-release (stream handler)
    """

    def __init__(self, scheduler):
        """ Attach the extension to ``scheduler`` and register its handlers """
        self.scheduler = scheduler
        # name -> record dict ``{'type': 'Future' | 'msgpack', 'value': ...}``
        self.variables = dict()
        # (key, name) -> tokens handed out by ``get`` that have not yet been
        # acknowledged through ``future_release``
        self.waiting = defaultdict(set)
        # name -> condition notified when a (key, name) token set empties
        self.waiting_conditions = defaultdict(tornado.locks.Condition)
        # notified whenever a previously unknown variable name is first set
        self.started = tornado.locks.Condition()

        self.scheduler.handlers.update({'variable_set': self.set,
                                        'variable_get': self.get})

        self.scheduler.stream_handlers['variable-future-release'] = \
            self.future_release
        self.scheduler.stream_handlers['variable_delete'] = self.delete

        self.scheduler.extensions['variables'] = self

    def set(self, stream=None, name=None, key=None, data=None, client=None):
        """ Store a new value under ``name``

        If ``key`` is given the value is recorded as a Future and the
        pseudo-client ``'variable-<name>'`` registers its desire for that key
        with the scheduler; otherwise ``data`` is stored directly.  A
        previously stored Future with a different key is released.
        """
        if key is not None:
            record = {'type': 'Future', 'value': key}
            self.scheduler.client_desires_keys(keys=[key],
                                               client='variable-%s' % name)
        else:
            record = {'type': 'msgpack', 'value': data}

        try:
            old = self.variables[name]
        except KeyError:
            pass
        else:
            if old['type'] == 'Future' and old['value'] != key:
                # Not yielded: ``set`` is a plain function, so the release
                # coroutine is started and left to finish on its own.
                self.release(old['value'], name)

        if name not in self.variables:
            # First value for this name: wake up any blocked ``get`` calls
            self.started.notify_all()
        self.variables[name] = record

    @gen.coroutine
    def release(self, key, name):
        """ Release ``key`` on behalf of variable ``name``

        Waits until no tokens remain outstanding for ``(key, name)`` before
        telling the scheduler that ``'variable-<name>'`` releases the key.
        """
        while self.waiting[key, name]:
            yield self.waiting_conditions[name].wait()

        self.scheduler.client_releases_keys(keys=[key],
                                            client='variable-%s' % name)
        # The lookup in the ``while`` test above guarantees the entry exists
        del self.waiting[key, name]

    def future_release(self, name=None, key=None, token=None, client=None):
        """ Acknowledge ``token`` for ``(key, name)``

        Removes the token from the outstanding set and, once that set is
        empty, notifies everything waiting on the condition for ``name``.
        """
        self.waiting[key, name].remove(token)
        if not self.waiting[key, name]:
            self.waiting_conditions[name].notify_all()

    @gen.coroutine
    def get(self, stream=None, name=None, client=None, timeout=None):
        """ Return the record stored under ``name``, waiting until it exists

        Parameters
        ----------
        timeout: number, optional
            Maximum time to wait for the variable to be set, measured with
            ``time()`` from ``.metrics`` (presumably seconds).  ``None``
            waits indefinitely.

        Returns
        -------
        dict
            The stored record.  For Futures the record is extended with a
            fresh ``token``, the task ``state`` (``'lost'`` if the scheduler
            has no such task) and, for erred tasks, ``exception`` and
            ``traceback``.

        Raises
        ------
        tornado.gen.TimeoutError
            If the variable is still unset once ``timeout`` has elapsed.
        """
        start = time()
        while name not in self.variables:
            if timeout is None:
                wait_for = None
            else:
                left = timeout - (time() - start)
                if left <= 0:
                    raise gen.TimeoutError()
                # Condition.wait interprets a bare number as an absolute
                # deadline; a timedelta is relative to the current time.
                wait_for = datetime.timedelta(seconds=left)
            yield self.started.wait(timeout=wait_for)

        record = self.variables[name]
        if record['type'] == 'Future':
            key = record['value']
            token = uuid.uuid4().hex
            ts = self.scheduler.tasks.get(key)
            state = ts.state if ts is not None else 'lost'
            msg = {'token': token, 'state': state}
            if state == 'erred':
                msg['exception'] = ts.exception_blame.exception
                msg['traceback'] = ts.exception_blame.traceback
            record = merge(record, msg)
            # Outstanding until the caller sends 'variable-future-release'
            self.waiting[key, name].add(token)
        raise gen.Return(record)

    @gen.coroutine
    def delete(self, stream=None, name=None, client=None):
        """ Forget variable ``name``, releasing its Future if it holds one

        Deleting a name that is unknown, or that never had a waiting
        condition created for it, is a no-op for the missing entries.
        """
        with log_errors():
            try:
                old = self.variables[name]
            except KeyError:
                pass
            else:
                if old['type'] == 'Future':
                    yield self.release(old['value'], name)
            # ``waiting_conditions`` entries are only created on demand, so
            # neither mapping is guaranteed to contain ``name`` here.
            self.waiting_conditions.pop(name, None)
            self.variables.pop(name, None)
class Variable(object):
    """ Distributed Global Variable

    This allows multiple clients to share futures and data between each other
    with a single mutable variable.  All metadata is sequentialized through
    the scheduler.  Race conditions can occur.

    Values must be either Futures or msgpack-encodable data (ints, lists,
    strings, etc..)  All data will be kept and sent through the scheduler, so
    it is wise not to send too much.  If you want to share a large amount of
    data then ``scatter`` it and share the future instead.

    .. warning::

       This object is experimental and has known issues in Python 2

    Examples
    --------
    >>> from dask.distributed import Client, Variable # doctest: +SKIP
    >>> client = Client()  # doctest: +SKIP
    >>> x = Variable('x')  # doctest: +SKIP
    >>> x.set(123)  # doctest: +SKIP
    >>> x.get()  # doctest: +SKIP
    123
    >>> future = client.submit(f, x)  # doctest: +SKIP
    >>> x.set(future)  # doctest: +SKIP

    See Also
    --------
    Queue: shared multi-producer/multi-consumer queue between clients
    """

    def __init__(self, name=None, client=None, maxsize=0):
        # NOTE: ``maxsize`` is accepted but not used anywhere in this class.
        self.client = client or _get_global_client()
        # Fall back to a random unique name when none is supplied
        self.name = name or 'variable-' + uuid.uuid4().hex

    @gen.coroutine
    def _set(self, value):
        """ Coroutine behind ``set``: send the value to the scheduler

        Futures are sent by key (``tokey(value.key)``); anything else is sent
        as data.
        """
        if isinstance(value, Future):
            yield self.client.scheduler.variable_set(key=tokey(value.key),
                                                     name=self.name)
        else:
            yield self.client.scheduler.variable_set(data=value,
                                                     name=self.name)

    def set(self, value, **kwargs):
        """ Set the value of this variable

        Parameters
        ----------
        value: Future or object
            Must be either a Future or a msgpack-encodable value
        **kwargs:
            Forwarded to ``self.client.sync``
        """
        return self.client.sync(self._set, value, **kwargs)

    @gen.coroutine
    def _get(self, timeout=None):
        """ Coroutine behind ``get``: fetch the value from the scheduler

        When the scheduler reports a Future, a local ``Future`` is built for
        it (with its error state filled in if the task erred) and the
        scheduler is then told that the token it handed out can be released.
        """
        d = yield self.client.scheduler.variable_get(timeout=timeout,
                                                     name=self.name,
                                                     client=self.client.id)
        if d['type'] == 'Future':
            value = Future(d['value'], self.client, inform=True,
                           state=d['state'])
            if d['state'] == 'erred':
                value._state.set_error(d['exception'], d['traceback'])
            # Sent only after the local Future has been created
            self.client._send_to_scheduler({'op': 'variable-future-release',
                                            'name': self.name,
                                            'key': d['value'],
                                            'token': d['token']})
        else:
            value = d['value']
        raise gen.Return(value)

    def get(self, timeout=None, **kwargs):
        """ Get the value of this variable

        Parameters
        ----------
        timeout: number, optional
            Passed through to the scheduler's ``variable_get`` handler
        **kwargs:
            Forwarded to ``self.client.sync``
        """
        return self.client.sync(self._get, timeout=timeout, **kwargs)

    def delete(self):
        """ Delete this variable

        Caution, this affects all clients currently pointing to this variable.

        Nothing is sent unless the client's status is ``'running'``.
        """
        if self.client.status == 'running':  # TODO: can leave zombie futures
            self.client._send_to_scheduler({'op': 'variable_delete',
                                            'name': self.name})

    def __getstate__(self):
        """ Pickle as ``(name, scheduler address)`` """
        return (self.name, self.client.scheduler.address)

    def __setstate__(self, state):
        """ Rebuild from ``(name, scheduler address)``

        The client returned by ``get_client(address)`` is reused when its
        scheduler address matches; otherwise a new, non-default ``Client`` is
        created for that address.
        """
        name, address = state
        try:
            client = get_client(address)
            # Explicit comparison rather than ``assert`` so that the check
            # still happens when Python runs with -O.
            usable = client.scheduler.address == address
        except (AttributeError, AssertionError):
            usable = False
        if not usable:
            client = Client(address, set_as_default=False)
        self.__init__(name=name, client=client)