Maintaining state in a dependency


#1

Hi,

I'm using the Google PubSub client to publish messages in a nameko service
supposed to handle a few hundred requests per second, using a custom
entrypoint to forward incoming messages from a websockets connection.

Unfortunately, the PubSub client doesn't do well with being started up for
each request: it needs to be able to batch requests and basically persist
its datastore across workers.

Is it possible to have a dependency provider spawn a thread for the client,
or some other way to avoid reauthentication and losing the client batching
and state across requests?


#2

Using setup() in the dependency provider seems to be the answer here.

However, this hasn't seemed to fix my errors:
google.auth.exceptions.TransportError:
HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries
exceeded with url: /o/oauth2/token (Caused by SSLError(SSLError("bad
handshake: SysCallError(-1, 'Unexpected EOF')",),))

My dependency provider:

from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account

class EventPublisher(PublisherClient):
    def __init__(self, topic, *args, **kwargs):
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)

class PubSub(DependencyProvider):
    def __init__(self, topic=None, **options):
        # setup() reads self.topic, so it has to be set here
        self.topic = topic
        self.options = options

    def setup(self):
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])
        self.client = EventPublisher(self.topic_path, credentials=self.credentials)

    def get_dependency(self, worker_ctx):
        return self.client

The pubsub client has no problems publishing with the same configuration,
in the same virtual environment, outside of nameko.

In reply to Raymond A. Botha (Saturday, March 10, 2018 at 6:26:27 PM UTC+2).


#3

Also the same issue with my attempt at the queue you used in nameko-sentry:

from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from eventlet.queue import Queue

class EventPublisher(PublisherClient):
    def __init__(self, topic, *args, **kwargs):
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)

class ThreadSafeClient:
    def __init__(self):
        self.queue = Queue()

    def publish(self, message):
        self.queue.put(message)

class PubSub(DependencyProvider):
    def __init__(self, topic=None, **options):
        self.topic = topic
        self.options = options

    def _run(self):
        while True:
            item = self.safe_client.queue.get()
            if item is None:
                break

            self.client.publish(item)
            del item

    def start(self):
        self._gt = self.container.spawn_managed_thread(
            self._run)

    def stop(self):
        self.safe_client.queue.put(None)

        if self._gt is not None:
            self._gt.wait()

    def setup(self):
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])

        self.safe_client = ThreadSafeClient()
        self.client = EventPublisher(self.topic_path, credentials=self.credentials)

    def get_dependency(self, worker_ctx):
        return self.safe_client


#4

> Is it possible to have a dependency provider spawn a thread for the
> client, or some other way to avoid reauthentication and losing the client
> batching and state across requests?

Yes. Use self.container.spawn_managed_thread. For an example, check the
nameko-slack
<https://github.com/iky/nameko-slack/blob/5455dc56363d6a7f019ec3ae4d3e1e32494e3715/nameko_slack/rtm.py#L50>
dependency. The class using spawn_managed_thread is an Extension not a
DependencyProvider, but the usage is the same.

> Using setup() in the dependency provider seems to be the answer here.
>
> However, this hasn't seemed to fix my errors:
> google.auth.exceptions.TransportError:
> HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries
> exceeded with url: /o/oauth2/token (Caused by SSLError(SSLError("bad
> handshake: SysCallError(-1, 'Unexpected EOF')",),))

setup() is called once when the service container starts, so this is the
correct place to instantiate something that will be shared by all workers.
Whatever is returned from get_dependency must be thread-safe though. My
guess is the Google client object is not, which is why it's complaining
about an SSL handshake failing.

If the client is not thread-safe, options are to funnel all the actions
through a single thread (nameko_sentry
<https://github.com/mattbennett/nameko-sentry/blob/v0.0.2/nameko_sentry.py#L17-L28>
used to do this) or you could try this convenience wrapper
<https://gist.github.com/mattbennett/4587094f10694c028d29911932a24130> that
serializes method calls on an object that I've been experimenting with for
situations like this.
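
To make the "serialize method calls" idea concrete, here is a minimal sketch of such a wrapper. It is not the code from the linked gist: SerializedProxy, Counter and hammer are hypothetical names, and it uses the standard library's threading.Lock so that it runs on its own. Inside a nameko service the lock would become a green lock once Eventlet's monkey patch is applied.

```python
import threading


class SerializedProxy:
    """Hypothetical wrapper: runs every method call on `target` under one lock."""

    def __init__(self, target):
        self._target = target
        self._lock = threading.Lock()

    def __getattr__(self, name):
        # Only called for names that are not found on the proxy itself.
        attr = getattr(self._target, name)
        if not callable(attr):
            return attr

        def locked(*args, **kwargs):
            # One caller at a time may be inside any method of the target.
            with self._lock:
                return attr(*args, **kwargs)

        return locked


class Counter:
    """Stand-in for a client whose methods are not thread-safe."""

    def __init__(self):
        self.value = 0

    def increment(self):
        current = self.value
        self.value = current + 1


def hammer(proxy, times):
    for _ in range(times):
        proxy.increment()


counter = SerializedProxy(Counter())
threads = [threading.Thread(target=hammer, args=(counter, 1000)) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(counter.value)
```

A wrapper like this only protects individual calls. It does not help if the wrapped object keeps state that one method writes and a later call reads.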

In reply to Raymond A. Botha (Saturday, March 10, 2018 at 4:38:55 PM UTC).


#5

The convenience wrapper will only work if the object is safe *between* methods,
i.e. shared state is not manipulated by one method and then read by
another. I guess that's not the case here. It's more useful when you have
some control over the thing you're wrapping.

The approach used by nameko-sentry should work though. If it's implemented
correctly there can only be one thread interacting with the client object
-- the one spawned to run self._run. My only suggestion would be to
instantiate the client in that thread too. According to
https://developers.google.com/api-client-library/python/guide/thread_safety
the credentials object *is* thread-safe, so you should be fine with a
shared instance of that.
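
A rough sketch of "instantiate the client in that thread too", using only the standard library so it can run standalone. FakeClient and SingleThreadPublisher are hypothetical stand-ins, not part of nameko or the Google client. In the real provider, start() would call self.container.spawn_managed_thread(self._run) and the factory would build the EventPublisher from the shared credentials.

```python
import queue
import threading


class FakeClient:
    """Stand-in for the real publisher; records which thread touched it."""

    def __init__(self):
        self.created_in = threading.current_thread().name
        self.published = []

    def publish(self, message):
        self.published.append((threading.current_thread().name, message))


class SingleThreadPublisher:
    """Workers only ever touch the queue; one thread owns the client."""

    def __init__(self, client_factory):
        self._factory = client_factory
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, name="publisher")
        self.client = None

    def _run(self):
        # The client is built inside the worker thread, not in setup().
        self.client = self._factory()
        while True:
            item = self._queue.get()
            if item is None:  # sentinel pushed by stop()
                break
            self.client.publish(item)

    def start(self):
        self._thread.start()

    def publish(self, message):
        self._queue.put(message)

    def stop(self):
        self._queue.put(None)
        self._thread.join()


publisher = SingleThreadPublisher(FakeClient)
publisher.start()
for text in ("a", "b", "c"):
    publisher.publish(text)
publisher.stop()
print(publisher.client.created_in, publisher.client.published)
```

The point of the factory argument is that nothing outside the publishing thread ever holds a reference to a live client while the service is running.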

In reply to Raymond A. Botha (Saturday, March 10, 2018 at 11:02:16 PM UTC).


#6

Thanks Matt, it must be a threading issue. I gave your convenience wrapper
a shot like below:

from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from streaming.concurrency import ThreadSafeWrapper

class EventPublisher(PublisherClient):
    def __init__(self, topic, *args, **kwargs):
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)

class PubSub(DependencyProvider):
    def __init__(self, topic=None, **options):
        self.topic = topic
        self.options = options

    def setup(self):
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])
        self.client = ThreadSafeWrapper(EventPublisher(self.topic_path, credentials=self.credentials))

    def get_dependency(self, worker_ctx):
        return self.client

However, this gave me the same error. Full trace here:
https://gist.github.com/raybotha/f5de2f07386f0ca845a6d6d201b070c8

In reply to Matt Yule-Bennett (Sunday, March 11, 2018 at 12:17:58 AM UTC+2).


#7

Thanks Matt, I've tried to isolate the client to its own thread as much as
I can think of doing, and as you've suggested, without luck. This is the
provider code now:
https://gist.github.com/raybotha/362b661b02c5b95cdfac5bca10fbc0c9

Strangely, the same SSL threading issue is still occurring, most likely due
to the Google client auth using httplib2
(https://github.com/google/oauth2client/blob/3071457064f3705bab1b041bd624a10d5a2d2619/oauth2client/transport.py#L17).
This issue is discussed here
<https://developers.google.com/api-client-library/python/guide/thread_safety#the-httplib2http-objects-are-not-thread-safe>,
but without much context or relation to the PubSub usage. I'll have to
give it some hacking or something.

In reply to Matt Yule-Bennett (Monday, March 12, 2018 at 10:00:01 AM UTC+2).


#8

I don't think it can be a threading issue anymore. Your DependencyProvider
is serialising all the publishes into a single thread and looks correct.

You did need to make your DP thread-safe, but I no longer think that is the
root cause of your problem.

Have you read through https://github.com/requests/requests/issues/2022? I
know you're not using requests but there's a lot of good info in there
about OpenSSL and possible handshake failures.

One experiment to try would be using the client outside of Nameko but with
the Eventlet monkey patch applied. It may be that there's a bug in the
green implementation of OpenSSL.

In reply to Raymond A. Botha (Monday, March 12, 2018 at 3:00:37 PM UTC).


#9

Just applying a simple monkey patch to the regular test code isn't causing
any trouble:
https://gist.github.com/raybotha/2d9d1119f1436ff34071d4ce3a041f51

Interestingly, it's also working with this greenthread implementation
accessing the client in the main thread:
https://gist.github.com/raybotha/c7f193fce0a2c06418f03defe4a33687

I'll take a look at some OpenSSL setups, this has to be somehow related to
how httplib2 is using OpenSSL.

In reply to Matt Yule-Bennett (Tuesday, March 13, 2018 at 5:09:06 PM UTC+2).


#10

Spawning 100 greenthreads that publish messages isn't causing trouble
either: https://gist.github.com/raybotha/d4df2d69c12d86ce357a6bd0785cdbdc

In reply to Raymond A. Botha (Tuesday, March 13, 2018 at 5:56:22 PM UTC+2).


#11

You have to apply the monkey patch **before** the imports, otherwise it
doesn't take effect.
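
A toy illustration of why the order matters, with no Eventlet involved. `netlib` and `monkey_patch` here are hypothetical stand-ins for a stdlib module such as `socket` and for `eventlet.monkey_patch()`. Anything that grabbed a reference before the patch keeps the original:

```python
import types

# Hypothetical stand-in for a stdlib module such as `socket`.
netlib = types.ModuleType("netlib")
netlib.connect = lambda: "blocking"


def monkey_patch():
    """Stand-in for eventlet.monkey_patch(): swap in a cooperative version."""
    netlib.connect = lambda: "green"


# A library imported BEFORE patching captures the original function,
# the way `from socket import socket` would.
early_connect = netlib.connect

monkey_patch()

# A library imported AFTER patching sees the replacement.
late_connect = netlib.connect

print(early_connect())  # blocking
print(late_connect())   # green
```

So in a standalone test script the patch call has to be the first thing that runs, ahead of importing the Google client.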

···

On Tuesday, March 13, 2018 at 3:56:22 PM UTC, Raymond Botha wrote:

Just applying a simple monkey patch to the regular test code isn't causing
any trouble:
https://gist.github.com/raybotha/2d9d1119f1436ff34071d4ce3a041f51

Interestingly, it's also working with this greenthread implementation
accessing the client in the main thread:
https://gist.github.com/raybotha/c7f193fce0a2c06418f03defe4a33687

I'll take a look at some OpenSSL setups, this has to be somehow related to
how httplib2 is using openssl.

On Tuesday, March 13, 2018 at 5:09:06 PM UTC+2, Matt Yule-Bennett wrote:

I don't think it can be a threading issue anymore. Your
DependencyProvider is serialising all the publishes into a single thread
and looks correct.

You did need to make your DP thread-safe, but I no longer think that is
the root cause of your problem.

Have you read through https://github.com/requests/requests/issues/2022?
I know you're not using requests but there's a lot of good info in there
about OpenSSL and possible handshake failures.

One experiment to try would be using the client outside of Nameko but
with the Eventlet monkey patch applied. It may be that there's a bug in the
green implementation of OpenSSL.

On Monday, March 12, 2018 at 3:00:37 PM UTC, Raymond A. Botha wrote:

Thanks Matt, I've tried to isolate the client to its own thread as much
as I can think of doing, and as you've suggested, without luck. This is the
provider code now:
https://gist.github.com/raybotha/362b661b02c5b95cdfac5bca10fbc0c9
Strangely, the same SSL threading issue is still occurring, most likely
due to the Google client auth using httplib2 (
https://github.com/google/oauth2client/blob/3071457064f3705bab1b041bd624a10d5a2d2619/oauth2client/transport.py#L17
)
This issue is discussed here
<https://developers.google.com/api-client-library/python/guide/thread_safety#the-httplib2http-objects-are-not-thread-safe>,
but without much context or relation to the PubSub usage - I'll have to
give it some hacking or something.


#12

Ah yes thanks, that broke it with the same error.

Hmm it seems like my options are to raise an issue in the google auth repo,
and possibly attempt a fork of it with a different http library.

Is it possible to run green threads alongside kernel threads? Or is it not
possible to run non-eventlet compatible code in a nameko service?

···

On Wednesday, March 14, 2018 at 6:16:42 PM UTC+2, Matt Yule-Bennett wrote:

You have to apply the monkey patch **before** imports, otherwise it
doesn't take effect


#13

I think this should probably be considered a bug in Eventlet. Have you
tried using different versions of Eventlet?

If it comes to it, it is possible to run native threads.
See http://eventlet.net/doc/threading.html#tpool-simple-thread-pool

···

On Wednesday, March 14, 2018 at 5:05:30 PM UTC, Raymond Botha wrote:

Ah yes thanks, that broke it with the same error.

Hmm it seems like my options are to raise an issue in the google auth
repo, and possibly attempt a fork of it with a different http library.

Is it possible to run green threads alongside kernel threads? Or is it not
possible to run non-eventlet compatible code in a nameko service?


#14

Thanks for the help last month Matt, but I still haven't got this working,
even with a tpool.
I opened an issue with eventlet a month ago though with a simple snippet to
reproduce the issue (https://github.com/eventlet/eventlet/issues/476).
It seems gRPC has added support for gevent in the latest release, but
eventlet is still incompatible.

···

On Thursday, March 15, 2018 at 12:06:06 PM UTC+2, Matt Yule-Bennett wrote:

I think this should probably be considered a bug in Eventlet. Have you
tried using different versions of Eventlet?

If it comes to it, it is possible to run native threads. See
http://eventlet.net/doc/threading.html#tpool-simple-thread-pool


#15

Damn. It's pretty unfortunate that the Google client libraries use a gRPC
client under the hood.

You should be able to integrate it using the tpool though; wrapping
eventlet-incompatible code is exactly its intended use-case. That will be a
lot easier than getting things going with gevent :wink:

···

On Monday, April 23, 2018 at 8:15:37 PM UTC+1, r...@invictuscapital.com wrote:

Thanks for the help last month Matt, but I still haven't got this working,
even with a tpool.
I opened an issue with eventlet a month ago though with a simple snippet
to reproduce the issue (https://github.com/eventlet/eventlet/issues/476).
It seems gRPC has added support for gevent in the latest release, but
eventlet is still incompatible.
