I’m trying to write a Kafka Entrypoint that handles hundreds (possibly thousands) of messages per second. However, I’m worried about the performance cost of creating a worker for each message. Here’s the code I have so far:
import logging
from functools import partial

import kafka
from marshmallow.validate import ValidationError
from nameko.extensions import Entrypoint
class KafkaCallback:
    """Base class for objects that want Kafka rebalance notifications.

    Subclass it and override either hook; both defaults are no-ops.
    The ``Kafka`` entrypoint only honours callbacks that are instances
    of this class.
    """

    def on_revoked(self, revoked):
        """Hook called with the partitions revoked from the consumer."""
        return None

    def on_assigned(self, assigned):
        """Hook called with the partitions assigned to the consumer."""
        return None
_log = logging.getLogger(__name__)


class Kafka(Entrypoint, kafka.ConsumerRebalanceListener):
    """Nameko entrypoint that consumes Kafka topics, one worker per message.

    The entrypoint doubles as the consumer's rebalance listener so that an
    optional ``KafkaCallback`` can be told when partitions are revoked or
    assigned.

    :param topics: topics passed to ``KafkaConsumer.subscribe``.
    :param group_id: Kafka consumer group id.
    :param seek_to_end: if true, seek to the end of the assigned partitions
        whenever partitions are (re)assigned.
    :param callback: optional ``KafkaCallback`` notified on rebalance; any
        value that is not a ``KafkaCallback`` instance is ignored.
    :param schema: optional marshmallow schema; when set, each message value
        is parsed with ``schema.loads`` before being handed to the worker.
    :param kargs: forwarded to ``Entrypoint.__init__``.
    """

    def __init__(self, topics, group_id=None, seek_to_end=False,
                 callback=None, schema=None, **kargs):
        super().__init__(**kargs)
        # kafka preferences
        self.topics = topics
        self.group_id = group_id
        self.seek_to_end = seek_to_end
        # Rebalance hooks; only KafkaCallback instances are honoured.
        is_callback = isinstance(callback, KafkaCallback)
        self.on_revoked = callback.on_revoked if is_callback else None
        self.on_assigned = callback.on_assigned if is_callback else None
        # marshmallow schema
        self.schema = schema

    # ConsumerRebalanceListener

    def on_partitions_revoked(self, revoked):
        """Forward revoked partitions to the callback, if one was given."""
        if callable(self.on_revoked):
            self.on_revoked(revoked)

    def on_partitions_assigned(self, assigned):
        """Notify the callback, then stop the container or reposition.

        An empty assignment stops the service container. Otherwise, if
        ``seek_to_end`` is set, the consumer seeks to the end of its
        assigned partitions.
        """
        if callable(self.on_assigned):
            self.on_assigned(assigned)
        if not assigned:
            _log.info("No partitions assigned for %s; stopping container",
                      self.topics)
            # NOTE(review): this runs inside the consumer's poll, i.e. on the
            # managed thread that container.stop() will itself shut down —
            # confirm nameko tolerates stopping from this thread.
            self.container.stop()
            _log.info("Container stopped")
            # Nothing is assigned, so there is nothing to seek. kafka-python's
            # argument-less seek_to_end() asserts that partitions are assigned.
            return
        if self.seek_to_end:
            self.consumer.seek_to_end()

    # Entrypoint

    def setup(self):
        """Create the consumer and subscribe it with this object as listener."""
        self.consumer = kafka.KafkaConsumer(
            group_id=self.group_id,
            value_deserializer=lambda m: m.decode('utf-8'),
        )
        self.consumer.subscribe(self.topics, listener=self)

    def start(self):
        """Run the consume loop in a container-managed thread."""
        self.container.spawn_managed_thread(
            self.run, identifier='Kafka.run'
        )

    def run(self):
        """Consume messages and hand each value to ``handle_payload``.

        The consumer is closed whenever the loop exits, including when the
        thread is killed during container shutdown, so its connections are
        released instead of leaked.
        """
        try:
            for message in self.consumer:
                self.handle_payload(message.value)
        finally:
            self.consumer.close()

    def handle_payload(self, payload):
        """Deserialize ``payload`` (if a schema is set) and spawn a worker.

        A payload the schema cannot load is logged and skipped. Letting the
        error propagate would end the ``run`` loop and stop all consumption.
        """
        obj = payload
        if self.schema is not None:
            try:
                obj = self.schema.loads(payload)
            except (ValidationError, ValueError) as err:
                # ValueError covers malformed JSON from the underlying decoder.
                _log.warning("Skipping message that failed to load: %s", err)
                return
        self.container.spawn_worker(
            self, (obj,), {},
            handle_result=partial(self.handle_result, payload),
        )

    def handle_result(self, payload, worker_ctx, result, exc_info):
        """Return the worker's result unchanged (``payload`` is unused)."""
        return result, exc_info


subscribe = Kafka.decorator
Is there any way to call the decorated function directly to handle each message without spawning a worker? I know I could write this without Nameko if I don’t want to use workers; in fact, that’s how I had it before. However, re-implementing it with Nameko semantics is cleaner and better organized.
Note that the `KafkaCallback` class is used by dependencies that want to be notified about Kafka rebalancing (for example, so they can flush any pending data before new partitions are assigned).