'Out-of-Band' worker management

I’ve been thinking about how we’d like to manage our nameko services.

We’re using nameko more to orchestrate a bunch of actions across many machines, rather than for sheer throughput. So, our workers tend to be long-lived.

We’re almost certain to want to answer questions like ‘Which server is running the export for Customer X and how long has it been running for?’, and ‘The queue for service Y is rather long. What are its workers currently doing, and how far along are they?’

If a service has all of its workers active, any calls to a ‘what are you up to?’ service method will just sit in the queue along with everything else. Some of the questions can be answered with decent logging, but the ability to interrogate a service’s container would be interesting.

Is this sort of ‘out-of-band’ communication feasible? Perhaps some sort of ‘sidecar’ service that can query the main service’s container?

Perhaps this might also open up the possibility of forcing termination of workers. For example, we’ve got a ‘control’ service which follows sets of job-plans and orchestrates many service calls. We have the ability for a user to flag a running job as ‘TerminateEarly’, but it’d be great to forcibly terminate the workers that are doing things on behalf of that job.

Any thoughts appreciated. I’m going to explore the ‘sidecar service as a dependency’ approach, since it feels like it might be cleanest - although ‘nameko service inside a nameko dependency, inside a nameko service’ is a bit Inception-y.

Ok, I’ve got something working with the ‘sidecar’ approach. It’s a DependencyProvider, when it should probably be an extension. Plus, it only returns the number of running workers – I’ve not tried anything more involved yet.

from nameko.extensions import DependencyProvider
from nameko.containers import ServiceContainer
from nameko.rpc import rpc

class Sidecar(DependencyProvider):

    def setup(self):
        parent_svc_container = self.container

        class SidecarService:
            parent_container = parent_svc_container
            name = parent_container.service_name + "_sidecar"

            @rpc
            def worker_stats(self):
                # _worker_threads is a private nameko attribute mapping
                # worker contexts to their greenthreads
                worker_contexts = list(self.parent_container._worker_threads.keys())

                # For now, just return a count of running workers.
                return len(worker_contexts)

        # Keep a reference to the sidecar container so it can be stopped
        # alongside the parent service.
        sidecar_config = parent_svc_container.config
        self.sidecar_container = ServiceContainer(SidecarService, config=sidecar_config)
        self.sidecar_container.start()

    def stop(self):
        self.sidecar_container.stop()

Because our services have some predictable arguments (e.g. customer_id), I could inspect the args attribute of each worker to see what/who it’s running for. I still need to figure out a way to ask a worker ‘how far through are you?’, and to see how to kill a worker from the sidecar (and what happens when I do), if that’s even possible. Best case, I find a method to kill the worker and it raises an exception rather than leaving an un-ack’d message on the queue…
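
Here’s roughly the shape of that args inspection, as a drop-in replacement for worker_stats inside SidecarService. It’s only a sketch: it leans on nameko internals (the private _worker_threads dict and the worker context’s entrypoint, args, kwargs and call_id attributes), so it may break between versions:

    @rpc
    def worker_stats(self):
        stats = []
        for worker_ctx in list(self.parent_container._worker_threads):
            stats.append({
                "entrypoint": worker_ctx.entrypoint.method_name,
                "args": repr(worker_ctx.args),      # e.g. the customer_id
                "kwargs": repr(worker_ctx.kwargs),
                "call_id": worker_ctx.call_id,
            })
        return stats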

Lots of interesting ideas here.

The “worker lifecycle” that a DependencyProvider can tap into is definitely the easiest way to track what a service is up to [1]. I don’t think you want to be starting a container inside DependencyProvider.setup() though; I’m kind of surprised container-inception works at all :wink:
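
For example, something along these lines (just a sketch, and it doesn’t need get_dependency at all) keeps a record of in-flight workers via the worker_setup / worker_teardown hooks:

    import time

    from nameko.extensions import DependencyProvider

    class WorkerTracker(DependencyProvider):
        """Track in-flight workers using the worker lifecycle hooks."""

        def setup(self):
            self.active = {}

        def worker_setup(self, worker_ctx):
            # record what each worker is doing and when it started
            self.active[worker_ctx.call_id] = (
                worker_ctx.entrypoint.method_name,
                worker_ctx.args,
                time.time(),
            )

        def worker_teardown(self, worker_ctx):
            self.active.pop(worker_ctx.call_id, None)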

You could mix your sidecar methods into the main service class, but the problem then is that they’re not “out of band”: a call to collect stats will consume a worker. To get the “out of band” behaviour you have to handle requests without spawning a worker, which basically boils down to writing an entrypoint-like extension that uses container.spawn_managed_thread rather than container.spawn_worker.
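
To make the distinction concrete, here’s an untested sketch of an extension that does its work on a managed thread, so it keeps going even when every worker is busy. It just logs the active worker count (using the private _worker_threads dict, so that part is an assumption about current internals); a real entrypoint-like extension would consume requests from its own queue instead:

    import logging
    import time

    from nameko.extensions import Extension

    logger = logging.getLogger(__name__)

    class OutOfBandMonitor(Extension):

        def start(self):
            # spawn_managed_thread, not spawn_worker: no worker is consumed
            self.container.spawn_managed_thread(self._monitor)

        def _monitor(self):
            while True:
                logger.info(
                    "%s: %d active workers",
                    self.container.service_name,
                    len(self.container._worker_threads),
                )
                time.sleep(5)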

There is a prototype of something like this at https://github.com/timbu/nameko-management.

As for aborting workers… it’s not really supported in normal entrypoints. You can abort a greenthread with an exception though, which would (I think) terminate the worker and make the entrypoint acknowledge any AMQP message. If you added the exception to the entrypoint’s expected_exceptions you could also suppress any alerts on it. If this works we could think about adding first-class support for terminating workers.
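
Something like the following is what I have in mind, completely untested and relying on the private _worker_threads mapping; ControlService, run_job and terminate_worker are just illustrative names for your ‘control’ service and a hypothetical helper:

    from nameko.rpc import rpc

    class JobTerminated(Exception):
        """Raised into a worker greenthread to abort it early."""

    class ControlService:
        name = "control"

        # listing JobTerminated in expected_exceptions means a terminated
        # worker isn't reported as an unexpected error
        @rpc(expected_exceptions=(JobTerminated,))
        def run_job(self, job_id):
            pass  # placeholder for the long-running job-plan work

    def terminate_worker(container, worker_ctx):
        # killing the greenthread raises JobTerminated inside the worker;
        # the entrypoint should then see a failed worker and ack the message
        gt = container._worker_threads.get(worker_ctx)
        if gt is not None:
            gt.kill(JobTerminated("job flagged TerminateEarly"))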


[1] As an aside, I had a conversation with @Ondrej_Kohout recently about how the worker lifecycle really doesn’t need to be part of the DependencyProvider and we could potentially split it out, but it lives where it lives for now and there’s no penalty for not implementing get_dependency.