How can I connect an event callback to tornado?

From @wyattsuen on Thu Aug 23 2018 14:52:59 GMT+0000 (UTC)

I have a service that accepts data from remote servers and needs to broadcast it to my other services. These other services (I use tornado) need to show the data on websocket connections. I think an event-driven design is suitable. But how can I use @event_handler outside nameko, in tornado?

The documentation gives an example, but I have no way to send the ‘payload’ to my tornado websocket server.

class ServiceB:
    name = 'service_b'

    # tornado_service = ??  how can I inject the tornado websocket callback?

    @event_handler('service_a', 'event_type')
    def handle_event(self, payload):
        print('service b received:', payload)
        # tornado_service.callback(payload) ??  but the tornado server can't be embedded in the nameko server.

Is there a graceful way for nameko to run a standalone event handler in flask or tornado?



Copied from original issue: https://github.com/nameko/nameko/issues/570

From @mattbennett on Thu Aug 23 2018 20:06:45 GMT+0000 (UTC)

A Nameko event is just an AMQP message with a particular payload, so a standalone event handler is just something that consumes those messages. You should be able to use any AMQP library.

If I understand your question correctly, you could also use your Nameko service to communicate with your tornado services using a websocket connection. You can use the websocket-client library for that, and there’s an example of using it in the nameko codebase.

From @mattbennett on Thu Aug 23 2018 21:46:28 GMT+0000 (UTC)

For future reference, the forum is a better place for support questions like these. I will copy this issue there now and we can continue discussion on the forum if needs be.

Thanks very much for the reply and for this very beautiful framework. I have only been using nameko for a week, and I have already switched most of my work architecture to it. I know I have to solve this problem with kombu. I have been reading the kombu documentation for the last few days, but I find the process difficult to understand without RabbitMQ experience. Could you provide an example of how to get an event_handler outside nameko using kombu?

class ServiceA:
    """ Event dispatching service. """
    name = "service_a"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
        self.dispatch("event_type", payload)

How can I accept this event with the SERVICE_POOL and BROADCAST handler types in kombu, without nameko? I think such an example would be a very helpful addition to the nameko documentation. Or it would be best to provide a standalone event_handler, just like the standalone rpc service.

Although I have not run into this problem yet, I think it would also be useful to send an event from outside nameko and handle it inside nameko. Suppose tornado accepts a message from a user and I want to send that message to many potential services; broadcasting an event might be useful.

class ServiceB:
    """ Event listening service. """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received:", payload)

How can I send the event with the SERVICE_POOL and BROADCAST methods in kombu, without nameko? Examples are very important for those of us without RabbitMQ experience, so any examples you can kindly and patiently provide are welcome. Thanks a lot.

This message got stuck in moderation and I wasn’t notified, sorry about that.

The thing you’re missing here is how the queues and exchanges work together in Nameko events. It’s covered to some extent in the docstring, but I will describe it here:

The event handler in ServiceB will create a queue called evt-service_a-event_type--service_b.handle_event, bound to an exchange service_a.events with routing key event_type. When ServiceA publishes a message to the exchange service_a.events with the routing key event_type, it is routed to the event handler queue (in fact to all queues bound with the routing key event_type, as explained below). The event handler in ServiceB then consumes that message as the “event”. The payload is just a serialised copy of what is passed to the dispatcher.
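As a small illustration of the naming described above, the following sketch rebuilds the exchange, routing key and queue name from the service and method names. The two helper functions are hypothetical names made up for this example, not part of Nameko's API, and only the default handler's pattern quoted above is covered.

```python
def event_exchange_name(source_service):
    """Exchange that a service's events are published to."""
    return "{}.events".format(source_service)


def service_pool_queue_name(source_service, event_type,
                            listener_service, method_name):
    """Queue name for the handler described above (hypothetical helper)."""
    return "evt-{}-{}--{}.{}".format(
        source_service, event_type, listener_service, method_name
    )


exchange = event_exchange_name("service_a")
routing_key = "event_type"  # the routing key is simply the event type
queue = service_pool_queue_name(
    "service_a", "event_type", "service_b", "handle_event"
)

print(exchange)     # service_a.events
print(routing_key)  # event_type
print(queue)        # evt-service_a-event_type--service_b.handle_event
```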

A “standalone” event handler would just need to consume from the queue.

There are two other important facts you need to understand about AMQP:

  1. Multiple consumers connected to the same queue will “share” the messages. That is, they consume them in a round-robin fashion and one message is delivered to one consumer only.

  2. An exchange will route a message to every bound queue that matches the routing key. That is, the message is duplicated.
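The two facts above can be seen in a toy, in-memory model. This is only a sketch: ToyExchange and ToyQueue are invented for illustration, routing keys are matched exactly (no topic wildcards), and a real broker's delivery order also depends on settings such as prefetch.

```python
class ToyQueue:
    """In-memory stand-in for an AMQP queue with round-robin delivery."""

    def __init__(self, name, routing_key):
        self.name = name
        self.routing_key = routing_key
        self.consumers = []   # names of attached consumers
        self.delivered = []   # (consumer, message) pairs, in delivery order
        self._count = 0

    def add_consumer(self, consumer):
        self.consumers.append(consumer)

    def deliver(self, message):
        # Fact 1: each message goes to exactly one consumer of this queue.
        consumer = self.consumers[self._count % len(self.consumers)]
        self._count += 1
        self.delivered.append((consumer, message))


class ToyExchange:
    """In-memory stand-in for an exchange with exact-match routing."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, routing_key, message):
        # Fact 2: every bound queue with a matching key gets its own copy.
        for queue in self.queues:
            if queue.routing_key == routing_key:
                queue.deliver(message)


exchange = ToyExchange()
shared = ToyQueue("shared", "event_type")        # two consumers, one queue
dedicated = ToyQueue("dedicated", "event_type")  # one consumer, own queue
exchange.bind(shared)
exchange.bind(dedicated)
shared.add_consumer("worker-1")
shared.add_consumer("worker-2")
dedicated.add_consumer("listener")

for n in range(4):
    exchange.publish("event_type", n)

print(shared.delivered)     # the two workers alternate
print(dedicated.delivered)  # the listener receives all four messages
```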

Thus it is not possible to publish a message in SERVICE_POOL or BROADCAST mode: these are configurations of the consumers, not of the dispatcher. The setting determines whether the consumers “share” a queue or each get their own.

When you connect your “standalone” event handler, you’ll need to create a dedicated queue for it if you want it to receive a copy of all the events. If you want to have multiple instances of your standalone consumer working together as a “pool”, create a new queue and bind all the instances to that one queue – then events will round-robin between all the instances.
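A minimal sketch of such a standalone consumer with kombu follows. Several things here are assumptions rather than Nameko conventions: the queue naming scheme, the listener_name and pooled parameters, and the exchange arguments (a durable topic exchange), which must match whatever Nameko has already declared or the broker will refuse the declaration. The JSON content type assumes Nameko's default serializer.

```python
import uuid


def standalone_queue_name(source_service, event_type, listener_name,
                          pooled=True):
    """Pick a queue name for a non-Nameko listener (hypothetical scheme)."""
    base = "evt-{}-{}--{}".format(source_service, event_type, listener_name)
    if pooled:
        # Every instance uses the same queue, so events are shared out.
        return base
    # A unique suffix gives this instance its own queue and its own copies.
    return "{}-{}".format(base, uuid.uuid4().hex)


def consume_events(amqp_uri, source_service, event_type, listener_name,
                   on_event, pooled=True):
    """Block forever, passing each event payload to on_event(payload)."""
    # Imported here so the naming helper above works without kombu installed.
    from kombu import Connection, Exchange, Queue

    # Assumption: these arguments match the exchange Nameko declared.
    exchange = Exchange(
        "{}.events".format(source_service), type="topic", durable=True
    )
    queue = Queue(
        standalone_queue_name(
            source_service, event_type, listener_name, pooled
        ),
        exchange=exchange,
        routing_key=event_type,
        durable=pooled,          # a pool queue survives broker restarts
        auto_delete=not pooled,  # a private queue goes away with its consumer
    )

    def handle_message(body, message):
        on_event(body)
        message.ack()

    with Connection(amqp_uri) as connection:
        with connection.Consumer(
            queues=[queue], callbacks=[handle_message], accept=["json"]
        ):
            while True:
                connection.drain_events()
```

For example, consume_events("amqp://guest:guest@localhost", "service_a", "event_type", "tornado_app", print, pooled=False) would print every event on a private queue, while pooled=True lets several instances share one queue. Because the loop blocks, a tornado process would probably need to run it in a separate thread.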