Hi,
I’m trying to use Dask within a Nameko service, but I’m running into some issues.
Here is a sample code I use:
from nameko.rpc import rpc
from dask.distributed import Client
import dask.array as da
# creates a local Dask Cluster
client = Client()
class TestService(object):
@rpc
def task(self) -> str:
dim = 100000
chunk = 10000
x = da.random.random((dim, dim), chunks=(chunk, chunk))
y = da.exp(x).sum()
y.compute()
Unfortunately, the following exception is raised at startup:
Traceback (most recent call last):
....
main(args)
File "/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/cli/run.py", line 179, in main
import_service(path)
File "/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/cli/run.py", line 44, in import_service
__import__(module_name)
File "/home/user/test_service/start.py", line 5, in <module>
from test_service import TestService
File "/home/user/test_service/test_service.py", line 16, in <module>
from dask.distributed import Client
File "/home/user/test_service/.venv/lib/python3.9/site-packages/dask/distributed.py", line 11, in <module>
from distributed import *
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/__init__.py", line 7, in <module>
from .actor import Actor, ActorFuture
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/actor.py", line 5, in <module>
from .client import Future
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/client.py", line 59, in <module>
from .batched import BatchedSend
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/batched.py", line 10, in <module>
from .core import CommClosedError
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/core.py", line 28, in <module>
from .comm import (
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/__init__.py", line 25, in <module>
_register_transports()
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/__init__.py", line 17, in _register_transports
from . import inproc, tcp, ws
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/tcp.py", line 387, in <module>
class BaseTCPConnector(Connector, RequireEncryptionMixin):
File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/tcp.py", line 389, in BaseTCPConnector
_resolver = netutil.ExecutorResolver(close_executor=False, executor=_executor)
File "/home/user/test_service/.venv/lib/python3.9/site-packages/tornado/util.py", line 288, in __new__
instance.initialize(*args, **init_kwargs)
File "/home/user/test_service/.venv/lib/python3.9/site-packages/tornado/netutil.py", line 427, in initialize
self.io_loop = IOLoop.current()
File "/home/user/test_service/.venv/lib/python3.9/site-packages/tornado/ioloop.py", line 263, in current
loop = asyncio.get_event_loop()
File "/home/user/.pyenv/versions/3.9.9/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.
I worked this around by setting an asyncio
event loop before dask.distributed
is imported:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
from nameko.rpc import rpc
from dask.distributed import Client
import dask.array as da
client = Client()
class TestService(object):
...
Now, when the RPC is called and the task is executed, I’m getting a different error:
ERROR: | 2022-04-04 09:58:56.758 | nameko.containers::/...te-packages/nameko/containers.py:399
error handling worker <WorkerContext [test_service.task] at 0x7fb47a9bb8b0>: 'coroutine' object is not iterable
Traceback (most recent call last):
File "/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/containers.py", line 392, in _run_worker
result = method(*worker_ctx.args, **worker_ctx.kwargs)
File "/home/user/test_service/test_service.py", line 48, in pretend_task
y.compute()
File "/home/user/test_service/.venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/home/user/test_service/.venv/lib/python3.9/site-packages/dask/base.py", line 572, in compute
return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
TypeError: 'coroutine' object is not iterable
/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/containers.py:417: RuntimeWarning: coroutine 'Client._gather' was never awaited
del exc_info
I tried to place the Client()
call within the RPC (among other attempts) but I clearly need to understand why I’m getting this error when interacting with Dask. Any suggestion? Thanks!