Making the reply queues non-durable

Hi,
The durable nature of the queues created by nameko, specifically the reply
queues, causes high IO on our RabbitMQ server.
We want to make SOME of the reply queues (for some services) non-durable,
to reduce IO.

How can this be accomplished?
T.

Anyway, I've been playing a little with the code, and ended up with the
following monkey-patch:

from nameko import rpc

def _setup(self):
    service_uuid = rpc.uuid.uuid4() # TODO: give srv_ctx a uuid?
    service_name = self.container.service_name

    queue_name = rpc.RPC_REPLY_QUEUE_TEMPLATE.format(
        service_name, service_uuid)

    self.routing_key = str(service_uuid)

    exchange = rpc.get_rpc_exchange(self.container.config)

    self.queue = rpc.Queue(
        queue_name,
        exchange=exchange,
        routing_key=self.routing_key,
        auto_delete=True,
        exclusive=True,
        durable=False # <-- the patch
    )

    self.queue_consumer.register_provider(self)

rpc.ReplyListener.setup = _setup

Basically, what I did was patch the setup() method of the ReplyListener
class, adding "durable=False" to the queue declaration.

This works (I can see that the reply queues are not durable), but I'm
wondering if anything can break because of this.

Note that this patch is only used on clients connecting to nameko using
ClusterRpcProxy.
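
For completeness, here's roughly how we pull the patch in on the client side. This is just a sketch: the module name, broker URI, and service/method names below are placeholders.

# client.py -- apply the monkey-patch before creating any proxy, so the
# reply queue it declares is non-durable
import reply_queue_patch  # hypothetical module holding the patch above

from nameko.standalone.rpc import ClusterRpcProxy

config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672//'}  # placeholder

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.some_service.some_method('some arg')  # made-up service/method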

Any comment would be appreciated.

T.

···

On Thursday, October 20, 2016 at 9:26:31 AM UTC+3, tsachi...@gmail.com wrote:

Hi,
The durable nature of the queues created by nameko, specifically the reply
queues, causes high IO on our RabbitMQ server.
We want to make SOME of the reply queues (for some services) non-durable,
to reduce IO.

How can this be accomplished?
T.

Any help?

The patch you've applied here will do what you want with minimal negative
implications.

Having said that, I think it's preferable to use subclasses to do this kind
of override. The following would achieve the same thing as your patch:

# app/dependencies.py
import uuid

from kombu import Queue

from nameko.rpc import RPC_REPLY_QUEUE_TEMPLATE, get_rpc_exchange
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import RpcProxy as NamekoRpcProxy

# override to create a non-durable queue
class ReplyListener(NamekoReplyListener):

    def setup(self):

        service_uuid = uuid.uuid4()
        service_name = self.container.service_name

        queue_name = RPC_REPLY_QUEUE_TEMPLATE.format(
            service_name, service_uuid)

        self.routing_key = str(service_uuid)

        exchange = get_rpc_exchange(self.container.config)

        self.queue = Queue(
            queue_name,
            exchange=exchange,
            routing_key=self.routing_key,
            auto_delete=True,
            exclusive=True,
            durable=False # the override
        )

        self.queue_consumer.register_provider(self)

# override to use custom ReplyListener
class RpcProxy(NamekoRpcProxy):
    rpc_reply_listener = ReplyListener()

# app/service.py

from app.dependencies import RpcProxy

class Service:
    name = "nondurable"

    rpc_proxy = RpcProxy() # your subclass
    
    ...

Although this is our preferred pattern, we aren't great at making it easy to
do. This situation would be much better if we exposed "durable" or
"queue_params" as an attribute of the ReplyListener class. Then you could
simply do:

# app/dependencies.py
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import RpcProxy as NamekoRpcProxy

# override to specify a non-durable queue
class ReplyListener(NamekoReplyListener):
    queue_params = {
        'durable': False
    }

# override to use custom ReplyListener
class RpcProxy(NamekoRpcProxy):
    rpc_reply_listener = ReplyListener()

I have started to introduce this pattern as part of a recent PR (see
https://github.com/onefinestay/nameko/pull/337/files#diff-f6430d9814b008d760592d1baedbed66R379)

···

On Friday, October 21, 2016 at 2:19:12 PM UTC+1, tsachi...@gmail.com wrote:

Any help?

Many thanks.
I'll give it a shot.

T.

If we take this discussion one step further: your problem is high I/O on
the RabbitMQ server.

Durability is generally something you want. How much I/O is "high"? There
might be a solution on the RabbitMQ side of things instead of disabling
durability. But I might be missing something obvious?

Kindest

···

On Friday, 21 October 2016 at 17:10:24 UTC+2, tsachi...@gmail.com wrote:

Many thanks.
I'll give it a shot.

T.

Hi.
If we run vmstat on the rabbit machine, we see the "wa" values (CPU waiting for IO) at around 60-70.
Considering a message rate of circa 100 messages/sec, these are some very bad numbers.
We have around 100+ clients connecting to the main service every second, thus creating 100+ durable queues every second.
Raise the number of clients to 1000, and we are in a bad situation.
Making the queues non-durable reduces the IO by at least 50% (tested in our QA env).

T.

Without knowing anything regarding server specs or RabbitMQ setup this
sounds awfully high to me, and a bit strange :)

Though this concern relates to the nameko implementation, the problem
likely lies elsewhere. Even with message persistence, 1000 msg/sec
shouldn't pose any problem.

The following post might be interesting to give some sort of relative
performance measurement:

https://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

···

On Monday, 24 October 2016 at 12:01:22 UTC+2, tsachi...@gmail.com wrote:

Hi.
If we run vmstat on the rabbit machine, we see the "wa" values (CPU
waiting for IO) at around 60-70.
Considering a message rate of circa 100 messages/sec, these are some very
bad numbers.
We have around 100+ clients connecting to the main service every second,
thus creating 100+ durable queues every second.
Raise the number of clients to 1000, and we are in a bad situation.
Making the queues non-durable reduces the IO by at least 50% (tested in
our QA env).

T.

Hi,
I'm aware of the link about the measurements, but note that they were taken with one producer and one consumer on a single queue.
The nameko framework creates a reply queue for every client, so 100 parallel clients will create 100 reply queues, all of them durable. Since our clients connect every second, this creates and destroys 100 queues on disk every second.
This is a lot of IO, as we saw in the server stats.
Maybe we can tune the machine better, but even out of the box there should be no reason for high IO, unless creating/destroying hundreds of durable queues every second IS very IO-intensive.

Hi,

Are these the same clients that are re-connecting? I guess they're
non-nameko? Such clients probably want to keep a pool of proxies, to
re-use the reply queue. See other discussions on this list, and
e.g. https://github.com/jessepollak/flask-nameko

Best,
David

···

On Monday, 24 October 2016 12:53:04 UTC+1, tsachi...@gmail.com wrote:

Hi,
I'm aware of the link about the measurements, but note that they were
taken with one producer and one consumer on a single queue.
The nameko framework creates a reply queue for every client, so
100 parallel clients will create 100 reply queues, all of them durable. Since
our clients connect every second, this creates and destroys 100 queues
on disk every second.
This is a lot of IO, as we saw in the server stats.
Maybe we can tune the machine better, but even out of the box there
should be no reason for high IO, unless creating/destroying hundreds of
durable queues every second IS very IO-intensive.

These are the same clients, using the ClusterRpcProxy class.
They connect, get a reply, and disconnect.
If there's a way to keep the reply queues alive, that would be great.
How can I accomplish that?

I don't think turning off durability will give you a huge performance
boost. Most of the cost here is in connecting and creating queues.

As David suggests, you should try to pool the proxies to re-use connections
and reply queues. There are two existing implementations that do something
like this (https://github.com/jessepollak/flask-nameko and
https://github.com/and3rson/django-nameko). Try one of these (or something
like them) and you'll see a dramatic improvement.
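
Something along these lines would do it. This is only a rough, untested sketch modelled on those projects; the pool size and names are made up:

# rpc_pool.py -- keep a handful of proxies (and their reply queues) alive
# and hand them out per request instead of reconnecting every time
from queue import Queue

from nameko.standalone.rpc import ClusterRpcProxy


class ClusterRpcProxyPool(object):

    def __init__(self, config, size=4):
        self._free = Queue()
        self._proxies = []
        for _ in range(size):
            proxy = ClusterRpcProxy(config)
            self._proxies.append(proxy)
            # start() declares the reply queue once; later calls re-use it
            self._free.put(proxy.start())

    def acquire(self):
        return self._free.get()  # blocks until a proxy is free

    def release(self, rpc):
        self._free.put(rpc)

    def shutdown(self):
        for proxy in self._proxies:
            proxy.stop()

In the webserver you would acquire() a proxy per request, make the call, and release() it in a finally block.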

···

On Monday, October 24, 2016 at 1:06:18 PM UTC+1, tsachi...@gmail.com wrote:

These are the same clients, using the ClusterRpcProxy class.
They connect, get a reply, and disconnect.
If there's a way to keep the reply queues alive, that would be great.
How can I accomplish that?

What kind of project(s) are these clients embedded in? There is some
initial work in https://github.com/onefinestay/nameko/pull/357. The PR also
includes a few references, e.g. the flask one I linked above.

D

···

On Monday, 24 October 2016 13:06:18 UTC+1, tsachi...@gmail.com wrote:

These are the same clients, using the ClusterRpcProxy class.
They connect, get a reply, and disconnect.
If there's a way to keep the reply queues alive, that would be great.
How can I accomplish that?

Here's an overview of our setup:
There are clients (not browsers) that need to pull data from rabbit. The clients, for design reasons, don't have access to the rabbit server.
So they send a signal to a central process along with their id.
The central process uses ClusterRpcProxy to connect to the nameko service and fetch the data on behalf of the client.
The central process then returns the data to the client.

That's a single cycle. This happens once a second for every client, with over 100 clients.

Ok. I guess the question becomes: how does the "central process" work? I
guess that's not a nameko service? How do the clients communicate with the
central process? (The aim is to figure out if/how the central process could
re-use the same ClusterRpcProxy.)

···

On Monday, 24 October 2016 14:02:54 UTC+1, tsachi...@gmail.com wrote:

Here's an overview of our setup:
There are clients (not browsers) that need to pull data from rabbit. The
clients, for design reasons, don't have access to the rabbit server.
So they send a signal to a central process along with their id.
The central process uses ClusterRpcProxy to connect to the nameko service
and fetch the data on behalf of the client.
The central process then returns the data to the client.

That's a single cycle. This happens once a second for every client, with
over 100 clients.

The central service is a simple webserver that receives signals as HTTP requests.
As you guessed, it's not a nameko service. It connects to a nameko service using the ClusterRpcProxy class.

I took a look at the django-nameko implementation, and we're actually using a very similar pool (it stores ClusterRpcProxy instances and re-uses them), but IO is still high.
Are instances of ClusterRpcProxy bound to a single reply queue?

Would you consider making it a nameko service (using nameko http)? Then
you'd be able to use a regular rpc proxy and get the queue re-use for free.
If not, are you using any web framework? You'd have to look at the
approaches in the links above.
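
Something like this, roughly. The service and method names here are made up, just to show the shape of it:

# gateway.py -- sketch of the nameko-http option; "data_service" and
# fetch() are placeholders for your real service and method
import json

from nameko.rpc import RpcProxy
from nameko.web.handlers import http


class GatewayService:
    name = "gateway"

    data_rpc = RpcProxy("data_service")

    @http('GET', '/data/<string:client_id>')
    def get_data(self, request, client_id):
        # the RpcProxy's reply queue is created once when the service starts
        # and re-used for every request, so there's no per-request queue churn
        return json.dumps(self.data_rpc.fetch(client_id))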

d

···

On Monday, 24 October 2016 14:11:33 UTC+1, tsachi...@gmail.com wrote:

The central service is a simple webserver that receives signals as HTTP
requests.
As you guessed, it's not a nameko service. It connects to a nameko service
using the ClusterRpcProxy class.

I took a look at the django-nameko implementation, and we're actually
using a very similar pool (it stores ClusterRpcProxy instances and re-uses
them), but IO is still high.
Are instances of ClusterRpcProxy bound to a single reply queue?

We don't want to make major changes to our webserver. It does other things as well, so we don't want to rewrite everything as nameko services.

As I said, we are using a pool of ClusterRpcProxy instances, similar to the django-nameko implementation, but IO is still high.
Are these instances bound to a single reply queue?
If we re-use them, are no new queues created?

That's correct. The cluster proxy creates a reply queue when you enter the
context manager, which is pretty bad from a performance point of view. Some
of the approaches linked above include ideas for how to re-use it, which
should give you a pretty big perf win.
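
To make that concrete, here's a rough sketch of the difference (the config and the service/method names are placeholders):

from nameko.standalone.rpc import ClusterRpcProxy

config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672//'}  # placeholder

# per request: a new reply queue is declared and torn down every time
with ClusterRpcProxy(config) as rpc:
    rpc.some_service.some_method()  # made-up service/method

# re-used: the reply queue is declared once and shared by every call
proxy = ClusterRpcProxy(config)
rpc = proxy.start()
rpc.some_service.some_method()
rpc.some_service.some_method()
proxy.stop()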

d

···

On Monday, 24 October 2016 14:26:52 UTC+1, tsachi...@gmail.com wrote:

We don't want to make major changes to our webserver. It does other things
as well, so we don't want to rewrite everything as nameko services.

As I said, we are using a pool of ClusterRpcProxy instances, similar to the
django-nameko implementation, but IO is still high.
Are these instances bound to a single reply queue?
If we re-use them, are no new queues created?