Nameko DeadLock - Sharing Experience

Hi all,

I have been experiencing a series of unexpected deadlocks in production. I
spent some time recreating those deadlocks locally and trying to fix them,
and I would like to share my findings.

I am still not sure I fully understand all of these behaviors... If anyone
has feedback or has had similar experiences, I would be really interested
in learning from them...

DeadLock situation: in file deadlock.py

import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)

class ServiceA(object):
    """Entry service for the deadlock reproduction.

    ``analyze`` dispatches one ``process_ids`` event per id, and
    ``process_ids`` handles each event with two sequential RPC calls to
    ``service_b`` (``process`` then ``saving``) around a simulated 4s
    post-processing step.
    """

    name = 'service_a'
    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        """Dispatch ``n`` ``process_ids`` events with payloads ``{'ids': [i]}``.

        :param n: number of events to dispatch (ids ``0 .. n-1``).
        :returns: ``'Done'`` once every event has been dispatched.
        """
        logger.info('service_a test log')
        for i in range(n):
            logger.info('%s', i)
            self.dispatch('process_ids', {'ids': [i]})
        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        """Process and then save the ids carried by one ``process_ids`` event.

        :param payload: event body as built by ``analyze``; ``payload['ids']``
            is the list of ids to process.
        """
        ids = payload['ids']
        # Pass values as logging args rather than pre-formatting with `%`:
        # formatting is deferred and cannot fail on tuple-valued arguments.
        logger.info('processing ids %s', ids)
        processed_ids = self.service_b.process(ids)

        logger.info('processed_ids %s', ids)
        # Simulating some post-process here. time.sleep only yields to other
        # green threads when eventlet monkey-patching is active.
        time.sleep(4)

        logger.info('saving processed_ids %s', processed_ids)
        saved_processed_ids = self.service_b.saving(processed_ids)
        # The '%s' placeholder needs an argument; without one the literal
        # '%s' was logged and the saving result was discarded.
        logger.info('Done saving processed_ids - %s', saved_processed_ids)

class ServiceB(object):
    """Downstream service called over RPC by ``service_a``."""

    name = 'service_b'

    @rpc
    def process(self, container_ids):
        """Simulate processing ``container_ids`` (0.5s); return ``['a', 'b']``."""
        # Lazy logging args: formatting is deferred and tuple-safe.
        logger.info('service_b processing ids %s', container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s', container_ids)
        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        """Simulate saving ``processed_ids`` (0.5s); return ``True``."""
        logger.info('service_b saving %s', processed_ids)
        time.sleep(.5)
        logger.info('service_b returning processed_ids %s', processed_ids)
        return True

if __name__ == "__main__":
    import os

    # basicConfig() alone leaves the root logger at WARNING, which silently
    # drops every logger.info call (including 'Services Starting').
    logging.basicConfig(level=logging.INFO)

    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    services = []
    for path in registered_services:
        services.extend(import_service(path))

    logger.info('Services Starting')
    config = {
        # `nameko run` normally supplies the broker URI; a hand-rolled runner
        # must provide it itself or startup fails on the missing AMQP URI.
        'AMQP_URI': os.environ.get('AMQP_URI', 'pyamqp://guest:guest@localhost'),
        'max_workers': 2,
    }
    run(services, config, backdoor_port=5050)

Then I trigger it from a separate shell:
nameko shell

n.rpc.service_a.analyze(2)

This code deadlocks even though, based on my understanding, it should not.
The entrypoint service_a.analyze needs one worker, which dispatches n=2
events and is then returned to the GreenPool. The event handler
service_a.process_ids is then triggered once per message, so both of
service_a's 2 workers are in use. Each of those workers calls service_b
sequentially, but since service_b also has 2 workers, this should not
cause a deadlock...

*Prefetch_count*
Based on discussions I found online, I tried to set up my own
QueueConsumer to artificially increase the prefetch_count. But this did
not solve my deadlock...

import socket

from nameko.rpc import Rpc as NamekoRpc

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    """Queue consumer with a fixed prefetch (QoS) count of 100.

    NOTE(review): nameko otherwise derives the prefetch count from the
    container's max_workers; this class attribute overrides that value.
    """

    prefetch_count = 100

class RpcConsumer(NamekoRpcConsumer):
    """RPC consumer that consumes through the fixed-prefetch QueueConsumer."""

    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    """``@rpc`` entrypoint wired to the custom RpcConsumer."""

    rpc_consumer = RpcConsumer()


# Drop-in replacement for nameko.rpc.rpc.
rpc = Rpc.decorator

class EventHandler(NamekoEventHandler):
    """``@event_handler`` entrypoint using the fixed-prefetch QueueConsumer.

    ``broadcast_identifier`` is the host name, so all containers on one host
    share the same identifier. NOTE(review): nameko presumably consults this
    only for BROADCAST-type handlers; confirm for the handler type in use.
    """

    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        host = socket.gethostname()
        return host


# Drop-in replacement for nameko.events.event_handler.
event_handler = EventHandler.decorator

*Increase Worker Pool Size*
Then I tried to create an independent pool of workers for each entrypoint
type [Rpc, Event], and then for each entrypoint, by overriding the
ServiceContainer class. This did not work either, and I am not sure why...

from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool

class CustomServiceContainer(ServiceContainer):
    """ServiceContainer with a dedicated worker pool per entrypoint.

    Each entrypoint, keyed by ``'<EntrypointClass>_<method_name>'``, gets
    its own ``GreenPool`` of ``max_workers`` green threads, so one busy
    entrypoint cannot take every worker. ``self.max_workers`` is then raised
    to the total across all pools because it feeds the prefetch count.

    NOTE(review): the base class still creates its own shared worker pool;
    any base-class logic that waits on or sizes that pool (for example on
    ``stop()``) will not see workers spawned into the per-entrypoint pools
    below. Confirm against the installed nameko version.
    """

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        # Forward the caller's worker_ctx_cls; the previously hard-coded
        # MDCWorkerContext is not defined in this module. To keep that
        # behaviour, pass it explicitly as the third argument.
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)

        # The base class has already resolved self.max_workers (presumably
        # applying nameko's default when 'max_workers' is missing); reuse it
        # rather than config.get('max_workers'), which is None when absent.
        workers_per_pool = self.max_workers
        self.worker_pools = self.get_workers_pool()

        # Update the number of max workers as it is used to dynamically
        # compute prefetch_count.
        self.max_workers = len(self.worker_pools) * workers_per_pool

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`, using that entrypoint's dedicated pool.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.

        Returns the new worker context. Raises ``ContainerBeingKilled`` if
        the container is being killed.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()

        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Use the entrypoint's own pool instead of the container-wide one.
        # GreenPool.spawn blocks the caller while that pool is at capacity.
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        """Return the pool dedicated to ``worker_ctx.entrypoint``."""
        key = self._get_entrypoint_pool_key(worker_ctx.entrypoint)
        return self.worker_pools[key]

    def get_workers_pool(self):
        """Build a ``GreenPool(size=self.max_workers)`` per entrypoint key."""
        return {
            self._get_entrypoint_pool_key(entrypoint):
                GreenPool(size=self.max_workers)
            for entrypoint in self.entrypoints
        }

    def _get_entrypoint_pool_key(self, entrypoint):
        """Return the pool key ``'<EntrypointClass>_<method_name>'``.

        Returning only ``entrypoint.__class__.__name__`` would instead give
        one pool per entrypoint type (e.g. one for Rpc, one for EventHandler).
        """
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)

*RpcProxy: use_confirms*
Then I looked at message acknowledgement. I was able to avoid any deadlock
by overriding the RpcProxy use_confirms property to return False.

from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener

class ReplyListener(NamekoReplyListener):
    """Reply listener bound to a ``QueueConsumer`` instance.

    NOTE(review): in this snippet ``QueueConsumer`` is imported from
    ``nameko.messaging`` (nameko's stock class, not the fixed-prefetch
    subclass), so this override likely changes nothing.
    """

    queue_consumer = QueueConsumer()

class MethodProxy(NamekoMethodProxy):
    """Method proxy whose ``use_confirms`` always reads as ``False``.

    Presumably this disables AMQP publisher confirms for outgoing RPC
    requests; verify against the installed nameko version.
    """

    @property
    def use_confirms(self):
        # Read-only (no setter), constant False.
        return False

class _NoConfirmsServiceProxy(object):
    """Per-worker proxy: each attribute access yields a custom ``MethodProxy``."""

    def __init__(self, worker_ctx, service_name, reply_listener):
        self.worker_ctx = worker_ctx
        self.service_name = service_name
        self.reply_listener = reply_listener

    def __getattr__(self, name):
        # Don't fabricate dunder attributes (copy/pickle/introspection probes).
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)


class RpcProxy(NamekoRpcProxy):
    """RpcProxy dependency whose injected proxy calls through ``MethodProxy``.

    Service code calls methods on the object returned by ``get_dependency``,
    not on this provider, so the custom ``MethodProxy`` must be created by
    that returned object. (A ``__getattr__`` on the provider itself that reads
    ``self.worker_ctx`` / ``self.service_name`` / ``self.reply_listener`` would
    recurse forever, since the provider exposes ``rpc_reply_listener``.)

    NOTE(review): relies on nameko's RpcProxy storing its constructor
    argument as ``target_service``; any extra proxy options are not forwarded.
    """

    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        return _NoConfirmsServiceProxy(
            worker_ctx, self.target_service, self.rpc_reply_listener)

1 Like

Hi Marco,

How are you running your services? calling `python deadlock.py` i get an
error about missing amqp url, and with `nameko run deadlock` things seem to
work fine. how come you are using your own runner? if you do, note that you
(at least the sample code) are missing the call to `eventlet.monkey_patch`
which needs to happen at the top of your module (before other imports)

Best,
David

···

On Friday, 26 May 2017 23:41:14 UTC+1, ma...@clearmetal.com wrote:

Hi all,

I have been experiencing a series of unexpected deadlocks in production. I
spent some time recreating those deadlocks locally and trying to fix them,
and I would like to share my findings.

I am still not sure I fully understand all of these behaviors... If anyone
has feedback or has had similar experiences, I would be really interested
in learning from them...

DeadLock situation: in file deadlock.py

import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)

class ServiceA(object):
    """Entry service for the deadlock reproduction.

    ``analyze`` dispatches one ``process_ids`` event per id, and
    ``process_ids`` handles each event with two sequential RPC calls to
    ``service_b`` (``process`` then ``saving``) around a simulated 4s
    post-processing step.
    """

    name = 'service_a'
    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        """Dispatch ``n`` ``process_ids`` events with payloads ``{'ids': [i]}``.

        :param n: number of events to dispatch (ids ``0 .. n-1``).
        :returns: ``'Done'`` once every event has been dispatched.
        """
        logger.info('service_a test log')
        for i in range(n):
            logger.info('%s', i)
            self.dispatch('process_ids', {'ids': [i]})
        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        """Process and then save the ids carried by one ``process_ids`` event.

        :param payload: event body as built by ``analyze``; ``payload['ids']``
            is the list of ids to process.
        """
        ids = payload['ids']
        # Pass values as logging args rather than pre-formatting with `%`:
        # formatting is deferred and cannot fail on tuple-valued arguments.
        logger.info('processing ids %s', ids)
        processed_ids = self.service_b.process(ids)

        logger.info('processed_ids %s', ids)
        # Simulating some post-process here. time.sleep only yields to other
        # green threads when eventlet monkey-patching is active.
        time.sleep(4)

        logger.info('saving processed_ids %s', processed_ids)
        saved_processed_ids = self.service_b.saving(processed_ids)
        # The '%s' placeholder needs an argument; without one the literal
        # '%s' was logged and the saving result was discarded.
        logger.info('Done saving processed_ids - %s', saved_processed_ids)

class ServiceB(object):
    """Downstream service called over RPC by ``service_a``."""

    name = 'service_b'

    @rpc
    def process(self, container_ids):
        """Simulate processing ``container_ids`` (0.5s); return ``['a', 'b']``."""
        # Lazy logging args: formatting is deferred and tuple-safe.
        logger.info('service_b processing ids %s', container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s', container_ids)
        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        """Simulate saving ``processed_ids`` (0.5s); return ``True``."""
        logger.info('service_b saving %s', processed_ids)
        time.sleep(.5)
        logger.info('service_b returning processed_ids %s', processed_ids)
        return True

if __name__ == "__main__":
    import os

    # basicConfig() alone leaves the root logger at WARNING, which silently
    # drops every logger.info call (including 'Services Starting').
    logging.basicConfig(level=logging.INFO)

    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    services = []
    for path in registered_services:
        services.extend(import_service(path))

    logger.info('Services Starting')
    config = {
        # `nameko run` normally supplies the broker URI; a hand-rolled runner
        # must provide it itself or startup fails on the missing AMQP URI.
        'AMQP_URI': os.environ.get('AMQP_URI', 'pyamqp://guest:guest@localhost'),
        'max_workers': 2,
    }
    run(services, config, backdoor_port=5050)

Then I trigger it from a separate shell:
nameko shell
>>n.rpc.service_a.analyze(2)

This code deadlocks even though, based on my understanding, it should not.
The entrypoint service_a.analyze needs one worker, which dispatches n=2
events and is then returned to the GreenPool. The event handler
service_a.process_ids is then triggered once per message, so both of
service_a's 2 workers are in use. Each of those workers calls service_b
sequentially, but since service_b also has 2 workers, this should not
cause a deadlock...

*Prefetch_count*
Based on discussions I found online, I tried to set up my own
QueueConsumer to artificially increase the prefetch_count. But this did
not solve my deadlock...

import socket

from nameko.rpc import Rpc as NamekoRpc

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    """Queue consumer with a fixed prefetch (QoS) count of 100.

    NOTE(review): nameko otherwise derives the prefetch count from the
    container's max_workers; this class attribute overrides that value.
    """

    prefetch_count = 100

class RpcConsumer(NamekoRpcConsumer):
    """RPC consumer that consumes through the fixed-prefetch QueueConsumer."""

    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    """``@rpc`` entrypoint wired to the custom RpcConsumer."""

    rpc_consumer = RpcConsumer()


# Drop-in replacement for nameko.rpc.rpc.
rpc = Rpc.decorator

class EventHandler(NamekoEventHandler):
    """``@event_handler`` entrypoint using the fixed-prefetch QueueConsumer.

    ``broadcast_identifier`` is the host name, so all containers on one host
    share the same identifier. NOTE(review): nameko presumably consults this
    only for BROADCAST-type handlers; confirm for the handler type in use.
    """

    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        host = socket.gethostname()
        return host


# Drop-in replacement for nameko.events.event_handler.
event_handler = EventHandler.decorator

*Increase Worker Pool Size*
Then I tried to create an independent pool of workers for each entrypoint
type [Rpc, Event], and then for each entrypoint, by overriding the
ServiceContainer class. This did not work either, and I am not sure why...

from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool

class CustomServiceContainer(ServiceContainer):
    """ServiceContainer with a dedicated worker pool per entrypoint.

    Each entrypoint, keyed by ``'<EntrypointClass>_<method_name>'``, gets
    its own ``GreenPool`` of ``max_workers`` green threads, so one busy
    entrypoint cannot take every worker. ``self.max_workers`` is then raised
    to the total across all pools because it feeds the prefetch count.

    NOTE(review): the base class still creates its own shared worker pool;
    any base-class logic that waits on or sizes that pool (for example on
    ``stop()``) will not see workers spawned into the per-entrypoint pools
    below. Confirm against the installed nameko version.
    """

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        # Forward the caller's worker_ctx_cls; the previously hard-coded
        # MDCWorkerContext is not defined in this module. To keep that
        # behaviour, pass it explicitly as the third argument.
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)

        # The base class has already resolved self.max_workers (presumably
        # applying nameko's default when 'max_workers' is missing); reuse it
        # rather than config.get('max_workers'), which is None when absent.
        workers_per_pool = self.max_workers
        self.worker_pools = self.get_workers_pool()

        # Update the number of max workers as it is used to dynamically
        # compute prefetch_count.
        self.max_workers = len(self.worker_pools) * workers_per_pool

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`, using that entrypoint's dedicated pool.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.

        Returns the new worker context. Raises ``ContainerBeingKilled`` if
        the container is being killed.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()

        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Use the entrypoint's own pool instead of the container-wide one.
        # GreenPool.spawn blocks the caller while that pool is at capacity.
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        """Return the pool dedicated to ``worker_ctx.entrypoint``."""
        key = self._get_entrypoint_pool_key(worker_ctx.entrypoint)
        return self.worker_pools[key]

    def get_workers_pool(self):
        """Build a ``GreenPool(size=self.max_workers)`` per entrypoint key."""
        return {
            self._get_entrypoint_pool_key(entrypoint):
                GreenPool(size=self.max_workers)
            for entrypoint in self.entrypoints
        }

    def _get_entrypoint_pool_key(self, entrypoint):
        """Return the pool key ``'<EntrypointClass>_<method_name>'``.

        Returning only ``entrypoint.__class__.__name__`` would instead give
        one pool per entrypoint type (e.g. one for Rpc, one for EventHandler).
        """
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)

*RpcProxy: use_confirms*
Then I looked at message acknowledgement. I was able to avoid any deadlock
by overriding the RpcProxy use_confirms property to return False.

from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener

class ReplyListener(NamekoReplyListener):
    """Reply listener bound to a ``QueueConsumer`` instance.

    NOTE(review): in this snippet ``QueueConsumer`` is imported from
    ``nameko.messaging`` (nameko's stock class, not the fixed-prefetch
    subclass), so this override likely changes nothing.
    """

    queue_consumer = QueueConsumer()

class MethodProxy(NamekoMethodProxy):
    """Method proxy whose ``use_confirms`` always reads as ``False``.

    Presumably this disables AMQP publisher confirms for outgoing RPC
    requests; verify against the installed nameko version.
    """

    @property
    def use_confirms(self):
        # Read-only (no setter), constant False.
        return False

class _NoConfirmsServiceProxy(object):
    """Per-worker proxy: each attribute access yields a custom ``MethodProxy``."""

    def __init__(self, worker_ctx, service_name, reply_listener):
        self.worker_ctx = worker_ctx
        self.service_name = service_name
        self.reply_listener = reply_listener

    def __getattr__(self, name):
        # Don't fabricate dunder attributes (copy/pickle/introspection probes).
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)


class RpcProxy(NamekoRpcProxy):
    """RpcProxy dependency whose injected proxy calls through ``MethodProxy``.

    Service code calls methods on the object returned by ``get_dependency``,
    not on this provider, so the custom ``MethodProxy`` must be created by
    that returned object. (A ``__getattr__`` on the provider itself that reads
    ``self.worker_ctx`` / ``self.service_name`` / ``self.reply_listener`` would
    recurse forever, since the provider exposes ``rpc_reply_listener``.)

    NOTE(review): relies on nameko's RpcProxy storing its constructor
    argument as ``target_service``; any extra proxy options are not forwarded.
    """

    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        return _NoConfirmsServiceProxy(
            worker_ctx, self.target_service, self.rpc_reply_listener)

Hi Marco,

Are you using Nameko 2.5.3 here? If so, please upgrade. That version has
some problems with deadlock when the prefetch count is very low.

I can reproduce your deadlock on 2.5.3, but not on 2.6.0. The interaction
of the prefetch and worker count is complex in Nameko<2.5.4. You need
available prefetch count to consume acks, and available workers to send
them. Increasing one or the other can sometimes unstick a stuck cluster,
but it depends very much on the test-case and the timing. Your "fix" of
disabling message acknowledgements is probably only disrupting the timing
sufficiently to avoid a race-condition, rather than fixing the problem.

Fortunately everything is much simpler from Nameko 2.5.4 onwards and you
should not experience any of these deadlocks.

Finally, I'm not sure why you've implemented your own runner, but the
approach you've used is exactly the way I would recommend: the "run"
function and "import_service" from nameko.cli.run do the heavy lifting.
The "eventlet.monkey_patch" that David mentioned is actually included when
you import that module.

Matt.

···

On Saturday, May 27, 2017 at 9:09:28 AM UTC+1, David Szotten wrote:

Hi Marco,

How are you running your services? calling `python deadlock.py` i get an
error about missing amqp url, and with `nameko run deadlock` things seem to
work fine. how come you are using your own runner? if you do, note that you
(at least the sample code) are missing the call to `eventlet.monkey_patch`
which needs to happen at the top of your module (before other imports)

Best,
David

On Friday, 26 May 2017 23:41:14 UTC+1, ma...@clearmetal.com wrote:

Hi all,

I have been experiencing a series of unexpected deadlocks in production. I
spent some time recreating those deadlocks locally and trying to fix them,
and I would like to share my findings.

I am still not sure I fully understand all of these behaviors... If anyone
has feedback or has had similar experiences, I would be really interested
in learning from them...

DeadLock situation: in file deadlock.py

import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)

class ServiceA(object):
    """Entry service for the deadlock reproduction.

    ``analyze`` dispatches one ``process_ids`` event per id, and
    ``process_ids`` handles each event with two sequential RPC calls to
    ``service_b`` (``process`` then ``saving``) around a simulated 4s
    post-processing step.
    """

    name = 'service_a'
    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        """Dispatch ``n`` ``process_ids`` events with payloads ``{'ids': [i]}``.

        :param n: number of events to dispatch (ids ``0 .. n-1``).
        :returns: ``'Done'`` once every event has been dispatched.
        """
        logger.info('service_a test log')
        for i in range(n):
            logger.info('%s', i)
            self.dispatch('process_ids', {'ids': [i]})
        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        """Process and then save the ids carried by one ``process_ids`` event.

        :param payload: event body as built by ``analyze``; ``payload['ids']``
            is the list of ids to process.
        """
        ids = payload['ids']
        # Pass values as logging args rather than pre-formatting with `%`:
        # formatting is deferred and cannot fail on tuple-valued arguments.
        logger.info('processing ids %s', ids)
        processed_ids = self.service_b.process(ids)

        logger.info('processed_ids %s', ids)
        # Simulating some post-process here. time.sleep only yields to other
        # green threads when eventlet monkey-patching is active.
        time.sleep(4)

        logger.info('saving processed_ids %s', processed_ids)
        saved_processed_ids = self.service_b.saving(processed_ids)
        # The '%s' placeholder needs an argument; without one the literal
        # '%s' was logged and the saving result was discarded.
        logger.info('Done saving processed_ids - %s', saved_processed_ids)

class ServiceB(object):
    """Downstream service called over RPC by ``service_a``."""

    name = 'service_b'

    @rpc
    def process(self, container_ids):
        """Simulate processing ``container_ids`` (0.5s); return ``['a', 'b']``."""
        # Lazy logging args: formatting is deferred and tuple-safe.
        logger.info('service_b processing ids %s', container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s', container_ids)
        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        """Simulate saving ``processed_ids`` (0.5s); return ``True``."""
        logger.info('service_b saving %s', processed_ids)
        time.sleep(.5)
        logger.info('service_b returning processed_ids %s', processed_ids)
        return True

if __name__ == "__main__":
    import os

    # basicConfig() alone leaves the root logger at WARNING, which silently
    # drops every logger.info call (including 'Services Starting').
    logging.basicConfig(level=logging.INFO)

    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    services = []
    for path in registered_services:
        services.extend(import_service(path))

    logger.info('Services Starting')
    config = {
        # `nameko run` normally supplies the broker URI; a hand-rolled runner
        # must provide it itself or startup fails on the missing AMQP URI.
        'AMQP_URI': os.environ.get('AMQP_URI', 'pyamqp://guest:guest@localhost'),
        'max_workers': 2,
    }
    run(services, config, backdoor_port=5050)

Then I trigger it from a separate shell:
nameko shell
>>n.rpc.service_a.analyze(2)

This code deadlocks even though, based on my understanding, it should not.
The entrypoint service_a.analyze needs one worker, which dispatches n=2
events and is then returned to the GreenPool. The event handler
service_a.process_ids is then triggered once per message, so both of
service_a's 2 workers are in use. Each of those workers calls service_b
sequentially, but since service_b also has 2 workers, this should not
cause a deadlock...

*Prefetch_count*
Based on discussions I found online, I tried to set up my own
QueueConsumer to artificially increase the prefetch_count. But this did
not solve my deadlock...

import socket

from nameko.rpc import Rpc as NamekoRpc

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    """Queue consumer with a fixed prefetch (QoS) count of 100.

    NOTE(review): nameko otherwise derives the prefetch count from the
    container's max_workers; this class attribute overrides that value.
    """

    prefetch_count = 100

class RpcConsumer(NamekoRpcConsumer):
    """RPC consumer that consumes through the fixed-prefetch QueueConsumer."""

    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    """``@rpc`` entrypoint wired to the custom RpcConsumer."""

    rpc_consumer = RpcConsumer()


# Drop-in replacement for nameko.rpc.rpc.
rpc = Rpc.decorator

class EventHandler(NamekoEventHandler):
    """``@event_handler`` entrypoint using the fixed-prefetch QueueConsumer.

    ``broadcast_identifier`` is the host name, so all containers on one host
    share the same identifier. NOTE(review): nameko presumably consults this
    only for BROADCAST-type handlers; confirm for the handler type in use.
    """

    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        host = socket.gethostname()
        return host


# Drop-in replacement for nameko.events.event_handler.
event_handler = EventHandler.decorator

*Increase Worker Pool Size*
Then I tried to create an independent pool of workers for each entrypoint
type [Rpc, Event], and then for each entrypoint, by overriding the
ServiceContainer class. This did not work either, and I am not sure why...

from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool

class CustomServiceContainer(ServiceContainer):
    """ServiceContainer with a dedicated worker pool per entrypoint.

    Each entrypoint, keyed by ``'<EntrypointClass>_<method_name>'``, gets
    its own ``GreenPool`` of ``max_workers`` green threads, so one busy
    entrypoint cannot take every worker. ``self.max_workers`` is then raised
    to the total across all pools because it feeds the prefetch count.

    NOTE(review): the base class still creates its own shared worker pool;
    any base-class logic that waits on or sizes that pool (for example on
    ``stop()``) will not see workers spawned into the per-entrypoint pools
    below. Confirm against the installed nameko version.
    """

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        # Forward the caller's worker_ctx_cls; the previously hard-coded
        # MDCWorkerContext is not defined in this module. To keep that
        # behaviour, pass it explicitly as the third argument.
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)

        # The base class has already resolved self.max_workers (presumably
        # applying nameko's default when 'max_workers' is missing); reuse it
        # rather than config.get('max_workers'), which is None when absent.
        workers_per_pool = self.max_workers
        self.worker_pools = self.get_workers_pool()

        # Update the number of max workers as it is used to dynamically
        # compute prefetch_count.
        self.max_workers = len(self.worker_pools) * workers_per_pool

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`, using that entrypoint's dedicated pool.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.

        Returns the new worker context. Raises ``ContainerBeingKilled`` if
        the container is being killed.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()

        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Use the entrypoint's own pool instead of the container-wide one.
        # GreenPool.spawn blocks the caller while that pool is at capacity.
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        """Return the pool dedicated to ``worker_ctx.entrypoint``."""
        key = self._get_entrypoint_pool_key(worker_ctx.entrypoint)
        return self.worker_pools[key]

    def get_workers_pool(self):
        """Build a ``GreenPool(size=self.max_workers)`` per entrypoint key."""
        return {
            self._get_entrypoint_pool_key(entrypoint):
                GreenPool(size=self.max_workers)
            for entrypoint in self.entrypoints
        }

    def _get_entrypoint_pool_key(self, entrypoint):
        """Return the pool key ``'<EntrypointClass>_<method_name>'``.

        Returning only ``entrypoint.__class__.__name__`` would instead give
        one pool per entrypoint type (e.g. one for Rpc, one for EventHandler).
        """
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)

*RpcProxy: use_confirms*
Then I looked at message acknowledgement. I was able to avoid any deadlock
by overriding the RpcProxy use_confirms property to return False.

from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener

class ReplyListener(NamekoReplyListener):
    """Reply listener bound to a ``QueueConsumer`` instance.

    NOTE(review): in this snippet ``QueueConsumer`` is imported from
    ``nameko.messaging`` (nameko's stock class, not the fixed-prefetch
    subclass), so this override likely changes nothing.
    """

    queue_consumer = QueueConsumer()

class MethodProxy(NamekoMethodProxy):
    """Method proxy whose ``use_confirms`` always reads as ``False``.

    Presumably this disables AMQP publisher confirms for outgoing RPC
    requests; verify against the installed nameko version.
    """

    @property
    def use_confirms(self):
        # Read-only (no setter), constant False.
        return False

class _NoConfirmsServiceProxy(object):
    """Per-worker proxy: each attribute access yields a custom ``MethodProxy``."""

    def __init__(self, worker_ctx, service_name, reply_listener):
        self.worker_ctx = worker_ctx
        self.service_name = service_name
        self.reply_listener = reply_listener

    def __getattr__(self, name):
        # Don't fabricate dunder attributes (copy/pickle/introspection probes).
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)


class RpcProxy(NamekoRpcProxy):
    """RpcProxy dependency whose injected proxy calls through ``MethodProxy``.

    Service code calls methods on the object returned by ``get_dependency``,
    not on this provider, so the custom ``MethodProxy`` must be created by
    that returned object. (A ``__getattr__`` on the provider itself that reads
    ``self.worker_ctx`` / ``self.service_name`` / ``self.reply_listener`` would
    recurse forever, since the provider exposes ``rpc_reply_listener``.)

    NOTE(review): relies on nameko's RpcProxy storing its constructor
    argument as ``target_service``; any extra proxy options are not forwarded.
    """

    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        return _NoConfirmsServiceProxy(
            worker_ctx, self.target_service, self.rpc_reply_listener)

1 Like

Thanks Matt and David for your quick reply.
I upgraded to nameko 2.6.0 and it fixes the deadlock as you mentioned.

I am still not sure I understand the relationship between max_workers and
prefetch_count. max_workers seems to set the size of the GreenPool at the
service level (used to process all of the service's entrypoints), while
prefetch_count seems to be applied at the channel (almost connection)
level. In the nameko implementation, a given service container has
multiple event_handler or rpc entrypoints, which create multiple queues
on the RabbitMQ side and consequently multiple channels. So these two
settings seem to live at conceptually different levels...

Also, from what I understand, prefetch_count is essentially a performance
optimization. Consequently, it should not be linked to any deadlock issue,
only to throughput.

Please let me know if I got something wrong.

PS: regarding our own runner, it exists essentially so that we can merge
variables from our settings.py and nameko.yml. In this example, there is
no need for it.

···

On Tuesday, May 30, 2017 at 6:43:16 AM UTC-7, Matt Yule-Bennett wrote:

Hi Marco,

Are you using Nameko 2.5.3 here? If so, please upgrade. That version has
some problems with deadlock when the prefetch count is very low.

I can reproduce your deadlock on 2.5.3, but not on 2.6.0. The interaction
of the prefetch and worker count is complex in Nameko<2.5.4. You need
available prefetch count to consume acks, and available workers to send
them. Increasing one or the other can sometimes unstick a stuck cluster,
but it depends very much on the test-case and the timing. Your "fix" of
disabling message acknowledgements is probably only disrupting the timing
sufficiently to avoid a race-condition, rather than fixing the problem.

Fortunately everything is much simpler from Nameko 2.5.4 onwards and you
should not experience any of these deadlocks.

Finally, I'm not sure why you've implemented your own runner, but the
approach you've used is exactly the way I would recommend: the "run"
function and "import_service" from nameko.cli.run do the heavy lifting.
The "eventlet.monkey_patch" that David mentioned is actually included when
you import that module.

Matt.

On Saturday, May 27, 2017 at 9:09:28 AM UTC+1, David Szotten wrote:

Hi Marco,

How are you running your services? calling `python deadlock.py` i get an
error about missing amqp url, and with `nameko run deadlock` things seem to
work fine. how come you are using your own runner? if you do, note that you
(at least the sample code) are missing the call to `eventlet.monkey_patch`
which needs to happen at the top of your module (before other imports)

Best,
David

On Friday, 26 May 2017 23:41:14 UTC+1, ma...@clearmetal.com wrote:

Hi all,

I have been experiencing a series of unexpected deadlocks in production.
I spent some time recreating those deadlocks locally and trying to fix
them, and I would like to share my findings.

I am still not sure I fully understand all of these behaviors... If anyone
has feedback or has had similar experiences, I would be really interested
in learning from them...

DeadLock situation: in file deadlock.py

import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)

class ServiceA(object):
    """Entry service for the deadlock reproduction.

    ``analyze`` dispatches one ``process_ids`` event per id, and
    ``process_ids`` handles each event with two sequential RPC calls to
    ``service_b`` (``process`` then ``saving``) around a simulated 4s
    post-processing step.
    """

    name = 'service_a'
    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        """Dispatch ``n`` ``process_ids`` events with payloads ``{'ids': [i]}``.

        :param n: number of events to dispatch (ids ``0 .. n-1``).
        :returns: ``'Done'`` once every event has been dispatched.
        """
        logger.info('service_a test log')
        for i in range(n):
            logger.info('%s', i)
            self.dispatch('process_ids', {'ids': [i]})
        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        """Process and then save the ids carried by one ``process_ids`` event.

        :param payload: event body as built by ``analyze``; ``payload['ids']``
            is the list of ids to process.
        """
        ids = payload['ids']
        # Pass values as logging args rather than pre-formatting with `%`:
        # formatting is deferred and cannot fail on tuple-valued arguments.
        logger.info('processing ids %s', ids)
        processed_ids = self.service_b.process(ids)

        logger.info('processed_ids %s', ids)
        # Simulating some post-process here. time.sleep only yields to other
        # green threads when eventlet monkey-patching is active.
        time.sleep(4)

        logger.info('saving processed_ids %s', processed_ids)
        saved_processed_ids = self.service_b.saving(processed_ids)
        # The '%s' placeholder needs an argument; without one the literal
        # '%s' was logged and the saving result was discarded.
        logger.info('Done saving processed_ids - %s', saved_processed_ids)

class ServiceB(object):
    """Downstream service called over RPC by ``service_a``."""

    name = 'service_b'

    @rpc
    def process(self, container_ids):
        """Simulate processing ``container_ids`` (0.5s); return ``['a', 'b']``."""
        # Lazy logging args: formatting is deferred and tuple-safe.
        logger.info('service_b processing ids %s', container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s', container_ids)
        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        """Simulate saving ``processed_ids`` (0.5s); return ``True``."""
        logger.info('service_b saving %s', processed_ids)
        time.sleep(.5)
        logger.info('service_b returning processed_ids %s', processed_ids)
        return True

if __name__ == "__main__":
    import os

    # basicConfig() alone leaves the root logger at WARNING, which silently
    # drops every logger.info call (including 'Services Starting').
    logging.basicConfig(level=logging.INFO)

    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    services = []
    for path in registered_services:
        services.extend(import_service(path))

    logger.info('Services Starting')
    config = {
        # `nameko run` normally supplies the broker URI; a hand-rolled runner
        # must provide it itself or startup fails on the missing AMQP URI.
        'AMQP_URI': os.environ.get('AMQP_URI', 'pyamqp://guest:guest@localhost'),
        'max_workers': 2,
    }
    run(services, config, backdoor_port=5050)

Then I trigger it from a separate shell:
nameko shell
>>n.rpc.service_a.analyze(2)

This code deadlocks even though, based on my understanding, it should not.
The entrypoint service_a.analyze needs one worker, which dispatches n=2
events and is then returned to the GreenPool. The event handler
service_a.process_ids is then triggered once per message, so both of
service_a's 2 workers are in use. Each of those workers calls service_b
sequentially, but since service_b also has 2 workers, this should not
cause a deadlock...

*Prefetch_count*
Based on discussions I found online, I tried to set up my own
QueueConsumer to artificially increase the prefetch_count. But this did
not solve my deadlock...

import socket

from nameko.rpc import Rpc as NamekoRpc

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    """Queue consumer with a fixed prefetch (QoS) count of 100.

    NOTE(review): nameko otherwise derives the prefetch count from the
    container's max_workers; this class attribute overrides that value.
    """

    prefetch_count = 100

class RpcConsumer(NamekoRpcConsumer):
    """RPC consumer that consumes through the fixed-prefetch QueueConsumer."""

    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    """``@rpc`` entrypoint wired to the custom RpcConsumer."""

    rpc_consumer = RpcConsumer()


# Drop-in replacement for nameko.rpc.rpc.
rpc = Rpc.decorator

class EventHandler(NamekoEventHandler):
    """``@event_handler`` entrypoint using the fixed-prefetch QueueConsumer.

    ``broadcast_identifier`` is the host name, so all containers on one host
    share the same identifier. NOTE(review): nameko presumably consults this
    only for BROADCAST-type handlers; confirm for the handler type in use.
    """

    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        host = socket.gethostname()
        return host


# Drop-in replacement for nameko.events.event_handler.
event_handler = EventHandler.decorator

*Increase Worker Pool Size*
Then I tried to create an independent pool of workers for each entrypoint
type [Rpc, Event], and then for each entrypoint, by overriding the
ServiceContainer class. This did not work either, and I am not sure why...

from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool

class CustomServiceContainer(ServiceContainer):
    """Service container that gives every entrypoint its own worker pool.

    Pools are keyed by entrypoint class name plus decorated method name, and
    each pool is sized to the per-pool worker count resolved at construction.
    ``max_workers`` is then raised to the combined capacity of all pools.

    NOTE(review): according to the Nameko maintainers, on Nameko < 2.5.4 the
    AMQP entrypoints block the shared QueueConsumer thread while waiting for a
    free worker, so extra pools alone do not remove that deadlock.
    NOTE(review): the base container presumably waits only on its own shared
    pool when stopping, so workers running in these per-entrypoint pools may
    not be waited for on shutdown -- confirm against the Nameko version used.
    """

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        # Forward the caller's worker context class (e.g. an MDC-aware one)
        # instead of ignoring the argument; None is handed to the base class
        # unchanged, which presumably falls back to its default -- confirm.
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)

        # Per-pool size as set by the base class. Capture it before
        # ``max_workers`` is overwritten below; this also avoids multiplying
        # by None when 'max_workers' is absent from ``config``.
        workers_per_pool = self.max_workers
        self.worker_pools = self.get_workers_pool()

        # ``max_workers`` is also used to dynamically compute prefetch_count,
        # so report the combined capacity of every pool.
        self.max_workers = len(self.worker_pools) * workers_per_pool

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.

        The worker is spawned on the pool dedicated to ``entrypoint`` rather
        than on a single container-wide pool.

        :raises ContainerBeingKilled: if the container is being killed.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()

        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Use this entrypoint's own pool. GreenPool.spawn blocks while that
        # pool is at capacity, so a busy entrypoint cannot take workers from
        # the others.
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        """Return the pool dedicated to ``worker_ctx.entrypoint``."""
        return self.worker_pools[
            self._get_entrypoint_pool_key(worker_ctx.entrypoint)]

    def get_workers_pool(self):
        """Build one ``GreenPool`` of size ``self.max_workers`` per entrypoint.

        :returns: dict mapping pool key to ``GreenPool``.
        """
        return {
            self._get_entrypoint_pool_key(entrypoint):
                GreenPool(size=self.max_workers)
            for entrypoint in self.entrypoints
        }

    def _get_entrypoint_pool_key(self, entrypoint):
        """Return the pool key for ``entrypoint``: ``<ClassName>_<method>``.

        Returning ``entrypoint.__class__.__name__`` alone instead would give
        one pool per entrypoint type rather than one per entrypoint.
        """
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)

*RpcProxy: use_confirms*
Then, I looked at message acknowledgement. I was able to avoid any deadlock
by overriding the use_confirms property of the RPC proxy's MethodProxy to
return False.

from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import ServiceProxy as NamekoServiceProxy


class ReplyListener(NamekoReplyListener):
    """Reply listener bound explicitly to Nameko's stock QueueConsumer."""

    queue_consumer = QueueConsumer()


class MethodProxy(NamekoMethodProxy):
    """Method proxy whose ``use_confirms`` property always reports False."""

    @property
    def use_confirms(self):
        return False


class ServiceProxy(NamekoServiceProxy):
    """Service proxy that builds the custom :class:`MethodProxy` per method."""

    def __getattr__(self, name):
        # NOTE(review): assumes the stock ServiceProxy stores ``worker_ctx``,
        # ``service_name`` and ``reply_listener`` as instance attributes and
        # that MethodProxy takes them positionally in this order -- confirm
        # against the Nameko version in use.
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)


class RpcProxy(NamekoRpcProxy):
    """RPC dependency provider that injects :class:`ServiceProxy`.

    The per-method override must live on the proxy object handed to the
    worker, not on this DependencyProvider. This class declares
    ``rpc_reply_listener`` rather than ``reply_listener``, so a
    ``__getattr__`` defined here that reads ``self.reply_listener`` (or any
    other attribute the provider lacks) would re-enter ``__getattr__`` and
    recurse.
    """

    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        # NOTE(review): assumes the stock RpcProxy keeps its constructor
        # argument as ``target_service``; extra proxy options (newer Nameko
        # releases) are not forwarded here -- confirm for your version.
        return ServiceProxy(
            worker_ctx, self.target_service, self.rpc_reply_listener)

Glad everything is fixed now.

At the risk of airing our dirty laundry, I’ll try to explain the previous
problems with deadlocking.

Background:

Deadlocks were always related to AMQP-based entrypoints (@rpc,
@event_handler and @consume). All messages are consumed from RabbitMQ using
the `QueueConsumer` SharedExtension. It knows which queues should be
consumed from, maintains a single connection and channel, and creates a
kombu Consumer to receive messages, which are then distributed back out to
the various “providers” that have delegated consumption to it.

This is not a great design (it would be better to have a channel per
consumer and a shared connection), but it’s how things currently work.

The prefetch_count value is equal to the size of the worker pool
(max_workers), which defaults to 10. This is also not very clever, because
they’re two distinct concepts and should be managed separately, but it
works OK in a limited set of scenarios. Since RabbitMQ 3.3.0 the prefetch
count is applied to the consumers, meaning it is possible to have more
unack’d messages than available workers.

Prior to Nameko 2.5.3:

Since the very beginning there has been a nasty bug that could cause
deadlocks. The root cause was the fact that the AMQP entrypoints blocked on
the worker pool — so when a message arrived, the entrypoint would wait for
a worker to become free before returning control to the QueueConsumer.
Messages are only ack’d after the entrypoint has completed (after the
worker thread has exited) and it was the QueueConsumer thread’s
responsibility to send them. So while the entrypoint was blocking for the
worker pool, no messages could be ack’d. If the prefetch count was
exhausted at that point, no more messages would be delivered, and if the
currently running workers were making RPC calls to other services, they
would wait forever for their RPC replies (also AMQP messages) to be
delivered. Since we ack after the entrypoint has completed, these hanging
workers consumed all the available prefetch.

This long-standing bug went undetected because you’d have to execute 10
concurrent workers across multiple AMQP entrypoints, that also made RPC
calls, before it deadlocked. The gist at
https://gist.github.com/mattbennett/1836aa08c4e640269e36ad6986df66a8
demonstrates all of these failure cases.

In Nameko 2.5.3:

We (hastily) increased the prefetch count to be the size of the worker pool
+ 1. This was done because we identified that there was a performance
bottleneck in having the QueueConsumer thread ack the messages, and we
thought that having a spare prefetch would allow it to happen earlier. In
fact this was bogus and the fix only really solved the performance problem
accidentally (more details in https://github.com/nameko/nameko/pull/429).

Having the prefetch_count equal to max_workers + 1 made it much easier to
trigger the deadlock described above. In fact, all you needed to do was
make more than max workers requests to a single AMQP entrypoint that also
made RPC requests. This obviously caused a few people to start noticing
deadlocks, which alerted us to the underlying problem.

An additional twist to the failure modes was the introduction of AMQP
consumer heartbeats in Nameko 2.4.4. As well as being responsible for
ack’ing messages, the QueueConsumer thread is also responsible for sending
heartbeats. When an entrypoint blocked on the worker pool, the heartbeat
would stop. If it was stopped for long enough, the RabbitMQ broker would
tear down the connection from the other end. This resulted in a lot of “Rpc
disconnected waiting for reply” errors from RpcProxies, and queues in
Rabbit that appeared to have temporarily “lost” their consumers. Since
prefetch count applies to every consumer, whereas the worker pool is
shared, you could see these errors even if the service wasn’t actually
deadlocked, just overloaded, and it would recover again afterwards. Again,
the gist at
https://gist.github.com/mattbennett/1836aa08c4e640269e36ad6986df66a8
demonstrates all of these failure cases.

Nameko 2.5.4 and later:

Happily, a very simple change to NOT block the queue consumer on the worker
pool solves all of these problems. We rolled this out as Nameko 2.5.4. In
addition, messages are now ack’d from the main thread, rather than
delegating that to the QueueConsumer. This fully addresses the performance
problem we were trying to fix in 2.5.3.

···

On Wednesday, May 31, 2017 at 5:11:40 PM UTC+1, ma...@clearmetal.com wrote:

Thanks Matt and David for your quick reply.
I upgraded to nameko 2.6.0 and it fixes the deadlock as you mentioned.

I am still not sure I understand the relation between max_workers and
prefetch_count. max_workers seems to set the size of the GreenPool at the
service level (used to process all of the service's entrypoints). The
prefetch_count seems to be applied at the channel (almost connection)
level. In the nameko implementation, a given service container has
multiple event_handler or rpc entrypoints, which create multiple queues
on the RabbitMQ side and consequently multiple channels. These two metrics
seem to live at conceptually different levels...

Also, from what I understand, the prefetch_count essentially allows a
performance improvement. Consequently, it should not be linked to any
deadlock issue, only to speed.

Please let me know if I got something wrong.

PS: concerning the implementation of our own runner, it essentially comes
from being able to merge variables from our settings.py and nameko.yml. In
this example, there is no need for it.

On Tuesday, May 30, 2017 at 6:43:16 AM UTC-7, Matt Yule-Bennett wrote:

Hi Marco,

Are you using Nameko 2.5.3 here? If so, please upgrade. That version has
some problems with deadlock when the prefetch count is very low.

I can reproduce your deadlock on 2.5.3, but not on 2.6.0. The interaction
of the prefetch and worker count is complex in Nameko<2.5.4. You need
available prefetch count to consume acks, and available workers to send
them. Increasing one or the other can sometimes unstick a stuck cluster,
but it depends very much on the test-case and the timing. Your "fix" of
disabling message acknowledgements is probably only disrupting the timing
sufficiently to avoid a race-condition, rather than fixing the problem.

Fortunately everything is much simpler from Nameko 2.5.4 onwards and you
should not experience any of these deadlocks.

Finally, I'm not sure why you've implemented your own runner, but the
approach you've used is exactly the way I would recommend: the "run"
function and "import_services" from nameko.cli.run do the heavy-lifting.
The "eventlet.monkey_patch" that David mentioned is actually included when
you import that module.

Matt.

On Saturday, May 27, 2017 at 9:09:28 AM UTC+1, David Szotten wrote:

Hi Marco,

How are you running your services? calling `python deadlock.py` i get an
error about missing amqp url, and with `nameko run deadlock` things seem to
work fine. how come you are using your own runner? if you do, note that you
(at least the sample code) are missing the call to `eventlet.monkey_patch`
which needs to happen at the top of your module (before other imports)

Best,
David

On Friday, 26 May 2017 23:41:14 UTC+1, ma...@clearmetal.com wrote:

Hi all,

I have been experiencing a series of unexpected deadlocks in Production.
I spent some time working on recreating those deadlocks locally and trying
to fix them. I would like to share my findings.

I am still not really sure I fully understand all of those behaviors...
If anyone has some feedback or has tried similar experiments, I would be
really interested in learning from them...

DeadLock situation: in file deadlock.py

import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)


class ServiceA(object):
    """Fan-out service used to reproduce the deadlock.

    ``analyze`` dispatches one ``process_ids`` event per id, and each event
    is handled by ``process_ids``, which makes two sequential RPC calls to
    ``service_b`` with a simulated post-processing pause in between.
    """

    name = 'service_a'
    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        """Dispatch ``n`` ``process_ids`` events, one per id in ``range(n)``.

        :param n: number of events to dispatch.
        :returns: ``'Done'`` once every event has been dispatched (not once
            the events have been processed).
        """
        logger.info('service_a test log')
        for i in range(n):
            logger.info('%s', i)
            self.dispatch('process_ids', {'ids': [i]})

        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        """Process, then save, the ids carried by a ``process_ids`` event.

        :param payload: event payload built by :meth:`analyze`; a dict with
            an ``'ids'`` list.
        """
        ids = payload['ids']
        logger.info('processing ids %s', ids)
        processed_ids = self.service_b.process(ids)

        logger.info('processed_ids %s', ids)
        # Simulate some post-processing; the worker stays busy between the
        # two RPC calls, which is what makes the deadlock easy to trigger.
        time.sleep(4)

        logger.info('saving processed_ids %s', processed_ids)
        saved_processed_ids = self.service_b.saving(processed_ids)
        # Pass the result so the '%s' placeholder is actually filled in.
        logger.info('Done saving processed_ids - %s', saved_processed_ids)

class ServiceB(object):
    """Downstream service called over RPC by ``service_a``."""

    name = 'service_b'

    @rpc
    def process(self, container_ids):
        """Simulate processing ``container_ids``; always returns ``['a', 'b']``."""
        # Lazy logging args: formatting also works if a tuple is passed.
        logger.info('service_b processing ids %s', container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s', container_ids)

        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        """Simulate saving ``processed_ids``; always returns ``True``."""
        logger.info('service_b saving %s', processed_ids)

        time.sleep(.5)
        logger.info('service_b returning processed_ids %s', processed_ids)

        return True

if __name__ == "__main__":
    import os

    # basicConfig() defaults to WARNING, which would hide every logger.info
    # call in this reproduction; INFO makes the services' progress visible.
    logging.basicConfig(level=logging.INFO)

    services = []
    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    for path in registered_services:
        services.extend(import_service(path))

    logger.info('Services Starting')
    config = {
        # Nameko needs a broker URI to start; override via $AMQP_URI.
        'AMQP_URI': os.environ.get(
            'AMQP_URI', 'pyamqp://guest:guest@localhost'),
        # Deliberately tiny pool so the deadlock is easy to reproduce.
        'max_workers': 2,
    }
    run(services, config, backdoor_port=5050)

Then I trigger it from a separate shell:
nameko shell
>>n.rpc.service_a.analyze(2)

This code generates a deadlock even though, based on my understanding, it
should not. The entrypoint service_a.analyze requires one worker, which
dispatches n=2 events and is then returned to the GreenPool. The event
handler service_a.process_ids is then triggered once per message, so both of
service_a's workers are requested. Each of those workers calls service_b
sequentially, but as service_b also has 2 workers, this should not create
any deadlock...

*Prefetch_count*
Based on discussions I found online, I tried to manually set up my own
QueueConsumer to artificially increase the prefetch_count. But this did not
solve my deadlock...

import socket

from nameko.rpc import Rpc as NamekoRpc

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    """Queue consumer pinned to a fixed prefetch count of 100.

    Replaces the prefetch value Nameko would otherwise derive from the
    worker pool size with a constant, generous limit.
    """

    prefetch_count = 100


class RpcConsumer(NamekoRpcConsumer):
    """RPC consumer that receives its messages via :class:`QueueConsumer`."""

    queue_consumer = QueueConsumer()


class Rpc(NamekoRpc):
    """``@rpc`` entrypoint that is wired to :class:`RpcConsumer`."""

    rpc_consumer = RpcConsumer()


# Drop-in replacement for ``nameko.rpc.rpc``.
rpc = Rpc.decorator


class EventHandler(NamekoEventHandler):
    """Event handler using :class:`QueueConsumer` and a per-host broadcast id."""

    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        # One identifier per machine rather than per process/instance.
        hostname = socket.gethostname()
        return hostname


# Drop-in replacement for ``nameko.events.event_handler``.
event_handler = EventHandler.decorator

*Increase Worker Pool Size*
Then, I tried to create an independent pool of workers for each type of
entrypoint [Rpc, Event], and then for each entrypoint, by overriding the
ServiceContainer class. This did not work either, and I am not sure why...

from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool

class CustomServiceContainer(ServiceContainer):
    """Service container that gives every entrypoint its own worker pool.

    Pools are keyed by entrypoint class name plus decorated method name, and
    each pool is sized to the per-pool worker count resolved at construction.
    ``max_workers`` is then raised to the combined capacity of all pools.

    NOTE(review): according to the Nameko maintainers, on Nameko < 2.5.4 the
    AMQP entrypoints block the shared QueueConsumer thread while waiting for a
    free worker, so extra pools alone do not remove that deadlock.
    NOTE(review): the base container presumably waits only on its own shared
    pool when stopping, so workers running in these per-entrypoint pools may
    not be waited for on shutdown -- confirm against the Nameko version used.
    """

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        # Forward the caller's worker context class (e.g. an MDC-aware one)
        # instead of ignoring the argument; None is handed to the base class
        # unchanged, which presumably falls back to its default -- confirm.
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)

        # Per-pool size as set by the base class. Capture it before
        # ``max_workers`` is overwritten below; this also avoids multiplying
        # by None when 'max_workers' is absent from ``config``.
        workers_per_pool = self.max_workers
        self.worker_pools = self.get_workers_pool()

        # ``max_workers`` is also used to dynamically compute prefetch_count,
        # so report the combined capacity of every pool.
        self.max_workers = len(self.worker_pools) * workers_per_pool

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.

        The worker is spawned on the pool dedicated to ``entrypoint`` rather
        than on a single container-wide pool.

        :raises ContainerBeingKilled: if the container is being killed.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()

        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Use this entrypoint's own pool. GreenPool.spawn blocks while that
        # pool is at capacity, so a busy entrypoint cannot take workers from
        # the others.
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        """Return the pool dedicated to ``worker_ctx.entrypoint``."""
        return self.worker_pools[
            self._get_entrypoint_pool_key(worker_ctx.entrypoint)]

    def get_workers_pool(self):
        """Build one ``GreenPool`` of size ``self.max_workers`` per entrypoint.

        :returns: dict mapping pool key to ``GreenPool``.
        """
        return {
            self._get_entrypoint_pool_key(entrypoint):
                GreenPool(size=self.max_workers)
            for entrypoint in self.entrypoints
        }

    def _get_entrypoint_pool_key(self, entrypoint):
        """Return the pool key for ``entrypoint``: ``<ClassName>_<method>``.

        Returning ``entrypoint.__class__.__name__`` alone instead would give
        one pool per entrypoint type rather than one per entrypoint.
        """
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)

*RpcProxy: use_confirms*
Then, I looked at message acknowledgement. I was able to
avoid any deadlock by overriding the use_confirms property of the RPC
proxy's MethodProxy to return False.

from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import ServiceProxy as NamekoServiceProxy


class ReplyListener(NamekoReplyListener):
    """Reply listener bound explicitly to Nameko's stock QueueConsumer."""

    queue_consumer = QueueConsumer()


class MethodProxy(NamekoMethodProxy):
    """Method proxy whose ``use_confirms`` property always reports False."""

    @property
    def use_confirms(self):
        return False


class ServiceProxy(NamekoServiceProxy):
    """Service proxy that builds the custom :class:`MethodProxy` per method."""

    def __getattr__(self, name):
        # NOTE(review): assumes the stock ServiceProxy stores ``worker_ctx``,
        # ``service_name`` and ``reply_listener`` as instance attributes and
        # that MethodProxy takes them positionally in this order -- confirm
        # against the Nameko version in use.
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)


class RpcProxy(NamekoRpcProxy):
    """RPC dependency provider that injects :class:`ServiceProxy`.

    The per-method override must live on the proxy object handed to the
    worker, not on this DependencyProvider. This class declares
    ``rpc_reply_listener`` rather than ``reply_listener``, so a
    ``__getattr__`` defined here that reads ``self.reply_listener`` (or any
    other attribute the provider lacks) would re-enter ``__getattr__`` and
    recurse.
    """

    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        # NOTE(review): assumes the stock RpcProxy keeps its constructor
        # argument as ``target_service``; extra proxy options (newer Nameko
        # releases) are not forwarded here -- confirm for your version.
        return ServiceProxy(
            worker_ctx, self.target_service, self.rpc_reply_listener)

Thanks Matt for taking the time to put together this long response. It is
really, really insightful about what is happening behind the scenes.

···

On Thursday, June 1, 2017 at 3:52:01 AM UTC-7, Matt Yule-Bennett wrote:

Glad everything is fixed now.

At the risk of airing our dirty laundry, I’ll try to explain the previous
problems with deadlocking.

Background:

Deadlocks were always related to AMQP-based entrypoints (@rpc,
@event_handler and @consume). All messages are consumed from RabbitMQ using
the `QueueConsumer` SharedExtension. It knows which queues should be
consumed from, maintains a single connection and channel, and creates a
kombu Consumer to receive messages, which are then distributed back out to
the various “providers” that have delegated consumption to it.

This is not a great design (it would be better to have a channel per
consumer and a shared connection), but it’s how things currently work.

The prefetch_count value is equal to the size of the worker pool
(max_workers), which defaults to 10. This is also not very clever, because
they’re two distinct concepts and should be managed separately, but it
works OK in a limited set of scenarios. Since RabbitMQ 3.3.0 the prefetch
count is applied to the consumers, meaning it is possible to have more
unack’d messages than available workers.

Prior to Nameko 2.5.3:

Since the very beginning there has been a nasty bug that could cause
deadlocks. The root cause was the fact that the AMQP entrypoints blocked on
the worker pool — so when a message arrived, the entrypoint would wait for
a worker to become free before returning control to the QueueConsumer.
Messages are only ack’d after the entrypoint has completed (after the
worker thread has exited) and it was the QueueConsumer thread’s
responsibility to send them. So while the entrypoint was blocking for the
worker pool, no messages could be ack’d. If the prefetch count was
exhausted at that point, no more messages would be delivered, and if the
currently running workers were making RPC calls to other services, they
would wait forever for their RPC replies (also AMQP messages) to be
delivered. Since we ack after the entrypoint has completed, these hanging
workers consumed all the available prefetch.

This long-standing bug went undetected because you’d have to execute 10
concurrent workers across multiple AMQP entrypoints, that also made RPC
calls, before it deadlocked. The gist at
https://gist.github.com/mattbennett/1836aa08c4e640269e36ad6986df66a8
demonstrates all of these failure cases.

In Nameko 2.5.3:

We (hastily) increased the prefetch count to be the size of the worker
pool + 1. This was done because we identified that there was a performance
bottleneck in having the QueueConsumer thread ack the messages, and we
thought that having a spare prefetch would allow it to happen earlier. In
fact this was bogus and the fix only really solved the performance problem
accidentally (more details in https://github.com/nameko/nameko/pull/429).

Having the prefetch_count equal to max_workers + 1 made it much easier to
trigger the deadlock described above. In fact, all you needed to do was
make more than max workers requests to a single AMQP entrypoint that also
made RPC requests. This obviously caused a few people to start noticing
deadlocks, which alerted us to the underlying problem.

An additional twist to the failure modes was the introduction of AMQP
consumer heartbeats in Nameko 2.4.4. As well as being responsible for
ack’ing messages, the QueueConsumer thread is also responsible for sending
heartbeats. When an entrypoint blocked on the worker pool, the heartbeat
would stop. If it was stopped for long enough, the RabbitMQ broker would
tear down the connection from the other end. This resulted in a lot of “Rpc
disconnected waiting for reply” errors from RpcProxies, and queues in
Rabbit that appeared to have temporarily “lost” their consumers. Since
prefetch count applies to every consumer, whereas the worker pool is
shared, you could see these errors even if the service wasn’t actually
deadlocked, just overloaded, and it would recover again afterwards. Again,
the gist at
https://gist.github.com/mattbennett/1836aa08c4e640269e36ad6986df66a8
demonstrates all of these failure cases.

Nameko 2.5.4 and later:

Happily, a very simple change to NOT block the queue consumer on the
worker pool solves all of these problems. We rolled this out as Nameko
2.5.4. In addition, messages are now ack’d from the main thread, rather
than delegating that to the QueueConsumer. This fully addresses the
performance problem we were trying to fix in 2.5.3.

On Wednesday, May 31, 2017 at 5:11:40 PM UTC+1, ma...@clearmetal.com wrote:

Thanks Matt and David for your quick reply.
I upgraded to nameko 2.6.0 and it fixes the deadlock as you mentioned.

I am still not sure I understand the relation between max_workers and
prefetch_count. max_workers seems to set the size of the GreenPool at the
service level (used to process all of the service's entrypoints). The
prefetch_count seems to be applied at the channel (almost connection)
level. In the nameko implementation, a given service container has
multiple event_handler or rpc entrypoints, which create multiple queues
on the RabbitMQ side and consequently multiple channels. These two metrics
seem to live at conceptually different levels...

Also, from what I understand, the prefetch_count essentially allows a
performance improvement. Consequently, it should not be linked to any
deadlock issue, only to speed.

Please let me know if I got something wrong.

PS: concerning the implementation of our own runner, it essentially comes
from being able to merge variables from our settings.py and nameko.yml. In
this example, there is no need for it.

On Tuesday, May 30, 2017 at 6:43:16 AM UTC-7, Matt Yule-Bennett wrote:

Hi Marco,

Are you using Nameko 2.5.3 here? If so, please upgrade. That version has
some problems with deadlock when the prefetch count is very low.

I can reproduce your deadlock on 2.5.3, but not on 2.6.0. The
interaction of the prefetch and worker count is complex in Nameko<2.5.4.
You need available prefetch count to consume acks, and available workers to
send them. Increasing one or the other can sometimes unstick a stuck
cluster, but it depends very much on the test-case and the timing. Your
"fix" of disabling message acknowledgements is probably only disrupting the
timing sufficiently to avoid a race-condition, rather than fixing the
problem.

Fortunately everything is much simpler from Nameko 2.5.4 onwards and you
should not experience any of these deadlocks.

Finally, I'm not sure why you've implemented your own runner, but the
approach you've used is exactly the way I would recommend: the "run"
function and "import_services" from nameko.cli.run do the heavy-lifting.
The "eventlet.monkey_patch" that David mentioned is actually included when
you import that module.

Matt.

On Saturday, May 27, 2017 at 9:09:28 AM UTC+1, David Szotten wrote:

Hi Marco,

How are you running your services? calling `python deadlock.py` i get
an error about missing amqp url, and with `nameko run deadlock` things seem
to work fine. how come you are using your own runner? if you do, note that
you (at least the sample code) are missing the call to
`eventlet.monkey_patch` which needs to happen at the top of your module
(before other imports)

Best,
David

On Friday, 26 May 2017 23:41:14 UTC+1, ma...@clearmetal.com wrote:

Hi all,

I have been experiencing a series of unexpected deadlocks in
Production. I spent some time working on recreating those deadlocks locally
and trying to fix them. I would like to share my findings.

I am still not really sure I fully understand all of those behaviors...
If anyone has some feedback or has tried similar experiments, I would be
really interested in learning from them...

DeadLock situation: in file deadlock.py

import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)


class ServiceA(object):
    """Fan-out service used to reproduce the deadlock.

    ``analyze`` dispatches one ``process_ids`` event per id, and each event
    is handled by ``process_ids``, which makes two sequential RPC calls to
    ``service_b`` with a simulated post-processing pause in between.
    """

    name = 'service_a'
    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        """Dispatch ``n`` ``process_ids`` events, one per id in ``range(n)``.

        :param n: number of events to dispatch.
        :returns: ``'Done'`` once every event has been dispatched (not once
            the events have been processed).
        """
        logger.info('service_a test log')
        for i in range(n):
            logger.info('%s', i)
            self.dispatch('process_ids', {'ids': [i]})

        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        """Process, then save, the ids carried by a ``process_ids`` event.

        :param payload: event payload built by :meth:`analyze`; a dict with
            an ``'ids'`` list.
        """
        ids = payload['ids']
        logger.info('processing ids %s', ids)
        processed_ids = self.service_b.process(ids)

        logger.info('processed_ids %s', ids)
        # Simulate some post-processing; the worker stays busy between the
        # two RPC calls, which is what makes the deadlock easy to trigger.
        time.sleep(4)

        logger.info('saving processed_ids %s', processed_ids)
        saved_processed_ids = self.service_b.saving(processed_ids)
        # Pass the result so the '%s' placeholder is actually filled in.
        logger.info('Done saving processed_ids - %s', saved_processed_ids)

class ServiceB(object):
    """Downstream service called over RPC by ``service_a``."""

    name = 'service_b'

    @rpc
    def process(self, container_ids):
        """Simulate processing ``container_ids``; always returns ``['a', 'b']``."""
        # Lazy logging args: formatting also works if a tuple is passed.
        logger.info('service_b processing ids %s', container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s', container_ids)

        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        """Simulate saving ``processed_ids``; always returns ``True``."""
        logger.info('service_b saving %s', processed_ids)

        time.sleep(.5)
        logger.info('service_b returning processed_ids %s', processed_ids)

        return True

if __name__ == "__main__":
    import os

    # basicConfig() defaults to WARNING, which would hide every logger.info
    # call in this reproduction; INFO makes the services' progress visible.
    logging.basicConfig(level=logging.INFO)

    services = []
    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    for path in registered_services:
        services.extend(import_service(path))

    logger.info('Services Starting')
    config = {
        # Nameko needs a broker URI to start; override via $AMQP_URI.
        'AMQP_URI': os.environ.get(
            'AMQP_URI', 'pyamqp://guest:guest@localhost'),
        # Deliberately tiny pool so the deadlock is easy to reproduce.
        'max_workers': 2,
    }
    run(services, config, backdoor_port=5050)

Then I trigger it from a separate shell:
nameko shell
>>n.rpc.service_a.analyze(2)

This code generates a deadlock even though, based on my understanding, it
should not. The entrypoint service_a.analyze requires one worker, which
dispatches n=2 events and is then returned to the GreenPool. The event
handler service_a.process_ids is then triggered once per message, so both of
service_a's workers are requested. Each of those workers calls service_b
sequentially, but as service_b also has 2 workers, this should not create
any deadlock...

*Prefetch_count*
Based on discussions I found online, I tried to manually set up my own
QueueConsumer to artificially increase the prefetch_count. But this did not
solve my deadlock...

import socket

from nameko.rpc import Rpc as NamekoRpc

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    """Queue consumer pinned to a fixed prefetch count of 100.

    Replaces the prefetch value Nameko would otherwise derive from the
    worker pool size with a constant, generous limit.
    """

    prefetch_count = 100


class RpcConsumer(NamekoRpcConsumer):
    """RPC consumer that receives its messages via :class:`QueueConsumer`."""

    queue_consumer = QueueConsumer()


class Rpc(NamekoRpc):
    """``@rpc`` entrypoint that is wired to :class:`RpcConsumer`."""

    rpc_consumer = RpcConsumer()


# Drop-in replacement for ``nameko.rpc.rpc``.
rpc = Rpc.decorator


class EventHandler(NamekoEventHandler):
    """Event handler using :class:`QueueConsumer` and a per-host broadcast id."""

    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        # One identifier per machine rather than per process/instance.
        hostname = socket.gethostname()
        return hostname


# Drop-in replacement for ``nameko.events.event_handler``.
event_handler = EventHandler.decorator

*Increase Worker Pool Size*
Then, I tried to create an independent pool of workers for each type of
entrypoint [Rpc, Event], and then for each entrypoint, by overriding the
ServiceContainer class. This did not work either, and I am not sure why...

from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool

class CustomServiceContainer(ServiceContainer):
    """Service container that gives every entrypoint its own worker pool.

    Pools are keyed by entrypoint class name plus decorated method name, and
    each pool is sized to the per-pool worker count resolved at construction.
    ``max_workers`` is then raised to the combined capacity of all pools.

    NOTE(review): according to the Nameko maintainers, on Nameko < 2.5.4 the
    AMQP entrypoints block the shared QueueConsumer thread while waiting for a
    free worker, so extra pools alone do not remove that deadlock.
    NOTE(review): the base container presumably waits only on its own shared
    pool when stopping, so workers running in these per-entrypoint pools may
    not be waited for on shutdown -- confirm against the Nameko version used.
    """

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        # Forward the caller's worker context class (e.g. an MDC-aware one)
        # instead of ignoring the argument; None is handed to the base class
        # unchanged, which presumably falls back to its default -- confirm.
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)

        # Per-pool size as set by the base class. Capture it before
        # ``max_workers`` is overwritten below; this also avoids multiplying
        # by None when 'max_workers' is absent from ``config``.
        workers_per_pool = self.max_workers
        self.worker_pools = self.get_workers_pool()

        # ``max_workers`` is also used to dynamically compute prefetch_count,
        # so report the combined capacity of every pool.
        self.max_workers = len(self.worker_pools) * workers_per_pool

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.

        The worker is spawned on the pool dedicated to ``entrypoint`` rather
        than on a single container-wide pool.

        :raises ContainerBeingKilled: if the container is being killed.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()

        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Use this entrypoint's own pool. GreenPool.spawn blocks while that
        # pool is at capacity, so a busy entrypoint cannot take workers from
        # the others.
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        """Return the pool dedicated to ``worker_ctx.entrypoint``."""
        return self.worker_pools[
            self._get_entrypoint_pool_key(worker_ctx.entrypoint)]

    def get_workers_pool(self):
        """Build one ``GreenPool`` of size ``self.max_workers`` per entrypoint.

        :returns: dict mapping pool key to ``GreenPool``.
        """
        return {
            self._get_entrypoint_pool_key(entrypoint):
                GreenPool(size=self.max_workers)
            for entrypoint in self.entrypoints
        }

    def _get_entrypoint_pool_key(self, entrypoint):
        """Return the pool key for ``entrypoint``: ``<ClassName>_<method>``.

        Returning ``entrypoint.__class__.__name__`` alone instead would give
        one pool per entrypoint type rather than one per entrypoint.
        """
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)

*RpcProxy: use_confirms*
Then, I looked at message acknowledgement. I was able to
avoid any deadlock by overriding the use_confirms property of the RPC
proxy's MethodProxy to return False.

from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import ServiceProxy as NamekoServiceProxy


class ReplyListener(NamekoReplyListener):
    """Reply listener bound explicitly to Nameko's stock QueueConsumer."""

    queue_consumer = QueueConsumer()


class MethodProxy(NamekoMethodProxy):
    """Method proxy whose ``use_confirms`` property always reports False."""

    @property
    def use_confirms(self):
        return False


class ServiceProxy(NamekoServiceProxy):
    """Service proxy that builds the custom :class:`MethodProxy` per method."""

    def __getattr__(self, name):
        # NOTE(review): assumes the stock ServiceProxy stores ``worker_ctx``,
        # ``service_name`` and ``reply_listener`` as instance attributes and
        # that MethodProxy takes them positionally in this order -- confirm
        # against the Nameko version in use.
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)


class RpcProxy(NamekoRpcProxy):
    """RPC dependency provider that injects :class:`ServiceProxy`.

    The per-method override must live on the proxy object handed to the
    worker, not on this DependencyProvider. This class declares
    ``rpc_reply_listener`` rather than ``reply_listener``, so a
    ``__getattr__`` defined here that reads ``self.reply_listener`` (or any
    other attribute the provider lacks) would re-enter ``__getattr__`` and
    recurse.
    """

    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        # NOTE(review): assumes the stock RpcProxy keeps its constructor
        # argument as ``target_service``; extra proxy options (newer Nameko
        # releases) are not forwarded here -- confirm for your version.
        return ServiceProxy(
            worker_ctx, self.target_service, self.rpc_reply_listener)