Also the same issue with my attempt at the queue you used in nameko-sentry:
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from eventlet.queue import Queue
class EventPublisher(PublisherClient):
def __init__(self, topic, *args, **kwargs):
self._topic = topic
super().__init__(*args, **kwargs)
def publish(self, data, **kwargs):
return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)
class ThreadSafeClient:
def __init__(self):
self.queue = Queue()
def publish(self, message):
self.queue.put(message)
class PubSub(DependencyProvider):
def __init__(self, topic=None, **options):
self.topic = topic
self.options = options
def _run(self):
while True:
item = self.safe_client.queue.get()
if item is None:
break
self.client.publish(item)
del item
def start(self):
self._gt = self.container.spawn_managed_thread(
self._run)
def stop(self):
self.safe_client.queue.put(None)
if self._gt is not None:
self._gt.wait()
def setup(self):
config = self.container.config["PUBSUB"]
project = config["PROJECT"]
if self.topic is None:
self.topic = config["TOPIC"]
self.topic_path = f"projects/{project}/topics/{self.topic}"
self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])
self.safe_client = ThreadSafeClient()
self.client = EventPublisher(self.topic_path, credentials=self.credentials)
def get_dependency(self, worker_ctx):
return self.safe_client