DependencyProvider that directly publishes events?

Some background:

I have a class in a third-party package that maintains a socket-level connection. It provides a method to send via the socket, a receive loop method that receives and forms messages off that socket, and a callback loop method that calls an arbitrary callable with each formed message. Both loop methods are intended to be run as thread targets.

I’ve incorporated an instance of this class into a DependencyProvider and now my goal is to publish events whenever the class callback method fires.

My question is how to accomplish this?


import socketthing

from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc

class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()

class Connection(DependencyProvider):

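    def setup(self):
        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        # Run the client's loops as threads managed by the service container.
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )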

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        print(payload)  # Publish an event here instead!

class ConnectionService:

    name = 'connection'

    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()

Hi @sanseihappa,

Sorry nobody replied to this sooner.

When you say “publish an event”, do you mean a Nameko event? If so, it would be pretty straightforward to replace your print(payload) with a function that calls a standalone event dispatcher.

An alternative approach would be to wrap this third-party package in an entrypoint. In this arrangement you’d be able to decorate a service method and have it run every time the client produces a message. The service method could then of course also dispatch a Nameko event. For an example of this kind of arrangement, see

Hey @mattbennett, no problem. I seem to have already solved this using the standalone event dispatcher as you mention, resulting in the code below. However, I ran into a throughput issue and had to add another thread that dispatches the events as a sort of “batched” payload. Does that make sense?

Also, I’m looking at the alternative approach you suggest, and it appears to be following the example set forth in, correct?

import collections
import copy
import threading
import time

import socketthing

from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc
from nameko.standalone.events import event_dispatcher

class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()

class Connection(DependencyProvider):

    def setup(self):
        self.deck = collections.deque()
        self.deck_lock = threading.Lock()
        self.last_publish_time = time.time()
        self.publish_time_expiry = 1.0
        self.publish_batch_length = 100

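        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        # Run the client's loops and the batching loop as managed threads.
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )
        self.container.spawn_managed_thread(
            self._dispatch_loop, identifier="Dispatch loop"
        )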

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        # Queue the payload; the dispatch loop publishes it as part of a batch.
        with self.deck_lock:
            self.deck.append(payload)

    def _dispatch_loop(self):
        while True:
            now = time.time()
            expired = now >= self.last_publish_time + self.publish_time_expiry
            exceeded = len(self.deck) >= self.publish_batch_length
            if exceeded or expired:
                self.last_publish_time = now
                with self.deck_lock:
                    items = list(copy.copy(self.deck))
                    self.deck.clear()  # avoid re-sending the same payloads
                if items:
                    event_dispatcher()("connection", "custom_event", items)
            time.sleep(0.01)  # yield instead of busy-waiting

class ConnectionService:

    name = 'connection'

    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()

Yes, makes perfect sense. The standalone event dispatcher is intended for single messages rather than continuous use in a loop.
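
The batching idea can be sketched independently of Nameko. The following is a minimal sketch, assuming a hypothetical `Batcher` class and an injected `publish` callable (in the service above this role would be played by the function returned by the standalone `event_dispatcher()`). It flushes when either the batch size or the time limit is reached, and swaps the buffer out under the lock so that no payload is published twice.

```python
import threading
import time


class Batcher:
    """Hypothetical helper: collect payloads and publish them in batches."""

    def __init__(self, publish, max_items=100, max_age=1.0, clock=time.monotonic):
        self.publish = publish      # callable taking a list of payloads
        self.max_items = max_items
        self.max_age = max_age
        self.clock = clock          # injectable so the timing can be tested
        self._items = []
        self._lock = threading.Lock()
        self._last_flush = clock()

    def add(self, payload):
        """Called from the callback thread for every formed message."""
        with self._lock:
            self._items.append(payload)

    def flush_if_due(self):
        """Publish one batch if it is full or old enough; return True if published."""
        with self._lock:
            now = self.clock()
            full = len(self._items) >= self.max_items
            expired = now - self._last_flush >= self.max_age
            if not (full or expired):
                return False
            self._last_flush = now
            if not self._items:
                return False
            # Swap the buffer out under the lock so nothing is sent twice.
            items, self._items = self._items, []
        # Publish outside the lock so add() is never blocked by network I/O.
        self.publish(items)
        return True

    def run(self, stop_event, poll_interval=0.05):
        """Thread target: poll until stop_event is set."""
        while not stop_event.is_set():
            self.flush_if_due()
            stop_event.wait(poll_interval)
```

In a dependency provider, `add` would be the client callback and `run` the target of the extra dispatch thread; waiting on `poll_interval` keeps the loop from spinning.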

Yes, that’s correct.

nameko-bayeux-client is a good real-world example because it has a separate client that handles a message stream, and turns the messages received by that client into payloads for a Nameko entrypoint – so you can trigger a Nameko service method when specific messages arrive.

Hi folks!
I’d like to join the discussion and I have a question.
I am working on a Nameko extension for Asterisk PBX.

I read the nameko-bayeux-client source code.
I understand how to use it for receiving messages.
But what about sending messages from a service?

I found nameko-salesforce addon where they achieve this.
But! As far as I understood they use different connections for getting events (streaming) and sending (api).
So it seems to mean that at least 2 connections are used.

What I actually need is to use entrypoints for receiving and a client for sending at the same time.
If you look at my nameko-ami source code, you will see that I use AmiClientExtension to get the running AmiClient and use its client to send actions:

class AmiClientExtension(DependencyProvider):
    def get_dependency(self, worker_ctx):
        for ext in self.container.extensions:
            if isinstance(ext, AmiClient):
                return ext

But! If I do not use the @ami('event') decorator, my client never starts :-).
How do I handle the case where I do not need to receive any events but only need to send actions?

I can only see that I need to create 2 different files:


So it’s the nameko-salesforce approach. But I would have to duplicate the code in this case.
I cannot have an AmiClient that contains all the connection procedures and re-use it like this:

from nameko_ami import AmiClient

class AmiEvents(AmiClient, SharedExtension, ProviderCollector): 

When I try to inherit from AmiClient(DependencyProvider) I get an error:

TypeError: bind() missing 1 required positional argument: 'attr_name'

Please advise! Thanks much!

Hi @litmax. I can’t figure out how to help here. Can you explain your questions a little more?

In principle there is no problem having a client object that both receives events and sends messages using the same connection if the protocol is bi-directional. An example entrypoint that does this is in nameko-grpc:

One other thing that may be helpful – the inspection of self.container.extensions in get_dependency in your snippet above seems very wrong. What are you trying to accomplish there?

Hi @mattbennett!

The full code is here -

And this is how I use it:

class AmiBroker:
    name = 'asterisk_ami'
    ami_client = AmiClientExtension()
    dispatch = EventDispatcher()

    def send_action(self, action):
        logger.debug('AMI send action: %s', action)
        result = self.ami_client.manager.send_action(action)
        logger.debug('Action result: %s', result)
        return {
            'headers': result.headers,
            # ...
        }

    def on_ami_event(self, event, manager):
        self.dispatch(event.headers['Event'], {'headers': event.headers,
                                               # ...
                                               })

The idea is that AmiEventHandler(Entrypoint) has an ami_client = AmiClient() connection to receive messages from Asterisk.
The asterisk_ami service then uses the same connection to send actions.
I could not figure out how to achieve this without this introspection hack.


OK I see.

You should use three components to achieve this:

  • An Entrypoint to decorate service methods
  • A DependencyProvider to let service methods send events
  • A SharedExtension that owns all the connection management logic

Your Entrypoint should certainly use a SharedExtension of some sort, to manage a single connection that can be shared by all @ami entrypoints (see the Rpc entrypoint, which uses a shared RpcConsumer or the Grpc entrypoint, which uses a shared GrpcServer).

You could then also use this SharedExtension (call it AmiManager, for example) in your DependencyProvider:

class AmiClient(DependencyProvider):
    manager = AmiManager()

    def get_dependency(self, worker_ctx):
        """ A simplistic `get_dependency` that simply returns a method
            from the manager; would be better to return something with
            an appropriate API for service method usage here.
        """
        return self.manager.send_action

Then, as well as implementing the methods to distribute received messages amongst the registered entrypoints, your AmiManager would implement a send_action method to send an action back to Asterisk when the DependencyProvider was used.
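
To make the division of labour concrete, here is a minimal plain-Python sketch of what such a manager might look like. It deliberately does not use Nameko's `SharedExtension` or `ProviderCollector` base classes, and `AmiManagerSketch`, `FakeConnection`, `register_handler` and `handle_event` are hypothetical names for illustration only. The point is that one object owns the single connection, fans received events out to every registered handler (the role of the entrypoints), and exposes `send_action` for the dependency provider to hand to service methods.

```python
class FakeConnection:
    """Stand-in for the real AMI connection (an assumption for this sketch)."""

    def __init__(self):
        self.sent = []

    def send(self, action):
        self.sent.append(action)
        return {"Response": "Success", "Action": action}


class AmiManagerSketch:
    """Owns the one connection; shared by the receiving and sending sides."""

    def __init__(self, connection):
        self.connection = connection
        self._handlers = []  # (event_name, callable) pairs, one per entrypoint

    def register_handler(self, event_name, handler):
        self._handlers.append((event_name, handler))

    def handle_event(self, event):
        """Called by the receive loop for each event read off the connection."""
        for event_name, handler in self._handlers:
            if event_name in ("*", event["Event"]):
                handler(event)

    def send_action(self, action):
        """Used by the dependency provider; works with no handlers registered."""
        return self.connection.send(action)


conn = FakeConnection()
manager = AmiManagerSketch(conn)

received = []
manager.register_handler("Hangup", received.append)

manager.handle_event({"Event": "Hangup", "Channel": "SIP/100"})
manager.handle_event({"Event": "Newchannel", "Channel": "SIP/101"})
result = manager.send_action("Ping")
```

Because sending goes through the manager rather than through an entrypoint, it works even when no handler is registered.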

Does that make sense?

Thank you Matt.
I did exactly as you described and it works.