Nameko ships with built-in support for some messaging paradigms, but not
all. Out of the box you get:
* Bare "messaging", which just allows you to publish messages to exchanges
  or consume them from queues
* RPC, which is implemented with messaging but is really intended to be used
  via the API
* Events (aka pub-sub), where messages fan out from one publisher to
  multiple subscribers
The use-case you've described (take a message off a queue, process it, put
the result on another queue) is somewhere between the first two.
Rather than reverse engineering RPC, I would recommend building your own
entrypoint on top of `@consume` (the `Consumer` class). That would allow you
to control the queue names, routing keys and so on.
Something like the following ought to do it:
import sys

import kombu.serialization
from kombu import Connection
from kombu.pools import producers
from nameko.constants import (
    AMQP_URI_CONFIG_KEY, DEFAULT_SERIALIZER, SERIALIZER_CONFIG_KEY)
from nameko.exceptions import UnserializableValueError, serialize
from nameko.messaging import Consumer


class TaskConsumer(Consumer):

    # Consumer.handle_message binds `message`; the container then supplies
    # worker_ctx, result and exc_info when the worker finishes.
    def handle_result(self, message, worker_ctx, result=None, exc_info=None):
        """ Very similar to nameko.rpc.RpcConsumer.handle_result
        """
        responder = TaskResponder(self.container.config, message)
        result, exc_info = responder.send_response(result, exc_info)

        self.queue_consumer.ack_message(message)
        return result, exc_info


class TaskResponder(object):
    """ Very similar to nameko.rpc.Responder
    """

    def __init__(self, config, message):
        self.config = config
        self.message = message

    def send_response(self, result, exc_info, **kwargs):
        error = None
        if exc_info is not None:
            error = serialize(exc_info[1])

        # disaster avoidance serialization check: `result` must be
        # serializable, otherwise the container will commit suicide assuming
        # unrecoverable errors (and the message will be requeued for another
        # victim)
        serializer = self.config.get(
            SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER)

        try:
            kombu.serialization.dumps(result, serializer)
        except Exception:
            exc_info = sys.exc_info()
            # `error` below is guaranteed to serialize to json
            error = serialize(UnserializableValueError(result))
            result = None

        conn = Connection(self.config[AMQP_URI_CONFIG_KEY])
        exchange = ...  # determine "task" exchange, or use the default exchange

        with producers[conn].acquire(block=True) as producer:
            routing_key = self.message.properties['reply_to']
            # you won't need a correlation id if the reply queues are unique
            correlation_id = self.message.properties.get('correlation_id')

            msg = {  # or however you want your payload to be formatted
                'result': result, 'error': error
            }

            producer.publish(
                msg,
                exchange=exchange, routing_key=routing_key,
                serializer=serializer,
                correlation_id=correlation_id, **kwargs)

        return result, exc_info
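The serialization guard in `send_response` can be tried without a broker. The sketch below is a stdlib-only stand-in, not nameko's code: `build_reply` and `describe_error` are hypothetical names, `json` stands in for whichever kombu serializer is configured, and the error dict is only loosely modelled on what `nameko.exceptions.serialize` returns.

```python
import json


def describe_error(exc):
    # Hypothetical stand-in for nameko.exceptions.serialize: reduce an
    # exception to plain strings so it always serializes to JSON.
    return {"exc_type": type(exc).__name__, "value": str(exc)}


def build_reply(result, exc=None):
    """Return a JSON-safe {'result': ..., 'error': ...} payload."""
    error = describe_error(exc) if exc is not None else None
    try:
        json.dumps(result)  # probe only; the output is discarded
    except (TypeError, ValueError) as unserializable:
        # Swap the result for an error rather than letting the later
        # publish call fail.
        error = describe_error(unserializable)
        result = None
    return {"result": result, "error": error}


ok = build_reply({"rows": 3})
bad = build_reply({1, 2, 3})  # sets are not JSON-serializable
```

Probing the result before publishing means the caller still receives a reply, carrying an error, instead of the worker failing while it tries to send one.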
On Thursday, September 8, 2016 at 11:15:28 PM UTC+1, sha...@gmail.com wrote:
I feel like I follow everything that you're saying. It's unfortunate
that nameko doesn't do what I want out of the box, especially since my
scenario doesn't seem too far-fetched as far as microservice architectures
go. One thing that I can't figure out, though, is how to successfully send
my nameko service a message from the RabbitMQ console. I feel like if I can
figure that out, then I'm good to go.
class DBConnect(object):
    name = "sql_test"

    @rpc
    def insert(self, contents):
        pass
Let's say that I have the above nameko service running. I see the
nameko-rpc Exchange, which is what my service is listening to. I also see
this:
To              Routing key
rpc-sql_test    sql_test.*
So then I go to Queue rpc-sql_test. But here's the problem: no message
that I send from here ever hits my above insert() function. I've tried
renaming things, binding things, etc., but no matter what I try from
RabbitMQ, I can't get it to work.
I feel like if I can just get the message to the function, then it'll all
be smooth sailing.
Thoughts?
-S
On Thursday, September 8, 2016 at 2:22:19 PM UTC-7, David Szotten wrote:
I think there might be a slight misunderstanding, yes.
Nameko provides a few ways of interacting with RabbitMQ. There is the
`@consume` decorator, which just consumes from a queue. That's all it
does, and if you want further semantics on top (like rpc replies), that's
up to you. Nameko also has an rpc implementation built on top of the
`consumer`, which implements rpc semantics with replies. However, this
comes with its own rpc protocol, which includes headers, routing keys and
queue names (what you refer to as "convention-over-configuration", though
I'm not sure I agree with that characterisation; it's more of a custom rpc
protocol). As I mentioned, we haven't formalised a spec for the rpc
implementation, but there is an overview of some of the internals in the
GitHub issue I linked to above. In particular, you might be missing e.g.
routing keys, which the nameko rpc protocol uses to choose the service
method.
For clients of nameko `@rpc`, nameko provides libraries (the various rpc
proxies) which implement the client side of the protocol, including
specifying routing keys and managing reply queues.
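To illustrate the routing-key point: the `rpc-sql_test` queue is bound with the pattern `sql_test.*`, and on an AMQP topic exchange `*` matches exactly one dot-separated word while `#` matches zero or more. The helper below is a minimal stdlib sketch of that matching rule; `topic_matches` is a hypothetical name, not a nameko or kombu function. That the word after the dot selects the method (so `sql_test.insert` for the `insert` method) is an assumption based on the overview in the GitHub issue, not a formal spec.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP topic binding pattern matches a routing key."""

    def match(words, key_words):
        if not words:
            return not key_words
        head, rest = words[0], words[1:]
        if head == "#":
            # '#' may absorb zero or more words of the routing key
            return any(
                match(rest, key_words[i:])
                for i in range(len(key_words) + 1)
            )
        if not key_words:
            return False
        # '*' stands for exactly one word; anything else must match literally
        return head in ("*", key_words[0]) and match(rest, key_words[1:])

    return match(pattern.split("."), routing_key.split("."))


method_call = topic_matches("sql_test.*", "sql_test.insert")
queue_name_as_key = topic_matches("sql_test.*", "rpc-sql_test")
```

Here `method_call` is true and `queue_name_as_key` is false, so a message whose routing key is just the queue name does not correspond to any `service.method` key under this binding.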
To summarise, I think you have two options: `@consume` (or a modified
version if you want access to the headers, though you might want the
Entrypoint to handle some of that for you), or, if you want to use `@rpc`,
I'd recommend the nameko clients, or you can manage things yourself as long
as you conform to the protocol.
Best,
David
On Thursday, 8 September 2016 22:06:04 UTC+1, sha...@gmail.com wrote:
Maybe I'm just completely misunderstanding what nameko is doing. I have
some external app that submits messages to a RabbitMQ server. I want
nameko sitting "behind" the MQ, observing all the queues, and then
responding properly to each request. It seems like a pretty standard
microservices architecture (and I use that term loosely for what I'm
describing). There are different queues where different messages come in
that request different data (else it can be one queue with different
routing keys). @consume allows me to tell a specific function to run when
a message hits a specific queue. @rpc doesn't seem to be able to do that.
And my problem is that @rpc isn't responding to ANYTHING from the RabbitMQ
server, though it works fine when hitting it from the nameko shell. Is
there some convention-over-configuration that I need to do in order to get
my @rpc function to be called when a message comes in? When I run my
nameko service, it successfully connects to the RabbitMQ server:
starting services: sql_test
Connected to amqp://***:**@somwhere.compute.amazonaws.com:xxxx/yyyy
So I know that part is working. I just can't for the life of me get my
service to respond to a message.
Thoughts?
-S
On Thursday, September 8, 2016 at 12:42:47 PM UTC-7, David Szotten wrote:
Just to check: are you intentionally not using the provided nameko
rpc client(s)? (From the interactive shell, via a nameko service rpc proxy,
or the standalone rpc proxy for non-nameko python code; these are all in
the documentation.)
If so, and you still want to use the `rpc` decorator, you need to
conform to the nameko rpc protocol. There is no official spec yet, though
there are some details in one of my recent answers on github:
"Missing detailed docs on RPC implementation (integration with other
frameworks)", issue #354 in nameko/nameko on GitHub.
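As a rough picture of what conforming to the protocol involves, the sketch below assembles the parts of a request by hand using only the standard library. The layout is an assumption drawn from the nameko 2.x client code rather than an official spec: a `service.method` routing key, a JSON body with `args` and `kwargs`, and `reply_to` plus `correlation_id` properties. `build_rpc_request` is a hypothetical helper, and the random `reply_to` value is a placeholder for the routing key of a reply queue the caller would have to declare and bind itself.

```python
import json
import uuid


def build_rpc_request(service, method, *args, **kwargs):
    """Assemble the pieces of a nameko-style RPC request (assumed layout)."""
    return {
        "exchange": "nameko-rpc",  # exchange name as seen in this thread
        "routing_key": "{}.{}".format(service, method),
        "body": json.dumps({"args": list(args), "kwargs": kwargs}),
        "properties": {
            "content_type": "application/json",
            # placeholder: must match a reply queue binding owned by the caller
            "reply_to": str(uuid.uuid4()),
            "correlation_id": str(uuid.uuid4()),
        },
    }


request = build_rpc_request("sql_test", "insert", "hello")
```

A message pasted into the RabbitMQ console without these parts is unlikely to reach the decorated method, which is why the provided clients are the easier route.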
D
On Thursday, 8 September 2016 20:26:22 UTC+1, sha...@gmail.com wrote:
So if I have this:
class DBConnect(object):
    name = "sql_test"

    @rpc
    def insert(self, contents):
        logging.debug("contents: " + contents)
How does RabbitMQ know to call the insert function? I'm sending test
messages from the RabbitMQ console, using the rpc-sql_test queue in the
nameko-rpc exchange, but that function is not being called. What else do I
need to do in order to call it?
Thanks,
S
On Thursday, September 8, 2016 at 8:09:53 AM UTC-7, Matt Yule-Bennett wrote:
No, @consume just consumes messages from a queue. It is vanilla AMQP
message consumption, and there is no concept of a reply.
If you want to send a response back to the caller, use RPC.
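The difference can be shown with a toy in-memory model that involves neither RabbitMQ nor nameko. Everything here is hypothetical: `queues` plays the broker, `plain_consume` mimics bare consumption, and `consume_and_reply` mimics the extra step an RPC-style entrypoint performs with `reply_to`.

```python
from collections import defaultdict, deque

queues = defaultdict(deque)  # toy "broker": queue name -> pending messages


def plain_consume(handler, queue_name):
    # Plain consumption: run the handler and drop its return value.
    message = queues[queue_name].popleft()
    handler(message["body"])


def consume_and_reply(handler, queue_name):
    # RPC-style consumption: publish the return value to the queue
    # named in the message's reply_to property.
    message = queues[queue_name].popleft()
    result = handler(message["body"])
    queues[message["reply_to"]].append({"body": result})


queues["tasks"].append({"body": "abc", "reply_to": "replies"})
plain_consume(str.upper, "tasks")
nothing_sent = len(queues["replies"])  # reply_to was never looked at

queues["tasks"].append({"body": "abc", "reply_to": "replies"})
consume_and_reply(str.upper, "tasks")
reply = queues["replies"].popleft()["body"]
```

With bare consumption the `reply_to` value is just another property that nothing reads, so any response has to be published explicitly by the service or by a custom entrypoint.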
On Thursday, September 8, 2016 at 3:58:34 PM UTC+1, sha...@gmail.com wrote:
Ah, that's pretty much what I was after, though in my example I was
using @rpc to test things out. On my server, I was using @consume. Does
@consume still utilize the reply_to header to return the data?
Thanks so much,
-S
On Thursday, September 8, 2016 at 1:26:19 AM UTC-7, David Szotten wrote:
Hi Shawn,
Perhaps you can give a bit more detail about what you are trying to
achieve. For example, you shouldn't need to worry about `reply_to` when
using the `@rpc` decorator. Whatever the `@rpc` decorated method returns is
sent back to the requesting service. It might be helpful to have a look at
the examples in the docs (Built-in Extensions, nameko 2.12.0
documentation).
Separately, you might want to look at nameko-sqlalchemy
(nameko/nameko-sqlalchemy on GitHub, a SQLAlchemy dependency for nameko
services) for connecting to the db.
Best,
David
On Wednesday, 7 September 2016 23:43:27 UTC+1, sha...@gmail.com wrote:
I am using the standard @consume to get a message from my RabbitMQ
server, but I don't know how to get the reply_to header/property out of the
message. Does anyone know how to do this?
ew_sendmail_queue_insert = Queue(
    'namekotest', exchange=ew_exchange, routing_key='insert_status')


class DBConnect(object):
    name = "sql_test"

    insert = Publisher(queue=ew_sendmail_queue_insert)
    get_statuses_for_user = Publisher(queue=ew_sendmail_queue_list)

    # @consume(ew_sendmail_queue_insert)
    @rpc
    def insert(self, contents):
        decoder = HeaderDecoder()
        # res = decoder.unpack_message_headers(None)
        # print("headers maybe? " + res)
        try:
            conn = psycopg2.connect(
                database="status", user="yyyyy", host="db.server.com",
                port="5432", password="xxxxxxxx")
            conn.autocommit = True
        except psycopg2.Error as e:
            print("something went horribly wrong. could not connect")
            print("error: " + e.pgerror)
        ....more code here....
Thanks for any help you can give me.
-S