How to create RpcProxy in rpc call?

I have the following scenatio: all requests are coming to Supervior
service, which receives requests and passes them to workers. Workers can be
spawned dynamically.

class WorkerService(object):
    name = 'worker_' + os.environ['WORKER_ID']

    @rpc
    def ping(self):
        return True

and

class SupervisorService(object):
    name = 'supervisor'

    @rpc
    def ping_worker(self, worker_id):
        # TODO: create proxy to worker
        proxy = create_proxy_to_worker(worker_id)

        return proxy.ping()

Here are some questions:

1) How can I implement create_proxy_to_worker? All the examples in the user guide assume that service names are known in advance, which is not my case.

2) How can I set timeout for a worker proxy? I cannot guarantee if worker with specific id exists and it doesn't make sense to enqueue ping requests

Hi,

The build in RpcProxy dependency does indeed take the target service as a
parameter, but its implementation is actually pretty small (~10
lines) https://github.com/nameko/nameko/blob/master/nameko/rpc.py#L343 so
it should be straightforward to make your own that defers the choice of
target service until runtime (and still re-uses the reply queue)

There are no timeouts built int yet (see discussion on the open pr here
https://github.com/nameko/nameko/pull/360\)

Could you elaborate a bit on your use case? For distributing work over
multiple "workers", the usual pattern is to have them all use the _same_
service name. Any incoming requests will be distributed between the
workers, and you can add/remove workers as you wish

Best,
David

···

On Friday, 13 January 2017 10:11:48 UTC, przemysla...@gmail.com wrote:

I have the following scenatio: all requests are coming to Supervior
service, which receives requests and passes them to workers. Workers can be
spawned dynamically.

class WorkerService(object):
    name = 'worker_' + os.environ['WORKER_ID']

    @rpc
    def ping(self):
        return True

and

class SupervisorService(object):
    name = 'supervisor'

    @rpc
    def ping_worker(self, worker_id):
        # TODO: create proxy to worker
        proxy = create_proxy_to_worker(worker_id)

        return proxy.ping()

Here are some questions:

1) How can I implement create_proxy_to_worker? All the examples in the user guide assume that service names are known in advance, which is not my case.

2) How can I set timeout for a worker proxy? I cannot guarantee if worker with specific id exists and it doesn't make sense to enqueue ping requests

Ok, let me explain my setup in more details. I'm working on https://plon.io
- my data science startup, where each user can create and run its own
projects (using an online IDE for running commands, editing files, etc.).
Currently I am using my own amqp+rpc solution but I'm considering switching
to nameko, which is much more mature and easier to test. So in Plon each
project has it's unique id and is controlled by a service (this is what i
referred to as a worker). When a user issues a command it's being passed to
a supervisor (where the command is auhtorized first). Next, supervisor
passes the command to a particular project (worker). Obviously, I don't
know ids of projects in advance :slight_smile:

I tried to follow the approach in RpcProxy.get_dependency:

class WorkerCtx(DependencyProvider):
    def __init__(self):
        self.proxies = defaultdict(list)

    def get_dependency(self, worker_ctx):
        return worker_ctx

class SupervisorService(object):
    name = 'supervison'
    worker_ctx = WorkerCtx()

    @rpc
    def ping(self, worker_id):
        # TODO: authentication? worker_ctx?
        proxy = ServiceProxy(self.worker_ctx, "worker_%s" % worker_id, ReplyListener())

        return proxy.listdir(args, kwargs)

but it gives me:
Traceback (most recent call last):
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/containers.py",
line 404, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/overseer/fs_service.py", line
26, in listdir
    return proxy.listdir(args, kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 394, in __call__
    reply = self._call(*args, **kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 498, in _call
    reply_to_routing_key = reply_listener.routing_key
AttributeError: 'ReplyListener' object has no attribute 'routing_key'

I'm not sure if I'm on the right track. TIA.

···

On Friday, January 13, 2017 at 2:43:05 PM UTC+1, David Szotten wrote:

Hi,

The build in RpcProxy dependency does indeed take the target service as a
parameter, but its implementation is actually pretty small (~10 lines)
https://github.com/nameko/nameko/blob/master/nameko/rpc.py#L343 so it
should be straightforward to make your own that defers the choice of target
service until runtime (and still re-uses the reply queue)

There are no timeouts built int yet (see discussion on the open pr here
https://github.com/nameko/nameko/pull/360\)

Could you elaborate a bit on your use case? For distributing work over
multiple "workers", the usual pattern is to have them all use the _same_
service name. Any incoming requests will be distributed between the
workers, and you can add/remove workers as you wish

Best,
David

On Friday, 13 January 2017 10:11:48 UTC, przemysla...@gmail.com wrote:

I have the following scenatio: all requests are coming to Supervior
service, which receives requests and passes them to workers. Workers can be
spawned dynamically.

class WorkerService(object):
    name = 'worker_' + os.environ['WORKER_ID']

    @rpc
    def ping(self):
        return True

and

class SupervisorService(object):
    name = 'supervisor'

    @rpc
    def ping_worker(self, worker_id):
        # TODO: create proxy to worker
        proxy = create_proxy_to_worker(worker_id)

        return proxy.ping()

Here are some questions:

1) How can I implement create_proxy_to_worker? All the examples in the user guide assume that service names are known in advance, which is not my case.

2) How can I set timeout for a worker proxy? I cannot guarantee if worker with specific id exists and it doesn't make sense to enqueue ping requests

The `ReplyListener` is itself an extension, so you can't just instantiate
it in your service (it needs to be managed by nameko)

Exactly what you need will depend on how you want to interact with it, but
something like (not tested)

class ClusterProxy(object):
    def __init__(self, worker_ctx, reply_listener):
        self.worker_ctx = worker_ctx
        self.reply_listener = reply_listener

    # pick your interface. this uses dict-like access to chose target
service
    def __getitem__(self, name):
        return ServiceProxy(self.worker_ctx, name, self.reply_listener)

class ClusterRpcProxy(DependencyProvider):
    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        return ClusterProxy(worker_ctx, self.rpc_reply_listener)

# Usage
class MyService():
    name = 'foo'

    proxy = ClusterRpcProxy()

    @rpc
    def entrypoint(self);
        self.proxy['worker_1'].ping()

···

On Friday, 13 January 2017 14:09:07 UTC, przemysla...@gmail.com wrote:

Ok, let me explain my setup in more details. I'm working on
https://plon.io - my data science startup, where each user can create and
run its own projects (using an online IDE for running commands, editing
files, etc.). Currently I am using my own amqp+rpc solution but I'm
considering switching to nameko, which is much more mature and easier to
test. So in Plon each project has it's unique id and is controlled by a
service (this is what i referred to as a worker). When a user issues a
command it's being passed to a supervisor (where the command is auhtorized
first). Next, supervisor passes the command to a particular project
(worker). Obviously, I don't know ids of projects in advance :slight_smile:

I tried to follow the approach in RpcProxy.get_dependency:

class WorkerCtx(DependencyProvider):
    def __init__(self):
        self.proxies = defaultdict(list)

    def get_dependency(self, worker_ctx):
        return worker_ctx

class SupervisorService(object):
    name = 'supervison'
    worker_ctx = WorkerCtx()

    @rpc
    def ping(self, worker_id):
        # TODO: authentication? worker_ctx?
        proxy = ServiceProxy(self.worker_ctx, "worker_%s" % worker_id, ReplyListener())

        return proxy.listdir(args, kwargs)

but it gives me:
Traceback (most recent call last):
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/containers.py",
line 404, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/overseer/fs_service.py", line
26, in listdir
    return proxy.listdir(args, kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 394, in __call__
    reply = self._call(*args, **kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 498, in _call
    reply_to_routing_key = reply_listener.routing_key
AttributeError: 'ReplyListener' object has no attribute 'routing_key'

I'm not sure if I'm on the right track. TIA.

On Friday, January 13, 2017 at 2:43:05 PM UTC+1, David Szotten wrote:

Hi,

The build in RpcProxy dependency does indeed take the target service as a
parameter, but its implementation is actually pretty small (~10 lines)
https://github.com/nameko/nameko/blob/master/nameko/rpc.py#L343 so it
should be straightforward to make your own that defers the choice of target
service until runtime (and still re-uses the reply queue)

There are no timeouts built int yet (see discussion on the open pr here
https://github.com/nameko/nameko/pull/360\)

Could you elaborate a bit on your use case? For distributing work over
multiple "workers", the usual pattern is to have them all use the _same_
service name. Any incoming requests will be distributed between the
workers, and you can add/remove workers as you wish

Best,
David

On Friday, 13 January 2017 10:11:48 UTC, przemysla...@gmail.com wrote:

I have the following scenatio: all requests are coming to Supervior
service, which receives requests and passes them to workers. Workers can be
spawned dynamically.

class WorkerService(object):
    name = 'worker_' + os.environ['WORKER_ID']

    @rpc
    def ping(self):
        return True

and

class SupervisorService(object):
    name = 'supervisor'

    @rpc
    def ping_worker(self, worker_id):
        # TODO: create proxy to worker
        proxy = create_proxy_to_worker(worker_id)

        return proxy.ping()

Here are some questions:

1) How can I implement create_proxy_to_worker? All the examples in the user guide assume that service names are known in advance, which is not my case.

2) How can I set timeout for a worker proxy? I cannot guarantee if worker with specific id exists and it doesn't make sense to enqueue ping requests

Thanks a lot. I will give it a try.

···

On Friday, January 13, 2017 at 3:49:04 PM UTC+1, David Szotten wrote:

The `ReplyListener` is itself an extension, so you can't just instantiate
it in your service (it needs to be managed by nameko)

Exactly what you need will depend on how you want to interact with it, but
something like (not tested)

class ClusterProxy(object):
    def __init__(self, worker_ctx, reply_listener):
        self.worker_ctx = worker_ctx
        self.reply_listener = reply_listener

    # pick your interface. this uses dict-like access to chose target
service
    def __getitem__(self, name):
        return ServiceProxy(self.worker_ctx, name, self.reply_listener)

class ClusterRpcProxy(DependencyProvider):
    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        return ClusterProxy(worker_ctx, self.rpc_reply_listener)

# Usage
class MyService():
    name = 'foo'

    proxy = ClusterRpcProxy()

    @rpc
    def entrypoint(self);
        self.proxy['worker_1'].ping()

On Friday, 13 January 2017 14:09:07 UTC, przemysla...@gmail.com wrote:

Ok, let me explain my setup in more details. I'm working on
https://plon.io - my data science startup, where each user can create
and run its own projects (using an online IDE for running commands, editing
files, etc.). Currently I am using my own amqp+rpc solution but I'm
considering switching to nameko, which is much more mature and easier to
test. So in Plon each project has it's unique id and is controlled by a
service (this is what i referred to as a worker). When a user issues a
command it's being passed to a supervisor (where the command is auhtorized
first). Next, supervisor passes the command to a particular project
(worker). Obviously, I don't know ids of projects in advance :slight_smile:

I tried to follow the approach in RpcProxy.get_dependency:

class WorkerCtx(DependencyProvider):
    def __init__(self):
        self.proxies = defaultdict(list)

    def get_dependency(self, worker_ctx):
        return worker_ctx

class SupervisorService(object):
    name = 'supervison'
    worker_ctx = WorkerCtx()

    @rpc
    def ping(self, worker_id):
        # TODO: authentication? worker_ctx?
        proxy = ServiceProxy(self.worker_ctx, "worker_%s" % worker_id, ReplyListener())

        return proxy.listdir(args, kwargs)

but it gives me:
Traceback (most recent call last):
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/containers.py",
line 404, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/overseer/fs_service.py", line
26, in listdir
    return proxy.listdir(args, kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 394, in __call__
    reply = self._call(*args, **kwargs)
  File
"/home/pgorecki/workspace/plon_microservices/.venv_os/lib/python3.5/site-packages/nameko/rpc.py",
line 498, in _call
    reply_to_routing_key = reply_listener.routing_key
AttributeError: 'ReplyListener' object has no attribute 'routing_key'

I'm not sure if I'm on the right track. TIA.

On Friday, January 13, 2017 at 2:43:05 PM UTC+1, David Szotten wrote:

Hi,

The build in RpcProxy dependency does indeed take the target service as
a parameter, but its implementation is actually pretty small (~10 lines)
https://github.com/nameko/nameko/blob/master/nameko/rpc.py#L343 so it
should be straightforward to make your own that defers the choice of target
service until runtime (and still re-uses the reply queue)

There are no timeouts built int yet (see discussion on the open pr here
https://github.com/nameko/nameko/pull/360\)

Could you elaborate a bit on your use case? For distributing work over
multiple "workers", the usual pattern is to have them all use the _same_
service name. Any incoming requests will be distributed between the
workers, and you can add/remove workers as you wish

Best,
David

On Friday, 13 January 2017 10:11:48 UTC, przemysla...@gmail.com wrote:

I have the following scenatio: all requests are coming to Supervior
service, which receives requests and passes them to workers. Workers can be
spawned dynamically.

class WorkerService(object):
    name = 'worker_' + os.environ['WORKER_ID']

    @rpc
    def ping(self):
        return True

and

class SupervisorService(object):
    name = 'supervisor'

    @rpc
    def ping_worker(self, worker_id):
        # TODO: create proxy to worker
        proxy = create_proxy_to_worker(worker_id)

        return proxy.ping()

Here are some questions:

1) How can I implement create_proxy_to_worker? All the examples in the user guide assume that service names are known in advance, which is not my case.

2) How can I set timeout for a worker proxy? I cannot guarantee if worker with specific id exists and it doesn't make sense to enqueue ping requests