Retrieve an asynchronous task at a later time by its correlation_id.

I'm writing a restful service that is using Nameko under the hood to break
everything down into microservices. In the event of an expensive task I
wish to call it asynchronously and have the service process the task whilst
I return the correlation_id back to the user. Every 5 seconds they will be
expected to hit the rest service with the correlation_id to check if the
task has finished processing. How would I go about reconstructing
the nameko.rpc.RpcReply object to retrieve the task response?

Hi David,

Since the services are designed to be stateless, I guess you need to use
some kind of persistence layer (e.g. your database) to keep the task
results. You might also separate "dispatching" from actually performing the
task. Maybe something like:

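import json
from uuid import uuid4

from nameko.events import EventDispatcher, event_handler
from nameko.web.handlers import http

# MyDataStore, get_and_validate and do_work are placeholders for your own code


class Service(object):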
    name = "service"

    dispatch = EventDispatcher() # nameko builtin
    db = MyDataStore() # maybe a 3rd party redis, or sql?

    @http('POST', '/expensive')
    def request_expensive_thing(self, request):
        params = get_and_validate(request)
        correlation_id = str(uuid4())  # str, so it can be JSON-serialised
        self.dispatch('do_thing', {
            'correlation_id': correlation_id,
            'params': params,
        })
        return json.dumps({'correlation_id': correlation_id})

    @http('GET', '/expensive/status')
    def check_status(self, request):
        """poll this endpoint"""
        correlation_id = get_and_validate(request)
        result = self.db.get(correlation_id)
        if result is None:
            return json.dumps({'status': 'not_ready'})

        # possibly delete now that it's retrieved? or have a @timer
        # endpoint to remove "old" results
        return json.dumps({
            'status': 'ready',
            'result': result,
        })

    # this could even be in a separate service
    @event_handler('service', 'do_thing')
    def do_expensive_thing(self, payload):
        correlation_id = payload['correlation_id']
        params = payload['params']
        result = do_work(params)
        self.db.store(correlation_id, result)

One endpoint dispatches the request: it generates a unique correlation
id, emits an event to trigger the work, and then returns the
correlation id. Another endpoint can be polled to check whether the result
is available in the data store yet. Finally, an event handler does
the actual work and stores the result.
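For the data store side, here is a minimal sketch of something with the same `store`/`get` shape as `MyDataStore` above. It is only an in-memory stand-in: the class name, the `ttl_seconds` and `clock` parameters and `purge_expired` are made up for illustration, and to use it from the service it would still need wrapping in a nameko DependencyProvider (not shown) and backing with something shared like Redis or SQL.

```python
import time


class InMemoryResultStore:
    """Hypothetical stand-in for MyDataStore; keeps results in a dict.

    Only works within a single process. A real deployment would need a
    shared store such as Redis or a SQL table.
    """

    def __init__(self, ttl_seconds=3600, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested
        self._results = {}  # correlation_id -> (stored_at, result)

    def store(self, correlation_id, result):
        self._results[correlation_id] = (self._clock(), result)

    def get(self, correlation_id):
        """Return the stored result, or None if missing or expired."""
        entry = self._results.get(correlation_id)
        if entry is None:
            return None
        stored_at, result = entry
        if self._clock() - stored_at > self._ttl:
            del self._results[correlation_id]
            return None
        return result

    def purge_expired(self):
        """Drop old results; could be called from a @timer entrypoint."""
        now = self._clock()
        expired = [
            cid for cid, (stored_at, _) in self._results.items()
            if now - stored_at > self._ttl
        ]
        for cid in expired:
            del self._results[cid]
        return len(expired)
```

Note that `get` returning None means "not ready", so a task whose real result is None would look unfinished; storing a wrapper such as `{'value': result}` avoids that.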

Best,
David

···

On Monday, 18 January 2016 11:01:24 UTC, David Smith wrote:

I'm writing a restful service that is using Nameko under the hood to break
everything down into microservices. In the event of an expensive task I
wish to call it asynchronously and have the service process the task whilst
I return the correlation_id back to the user. Every 5 seconds they will be
expected to hit the rest service with the correlation_id to check if the
task has finished processing. How would I go about reconstructing
the nameko.rpc.RpcReply object to retrieve the task response?

Doesn't this seem redundant? Using nameko I already have RabbitMQ storing
requests and their associated results. I'm looking for an easy way, using
the correlation_id from nameko.rpc.RpcReply.correlation_id, to request that
task's response from the MQ.

···

Or, put another way: if I close my consuming process, or it crashes whilst
the service continues to process the expensive task, shouldn't I be able to
recover my async tasks?

···

The way RabbitMQ works, you can't cherry-pick messages from a queue (e.g.
by correlation id), so the nameko RPC framework isn't really suitable for
this. You might be able to get Rabbit to keep the results for you (e.g. in
a dedicated queue), but I'm not sure this is a great fit for AMQP.

You could possibly use a websocket endpoint and have the browser wait while
the service runs the calculation.
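To illustrate the cherry-picking problem, here is a rough analogy using a plain Python deque as the queue. This is not RabbitMQ or nameko code, and `take_by_correlation_id` is a made-up helper; it just shows that with in-order delivery, finding one reply means consuming everything ahead of it and putting it back.

```python
from collections import deque


def take_by_correlation_id(queue, correlation_id):
    """Pop messages in order until the wanted one turns up.

    Returns (message or None, number of unrelated messages consumed).
    """
    skipped = []
    found = None
    while queue:
        message = queue.popleft()
        if message["correlation_id"] == correlation_id:
            found = message
            break
        skipped.append(message)
    # Put back what we had to consume. The order survives here only
    # because nothing else is reading from this queue at the same time.
    queue.extendleft(reversed(skipped))
    return found, len(skipped)
```

With a keyed store the same lookup is a single `get(correlation_id)`, which is why a separate persistence layer tends to fit this use case better.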

···

On Monday, 18 January 2016 11:30:38 UTC, David Smith wrote:

Doesn't this seem redundant? Using nameko I already have RabbitMQ storing
requests and their associated results. I'm looking for an easy way, using
the correlation_id from nameko.rpc.RpcReply.correlation_id, to request that
task's response from the MQ.

Again, since services are stateless, if you want to be able to recover, I
think we're back to my original proposal, with an event handler processing
the expensive task independently of the request that triggered it, and
eventually storing the result. Then, as long as you keep hold of the
correlation id, you can always call `check_status` to get the result.
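On the client side, the polling could look roughly like this. It is only a sketch: `fetch_status` is a hypothetical callable that hits the status endpoint and returns the decoded JSON, and `poll_for_result`, `interval` and `max_attempts` are names I've made up. Because the only state is the correlation id, a client that crashes and restarts can resume polling with the same id.

```python
import time


def poll_for_result(fetch_status, correlation_id, interval=5.0,
                    max_attempts=60, sleep=time.sleep):
    """Call fetch_status(correlation_id) until it reports 'ready'.

    fetch_status is assumed to return a dict such as
    {'status': 'not_ready'} or {'status': 'ready', 'result': ...}.
    """
    for attempt in range(max_attempts):
        response = fetch_status(correlation_id)
        if response.get("status") == "ready":
            return response["result"]
        # Don't sleep after the final attempt
        if attempt < max_attempts - 1:
            sleep(interval)
    raise TimeoutError(
        "no result for %s after %d attempts" % (correlation_id, max_attempts)
    )
```

The `sleep` argument is there so the wait can be swapped out in tests; in normal use you would leave it as `time.sleep`.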

···

On Monday, 18 January 2016 11:38:03 UTC, David Smith wrote:

Or put another way. Given I close my consuming process or it crashes
whilst the Service continues to process the expensive task, I should be
able to recover my async tasks?
