EventDispatcher Performance

Hi all,

I am currently creating some event-driven microservices with nameko. I have
one service which dispatches a lot of events per worker (a loop with around
30000 items). However, the event dispatching per worker feels quite slow
(22/s) and therefore the execution takes a lot of time.
Now I wonder if I am doing something wrong or if this is just normal.

Can anyone share their experiences?
Thanks!

20 msg/sec is slow. Message publishing performance depends on a few factors
and tradeoffs, and the Nameko defaults choose the safest (and slowest)
combinations.

Probably the biggest factor is latency between your service and the
RabbitMQ broker. On my laptop with a local RabbitMQ, this simple service
gives me ~200 msg/sec:

from nameko.events import EventDispatcher

from nameko.rpc import rpc

class Service:
    name = "dispatcher"

    dispatch = EventDispatcher()

    @rpc
    def go(self):
        for _ in range(30000):
            self.dispatch("eventtype", "payload")

It drops to less than 10 msg/sec if I use a (free, slow) cloud-hosted
RabbitMQ a few hundred milliseconds away.

The impact of the latency is partly because *publish confirms* are enabled.
You can disable them:

    ...
    dispatch = EventDispatcher(use_confirms=False)
    ...

This bumps my local delivery to ~300 msg/sec, with my RabbitMQ docker
image and VM pegging a CPU each. The Nameko service isn't working
particularly hard.
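As a rough way to see why confirms make throughput so sensitive to latency: if each publish waits for the broker's confirmation before the next one is sent, a single worker cannot exceed one message per round trip. The sketch below is only that back-of-envelope model, not a measurement; the function name and the example round-trip times are made up for illustration.

```python
def max_confirmed_rate(round_trip_seconds):
    """Upper bound on msg/sec for one worker that waits one
    broker round trip per published message (simplified model)."""
    if round_trip_seconds <= 0:
        raise ValueError("round trip time must be positive")
    return 1.0 / round_trip_seconds


# Illustrative round-trip times only; measure your own broker latency.
for rtt_ms in (5, 50, 200):
    rate = max_confirmed_rate(rtt_ms / 1000)
    print(f"{rtt_ms} ms round trip: at most {rate:.0f} msg/sec per worker")
```

Real throughput will be lower than this bound because serialization and broker-side work also take time.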

You can also disable persistence of the messages to disk when they reach
the broker:

    ...
    dispatch = EventDispatcher(persistence="transient")
    ...

Transient messages may be helpful if your payloads are large.

If latency is the problem, you can improve performance by dispatching
messages in multiple parallel threads. If I initiate 10 concurrent RPC
workers going around the loop, I get 10x the msg/sec at the broker.

(nameko) Matts-13-inch-Macbook-Pro:nameko mattbennett$ nameko shell --broker amqp://mxwsvifk:****@lemur.cloudamqp.com/mxwsvifk
Nameko Python 3.5.3 (default, Jun 30 2017, 18:28:54)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] shell on darwin
Broker:amqp://mxwsvifk:****@lemur.cloudamqp.com/mxwsvifk

for _ in range(10): n.rpc.dispatcher.go.call_async()

...
<nameko.rpc.RpcReply object at 0x10bf1da58>
<nameko.rpc.RpcReply object at 0x10bf1dc88>
<nameko.rpc.RpcReply object at 0x10bf1d470>
<nameko.rpc.RpcReply object at 0x10bfa0400>
<nameko.rpc.RpcReply object at 0x10bf1d3c8>
<nameko.rpc.RpcReply object at 0x10bfa0518>
<nameko.rpc.RpcReply object at 0x10bfa0f98>
...
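The effect of running several workers at once can be illustrated without a broker. The sketch below is a simulation only: fake_publish and SIMULATED_LATENCY are hypothetical stand-ins for a publish that waits on a round trip, and it uses ordinary threads from the standard library, whereas Nameko workers are eventlet green threads. The point is just that latency-bound work overlaps well.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-message round trip, in seconds.
SIMULATED_LATENCY = 0.01


def fake_publish(payload):
    """Stand-in for a latency-bound publish: only waits, sends nothing."""
    time.sleep(SIMULATED_LATENCY)
    return payload


def publish_all(payloads, workers):
    """Publish every payload using `workers` threads; return elapsed seconds."""
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() forces all publishes to finish before timing stops.
        list(pool.map(fake_publish, payloads))
    return time.perf_counter() - started


payloads = list(range(50))
sequential = publish_all(payloads, workers=1)
parallel = publish_all(payloads, workers=10)
print(f"1 worker:   {len(payloads) / sequential:.0f} msg/sec")
print(f"10 workers: {len(payloads) / parallel:.0f} msg/sec")
```

With a real broker the gain levels off once the broker or the network becomes the bottleneck instead of latency.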

Hope that helps.

Matt.

On Tuesday, June 26, 2018 at 12:52:25 PM UTC+1, PtrBld wrote:

[original question, as at the top of the thread]

Hi Matt,

thank you very much (!) for your fast and detailed response. I just tried
different things and it looks promising.
I want to share my results here, because someone might find this
interesting.

First of all I tried the use_confirms flag (set to False).
This indeed gives me about 3 times faster event dispatching.

However, what really makes it a lot faster is processing the large list in
parallel. That gives me another 10x speed-up (about 400 to 1000 events per
second).
What I did was split my large list into buckets of 1000 items each and
process them in parallel (using x worker threads). For now I have decided
to dispatch the slice events to the same service, but I might change this
in the future.

Simple example to get the idea:

from nameko.dependency_providers import Config
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

# PROCESS_ITEM and ACTUAL_EVENT (event type names) and get_large_list()
# are defined elsewhere and not shown here.

class Service(object):
    name = "processing_service"
    config = Config()
    dispatch = EventDispatcher(use_confirms=False)
    # more dependencies

    @rpc
    def create_slices(self):
        count = get_large_list().count()  # up to 10000000 items
        window_size = self.config.get("PROCESSING_WINDOW_SIZE", 1000)
        # dispatch one event per slice so the slices are handled in parallel
        for slice_start in range(0, count, window_size):
            self.dispatch(PROCESS_ITEM, {"slice_start": slice_start})

    @event_handler(name, PROCESS_ITEM)
    def process_item(self, data):
        start = data["slice_start"]
        window_size = self.config.get("PROCESSING_WINDOW_SIZE", 1000)
        items = get_large_list().skip(start).limit(window_size)
        for item in items:
            # do some stuff
            self.dispatch(ACTUAL_EVENT, item)
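
The slicing arithmetic can be checked on its own, without Nameko or a database. This is a minimal sketch under assumptions: slice_starts and take_slice are hypothetical helper names, and a plain Python list with slicing stands in for the skip/limit query used above.

```python
def slice_starts(count, window_size=1000):
    """Start offsets of consecutive windows that together cover `count` items."""
    if window_size <= 0:
        raise ValueError("window_size must be positive")
    return list(range(0, count, window_size))


def take_slice(items, start, window_size=1000):
    """List-based stand-in for skip(start).limit(window_size)."""
    return items[start:start + window_size]


items = list(range(2500))
starts = slice_starts(len(items))
print(starts)

# Every item appears in exactly one slice, and the last slice may be short.
reassembled = []
for start in starts:
    reassembled.extend(take_slice(items, start))
print(reassembled == items)
```

Because the last window is allowed to be shorter than the others, the item count does not need to be a multiple of the window size.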

Thanks again!