DependencyProvider that directly publishes events?

Some background:

I have a class in a third-party package that maintains a socket-level connection, and provides a method to send via the socket, and a receive loop method (intended to be run as a thread target) that receives and forms messages off that socket, and a callback loop method that then calls an arbitrary callable passing along that formed message (and is also intended to be run as a thread target).

I’ve incorporated an instance of this class into a DependencyProvider and now my goal is to publish events whenever the class callback method fires.

My question is how to accomplish this?

Example:

import socketthing

from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()


class Connection(DependencyProvider):

    def setup(self):
        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        self.client.connect(config.get("SOME_ADDRESS"))
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        # Publish an event here instead!
        print(payload)


class ConnectionService:

    name = 'connection'

    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()

Hi @sanseihappa,

Sorry nobody replied to this sooner.

When you say “publish an event”, do you mean a Nameko event? If so, it would be pretty straightforward to replace your print(payload) with a function that calls a standalone event dispatcher.

An alternative approach would be to turn wrap this third-party package into an entrypoint. In this arrangement you’d be able to decorate a service method and have it run every time the client produced a message. The service method could then of course also dispatch a Nameko event. For an example of this kind of arrangement, see https://github.com/nameko/nameko-bayeux-client.

Hey @mattbennett, no problem. I already seemed to have solved this using the standalone event dispatcher as you mention, resulting in code as seen below. However, I encountered a throughput issue and also had to add another thread to handle the dispatching of events into a sort of “batched” payload. Does that make sense?

Also, I’m looking at the alternative approach you suggest, and it appears to be following the example set forth in https://nameko.readthedocs.io/en/stable/writing_extensions.html#id2, correct?

import collections
import copy
import threading
import time

import socketthing

from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc
from nameko.standalone.events import event_dispatcher


class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()


class Connection(DependencyProvider):

    def setup(self):
        self.deck = collections.deque()
        self.deck_lock = threading.Lock()
        self.last_publish_time = time.time()
        self.publish_time_expiry = 1.0
        self.publish_batch_length = 100

        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        self.client.connect(config.get("SOME_ADDRESS"))
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )
        self.container.spawn_managed_thread(
            self._dispatch_loop, identifier="Dispatch loop"
        )

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        # Publish an event here instead!
        self.deck.append(payload)

    def _dispatch_loop(self):
        while True:
            now = time.time()
            expired = now >= self.last_publish_time + self.publish_time_expiry
            exceeded = len(self.deck) >= self.publish_batch_length
            if exceeded or expired:
                self.last_publish_time = now
                with self.deck_lock:
                    items = list(copy.copy(self.deck))
                    self.deck.clear()
                event_dispatcher()("connection", "custom_event", items)
            time.sleep(0.0001)


class ConnectionService:

    name = 'connection'

    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()

Yes, makes perfect sense. The standalone event dispatcher is intended for single messages rather than continuous use in a loop.

Yes, that’s correct.

nameko-bayeux-client is a good real-world example because it has a separate client that handles a message stream, and turns the messages received by that client into payloads for a Nameko entrypoint – so you can trigger a Nameko service method when specific messages arrive.

Hi folks!
I’d like to join the discussion and I have a question.
I am working on nameko extension for Asterisk PBX (https://github.com/litnimax/nameko-ami)

I read nameko-bayeux-client source code.
It is understood how to use it for receiving messages.
But what about sending messages from a service?

I found nameko-salesforce addon where they achieve this.
But! As far as I understood they use different connections for getting events (streaming) and sending (api).
So it seems to mean that at least 2 connections are used.

Actually my task is to use at the same time entrypoints for receiving and client for sending.
If you see my nameko-ami source code you will see that I use AmiClientExtension to get the running AmiClient and use its client to send actions:

class AmiClientExtension(DependencyProvider):
    def get_dependency(self, worker_ctx):
        for ext in self.container.extensions:
            if isinstance(ext, AmiClient):
                return ext

But! If I do not call @ami(‘event’) decorator my client never does not start :-).
How do do it case when I do not need to receive any events but just to send actions?

I can only see that I need to create 2 different files:

  • ami_messages.py
  • ami_actions.py

So it’s nameko-salesforce approach. But I have to duplicate the code in this case.
I cannot have AmiClient that has all connect procedures and re-use like this:

from nameko_ami import AmiClient

class AmiEvents(AmiClient, SharedExtension, ProviderCollector): 
   ...

When I try to inherit from AmiClient(DependencyProvider) I get an error:

TypeError: bind() missing 1 required positional argument: 'attr_name'

Please advise! Thanks much!