So I'm trying to set a prefetch_count not tied to max_workers based on
Matt's suggestion. It seems I have something off as I'm now getting 2 log
events with consumers connecting to rabbit. Any suggestions on what I may
be doing wrong?

2017-04-11 16:20:22 - INFO - runners.py:83 - start() - starting services: account
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to amqp://services:**@10.10.32.86:5672//
2017-04-11 16:20:22 - INFO - mixins.py:231 - Consumer() - Connected to amqp://services:**@10.10.32.86:5672//

# nameko_rpc.py
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

# nameko_event_handler.py
import socket
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class EventHandler(NamekoEventHandler):
    queue_consumer = QueueConsumer()

Another example of SharedExtensions in Nameko is the WebServer that powers
the Http entrypoints -- in that case we want to have as many @http
decorated methods as we need but only bind to a single port.
The AMQP entrypoints don't actually *need* to share a single consumer,
channel or connection, but that's how the QueueConsumer normally operates.
In fact this is a bit of a mis-design, since you really want distinct
consumers and channels inside a shared connection, but that is a more
involved discussion.
So it's fine for you to be running distinct QueueConsumers for your RPC
entrypoints and event handlers. The reason you've ended up with distinct
ones is that the `sharing_key` that determines whether an existing instance
of a SharedExtension can be reused is the *type* of the instance, and
you've done the override once in each module.
If you want to end up with a single QueueConsumer (and therefore single
connection), you can simply make the override once, in a shared module. Or
override the `sharing_key`.
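
To make the sharing behaviour concrete, here is a rough sketch of the idea. It deliberately does not import Nameko: `SharedExtension`, `BaseQueueConsumer`, `bind()` and the plain dict used as a registry are simplified stand-ins invented for illustration, and `define_override()` only imitates defining the same override in two separate modules. The one thing taken from the explanation above is that the default `sharing_key` is the type of the instance.

```python
# Simplified stand-in for the sharing mechanism described above.
# These classes are illustrative only and are NOT Nameko's implementation.

class SharedExtension:
    @property
    def sharing_key(self):
        # Default key: the concrete class of the instance.
        return type(self)

    def bind(self, shared_extensions):
        # Reuse an instance already registered under the same key,
        # otherwise register this one.
        return shared_extensions.setdefault(self.sharing_key, self)


class BaseQueueConsumer(SharedExtension):
    """Plays the role of nameko.messaging.QueueConsumer in this sketch."""


def define_override():
    # Each call creates a new class object, much like writing the same
    # override separately in nameko_rpc.py and nameko_event_handler.py.
    class QueueConsumer(BaseQueueConsumer):
        prefetch_count = 10
    return QueueConsumer


RpcQueueConsumer = define_override()
EventQueueConsumer = define_override()

# Case 1: one override per module -> two keys, so two consumers.
per_module = {}
RpcQueueConsumer().bind(per_module)
EventQueueConsumer().bind(per_module)

# Case 2: a single override used everywhere -> one key, one consumer.
single_override = {}
first = RpcQueueConsumer().bind(single_override)
second = RpcQueueConsumer().bind(single_override)


# Case 3: separate overrides that report a common sharing_key.
class KeyedRpcQueueConsumer(BaseQueueConsumer):
    prefetch_count = 10

    @property
    def sharing_key(self):
        return BaseQueueConsumer


class KeyedEventQueueConsumer(BaseQueueConsumer):
    prefetch_count = 10

    @property
    def sharing_key(self):
        return BaseQueueConsumer


common_key = {}
KeyedRpcQueueConsumer().bind(common_key)
KeyedEventQueueConsumer().bind(common_key)

print(len(per_module), len(single_override), len(common_key))
```

In the third case whichever instance binds first is the one that gets reused, so this approach is probably only sensible when every override carries the same settings. Making the override once in a shared module avoids that question.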
I hope that makes sense.
Matt.
(In reply to Conor Seabrook's post of Tuesday, April 11, 2017 at 9:58:13 PM UTC+1.)

If anyone else is interested, this is what I ended up with. Seems to be
working as expected.

# nameko_entrypoints.py
import socket
from nameko.events import EventHandler as NamekoEventHandler
from nameko.messaging import QueueConsumer as NamekoQueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcProxy as NamekoRpcProxy
from nameko.rpc import ReplyListener as NamekoReplyListener
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

class QueueConsumer(NamekoQueueConsumer):
    prefetch_count = 10

class RpcConsumer(NamekoRpcConsumer):
    queue_consumer = QueueConsumer()

class Rpc(NamekoRpc):
    rpc_consumer = RpcConsumer()

rpc = Rpc.decorator

class EventHandler(NamekoEventHandler):
    queue_consumer = QueueConsumer()