message_ack failing and retriggering task

Hi,

I have an rpc endpoint that does some processing and takes around 20 min to
complete. It basically kicks off a bunch of other async calls to other rpc
endpoints and waits for all of those to complete.
I frequently make async calls to this endpoint to do some processing.

However, I get this error message:
Traceback (most recent call last):
  File "site-packages/kombu/mixins.py", line 177, in run
    for _ in self.consume(limit=None): # pragma: no cover
  File "site-packages/kombu/mixins.py", line 197, in consume
    self.on_iteration()
  File "site-packages/nameko/messaging.py", line 417, in on_iteration
    self._process_pending_message_acks()
  File "site-packages/nameko/messaging.py", line 368, in
_process_pending_message_acks
    msg.ack()
  File "site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "site-packages/amqp/abstract_channel.py", line 50, in _send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed

It looks like, by the time nameko sends it's ack_message, the worker's
connection to rabbitmq is lost, so I get a RecoverableConnectionError. This
means that the ack isn't transmitted correctly, so the message get's
requeued, and the worker attempts to process the same message again.

Is there a way to avoid this? Right now I've done a very hacky fix: I
dispatch events instead of making async_calls. This ensures that the
ack_message occurs immediately, but I'd like to find a sustainable fix. Is
it possible to set a timeout parameter (e.g. RpcProxy(timeout=20*60) # 20
min ) when I make the async_call?

Thanks for all of your help! And sorry for the barrage of questions.

Hi,

I have an rpc endpoint that does some processing and takes around 20 min
to complete. It basically kicks off a bunch of other async calls to other
rpc endpoints and waits for all of those to complete.
I frequently make async calls to this endpoint to do some processing.

However, I get this error message:

Traceback (most recent call last):
  File "site-packages/kombu/mixins.py", line 177, in run
    for _ in self.consume(limit=None): # pragma: no cover
  File "site-packages/kombu/mixins.py", line 197, in consume
    self.on_iteration()
  File "site-packages/nameko/messaging.py", line 417, in on_iteration
    self._process_pending_message_acks()
  File "site-packages/nameko/messaging.py", line 368, in
_process_pending_message_acks
    msg.ack()
  File "site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "site-packages/amqp/abstract_channel.py", line 50, in _send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed

It looks like, by the time nameko sends it's ack_message, the worker's
connection to rabbitmq is lost, so I get a RecoverableConnectionError. This
means that the ack isn't transmitted correctly, so the message get's
requeued, and the worker attempts to process the same message again.

Is there a way to avoid this? Right now I've done a very hacky fix: I
dispatch events instead of making async_calls. This ensures that the
ack_message occurs immediately, but I'd like to find a sustainable fix. Is
it possible to set a timeout parameter (e.g. RpcProxy(timeout=20*60) # 20
min ) when I make the async_call?

This isn't the RPC request timing out (there actually isn't a timeout on
RPC requests, though it would be a useful feature). Rather it's the
connection to Rabbit being closed. Are you connecting to Rabbit via a
load-balancer or proxy of some kind? Oftentimes they will close idle
connections after a number of minutes.

Which version of Nameko are you using? Until 2.5.4 message acks were sent
by the consumer thread, and consumer heartbeats are enabled from 2.4.4
onwards. If you're pre-2.4.4, upgrading will enable heartbeats that would
stop the connection being determined as idle. In 2.5.4,
RecoverableConnectionErrors are explicitly caught and retried (see
https://github.com/nameko/nameko/blob/v2.5.4/nameko/messaging.py#L326\),
although it's possible there's a bug there.

Thanks for all of your help! And sorry for the barrage of questions.

No problem.

One other note -- rather than long-running RPC, have you considered using
another entrypoint as a callback in the client, so it can be notified the
work when it's done?

···

On Saturday, April 22, 2017 at 10:42:23 PM UTC+1, di...@clearmetal.com wrote:

I seem to be having the same Problem using nameko 2.6.0.
Would there be a way of somehow reconnecting if a connection error es
detected and retrying the ack instead of requeuing the job?

The reason for using the rpc jobs is so that the caller is blocked until
the long running job returns (this is helpful for logging, timing and
resource/cpu/memory management).

Thanks for the help and the great library!

···

Am Sonntag, 23. April 2017 00:20:08 UTC+2 schrieb Matt Yule-Bennett:

On Saturday, April 22, 2017 at 10:42:23 PM UTC+1, di...@clearmetal.com > wrote:

Hi,

I have an rpc endpoint that does some processing and takes around 20 min
to complete. It basically kicks off a bunch of other async calls to other
rpc endpoints and waits for all of those to complete.
I frequently make async calls to this endpoint to do some processing.

However, I get this error message:

Traceback (most recent call last):
  File "site-packages/kombu/mixins.py", line 177, in run
    for _ in self.consume(limit=None): # pragma: no cover
  File "site-packages/kombu/mixins.py", line 197, in consume
    self.on_iteration()
  File "site-packages/nameko/messaging.py", line 417, in on_iteration
    self._process_pending_message_acks()
  File "site-packages/nameko/messaging.py", line 368, in
_process_pending_message_acks
    msg.ack()
  File "site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "site-packages/amqp/abstract_channel.py", line 50, in _send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed

It looks like, by the time nameko sends it's ack_message, the worker's
connection to rabbitmq is lost, so I get a RecoverableConnectionError. This
means that the ack isn't transmitted correctly, so the message get's
requeued, and the worker attempts to process the same message again.

Is there a way to avoid this? Right now I've done a very hacky fix: I
dispatch events instead of making async_calls. This ensures that the
ack_message occurs immediately, but I'd like to find a sustainable fix. Is
it possible to set a timeout parameter (e.g. RpcProxy(timeout=20*60) # 20
min ) when I make the async_call?

This isn't the RPC request timing out (there actually isn't a timeout on
RPC requests, though it would be a useful feature). Rather it's the
connection to Rabbit being closed. Are you connecting to Rabbit via a
load-balancer or proxy of some kind? Oftentimes they will close idle
connections after a number of minutes.

Which version of Nameko are you using? Until 2.5.4 message acks were sent
by the consumer thread, and consumer heartbeats are enabled from 2.4.4
onwards. If you're pre-2.4.4, upgrading will enable heartbeats that would
stop the connection being determined as idle. In 2.5.4,
RecoverableConnectionErrors are explicitly caught and retried (see
https://github.com/nameko/nameko/blob/v2.5.4/nameko/messaging.py#L326\),
although it's possible there's a bug there.

Thanks for all of your help! And sorry for the barrage of questions.

No problem.

One other note -- rather than long-running RPC, have you considered using
another entrypoint as a callback in the client, so it can be notified the
work when it's done?

I think RPC erroneously restarted if connection between worker and broker times out · Issue #477 · nameko/nameko · GitHub probably describes
that's going on here. Do you have a workload that doesn't yield or block
for I/O?

···

On Monday, September 25, 2017 at 2:16:18 PM UTC+1, Davis Kirkendall wrote:

I seem to be having the same Problem using nameko 2.6.0.
Would there be a way of somehow reconnecting if a connection error es
detected and retrying the ack instead of requeuing the job?

The reason for using the rpc jobs is so that the caller is blocked until
the long running job returns (this is helpful for logging, timing and
resource/cpu/memory management).

Thanks for the help and the great library!

Am Sonntag, 23. April 2017 00:20:08 UTC+2 schrieb Matt Yule-Bennett:

On Saturday, April 22, 2017 at 10:42:23 PM UTC+1, di...@clearmetal.com >> wrote:

Hi,

I have an rpc endpoint that does some processing and takes around 20 min
to complete. It basically kicks off a bunch of other async calls to other
rpc endpoints and waits for all of those to complete.
I frequently make async calls to this endpoint to do some processing.

However, I get this error message:

Traceback (most recent call last):
  File "site-packages/kombu/mixins.py", line 177, in run
    for _ in self.consume(limit=None): # pragma: no cover
  File "site-packages/kombu/mixins.py", line 197, in consume
    self.on_iteration()
  File "site-packages/nameko/messaging.py", line 417, in on_iteration
    self._process_pending_message_acks()
  File "site-packages/nameko/messaging.py", line 368, in
_process_pending_message_acks
    msg.ack()
  File "site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "site-packages/amqp/abstract_channel.py", line 50, in _send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed

It looks like, by the time nameko sends it's ack_message, the worker's
connection to rabbitmq is lost, so I get a RecoverableConnectionError. This
means that the ack isn't transmitted correctly, so the message get's
requeued, and the worker attempts to process the same message again.

Is there a way to avoid this? Right now I've done a very hacky fix: I
dispatch events instead of making async_calls. This ensures that the
ack_message occurs immediately, but I'd like to find a sustainable fix. Is
it possible to set a timeout parameter (e.g. RpcProxy(timeout=20*60) # 20
min ) when I make the async_call?

This isn't the RPC request timing out (there actually isn't a timeout on
RPC requests, though it would be a useful feature). Rather it's the
connection to Rabbit being closed. Are you connecting to Rabbit via a
load-balancer or proxy of some kind? Oftentimes they will close idle
connections after a number of minutes.

Which version of Nameko are you using? Until 2.5.4 message acks were sent
by the consumer thread, and consumer heartbeats are enabled from 2.4.4
onwards. If you're pre-2.4.4, upgrading will enable heartbeats that would
stop the connection being determined as idle. In 2.5.4,
RecoverableConnectionErrors are explicitly caught and retried (see
https://github.com/nameko/nameko/blob/v2.5.4/nameko/messaging.py#L326\),
although it's possible there's a bug there.

Thanks for all of your help! And sorry for the barrage of questions.

No problem.

One other note -- rather than long-running RPC, have you considered using
another entrypoint as a callback in the client, so it can be notified the
work when it's done?