EventDispatcher Performance

Hi all,

I am currently creating some event-driven microservices with nameko. I have
one service which dispatches a lot of events per worker (a loop with around
30000 items). However, the event dispatching per worker feels quite slow
(22/s) and therefore the execution takes a lot of time.
Now I wonder if I am doing something wrong or if this is just normal.

Can anyone share their experiences?

20 msg/sec is slow. Message publishing performance depends on a few factors
and tradeoffs, and the Nameko defaults choose the safest (and slowest) options.

Probably the biggest factor is latency between your service and the
RabbitMQ broker. On my laptop with a local RabbitMQ, this simple service
gives me ~200 msg/sec:

from nameko.events import EventDispatcher
from nameko.rpc import rpc


class Service:
    name = "dispatcher"

    dispatch = EventDispatcher()

    # exposed over RPC so the loop can be triggered from the nameko shell
    @rpc
    def go(self):
        for _ in range(30000):
            self.dispatch("eventtype", "payload")

It drops to less than 10 msg/sec if I use a (free, slow) cloud-hosted
RabbitMQ a few hundred milliseconds away.

The impact of the latency is partly because *publish confirms* are enabled.
You can disable them:

    dispatch = EventDispatcher(use_confirms=False)

This bumps my local delivery to ~300 msg/sec, with my RabbitMQ docker
image and VM pegging a CPU each. The Nameko service isn't working
particularly hard.
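
As a rough back-of-envelope model (an assumption for illustration, not a measurement): if every publish waits for one round trip to the broker, a single worker cannot go much faster than one message per round trip. The helper names below are made up for this sketch and are not part of Nameko.

```python
def estimated_rate(round_trip_seconds, workers=1):
    """Upper bound on msg/sec if every publish waits one broker round trip."""
    return workers / round_trip_seconds


def estimated_duration(messages, rate):
    """Seconds needed to publish `messages` at `rate` msg/sec."""
    return messages / rate


# 30000 events at the rates mentioned in this thread
for rate in (22, 200, 300):
    minutes = estimated_duration(30000, rate) / 60
    print("{} msg/sec -> {:.1f} min".format(rate, minutes))
```

Under this model, a broker 100 ms away caps one worker at about 10 msg/sec, and the 30000-item loop takes roughly 23 minutes at 22 msg/sec versus 2.5 minutes at 200 msg/sec.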

You can also disable persistence of the messages to disk when they reach
the broker:

    dispatch = EventDispatcher(persistence="transient")

Transient messages may be helpful if your payloads are large.

If latency is the problem, you can improve performance by dispatching
messages in multiple parallel threads. If I initiate 10 concurrent RPC
workers going around the loop, I get 10x the msg/sec at the broker.
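
To see why parallel workers help when latency dominates, here is a small simulation that needs no broker. It is only a sketch: `fake_publish` is a hypothetical stand-in that sleeps for one pretend round trip, and it uses standard-library threads rather than the eventlet workers that Nameko actually runs.

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.01  # pretend round trip to the broker, in seconds


def fake_publish(payload):
    """Stand-in for a confirmed publish: just waits one round trip."""
    time.sleep(LATENCY)
    return payload


def publish_all(payloads, workers):
    """Publish every payload using `workers` threads; return elapsed seconds."""
    started = time.monotonic()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() forces all publishes to complete before timing stops
        list(pool.map(fake_publish, payloads))
    return time.monotonic() - started


payloads = list(range(50))
sequential = publish_all(payloads, workers=1)
parallel = publish_all(payloads, workers=10)
print("1 worker:   {:.0f} msg/sec".format(len(payloads) / sequential))
print("10 workers: {:.0f} msg/sec".format(len(payloads) / parallel))
```

Because each simulated publish spends its time waiting rather than computing, the waits overlap and throughput scales with the number of workers, which matches the behaviour described above.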

(nameko) Matts-13-inch-Macbook-Pro:nameko mattbennett$ nameko shell --broker amqp://mxwsvifk:****@lemur.cloudamqp.com/mxwsvifk
Nameko Python 3.5.3 (default, Jun 30 2017, 18:28:54)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] shell on darwin

for _ in range(10): n.rpc.dispatcher.go.call_async()

<nameko.rpc.RpcReply object at 0x10bf1da58>
<nameko.rpc.RpcReply object at 0x10bf1dc88>
<nameko.rpc.RpcReply object at 0x10bf1d470>
<nameko.rpc.RpcReply object at 0x10bfa0400>
<nameko.rpc.RpcReply object at 0x10bf1d3c8>
<nameko.rpc.RpcReply object at 0x10bfa0518>
<nameko.rpc.RpcReply object at 0x10bfa0f98>

Hope that helps.



On Tuesday, June 26, 2018 at 12:52:25 PM UTC+1, PtrBld wrote:

Hi Matt,

thank you very much (!) for your fast and detailed response. I just tried
different things and it looks promising.
I want to share my results here, because someone might find this useful.

First of all I tried the `use_confirms` flag (set to False).
This indeed gives me about 3 times faster event dispatching.

However, what really makes it a lot faster is processing the large list in
parallel. Thereby I get another 10x faster processing (about 400 to 1000
events per second).
What I did is splitting my large list into buckets of 1000 items each and
executing them in parallel (using x worker threads). I have decided to split
the list and dispatch events to the same service atm. but I might change
this in the future.

Simple example to get the idea:

from nameko.dependency_providers import Config
from nameko.events import EventDispatcher, event_handler

PROCESS_ITEM = "process_item"  # event type; the value is illustrative


class Service(object):
    name = "processing_service"
    config = Config()
    dispatch = EventDispatcher(use_confirms=False)
    # more dependencies

    def create_slices(self):
        count = get_large_list().count()  # up to 10000000 items
        window_size = self.config.get("PROCESSING_WINDOW_SIZE", 1000)
        # one event per slice; each event is picked up by its own worker
        for slice_start in range(0, count, window_size):
            self.dispatch(PROCESS_ITEM, {"slice_start": slice_start})

    @event_handler(name, PROCESS_ITEM)
    def process_item(self, data):
        start = data["slice_start"]
        window_size = self.config.get("PROCESSING_WINDOW_SIZE", 1000)
        # assumption: get_large_list() (defined elsewhere) supports slicing
        items = get_large_list()[start:start + window_size]
        for item in items:
            pass  # do some stuff
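
The bucket arithmetic from the example above can be checked on its own, without a broker. This is a minimal sketch; `slice_starts` and `slice_bounds` are hypothetical helper names, and the default of 1000 mirrors the fallback window size used above.

```python
def slice_starts(count, window_size=1000):
    """Start index of each bucket needed to cover `count` items."""
    return list(range(0, count, window_size))


def slice_bounds(start, count, window_size=1000):
    """(start, end) of one bucket; the last bucket may be shorter."""
    return start, min(start + window_size, count)


starts = slice_starts(2500)
print(starts)                                   # [0, 1000, 2000]
print([slice_bounds(s, 2500) for s in starts])  # [(0, 1000), (1000, 2000), (2000, 2500)]
```

Each start index would become one dispatched event, so 2500 items produce three events and the last worker handles the shorter remainder.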

Thanks again!