I am using `nameko.messaging.consume`
to consume messages from a queue. Here is some sample code:
from kombu import Queue
from nameko.messaging import consume
class Service:
    """Sample nameko service with one entrypoint consuming from a queue."""

    name = "sample_service"

    # Queue that the entrypoint below consumes from.
    QUEUE = Queue(name="queue_name", no_declare=True)

    @consume(QUEUE)
    def process_message(self, payload):
        """Handle one consumed message and return the outcome.

        ``payload`` is the message body passed in by the ``consume``
        entrypoint.
        """
        # Some long running code ...
        # NOTE: `result` is a placeholder; it is expected to be produced by
        # the elided long-running code above.
        return result
By default, the ACK is sent to the RabbitMQ broker only after the `process_message`
function returns (here, at the statement `return result`).
I want to send the ACK as soon as the consumer receives the message.
How can I do that?
*In the “pika” library, the consumer acknowledges a message as soon as it is consumed. That is a good example of the behaviour I want to replicate with nameko’s consumer.
Thanks @mattbennett
from nameko.rpc import Rpc as NamekoRpc, get_rpc_exchange, Responder
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_SERIALIZER, SERIALIZER_CONFIG_KEY
)
from nameko.exceptions import (ContainerBeingKilled, MalformedRequest)
from functools import partial
from nameko.events import EventHandler as NamekoEventHandler
class Rpc(NamekoRpc):
    """RPC entrypoint with an optional early-acknowledgement mode.

    With ``only_once`` falsy (the default) the message is acknowledged in
    ``handle_result``, after the response has been sent. With ``only_once``
    truthy the message is acknowledged in ``handle_message`` as soon as the
    worker has been spawned, without waiting for the worker's result.
    """

    def __init__(self, only_once: bool = False, *args, **kwargs):
        """Store the acknowledgement mode and initialise the base entrypoint.

        Args:
            only_once: When truthy, acknowledge each message right after its
                worker is spawned instead of after the response is sent.
            *args: Passed through unchanged to the parent constructor.
            **kwargs: Passed through unchanged to the parent constructor.
        """
        # Normalise to a real bool so that exactly one of the two
        # acknowledgement paths fires even for truthy non-bool values
        # (e.g. ``1``), which identity checks against True/False would miss.
        self.only_once = bool(only_once)
        super().__init__(*args, **kwargs)

    def handle_message(self, body, message):
        """Validate an incoming request and spawn a worker for it.

        Args:
            body: Decoded message body; must contain ``args`` and ``kwargs``.
            message: The broker message being handled.

        Raises:
            MalformedRequest: If ``body`` lacks ``args`` or ``kwargs``.
        """
        try:
            args = body['args']
            kwargs = body['kwargs']
        except KeyError:
            raise MalformedRequest('Message missing `args` or `kwargs`')

        self.check_signature(args, kwargs)

        context_data = self.unpack_message_headers(message)

        # Bind the message so the worker's result callback can reply to it.
        handle_result = partial(self.handle_result, message)
        try:
            self.container.spawn_worker(self, args, kwargs,
                                        context_data=context_data,
                                        handle_result=handle_result)
        except ContainerBeingKilled:
            # The message has not been acknowledged yet, so it can still be
            # handed back to the broker.
            self.rpc_consumer.requeue_message(message)
        else:
            if self.only_once:
                # Consume only once: acknowledge as soon as the worker has
                # been spawned. Doing it here rather than on entry keeps the
                # message unacknowledged on the validation-failure and
                # requeue paths, so it is never requeued or acknowledged a
                # second time after an ack.
                # NOTE(review): assumes the caller of handle_message deals
                # with the message itself when validation raises - confirm
                # against RpcConsumer in the installed nameko version.
                self.rpc_consumer.queue_consumer.ack_message(message)

    def handle_result(self, message, worker_ctx, result, exc_info):
        """Send the worker's outcome back and acknowledge if still pending.

        Args:
            message: The broker message the worker was spawned for.
            worker_ctx: Worker context supplied by the container (unused).
            result: Value returned by the worker.
            exc_info: Exception info from the worker, if any.

        Returns:
            The ``(result, exc_info)`` pair returned by
            ``Responder.send_response``.
        """
        config = self.container.config
        amqp_uri = config[AMQP_URI_CONFIG_KEY]
        serializer = config.get(SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER)
        exchange = get_rpc_exchange(config)
        ssl = config.get(AMQP_SSL_CONFIG_KEY)

        responder = Responder(amqp_uri, exchange, serializer, message, ssl=ssl)
        result, exc_info = responder.send_response(result, exc_info)

        if not self.only_once:
            # In only_once mode the message was already acknowledged in
            # handle_message; acknowledge here only in the default mode.
            self.rpc_consumer.queue_consumer.ack_message(message)
        return result, exc_info


# Decorator form of the entrypoint, e.g. ``@rpc`` or ``@rpc(only_once=True)``.
rpc = Rpc.decorator
1 Like