RPC-extension and heartbeat if my service doing long task (above 20min)

The error lies in the fact that the received task from the queue returns to
the queue back some time later, if you look at the logs there is an error
associated with the timeouts.
Here are the detailed logs and question, how do I increase the count of
heartbeats?

TRACE

2018-03-07 15:14:07,102 - WARNING - Connection to broker lost, trying to re-establish
connection...
Traceback (most recent call last):
File "/home/etl/.local/lib/python3.5/site-packages/kombu/mixins.py", line
199, in consume
conn.drain_events(timeout=safety_interval)
File "/home/etl/.local/lib/python3.5/site-packages/kombu/connection.py",
line 288, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File
"/home/etl/.local/lib/python3.5/site-packages/kombu/transport/pyamqp.py",
line 95, in drain_events
return connection.drain_events(**kwargs)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 303, in drain_events
chanmap, None, timeout=timeout,
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 366, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 337, in read_timeout
return self.method_reader.read_method()
File "/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 189, in read_method
raise m
File "/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 154, in read_frame
frame_header = read(7, True)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 277, in _read
s = recv(n - len(rbuf))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 360, in recv
return self._recv_loop(self.fd.recv, b'', bufsize, flags)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 354, in _recv_loop
self._read_trampoline()
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 325, in _read_trampoline
timeout_exc=socket_timeout('timed out'))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 207, in _trampoline
mark_as_closed=self._mark_as_closed)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/__init__.py",
line 163, in trampoline
return hub.switch()
File "/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/hub.py",
line 295, in switch
return self.greenlet.switch()
socket.timeout: timed out

Thanks all!

I had a similar issue.

The default is to send a heartbeat every 60 seconds, with 3 retries.

You can increase the heartbeat timeout with the `HEARTBEAT` config value.

If your 'long task' happens to be a SubProcess call (like mine was) you
should make sure you import the 'greened' Subprocess module (Subprocess is
not monkey-patched automatically, you have to specifically import it).

Geoff

···

On Monday, April 2, 2018 at 2:31:16 AM UTC-7, ts...@atlas.ru wrote:

The error lies in the fact that the received task from the queue returns
to the queue back some time later, if you look at the logs there is an
error associated with the timeouts.
Here are the detailed logs and question, how do I increase the count of
heartbeats?

>> TRACE

2018-03-07 15:14:07,102 - WARNING - Connection to broker lost, trying to
re-establish connection...
Traceback (most recent call last):
File "/home/etl/.local/lib/python3.5/site-packages/kombu/mixins.py",
line 199, in consume
conn.drain_events(timeout=safety_interval)
File "/home/etl/.local/lib/python3.5/site-packages/kombu/connection.py",
line 288, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File
"/home/etl/.local/lib/python3.5/site-packages/kombu/transport/pyamqp.py",
line 95, in drain_events
return connection.drain_events(**kwargs)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 303, in drain_events
chanmap, None, timeout=timeout,
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 366, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 337, in read_timeout
return self.method_reader.read_method()
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 189, in read_method
raise m
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 154, in read_frame
frame_header = read(7, True)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 277, in _read
s = recv(n - len(rbuf))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 360, in recv
return self._recv_loop(self.fd.recv, b'', bufsize, flags)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 354, in _recv_loop
self._read_trampoline()
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 325, in _read_trampoline
timeout_exc=socket_timeout('timed out'))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 207, in _trampoline
mark_as_closed=self._mark_as_closed)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/__init__.py",
line 163, in trampoline
return hub.switch()
File "/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/hub.py",
line 295, in switch
return self.greenlet.switch()
socket.timeout: timed out

Thanks all!

Yeap! Thanks for advice! I fixed it with eventlet.

···

On Monday, April 2, 2018 at 7:24:55 PM UTC+3, juko...@gmail.com wrote:

I had a similar issue.

The default is to send a heartbeat every 60 seconds, with 3 retries.

You can increase the heartbeat timeout with the `HEARTBEAT` config value.

If your 'long task' happens to be a SubProcess call (like mine was) you
should make sure you import the 'greened' Subprocess module (Subprocess is
not monkey-patched automatically, you have to specifically import it).

Geoff

On Monday, April 2, 2018 at 2:31:16 AM UTC-7, ts...@atlas.ru wrote:

The error lies in the fact that the received task from the queue returns
to the queue back some time later, if you look at the logs there is an
error associated with the timeouts.
Here are the detailed logs and question, how do I increase the count of
heartbeats?

>> TRACE

2018-03-07 15:14:07,102 - WARNING - Connection to broker lost, trying to
re-establish connection...
Traceback (most recent call last):
File "/home/etl/.local/lib/python3.5/site-packages/kombu/mixins.py",
line 199, in consume
conn.drain_events(timeout=safety_interval)
File "/home/etl/.local/lib/python3.5/site-packages/kombu/connection.py",
line 288, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File
"/home/etl/.local/lib/python3.5/site-packages/kombu/transport/pyamqp.py",
line 95, in drain_events
return connection.drain_events(**kwargs)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 303, in drain_events
chanmap, None, timeout=timeout,
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 366, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 337, in read_timeout
return self.method_reader.read_method()
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 189, in read_method
raise m
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 154, in read_frame
frame_header = read(7, True)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 277, in _read
s = recv(n - len(rbuf))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 360, in recv
return self._recv_loop(self.fd.recv, b'', bufsize, flags)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 354, in _recv_loop
self._read_trampoline()
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 325, in _read_trampoline
timeout_exc=socket_timeout('timed out'))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 207, in _trampoline
mark_as_closed=self._mark_as_closed)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/__init__.py",
line 163, in trampoline
return hub.switch()
File "/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/hub.py"
, line 295, in switch
return self.greenlet.switch()
socket.timeout: timed out

Thanks all!

I'm having problems with setting the "HEARTBEAT" parameter. It seems like
kombu (or the underlying amqp lib) uses the minimum of server-heartbeat and
client-heartbeat. So if the server heartbeat is set to 30 seconds, we
can't increase the heartbeat to 5 minutes for example.

Is there any (functional) reason that the client heartbeat can't be larger
than the server heartbeat?

btw. This issue here: RPC erroneously restarted if connection between worker and broker times out · Issue #477 · nameko/nameko · GitHub deals
with the same problem and has some more solutions (unfortunately the tpool
solution does not work for all cases, hence the heartbeat question above).

Davis

···

Am Montag, 2. April 2018 18:24:55 UTC+2 schrieb juko...@gmail.com:

I had a similar issue.

The default is to send a heartbeat every 60 seconds, with 3 retries.

You can increase the heartbeat timeout with the `HEARTBEAT` config value.

If your 'long task' happens to be a SubProcess call (like mine was) you
should make sure you import the 'greened' Subprocess module (Subprocess is
not monkey-patched automatically, you have to specifically import it).

Geoff

On Monday, April 2, 2018 at 2:31:16 AM UTC-7, ts...@atlas.ru wrote:

The error lies in the fact that the received task from the queue returns
to the queue back some time later, if you look at the logs there is an
error associated with the timeouts.
Here are the detailed logs and question, how do I increase the count of
heartbeats?

>> TRACE

2018-03-07 15:14:07,102 - WARNING - Connection to broker lost, trying to
re-establish connection...
Traceback (most recent call last):
File "/home/etl/.local/lib/python3.5/site-packages/kombu/mixins.py",
line 199, in consume
conn.drain_events(timeout=safety_interval)
File "/home/etl/.local/lib/python3.5/site-packages/kombu/connection.py",
line 288, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File
"/home/etl/.local/lib/python3.5/site-packages/kombu/transport/pyamqp.py",
line 95, in drain_events
return connection.drain_events(**kwargs)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 303, in drain_events
chanmap, None, timeout=timeout,
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 366, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 337, in read_timeout
return self.method_reader.read_method()
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 189, in read_method
raise m
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 154, in read_frame
frame_header = read(7, True)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 277, in _read
s = recv(n - len(rbuf))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 360, in recv
return self._recv_loop(self.fd.recv, b'', bufsize, flags)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 354, in _recv_loop
self._read_trampoline()
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 325, in _read_trampoline
timeout_exc=socket_timeout('timed out'))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 207, in _trampoline
mark_as_closed=self._mark_as_closed)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/__init__.py",
line 163, in trampoline
return hub.switch()
File "/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/hub.py"
, line 295, in switch
return self.greenlet.switch()
socket.timeout: timed out

Thanks all!

Davis,

Sorry, I completely missed this post.

The server setting the max heartbeat is a RabbitMQ design. See
https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout:

Note that in case RabbitMQ server has a non-zero heartbeat timeout

configured (which is the default in versions starting with 3.6.x), the
client can only lower the value but not increase it.

However, a service method that blocks for more than a few seconds is a bad
idea. As I described in RPC erroneously restarted if connection between worker and broker times out · Issue #477 · nameko/nameko · GitHub, this
is just part of how implicitly yielding threads work. You must yield
occasionally otherwise any other threads will starve (which is what's
happening to the AMQP consumer thread here)

In this situation you generally have two choices:

1. Explicitly yield. If you've got some code that's purely CPU bound with
no I/O, make sure insert an eventlet.yield() occasionally
2. If you can't insert a yield, for example because the blocking code is in
some library that you don't control, use the tpool (this is exactly what
tpool is for)

···

On Wednesday, 23 May 2018 16:54:21 UTC+1, Davis Kirkendall wrote:

I'm having problems with setting the "HEARTBEAT" parameter. It seems like
kombu (or the underlying amqp lib) uses the minimum of server-heartbeat and
client-heartbeat. So if the server heartbeat is set to 30 seconds, we
can't increase the heartbeat to 5 minutes for example.

Is there any (functional) reason that the client heartbeat can't be larger
than the server heartbeat?

btw. This issue here: RPC erroneously restarted if connection between worker and broker times out · Issue #477 · nameko/nameko · GitHub
<Redirect Notice;
deals with the same problem and has some more solutions (unfortunately the
tpool solution does not work for all cases, hence the heartbeat question
above).

Davis

Am Montag, 2. April 2018 18:24:55 UTC+2 schrieb juko...@gmail.com:

I had a similar issue.

The default is to send a heartbeat every 60 seconds, with 3 retries.

You can increase the heartbeat timeout with the `HEARTBEAT` config value.

If your 'long task' happens to be a SubProcess call (like mine was) you
should make sure you import the 'greened' Subprocess module (Subprocess is
not monkey-patched automatically, you have to specifically import it).

Geoff

On Monday, April 2, 2018 at 2:31:16 AM UTC-7, ts...@atlas.ru wrote:

The error lies in the fact that the received task from the queue returns
to the queue back some time later, if you look at the logs there is an
error associated with the timeouts.
Here are the detailed logs and question, how do I increase the count of
heartbeats?

>> TRACE

2018-03-07 15:14:07,102 - WARNING - Connection to broker lost, trying
to re-establish connection...
Traceback (most recent call last):
File "/home/etl/.local/lib/python3.5/site-packages/kombu/mixins.py",
line 199, in consume
conn.drain_events(timeout=safety_interval)
File "/home/etl/.local/lib/python3.5/site-packages/kombu/connection.py"
, line 288, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File
"/home/etl/.local/lib/python3.5/site-packages/kombu/transport/pyamqp.py"
, line 95, in drain_events
return connection.drain_events(**kwargs)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 303, in drain_events
chanmap, None, timeout=timeout,
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 366, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/connection.py",
line 337, in read_timeout
return self.method_reader.read_method()
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 189, in read_method
raise m
File
"/home/etl/.local/lib/python3.5/site-packages/amqp/method_framing.py",
line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 154, in read_frame
frame_header = read(7, True)
File "/home/etl/.local/lib/python3.5/site-packages/amqp/transport.py",
line 277, in _read
s = recv(n - len(rbuf))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 360, in recv
return self._recv_loop(self.fd.recv, b'', bufsize, flags)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 354, in _recv_loop
self._read_trampoline()
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 325, in _read_trampoline
timeout_exc=socket_timeout('timed out'))
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/greenio/base.py",
line 207, in _trampoline
mark_as_closed=self._mark_as_closed)
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/__init__.py"
, line 163, in trampoline
return hub.switch()
File
"/home/etl/.local/lib/python3.5/site-packages/eventlet/hubs/hub.py",
line 295, in switch
return self.greenlet.switch()
socket.timeout: timed out

Thanks all!

Hi,

i have same issue with training a ML model.

After train ended (about 1h) i received the error

Connection to broker lost, trying to re-establish connection...
Traceback (most recent call last):
  File "/Users/.../lib/python3.8/site-packages/kombu/mixins.py", line 193, in consume
    conn.drain_events(timeout=safety_interval)
  File "/Users/.../lib/python3.8/site-packages/kombu/connection.py", line 317, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/Users/.../lib/python3.8/site-packages/kombu/transport/pyamqp.py", line 169, in drain_events
    return connection.drain_events(**kwargs)
  File "/Users/.../lib/python3.8/site-packages/amqp/connection.py", line 522, in drain_events
    while not self.blocking_read(timeout):
  File "/Users/.../lib/python3.8/site-packages/amqp/connection.py", line 527, in blocking_read
    frame = self.transport.read_frame()
  File "/Users/.../lib/python3.8/site-packages/amqp/transport.py", line 310, in read_frame
    frame_header = read(7, True)
  File "/Users/.../lib/python3.8/site-packages/amqp/transport.py", line 639, in _read
    s = recv(n - len(rbuf))
  File "/Users/.../lib/python3.8/site-packages/eventlet/greenio/base.py", line 370, in recv
    return self._recv_loop(self.fd.recv, b'', bufsize, flags)
  File "/Users/.../lib/python3.8/site-packages/eventlet/greenio/base.py", line 364, in _recv_loop
    self._read_trampoline()
  File "/Users/.../lib/python3.8/site-packages/eventlet/greenio/base.py", line 332, in _read_trampoline
    self._trampoline(
  File "/Users/.../lib/python3.8/site-packages/eventlet/greenio/base.py", line 211, in _trampoline
    return trampoline(fd, read=read, write=write, timeout=timeout,
  File "/Users/.../lib/python3.8/site-packages/eventlet/hubs/__init__.py", line 159, in trampoline
    return hub.switch()
  File "/Users/t.../lib/python3.8/site-packages/eventlet/hubs/hub.py", line 313, in switch
    return self.greenlet.switch()
socket.timeout: timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/.../lib/python3.8/site-packages/kombu/mixins.py", line 171, in run
    for _ in self.consume(limit=None, **kwargs):
  File "/Users/.../lib/python3.8/site-packages/kombu/mixins.py", line 195, in consume
    conn.heartbeat_check()
  File "/Users/.../lib/python3.8/site-packages/kombu/connection.py", line 306, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
  File "/Users/.../lib/python3.8/site-packages/kombu/transport/pyamqp.py", line 220, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
  File "/Users/.../lib/python3.8/site-packages/amqp/connection.py", line 767, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed

Can you tell me how you fixed it?