Is it possible to create a custom Kombu serializer using nameko ?
I have a publisher using JMS to publish to RabbitMQ. The JMS plugin for
rabbit is setting the content-type to application/octet-stream. The actual
content is JSON.
My consumer in nameko is destroying the message:
kombu.mixins - ERROR - Can't decode message body:
ContentDisallowed('Refusing to deserialize untrusted content of type
application/octet-stream (application/octet-stream)',)
(type:u'application/octet-stream' encoding:None raw:"'message body value'"')
I can see in the kombu docs how to do it but haven't been able to work out
how to apply that within nameko ....
Looks like you figured this out yourself already. The Nameko `SERIALIZER`
config key is used to find a serializer/deserializer that has been
registered with Kombu. Kombu defaults to JSON and supports pickle and YAML
too, and you can register your own as shown in that test.
If the message publishers and consumers have been configured with the same
`SERIALIZER` value, everything will work -- the value determines the
content type and encoding of messages, and the "accept" value of consumers.
The ContentDisallowed exception is raised when a consumer receives a
message with a content type outside of its "accept" whitelist.
It would be nice if Nameko exposed its own serializer registry rather than
forcing people to configure Kombu. Also note that there is a known
deficiency <Issues · nameko/nameko · GitHub; in the Nameko
shell, which doesn't honour custom serializers.
···
On Friday, September 15, 2017 at 9:43:04 PM UTC+1, Mick wrote:
Yes got it working - initially went down a rabbit hole looking at the
Celery way to configure it and then worked out that serializer in the
config was what I needed.
So regarding the shell - it works for me (for some reason i've not dug into
deeply). I suspect the shell issue is to do with @rpc rather than @consume.
The issue I now have is that the version of amqp is older and is not
handling the JMS Timestamp when using the Publisher to publish a response.
The Integer value is too large - later versions of amqp (>3.0) have
corrected this to write them as long integers but not the version in nameko.
Any thoughts on that one ? I'm currently trying to work out if its
possible to edit the JMS headers after I consume the message.
I think the chance of editing the header before consumption depends on
where the exception happens. If it's deep inside an underlying library then
maybe you won't have an opportunity to do so. If you could post a stack
trace it might help.
···
On Wednesday, September 20, 2017 at 8:58:52 PM UTC+1, Mick wrote:
Hi Matt
Yes got it working - initially went down a rabbit hole looking at the
Celery way to configure it and then worked out that serializer in the
config was what I needed.
So regarding the shell - it works for me (for some reason i've not dug
into deeply). I suspect the shell issue is to do with @rpc rather than @consume.
The issue I now have is that the version of amqp is older and is not
handling the JMS Timestamp when using the Publisher to publish a response.
The Integer value is too large - later versions of amqp (>3.0) have
corrected this to write them as long integers but not the version in nameko.
Any thoughts on that one ? I'm currently trying to work out if its
possible to edit the JMS headers after I consume the message.
Reading this - and trying to work out what it all means Am I on the
right track here ?
elif isinstance(v, int_types):
if v > 2147483647 or v < -2147483647:
write(pack('>cq', b'L', v))
else:
write(pack('>ci', b'I', v))
Value in JMS Timestamp is: 1506007083273
Consumption is ok for some reason I haven't worked out. The pattern I have
is consume the message from a Queue and then publish the response on
another queue. The issue comes about when publishing:
My thought here was to edit the value in the JMS Timestamp and set it to a
new value for the publish action. But I can't find a way to do it as it
stands. I think the get_dependency method might work but I'm still getting
my head around how that works....
I did put the amqp change in and it resolves the issue but clearly this is
not the right way to do it!
Ah right. Unfortunately py-amqp 2.x introduced some backwards incompatible
changes, so Nameko is stuck with 1.x for now (this is also the reason we're
on kombu < 4.0)
The good news is that since only publishing is broken, you do have an
opportunity to scrub the value in your service. You will need to subclass
the nameko.messaging.Publisher class, and override get_message_headers.
···
On Thursday, September 21, 2017 at 4:28:25 PM UTC+1, Mick wrote:
My thought here was to edit the value in the JMS Timestamp and set it to a
new value for the publish action. But I can't find a way to do it as it
stands. I think the get_dependency method might work but I'm still getting
my head around how that works....
I did put the amqp change in and it resolves the issue but clearly this is
not the right way to do it!
I get None but within the class I can see the headers - just can't return
them out.
You're on the right track using the DependencyProvider. worker_setup and
worker_result aren't expected to return anything though, so your current
implementation won't do anything. It's missing a get_dependency method.
Whatever get_dependency returns is what is injected into the service
instance. This will allow the service methods to read the contents of
context_data:
I guess that you're trying to read the headers here so you can set them
again when publishing the message on, but with the timestamp updated.
In my other reply I mentioned overloading Publisher.get_message_headers.
This is because the publisher automatically inserts any context data (which
was extracted from incoming message headers) and sets it as headers in the
outgoing message. So if you override the method that does the extraction
you won't need to manually manipulate them in your service.
···
On Thursday, September 21, 2017 at 9:22:30 PM UTC+1, Mick wrote:
class CustomPublisher(Publisher):
# need to subclass Publisher so we can adjust the headers
# JMS needs specific headers returned
# defect in amqp 1.4.9. prevents long integers from being published.
# nameko 2.6.0 is restricted to amqp < 2.0
# JMS Timestamp is a long integer update to a smaller value
def get_message_headers(self, worker_ctx):
data = worker_ctx.context_data
if 'JMSTimestamp' in data:
data['JMSTimestamp'] = int(datetime.datetime.utcnow().strftime(
"%m%d%H%M%S"))
return data
And I got the headers to return as well - not needed but certainly a good
education.