Service deadlock when request volume increases

Hi Devs

We're coming across this situation more and more, in both existing and new
services. The deadlock occurs when a service receives more incoming requests
than it has workers available, and, as part of each request, the service
entrypoint makes an additional call to another service for data.

E.g.

    import logging

    from nameko.rpc import rpc, RpcProxy

    logger = logging.getLogger(__name__)


    class FooService:
        name = "foo"  # nameko services need a name

        spam_rpc = RpcProxy('spam')

        @rpc
        def do_some_foo(self):
            logger.info('get some spam...')
            spam = self.spam_rpc.add_spam()
            # we never reach here!
            logger.info('we have spam: "%s"', spam)

If FooService has 10 available workers and 100 requests to `do_some_foo`
come in quickly enough, what appears to happen is that the reply queue set up
to handle the responses from the `spam` service fills up (with 10 messages),
but there are no available workers to handle them. We then also have 10
in-flight messages in FooService, all un-acked. And here we stay!

Is this behaviour expected/understood? Our assumption is that a new green
thread should be made available to handle the `spam` responses outside of
the configured max (10 here).

We're using nameko 2.2.0, RabbitMQ 3.4.3 and Python 2.7.

We can overcome our immediate issues with design improvements, which will
probably prove to be the correct way forward, and we understand that we can
increase the number of workers available (sketched below), but our concerns
are about the implementation nameko uses in this situation and how we should
design services with it in mind.
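
For reference, the worker cap here is nameko's max_workers setting, which
defaults to 10. A rough sketch of how we'd raise it when running the service
programmatically (the broker URI and the new value are just placeholders):

    from nameko.runners import ServiceRunner

    # placeholder config: 'max_workers' defaults to 10, which is exactly the
    # ceiling we keep hitting under load
    config = {
        'AMQP_URI': 'amqp://guest:guest@localhost//',
        'max_workers': 50,
    }

    runner = ServiceRunner(config)
    runner.add_service(FooService)
    runner.start()
    runner.wait()

The same keys can go in the YAML file passed to `nameko run --config`.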

Thanks for your advice.


*update*
The example I gave was not actually our exact use case.
All of the entrypoints in question for us are *event handlers* which
subsequently make an RPC call (see the sketch after the example below).

    from nameko.events import event_handler
    from nameko.rpc import RpcProxy


    class FooService:
        name = "foo"

        spam_rpc = RpcProxy('spam')

        # "bar" / "foo_requested" are placeholder source-service and
        # event-type names
        @event_handler("bar", "foo_requested")
        def handle_foo(self, payload):
            logger.info('get some spam...')
            spam = self.spam_rpc.add_spam()
            # we never reach here!
            logger.info('we have spam: "%s"', spam)
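
For context, here's a purely illustrative sketch of the other side of this
interaction: the 'spam' service we call, and a stand-in service that
dispatches the event (the "bar" service, the `make_foo` method and the
"foo_requested" event are placeholder names, not our real services):

    from nameko.events import EventDispatcher
    from nameko.rpc import rpc


    class SpamService:
        name = "spam"

        @rpc
        def add_spam(self):
            # stand-in for the real work; FooService blocks on this reply
            return "spam!"


    class BarService:
        name = "bar"

        dispatch = EventDispatcher()

        @rpc
        def make_foo(self):
            # each dispatched event ends up occupying one FooService worker
            # until the nested spam RPC reply comes back
            self.dispatch("foo_requested", {})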

Thanks

···

In reply to simon harrison, Thursday, 25 August 2016 10:58:31 UTC+1.

Hi,

It's been a while since I looked at this in detail, so I don't recall all the
specifics (Matt may remember better), but I think there is a known issue due
to the way consumption from Rabbit is set up, and how that interacts with the
QoS settings and max_workers. This is one of the main things we are looking
to address by rewriting the AMQP internals.

d

···

In reply to simon harrison, Thursday, 25 August 2016 14:45:40 UTC+1.

Indeed, this is another problem caused by the structure of the
QueueConsumer.

The issue here is that the QoS (aka "prefetch count") is applied once, on
the channel, rather than for each consumer. The EventHandlers and the RPC
ReplyListener delegate control of their consumers to the QueueConsumer,
which means they all share a channel rather than getting their own. The
prefetch count is pooled for all the consumers on the channel.
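
To make the pooling concrete, here is a minimal kombu sketch (purely
illustrative, not nameko's actual internals; the broker URL, exchange and
queue names are made up). Two consumers declared on the same channel share a
single prefetch budget of 10 unacked messages, just as the event handler's
consumer and the RPC reply listener's consumer do inside one nameko service:

    import socket

    from kombu import Connection, Consumer, Exchange, Queue

    exchange = Exchange("demo", type="direct")
    events = Queue("evt-demo", exchange, routing_key="evt-demo")
    replies = Queue("rpc.reply-demo", exchange, routing_key="rpc.reply-demo")


    def on_message(body, message):
        # every delivered-but-unacked message occupies one slot of the
        # channel's shared prefetch budget until it is acked
        message.ack()


    with Connection("amqp://guest:guest@localhost//") as conn:
        channel = conn.channel()
        # prefetch_size=0, prefetch_count=10, applied to the channel: both
        # consumers below draw from this single budget of 10
        channel.basic_qos(0, 10, False)

        with Consumer(channel, [events], callbacks=[on_message]), \
             Consumer(channel, [replies], callbacks=[on_message]):
            # if 10 "event" messages are outstanding and unacked, no "reply"
            # message can be delivered until one of them is acked
            try:
                conn.drain_events(timeout=1)
            except socket.timeout:
                pass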

The deadlock happens when you exhaust the prefetch count handling events.
Each concurrent FooService worker holds an unack'd message and reduces the
prefetch count by one; the message isn't ack'd (and prefetch count
re-incremented) until the worker completes. But to complete, it must
consume the RPC reply from the "spam" service. Since the prefetch count is
already exhausted, they will all wait for somebody else to finish.

There are various ways to work around it, but the ultimate solution is to
have the AMQP extensions manage their own consumers.

Matt.

···

In reply to David Szotten, Friday, 26 August 2016 09:45:23 UTC+1.

Could you give an example of how to work around it?

The workaround I was thinking of was to force the RpcProxy's ReplyListener to
use its own QueueConsumer. You can do this with a series of subclasses:

from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.messaging import QueueConsumer as NamekoQueueConsumer


class QueueConsumer(NamekoQueueConsumer):
    @property
    def sharing_key(self):
        # The `sharing_key` is used to determine whether a new instance of a
        # SharedExtension is needed. There will always be one instance for
        # each unique sharing_key.
        return "some value"


class ReplyListener(NamekoReplyListener):
    queue_consumer = QueueConsumer()


class RpcProxy(NamekoRpcProxy):
    rpc_reply_listener = ReplyListener()

Nameko 2.5.3 now includes this change
(https://github.com/nameko/nameko/pull/419), which should be sufficient to
break the deadlock.
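
For completeness, a service would then use the subclassed proxy in place of
the stock one; here's a sketch against the earlier FooService example (the
event source and type are still placeholder names):

    from nameko.events import event_handler


    class FooService:
        name = "foo"

        # the RpcProxy subclass defined above: its ReplyListener gets a
        # dedicated QueueConsumer (its own connection and prefetch count)
        # instead of sharing one with the event handler's consumer
        spam_rpc = RpcProxy("spam")

        @event_handler("bar", "foo_requested")  # placeholder names
        def handle_foo(self, payload):
            return self.spam_rpc.add_spam()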

···

In reply to rollo.ko...@complyadvantage.com, Thursday, March 23, 2017 at 11:11:31 AM UTC.


Adding another prefetched message is not actually enough to break the
deadlock. In fact, the behaviour doesn't change at all.

Can you post an example? I'd be interested to see if it's exactly the same
behaviour.

Using a dedicated QueueConsumer would be a reliable fix. It will establish
a new connection for the ReplyListener consumers, which will have their own
prefetch count.

···

In reply to rollo.ko...@complyadvantage.com, Thursday, March 23, 2017 at 2:57:58 PM UTC.