Error while calling Dask cluster in Nameko worker

Hi,

I’m trying to use Dask within a Nameko service, but I’m running into some issues.
Here is the sample code I use:


from nameko.rpc import rpc

from dask.distributed import Client
import dask.array as da

# creates a local Dask Cluster
client = Client()

class TestService(object):

    @rpc
    def task(self) -> str:

        dim = 100000
        chunk = 10000

        x = da.random.random((dim, dim), chunks=(chunk, chunk))
        y = da.exp(x).sum()

        y.compute()

Unfortunately, the following exception is raised at startup:


Traceback (most recent call last):
  
   ....

    main(args)
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/cli/run.py", line 179, in main
    import_service(path)
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/cli/run.py", line 44, in import_service
    __import__(module_name)
  File "/home/user/test_service/start.py", line 5, in <module>
    from test_service import TestService
  File "/home/user/test_service/test_service.py", line 16, in <module>
    from dask.distributed import Client
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/dask/distributed.py", line 11, in <module>
    from distributed import *
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/__init__.py", line 7, in <module>
    from .actor import Actor, ActorFuture
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/actor.py", line 5, in <module>
    from .client import Future
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/client.py", line 59, in <module>
    from .batched import BatchedSend
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/batched.py", line 10, in <module>
    from .core import CommClosedError
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/core.py", line 28, in <module>
    from .comm import (
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/__init__.py", line 25, in <module>
    _register_transports()
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/__init__.py", line 17, in _register_transports
    from . import inproc, tcp, ws
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/tcp.py", line 387, in <module>
    class BaseTCPConnector(Connector, RequireEncryptionMixin):
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/distributed/comm/tcp.py", line 389, in BaseTCPConnector
    _resolver = netutil.ExecutorResolver(close_executor=False, executor=_executor)
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/tornado/util.py", line 288, in __new__
    instance.initialize(*args, **init_kwargs)
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/tornado/netutil.py", line 427, in initialize
    self.io_loop = IOLoop.current()
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/tornado/ioloop.py", line 263, in current
    loop = asyncio.get_event_loop()
  File "/home/user/.pyenv/versions/3.9.9/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.

I worked around this by creating and setting an asyncio event loop before dask.distributed is imported:

import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

from nameko.rpc import rpc
from dask.distributed import Client
import dask.array as da
client = Client()

class TestService(object):
    ...

Now, when the RPC is called and the task is executed, I’m getting a different error:

ERROR:    | 2022-04-04 09:58:56.758 | nameko.containers::/...te-packages/nameko/containers.py:399 
error handling worker <WorkerContext [test_service.task] at 0x7fb47a9bb8b0>: 'coroutine' object is not iterable
Traceback (most recent call last):
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/containers.py", line 392, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File "/home/user/test_service/test_service.py", line 48, in pretend_task
    y.compute()
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/user/test_service/.venv/lib/python3.9/site-packages/dask/base.py", line 572, in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
TypeError: 'coroutine' object is not iterable
/home/user/test_service/.venv/lib/python3.9/site-packages/nameko/containers.py:417: RuntimeWarning: coroutine 'Client._gather' was never awaited
  del exc_info

I tried moving the Client() call inside the RPC method (among other attempts), but I clearly need to understand why I’m getting this error when interacting with Dask. Any suggestions? Thanks!

Probably because asyncio and eventlet work differently. Nameko is built on top of eventlet, which switches between coroutines implicitly. Asyncio requires explicit yield points (await).
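To make the "explicit yield" point concrete, here is a minimal sketch using only the standard library. It does not reproduce what Dask does internally; `gather_results` is a hypothetical stand-in for an async call such as the `Client._gather` named in the warning above. It only shows that a coroutine called without `await` is just an object, and that synchronous code treating it as a result fails with a TypeError.

```python
import asyncio


async def gather_results():
    # Hypothetical stand-in for an asynchronous call (not Dask's real code).
    await asyncio.sleep(0)  # explicit switch point: control returns to the loop here
    return [1, 2, 3]


def call_without_await():
    """Call the coroutine function like a plain function, as sync code would."""
    coro = gather_results()  # nothing has run yet; this is only a coroutine object
    try:
        iter(coro)  # treating the coroutine as if it were the result
    except TypeError as exc:
        return str(exc)
    finally:
        coro.close()  # close it so Python does not warn that it was never awaited


def call_with_event_loop():
    """Drive the coroutine to completion on an asyncio event loop."""
    return asyncio.run(gather_results())


if __name__ == "__main__":
    print(call_without_await())
    print(call_with_event_loop())
```

Under eventlet the switch happens inside patched blocking calls, so worker code never writes `await`. An asyncio coroutine that ends up in such code is never driven by an event loop unless something runs the loop explicitly.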


AFAIK there is no simple way to mix and match the asyncio and eventlet concurrency models in the same Python process. You will get event-loop-related errors, and it’s not worth the time to debug them. It is best to keep code that uses nameko/eventlet in a separate process or service, away from the async/await code.
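As a rough illustration of that separation, the sketch below runs the asyncio code in a child Python interpreter and passes JSON over stdin/stdout. Everything here is an assumption made for illustration: `CHILD_CODE`, `compute`, and `run_in_child` are hypothetical names, and the summation is a placeholder for the real Dask work. In practice a separate service reached over RPC or a queue would play the same role.

```python
import json
import subprocess
import sys

# Source run in the child interpreter. The async function is a placeholder
# for the real async/await workload, which stays out of the eventlet process.
CHILD_CODE = """
import asyncio, json, sys

async def compute(n):
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return sum(range(n))

n = json.loads(sys.stdin.read())["n"]
print(json.dumps({"result": asyncio.run(compute(n))}))
"""


def run_in_child(n: int) -> int:
    """Run the asyncio workload in its own process and return its result."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        input=json.dumps({"n": n}),  # request sent on the child's stdin
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the child fails
    )
    return json.loads(proc.stdout)["result"]


if __name__ == "__main__":
    print(run_in_child(10))
```

The child owns its own event loop, so nothing asyncio-related is imported or started in the process that runs the Nameko workers. Whether a blocking `subprocess.run` call is acceptable inside a worker depends on how eventlet is patched in your setup, so treat this only as a picture of the process boundary.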

I understand. It seems that mixing Nameko with Dask is not as straightforward as I had hoped. Thank you for your help!