Non-persistent ClusterRpcProxy

Hi All,

I am developing an application that will handle a large stream of videos/images and process them.
I have found nameko quite easy to use, and it has worked well so far.
However, I would like to further decrease the message processing time, and I have been reading threads related to this. What I am still trying to understand is how to make the messages non-persistent so that RabbitMQ runs faster.
I understand that delivery_mode has been exposed in the nameko library, but I do not understand how to set this parameter when I make an RPC call using ClusterRpcProxy.
I tried several things, but none of them achieved what I wanted (i.e. making the messages non-persistent).
Some of the things I tried without success:

rpc = ClusterRpcProxy(config, delivery_mode = 1)

from kombu import Exchange
Exchange.delivery_mode = 1

config = {
    'AMQP_URI': AMQP_URI,
    'delivery_mode': 1,
}
cluster_rpc = ClusterRpcProxy(config)

Could someone help me? Did I miss something obvious here?

Best Regards,
Arnold

Hi Arnold,

In the current stable release it’s not possible to configure the standalone proxy in this way.

However, the standalone proxy has been significantly refactored by the work in https://github.com/nameko/nameko/pull/542. These changes are about to hit PyPI as a pre-release version.

Using the pre-release version you will be able to do exactly as in your first attempt:

rpc = ClusterRpcProxy(config, delivery_mode=1)
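For context, AMQP defines delivery_mode 1 as non-persistent (transient) and 2 as persistent. Below is a minimal sketch of how the argument might be assembled. Note that proxy_options is a hypothetical helper, not part of nameko, and the commented-out call assumes the pre-release is installed, a broker is reachable, and a service named some_service exists.

```python
# AMQP 0-9-1 delivery modes.
NON_PERSISTENT = 1  # transient: the broker does not need to write it to disk
PERSISTENT = 2      # the broker stores the message on disk for durable queues


def proxy_options(persistent):
    """Hypothetical helper: keyword arguments for the standalone proxy."""
    return {"delivery_mode": PERSISTENT if persistent else NON_PERSISTENT}


# Assumed usage with the pre-release and a running broker:
#   from nameko.standalone.rpc import ClusterRpcProxy
#   with ClusterRpcProxy(config, **proxy_options(persistent=False)) as rpc:
#       rpc.some_service.some_method()
```

Keeping the value behind a named constant makes it harder to confuse the two modes when the setting is toggled for benchmarking.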

We would be grateful for some feedback if you start to use this pre-release version!

Pre-release 3.0.0rc0 is now on PyPI.

You can install it with pip install --pre nameko

Thanks Matt! I am still working on the other parts. I shall update you soon once I start looking at it.

Hi Matt,

I started looking at this and ran pip install --pre nameko.
Then I ran my service with nameko run myservice and received the following error:

from nameko.rpc import rpc, ServiceProxy, ReplyListener
ImportError: cannot import name ServiceProxy

For what reason are you importing ServiceProxy directly? It’s a semi-internal part of the old implementation that has been refactored away.

If you can post some code, it would help.

Hi Matt,

Sorry for the late response. I followed the code from: https://groups.google.com/forum/#!topic/nameko-dev/XE-uarCEV00 to make a dynamic service call using RPCProxy.

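# These imports work on the stable release only; ServiceProxy cannot be
# imported from nameko.rpc in 3.0.0rc0.
from nameko.extensions import DependencyProvider
from nameko.rpc import ServiceProxy, ReplyListener


class RpcProxyFactory(object):
    """Callable that builds a proxy for a service named at call time."""

    def __init__(self, worker_ctx, reply_listener):
        self.worker_ctx = worker_ctx
        self.reply_listener = reply_listener

    def __call__(self, target_service):
        return ServiceProxy(self.worker_ctx, target_service, self.reply_listener)


class DynamicRpcProxy(DependencyProvider):
    # One reply listener is shared by every proxy the factory creates.
    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        # Injected into the service; call it with a service name to get a proxy.
        return RpcProxyFactory(worker_ctx, self.rpc_reply_listener)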

Ah, interesting. I had forgotten about this use-case.

In the refactored code you almost get the “dynamic” proxy behaviour for free. I say almost because the RpcProxy requires that you provide a target service name and pre-populates it on the object that is returned. If that argument became optional, you could use the normal RpcProxy “dynamically” by simply not providing a target service name when declaring it.

I think this is a sufficiently common request that we should make that change. I’ll try and get that done and cut a new pre-release.

If you need something in the meantime, take a look at overriding the constructor and get_dependency methods of the RpcProxy.
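To illustrate the idea of an optional target service name, here is a rough, self-contained sketch. It does not use nameko at all: DynamicClient and fake_send are hypothetical stand-ins, and the real refactored classes may look quite different. When no service name is pre-populated, the first attribute lookup selects the service and the second selects the method.

```python
class DynamicClient(object):
    """Stand-in RPC client (hypothetical, not nameko's implementation)."""

    def __init__(self, send, service_name=None, method_name=None):
        self._send = send
        self.service_name = service_name  # optional: may be chosen later
        self.method_name = method_name

    def __getattr__(self, name):
        # Only reached for attributes that are not set in __init__.
        if name.startswith("_"):
            raise AttributeError(name)
        if self.service_name is None:
            # No pre-populated target: the first lookup picks the service.
            return DynamicClient(self._send, service_name=name)
        # Target already known: the lookup picks the method.
        return DynamicClient(self._send, self.service_name, name)

    def __call__(self, *args, **kwargs):
        if self.service_name is None or self.method_name is None:
            raise ValueError("both a service and a method name are required")
        return self._send(self.service_name, self.method_name, args, kwargs)


def fake_send(service_name, method_name, args, kwargs):
    # Placeholder transport: a real client would publish an AMQP message.
    return "{}.{}{}".format(service_name, method_name, args)


fixed = DynamicClient(fake_send, service_name="video")  # target declared up front
dynamic = DynamicClient(fake_send)                      # target chosen per call
```

With these stand-ins, fixed.process(1) and dynamic.video.process(1) route to the same service and method; the only difference is when the service name is supplied.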

Thanks Matt for responding.

I will wait for the new pre-release, which will let me initialize RpcProxy with “proxy = RpcProxy()” in a service class.

In the meantime, I will look at the constructor and get_dependency, as you suggested. I suppose I only need to find the replacement for ServiceProxy in the __call__ function. Is this correct?

Sorry for the slow reply @arnoldw.

I’ve opened a PR with these changes against the v3.0.0-rc branch. Once they land I’ll cut a new pre-release.

Thanks @mattbennett! I look forward to testing the pre-release!

@arnoldw 3.0.0-rc1 has been released to PyPI

Thanks @mattbennett! I will look into it soon.