How to return ACK from consumer as soon as message is received/consumed?

I am using nameko.messaging.consume for consuming messages from a queue. Here’s a sample code -

from kombu import Queue
from nameko.messaging import consume

class Service:
    name = "sample_service"
    QUEUE = Queue("queue_name", no_declare=True)

    @consume(QUEUE)
    def process_message(self, payload):
        # Some long running code ...
        return result

By default behaviour, ACK will be sent to rabbitMQ broker after process_message function returns a response (Here, statement return result). I want to send an ACK as soon as consumer consumes the message. How can I do that?

*In library “pika”, consumer acknowledges as soon as message is consumed. That will be good example what I want to replicate with nameko’s consumer.

Thanks :blush: @mattbennett

from nameko.rpc import Rpc as NamekoRpc, get_rpc_exchange, Responder
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_SERIALIZER, SERIALIZER_CONFIG_KEY
)
from nameko.exceptions import (ContainerBeingKilled, MalformedRequest)
from functools import partial
from nameko.events import EventHandler as NamekoEventHandler

class Rpc(NamekoRpc):

def __init__(self, only_once: bool = False, *args, **kwargs):
    self.only_once = only_once
    super(Rpc, self).__init__(
        *args, **kwargs
    )

def handle_message(self, body, message):
    if self.only_once is True:
        # Consume only once, automatically ack after getting the message
        self.rpc_consumer.queue_consumer.ack_message(message)
    try:
        args = body['args']
        kwargs = body['kwargs']
    except KeyError:
        raise MalformedRequest('Message missing `args` or `kwargs`')

    self.check_signature(args, kwargs)

    context_data = self.unpack_message_headers(message)

    handle_result = partial(self.handle_result, message)
    try:
        self.container.spawn_worker(self, args, kwargs,
                                    context_data=context_data,
                                    handle_result=handle_result)
    except ContainerBeingKilled:
        self.rpc_consumer.requeue_message(message)

def handle_result(self, message, worker_ctx, result, exc_info):
    amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
    serializer = self.container.config.get(
        SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER
    )
    exchange = get_rpc_exchange(self.container.config)
    ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY)

    responder = Responder(amqp_uri, exchange, serializer, message, ssl=ssl)
    result, exc_info = responder.send_response(result, exc_info)

    if self.only_once is False:
        self.rpc_consumer.queue_consumer.ack_message(message)
    return result, exc_info

rpc = Rpc.decorator

1 Like