Event Dispatcher `ConnectionResetError` when rabbitmq turned off/on

I have the following setup currently, with an EventDispatcher being munged
onto a dependency on the service class (I wont go into the reason why, but
it's needed on that dep).

class SubNamespaceDep(DependencyProvider):
def get_dependency(self, worker_ctx):
dispatch = EventDispatcher()
dispatch.container = self.container
dispatch.setup()

self.dispatch_spam('world_ended_event', {})

class MyService:
name = 'my_service'
subnamespace = SubNamespaceDep()

This works okay until rabbitmq restarts, at which point when the next
entrypoint call comes in, the following occurs:

File "./myapp/dependencies.py", line 530, in get_dependency
    dispatch.setup()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/nameko/events.py",
line 86, in setup
    super(EventDispatcher, self).setup()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/nameko/messaging.py",
line 161, in setup
    maybe_declare(exchange, conn)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/common.py",
line 120, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/common.py",
line 127, in _maybe_declare
    entity.declare()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/entity.py",
line 174, in declare
    nowait=nowait, passive=passive,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/channel.py",
line 615, in exchange_declare
    self._send_method((40, 10), args)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/abstract_channel.py",
line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/method_framing.py",
line 221, in write_method
    write_frame(1, channel, payload)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/transport.py",
line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 397, in sendall
    tail = self.send(data, flags)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 391, in send
    return self._send_loop(self.fd.send, data, flags)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 378, in _send_loop
    return send_method(data, *args)
ConnectionResetError: [Errno 104] Connection reset by peer

There's a number of hacks I could do to get this working, e.g:

class SubNamespaceDep(DependencyProvider):
def get_dependency(self, worker_ctx):
dispatch = EventDispatcher()
dispatch.container = self.container
try:
dispatch.setup()
except ConnectionResetError:
pass
self.dispatch_spam('world_ended_event', {})

but I'd rather not as I don't know any other side effects they could cause.

I assume it's all related to putting the EventDispatcher on the dependency?
Why does this code error if the connection should have already been
restablished?
(https://github.com/nameko/nameko/blob/master/nameko/messaging.py#L161)

Thanks in advance.

Hi Richard,

It is indeed related to using the EventDispatcher as you are.

The problem you're seeing is the exchange declaration
<https://github.com/nameko/nameko/blob/master/nameko/messaging.py#L161&gt;
fail during DependencyProvider.setup(). The problem occurs because kombu
pools connections in the background. If you grab a previously used
connection that has since gone idle, it raises. When this error happens
elsewhere (e.g. when publishing a message) it's automatically retried, and
you don't see it bubble out. The setup method is normally called when the
service starts, so the connection is always fresh, and the retry policy
isn't required.

The EventDispatcher DependencyProvider is a very light wrapper around the standalone
event dispatcher
<https://github.com/nameko/nameko/blob/master/nameko/standalone/events.py&gt;,
so you could probably avoid this problem by using that directly to dispatch
the message, rather than the DependencyProvider (which is not intended to
be manually manipulated as you are doing).

Hope that helps,

Matt.

···

On Friday, June 23, 2017 at 10:17:59 AM UTC+1, Richard O'Dwyer wrote:

I have the following setup currently, with an EventDispatcher being munged
onto a dependency on the service class (I wont go into the reason why, but
it's needed on that dep).

class SubNamespaceDep(DependencyProvider):
def get_dependency(self, worker_ctx):
dispatch = EventDispatcher()
dispatch.container = self.container
dispatch.setup()

self.dispatch_spam('world_ended_event', {})

class MyService:
name = 'my_service'
subnamespace = SubNamespaceDep()

This works okay until rabbitmq restarts, at which point when the next
entrypoint call comes in, the following occurs:

File "./myapp/dependencies.py", line 530, in get_dependency
    dispatch.setup()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/nameko/events.py",
line 86, in setup
    super(EventDispatcher, self).setup()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/nameko/messaging.py",
line 161, in setup
    maybe_declare(exchange, conn)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/common.py",
line 120, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/common.py",
line 127, in _maybe_declare
    entity.declare()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/entity.py",
line 174, in declare
    nowait=nowait, passive=passive,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/channel.py",
line 615, in exchange_declare
    self._send_method((40, 10), args)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/abstract_channel.py",
line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/method_framing.py",
line 221, in write_method
    write_frame(1, channel, payload)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/transport.py",
line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 397, in sendall
    tail = self.send(data, flags)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 391, in send
    return self._send_loop(self.fd.send, data, flags)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 378, in _send_loop
    return send_method(data, *args)
ConnectionResetError: [Errno 104] Connection reset by peer

There's a number of hacks I could do to get this working, e.g:

class SubNamespaceDep(DependencyProvider):
def get_dependency(self, worker_ctx):
dispatch = EventDispatcher()
dispatch.container = self.container
try:
dispatch.setup()
except ConnectionResetError:
pass
self.dispatch_spam('world_ended_event', {})

but I'd rather not as I don't know any other side effects they could
cause.

I assume it's all related to putting the EventDispatcher on the dependency?
Why does this code error if the connection should have already been
restablished? (
https://github.com/nameko/nameko/blob/master/nameko/messaging.py#L161\)

Thanks in advance.

Thanks, that seems to work fine. Here's the code for future reference for
anyone

from nameko.messaging import HeaderEncoder
from nameko.standalone.events import event_dispatcher

def get_event_dispatcher(config, worker_ctx):
   headers = HeaderEncoder().get_message_headers(worker_ctx)
   return event_dispatcher(config, headers=headers)

···

On Wednesday, June 28, 2017 at 2:17:01 PM UTC+1, Matt Yule-Bennett wrote:

Hi Richard,

It is indeed related to using the EventDispatcher as you are.

The problem you're seeing is the exchange declaration
<https://github.com/nameko/nameko/blob/master/nameko/messaging.py#L161&gt;
fail during DependencyProvider.setup(). The problem occurs because kombu
pools connections in the background. If you grab a previously used
connection that has since gone idle, it raises. When this error happens
elsewhere (e.g. when publishing a message) it's automatically retried, and
you don't see it bubble out. The setup method is normally called when the
service starts, so the connection is always fresh, and the retry policy
isn't required.

The EventDispatcher DependencyProvider is a very light wrapper around the standalone
event dispatcher
<https://github.com/nameko/nameko/blob/master/nameko/standalone/events.py&gt;,
so you could probably avoid this problem by using that directly to dispatch
the message, rather than the DependencyProvider (which is not intended to
be manually manipulated as you are doing).

Hope that helps,

Matt.

On Friday, June 23, 2017 at 10:17:59 AM UTC+1, Richard O'Dwyer wrote:

I have the following setup currently, with an EventDispatcher being
munged onto a dependency on the service class (I wont go into the reason
why, but it's needed on that dep).

class SubNamespaceDep(DependencyProvider):
def get_dependency(self, worker_ctx):
dispatch = EventDispatcher()
dispatch.container = self.container
dispatch.setup()

self.dispatch_spam('world_ended_event', {})

class MyService:
name = 'my_service'
subnamespace = SubNamespaceDep()

This works okay until rabbitmq restarts, at which point when the next
entrypoint call comes in, the following occurs:

File "./myapp/dependencies.py", line 530, in get_dependency
    dispatch.setup()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/nameko/events.py",
line 86, in setup
    super(EventDispatcher, self).setup()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/nameko/messaging.py",
line 161, in setup
    maybe_declare(exchange, conn)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/common.py",
line 120, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/common.py",
line 127, in _maybe_declare
    entity.declare()
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/kombu/entity.py",
line 174, in declare
    nowait=nowait, passive=passive,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/channel.py",
line 615, in exchange_declare
    self._send_method((40, 10), args)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/abstract_channel.py",
line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/method_framing.py",
line 221, in write_method
    write_frame(1, channel, payload)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/amqp/transport.py",
line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 397, in sendall
    tail = self.send(data, flags)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 391, in send
    return self._send_loop(self.fd.send, data, flags)
  File
"/home/vagrant/.virtualenvs/myapp/lib/python3.6/site-packages/eventlet/greenio/base.py",
line 378, in _send_loop
    return send_method(data, *args)
ConnectionResetError: [Errno 104] Connection reset by peer

There's a number of hacks I could do to get this working, e.g:

class SubNamespaceDep(DependencyProvider):
def get_dependency(self, worker_ctx):
dispatch = EventDispatcher()
dispatch.container = self.container
try:
dispatch.setup()
except ConnectionResetError:
pass
self.dispatch_spam('world_ended_event', {})

but I'd rather not as I don't know any other side effects they could
cause.

I assume it's all related to putting the EventDispatcher on the
dependency?
Why does this code error if the connection should have already been
restablished? (
https://github.com/nameko/nameko/blob/master/nameko/messaging.py#L161\)

Thanks in advance.