calling a nameko service from a Java client


#21

Either my client or my server does not seem to work.

Here is my server, a simple service doing a find against MongoDB:

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
My Java client calls with routing key: mongo.findStudent
Arguments: "{'args': ['lbois']}"

And the returned response is an echo of the call:

{"content":"Returned message from service: {'args': ['lbois']} !"}

I expected the database content, not an echo of my call.

I think something is not working.

Laurent

On Thursday, March 15, 2018 at 10:56:58 UTC+1, Matt Yule-Bennett wrote:

Congratulations, your client is working.

On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

I'm experimenting with the payload.
I already get a consumer ack.

My service is now:
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java client:

channel.basicPublish(EXCHANGE_NAME, "crawler.scrapit", props,
        "{'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes("UTF-8"));

Now I get a returned message like this:

"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is this normal, or is it just an echo?

This is what you've asked your service to return. The RPC entrypoint will
serialize (if it can) whatever is returned from the decorated method and
return that to the client.
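
To illustrate the serialization step with plain JSON: a Python tuple such as the one `scrapit` returns is encoded as a JSON array. The sketch below uses only the standard `json` module and a copy of the method without the nameko decorator. The `result`/`error` envelope is an assumption about the shape of the reply body, so check what your client actually receives.

```python
import json


def scrapit(a, b, c="C", d="D"):
    # Same body as the service method, without the @rpc decorator.
    return (a, b, c, d)


result = scrapit("hello", "world", c="foo", d="bar")

# JSON has no tuple type, so the tuple is encoded as an array.
encoded = json.dumps(result)

# Assumed reply envelope, for illustration only.
reply_body = json.dumps({"result": result, "error": None})
```

A Java client would therefore expect to parse an array of four strings out of the reply, not the request text it sent.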

Laurent

On Wednesday, March 14, 2018 at 17:23:55 UTC+1, Matt Yule-Bennett wrote:

The *routing_key* needs to be "<service-name>.<method-name>".
The *payload* needs to be a serialized dict in the form: {"args":
<positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

*routing_key*: service.method
*payload*: {"args": ["hello", "world"], "kwargs": {"c": "foo", "d": "bar"}}

The service would print:

hello world foo bar
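
A minimal sketch of building such a payload, using only Python's standard `json` module. The helper names `build_payload` and `is_valid_json` are made up for this example. Note that "args" is a list, "kwargs" is a separate dict next to it (not an element inside the list), and JSON requires double quotes.

```python
import json


def build_payload(*args, **kwargs):
    """Serialize call arguments into the {"args": [...], "kwargs": {...}} form."""
    return json.dumps({"args": list(args), "kwargs": kwargs})


def is_valid_json(text):
    """Return True if text parses as JSON."""
    try:
        json.loads(text)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return False
    return True


routing_key = "service.method"
payload = build_payload("hello", "world", c="foo", d="bar")

# A Python-style literal with single quotes is not valid JSON.
single_quoted_ok = is_valid_json("{'args': ['hello', 'world']}")
```

From Java, the same body can be produced with any JSON library, or written by hand as a double-quoted string with escaped quotes.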

On Tuesday, March 13, 2018 at 3:30:46 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Hi folks

How do I translate a call to <service-name>.<method-name> into a JSON
payload using the {args: [] , kwargs: []} notation?

Thanks

Laurent

On Tuesday, March 13, 2018 at 16:30:23 UTC+1, lauren...@gmail.com wrote:

Thanks for this great explanation.

I think the problem may come from the serialization of the payload in
Java. In my code I just convert a string (service.method) to bytes.

In my last attempts I sent the payload to the exchange, with a reply queue
(but not named as you describe).

I'm going to try what you wrote and report back.

Thanks

On Monday, March 12, 2018 at 10:41:31 UTC+1, Matt Yule-Bennett wrote:

I took a closer look at this and found more problems. I don't have a
complete copy of your client so let me just explain in English how the RPC
flow works.

* The listening service will have declared the "nameko-rpc" exchange,
and a queue called "rpc-<service-name>", bound to it with the routing key
"<service-name>.*" (it's a topic exchange)
* You must declare an RPC reply queue. The queue name can be anything
but the nameko Python client names them "rpc.reply-<service-name>-<uuid>".
It must be bound to the "nameko-rpc" exchange with a unique routing key; the
Python client uses a uuid.
* To make a call, you publish a message to the "nameko-rpc" exchange,
with "<service-name>.<method-name>" as the routing key. The payload must be
a serialised dict containing the keys "args" and "kwargs", with the values
being the parameters for the method call. You must specify a "reply_to"
header with the routing key you've used to bind the reply queue to the RPC
exchange. You should also specify a "correlation_id" header so your client
can issue multiple concurrent requests and match the replies back up again.
You also need to specify content-type and encoding headers so the service
knows how to deserialize the incoming data. By default the listening
service will be expecting JSON.
* The request message will be routed to the listening service's
queue, consumed, processed and ack'd. The service will then reply by
publishing to the RPC exchange using the "reply_to" as the routing key and
including the "correlation_id" header from the request message.
* The response message will be routed to the reply queue. Your client
should consume it and use the correlation id to match it up with a pending
request.
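
The request described in the steps above can be sketched as plain data, with no broker connection. This is an illustration only: `build_request` and the dict layout (`exchange`, `properties`, `body`) are hypothetical, and the content type `application/json` with `utf-8` encoding is an assumption consistent with the JSON default. A real client would hand these values to its AMQP library.

```python
import json
import uuid

RPC_EXCHANGE = "nameko-rpc"


def build_request(service, method, reply_routing_key, args=(), kwargs=None):
    """Describe one RPC request message as a plain dict."""
    body = json.dumps({"args": list(args), "kwargs": kwargs or {}})
    return {
        "exchange": RPC_EXCHANGE,
        "routing_key": "{}.{}".format(service, method),
        "properties": {
            # Must equal the routing key used to bind the reply queue.
            "reply_to": reply_routing_key,
            # Lets the client match a reply to its pending request.
            "correlation_id": str(uuid.uuid4()),
            "content_type": "application/json",
            "content_encoding": "utf-8",
        },
        "body": body.encode("utf-8"),
    }


def is_reply_for(request, reply_properties):
    """True if a reply's correlation id matches the pending request."""
    expected = request["properties"]["correlation_id"]
    return reply_properties.get("correlation_id") == expected


reply_routing_key = str(uuid.uuid4())
request = build_request(
    "crawler", "scrapit", reply_routing_key,
    args=["hello", "world"], kwargs={"c": "foo"},
)
```

The point of the sketch is the relationship between fields: the reply queue binding and `reply_to` share one unique value, while the request routing key names the service and method.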

Looking at your first post, the client is not publishing the request
message correctly. It's sending it directly to the service queue via the
default exchange, without the correct routing key. Your service is probably
consuming the message and replying with a MethodNotFound message, which was
being lost because your reply queue wasn't bound correctly either. Your
payload also isn't correct in the first version. It should be (assuming
JSON serialization) '{"args": [], "kwargs": {}}'

On Monday, March 5, 2018 at 8:27:32 AM UTC, lauren...@gmail.com wrote:

I changed my client code.

First I define the exchange name:

private static final String EXCHANGE_NAME = "nameko-rpc";

Then, after further attempts, I declare the exchange:

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

Finally, I publish my message directly to the exchange rather than to requestQueueName. I found this code in the RabbitMQ Publish/Subscribe tutorial, in the EmitLog client section: https://www.rabbitmq.com/tutorials/tutorial-three-java.html

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

The message is a string naming the nameko service and method, like this: crawler.scrapit()

I see the message in the RabbitMQ management console, but no longer the ack.

I have logging in my nameko service, but nothing prints in the terminal, as if the service is not hit.

On Sunday, March 4, 2018 at 22:54:06 UTC+1, Matt Yule-Bennett wrote:

On Sunday, March 4, 2018 at 8:30:23 PM UTC, lauren...@gmail.com wrote:

Thanks.

In my client, after

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc", "reply_to");

Have you bound your reply queue with the *literal value*
"reply_to"? That won't work. It needs to be the value that you've specified
in the "reply_to" header in your request message. In your case that's
replyQueueName.

But I get the same result: the client is still blocking.

<snip>

So the queue would be 'rpc-crawler' right?

That is correct, yes. If it wasn't correct you would not see the
request message being ack'd by your service.

Laurent

On Sunday, March 4, 2018 at 20:57:35 UTC+1, Matt Yule-Bennett wrote:

The RPC reply queue must be bound to the `nameko-rpc` exchange.
The reply message is published to that exchange with the 'reply_to' as the
routing key; if you haven't bound the reply queue to the correct exchange,
the reply message will be discarded by the broker as not matching any
routes.

Correlation ID has nothing to do with routing messages. It's only
used to match replies up with requests so a client can have multiple
requests in flight at the same time.

On Sunday, March 4, 2018 at 6:37:01 PM UTC, lauren...@gmail.com wrote:

The problem occurs before the correlationId check.

In the Java source code, it is handleDelivery that blocks.

On Sunday, March 4, 2018 at 19:25:01 UTC+1, lauren...@gmail.com wrote:

Yes, but in my Java client I manage the reply queue and
I use a correlationId.
I started from the Java code in the CorrelationId section of the RabbitMQ RPC tutorial:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

In my RabbitMQ management page I now see the message sent from the
client, the publisher confirm, and the ack.

As you say, maybe the problem is with the reply queue and there is an
error in my client.
Laurent

On Sunday, March 4, 2018 at 12:48:11 UTC+1, Matt Yule-Bennett wrote:

An ack for the request message means that it was received and
processed without any error, including sending the reply.

The reply message is almost certainly being published but not
routed to the appropriate reply queue. Looking at your code... is your
reply queue bound to the RPC exchange? If not, that's your problem.

On Sunday, March 4, 2018 at 11:43:49 AM UTC, lauren...@gmail.com wrote:

I only see an ack on the RabbitMQ management page, and I see that my
message was sent.
I do not see an answer in the reply queue.
I think the entrypoint is hit because there is an ack, but I
do not understand why the reply message is not sent.


#22

Can you post your complete client?


#23

Here it is.

The class:

package com.mycompany.app;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {

    private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

    public String call(String routingKey, String args)
            throws java.io.IOException, InterruptedException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
            channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

            String corrId = UUID.randomUUID().toString();

            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
            //System.out.println(" [x] Sent '" + message + "'");

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            return response.take();

    }
    //...
}

I then call the "call" method with a routing key and args:

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')


#24

The problem is that you have bound your *reply queue* to the rpc exchange
with *the same routing key as the service*. Your reply consumer is
consuming the request message instead of the service.

The reply queue needs to be bound to the rpc exchange with *a unique
identifier* (and that same identifier passed as the reply_to when
publishing the request message). Change the line:

channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

to

channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);

and it'll probably work.
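
To see why the binding matters, here is a small in-memory model of topic-exchange routing. It is a simplified sketch, not broker code: `topic_matches` and `route` are hypothetical helpers, and the queue name `amq.gen-example` stands in for whatever name the broker generates. The only rules modelled are the standard topic wildcards, where `*` matches exactly one word and `#` matches zero or more.

```python
def topic_matches(binding_key, routing_key):
    """Topic match: '*' is exactly one word, '#' is zero or more words."""
    def match(pattern, words):
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb any number of words, including none.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        return (head == "*" or head == words[0]) and match(rest, words[1:])

    return match(binding_key.split("."), routing_key.split("."))


def route(bindings, routing_key):
    """Return the sorted names of all queues whose binding key matches."""
    return sorted(q for q, key in bindings.items() if topic_matches(key, routing_key))


reply_queue = "amq.gen-example"  # hypothetical server-generated queue name

# Posted client: reply queue bound with the service's routing key.
broken = {"rpc-mongo": "mongo.*", reply_queue: "mongo.findStudent"}
# Suggested fix: reply queue bound with its own unique name.
fixed = {"rpc-mongo": "mongo.*", reply_queue: reply_queue}

request_broken = route(broken, "mongo.findStudent")
reply_broken = route(broken, reply_queue)
request_fixed = route(fixed, "mongo.findStudent")
reply_fixed = route(fixed, reply_queue)
```

In this model the broken binding delivers a copy of the request to the reply queue, which is the "echo" the client saw, while the real reply matches no binding at all. With the fix, the request reaches only `rpc-mongo` and the reply reaches only the reply queue.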

···

On Thursday, March 15, 2018 at 5:10:29 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Here it is

The class :

package com.mycompany.app;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {

    private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

    public String call(String routingKey, String args)
            throws java.io.IOException, InterruptedException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
            channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

            String corrId = UUID.randomUUID().toString();

                    AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
            //System.out.println(" [x] Sent '" + message + "'");

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            return response.take();

    }
    //...
}

I then call the "call" method with a routing key and args:

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
On Thursday, March 15, 2018 at 14:02:25 UTC+1, Laurent Bois M/45/174/90/CF-L1
wrote:

Either my client or my server does not seem to work.

Here is my server, a simple service doing a find against MongoDB:

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
My Java client calls with the routing key: mongo.findStudent
Arguments: "{'args': ['lbois']}"

And the returned response is an echo of the call:

{"content":"Returned message from service: {'args': ['lbois']} !"}

I'm expecting the database content, not an echo of my call.

I don't think this is working.

Laurent

On Thursday, March 15, 2018 at 10:56:58 UTC+1, Matt Yule-Bennett wrote:

Congratulations, your client is working.

On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

I'm experimenting with the payload.
I already get a consumer ack.

My service is now:
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java client:

channel.basicPublish(EXCHANGE_NAME, "crawler.scrapit", props,
    "{'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes("UTF-8"));

Now I get a returned message like this:

"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is this normal, or is it just an echo?

This is what you've asked your service to return. The RPC entrypoint
will serialize (if it can) whatever is returned from the decorated method
and return that to the client.

Laurent

On Wednesday, March 14, 2018 at 17:23:55 UTC+1, Matt Yule-Bennett wrote:

The *routing_key* needs to be "<service-name>.<method-name>".
The *payload* needs to be a serialized dict in the form: {"args":
<positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

*routing_key*: service.method
*payload*: {"args": ["hello", "world"], "kwargs": {"c": "foo", "d": "bar"}}

The service would print:

hello world foo bar
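
For illustration only, the payload shape described above can be sketched with Python's standard json module. The helper names build_payload and dispatch are made up for this sketch and are not nameko APIs; dispatch only imitates how the positional and keyword arguments end up in the method call.

```python
import json


def build_payload(*args, **kwargs):
    """Serialize a call as the {"args": [...], "kwargs": {...}} dict described above."""
    return json.dumps({"args": list(args), "kwargs": kwargs})


def method(a, b, c="C", d="D"):
    """Stand-in for the service method in the example (returns instead of printing)."""
    return "{} {} {} {}".format(a, b, c, d)


def dispatch(func, body):
    """Hypothetical helper: decode the payload and call func with its contents."""
    message = json.loads(body)
    return func(*message["args"], **message["kwargs"])


payload = build_payload("hello", "world", c="foo", d="bar")
print(payload)
print(dispatch(method, payload))
```

Note that JSON requires double quotes around keys and strings, so a hand-written payload in a Java string literal needs escaped quotes (\"args\") rather than single quotes.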

On Tuesday, March 13, 2018 at 3:30:46 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Hi folks

How do you express a call to service-name.method_name as a JSON payload
with the {args: [], kwargs: []} notation?

Thanks

Laurent

On Tuesday, March 13, 2018 at 16:30:23 UTC+1, lauren...@gmail.com wrote:

Thanks for this great explanation.

I think the problem may come from how the payload is serialized
in Java. In my code I just convert a string (service.method) to bytes.

In my last attempts I sent the payload to the exchange, with a reply
queue (but not named as you describe).

I'm going to try what you wrote and report back.

Thanks

On Monday, March 12, 2018 at 10:41:31 UTC+1, Matt Yule-Bennett wrote:

I took a closer look at this and found more problems. I don't have
a complete copy of your client so let me just explain in English how the
RPC flow works.

* The listening service will have declared the "nameko-rpc"
exchange, and a queue called "rpc-<service-name>", bound to it with the
routing key "<service-name>.*" (it's a topic exchange)
* You must declare an RPC reply queue. The queue name can be
anything but the nameko Python client names them
"rpc.reply-<service-name>-<uuid>". It must be bound to the "nameko-rpc"
exchange with a unique name; the Python client uses a uuid.
* To make a call, you publish a message to the "nameko-rpc"
exchange, with "<service-name>.<method-name>" as the routing key. The
payload must be a serialised dict containing the keys "args" and "kwargs",
with the values being the parameters for the method call. You must specify
a "reply_to" header with the routing key you've used to bind the reply
queue to the RPC exchange. You should also specify a "correlation_id"
header so your client can issue multiple concurrent requests and match the
replies back up again. You also need to specify content-type and encoding
headers so the service knows how to deserialize the incoming data. By
default the listening service will be expecting JSON.
* The request message will be routed to the listening service's
queue, consumed, processed and ack'd. The service will then reply by
publishing to the RPC exchange using the "reply_to" as the routing key and
including the "correlation_id" header from the request message.
* The response message will be routed to the reply queue. Your
client should consume it and use the correlation id to match it up with a
pending request.
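
The routing steps above can be modelled without a broker. The sketch below is a toy in-memory topic exchange, not RabbitMQ or nameko code; TopicExchange, topic_matches and the queue name "reply-queue-1" are invented for illustration. It shows why the reply queue has to be bound with the same key that is sent as reply_to, and what happens when it is bound with the service's routing key instead.

```python
def topic_matches(pattern, key):
    """AMQP-style topic match: '*' is exactly one word, '#' is zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' may match nothing, or swallow one word and try again
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), key.split("."))


class TopicExchange:
    """Toy exchange: remembers bindings and copies messages to matching queues."""

    def __init__(self):
        self.bindings = []  # list of (pattern, queue_name)
        self.queues = {}    # queue_name -> list of delivered messages

    def bind(self, queue_name, pattern):
        self.queues.setdefault(queue_name, [])
        self.bindings.append((pattern, queue_name))

    def publish(self, routing_key, message):
        """Deliver to every matching queue and return the names of the queues hit."""
        hit = [q for pattern, q in self.bindings if topic_matches(pattern, routing_key)]
        for q in hit:
            self.queues[q].append(message)
        return hit


# Correct setup: the reply queue is bound with its own unique key.
exchange = TopicExchange()
exchange.bind("rpc-crawler", "crawler.*")        # what the service declares
exchange.bind("reply-queue-1", "reply-queue-1")  # what the client declares
print(exchange.publish("crawler.scrapit", {"reply_to": "reply-queue-1"}))  # ['rpc-crawler']
print(exchange.publish("reply-queue-1", {"result": "ok"}))                 # ['reply-queue-1']

# Mistaken setup: the reply queue is bound with the service's routing key.
broken = TopicExchange()
broken.bind("rpc-crawler", "crawler.*")
broken.bind("reply-queue-1", "crawler.scrapit")
print(broken.publish("crawler.scrapit", {"reply_to": "reply-queue-1"}))    # request also lands in the reply queue
print(broken.publish("reply-queue-1", {"result": "ok"}))                   # [] : the reply matches no binding
```

In the mistaken setup the client's reply queue receives a copy of its own request (which looks like an echo), and the real reply is dropped because nothing is bound with the reply_to value.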

Looking at your first post, the client is not publishing the
request message correctly. It's sending it directly to the service queue
via the default exchange, without the correct routing key. Your service is
probably consuming the message and replying with a MethodNotFound message,
which was being lost because your reply queue wasn't bound correctly
either. Your payload also isn't correct in the first version. It should be
(assuming JSON serialization) '{"args": [], "kwargs": {}}'

On Monday, March 5, 2018 at 8:27:32 AM UTC, lauren...@gmail.com wrote:

I changed my client code.

First I define the exchange name:

private static final String EXCHANGE_NAME = "nameko-rpc";

Then, after further attempts, I declare the exchange:

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

Finally I publish my message directly to the exchange rather than to requestQueueName. I found the code in the RabbitMQ Publish/Subscribe tutorial, EmitLog client section: https://www.rabbitmq.com/tutorials/tutorial-three-java.html

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

The message is a string naming the nameko service.method_name, like this: crawler.scrapit()

I see the message in the RabbitMQ management console, but no longer the ack.

I have logging in my nameko service but nothing prints in the terminal, as if the service is not hit.

On Sunday, March 4, 2018 at 22:54:06 UTC+1, Matt Yule-Bennett wrote:

On Sunday, March 4, 2018 at 8:30:23 PM UTC, lauren...@gmail.com wrote:

Thanks.

In my client, after

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc", "reply_to");

Have you bound your reply queue with the *literal value*
"reply_to"? That won't work. It needs to be the value that you've specified
in the "reply_to" header in your request message. In your case that's
replyQueueName.

But the results are the same: the client is still blocking.

<snip>

So the queue would be 'rpc-crawler' right?

That is correct, yes. If it wasn't correct you would not see the
request message being ack'd by your service.

Laurent

On Sunday, March 4, 2018 at 20:57:35 UTC+1, Matt Yule-Bennett
wrote:

The RPC reply queue must be bound to the `nameko-rpc` exchange.
The reply message is published to that exchange with the 'reply_to' as the
routing key; if you haven't bound the reply queue to the correct exchange,
the reply message will be discarded by the broker as not matching any
routes.

Correlation ID has nothing to do with routing messages. It's
only used to match replies up with requests so a client can have multiple
requests in flight at the same time.
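
A minimal sketch of that matching step, assuming the client keeps a table of pending requests keyed by correlation id. PendingReplies and its methods are hypothetical names; a real client would call on_reply from its consumer callback.

```python
import uuid


class PendingReplies:
    """Track in-flight requests so replies can arrive in any order."""

    def __init__(self):
        self._pending = {}  # correlation_id -> label of the request
        self.results = {}   # label -> reply body

    def register(self, label):
        """Create a fresh correlation id for a request about to be published."""
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = label
        return correlation_id

    def on_reply(self, correlation_id, body):
        """Store the reply under its request; ignore ids we never issued."""
        label = self._pending.pop(correlation_id, None)
        if label is None:
            return False
        self.results[label] = body
        return True


pending = PendingReplies()
first = pending.register("find lbois")
second = pending.register("find alice")

# Replies come back out of order, plus one stray message with an unknown id.
pending.on_reply(second, "reply for alice")
pending.on_reply("not-ours", "stray message")
pending.on_reply(first, "reply for lbois")
print(pending.results)
```

The correlation id plays no part in how the broker routes the message; it only lets the client decide which request a delivered reply belongs to.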

On Sunday, March 4, 2018 at 6:37:01 PM UTC, lauren...@gmail.com wrote:

The problem occurs before the correlationId check.

In the Java source code, it is handleDelivery that blocks.

On Sunday, March 4, 2018 at 19:25:01 UTC+1, lauren...@gmail.com
wrote:

Yes, but in my Java client I manage the reply queue and
I use a correlationId.
I started from the Java code in the RabbitMQ tutorial,
RPC/Java/CorrelationId section:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

In my RabbitMQ management page I now see the message sent from the
client, the publisher confirm, and the ack.

As you say, maybe it's a problem with the reply queue, and
an error in my client.
Laurent

On Sunday, March 4, 2018 at 12:48:11 UTC+1, Matt Yule-Bennett
wrote:

An ack for the request message means that it was received
and processed without any error, including sending the reply.

The reply message is almost certainly being published but
not routed to the appropriate reply queue. Looking at your code... is your
reply queue bound to the RPC exchange? If not, that's your problem.

On Sunday, March 4, 2018 at 11:43:49 AM UTC, lauren...@gmail.com wrote:

I only see an ack on the RabbitMQ management page, and I see
that my message was sent.
I do not see an answer in the reply queue.
I think the entrypoint is hit because there is an ack, but
I do not understand why the reply message is not sent.


#25

Thanks, it works.

I now see this error message coming from my Python server:

{"content":"Returned message from service:
{\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\",
\"value\": \"string indices must be integers\", \"exc_type\":
\"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}
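
As a rough sketch, a client could decode a reply of this shape and turn the "error" entry into an exception. RemoteError and unwrap_reply are hypothetical names, and the shape of a successful reply (a "result" value with "error" set to null) is an assumption inferred from the error reply above.

```python
import json


class RemoteError(Exception):
    """Hypothetical client-side exception wrapping the service's error dict."""

    def __init__(self, exc_type, value):
        super().__init__("{}: {}".format(exc_type, value))
        self.exc_type = exc_type
        self.value = value


def unwrap_reply(body):
    """Return the "result" field, or raise RemoteError if "error" is set."""
    reply = json.loads(body)
    error = reply.get("error")
    if error:
        raise RemoteError(error.get("exc_type"), error.get("value"))
    return reply.get("result")


# The error reply quoted in the message above, without the client's own wrapper text.
error_body = (
    '{"result": null, "error": {"exc_path": "exceptions.TypeError", '
    '"value": "string indices must be integers", "exc_type": "TypeError", '
    '"exc_args": ["string indices must be integers"]}}'
)

try:
    unwrap_reply(error_body)
except RemoteError as exc:
    print("service raised", exc)

# Assumed shape of a successful reply.
print(unwrap_reply('{"result": ["hello", "world"], "error": null}'))
```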


#26

Your service is throwing an error. You should be able to see a traceback in
the service logs.

I guess that "result" on the line below is a string, not a dict as you're
assuming.

return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')

Also note that the RPC entrypoint will attempt to serialize whatever your
service method returns. So you don't need to do this string concatenation
at all.
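
A sketch of that suggestion, with the database lookup replaced by a plain dict so it runs without MongoDB or nameko. student_summary is a hypothetical helper and the sample document is made up; inside the service, findStudent would simply return such a dict and leave serialization to the entrypoint. The None check is an addition, since pymongo's find_one returns None when nothing matches.

```python
import json


def student_summary(document):
    """Build a plain dict from a lookup result instead of concatenating a string."""
    if document is None:
        return None  # no matching student
    return {"username": document["username"], "age": document["age"]}


# Made-up stand-in for what col.find_one({'username': username}) might return.
found = {"_id": "placeholder-id", "username": "lbois", "age": 30}

summary = student_summary(found)
print(json.dumps(summary))                # roughly what a JSON serializer would send
print(json.dumps(student_summary(None)))  # a missing student serializes as null
```

Picking the fields explicitly also leaves out "_id", which in a real MongoDB document is an ObjectId and is not JSON-serializable by default.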

···

On Tuesday, March 20, 2018 at 3:19:02 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Thanks it works

i see the error message {"content":"Returned message from service:
{\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\",
\"value\": \"string indices must be integers\", \"exc_type\":
\"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}
coming from my Py server

Le mardi 20 mars 2018 13:24:47 UTC+1, Matt Yule-Bennett a écrit :

The problem is that you have bound your *reply queue* to the rpc
exchange with *the same routing key as the service* . Your reply
consumer is consuming the request message instead of the service.

The reply queue needs to be bound to the rpc exchange with *a unique
identifier* (and that same identifier passed as the reply_to when
publishing the request message). Change the line:

channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

to

channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);

and it'll probably work.

On Thursday, March 15, 2018 at 5:10:29 PM UTC, Laurent Bois >> M/45/174/90/CF-L1 wrote:

Here it is

The class :

package com.mycompany.app;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {

    private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

    public String call(String routingKey, String args)
            throws java.io.IOException, InterruptedException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
            channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

            String corrId = UUID.randomUUID().toString();

                    AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
            //System.out.println(" [x] Sent '" + message + "'");

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            return response.take();

    }
    //...
}

I call then "call" method with routingKey an args

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
Le jeudi 15 mars 2018 14:02:25 UTC+1, Laurent Bois M/45/174/90/CF-L1 a
écrit :

My client or my server seems not to work

Here is my server , a simple code doing a find against mongodb

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client =
MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' +
str(result['age']) + '"}')
      
my client Java call routing kay : mongo.findStudent
Arguments : "{'args': ['lbois']}"

And the returned response is an echo of the call:

{"content":"Returned message from service: {'args': ['lbois']} !"}

I'm waiting about the database content and not the echo of my call

I think that doesn't work

Laurent

Le jeudi 15 mars 2018 10:56:58 UTC+1, Matt Yule-Bennett a écrit :

Congratulations, your client is working.

On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois >>>>> M/45/174/90/CF-L1 wrote:

Trying to play with payload.
I have a consumer ack already

my service is now
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java Client

channel.basicPublish( EXCHANGE_NAME, "crawler.scrapit", props, "{'args':
['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes(
"UTF-8"));

Now i got a returned message like this

"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is it normal? Or is it just an echo

This is what you've asked your service to return. The RPC entrypoint
will serialize (if it can) whatever is returned from the decorated method
and return that to the client.

Laurent

Le mercredi 14 mars 2018 17:23:55 UTC+1, Matt Yule-Bennett a écrit :

The *routing_key* needs to be "<service-name>.<method-name>".
The *payload* needs to be a serialized dict in the form: {"args":
<positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

*routing_key*: service.method
*payload*: {'args': ['hello', 'world', 'kwargs': {'c':'foo',
'd':'bar'}]}

The service would print:

hello world foo bar

On Tuesday, March 13, 2018 at 3:30:46 PM UTC, Laurent Bois >>>>>>> M/45/174/90/CF-L1 wrote:

Hi folks

How can you translate a payload : service-name-method_name in JSON
with {args: [] , kwargs: []} notation?

Thanks

Laurent

Le mardi 13 mars 2018 16:30:23 UTC+1, lauren...@gmail.com a écrit :

Thanks for this great explanation

I think the problem can come from the serialization of the payload
in Java. In my code i just do a byte conversion from string (service.method)

In my last tries i sent the payload to the echange, with a reply
queue (bit not named as you say)

I'm going to try what you wrote and come back

Thanks

Le lundi 12 mars 2018 10:41:31 UTC+1, Matt Yule-Bennett a écrit :

I took a closer look at this and found more problems. I don't
have a complete copy of your client so let me just explain in English how
the RPC flow works.

* The listening service will have declared the "nameko-rpc"
exchange, and a queue called "rpc-<service-name>", bound to it with the
routing key "<service-name>.*" (it's a topic exchange)
* You must declare an RPC reply queue. The queue name can be
anything but the nameko Python client names them
"rpc.reply-<service-name>-<uuid>". It must be bound to the "nameko-rpc"
exchange with a unique name; the Python client uses a uuid.
* To make a call, you publish a message to the "nameko-rpc"
exchange, with "<service-name>.<method-name>" as the routing key. The
payload must be a serialised dict containing the key "args" and "kwargs",
with the values being the parameters for the method call. You must specify
a "reply_to" header with the routing key you've used to bind the reply
queue to the RPC exchange. You should also specify a "correlation_id"
header so your client can issue multiple concurrent requests and match the
replies back up again. You also need to specify content-type and encoding
headers so the service knows how to deserialize the incoming data. By
default the listening service will be expecting JSON.
* The request message will be routed to the listening service's
queue, consumed, processed and ack'd. The service will then reply by
publishing to the RPC exchange using the "reply_to" as the routing key and
including the "correlation_id header from the request message.
* The response message will be routed to the reply queue. Your
client should consume it and use the correlation id to match it up with a
pending request.

Looking at your first post, the client is not publishing the
request message correctly. It's sending it directly to the service queue
via the default exchange, without the correct routing key. Your service is
probably consuming the message and replying with a MethodNotFound message,
which was being lost because your reply queue wasn't bound correctly
either. Your payload also isn't correct in the first version. It should be
(assuming JSON serialization) '{"args": [], "kwargs": {}}'

On Monday, March 5, 2018 at 8:27:32 AM UTC, lauren...@gmail.com wrote:

I changed my client code.

First I define the exchange name:

private static final String EXCHANGE_NAME = "nameko-rpc";

Then, after further attempts, I declare the exchange:

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

Finally, I publish my message directly to the exchange rather than to requestQueueName. I found the code in the EmitLog client of the RabbitMQ Publish/Subscribe tutorial: https://www.rabbitmq.com/tutorials/tutorial-three-java.html

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

The message is a string naming the nameko service and method, like this: crawler.scrapit()

I see the message in the RabbitMQ management console, but no longer the ack.

I have logging in my nameko service, but nothing prints in the terminal, as if the service is not hit.

On Sunday, March 4, 2018 at 22:54:06 UTC+1, Matt Yule-Bennett wrote:

On Sunday, March 4, 2018 at 8:30:23 PM UTC, lauren...@gmail.com wrote:

Thanks.

In my client, after

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc", "reply_to");

Have you bound your reply queue with the *literal value*
"reply_to"? That won't work. It needs to be the value that you've specified
in the "reply_to" header in your request message. In your case that's
replyQueueName.

But same results... the client is still blocking

<snip>

So the queue would be 'rpc-crawler' right?

That is correct, yes. If it wasn't correct you would not see
the request message being ack'd by your service.

Laurent

On Sunday, March 4, 2018 at 20:57:35 UTC+1, Matt Yule-Bennett wrote:

The RPC reply queue must be bound to the `nameko-rpc`
exchange. The reply message is published to that exchange with the
'reply_to' as the routing key; if you haven't bound the reply queue to the
correct exchange, the reply message will be discarded by the broker as not
matching any routes.

Correlation ID has nothing to do with routing messages. It's
only used to match replies up with requests so a client can have multiple
requests in flight at the same time.
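
A small sketch of what the correlation id is for, in Python and without any broker. The names `pending`, `send_request` and `handle_reply` are invented for this example; the reply bodies follow the "result"/"error" shape seen elsewhere in this thread.

```python
import uuid

pending = {}  # correlation_id -> description of a request still awaiting a reply


def send_request(description):
    """Register a request and return the correlation id to attach to it."""
    correlation_id = str(uuid.uuid4())
    pending[correlation_id] = description
    return correlation_id


def handle_reply(correlation_id, body):
    """Pair a reply with its request; return None for unknown ids."""
    description = pending.pop(correlation_id, None)
    if description is None:
        return None  # not one of ours, or already handled
    return description, body


first = send_request("crawler.scrapit('a')")
second = send_request("crawler.scrapit('b')")

# Replies may arrive in any order; the id, not the arrival order, pairs them up.
print(handle_reply(second, '{"result": "b", "error": null}'))
print(handle_reply(first, '{"result": "a", "error": null}'))
print(handle_reply("unknown-id", "{}"))  # None
```

Nothing here decides where a message is delivered. That is entirely the job of the exchange, the bindings and the routing key.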

On Sunday, March 4, 2018 at 6:37:01 PM UTC, lauren...@gmail.com wrote:

The problem is before the correlationId.

In the Java source code, it is handleDelivery that blocks.

On Sunday, March 4, 2018 at 19:25:01 UTC+1, lauren...@gmail.com wrote:

Yes, but in my Java client I manage the reply queue and I use a
correlationId. I started from the Java code in the RabbitMQ
RPC/Java/CorrelationId section:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

In my RabbitMQ management page I now see the message sent from the
client, the publisher confirm, and the ack.

As you say, maybe it's a problem with the reply queue and an
error in my client.
Laurent

On Sunday, March 4, 2018 at 12:48:11 UTC+1, Matt Yule-Bennett wrote:

An ack for the request message means that it was received
and processed without any error, including sending the reply.

The reply message is almost certainly being published but
not routed to the appropriate reply queue. Looking at your code... is your
reply queue bound to the RPC exchange? If not, that's your problem.

On Sunday, March 4, 2018 at 11:43:49 AM UTC, lauren...@gmail.com wrote:

I only see an ack on the RabbitMQ management page, and I see
my message sent.
I do not see an answer in the reply queue.
I think the entrypoint is hit because there is an ack,
but I do not understand why the reply message is not sent.


#27

Hello

I increased the log level in the console with this config file

AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]
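
The LOGGING section above uses the schema of Python's `logging.config.dictConfig`. As a sketch, and assuming that nameko simply hands this section to `dictConfig` (my understanding, not something stated in this thread), the same settings written directly in Python look like this:

```python
import logging
import logging.config

# Same structure as the LOGGING section of the YAML config file.
LOGGING = {
    "version": 1,
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
    },
    "root": {
        "level": "DEBUG",
        "handlers": ["console"],
    },
}

logging.config.dictConfig(LOGGING)
logging.getLogger("example").debug("debug output is now visible")
```

With the root logger at DEBUG, the AMQP library's connection chatter shows up too, which is where the "Start from server" and "Channel open" lines come from.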

Here is what I see when I hit my service from my Java client:

Start from server, version: 0.9, properties: {u'information': u'Licensed
under the MPL. See http://www.rabbitmq.com/', u'product': u'RabbitMQ',
u'copyright': u'Copyright (C) 2007-2018 Pivotal Software, Inc.',
u'capabilities': {u'exchange_exchange_bindings': True,
u'connection.blocked': True, u'authentication_failure_close': True,
u'direct_reply_to': True, u'basic.nack': True, u'per_consumer_qos': True,
u'consumer_priorities': True, u'consumer_cancel_notify': True,
u'publisher_confirms': True}, u'cluster_name': u'rabbit@my-rabbit',
u'platform': u'Erlang/OTP 20.2.2', u'version': u'3.7.3'}, mechanisms:
[u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']

Open OK!
using channel_id: 1
Channel open

So I do not see any error messages.

My service is simple:

from nameko.rpc import rpc, RpcProxy
import json
from StringIO import StringIO

class Crawler(object):
    name = "crawler"

    @rpc
    def scrapit(self, username):
        return ('message', username)

My client calls crawler.scrapit with the args/kwargs payload "{'args':
['lbois']}"

And the resulting error is:

{"content":"Returned message from service: {\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\", \"value\": \"string indices must be integers\", \"exc_type\": \"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}

I think it comes from the args/kwargs, not the returned value.

Hope this helps

···

On Wednesday, March 21, 2018 at 17:49:33 UTC+1, Matt Yule-Bennett wrote:

Your service is throwing an error. You should be able to see a traceback
in the service logs.

I guess that "result" on the line below is a string, not a dict as you're
assuming.

return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')

Also note that the RPC entrypoint will attempt to serialize whatever your
service method returns. So you don't need to do this string concatenation
at all.
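
To illustrate the point about serialization, here is a broker-free Python sketch. The `record` dict is a made-up stand-in for a database document, and `json.dumps` stands in for whatever serializer the entrypoint uses; the "result"/"error" envelope mirrors the replies shown in this thread.

```python
import json

record = {"username": "lbois", "age": 30}  # made-up stand-in for a database document

# Hand-built string, as in the service above: the keys are unquoted, so it is not JSON.
hand_built = (
    '{ username: "' + record["username"] + '", age: "' + str(record["age"]) + '"}'
)
try:
    json.loads(hand_built)
    hand_built_is_json = True
except ValueError:
    hand_built_is_json = False

# Returning plain data and letting a serializer do the work round-trips cleanly.
returned = {"username": record["username"], "age": record["age"]}
wire = json.dumps({"result": returned, "error": None})

print(hand_built_is_json)          # False
print(json.loads(wire)["result"])  # {'username': 'lbois', 'age': 30}
```

Picking out the needed fields, as `returned` does, is also a reasonable habit with MongoDB: a real document carries an `_id` ObjectId, which the standard JSON encoder cannot serialize by default.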

On Tuesday, March 20, 2018 at 3:19:02 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Thanks, it works.

I see the error message {"content":"Returned message from service:
{\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\",
\"value\": \"string indices must be integers\", \"exc_type\":
\"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}
coming from my Python server.

On Tuesday, March 20, 2018 at 13:24:47 UTC+1, Matt Yule-Bennett wrote:

The problem is that you have bound your *reply queue* to the rpc
exchange with *the same routing key as the service*. Your reply
consumer is receiving the request message itself, rather than the
service's reply.

The reply queue needs to be bound to the rpc exchange with *a unique
identifier* (and that same identifier passed as the reply_to when
publishing the request message). Change the line:

channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

to

channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);

and it'll probably work.
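
The effect of the two bindings can be simulated in a few lines of Python. This is a simplified model of a topic exchange written for this example (only the `*` wildcard is handled), and the queue name `amq.gen-example` is a hypothetical stand-in for a server-generated name.

```python
def topic_matches(binding_key, routing_key):
    """Match dot-separated words; '*' stands for exactly one word ('#' is not handled)."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")
    return len(pattern) == len(words) and all(
        p == "*" or p == w for p, w in zip(pattern, words)
    )


def route(bindings, routing_key):
    """Return the names of all queues whose binding key matches the routing key."""
    return sorted(queue for queue, key in bindings if topic_matches(key, routing_key))


reply_queue = "amq.gen-example"  # hypothetical server-generated queue name

# Reply queue bound with the request's routing key: it receives the request itself,
# and the reply published to reply_to matches no binding at all.
wrong = [("rpc-mongo", "mongo.*"), (reply_queue, "mongo.findStudent")]
print(route(wrong, "mongo.findStudent"))  # ['amq.gen-example', 'rpc-mongo']
print(route(wrong, reply_queue))          # []

# Reply queue bound with its own unique key: request and reply go where intended.
right = [("rpc-mongo", "mongo.*"), (reply_queue, reply_queue)]
print(route(right, "mongo.findStudent"))  # ['rpc-mongo']
print(route(right, reply_queue))          # ['amq.gen-example']
```

In this model the first case explains the "echo": the client reads back its own request from the reply queue, while the real reply is dropped as unroutable.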

On Thursday, March 15, 2018 at 5:10:29 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Here it is

The class :

package com.mycompany.app;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {

    private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

    public String call(String routingKey, String args)
            throws java.io.IOException, InterruptedException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
            channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

            String corrId = UUID.randomUUID().toString();

                    AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
            //System.out.println(" [x] Sent '" + message + "'");

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            return response.take();

    }
    //...
}

I then call the "call" method with a routing key and args:

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
On Thursday, March 15, 2018 at 14:02:25 UTC+1, Laurent Bois M/45/174/90/CF-L1 wrote:

My client or my server does not seem to work.

Here is my server, simple code doing a find against MongoDB:

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school

        result = col.find_one({'username': username})

        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')

My Java client calls it with the routing key: mongo.findStudent
Arguments: "{'args': ['lbois']}"

And the returned response is an echo of the call:

{"content":"Returned message from service: {'args': ['lbois']} !"}

I expect the database content, not an echo of my call.

I think that doesn't work.

Laurent

On Thursday, March 15, 2018 at 10:56:58 UTC+1, Matt Yule-Bennett wrote:

Congratulations, your client is working.

On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Trying to play with the payload.
I already have a consumer ack.

My service is now:

class Crawler(object):
    name = "crawler"

    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java client:

channel.basicPublish(EXCHANGE_NAME, "crawler.scrapit", props, "{'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes("UTF-8"));

Now I get a returned message like this:

"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is that normal? Or is it just an echo?

This is what you've asked your service to return. The RPC entrypoint
will serialize (if it can) whatever is returned from the decorated method
and return that to the client.

Laurent

On Wednesday, March 14, 2018 at 17:23:55 UTC+1, Matt Yule-Bennett wrote:

The *routing_key* needs to be "<service-name>.<method-name>".
The *payload* needs to be a serialized dict in the form: {"args":
<positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

*routing_key*: service.method
*payload*: {"args": ["hello", "world"], "kwargs": {"c": "foo", "d": "bar"}}

The service would print:

hello world foo bar
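
The mapping from payload to method call can be sketched in plain Python. This illustrates the calling convention only and is not nameko's implementation; `dispatch` is a name invented for the example, and `method` returns its text instead of printing so the result can be inspected.

```python
import json


def method(a, b, c="C", d="D"):
    # Stand-in for the service method above; returns instead of printing.
    return "{} {} {} {}".format(a, b, c, d)


def dispatch(fn, raw_body):
    """Decode a JSON request body and call fn the way the payload describes."""
    payload = json.loads(raw_body)
    # "args" is unpacked positionally, "kwargs" by keyword.
    return fn(*payload["args"], **payload["kwargs"])


body = '{"args": ["hello", "world"], "kwargs": {"c": "foo", "d": "bar"}}'
print(dispatch(method, body))  # hello world foo bar

# Keyword arguments left out of "kwargs" fall back to the method's defaults.
print(dispatch(method, '{"args": ["hello", "world"], "kwargs": {}}'))  # hello world C D
```

Because "kwargs" is unpacked by keyword, it has to be a JSON object, even an empty one; a list in that position cannot be unpacked this way.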

On Tuesday, March 13, 2018 at 3:30:46 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Hi folks

How can I translate a payload for service-name.method_name into JSON
using the {args: [], kwargs: []} notation?

Thanks

Laurent

On Tuesday, March 13, 2018 at 16:30:23 UTC+1, lauren...@gmail.com wrote:

Thanks for this great explanation.

I think the problem may come from the serialization of the
payload in Java. In my code I just do a byte conversion of the string
(service.method).

In my last attempts I sent the payload to the exchange, with a reply
queue (but not named as you describe).

I'm going to try what you wrote and come back.

Thanks


#28

Well this has been a fun exercise in me remembering how to program in
Java...

You're right that the problem is with the input. There are three problems:

1. You're not specifying a content type. The message was treated as a
string rather than being JSON-decoded back into an object.
2. Your payload is incomplete. You have to specify *both* "args" and
"kwargs" keys.
3. Your payload is not valid JSON, which requires strings (including keys)
to be surrounded with *double* quotes.

The reason that you don't see a traceback in the service is that the RPC
entrypoint handles these malformed request errors and returns them in the
reply, rather than blowing up.
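
The three problems can be reproduced in plain Python. This is my reading of how the error in the reply arises, written as a sketch; the service internals are not shown in this thread.

```python
import json

raw = "{'args': ['lbois']}"  # the body sent by the Java client in this thread

# Problem 1: with no content type, the body stays a plain string, and looking up
# a key on a string raises the TypeError seen in the reply.
try:
    raw["args"]
    error_type = None
except TypeError as exc:
    error_type = type(exc).__name__

# Problem 3: single-quoted keys are rejected by a JSON parser.
try:
    json.loads(raw)
    parses = True
except ValueError:
    parses = False

# Problem 2 fixed as well: double quotes, and both keys present.
fixed = json.loads('{"args": ["lbois"], "kwargs": {}}')

print(error_type, parses, fixed["args"], fixed["kwargs"])  # TypeError False ['lbois'] {}
```

On the client side the fix is the corrected body plus a JSON content type and a UTF-8 content encoding on the published message.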

The client at https://gist.github.com/mattbennett/f12b3d0136a5fbb12c4e5852973127a7 works.

···

On Thursday, 22 March 2018 08:45:38 UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Hello

I increased the log level in the console with this config file

AMQP_URI: 'pyamqp://guest:guest@localhost'

WEB_SERVER_ADDRESS: '0.0.0.0:8000'

rpc_exchange: 'nameko-rpc'

max_workers: 10

parent_calls_tracked: 10

LOGGING:

    version: 1

    handlers:

        console:

            class: logging.StreamHandler

    root:

        level: DEBUG

        handlers: [console]

Here is what i see when i hit my service from my Java client

Start from server, version: 0.9, properties: {u'information': u'Licensed
under the MPL. See http://www.rabbitmq.com/', u'product': u'RabbitMQ',
u'copyright': u'Copyright (C) 2007-2018 Pivotal Software, Inc.',
u'capabilities': {u'exchange_exchange_bindings': True,
u'connection.blocked': True, u'authentication_failure_close': True,
u'direct_reply_to': True, u'basic.nack': True, u'per_consumer_qos': True,
u'consumer_priorities': True, u'consumer_cancel_notify': True,
u'publisher_confirms': True}, u'cluster_name': u'rabbit@my-rabbit',
u'platform': u'Erlang/OTP 20.2.2', u'version': u'3.7.3'}, mechanisms:
[u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']

Open OK!

using channel_id: 1

Channel open

So i do not see any error messages

My service is simple

from nameko.rpc import rpc, RpcProxy

import json

from StringIO import StringIO

class Crawler(object):

    name = "crawler"

    @rpc

    def scrapit(self, username):

        return ('message', username)

My Client do a call on crawler.scrapit using args/kwargs "{'args':
['lbois']}"

And my resulting error is :

{"content":"Returned message from service: {\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\", \"value\": \"string indices must be integers\", \"exc_type\": \"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}

I think it comes from the args/kwargs not the returned value

Hope this helps

Le mercredi 21 mars 2018 17:49:33 UTC+1, Matt Yule-Bennett a écrit :

Your service is throwing an error. You should be able to see a traceback
in the service logs.

I guess that "result" on the line below is a string, not a dict as you're
assuming.

return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')

Also note that the RPC entrypoint will attempt to serialize whatever your
service method returns. So you don't need to do this string concatenation
at all.

On Tuesday, March 20, 2018 at 3:19:02 PM UTC, Laurent Bois >> M/45/174/90/CF-L1 wrote:

Thanks it works

i see the error message {"content":"Returned message from service:
{\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\",
\"value\": \"string indices must be integers\", \"exc_type\":
\"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}
coming from my Py server

Le mardi 20 mars 2018 13:24:47 UTC+1, Matt Yule-Bennett a écrit :

The problem is that you have bound your *reply queue* to the rpc
exchange with *the same routing key as the service* . Your reply
consumer is consuming the request message instead of the service.

The reply queue needs to be bound to the rpc exchange with *a unique
identifier* (and that same identifier passed as the reply_to when
publishing the request message). Change the line:

channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

to

channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);

and it'll probably work.

On Thursday, March 15, 2018 at 5:10:29 PM UTC, Laurent Bois >>>> M/45/174/90/CF-L1 wrote:

Here it is

The class :

package com.mycompany.app;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {

    private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

    public String call(String routingKey, String args)
            throws java.io.IOException, InterruptedException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
            channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

            String corrId = UUID.randomUUID().toString();

                    AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
            //System.out.println(" [x] Sent '" + message + "'");

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            return response.take();

    }
    //...
}

I call then "call" method with routingKey an args

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
Le jeudi 15 mars 2018 14:02:25 UTC+1, Laurent Bois M/45/174/90/CF-L1 a
écrit :

My client or my server seems not to work

Here is my server , a simple code doing a find against mongodb

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client =
MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' +
str(result['age']) + '"}')
      
my client Java call routing kay : mongo.findStudent
Arguments : "{'args': ['lbois']}"

And the returned response is an echo of the call:

{"content":"Returned message from service: {'args': ['lbois']} !"}

I'm waiting about the database content and not the echo of my call

I think that doesn't work

Laurent

Le jeudi 15 mars 2018 10:56:58 UTC+1, Matt Yule-Bennett a écrit :

Congratulations, your client is working.

On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois >>>>>>> M/45/174/90/CF-L1 wrote:

Trying to play with payload.
I have a consumer ack already

my service is now
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java Client

channel.basicPublish( EXCHANGE_NAME, "crawler.scrapit", props, "{'args':
['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes(
"UTF-8"));

Now i got a returned message like this

"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is it normal? Or is it just an echo

This is what you've asked your service to return. The RPC entrypoint
will serialize (if it can) whatever is returned from the decorated method
and return that to the client.

Laurent

Le mercredi 14 mars 2018 17:23:55 UTC+1, Matt Yule-Bennett a écrit :

The *routing_key* needs to be "<service-name>.<method-name>".
The *payload* needs to be a serialized dict in the form: {"args":
<positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

*routing_key*: service.method
*payload*: {'args': ['hello', 'world', 'kwargs': {'c':'foo',
'd':'bar'}]}

The service would print:

hello world foo bar

On Tuesday, March 13, 2018 at 3:30:46 PM UTC, Laurent Bois >>>>>>>>> M/45/174/90/CF-L1 wrote:

Hi folks

How can you translate a payload : service-name-method_name in
JSON with {args: [] , kwargs: []} notation?

Thanks

Laurent

Le mardi 13 mars 2018 16:30:23 UTC+1, lauren...@gmail.com a
écrit :

Thanks for this great explanation

I think the problem can come from the serialization of the
payload in Java. In my code i just do a byte conversion from string
(service.method)

In my last tries i sent the payload to the echange, with a reply
queue (bit not named as you say)

I'm going to try what you wrote and come back

Thanks

Le lundi 12 mars 2018 10:41:31 UTC+1, Matt Yule-Bennett a écrit :

I took a closer look at this and found more problems. I don't
have a complete copy of your client so let me just explain in English how
the RPC flow works.

* The listening service will have declared the "nameko-rpc"
exchange, and a queue called "rpc-<service-name>", bound to it with the
routing key "<service-name>.*" (it's a topic exchange)
* You must declare an RPC reply queue. The queue name can be
anything but the nameko Python client names them
"rpc.reply-<service-name>-<uuid>". It must be bound to the "nameko-rpc"
exchange with a unique name; the Python client uses a uuid.
* To make a call, you publish a message to the "nameko-rpc"
exchange, with "<service-name>.<method-name>" as the routing key. The
payload must be a serialised dict containing the key "args" and "kwargs",
with the values being the parameters for the method call. You must specify
a "reply_to" header with the routing key you've used to bind the reply
queue to the RPC exchange. You should also specify a "correlation_id"
header so your client can issue multiple concurrent requests and match the
replies back up again. You also need to specify content-type and encoding
headers so the service knows how to deserialize the incoming data. By
default the listening service will be expecting JSON.
* The request message will be routed to the listening service's
queue, consumed, processed and ack'd. The service will then reply by
publishing to the RPC exchange using the "reply_to" as the routing key and
including the "correlation_id header from the request message.
* The response message will be routed to the reply queue. Your
client should consume it and use the correlation id to match it up with a
pending request.

Looking at your first post, the client is not publishing the
request message correctly. It's sending it directly to the service queue
via the default exchange, without the correct routing key. Your service is
probably consuming the message and replying with a MethodNotFound message,
which was being lost because your reply queue wasn't bound correctly
either. Your payload also isn't correct in the first version. It should be
(assuming JSON serialization) '{"args": [], "kwargs": []}'

On Monday, March 5, 2018 at 8:27:32 AM UTC, lauren...@gmail.com >>>>>>>>>>>> wrote:

I changed my client code:

First I declare the exchange name:

private static final String EXCHANGE_NAME = "nameko-rpc";

Then, after further attempts, I declare the exchange:

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

Finally, I publish my message directly to the exchange rather than to requestQueueName. I found the code in the RabbitMQ Publish/Subscribe tutorial, EmitLog client section: https://www.rabbitmq.com/tutorials/tutorial-three-java.html

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

The message is a string naming the nameko service and method (service.method_name), like this: crawler.scrapit()

I see the message in the RabbitMQ management console, but no longer the ack.

I have a log statement in my nameko service, but nothing prints in the terminal, as if the service is not hit.

On Sunday, March 4, 2018 at 22:54:06 UTC+1, Matt Yule-Bennett wrote:

On Sunday, March 4, 2018 at 8:30:23 PM UTC, lauren...@gmail.com wrote:

Thanks.

In my client, after

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc", "reply_to");

Have you bound your reply queue with the *literal value*
"reply_to"? That won't work. It needs to be the value that you've specified
in the "reply_to" header in your request message. In your case that's
replyQueueName.

But same results... the client is still blocking

<snip>

So the queue would be 'rpc-crawler' right?

That is correct, yes. If it wasn't correct you would not see
the request message being ack'd by your service.

Laurent

On Sunday, March 4, 2018 at 20:57:35 UTC+1, Matt Yule-Bennett wrote:

The RPC reply queue must be bound to the `nameko-rpc`
exchange. The reply message is published to that exchange with the
'reply_to' as the routing key; if you haven't bound the reply queue to the
correct exchange, the reply message will be discarded by the broker as not
matching any routes.

Correlation ID has nothing to do with routing messages.
It's only used to match replies up with requests so a client can have
multiple requests in flight at the same time.

On Sunday, March 4, 2018 at 6:37:01 PM UTC, lauren...@gmail.com wrote:

The problem is before the correlationId.

In the Java source code, it is handleDelivery that blocks.

On Sunday, March 4, 2018 at 19:25:01 UTC+1, lauren...@gmail.com wrote:

Yes, but in my Java client I manage the reply queue
and I use a correlationId.
I started from the Java code in the RabbitMQ
RPC/Java/CorrelationId section
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

In my RabbitMQ management page I now see the message sent
from the client, the publisher confirm, and the ack.

As you say, maybe it's a problem with the reply queue and
an error in my client.
Laurent

On Sunday, March 4, 2018 at 12:48:11 UTC+1, Matt Yule-Bennett wrote:

An ack for the request message means that it was
received and processed without any error, including sending the reply.

The reply message is almost certainly being published
but not routed to the appropriate reply queue. Looking at your code... is
your reply queue bound to the RPC exchange? If not, that's your problem.

On Sunday, March 4, 2018 at 11:43:49 AM UTC, lauren...@gmail.com wrote:

I only see an ack on the RabbitMQ management page, and I
see my message sent.
I do not see an answer in the reply queue.
I think the entrypoint is hit because there is an ack,
but I do not understand why the reply message is not sent.


#29

Matt

I thank you for the fix.
Yes now it works

Happy to communicate with a Python service from Java. I'll continue to explore.

Thanks a lot


On Thursday, March 22, 2018 at 17:37:59 UTC+1, Matt Yule-Bennett wrote:

Well this has been a fun exercise in me remembering how to program in
Java...

You're right that the problem is with the input. There are three problems:

1. You're not specifying a content type. The message was treated as a
string rather than being JSON-decoded back into an object.
2. Your payload is incomplete. You have to specify *both* "args" and
"kwargs" keys.
3. Your payload is not valid JSON, which requires keys to be surrounded
with *double* quotes.

The reason that you don't see a traceback in the service is that the RPC
entrypoint handles these malformed request errors and returns them in the
reply, rather than blowing up.
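
The three problems can be reproduced without a broker. The following is a minimal sketch, not nameko's actual decoding code: `extract_args` is a hypothetical helper that assumes the body is only JSON-decoded when a JSON content type is declared, and is otherwise left as a plain string.

```python
import json


def extract_args(body, content_type=None):
    """Hypothetical stand-in for the service side of the exchange."""
    # Assumption: decode only when the message declares a JSON content type.
    payload = json.loads(body) if content_type == "application/json" else body
    # Indexing a str with a key raises TypeError, as seen in the error reply.
    return payload["args"], payload["kwargs"]


good = '{"args": ["lbois"], "kwargs": {}}'
bad = "{'args': ['lbois']}"

print(extract_args(good, "application/json"))

try:
    extract_args(bad)  # no content type: the body stays a string
except TypeError as exc:
    print("TypeError:", exc)

try:
    json.loads(bad)  # single-quoted keys are not valid JSON
except json.JSONDecodeError as exc:
    print("invalid JSON:", exc)
```

The first call succeeds because the payload is valid JSON, declares its content type, and carries both keys. The other two calls fail in the two ways described above.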

The client at
https://gist.github.com/mattbennett/f12b3d0136a5fbb12c4e5852973127a7
works.

On Thursday, 22 March 2018 08:45:38 UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Hello

I increased the log level in the console with this config file

AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]

Here is what i see when i hit my service from my Java client

Start from server, version: 0.9, properties: {u'information': u'Licensed
under the MPL. See http://www.rabbitmq.com/', u'product': u'RabbitMQ',
u'copyright': u'Copyright (C) 2007-2018 Pivotal Software, Inc.',
u'capabilities': {u'exchange_exchange_bindings': True,
u'connection.blocked': True, u'authentication_failure_close': True,
u'direct_reply_to': True, u'basic.nack': True, u'per_consumer_qos': True,
u'consumer_priorities': True, u'consumer_cancel_notify': True,
u'publisher_confirms': True}, u'cluster_name': u'rabbit@my-rabbit',
u'platform': u'Erlang/OTP 20.2.2', u'version': u'3.7.3'}, mechanisms:
[u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']

Open OK!

using channel_id: 1

Channel open

So i do not see any error messages

My service is simple

from nameko.rpc import rpc, RpcProxy
import json
from StringIO import StringIO

class Crawler(object):
    name = "crawler"

    @rpc
    def scrapit(self, username):
        return ('message', username)

My client calls crawler.scrapit with the args/kwargs payload "{'args':
['lbois']}"

And my resulting error is :

{"content":"Returned message from service: {\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\", \"value\": \"string indices must be integers\", \"exc_type\": \"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}

I think it comes from the args/kwargs not the returned value

Hope this helps

On Wednesday, March 21, 2018 at 17:49:33 UTC+1, Matt Yule-Bennett wrote:

Your service is throwing an error. You should be able to see a traceback
in the service logs.

I guess that "result" on the line below is a string, not a dict as
you're assuming.

return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')

Also note that the RPC entrypoint will attempt to serialize whatever
your service method returns. So you don't need to do this string
concatenation at all.
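
To illustrate the point about serialization, here is a small sketch. It uses Python's standard `json` module as a stand-in for whatever serializer the service is configured with, and `result` is an illustrative record, not real database output.

```python
import json

# Illustrative record; the values are made up for this example.
result = {"username": "lbois", "age": 45}

# Hand-built string: the keys are unquoted, so this is not valid JSON.
manual = '{ username: "' + result["username"] + '", age: "' + str(result["age"]) + '"}'

# Returning a plain dict lets the serializer produce valid JSON instead.
serialized = json.dumps({"username": result["username"], "age": result["age"]})

print(manual)
print(serialized)
```

Picking out only the fields you need also sidesteps the `_id` field that a raw MongoDB document carries, whose ObjectId value the standard `json` module cannot serialize.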

On Tuesday, March 20, 2018 at 3:19:02 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Thanks, it works.

I now see the error message {"content":"Returned message from service:
{\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\",
\"value\": \"string indices must be integers\", \"exc_type\":
\"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}
coming from my Py server

On Tuesday, March 20, 2018 at 13:24:47 UTC+1, Matt Yule-Bennett wrote:

The problem is that you have bound your *reply queue* to the rpc
exchange with *the same routing key as the service*. Your reply
consumer is consuming the request message instead of the service.

The reply queue needs to be bound to the rpc exchange with *a unique
identifier* (and that same identifier passed as the reply_to when
publishing the request message). Change the line:

channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

to

channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);

and it'll probably work.
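
Why the original binding produces an echo can be seen in a tiny in-memory model of a topic exchange. This is only a sketch of AMQP topic matching ('*' matches exactly one word, '#' matches zero or more), not RabbitMQ's implementation. The queue name "amq.gen-abc123" is a hypothetical server-generated name, and "rpc-mongo" follows the "rpc-<service-name>" convention described elsewhere in this thread.

```python
def topic_matches(binding_key, routing_key):
    """AMQP-style topic match on dot-separated words."""
    def match(b, r):
        if not b:
            return not r
        if b[0] == "#":  # zero or more words
            return any(match(b[1:], r[i:]) for i in range(len(r) + 1))
        if not r:
            return False
        return b[0] in ("*", r[0]) and match(b[1:], r[1:])
    return match(binding_key.split("."), routing_key.split("."))


def route(bindings, routing_key):
    """Return the queues whose binding key matches the routing key."""
    return [queue for queue, key in bindings if topic_matches(key, routing_key)]


service = ("rpc-mongo", "mongo.*")
reply_queue = "amq.gen-abc123"  # hypothetical generated queue name

broken = [service, (reply_queue, "mongo.findStudent")]
fixed = [service, (reply_queue, reply_queue)]

print(route(broken, "mongo.findStudent"))  # request also lands in the reply queue
print(route(broken, reply_queue))          # the reply has nowhere to go
print(route(fixed, "mongo.findStudent"))   # request reaches only the service
print(route(fixed, reply_queue))           # reply reaches only the client
```

In this simplified model, the broken binding delivers a copy of the request to the reply queue, which is why the client sees its own message echoed back, while the real reply matches no binding at all.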

On Thursday, March 15, 2018 at 5:10:29 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Here it is

The class :

package com.mycompany.app;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {

    private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

    public String call(String routingKey, String args)
            throws java.io.IOException, InterruptedException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
            channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

            String corrId = UUID.randomUUID().toString();

            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
            //System.out.println(" [x] Sent '" + message + "'");

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            return response.take();

    }
    //...
}

I then call the "call" method with routingKey and args:

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
On Thursday, March 15, 2018 at 14:02:25 UTC+1, Laurent Bois M/45/174/90/CF-L1 wrote:

My client or my server seems not to work

Here is my server , a simple code doing a find against mongodb

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
My Java client calls the routing key: mongo.findStudent
Arguments: "{'args': ['lbois']}"

And the returned response is an echo of the call:

{"content":"Returned message from service: {'args': ['lbois']} !"}

I'm expecting the database content, not an echo of my call.

I think it isn't working.

Laurent

On Thursday, March 15, 2018 at 10:56:58 UTC+1, Matt Yule-Bennett wrote:

Congratulations, your client is working.

On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

I'm experimenting with the payload.
I already get a consumer ack.

My service is now:
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java Client

channel.basicPublish( EXCHANGE_NAME, "crawler.scrapit", props, "{'args':
['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes(
"UTF-8"));

Now I get a returned message like this:

"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is this normal, or is it just an echo?

This is what you've asked your service to return. The RPC
entrypoint will serialize (if it can) whatever is returned from the
decorated method and return that to the client.

Laurent

On Wednesday, March 14, 2018 at 17:23:55 UTC+1, Matt Yule-Bennett wrote:

The *routing_key* needs to be "<service-name>.<method-name>".
The *payload* needs to be a serialized dict in the form:
{"args": <positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

*routing_key*: service.method
*payload*: {"args": ["hello", "world"], "kwargs": {"c": "foo", "d": "bar"}}

The service would print:

hello world foo bar
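
The mapping from payload to call can be sketched in plain Python. This only illustrates how "args" and "kwargs" line up with the method parameters, assuming a well-formed JSON payload; it is not nameko's dispatch code, and `method` here is an ordinary function standing in for the decorated service method.

```python
import json


def method(a, b, c="C", d="D"):
    """Stand-in for the service method; returns what the service would print."""
    return (a, b, c, d)


body = '{"args": ["hello", "world"], "kwargs": {"c": "foo", "d": "bar"}}'
payload = json.loads(body)

# Positional values come from "args", keyword values from "kwargs".
print(method(*payload["args"], **payload["kwargs"]))
# → ('hello', 'world', 'foo', 'bar')

# With an empty "kwargs" dict the defaults apply.
print(method(*["hello", "world"], **{}))
# → ('hello', 'world', 'C', 'D')
```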

On Tuesday, March 13, 2018 at 3:30:46 PM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Hi folks

How do you express a call to service-name.method_name as a JSON
payload in the {args: [], kwargs: {}} notation?

Thanks

Laurent

On Tuesday, March 13, 2018 at 16:30:23 UTC+1, lauren...@gmail.com wrote:

Thanks for this great explanation

I think the problem can come from the serialization of the
payload in Java. In my code i just do a byte conversion from string
(service.method)

In my last attempts I sent the payload to the exchange, with a
reply queue (but not named as you say)

I'm going to try what you wrote and come back

Thanks

On Monday, March 12, 2018 at 10:41:31 UTC+1, Matt Yule-Bennett wrote:

I took a closer look at this and found more problems. I don't
have a complete copy of your client so let me just explain in English how
the RPC flow works.

* The listening service will have declared the "nameko-rpc"
exchange, and a queue called "rpc-<service-name>", bound to it with the
routing key "<service-name>.*" (it's a topic exchange)
* You must declare an RPC reply queue. The queue name can be
anything but the nameko Python client names them
"rpc.reply-<service-name>-<uuid>". It must be bound to the "nameko-rpc"
exchange with a unique name; the Python client uses a uuid.
* To make a call, you publish a message to the "nameko-rpc"
exchange, with "<service-name>.<method-name>" as the routing key. The
payload must be a serialised dict containing the keys "args" and "kwargs",
with the values being the parameters for the method call. You must specify
a "reply_to" header with the routing key you've used to bind the reply
queue to the RPC exchange. You should also specify a "correlation_id"
header so your client can issue multiple concurrent requests and match the
replies back up again. You also need to specify content-type and encoding
headers so the service knows how to deserialize the incoming data. By
default the listening service will be expecting JSON.
* The request message will be routed to the listening
service's queue, consumed, processed and ack'd. The service will then reply
by publishing to the RPC exchange using the "reply_to" as the routing key
and including the "correlation_id" header from the request message.
* The response message will be routed to the reply queue. Your
client should consume it and use the correlation id to match it up with a
pending request.
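
The request described in the steps above can be assembled as plain values before handing them to any AMQP client. This is a sketch under stated assumptions: `build_request` is a hypothetical helper, the content type "application/json" and encoding "utf-8" are assumed to be what a default JSON-expecting service accepts, and nothing is actually published.

```python
import json
import uuid


def build_request(service, method, args, kwargs, reply_routing_key):
    """Assemble the pieces of one RPC request (hypothetical helper)."""
    return {
        "exchange": "nameko-rpc",
        "routing_key": "{}.{}".format(service, method),
        # Both keys must be present, even when one of them is empty.
        "body": json.dumps({"args": args, "kwargs": kwargs}),
        "properties": {
            # Must equal the routing key used to bind the reply queue.
            "reply_to": reply_routing_key,
            # Lets the client pair the reply with this request.
            "correlation_id": str(uuid.uuid4()),
            # Assumed values for a service expecting JSON.
            "content_type": "application/json",
            "content_encoding": "utf-8",
        },
    }


request = build_request("crawler", "scrapit", ["hello", "world"],
                        {"c": "foo", "d": "bar"}, "my-reply-key")
print(request["routing_key"])
print(request["body"])
```

In a Java client, the same values would go into the exchange and routing key arguments of basicPublish and into the message properties.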

Looking at your first post, the client is not publishing the
request message correctly. It's sending it directly to the service queue
via the default exchange, without the correct routing key. Your service is
probably consuming the message and replying with a MethodNotFound message,
which was being lost because your reply queue wasn't bound correctly
either. Your payload also isn't correct in the first version. It should be
(assuming JSON serialization) '{"args": [], "kwargs": {}}'

On Monday, March 5, 2018 at 8:27:32 AM UTC, lauren...@gmail.com wrote:

I changed my client code:

First I declare the exchange name:

private static final String EXCHANGE_NAME = "nameko-rpc";

Then, after further attempts, I declare the exchange:

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

Finally, I publish my message directly to the exchange rather than to requestQueueName. I found the code in the RabbitMQ Publish/Subscribe tutorial, EmitLog client section: https://www.rabbitmq.com/tutorials/tutorial-three-java.html

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

The message is a string naming the nameko service and method (service.method_name), like this: crawler.scrapit()

I see the message in the RabbitMQ management console, but no longer the ack.

I have a log statement in my nameko service, but nothing prints in the terminal, as if the service is not hit.

On Sunday, March 4, 2018 at 22:54:06 UTC+1, Matt Yule-Bennett wrote:

On Sunday, March 4, 2018 at 8:30:23 PM UTC, lauren...@gmail.com wrote:

Thanks.

In my client, after

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc", "reply_to");

Have you bound your reply queue with the *literal value*
"reply_to"? That won't work. It needs to be the value that you've specified
in the "reply_to" header in your request message. In your case that's
replyQueueName.

But same results... the client is still blocking

<snip>

So the queue would be 'rpc-crawler' right?

That is correct, yes. If it wasn't correct you would not see
the request message being ack'd by your service.

Laurent

On Sunday, March 4, 2018 at 20:57:35 UTC+1, Matt Yule-Bennett wrote:

The RPC reply queue must be bound to the `nameko-rpc`
exchange. The reply message is published to that exchange with the
'reply_to' as the routing key; if you haven't bound the reply queue to the
correct exchange, the reply message will be discarded by the broker as not
matching any routes.

Correlation ID has nothing to do with routing messages.
It's only used to match replies up with requests so a client can have
multiple requests in flight at the same time.
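
The role of the correlation ID can be sketched as a table of pending requests. This is a broker-free illustration: `send` and `on_reply` are hypothetical names, and the reply bodies only mimic the "result"/"error" shape seen elsewhere in this thread.

```python
import uuid

pending = {}  # correlation_id -> description of the outstanding call


def send(call):
    """Register a call as in flight and return its correlation id."""
    corr_id = str(uuid.uuid4())
    pending[corr_id] = call
    return corr_id


def on_reply(corr_id, body):
    """Pair a reply with its request; ignore replies nobody is waiting for."""
    call = pending.pop(corr_id, None)
    return None if call is None else (call, body)


first = send("crawler.scrapit")
second = send("mongo.findStudent")

# Replies may arrive in any order; the id, not the order, identifies them.
print(on_reply(second, '{"result": "b", "error": null}'))
print(on_reply(first, '{"result": "a", "error": null}'))
print(on_reply("unknown-id", "{}"))
```

Routing is decided entirely by the exchange, the bindings and the routing key. The correlation ID only comes into play once a message has already reached the client.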

On Sunday, March 4, 2018 at 6:37:01 PM UTC, lauren...@gmail.com wrote:

The problem is before the correlationId.

In the Java source code, it is handleDelivery that
blocks.

On Sunday, March 4, 2018 at 19:25:01 UTC+1, lauren...@gmail.com wrote:

Yes, but in my Java client I manage the reply queue
and I use a correlationId.
I started from the Java code in the RabbitMQ
RPC/Java/CorrelationId section
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

In my RabbitMQ management page I now see the message sent
from the client, the publisher confirm, and the ack.

As you say, maybe it's a problem with the reply queue
and an error in my client.
Laurent

On Sunday, March 4, 2018 at 12:48:11 UTC+1, Matt Yule-Bennett wrote:

An ack for the request message means that it was
received and processed without any error, including sending the reply.

The reply message is almost certainly being published
but not routed to the appropriate reply queue. Looking at your code... is
your reply queue bound to the RPC exchange? If not, that's your problem.

On Sunday, March 4, 2018 at 11:43:49 AM UTC, lauren...@gmail.com wrote:

I only see an ack on the RabbitMQ management page, and I
see my message sent.
I do not see an answer in the reply queue.
I think the entrypoint is hit because there is an
ack, but I do not understand why the reply message is not sent.