Hi all,
I have been experiencing a series of unexpected deadlocks in production. I
spent some time recreating those deadlocks locally and trying to fix them,
and I would like to share my findings.
I am still not sure I fully understand all of this behaviour... If anyone
has feedback or has run similar experiments, I would be really interested
in learning from them...
*Deadlock situation*: in file deadlock.py
import time
from logging.config import dictConfig

from nameko.cli.run import run, import_service
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import RpcProxy, rpc
import logging
import sys

logger = logging.getLogger(__name__)


class ServiceA(object):
    name = 'service_a'

    service_b = RpcProxy('service_b')
    dispatch = EventDispatcher()

    @rpc
    def analyze(self, n):
        logger.info('service_a test log')
        for i in range(n):
            logger.info(str(i))
            payload = {
                'ids': [i]
            }
            self.dispatch('process_ids', payload)
        return 'Done'

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        ids = payload['ids']
        logger.info('processing ids %s' % ids)
        processed_ids = self.service_b.process(ids)
        logger.info('processed_ids %s' % ids)
        # Simulating some post-process here
        time.sleep(4)
        logger.info('saving processed_ids %s' % processed_ids)
        # saving processed_ids
        saved_processed_ids = self.service_b.saving(processed_ids)
        logger.info('Done saving processed_ids - %s' % saved_processed_ids)


class ServiceB(object):
    name = 'service_b'

    @rpc
    def process(self, container_ids):
        logger.info('service_b processing ids %s' % container_ids)
        time.sleep(.5)
        logger.info('service_b container_ids - finish sleeping %s' % container_ids)
        return ['a', 'b']

    @rpc
    def saving(self, processed_ids):
        logger.info('service_b saving %s' % processed_ids)
        time.sleep(.5)
        logger.info('service_b returning processed_ids %s' % processed_ids)
        return True


if __name__ == "__main__":
    logging.basicConfig()

    services = []
    registered_services = ['deadlock:ServiceA', 'deadlock:ServiceB']
    for idx, path in enumerate(registered_services):
        services.extend(
            import_service(path)
        )

    logger = logging.getLogger(__name__)
    logger.info('Services Starting')

    config = {'max_workers': 2}
    run(services, config, backdoor_port=5050)
Then I trigger it from a separate shell:
nameko shell
n.rpc.service_a.analyze(2)
This code produces a deadlock even though, based on my understanding, it
should not. Indeed, the service_a.analyze entrypoint takes one worker,
dispatches n=2 events, and then that worker is returned to the GreenPool.
The event handler service_a.process_ids is then triggered once per message,
so both of service_a's 2 workers are in use. Each of those workers calls
service_b sequentially, but since service_b also has 2 workers, this should
not create any deadlock...
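To make the failure mode I am worried about concrete, here is a tiny
eventlet-only sketch (no nameko, purely illustrative) of how a GreenPool can
deadlock when the workers holding all the slots are waiting for results that
can only be produced by tasks needing a slot in the same pool:

from eventlet.event import Event
from eventlet.greenpool import GreenPool

pool = GreenPool(size=2)
events = [Event(), Event()]

def blocked_worker(evt):
    # Holds a pool slot until someone sends a result on evt
    return evt.wait()

def producer(evt):
    evt.send('done')

# Fill both slots with workers that block waiting for their result...
for evt in events:
    pool.spawn(blocked_worker, evt)

# ...then try to schedule the producers that would unblock them.
# spawn() blocks because the pool is exhausted, the workers never get
# their results, and everything hangs.
for evt in events:
    pool.spawn(producer, evt)
pool.waitall()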
*Prefetch_count*
Based on discussions I found online, I tried to manually set up my own
QueueConsumer to artificially increase the prefetch_count. But this did not
solve my deadlock...
import socket

from nameko.rpc import Rpc as NamekoRpc
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import RpcConsumer as NamekoRpcConsumer


class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 100


class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()


class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()


rpc = Rpc.decorator


class EventHandler(NamekoEventHandler):
    queue_consumer = QueueConsumer()

    @property
    def broadcast_identifier(self):
        return socket.gethostname()


event_handler = EventHandler.decorator
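For completeness, the idea is that the services then use these overridden
decorators instead of nameko's own. A minimal sketch, assuming the snippet
above lives in a (hypothetical) module custom_entrypoints.py:

from custom_entrypoints import rpc, event_handler

class ServiceA(object):
    name = 'service_a'

    @rpc
    def analyze(self, n):
        ...

    @event_handler("service_a", "process_ids")
    def process_ids(self, payload):
        ...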
*Increase Worker Pool Size*
Then, I tried to create an independent pool of workers for each type of
entrypoint [Rpc, Event], and then one per entrypoint, by overriding the
ServiceContainer class. This did not work either and I am not sure why...
from nameko.containers import ServiceContainer, _log, WorkerContext
from nameko.exceptions import ContainerBeingKilled
from eventlet.greenpool import GreenPool


class CustomServiceContainer(ServiceContainer):

    def __init__(self, service_cls, config, worker_ctx_cls=None):
        super(CustomServiceContainer, self).__init__(
            service_cls, config, worker_ctx_cls=worker_ctx_cls)
        self.worker_pools = self.get_workers_pool()
        # Update max_workers as it is used to dynamically compute the
        # prefetch_count
        self.max_workers = len(self.worker_pools) * config.get('max_workers')

    def spawn_worker(self, entrypoint, args, kwargs,
                     context_data=None, handle_result=None):
        """ Spawn a worker thread for running the service method decorated
        by `entrypoint`.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed
        in by the entrypoint. It is called with the result returned
        or error raised by the service method. If provided it must return a
        value for ``result`` and ``exc_info`` to propagate to dependencies;
        these may be different to those returned by the service method.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()
        worker_ctx = self.worker_ctx_cls(
            self, service, entrypoint, args, kwargs, data=context_data)

        _log.debug('spawning %s', worker_ctx)

        # Spawn on the pool dedicated to this entrypoint instead of the
        # container's single shared worker pool
        gt = self.get_worker_pool(worker_ctx).spawn(
            self._run_worker, worker_ctx, handle_result
        )
        gt.link(self._handle_worker_thread_exited, worker_ctx)
        self._worker_threads[worker_ctx] = gt
        return worker_ctx

    def get_worker_pool(self, worker_ctx):
        return self.worker_pools[
            self._get_entrypoint_pool_key(worker_ctx.entrypoint)]

    def get_workers_pool(self):
        pools = {}
        for entrypoint in self.entrypoints:
            pools[self._get_entrypoint_pool_key(entrypoint)] = GreenPool(
                size=self.max_workers)
        return pools

    def _get_entrypoint_pool_key(self, entrypoint):
        # One pool per entrypoint method; alternatively, per entrypoint type:
        # return '%s' % (entrypoint.__class__.__name__)
        return '%s_%s' % (entrypoint.__class__.__name__, entrypoint.method_name)
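For reference, a minimal sketch of how such a container can be started
directly, bypassing run() (the AMQP_URI value is an assumption for a local
broker, and ServiceA/ServiceB are the classes from deadlock.py):

import eventlet
eventlet.monkey_patch()

from deadlock import ServiceA, ServiceB

config = {'AMQP_URI': 'amqp://guest:guest@localhost', 'max_workers': 2}

containers = [
    CustomServiceContainer(ServiceA, config),
    CustomServiceContainer(ServiceB, config),
]
for container in containers:
    container.start()
for container in containers:
    container.wait()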
*RpcProxy: use_confirms*
Then, I looked at message acknowledgement. I was able to avoid the deadlock
by overriding the RpcProxy's use_confirms property (via its MethodProxy) to
return False.
from nameko.messaging import QueueConsumer
from nameko.rpc import MethodProxy as NamekoMethodProxy
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener


class ReplyListener(NamekoReplyListener):
    queue_consumer = QueueConsumer()


class MethodProxy(NamekoMethodProxy):

    @property
    def use_confirms(self):
        return False


class RpcProxy(NamekoRpcProxy):
    rpc_reply_listener = ReplyListener()

    def __getattr__(self, name):
        return MethodProxy(
            self.worker_ctx, self.service_name, name, self.reply_listener)
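ServiceA then declares its dependency with this proxy instead of nameko's.
A minimal sketch, assuming the override above lives in a (hypothetical)
module custom_rpc.py:

from custom_rpc import RpcProxy
from nameko.rpc import rpc

class ServiceA(object):
    name = 'service_a'

    # Calls to service_b no longer use publisher confirms
    service_b = RpcProxy('service_b')

    @rpc
    def analyze(self, n):
        ...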