DependencyProvider that directly publishes events?

Some background:

I have a class in a third-party package that maintains a socket-level connection, and provides a method to send via the socket, and a receive loop method (intended to be run as a thread target) that receives and forms messages off that socket, and a callback loop method that then calls an arbitrary callable passing along that formed message (and is also intended to be run as a thread target).

I’ve incorporated an instance of this class into a DependencyProvider and now my goal is to publish events whenever the class callback method fires.

My question is how to accomplish this?

Example:

import socketthing

from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()


class Connection(DependencyProvider):

    def setup(self):
        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        self.client.connect(config.get("SOME_ADDRESS"))
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        # Publish an event here instead!
        print(payload)


class ConnectionService:

    name = 'connection'

    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()

Hi @sanseihappa,

Sorry nobody replied to this sooner.

When you say “publish an event”, do you mean a Nameko event? If so, it would be pretty straightforward to replace your print(payload) with a function that calls a standalone event dispatcher.

An alternative approach would be to turn wrap this third-party package into an entrypoint. In this arrangement you’d be able to decorate a service method and have it run every time the client produced a message. The service method could then of course also dispatch a Nameko event. For an example of this kind of arrangement, see https://github.com/nameko/nameko-bayeux-client.

Hey @mattbennett, no problem. I already seemed to have solved this using the standalone event dispatcher as you mention, resulting in code as seen below. However, I encountered a throughput issue and also had to add another thread to handle the dispatching of events into a sort of “batched” payload. Does that make sense?

Also, I’m looking at the alternative approach you suggest, and it appears to be following the example set forth in https://nameko.readthedocs.io/en/stable/writing_extensions.html#id2, correct?

import collections
import copy
import threading
import time

import socketthing

from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc
from nameko.standalone.events import event_dispatcher


class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()


class Connection(DependencyProvider):

    def setup(self):
        self.deck = collections.deque()
        self.deck_lock = threading.Lock()
        self.last_publish_time = time.time()
        self.publish_time_expiry = 1.0
        self.publish_batch_length = 100

        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        self.client.connect(config.get("SOME_ADDRESS"))
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )
        self.container.spawn_managed_thread(
            self._dispatch_loop, identifier="Dispatch loop"
        )

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        # Publish an event here instead!
        self.deck.append(payload)

    def _dispatch_loop(self):
        while True:
            now = time.time()
            expired = now >= self.last_publish_time + self.publish_time_expiry
            exceeded = len(self.deck) >= self.publish_batch_length
            if exceeded or expired:
                self.last_publish_time = now
                with self.deck_lock:
                    items = list(copy.copy(self.deck))
                    self.deck.clear()
                event_dispatcher()("connection", "custom_event", items)
            time.sleep(0.0001)


class ConnectionService:

    name = 'connection'

    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()

Yes, makes perfect sense. The standalone event dispatcher is intended for single messages rather than continuous use in a loop.

Yes, that’s correct.

nameko-bayeux-client is a good real-world example because it has a separate client that handles a message stream, and turns the messages received by that client into payloads for a Nameko entrypoint – so you can trigger a Nameko service method when specific messages arrive.

Hi folks!
I’d like to join the discussion and I have a question.
I am working on nameko extension for Asterisk PBX (https://github.com/litnimax/nameko-ami)

I read nameko-bayeux-client source code.
It is understood how to use it for receiving messages.
But what about sending messages from a service?

I found nameko-salesforce addon where they achieve this.
But! As far as I understood they use different connections for getting events (streaming) and sending (api).
So it seems to mean that at least 2 connections are used.

Actually my task is to use at the same time entrypoints for receiving and client for sending.
If you see my nameko-ami source code you will see that I use AmiClientExtension to get the running AmiClient and use its client to send actions:

class AmiClientExtension(DependencyProvider):
    def get_dependency(self, worker_ctx):
        for ext in self.container.extensions:
            if isinstance(ext, AmiClient):
                return ext

But! If I do not call @ami(‘event’) decorator my client never does not start :-).
How do do it case when I do not need to receive any events but just to send actions?

I can only see that I need to create 2 different files:

  • ami_messages.py
  • ami_actions.py

So it’s nameko-salesforce approach. But I have to duplicate the code in this case.
I cannot have AmiClient that has all connect procedures and re-use like this:

from nameko_ami import AmiClient

class AmiEvents(AmiClient, SharedExtension, ProviderCollector): 
   ...

When I try to inherit from AmiClient(DependencyProvider) I get an error:

TypeError: bind() missing 1 required positional argument: 'attr_name'

Please advise! Thanks much!

Hi @litmax. I can’t figure out how to help here. Can you explain your questions a little more?

In principle there is no problem having a client object that both receives events and sends messages using the same connection if the protocol is bi-directional. An example entrypoint that does this is in nameko-grpc: https://github.com/nameko/nameko-grpc/blob/master/nameko_grpc/entrypoint.py

One other thing that may be helpful – the inspection of self.container.extensions in get_dependency in your snippet above seems very wrong. What are you trying to accomplish there?

Hi @mattbennett!

The full code is here - https://github.com/litnimax/nameko-ami/blob/master/nameko_ami/ami_client.py

And this is how I use it:

class AmiBroker:
    name = 'asterisk_ami'
    ami_client = AmiClientExtension()
    dispatch = EventDispatcher()

    @rpc
    def send_action(self, action):
        logger.debug('AMI send action: %s', action)
        result = self.ami_client.manager.send_action(action)
        logger.debug('Action result: %s', result)
        return {
            'headers': result.headers,
            'data': result.data,            
        }

    @ami('*')
    def on_ami_event(self, event, manager):
            self.dispatch(event.headers['Event'], {'headers': event.headers,
                                                   'data': event.data})

The idea is that AmiEventHandler(Entrypoint) has ami_client = AmiClient() connection to receive messages from Asterisk.
And asterisk_ami service uses the same connection to send events.
And I could not figure our how to achive this without this introspection hack.

Regards,
Max.

OK I see.

You should use three components to achieve this:

  • An Entrypoint to decorate service methods
  • A DependencyProvider to let service methods send events
  • A SharedExtension that owns all the connection management logic

Your Entrypoint should certainly use a SharedExtension of some sort, to manage a single connection that can be shared by all @ami entrypoints (see the Rpc entrypoint, which uses a shared RpcConsumer or the Grpc entrypoint, which uses a shared GrpcServer).

You could then also use this SharedExtension (call it AmiManager, for example) in your DependencyProvider:

class AmiClient(DependencyProvider):
    manager = AmiManager()

    def get_dependency(self, worker_ctx):
        """ A simplistic `get_dependency` that simply returns a method
            from the manager; would be better to return something with
            an appropriate API for service method usage here.
        """
        return self.manager.send_action

Then, as well as implementing the methods to distribute received messages amongst the registered entrypoints, your AmiManager would implement a send_action method to send an action back to Asterisk when the DependencyProvider was used.

Does that make sense?

1 Like

Thank you Matt.
I did exactly as you described and it works.