Non-persistent ClusterRpcProxy


#1

Hi All,

I am developing an application that will handle large stream of videos/images and process them.
I found nameko is quite easy to use and it is quite good so far.
But, I would like to further decrease the message processing time, and I have been reading threads related to this. What I am still trying to understand is to make the message non-persistent and therefore can make RabbitMQ runs faster.
I understand that the deliver_mode has been exposed in nameko library. But, I do not understand how to set this parameter when I use an RPC call by using ClusterRpcProxy.
I tried several things, but it did not achieve what I wanted (i.e. to make the message non-persistent).
Some of the things that I tried but not successful:

rpc = ClusterRpcProxy(config, delivery_mode = 1)

from kombu import Exchange
Exchange.delivery_mode = 1

config = {
‘AMQP_URI’: AMQP_URI
‘delivery_mode’: 1
}
cluster_rpc = ClusterRpcProxy(config)

Could someone help me? Did I miss something obvious here?

Best Regards,
Arnold


#2

Hi Arnold,

In the current stable release it’s not possible to configured the standalone proxy in this way.

However, the standalone proxy has been significantly refactored by the work in https://github.com/nameko/nameko/pull/542. These changes are about to hit PyPI as a pre-release version.

Using the pre-release version you will be able to do exactly as in your first attempt:

rpc = ClusterRpcProxy(config, delivery_mode=1)

We would be grateful for some feedback if you start to use this pre-release version!


#3

Pre-release 3.0.0rc0 is now on PyPI.

You can install it with pip install --pre nameko


#4

Thanks Matt! I am still working on the other parts. I shall update you soon once I start looking at it.


#5

Hi Matt,

I started looking at this, I did the pip install --pre nameko
Then I ran my service: nameko run myservice, I received the following error:

from nameko.rpc import rpc, ServiceProxy, ReplyListener
ImportError: cannot import name ServiceProxy

#6

For what reason are you importing ServiceProxy directly? It’s a semi-internal part of the old implementation that had been refactored away.

If you can post some code it would help


#7

Hi Matt,

Sorry for the late response. I followed the code from: https://groups.google.com/forum/#!topic/nameko-dev/XE-uarCEV00 to make a dynamic service call using RPCProxy.

class RpcProxyFactory(object):

    def __init__(self, worker_ctx, reply_listener):
        self.worker_ctx = worker_ctx
        self.reply_listener = reply_listener

    def __call__(self, target_service):
        return ServiceProxy(self.worker_ctx, target_service, self.reply_listener)


class DynamicRpcProxy(DependencyProvider):
    rpc_reply_listener = ReplyListener()

    def get_dependency(self, worker_ctx):
        return RpcProxyFactory(worker_ctx, self.rpc_reply_listener)

#8

Ah, interesting. I had forgotten about this use-case.

In the refactored code you almost get the “dynamic” proxy behaviour for free. I say almost because the RpcProxy requires that you provide a target service name and pre-populates it on the object that is returned. If that argument became optional, you could use the normal RpcProxy “dynamically” by simply not providing a target service name when declaring it.

I think this is a sufficiently common request that we should make that change. I’ll try and get that done and cut a new pre-release.

If you need something in the mean time, take a look at overriding the constructor and get_dependency methods of the RpcProxy


#9

Thanks Matt for responding.

I will wait for the new pre-release to enable me initialize RpcProxy by doing “proxy = RpcProxy()” in a service class.

In the mean time, I will look the constructor and get_dependency that you suggested. I suppose I only need to find the replacement of ServiceProxy in the __call__ function, is this correct?


#10

Sorry for the slow reply @arnoldw.

I’ve opened a PR with these changes against the v3.0.0-rc branch. Once they land I’ll cut a new pre-release.


#11

Thanks @mattbennett! I look forward to testing the pre-release!