Handling IOError: Socket closed on idle services

Hi,

We're experiencing a lot of closed connection errors when a service is not
used frequently and aren't sure what to do about them.

This seems to be what's happening:

* Service A tries to call a method on Service B through RpcProxy

* Service B has been idle for more than 10 minutes

* An error is raised in Service A and it seems like the entrypoint for
Service B never fires.

Here's the traceback:

  Sep 06 10:33:22: Connection to broker lost, trying to re-establish connection...
  Sep 06 10:33:22: Traceback (most recent call last):
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/kombu/mixins.py", line 177, in run
  Sep 06 10:33:22:     for _ in self.consume(limit=None):  # pragma: no cover
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/nameko/messaging.py", line 404, in consume
  Sep 06 10:33:22:     conn.drain_events(timeout=safety_interval)
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/kombu/connection.py", line 275, in drain_events
  Sep 06 10:33:22:     return self.transport.drain_events(self.connection, **kwargs)
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 95, in drain_events
  Sep 06 10:33:22:     return connection.drain_events(**kwargs)
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/amqp/connection.py", line 303, in drain_events
  Sep 06 10:33:22:     chanmap, None, timeout=timeout,
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/amqp/connection.py", line 366, in _wait_multiple
  Sep 06 10:33:22:     channel, method_sig, args, content = read_timeout(timeout)
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/amqp/connection.py", line 337, in read_timeout
  Sep 06 10:33:22:     return self.method_reader.read_method()
  Sep 06 10:33:22:   File "/venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
  Sep 06 10:33:22:     raise m
  Sep 06 10:33:22: IOError: Socket closed

It looks like Service B needs to reestablish its connection and so an error
is raised in Service A. I think that's what's happening because I deployed
our services in a separate environment where Service B is called on a
15-minute interval. In that environment, when Service A calls Service B,
the connection error does not get raised.

I don't know very much about AMQP internals, but my main questions are:

1. Is the automatic closing of idle connections the expected behavior?

2. If this is the expected behavior, what is the best way to handle this
IOError? I'd rather not do a try/catch for an IOError every time I make an
RPC call! (There's a sketch of what I mean just below these questions.)

3. If this is an issue in amqp rather than nameko, what is the best way to
handle it?
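For concreteness, this is the kind of boilerplate I'm hoping to avoid (a
made-up service; the names and the naive retry are only for illustration):

from nameko.rpc import rpc, RpcProxy


class ServiceA(object):
    name = 'service_a'

    service_b = RpcProxy('service_b')  # illustrative service name

    @rpc
    def get_thing(self, thing_id):
        # wrapping every call like this is what I'd like to avoid
        try:
            return self.service_b.get(thing_id)
        except IOError:
            # connection was closed while idle; naively retry once
            return self.service_b.get(thing_id)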

Any help would be greatly appreciated!

Thanks,

Grace

Hi Grace,

Do your services connect directly to the RabbitMQ broker, or via a
load-balancer of some sort?

We have experienced some difficulties with connections being broken, but
only in noisy or unhealthy network environments.

I've put more comments inline below.

Matt.

Hi,

We're experiencing a lot of closed connection errors when a service is not
used frequently and aren't sure what to do about them.

This seems to be what's happening:

* Service A tries to call a method on Service B through RpcProxy

* Service B has been idle for more than 10 minutes

* An error is raised in Service A and it seems like the entrypoint for
Service B never fires.

Here's the traceback:

  [traceback snipped; identical to the one above]

Is this the traceback from Service A?

It looks like a Kombu Consumer has detected the broken connection and
attempted to reconnect, but found the socket to be closed. Does the process
crash at this point? If so, it's unusual; you'd normally expect Kombu to
continue to retry with an increasing backoff until the connection is
re-established.

It looks like Service B needs to reestablish its connection and so an
error is raised in Service A. I think that's what's happening because I
deployed our services in a separate environment where Service B is called
on a 15-minute interval. In that environment, when Service A calls Service
B, the connection error does not get raised.

It's unlikely that a connection error in Service B results in a traceback
in Service A. The two services' connections are independent. If Service A
managed to send a message to Service B, but Service B couldn't respond, the
behaviour you'd expect is for Service A to hang forever waiting for its
reply.

I don't know very much about amqp internals but my main questions are:

1. Is the automatic closing of idle connections the expected behavior?

No it's not. I would expect the connection to stay up in a healthy network
environment.

2. If this is the expected behavior, what is the best way to handle this
IOError? I'd rather not do a try/catch for an IOError every time I make an
RPC call!

Producers (including the RPC proxy) already have automatic retry if they
encounter an IOError. See
https://github.com/onefinestay/nameko/blob/master/nameko/rpc.py#L419,L420.
The traceback you're seeing is probably coming from a Consumer that has
lost its connection, or a very unhealthy Producer after using all of its
retries.

One caveat to Producers losing their connection is that they don't always
notice immediately. The retry policy takes care of re-establishing the
connection and retrying, but only if the underlying publish raises an
exception. It takes some time for a Producer to notice that its socket is
dead, and during that time published messages are silently lost. A
workaround is to enable "publish confirms". This forces the Producer to
request acknowledgement of its messages by the broker, and a side-effect is
that it notices immediately if the socket is dead. In that case it raises,
and then the retry policy kicks in. There is a PR to enable "publish
confirms" by default here: https://github.com/onefinestay/nameko/pull/337
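To make that concrete, this is roughly what retry plus confirms look like at
the kombu level (a sketch only, not nameko's actual publisher code; the
exchange and routing key names are illustrative, and confirm_publish is the
py-amqp transport option that the PR turns on):

from kombu import Connection, Exchange, Producer

connection = Connection(
    'amqp://guest:guest@localhost:5672//',
    transport_options={'confirm_publish': True},  # broker must ack each publish
)
exchange = Exchange('nameko-rpc', type='topic', durable=True)  # illustrative name

with connection as conn:
    producer = Producer(conn.channel(), exchange=exchange)
    producer.publish(
        {'args': [], 'kwargs': {}},
        routing_key='service_b.get',  # illustrative routing key
        retry=True,  # reconnect and retry if the publish raises
        retry_policy={'max_retries': 3, 'interval_start': 2},
    )

With confirms off, a publish onto a dead socket can appear to succeed; with
them on, it raises and the retry policy gets a chance to run.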

3. If this is an issue in amqp rather than nameko, what is the best way to
handle it?

The *best* way to handle flaky connections is with AMQP heartbeats, which
kombu unfortunately does not support. The heartbeat feature constantly
monitors connections and re-establishes them if they fail.

The way to guarantee that no messages are ever lost by a Producer is to
enable publish-confirms, although this comes with an overhead.

Consumers should already be able to detect bad connections and re-establish
themselves. If your process crashes after a traceback like the one above,
that's the place to start investigating.

Any help would be greatly appreciated!

If you can send more tracebacks it would be helpful.


Hi Matt,

Thanks for your comments! They've definitely helped to get us on a better
path for figuring out what's going on.

It appears we have two separate issues:

1. Our connections are being broken sometimes.

But unlike what I thought in my earlier post, the Kombu Consumer is
detecting the broken connection and does re-establish it. Originally I
thought the Kombu Consumer was not reconnecting, but that was due to some
poor log formatting on our end. In the traceback I posted, we're actually
logging a warning from Kombu, which includes exc_info:
https://github.com/celery/kombu/blob/master/kombu/mixins.py#L181

This is great news, as it means that Kombu is continuing to retry and is
working as expected. Our services are connecting to rabbitmq through
HAProxy, so we are looking into the network environment to figure out the
cause of the broken connections.

2. We do have one service, however, where the Consumer appears to be losing
a connection and is not getting restarted. Here's the traceback:

IOError: Socket closed
File "flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "flask/views.py", line 84, in view
return self.dispatch_request(*args, **kwargs)
File "pages/entrypoint.py", line 23, in dispatch_request
browser = self.store.load_or_create()
File "pages/stores/browser.py", line 17, in load_or_create
browser = self.load()
File "pages/stores/browser.py", line 14, in load
return self.service.get(browser_id)
File "nameko/rpc.py", line 352, in __call__
return reply.result()
File "nameko/rpc.py", line 333, in result
self.resp_body = self.reply_event.wait()
File "nameko/standalone/rpc.py", line 57, in wait
self.queue_consumer.get_message(self.correlation_id)
File "nameko/standalone/rpc.py", line 133, in get_message
timeout=self.timeout
File "kombu/connection.py", line 275, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "kombu/transport/pyamqp.py", line 95, in drain_events
return connection.drain_events(**kwargs)
File "amqp/connection.py", line 303, in drain_events
chanmap, None, timeout=timeout,
File "amqp/connection.py", line 366, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "amqp/connection.py", line 330, in read_timeout
return self.method_reader.read_method()
File "amqp/method_framing.py", line 189, in read_method
raise m

It looks like when the PollingQueueConsumer tries to get_message, it does
not try to set up the consumer again if an IOError is raised:
https://github.com/onefinestay/nameko/blob/master/nameko/standalone/rpc.py#L128

Since ConnectionError is caught, I would expect IOError to also be caught,
an RpcConnectionError to be raised, and an attempt made to set up the
consumer again. Is there a particular reason why this isn't the case?
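To make the suggestion concrete, this is roughly the handling I'd expect
(purely illustrative, not nameko's actual code; the attribute and helper
names are made up):

import socket

from nameko.exceptions import RpcConnectionError


def get_message_with_reconnect(queue_consumer, correlation_id, timeout):
    # `queue_consumer` stands in for the PollingQueueConsumer; `reconnect()`
    # is an invented name for "run the consumer setup again"
    try:
        queue_consumer.consumer.connection.drain_events(timeout=timeout)
    except socket.timeout:
        raise  # existing timeout behaviour unchanged
    except IOError as exc:
        queue_consumer.reconnect()
        raise RpcConnectionError(
            'Disconnected while waiting for reply %s: %s' % (correlation_id, exc))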

I know there was a recent commit to handle IOError when the consumer is
first being set up: "Catch IOError in PollingQueueConsumer"
(nameko/nameko@5065d2d on GitHub).

Should it also be handled in get_message?

Thanks,
Grace


Hi Grace,

Hi Matt,

Thanks for your comments! They've definitely helped to get us on a better
path for figuring out what's going on.

It appears we have two separate issues:

1. Our connections are being broken sometimes.

But unlike what I thought in my earlier post, the Kombu Consumer is
detecting the broken connection and does reestablish connection. Originally
I thought the Kombu Consumer was not reestablishing connection, but that
was due to some poor log formatting on our end. In the traceback I posted,
we're actually logging a warning from Kombu, which includes exc_info:
https://github.com/celery/kombu/blob/master/kombu/mixins.py#L181

This is great news, as it means that Kombu is continuing to retry and is
working as expected. Our services are connecting to rabbitmq through
HAProxy, so we are looking into the network environment to figure out the
cause of the broken connections.

Cool. I'd be interested to hear what the problem with HAProxy is when you
find it. Are you connecting to clustered and/or HA rabbit nodes? There is a
possibly-related forum post here:
https://groups.google.com/forum/#!topic/nameko-dev/5sPw81lPQ50

2. We do have one service, however, where the Consumer appears to be
losing a connection and is not getting restarted. Here's the traceback:

[traceback snipped; identical to the one above]

It looks like when the PollingQueueConsumer tries to get_message, it does
not try to setup the consumer again if an IOError is raised:
https://github.com/onefinestay/nameko/blob/master/nameko/standalone/rpc.py#L128

Since ConnectionError is caught, I would expect IOError to also be caught,
an RpcConnectionError to be raised, and an attempt made to set up the
consumer again. Is there a particular reason why this isn't the case?

How come you're using the standalone RPC proxy in this service, rather than
the RpcProxy DependencyProvider?

The standalone proxy doesn't use kombu's ConsumerMixin, so doesn't have the
same behaviour of continuing to re-establish the connection. This could
probably be improved, but the behaviour you're seeing is expected.
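For contrast, the service-side Consumer extensions sit on top of kombu's
ConsumerMixin, whose run() loop keeps re-establishing the connection for
you. A minimal sketch of that pattern (queue, exchange and callback names
are just for illustration):

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin


class ExampleListener(ConsumerMixin):
    # ConsumerMixin expects a `connection` attribute and a get_consumers() method
    def __init__(self, connection, queue):
        self.connection = connection
        self.queue = queue

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.queue], callbacks=[self.on_message])]

    def on_message(self, body, message):
        print('received: %r' % (body,))
        message.ack()


exchange = Exchange('example-exchange', type='topic', durable=True)
queue = Queue('example-queue', exchange=exchange, routing_key='example')
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    ExampleListener(conn, queue).run()  # loops forever, retrying broken connections

The standalone proxy manages its consumer by hand instead, which is why it
doesn't get that reconnection behaviour for free.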


Grace went on vacation today, so I'm backfilling the knowledge that we have!

We use Compose <http://compose.com> for RabbitMQ; it runs a cluster of
RabbitMQ nodes with two HAProxy instances in front. Last night, we reached
out to Compose to ask what the connection timeout settings were (and whether
they were configurable), and they sent us this:

timeout client 3h
timeout server 3h
timeout connect 10s
timeout tunnel 1d
timeout client-fin 1h
timeout check 5s

This aligns with our debugging: the HAProxy nodes are terminating inactive
connections. In the services using RpcProxy, the connection is automatically
recreated on the next attempt, but in the standalone RPC proxy the
connection fails to be recreated after the IOError happens.

We're using the standalone RPC proxy because we're calling services from
outside a Nameko service, in a Flask app. The traceback doesn't show it, but
the error is actually from a Nameko ClusterRpcProxy created and managed by
the flask-nameko library
<https://github.com/clef/flask-nameko/blob/master/flask_nameko/proxies.py#L31> we've
started maintaining. My sense is that this usage is correct and there's no
way we could use the DependencyProvider for this use case, correct?
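For reference, underneath flask-nameko it boils down to standard standalone
usage, roughly like this (simplified; in reality the proxy comes out of a
pool rather than being created per request, and the service/method names are
placeholders):

from flask import Flask
from nameko.standalone.rpc import ClusterRpcProxy

app = Flask(__name__)
CONFIG = {'AMQP_URI': 'amqp://guest:guest@localhost:5672//'}  # placeholder URI


@app.route('/browser/<browser_id>')
def get_browser(browser_id):
    # simplified: the real app reuses a pooled proxy instead of opening
    # a new connection on every request
    with ClusterRpcProxy(CONFIG) as rpc:
        return str(rpc.browsers.get(browser_id))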

Two follow up questions:

1. Regardless of the lack of retry functionality in the standalone proxy,
should the IOError itself be captured and re-raised, with the same logic as
the ConnectionError
<https://github.com/onefinestay/nameko/blob/master/nameko/standalone/rpc.py#L149>?
The full crash right now seems inappropriate, since it's a
non-nameko-namespaced error (which leads to confusion) and since none of the
cleanup logic happens.

2. Is there a reason the standalone proxy doesn't follow the same pattern
of continuing to re-establish the connection? If not, at what layer of
abstraction would it make sense to implement this reconnect/retry
functionality? For instance, would it be better handled in our flask-nameko
library or in nameko itself, and if in nameko, would it be best handled in
the PollingQueueConsumer?

Thanks for all your help on this Matt!


After thinking about this more, I came to the conclusion that this would
actually be best solved by implementing pool recycling in the connection
pool we're using with the standalone proxy. The standalone proxy by itself
isn't built to hold a long-running connection, so I think this handles the
issue at the right layer of abstraction. If you're interested in that
solution, you can check out the PR here:
https://github.com/clef/flask-nameko/pull/4
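The core of the recycling idea is small: when a connection is checked out of
the pool, throw it away and create a fresh one if it's older than some
maximum age (kept well below HAProxy's idle timeout). A rough illustration,
not the actual flask-nameko code:

import time


class RecyclingPool(object):
    # `create_proxy` is any zero-argument factory returning a fresh proxy/connection
    def __init__(self, create_proxy, recycle_after=3600):
        self.create_proxy = create_proxy
        self.recycle_after = recycle_after  # seconds; keep below the idle timeout
        self._free = []  # list of (created_at, proxy) tuples

    def checkout(self):
        while self._free:
            created_at, proxy = self._free.pop()
            if time.time() - created_at < self.recycle_after:
                return created_at, proxy
            # too old: assume the load balancer has killed the socket, drop it
        return time.time(), self.create_proxy()

    def checkin(self, entry):
        self._free.append(entry)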


As you discovered in nameko issue #359 ("ResourceLocked: Queue.declare:
(405) RESOURCE_LOCKED - cannot obtain exclusive access to locked queue
'rpc.reply-standalone_rpc_proxy' ..."), the standalone proxy doesn't cope
well with disconnections. I'm sure there's a better way to manage the
connection there, but I don't know exactly how to do so with Kombu. Catching
the IOError would be an improvement, but there would still be a problem
binding a new consumer to the exclusive queue. You might find that instead
of bubbling the IOError, you see the ResourceLocked error more frequently.

Connection recycling is a nice clean solution to your problem with HAProxy.
The only thing it won't help you recover from is intermittent network
failures.

AMQP heartbeats would solve both problems, because HAProxy wouldn't see the
connections as idle anymore, and intermittent errors would (probably) be
recovered before the connection was needed. AMQP heartbeats still aren't
supported by kombu though, so this is just blue-sky thinking.
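For the record, at the raw kombu/py-amqp level the moving parts look roughly
like this; the hard part is that something has to keep servicing the
heartbeat, which is what the nameko/kombu machinery doesn't do for us today
(sketch only):

import time

from kombu import Connection

conn = Connection('amqp://guest:guest@localhost:5672//', heartbeat=10)
conn.connect()

while True:
    # call heartbeat_check() at least twice per heartbeat interval; a missed
    # heartbeat surfaces as a connection error the owner can recover from
    conn.heartbeat_check()
    time.sleep(5)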

···

On Monday, September 19, 2016 at 6:11:45 PM UTC+1, Jesse Pollak wrote:

After thinking about this more, I came to the conclusion that this would
actually be best solved by implementing pool recycling in the connection
pool implementation we are using with the standalone proxy. The standalone
proxy by itself isn't built to contain a long-running connection, so I
think this is handling the issue at the right layer of abstraction. If
you're interested in that solution, you can check out the PR here
<https://github.com/clef/flask-nameko/pull/4&gt;\.

On Friday, September 9, 2016 at 10:12:46 AM UTC-7, Jesse Pollak wrote:

Grace went on vacation today, so I'm backfilling the knowledge that we
have!

We use Compose <http://compose.com> for our RabbitMQ cluster and it uses
a cluster of RabbitMQ nodes with two HAProxy proxies in front of the
cluster. Last night, we reached out to Compose to see what the connection
timeout settings were (and whether they were configurable) and they sent us
this:

timeout client 3h
timeout server 3h
timeout connect 10s
timeout tunnel 1d
timeout client-fin 1h
timeout check 5s

This aligns with our debugging -- the HAProxy nodes are terminating
inactive connections -- in the RpcProxy services, the connection is
automatically recreated on the next attempt, but in the standalone RPC
proxy, the connection is failing to be recreated after the IOError happens.

We're using the standalone RPC proxy because we're using it outside of a
Nameko service in a Flask app. The traceback doesn't show it, but the error
is actually from a Nameko ClusterRpcProxy created and managed by the
flask-nameko
<https://github.com/clef/flask-nameko/blob/master/flask_nameko/proxies.py#L31&gt; we've
started maintaining. My sense is that this usage is correct and there's no
way we could use the DependencyProvider for this use case, correct?

Two follow up questions:

1. Regardless of the lack of retry functionality in the standalone proxy,
should the IOError itself be captured and re-raised, with the same logic
as the ConnectionError
<https://github.com/onefinestay/nameko/blob/master/nameko/standalone/rpc.py#L149&gt;?
The full crash right now seems inappropriate since it's a non-nameko
namespaced error (which leads to confusion) and since none of the cleanup
logic happens.

2. Is there a reason the standalone proxy doesn't follow the same pattern
of continuing to re-establish the connection? If not, at what layer of
abstraction would it make sense to go about implementing this reconnect /
retry functionality (for instance, would it be better handled in our
flask-nameko library or in nameko itself? if in nameko itself, would it be
best handled in the PollingQueueConsumer?)?

Thanks for all your help on this Matt!


Is there a reason why we couldn't use the ConsumerMixin for the standalone
proxy? I'm guessing there's some limitation that makes it unsuitable, but
I'm not sure what that is.

WRT heartbeats, I know Kombu doesn't support them out of the box, but do
you think it would be possible to implement them natively in nameko with
the heartbeat_check function they provide (see the Connection docs in
kombu.connection)? Mostly asking from an architectural perspective whether
you think this would be possible -- if so I may explore what an
implementation would look like.

···

On Thursday, September 22, 2016 at 7:39:46 AM UTC-7, Matt Yule-Bennett wrote:

As you discovered in nameko issue #359 (ResourceLocked: Queue.declare: (405)
RESOURCE_LOCKED - cannot obtain exclusive access to locked queue
'rpc.reply-standalone_rpc_proxy' ...), the standalone proxy doesn't cope
well with disconnections. I'm sure there's a better way to manage the
connection there, but I don't know exactly how to do so with Kombu. Catching
the IOError would be an improvement, but there would still be a problem
binding a new consumer to the exclusive queue. You might find that instead
of bubbling the IOError, you see the ResourceLocked error more frequently.

Connection recycling is a nice clean solution to your problem with HA
proxy. The only thing it won't help you recover from is intermittent
network failures.

AMQP heartbeats would solve both problems, because HA proxy wouldn't see
the connections as idle anymore, and intermittent errors would (probably)
be recovered before the connection was needed. AMQP heartbeats still aren't
supported by kombu though, so this is just blue-sky thinking.

On Monday, September 19, 2016 at 6:11:45 PM UTC+1, Jesse Pollak wrote:

After thinking about this more, I came to the conclusion that this would
actually be best solved by implementing pool recycling in the connection
pool implementation we are using with the standalone proxy. The standalone
proxy by itself isn't built to contain a long-running connection, so I
think this is handling the issue at the right layer of abstraction. If
you're interested in that solution, you can check out the PR here:
https://github.com/clef/flask-nameko/pull/4.
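
Heavily simplified, the recycling idea looks something like this (this is
not the actual flask-nameko code; the pool class, max_age and config are
made up for illustration, and a real version also needs locking and one
entry per worker):

import time

from nameko.standalone.rpc import ClusterRpcProxy


class RecyclingProxyPool(object):
    """Illustration only: hand out a standalone cluster proxy, but tear it
    down and rebuild it once it is older than max_age, so an idle
    connection never outlives HAProxy's idle timeout."""

    def __init__(self, config, max_age=300):
        self.config = config          # e.g. {'AMQP_URI': 'amqp://...'}
        self.max_age = max_age        # seconds before recycling
        self._manager = None
        self._proxy = None
        self._created_at = None

    def _stale(self):
        return (time.time() - self._created_at) > self.max_age

    def get(self):
        if self._manager is None or self._stale():
            if self._manager is not None:
                self._manager.stop()              # drop the old connection
            self._manager = ClusterRpcProxy(self.config)
            self._proxy = self._manager.start()   # fresh connection
            self._created_at = time.time()
        return self._proxy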


Is there a reason why we couldn't use the ConsumerMixin for the standalone
proxy? I'm guessing there's some limitation that makes it unsuitable, but
I'm not sure what that is.

I don't remember trying to use the ConsumerMixin, and I don't know why we
didn't (or whether we did and I just don't remember). Perhaps we got hung
up on not using the existing QueueConsumer because that relies on Eventlet,
and never tried to use the mixin in a different way.
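
For what it's worth, a bare-bones sketch of what a ConsumerMixin-based
reply listener could look like; the mixin's run() loop is what gives you
the reconnect behaviour. This isn't anything that exists in nameko today,
and the queue name and URI are placeholders:

from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin


class ReplyListener(ConsumerMixin):
    """Illustration only: ConsumerMixin.run() re-establishes the connection
    and re-creates the consumers from get_consumers() whenever the
    connection breaks."""

    def __init__(self, connection, reply_queue):
        self.connection = connection    # attribute the mixin expects
        self.reply_queue = reply_queue
        self.replies = {}

    def get_consumers(self, Consumer, channel):
        # Called again on every (re)connection.
        return [Consumer(queues=[self.reply_queue],
                         callbacks=[self.on_message])]

    def on_message(self, body, message):
        self.replies[message.properties.get("correlation_id")] = body
        message.ack()


if __name__ == "__main__":
    connection = Connection("amqp://guest:guest@localhost:5672//")
    ReplyListener(connection, Queue("rpc.reply-example")).run()  # blocks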

WRT heartbeats, I know Kombu doesn't support them out of the box, but do
you think it would be possible to implement them natively in nameko with
the heartbeat_check function they provide (see the Connection docs in
kombu.connection)? Mostly asking from an architectural perspective whether
you think this would be possible -- if so I may explore what an
implementation would look like.

It certainly could be implemented natively, at least for consumer
connections. The publishers don't have much machinery though -- we just
grab a kombu producer object and let it use its own connection pool.
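
As a rough sketch of the idea, assuming the py-amqp transport
(heartbeat_check() has to be called a couple of times per heartbeat
interval, so the wait loop wakes up regularly even when idle; this is just
kombu's public API, not an existing nameko feature):

import socket

from kombu import Connection, Consumer, Queue

HEARTBEAT = 10  # seconds, negotiated with the broker

connection = Connection("amqp://guest:guest@localhost:5672//",
                        heartbeat=HEARTBEAT)
connection.connect()

channel = connection.channel()
consumer = Consumer(channel, queues=[Queue("example")],
                    callbacks=[lambda body, message: message.ack()])
consumer.consume()

while True:
    try:
        # Wake up often enough to service heartbeats even when idle.
        connection.drain_events(timeout=HEARTBEAT / 4.0)
    except socket.timeout:
        pass
    # Sends a heartbeat frame if one is due, and raises if the broker's
    # heartbeats have stopped arriving (i.e. the connection is dead).
    connection.heartbeat_check()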

I've been wanting to move away from kombu for a while. We talked about it
quite a lot in the offline meetup. There are a bunch of alternative AMQP
libraries (pyamqp, pika, amqpy, puka, haigha, txamqp, asynqp, ...!) but
they all use different concurrency mechanisms.
Our ideal requirements here are:

a) eventlet compatible or, better, completely thread-safe
b) works with py2 and py3
c) asynchronous
d) supports heartbeats
e) supports connection pools shared between threads

kombu (which uses pyamqp under the hood) gives us only a) and b). pika may
give us everything, but we'd have to use its callback-based API. amqpy
looks very promising, and seems to have added py2 compatibility since I saw
it last.

If you wanted to explore an implementation using an alternative AMQP
library, it would be very interesting. The standalone RPC proxy would be a
nice place to start because a) it needs to work in pure Python, b) it has
both a publisher and a consumer, and c) it's otherwise relatively small and
standalone.

