Nameko stuck in a self.__dispatch() call when a service sends an event to another service

I am running Nameko 2.9.0 with Python 3.5.

I have a piece of code where Service-A needs to dispatch an event to
Service-B.
When I run Service-A to read a NATS queue and pass what it reads on to
another service (Service-B), the Nameko service for Service-A gets stuck in
the EventDispatcher call for Service-B.

Any clues?

class Service-A:
    name = "Service-A"
    CONFIG = {'AMQP_URI' : "amqp://......"}

    def __init__(self):
        self.__NATSserversList = ["nats://localhost:4222"]
        self.__natsClient = NATS()
        self.__subjects = ["k8s.events", "aws.events", "k8s.alarms",
                           "aws.alarms"]
        self.__loop = asyncio.new_event_loop()
        self.__dispatch = EventDispatcher()
        print("Created a Service-A")

    @rpc
    def Service-A_start(self):
        self.__loop.run_until_complete(self.__natsReaderLoop(self.__loop))
        try:
            print("Starting event Loop")
            self.__loop.run_forever()
        finally:
            print("Closing event Loop")
            self.__loop.close()

    async def __readEventMessage(self, msg):
        try:
            self.__dispatch("Event_AWSEvent", payload=msg)
        except Exception as e:
            print("Exception in processing message on Topic")

    def __natsReaderLoop(self, loop):
        try:
            yield from self.__natsClient.connect(servers=self.__NATSserversList,
                                                 io_loop=self.__loop)
            yield from self.__natsClient.subscribe(self.__subjects,
                                                   "nats-subscriber",
                                                   self.__readEventMessage)

        except Exception as e:
            print(e)

from nameko.containers import ServiceContainer
container = ServiceContainer(Service-A, config=Service-A.CONFIG)
service_extensions = list(container.extensions)
container.start()
with ClusterRpcProxy(Service-A.CONFIG) as rpc:
    rpc.Service-A.Service-A_start()

Hi,

I just tried changing the dispatcher 'dispatch' from an instance variable to
a class variable, and it seemed to work.
What's the logic? (I am a newbie to Nameko.)

Do all instances of the service share a dispatcher?


You seem to be using asyncio. That’s not going to work…

Nameko uses Eventlet for concurrency. You don’t need to run your own event loop like this.

You also don’t want to put anything into the __init__ method of your service class. Service instances are ephemeral; the class is instantiated for every worker, not once when the service is started as you’re assuming here.
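To make the point about ephemeral instances concrete, here is a toy model in plain Python. It is not Nameko's code: `Dispatcher`, `ToyContainer`, `GoodService` and `BadService` are hypothetical names, and the model only assumes that the container discovers dependency declarations by scanning the service class once, then injects them into a fresh instance for each worker. In a real service the equivalent declaration would be a class-level `dispatch = EventDispatcher()`.

```python
class Dispatcher:
    """Stand-in for a dependency declaration such as an event dispatcher."""

    def __init__(self):
        self.bound = False  # becomes True once the container has set it up
        self.sent = []      # records what was dispatched, for illustration

    def bind(self):
        self.bound = True
        return self

    def get_dependency(self):
        # What a worker actually receives: a plain callable,
        # not the declaration object itself.
        def dispatch(event_type, payload):
            self.sent.append((event_type, payload))
        return dispatch


class ToyContainer:
    def __init__(self, service_cls):
        self.service_cls = service_cls
        # Declarations are discovered once, by scanning the class body.
        self.providers = {
            name: attr.bind()
            for name, attr in vars(service_cls).items()
            if isinstance(attr, Dispatcher)
        }

    def run_worker(self, method_name, *args):
        service = self.service_cls()  # fresh instance for every call
        for name, provider in self.providers.items():
            setattr(service, name, provider.get_dependency())
        return getattr(service, method_name)(*args)


class GoodService:
    dispatch = Dispatcher()  # class-level: the container can find it

    def forward(self, msg):
        self.dispatch("Event_AWSEvent", msg)
        return "dispatched"


class BadService:
    def __init__(self):
        # Created after discovery has already happened, so never bound.
        self.dispatch = Dispatcher()

    def forward(self, msg):
        return self.dispatch.bound
```

In this model, `ToyContainer(GoodService).run_worker("forward", msg)` reaches a bound dispatcher, while `BadService` ends up holding a declaration the container never saw. That matches the observation earlier in the thread that moving the dispatcher to a class variable made it work: the declaration is shared at class level, and each worker instance gets its own injected callable.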

I recommend you take a look at the nameko-examples repo to get a better idea what a Nameko service should look like.

Hi Matt,

Thanks for the reply.

I wanted to create a ‘subscriber’ worker for a NATS notification service, for each call to RPC Service-A_start().

The worker would run an event-loop processing notifications.

I understand the point about the Service instance being ephemeral.

Is there a worker timeout that I need to be concerned about, one that could kill the subscriber event loop?

I think the proper approach would have been to have a Nameko extension for NATS (similar to AMQP), but I couldn’t see one available yet.

Ah, I see what you’re trying to do now.

A Nameko entrypoint for NATS is absolutely the way to go. It’s not too difficult to create entrypoints; if there were a Python NATS library that wasn’t tied to an event loop (I only see the asyncio and tornado versions) it would be easy.

I don’t know about integrating asyncio-based libraries into an eventlet ecosystem though. I think you could probably run the reader loop in eventlet’s tpool (its pool of native threads).
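As a rough illustration of that idea, the sketch below keeps the asyncio loop inside one blocking function and runs that function in a separate native thread, handing messages back through a queue. It uses only the standard library: `fake_subscription` is a hypothetical stand-in for a NATS subscription, and `threading.Thread` stands in for the thread pool. Under eventlet one might pass `run_reader_loop` to `eventlet.tpool.execute` instead, but that combination is an untested assumption here, not something confirmed in this thread.

```python
import asyncio
import queue
import threading


async def fake_subscription(handler, messages):
    # Hypothetical stand-in for a NATS client delivering messages.
    for msg in messages:
        await asyncio.sleep(0)  # yield to the loop, as real I/O would
        handler(msg)


def run_reader_loop(outbox, messages):
    """Blocking function that owns its asyncio loop from start to finish."""
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fake_subscription(outbox.put, messages))
    finally:
        loop.close()
    outbox.put(None)  # sentinel: tells the consumer the reader is done


def consume(messages):
    """Run the reader in its own thread and collect what it hands over."""
    outbox = queue.Queue()
    reader = threading.Thread(
        target=run_reader_loop, args=(outbox, messages), daemon=True
    )
    reader.start()
    received = []
    while True:
        item = outbox.get(timeout=5)
        if item is None:
            break
        received.append(item)  # a real service would dispatch an event here
    reader.join(timeout=5)
    return received
```

The design point is that no asyncio object crosses the thread boundary: the loop is created, run and closed inside `run_reader_loop`, and only plain messages travel through the queue to the code that would dispatch events.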

There is no timeout for workers, so your event-loop-in-a-worker would keep running until it voluntarily stops. This would be a weird pattern though; I’d recommend refactoring your NATS consumer into a proper entrypoint if you can.