Entrypoint for real-time data using Kafka?


#1

I’m trying to write a Kafka Entrypoing that handles hundreds (possibly thousands) of messages per second. However, I’m worried about the performance consequences due to a worker being created for each message. Here’s the code that I got so far:

import kafka
from functools import partial
from marshmallow.validate import ValidationError
from nameko.extensions import Entrypoint


class KafkaCallback:
    def on_revoked(self, revoked):
        pass

    def on_assigned(self, assigned):
        pass


class Kafka(Entrypoint, kafka.ConsumerRebalanceListener):
    def __init__(self, topics, group_id=None, seek_to_end=False, callback=None, schema=None, **kargs):
        super().__init__(**kargs)

        # kafka preferences
        self.topics = topics
        self.group_id = group_id
        self.seek_to_end = seek_to_end

        # callback
        self.on_revoked = callback.on_revoked if isinstance(callback, KafkaCallback) else None
        self.on_assigned = callback.on_assigned if isinstance(callback, KafkaCallback) else None

        # marshmallow schema
        self.schema = schema

    # ConsumerRebalanceListener

    def on_partitions_revoked(self, revoked):
        if callable(self.on_revoked):
            self.on_revoked(revoked)

    def on_partitions_assigned(self, assigned):
        if callable(self.on_assigned):
            self.on_assigned(assigned)

        if len(assigned) == 0:
            print("STOPPING")
            self.container.stop()
            print("STOPPED")

        if self.seek_to_end:
            self.consumer.seek_to_end()

    # Entrypoint

    def setup(self):
        self.consumer = kafka.KafkaConsumer(
            group_id=self.group_id,
            value_deserializer=lambda m: m.decode('utf-8')
        )
        self.consumer.subscribe(self.topics, listener=self)

    def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier='Kafka.run'
        )

    def run(self):
        for message in self.consumer:
            self.handle_payload(message.value)

    def handle_payload(self, payload):
        handle_result = partial(self.handle_result, payload)

        obj = payload
        if self.schema is not None:
            obj = self.schema.loads(payload)

        args = (obj,)
        kargs = {}

        self.container.spawn_worker(
            self, args, kargs, handle_result=handle_result,
        )

    def handle_result(self, payload, worker_ctx, result, exc_info):
        return result, exc_info


subscribe = Kafka.decorator

Is there any way to directly call the decorated function to handle the message without spawning any workers? I know that I could just write something without Nameko if I don’t want to use the workers, in fact, that’s the way I had it before. However, re-implementing this with Nameko semantics is cleaner and more organized.

Note that the KafkaCallback class is used for dependencies that wish to be notified about Kafka re-balancing (and flush any data before new partitions are assigned).


#2

Spawning a worker isn’t a particularly heavy operation. It creates a new greenthread that executes the service method, and this is how Nameko handles multiple requests/messages concurrently. There is a little overhead associated with spawning a worker versus a bare greenthread but that is the cost of hooking into the rest of Nameko (e.g. accessing injected dependencies)

How did your previous implementation handle threading / concurrency?

Your entrypoint looks good – have you tested the performance? Note that the global worker pool will limit the maximum concurrent workers. To change the size of it, use the max_workers config key.


#3

My previous approach didn’t really use concurrency in the same process. I relied on running multiple processes that took care of their own batch of messages from the consumer group.

I ran some simple tests, and the Entrypoint is able to process about 2000 messages per-second, not bad indeed. However, my previous implementation was a little faster (about 2500 per-second), so I guess that’s the overhead present from spawning green-threads.

In some cases I do need to limit to one worker, so thanks for the max_workers tip.

I’ll try to avoid premature optimization, and keep working on the important stuff. I guess it is fast enough as it is right now.

Cheers!


#4

I guess as an experiment you can try batching the messages (spawning a worker per batch), at least to compare performance