How to identify pubsub server from wxpython sub application

Hi Guys,

I have two machines trying to talk to each other in a pubsub way. On the subscriber (192.168.1.88) I have a wxPython GUI including:

@event_handler(
    "Environment_Events", "TandH", handler_type=BROADCAST, reliable_delivery=False
)
def updateStatusBarEnv(self, TandH):

On the publisher (192.168.1.81) I have a nameko service with RabbitMQ also on the same machine:

    self.dispatch("TandH", outString)

But I’m getting nothing.

I added to the sub machine:

CONFIG = {'AMQP_URI': "amqp://guest:guest@192.168.1.81:5672"}

To no effect. I also tried pubbing in nameko and subbing with wx.lib.pubsub.

None of these seemed to work. Surely there must be a way of informing the subber which pub server to subscribe to?

Thanks in advance.

Regards

Steve.

Surely there must be a way of informing the subber which pub server to subscribe to?

Nameko events aren’t peer-to-peer. Both publisher and subscriber connect to the same RabbitMQ broker, and the messages pass through it.

Assuming you’ve got them both connected to the same broker, the problem must be in the routing.

Your event_handler is saying subscribe me to “TandH” events from the “Environment_Events” service. Is your dispatch call in a service called “Environment_Events”?
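To make the routing rule concrete, here is a toy in-memory model. It is not Nameko's or RabbitMQ's actual implementation; `ToyBroker`, its methods and the payload strings are made up for illustration. The point is only that a handler receives an event when both the source service name and the event type match what the dispatcher uses.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker; purely illustrative."""

    def __init__(self):
        # (source_service, event_type) -> list of handler callables
        self._bindings = defaultdict(list)

    def subscribe(self, source_service, event_type, handler):
        self._bindings[(source_service, event_type)].append(handler)

    def dispatch(self, source_service, event_type, payload):
        # Deliver only to handlers bound to exactly this pair.
        handlers = self._bindings.get((source_service, event_type), [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


broker = ToyBroker()
received = []
broker.subscribe("Environment_Events", "TandH", received.append)

# Matching service name and event type: the handler is called.
delivered = broker.dispatch("Environment_Events", "TandH", "sample payload")

# Different service name: nobody is bound, so the event goes nowhere.
dropped = broker.dispatch("Environment_Events2", "TandH", "sample payload")
```

A mismatch on either name fails silently, which is why "getting nothing" is the typical symptom.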

Yes it is.

There are no other commands than those I mentioned.

is

CONFIG = {'AMQP_URI': "amqp://guest:guest@192.168.1.81:5672"}

and

@event_handler

Enough to connect?

It worked beautifully with RPC :frowning_face:

Actually when I said this I probably meant broker rather than publisher.

The @event_handler decorator and a correct AMQP_URI string are all you need. It’s no more complicated than RPC.

You should be able to figure out what’s wrong by looking at the RabbitMQ management interface.

I combined your two snippets into an example for comparison:

from nameko.events import EventDispatcher, event_handler, BROADCAST
from nameko.timer import timer

class S1:
    name = "Environment_Events"

    dispatch = EventDispatcher()

    @timer(interval=1)
    def ping(self):
        self.dispatch("TandH", "payload...")

class S2:
    name = "subber"

    @event_handler(
        "Environment_Events", "TandH", handler_type=BROADCAST, reliable_delivery=False
    )
    def updateStatusBarEnv(self, TandH):
        print(TandH)

Connecting these services to a broker (local or remote, doesn’t matter) you should see a queue created by the @event_handler:

Clicking through to the detail page for that queue:

Note that the queue has a consumer, and that it is bound to the Environment_Events exchange. Clicking through to the detail page for the exchange:

You can see that the queue is bound with the correct routing key.

Compare to your services and see where you’re mismatched.

Outstanding. Where do I get that map of the RabbitMQ warren?

This is the RabbitMQ management interface. You’ll find it on port 15672 assuming you have the management plugin installed.

Hi Matt,

Thanks for this. It looks like I have the module installed, but that I need to enable it. I’m running under docker:

sudo docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3

So I guess enable is something like this:

sudo docker run rabbitmq-plugins enable rabbitmq_management

But I get:

Unable to find image 'rabbitmq-plugins:latest' locally
docker: Error response from daemon: pull access denied for rabbitmq-plugins, repository does not exist or may require 'docker login'.

What do you think?

Regards

Steve.

Hi Steve,

To run a RabbitMQ broker with the management plugin enabled, you can simply use this docker run command:
sudo docker run -d -p 5672:5672 -p 15672:15672 --hostname rabbit --name rabbit rabbitmq:3.7-management-alpine

Port 5672 is used for AMQP connections and 15672 for the web management console.
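If it is unclear whether those two ports are reachable from the other machine, a quick TCP check can rule out networking before you look at routing. This is a minimal sketch using only the standard library; `port_open` is a made-up helper name, and a successful connection only shows that something is listening, not that it is RabbitMQ.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable or timed out.
        return False


def report(host, ports=(5672, 15672)):
    """Print one line per port; intended to be run on the subscriber machine."""
    for port in ports:
        state = "reachable" if port_open(host, port) else "unreachable"
        print("%s:%d is %s" % (host, port, state))
```

For the setup in this thread you would call `report("192.168.1.81")` from 192.168.1.88.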

You get the error above because docker run treats its first non-option argument as an image name, so it tried to pull an image called rabbitmq-plugins:latest, which doesn’t exist on Docker Hub. You can find all available images here: https://hub.docker.com/_/rabbitmq

Hi Jakub_Borys,

Great. Finally got the console working after reinstalling Docker and RabbitMQ.

I’ll finish debugging my pubsub tomorrow.

Thanks all.

Steve.

Hi Guys,

So eventually I have all the concepts working, although not programmatically.

My subscriber looks like this:

#!/usr/bin/env python

from nameko.events import event_handler, BROADCAST

CONFIG = {'AMQP_URI': "amqp://guest:guest@192.168.1.81:5672"}

class Subber:
    name = "subber"

    @event_handler(
        "Environment_Events2", "TandH", handler_type=BROADCAST, reliable_delivery=False
    )
    def updateStatusBarEnv(self, TandH):
        print("TandH: %s" % TandH)

This works perfectly on 192.168.1.81 (the same machine as the publisher service) but not on 192.168.1.88 unless I run it like this:

nameko run Subber_Test --broker amqp://guest:guest@192.168.1.81

Then it also works. As you can see I have created the CONFIG variable in the subscriber service, but how do I pass it to @event_handler?

Thanks

Steve.

It’s not working because declaring a CONFIG variable like that doesn’t actually do anything :wink:

You either need to use a config file, or (if you’re using the nameko 3.x prerelease**) you can declare it on the command line:

nameko run Subber_Test --define AMQP_URI=amqp://guest:guest@192.168.1.81:5672

** To install the prerelease, run pip install --pre nameko

Yes, I could see that. But I’m not running the main app with nameko. It’s a large wxpython app and I wanted a sort of custom event to pop in and call a routine. If I use a config file will that be called by event_handler?

Standalone, it runs the way you suggest:

nameko run Subber_Test --broker amqp://guest:guest@192.168.1.81

But from another app, it doesn’t. I’ll try the config file.

Thanks.

Steve.

Hi Matt,

So I tried all the above, but whatever I use I need to call a service through the nameko run command. What I have is an application called by the python2.7 command. The application run with nameko run just crashes.

I thought maybe I could use the wxpython pubsub structure but it has this line in on the wiki page:

Note that pubsub itself does not broker over a network, but only within an application.

Which makes it all but useless. I have a service on another host, which I wanted to subscribe to in a pub-sub way. I can do it in a rpc way using ClusterRpcProxy and a CONFIG variable, but not using pub-sub. It seems the same restriction applies. I can run a service to service pub-sub over the network, but not an application to service. The issue is @event_handler. If I could inject the CONFIG into that, it would work. I tried setting the components as environment vars, but still no joy. Is it possible to spell out the @event_handler long-hand with host address injection? How would I do that?

I could potentially daisy-chain my pub-subs, with two RabbitMQs, one on each host. Run a service on my client host attached to both Rabbits, run it under nameko subscribing to the remote host and re-publishing under the local host, which I could pick up with the application. Seems like a lot of work.

What do you think?

Clearly I should have come to your open day last year!!! :slightly_smiling_face:

Will you have another one this year? I discovered nameko the week after you’d just run it.

Regards

Steve

Sorry Steve. Even though it’s right there in the title I missed the part about wxpython. Do you want the Nameko event to trigger something inside the GUI app?

It’s not simply a case of getting the config into the event handler. The @event_handler decorator is a Nameko entrypoint – it relies on various mechanisms inside the Nameko service container to work. So you can’t just decorate a method in another random class and have it work.

The same is true of the @rpc decorator. So I assume your wxpython app is not the server in the RPC relationship, but the client, and uses the RpcProxy from the nameko.standalone.rpc package?

If your wxpython application was the publisher, you could use the standalone event dispatcher (in nameko.standalone.events). Unfortunately there is no standalone event handler. It’s quite a lot harder to consume messages than it is to send them – you need to maintain a persistent connection to the broker, which means a background thread, co-routine, or blocking wait, and which one of those is appropriate depends where you’re using it. So it doesn’t come out of the box.

In Nameko 3.x there is a utility which consumes AMQP messages and the various AMQP entrypoints use it. You might be able to use this as a starting point for consuming events inside a wxpython app.

Exactly. What you say about the @rpc is also true. It sends a message to the @rpc broker on another host and gets the reply. It works, but the application hangs while it does that (only for a second or two). I could put that call in a background thread and then there would be no hanging at all, which might be better.
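A minimal sketch of that background-thread idea, using only the standard library (Python 3). `call_in_background` and `slow_rpc_call` are hypothetical names; the latter just sleeps to stand in for the real blocking RPC call.

```python
import threading
import time


def call_in_background(func, on_result, *args):
    """Run func(*args) in a daemon thread and pass its return value to on_result."""

    def worker():
        on_result(func(*args))

    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()
    return thread


def slow_rpc_call(sensor):
    # Stand-in for a blocking RPC call; sleeps instead of talking to a broker.
    time.sleep(0.2)
    return "reading for %s" % sensor


results = []
thread = call_in_background(slow_rpc_call, results.append, "TandH")
# The caller is free to carry on here while the call is in flight.
thread.join()  # only so this example can show the outcome; a GUI would not join
```

Note that `on_result` runs in the worker thread. In a wxPython app the callback would presumably need to hand the result back to the GUI thread, for example by wrapping the update in `wx.CallAfter`, rather than touching widgets directly.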

I have now got a version of the subscription service which then resends the message locally. That also works. The issue is how to get it into the app. Even the wxpython pubsub only works within a single GUI instance. I could poll for it, but that defeats the whole object; I might as well go back to using rpc. Pubsub should just shout when it’s ready and the app comes and gets it, rather than the app saying are you done yet, are you done yet, are you done yet… ?

Do all pubsubs work like this? What do you think? Am I just using the wrong paradigm? Is there a pubsub paradigm that I’m not aware of?

Thanks,

Steve.

We’ve been around the houses in this thread, and I might now be a bit lost!

From a quick read of the pypubsub docs, it’s intended to help decouple bits of a single application (especially useful for GUIs), not let an application respond to events on other machines. Nameko Events and pypubsub both use a “publisher-subscriber” pattern but they are very different.

I think the crux of this all is that you have a concurrency problem. Your wxpython application has its own eventloop. This is why the app hangs when you make a blocking call with the standalone RPC proxy. I guess pypubsub is integrated into that eventloop, so every time your wxpython application goes around the loop it checks to see if there are any new events and calls any handlers you’ve set up for them.

In contrast, there is nothing in that eventloop or any other background thread that is listening for AMQP messages and calling your @event_handler decorated method.

I don’t know much about wxpython, but I would try to separate the two concerns:

  1. Spin up a background thread that can consume messages from AMQP. You can use the Consumer utility from Nameko 3 or just use Kombu or similar directly.
  2. When an AMQP message arrives, use pypubsub to publish that message in a way that your GUI app can react to

(assuming pypubsub publishing is thread-safe)
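The two steps could be sketched like this, using only the standard library (Python 3). Everything here is a stand-in: `fake_source` plays the role of the AMQP consumer, and `handler` is where a real app might call pypubsub. If publishing from the background thread turns out not to be safe, a thread-safe queue drained from the GUI thread avoids the question.

```python
import queue
import threading


def consume(source, inbox):
    """Background thread body: push every incoming message onto the inbox."""
    for message in source:
        inbox.put(message)


def drain(inbox, handler):
    """Called from the GUI thread (e.g. on a timer tick); never blocks."""
    handled = 0
    while True:
        try:
            message = inbox.get_nowait()
        except queue.Empty:
            return handled
        handler(message)
        handled += 1


inbox = queue.Queue()
fake_source = iter(["TandH 1", "TandH 2", "TandH 3"])  # stands in for AMQP
worker = threading.Thread(target=consume, args=(fake_source, inbox))
worker.daemon = True
worker.start()
worker.join()  # only for this example; a real consumer thread keeps running

shown = []
count = drain(inbox, shown.append)
```

In the real application `consume` would loop over messages from Kombu or the Nameko 3 consumer, and `drain` would be called periodically from the GUI's own event loop, so the check costs almost nothing when the queue is empty.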

No, I think you’ve captured it perfectly. So I managed to get point 1) working with a nameko service running on my local host. I tried 2) exactly as you say, but pypubsub doesn’t even work from an external service on the same PC to another task on the same PC. Maybe I can insert a very lightweight check into the main event loop, but I don’t really know how to do that.

In the meantime, it seems easier to continue to use rpc and then just put it into a thread. I could, I suppose, pypubsub the answer intra-app.

I think I learned that in architecture pub-sub needs to be service to service and async, but rpc can be app to service and then needs to be sync, although it can be threaded to reduce impact on GUI performance.

Thanks for your patience.

Regards,

Steve.