Kombu Custom Serializer

Hi

Is it possible to create a custom Kombu serializer when using Nameko?

I have a publisher using JMS to publish to RabbitMQ. The JMS plugin for
rabbit is setting the content-type to application/octet-stream. The actual
content is JSON.

My consumer in Nameko fails to decode the message:

kombu.mixins - ERROR - Can't decode message body:
ContentDisallowed('Refusing to deserialize untrusted content of type
application/octet-stream (application/octet-stream)',)
(type:u'application/octet-stream' encoding:None raw:"'message body value'"')

I can see in the Kombu docs how to do it, but haven't been able to work out
how to apply that within Nameko.

Thanks

Custom serialization by mattbennett · Pull Request #264 · nameko/nameko · GitHub

This helps, I think. Will work through it.

Hi Mick,

Looks like you figured this out yourself already. The Nameko `SERIALIZER`
config key is used to find a serializer/deserializer that has been
registered with Kombu. Kombu defaults to JSON and supports pickle and YAML
too, and you can register your own as shown in that test.

If the message publishers and consumers have been configured with the same
`SERIALIZER` value, everything will work -- the value determines the
content type and encoding of messages, and the "accept" value of consumers.
The ContentDisallowed exception is raised when a consumer receives a
message with a content type outside of its "accept" whitelist.
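For the case that started this thread (JSON bodies arriving labelled as application/octet-stream), a minimal sketch of registering such a serializer with Kombu might look like the following. The serializer name `jms_json` and the helper names are hypothetical. `kombu.serialization.register` is Kombu's registration function, and its import is deferred here so the encode/decode helpers can be tried without Kombu installed.

```python
import json


def encode_jms_json(obj):
    """Serialize a Python object to a JSON string."""
    return json.dumps(obj)


def decode_jms_json(raw):
    """Parse a JSON body that was labelled application/octet-stream."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return json.loads(raw)


def register_jms_json():
    """Register the (hypothetical) 'jms_json' serializer with Kombu."""
    from kombu.serialization import register

    register(
        "jms_json",
        encode_jms_json,
        decode_jms_json,
        content_type="application/octet-stream",
        content_encoding="utf-8",
    )
```

Assuming `register_jms_json()` is called before the service container starts, setting `SERIALIZER` to `jms_json` in the Nameko config should then put application/octet-stream on the consumer's "accept" list.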

It would be nice if Nameko exposed its own serializer registry rather than
forcing people to configure Kombu. Also note that there is a known
deficiency (see the nameko/nameko issue tracker on GitHub) in the Nameko
shell, which doesn't honour custom serializers.

···

On Friday, September 15, 2017 at 9:43:04 PM UTC+1, Mick wrote:

Custom serialization by mattbennett · Pull Request #264 · nameko/nameko · GitHub

This helps I think.. Will work thru it.

Hi Matt

Yes got it working - initially went down a rabbit hole looking at the
Celery way to configure it and then worked out that serializer in the
config was what I needed.

So regarding the shell: it works for me (for some reason I've not dug into
deeply). I suspect the shell issue is to do with @rpc rather than
@consume.

The issue I now have is that the version of amqp is older and does not
handle the JMS Timestamp when using the Publisher to publish a response.
The integer value is too large. Later versions of amqp (>3.0) have
corrected this to write such values as long integers, but not the version
used by nameko.

Any thoughts on that one? I'm currently trying to work out if it's
possible to edit the JMS headers after I consume the message.

Reading this and trying to work out what it all means :) Am I on the
right track here?
https://groups.google.com/forum/#!searchin/nameko-dev/headers|sort:relevance/nameko-dev/hacTP0CMcwQ/aplIxHCiBAAJ

Thanks again

Mick

When you say the version of "amqp" is too old, do you mean py-amqp
(celery/py-amqp on GitHub, the amqplib fork)?

I think the chance of editing the header before consumption depends on
where the exception happens. If it's deep inside an underlying library then
maybe you won't have an opportunity to do so. If you could post a stack
trace it might help.

Hi

Here are the particulars:

Versions I'm running:

nameko = 2.6.0
amqp = 1.4.9
kombu = 3.0.37

Specific code in amqp 1.4.9 giving the issue is here:

elif isinstance(v, int_types):
     self.write(pack('>ci', b'I', v))

In amqp 2.1 this line is now:

elif isinstance(v, int_types):
    if v > 2147483647 or v < -2147483647:
        write(pack('>cq', b'L', v))
    else:
        write(pack('>ci', b'I', v))

Value in JMS Timestamp is: 1506007083273
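The difference between the two snippets can be reproduced with the standard `struct` module alone. This is only a sketch: the function names `pack_int_old` and `pack_int_new` are made up for illustration and simply mirror the two branches quoted above.

```python
import struct

JMS_TIMESTAMP = 1506007083273  # the value reported above


def pack_int_old(value):
    """Mirror of the amqp 1.4.9 branch: always a 4-byte signed int."""
    return struct.pack(">ci", b"I", value)


def pack_int_new(value):
    """Mirror of the amqp 2.1 branch: 8 bytes when the value is large."""
    if value > 2147483647 or value < -2147483647:
        return struct.pack(">cq", b"L", value)
    return struct.pack(">ci", b"I", value)


try:
    pack_int_old(JMS_TIMESTAMP)
    old_failed = False
except struct.error:
    # 'i' format requires -2147483648 <= number <= 2147483647
    old_failed = True

packed = pack_int_new(JMS_TIMESTAMP)  # one type byte plus eight value bytes
```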

Consumption is OK, for some reason I haven't worked out. The pattern I have
is to consume the message from one queue and then publish the response on
another queue. The issue comes about when publishing:

            try:
                logEarn.info('Failed Item: ' + str(Output))
                self.pubfailQueue(Output, routing_key=failQueue,
                                  content_type='application/json')
            except Exception:
                traceback.print_exc(file=sys.stdout)
                logEarn.error('Exception in posting failure message: '
                              + str(sys.exc_info()))
                raise

Exception:

Traceback (most recent call last):
  File "./rpcQueueWorker.py", line 325, in processEarningsMessage
    self.pubfailQueue(Output, routing_key=failQueue, content_type='application/json')
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/nameko/messaging.py", line 182, in publish
    **kwargs
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/kombu/connection.py", line 449, in _ensured
    return fun(*args, **kwargs)
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/kombu/messaging.py", line 188, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/channel.py", line 2130, in basic_publish_confirm
    ret = self._basic_publish(*args, **kwargs)
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/channel.py", line 2123, in _basic_publish
    self._send_method((60, 40), args, msg)
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/method_framing.py", line 219, in write_method
    properties = content._serialize_properties()
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/serialization.py", line 502, in _serialize_properties
    getattr(raw_bytes, 'write_' + proptype)(val)
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/serialization.py", line 350, in write_table
    table_data.write_item(v, k)
  File "/home/cloud-user/earningsEng2.0/lib/python2.7/site-packages/amqp/serialization.py", line 369, in write_item
    self.write(pack('>ci', b'I', v))
error: 'i' format requires -2147483648 <= number <= 2147483647

Header values are:

{'nameko.JMSMessageID': u'ID:b4aa64b8-9dfe-449d-a2a6-51e57e0ccd5c',
 'nameko.JMSPriority': 4,
 'nameko.JMSDeliveryMode': u'PERSISTENT',
 'nameko.JMSType': u'TextMessage',
 'nameko.JMSTimestamp': 1506007083273,
 'nameko.call_id_stack': [u'asyncEarningsQueueWorker.processEarningsMessage.bb368e99-7690-4050-a34b-08992bc60dc7'],
 'nameko.JMSCorrelationID': u'1241a152-780c-1ee7-a7dc-01c2541031e9',
 'nameko.content_type': u'application/json'}

Thanks

Mick

My thought here was to edit the value in the JMS Timestamp and set it to a
new value for the publish action. But I can't find a way to do it as it
stands. I think the get_dependency method might work but I'm still getting
my head around how that works....

I did put the amqp change in and it resolves the issue but clearly this is
not the right way to do it!

Mick

Here is my rough attempt at getting the header data out of the @consume
using a dependency provider (first time so be gentle):

class JMSHeaders(DependencyProvider):

    def __init__(self):
        return

    def worker_setup(self, worker_ctx, result=None, exc_info=None):
        return worker_ctx.context_data

    def worker_result(self, worker_ctx, result=None, exc_info=None):
        return worker_ctx.context_data

class qWorker(object):

    headers = JMSHeaders()

    @consume(sendQueue, requeue_on_error=True)
    def getHeaders(self, msg):
        print(self.headers)
        ......

I get None but within the class I can see the headers - just can't return
them out.

Ah right. Unfortunately py-amqp 2.x introduced some backwards incompatible
changes, so Nameko is stuck with 1.x for now (this is also the reason we're
on kombu < 4.0)

The commit making the change
<https://github.com/celery/py-amqp/commit/67209e50816fff46fdae8ce7c990d12da6eb90f1>
looks like a bug fix (although it's not in the changelog), which may
explain why consumption is fine but publishing fails.

The good news is that since only publishing is broken, you do have an
opportunity to scrub the value in your service. You will need to subclass
the nameko.messaging.Publisher class, and override get_message_headers.
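As a rough sketch of the scrubbing step itself, independent of Nameko: the helper below (the name `scrub_oversized_ints` and the choice to turn large values into strings are assumptions for illustration, not something Nameko provides) returns a copy of a header dict in which any integer outside the signed 32-bit range is replaced, so that amqp 1.x never has to pack it with the 'i' format.

```python
import numbers

INT32_MIN = -2147483648
INT32_MAX = 2147483647


def scrub_oversized_ints(headers):
    """Return a copy of headers with out-of-range integers as strings."""
    scrubbed = {}
    for key, value in headers.items():
        # bool is a subclass of int, so leave True/False untouched
        is_int = (isinstance(value, numbers.Integral)
                  and not isinstance(value, bool))
        if is_int and not (INT32_MIN <= value <= INT32_MAX):
            value = str(value)
        scrubbed[key] = value
    return scrubbed
```

An overridden get_message_headers could pass its headers through a function like this before returning them. Whether a string is acceptable to the JMS side is something to check; replacing the value with a smaller integer is an equally valid choice.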

Sorry, I replied to your previous message before seeing this one.

You're on the right track using the DependencyProvider. worker_setup and
worker_result aren't expected to return anything though, so your current
implementation won't do anything. It's missing a get_dependency method.
Whatever get_dependency returns is what is injected into the service
instance. This will allow the service methods to read the contents of
context_data:

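from nameko.extensions import DependencyProvider
from nameko.messaging import consume


class JMSHeaders(DependencyProvider):

    def get_dependency(self, worker_ctx):
        # whatever is returned here is injected into the service instance
        return worker_ctx.context_data.copy()


class qWorker(object):
    name = "qService"

    headers = JMSHeaders()

    # sendQueue is the queue from the original service code
    @consume(sendQueue, requeue_on_error=True)
    def getHeaders(self, msg):
        print(self.headers)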

I guess that you're trying to read the headers here so you can set them
again when publishing the message on, but with the timestamp updated.

In my other reply I mentioned overriding Publisher.get_message_headers.
This is because the publisher automatically takes any context data (which
was extracted from incoming message headers) and sets it as headers on the
outgoing message. So if you override the method that builds those headers
you won't need to manually manipulate them in your service.
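To make that round trip concrete, here is a simplified model of it in plain Python. This is not Nameko's actual code; the function names are invented, and the only behaviour assumed is what the header dump earlier in the thread suggests, namely that outgoing header names carry a "nameko." prefix while the keys in context_data do not.

```python
HEADER_PREFIX = "nameko"  # prefix visible in the header dump above


def headers_to_context(headers):
    """Strip the prefix (if present) from incoming header names."""
    marker = HEADER_PREFIX + "."
    return {
        (key[len(marker):] if key.startswith(marker) else key): value
        for key, value in headers.items()
    }


def context_to_headers(context_data):
    """Add the prefix to every context key for the outgoing message."""
    return {
        "{}.{}".format(HEADER_PREFIX, key): value
        for key, value in context_data.items()
    }


incoming = {"JMSTimestamp": 1506007083273, "nameko.call_id_stack": ["a.b.1"]}
context = headers_to_context(incoming)
outgoing = context_to_headers(context)
```

In this model the JMS timestamp from the consumed message ends up in `outgoing` unchanged, which is why the publish step is the place to adjust it.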

Awesome Matt

Thanks for the help here. Got it working:

import datetime

from nameko.messaging import Publisher


class CustomPublisher(Publisher):
    # Subclass Publisher so the outgoing headers can be adjusted;
    # JMS needs specific headers returned.
    # amqp 1.4.9 cannot publish long integers, and nameko 2.6.0 is
    # restricted to amqp < 2.0, so replace the JMSTimestamp (a long
    # integer) with a smaller value.

    def get_message_headers(self, worker_ctx):
        data = worker_ctx.context_data

        if 'JMSTimestamp' in data:
            data['JMSTimestamp'] = int(
                datetime.datetime.utcnow().strftime("%m%d%H%M%S"))

        return data

And I got the headers to return as well - not needed but certainly a good
education.

Thanks again

Mick