Ok, let me explain my setup in more details. I'm working on https://plon.io
- my data science startup, where each user can create and run its own
projects (using an online IDE for running commands, editing files, etc.).
Currently I am using my own amqp+rpc solution but I'm considering switching
to nameko, which is much more mature and easier to test. So in Plon each
project has it's unique id and is controlled by a service (this is what i
referred to as a worker). When a user issues a command it's being passed to
a supervisor (where the command is auhtorized first). Next, supervisor
passes the command to a particular project (worker). Obviously, I don't
know ids of projects in advance
I tried to follow the approach in RpcProxy.get_dependency:
class WorkerCtx(DependencyProvider):
def __init__(self):
self.proxies = defaultdict(list)
def get_dependency(self, worker_ctx):
return worker_ctx
class SupervisorService(object):
name = 'supervison'
worker_ctx = WorkerCtx()
@rpc
def ping(self, worker_id):
# TODO: authentication? worker_ctx?
proxy = ServiceProxy(self.worker_ctx, "worker_%s" % worker_id, ReplyListener())
return proxy.listdir(args, kwargs)
but it gives me:
Traceback (most recent call last):
File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/containers.py",
line 404, in _run_worker
result = method(*worker_ctx.args, **worker_ctx.kwargs)
File
"/home/pgorecki/workspace/plon_microservices/overseer/fs_service.py", line
26, in listdir
return proxy.listdir(args, kwargs)
File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 394, in __call__
reply = self._call(*args, **kwargs)
File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 498, in _call
reply_to_routing_key = reply_listener.routing_key
AttributeError: 'ReplyListener' object has no attribute 'routing_key'
I'm not sure if I'm on the right track. TIA.
···
On Friday, January 13, 2017 at 2:43:05 PM UTC+1, David Szotten wrote:
Hi,
The build in RpcProxy dependency does indeed take the target service as a
parameter, but its implementation is actually pretty small (~10 lines)
https://github.com/nameko/nameko/blob/master/nameko/rpc.py#L343 so it
should be straightforward to make your own that defers the choice of target
service until runtime (and still re-uses the reply queue)
There are no timeouts built int yet (see discussion on the open pr here
https://github.com/nameko/nameko/pull/360\)
Could you elaborate a bit on your use case? For distributing work over
multiple "workers", the usual pattern is to have them all use the _same_
service name. Any incoming requests will be distributed between the
workers, and you can add/remove workers as you wish
Best,
David
On Friday, 13 January 2017 10:11:48 UTC, przemysla...@gmail.com wrote:
I have the following scenatio: all requests are coming to Supervior
service, which receives requests and passes them to workers. Workers can be
spawned dynamically.
class WorkerService(object):
name = 'worker_' + os.environ['WORKER_ID']
@rpc
def ping(self):
return True
and
class SupervisorService(object):
name = 'supervisor'
@rpc
def ping_worker(self, worker_id):
# TODO: create proxy to worker
proxy = create_proxy_to_worker(worker_id)
return proxy.ping()
Here are some questions:
1) How can I implement create_proxy_to_worker? All the examples in the user guide assume that service names are known in advance, which is not my case.
2) How can I set timeout for a worker proxy? I cannot guarantee if worker with specific id exists and it doesn't make sense to enqueue ping requests