I have a class in a third-party package that maintains a socket-level connection. It provides a method to send via the socket, a receive loop method (intended to be run as a thread target) that reads from the socket and assembles messages, and a callback loop method (also intended to be run as a thread target) that passes each assembled message to an arbitrary callable.
I’ve incorporated an instance of this class into a DependencyProvider, and now my goal is to publish an event whenever the class’s callback fires.
How can I accomplish this?
Example:
import socketthing
from nameko import config
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class ConnectionWrapper:
    def __init__(self, client):
        self.client = client

    def get_info(self):
        return self.client.get_connection_info()


class Connection(DependencyProvider):
    def setup(self):
        self.client = socketthing.SocketThing(spawn_threads=False)
        self.client.callback = self.handler
        self.client.connect(config.get("SOME_ADDRESS"))
        self.container.spawn_managed_thread(
            self.client._receive_loop, identifier="Receive loop"
        )
        self.container.spawn_managed_thread(
            self.client._callback_loop, identifier="Callback loop"
        )

    def get_dependency(self, worker_ctx):
        return ConnectionWrapper(self.client)

    def handler(self, payload):
        # Publish an event here instead!
        print(payload)


class ConnectionService:
    name = 'connection'
    connection = Connection()

    @rpc
    def get_info(self):
        return self.connection.get_info()
When you say “publish an event”, do you mean a Nameko event? If so, it would be pretty straightforward to replace your print(payload) with a function that calls a standalone event dispatcher.
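A minimal sketch of that replacement, assuming `dispatch` is the callable returned by `nameko.standalone.events.event_dispatcher`, which is called as `dispatch(service_name, event_type, event_data)`. The `make_handler` helper and the `message_received` event type are made-up names for illustration. The dispatcher is passed in as an argument, so the handler itself needs no Nameko imports:

```python
def make_handler(dispatch, service_name="connection", event_type="message_received"):
    """Build a callback that publishes each payload as an event.

    `dispatch` is assumed to be called as
    dispatch(service_name, event_type, event_data).
    `make_handler` and the default event type are hypothetical names.
    """
    def handler(payload):
        # Forward the message formed by the client as the event data.
        dispatch(service_name, event_type, payload)

    return handler
```

In the provider above, this could be wired up with `self.client.callback = make_handler(dispatch)` in place of `self.handler`.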
An alternative approach would be to wrap this third-party package in an entrypoint. In this arrangement you’d be able to decorate a service method and have it run every time the client produced a message. The service method could then of course also dispatch a Nameko event. For an example of this kind of arrangement, see https://github.com/nameko/nameko-bayeux-client.
Hey @mattbennett, no problem. I seem to have already solved this using the standalone event dispatcher, as you mention, resulting in code as seen below. However, I ran into a throughput issue and had to add another thread that collects the messages and dispatches them as a sort of “batched” payload. Does that make sense?
Yes, makes perfect sense. The standalone event dispatcher is intended for single messages rather than continuous use in a loop.
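One way the batching thread could look, as a rough sketch rather than the code actually used in this thread. It assumes the callback puts each payload on a `queue.Queue` and that `dispatch` is called as `dispatch(service_name, event_type, event_data)`. The names `drain_batch`, `batch_loop`, and `messages_received`, and the default batch size and timeout, are all hypothetical:

```python
import queue
import threading


def drain_batch(pending, max_size, timeout):
    """Wait up to `timeout` seconds for a first item, then take whatever
    else is already queued, up to `max_size` items in total."""
    batch = []
    try:
        batch.append(pending.get(timeout=timeout))
        while len(batch) < max_size:
            batch.append(pending.get_nowait())
    except queue.Empty:
        pass  # nothing (more) queued right now
    return batch


def batch_loop(pending, dispatch, stop, max_size=100, timeout=0.5):
    """Thread target: publish queued payloads as one event per batch."""
    while not stop.is_set():
        batch = drain_batch(pending, max_size, timeout)
        if batch:
            # One dispatch call per batch instead of one per message.
            dispatch("connection", "messages_received", batch)
```

Here the handler would only call `pending.put(payload)`, and `batch_loop` could be started alongside the other two loops, for example with `self.container.spawn_managed_thread`. `stop` is a `threading.Event` that is set to end the loop.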
Yes, that’s correct.
nameko-bayeux-client is a good real-world example because it has a separate client that handles a message stream, and turns the messages received by that client into payloads for a Nameko entrypoint – so you can trigger a Nameko service method when specific messages arrive.
Hi folks!
I’d like to join the discussion and I have a question.
I am working on a Nameko extension for Asterisk PBX (https://github.com/litnimax/nameko-ami).
I have read the nameko-bayeux-client source code.
I understand how to use it for receiving messages.
But what about sending messages from a service?
I found the nameko-salesforce add-on, where they achieve this.
But as far as I understand, they use different connections for receiving events (streaming) and for sending (API).
So it seems that at least two connections are used.
My task is to use entrypoints for receiving and a client for sending at the same time.
If you look at my nameko-ami source code, you will see that I use AmiClientExtension to get the running AmiClient and use it to send actions:
class AmiClientExtension(DependencyProvider):
    def get_dependency(self, worker_ctx):
        for ext in self.container.extensions:
            if isinstance(ext, AmiClient):
                return ext
But if I do not use the @ami('event') decorator, my client never starts :-).
How do I handle the case where I do not need to receive any events and only want to send actions?
I can only see that I need to create 2 different files:
ami_messages.py
ami_actions.py
That is the nameko-salesforce approach, but I would have to duplicate code in this case.
I cannot have a single AmiClient that contains all the connection logic and reuse it like this:
from nameko_ami import AmiClient


class AmiEvents(AmiClient, SharedExtension, ProviderCollector):
    ...
When I try to inherit from AmiClient(DependencyProvider) I get an error:
One other thing that may be helpful – the inspection of self.container.extensions in get_dependency in your snippet above seems very wrong. What are you trying to accomplish there?
The idea is that AmiEventHandler(Entrypoint) has an ami_client = AmiClient() connection to receive messages from Asterisk.
And the asterisk_ami service uses the same connection to send events.
I could not figure out how to achieve this without this introspection hack.
You could then also use this SharedExtension (call it AmiManager, for example) in your DependencyProvider:
class AmiClient(DependencyProvider):
    manager = AmiManager()

    def get_dependency(self, worker_ctx):
        """ A simplistic `get_dependency` that simply returns a method
        from the manager; would be better to return something with
        an appropriate API for service method usage here.
        """
        return self.manager.send_action
Then, as well as implementing the methods to distribute received messages amongst the registered entrypoints, your AmiManager would implement a send_action method to send an action back to Asterisk when the DependencyProvider was used.
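To make the shape of that manager concrete, here is a plain-Python stand-in, not a working Nameko extension. In a real implementation the class would subclass `SharedExtension` and `ProviderCollector` (which supplies `register_provider` and `unregister_provider`) and own the single connection. The `client.send_action` call, the `event_name` attribute, and the `handle_message` methods are assumptions for illustration, as is treating an AMI message as a dict with an `"Event"` key:

```python
class AmiManagerSketch:
    """Stand-in for a SharedExtension + ProviderCollector that owns the
    one connection shared by entrypoints and the dependency provider."""

    def __init__(self, client):
        self.client = client      # hypothetical AMI client object
        self._providers = []      # entrypoints interested in events

    def register_provider(self, provider):
        # Entrypoints would call this from their own setup().
        if provider not in self._providers:
            self._providers.append(provider)

    def unregister_provider(self, provider):
        if provider in self._providers:
            self._providers.remove(provider)

    def handle_message(self, message):
        """Fan a received message out to the matching entrypoints."""
        for provider in list(self._providers):
            if provider.event_name == message.get("Event"):
                provider.handle_message(message)

    def send_action(self, action):
        """Send over the same connection that receives events."""
        return self.client.send_action(action)
```

Because the manager, rather than any entrypoint, owns the connection, a service that declares only the DependencyProvider would still get a connected client, with no entrypoints registered.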