prefetch_count override

Moving this discussion to the development group.

So I'm trying to set a prefetch_count not tied to max_workers based on
Matt's suggestion. It seems I have something off as I'm now getting 2 log
events with consumers connecting to rabbit. Any suggestions on what I may
be doing wrong?

2017-04-11 16:20:22 - INFO - runners.py:83 - start() - starting services:
account
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to amqp:
//services:**@10.10.32.86:5672//
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to amqp:
//services:**@10.10.32.86:5672//

nameko_rpc.py

from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

nameko_event_handler.py

import socket
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer

class QueueConsumer(NamekoQueueConsumer):
prefetch_count = 10

class EventHandler(NamekoEventHandler):
queue_consumer = QueueConsumer()

@property
def broadcast_identifier(self):
return socket.gethostname()

event_handler = EventHandler.decorator

There's not necessarily anything wrong with having two Consumers connected.

The QueueConsumer is a SharedExtension
<https://github.com/nameko/nameko/blob/v2.5.3/nameko/extensions.py#L115-L134&gt;,
meaning that multiple entrypoints declaring it as an Extension (as both the
EventHandler and the RpcConsumer do) will actually *share* the same
instance.

Another example of SharedExtensions in Nameko is the WebServer that powers
the Http entrypoints -- in that case we want to have as many @http
decorated methods as we need but only bind to a single port.

The AMQP entrypoints don't actually *need* to share a single consumer,
channel or connection, but that's how the QueueConsumer normally operates.
In fact this is a bit of a mis-design, since you really want distinct
consumers and channels inside a shared connection, but that is a more
involved discussion.

So it's fine for you to be running distinct QueueConsumers for your RPC
entrypoints and event handlers. The reason you've ended up with distinct
ones is that the `sharing_key` that determines whether an existing instance
of a SharedExtension can be reused is the *type* of the instance, and
you've done the override once in each module.

If you want to end up with a single QueueConsumer (and therefore single
connection), you can simply make the override once, in a shared module. Or
override the `sharing_key`.

I hope that makes sense.

Matt.

···

On Tuesday, April 11, 2017 at 9:58:13 PM UTC+1, Conor Seabrook wrote:

Moving this discussion to the development group.

Deadlock and AMQP reconnections when prefetched messages exceed max workers · Issue #428 · nameko/nameko · GitHub

So I'm trying to set a prefetch_count not tied to max_workers based on
Matt's suggestion. It seems I have something off as I'm now getting 2 log
events with consumers connecting to rabbit. Any suggestions on what I may
be doing wrong?

2017-04-11 16:20:22 - INFO - runners.py:83 - start() - starting services:
account
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to
amqp://services:**@10.10.32.86:5672//
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to
amqp://services:**@10.10.32.86:5672//

nameko_rpc.py

from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

nameko_event_handler.py

import socket
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer

class QueueConsumer(NamekoQueueConsumer):
prefetch_count = 10

class EventHandler(NamekoEventHandler):
queue_consumer = QueueConsumer()

@property
def broadcast_identifier(self):
return socket.gethostname()

event_handler = EventHandler.decorator

Makes total sense. Thanks again for the assist!

···

On Tuesday, April 11, 2017 at 5:16:30 PM UTC-4, Matt Yule-Bennett wrote:

There's not necessarily anything wrong with having two Consumers connected.

The QueueConsumer is a SharedExtension
<https://github.com/nameko/nameko/blob/v2.5.3/nameko/extensions.py#L115-L134&gt;,
meaning that multiple entrypoints declaring it as an Extension (as both the
EventHandler and the RpcConsumer do) will actually *share* the same
instance.

Another example of SharedExtensions in Nameko is the WebServer that powers
the Http entrypoints -- in that case we want to have as many @http
decorated methods as we need but only bind to a single port.

The AMQP entrypoints don't actually *need* to share a single consumer,
channel or connection, but that's how the QueueConsumer normally operates.
In fact this is a bit of a mis-design, since you really want distinct
consumers and channels inside a shared connection, but that is a more
involved discussion.

So it's fine for you to be running distinct QueueConsumers for your RPC
entrypoints and event handlers. The reason you've ended up with distinct
ones is that the `sharing_key` that determines whether an existing instance
of a SharedExtension can be reused is the *type* of the instance, and
you've done the override once in each module.

If you want to end up with a single QueueConsumer (and therefore single
connection), you can simply make the override once, in a shared module. Or
override the `sharing_key`.

I hope that makes sense.

Matt.

On Tuesday, April 11, 2017 at 9:58:13 PM UTC+1, Conor Seabrook wrote:

Moving this discussion to the development group.

Deadlock and AMQP reconnections when prefetched messages exceed max workers · Issue #428 · nameko/nameko · GitHub

So I'm trying to set a prefetch_count not tied to max_workers based on
Matt's suggestion. It seems I have something off as I'm now getting 2 log
events with consumers connecting to rabbit. Any suggestions on what I may
be doing wrong?

2017-04-11 16:20:22 - INFO - runners.py:83 - start() - starting services:
account
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to
amqp://services:**@10.10.32.86:5672//
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to
amqp://services:**@10.10.32.86:5672//

nameko_rpc.py

from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

nameko_event_handler.py

import socket
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer

class QueueConsumer(NamekoQueueConsumer):
prefetch_count = 10

class EventHandler(NamekoEventHandler):
queue_consumer = QueueConsumer()

@property
def broadcast_identifier(self):
return socket.gethostname()

event_handler = EventHandler.decorator

If anyone else is interested, this is what I ended up with. Seems to be
working as expected.

# nameko_entrypoints.py
import socket

from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

class EventHandler(NamekoEventHandler):
queue_consumer = QueueConsumer()

@property
def broadcast_identifier(self):
return socket.gethostname()

event_handler = EventHandler.decorator

class ReplyListener(NamekoReplyListener):
queue_consumer = QueueConsumer()

class RpcProxy(NamekoRpcProxy):
rpc_reply_listener = ReplyListener()

···

On Tuesday, April 11, 2017 at 5:40:55 PM UTC-4, Conor Seabrook wrote:

Makes total sense. Thanks again for the assist!

On Tuesday, April 11, 2017 at 5:16:30 PM UTC-4, Matt Yule-Bennett wrote:

There's not necessarily anything wrong with having two Consumers
connected.

The QueueConsumer is a SharedExtension
<https://github.com/nameko/nameko/blob/v2.5.3/nameko/extensions.py#L115-L134&gt;,
meaning that multiple entrypoints declaring it as an Extension (as both the
EventHandler and the RpcConsumer do) will actually *share* the same
instance.

Another example of SharedExtensions in Nameko is the WebServer that
powers the Http entrypoints -- in that case we want to have as many @http
decorated methods as we need but only bind to a single port.

The AMQP entrypoints don't actually *need* to share a single consumer,
channel or connection, but that's how the QueueConsumer normally operates.
In fact this is a bit of a mis-design, since you really want distinct
consumers and channels inside a shared connection, but that is a more
involved discussion.

So it's fine for you to be running distinct QueueConsumers for your RPC
entrypoints and event handlers. The reason you've ended up with distinct
ones is that the `sharing_key` that determines whether an existing instance
of a SharedExtension can be reused is the *type* of the instance, and
you've done the override once in each module.

If you want to end up with a single QueueConsumer (and therefore single
connection), you can simply make the override once, in a shared module. Or
override the `sharing_key`.

I hope that makes sense.

Matt.

On Tuesday, April 11, 2017 at 9:58:13 PM UTC+1, Conor Seabrook wrote:

Moving this discussion to the development group.

Deadlock and AMQP reconnections when prefetched messages exceed max workers · Issue #428 · nameko/nameko · GitHub

So I'm trying to set a prefetch_count not tied to max_workers based on
Matt's suggestion. It seems I have something off as I'm now getting 2 log
events with consumers connecting to rabbit. Any suggestions on what I may
be doing wrong?

2017-04-11 16:20:22 - INFO - runners.py:83 - start() - starting services
: account
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to
amqp://services:**@10.10.32.86:5672//
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to
amqp://services:**@10.10.32.86:5672//

nameko_rpc.py

from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

nameko_event_handler.py

import socket
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer

class QueueConsumer(NamekoQueueConsumer):
prefetch_count = 10

class EventHandler(NamekoEventHandler):
queue_consumer = QueueConsumer()

@property
def broadcast_identifier(self):
return socket.gethostname()

event_handler = EventHandler.decorator