Calling an RPC service within a dependency

Hi,

I have a LogService which receives log and exception info via RPC in the
Graylog GELF format and then posts it to Graylog via AMQP.

Now, I'm trying to write a dependency for my other services. The idea is
that this dependency provides methods to log at various levels (e.g.
log_info(msg), log_warn(msg), etc.) and also collects exception info (via
worker_result). The dependency would then build an appropriate GELF object
and send it to the log service via RPC.

I'm struggling to make the call to the @rpc entrypoint from inside the
dependency. Any tips appreciated!

Thanks,
Chris

Nice. I've been waiting for someone to write a GELF DependencyProvider :)

Is there a reason the DP has to make RPC calls to your logging service
rather than just publishing AMQP messages directly?

You may also want to take a look at
https://github.com/Overseas-Student-Living/nameko-tracer, which you
could probably use by simply plugging in a GELF log handler.

Thanks, Matt -- I'll take a look through the tracer to see how it works.

I suppose there's no harm putting the GELF-over-AMQP stuff into the DP
rather than a separate service. Would I still be able to use
nameko.messaging.Publisher inside a DP to handle the AMQP stuff, or would
an AMQP client have to be handled in the DP's own code?

Thanks,
Chris

Hi, Matt

I'm in a similar situation where I'm trying to capture all logging output
(including from nameko-tracer) and send it to a specific service, although
this service writes it to a PostgreSQL db rather than sending it to Graylog.

Each of our databases is 'owned' by a single service, so in theory all logs
would be inserted into the logging (service information) database through
the logging (service information) service, because the service owns that
data domain and handles all interactions with that database.

Is there a good way to configure the logging to send all logs to a queue
that can be processed by a service? Is there a better approach?

(I'm currently working off the example you've provided for sending to
Graylog, but I wondered if you had any opinions on sending traffic to a
nameko service.)

Thank you for your help

...answered my own question - looks like dependencies are injected into
services, but not each other. Naively trying to use Publisher() in my
Logger DP leads to:

`TypeError: bind() missing 1 required positional argument: 'attr_name'`

So would I be better off importing amqplib or somesuch and using it
directly in the DP?

Hi Brooke,

I would recommend almost the same solution as suggested above. In your
setup though, the AMQP publisher would publish messages to be consumed by
your logging service, rather than a Graylog consumer. You can use the
Consumer entrypoint in nameko.messaging to consume messages from an
arbitrary queue, and then simply write them to your logging database. This
would be asynchronous messaging rather than RPC, but I think in this
logging scenario a reply is not desired.
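
Something like this, as a minimal sketch (the exchange, queue and service
names here are illustrative, not prescriptive):

from kombu import Exchange, Queue

from nameko.messaging import consume

# illustrative names -- match whatever your publisher side declares
log_exchange = Exchange("logs", type="topic", durable=True)
log_queue = Queue(
    "service_logs", exchange=log_exchange, routing_key="log.#", durable=True
)

class LoggingService:
    name = "logging_service"

    @consume(log_queue)
    def persist_log(self, payload):
        # write the record to the PostgreSQL logging database here
        ...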

Incidentally, the above pattern is exactly what we did at Student.com for a
while -- using a separate Nameko service to consume AMQP messages and write
to a database (Elasticsearch in our case). We have since ditched the
intermediate service in favour of writing to ES directly from the tracer.
While I strongly advocate the one-database-per-service rule for *domain*
data, I think logging is a separate concern and it's fine for the
repository of those logs to be shared. This is just my opinion with no
knowledge of what your logging / service-information service is actually
doing though, so take it with a pinch of salt.

Hope that helps,
Matt.

...answered my own question - looks like dependencies are injected into
services, but not each other. Naively trying to use Publisher() in my
Logger DP leads to:

`TypeError: bind() missing 1 required positional argument: 'attr_name'`

Right, DependencyProviders are specifically about injecting dependencies
into Nameko services -- you can't put a DP inside another DP.

So would I be better off importing amqplib or somesuch and using it
directly in the DP?

Actually if you look at the nameko.messaging.Publisher DP you'll see that
(apart from inheriting from DependencyProvider and implementing
get_dependency()) it doesn't do very much except create an instance of
nameko.amqp.publish.Publisher -- this is a utility class that makes it
easier to publish messages to RabbitMQ. I would use this, rather than
amqplib or kombu directly.

When I directed you to nameko-tracer I forgot that the critical part of
actually publishing messages to AMQP was spun into another (currently
closed-source) library. If you choose to use the tracer, you'll want to
plug the AMQP publishing part in as a Python logging handler.

Here's a skeleton implementation:

import logging

from nameko.amqp.publish import Publisher

class PublisherHandler(logging.Handler):
    """ Handler for publishing trace messages to RabbitMQ
    """
    def __init__(
        self, amqp_uri, exchange_name, routing_key,
        serializer=None, content_type=None
    ):
        # one publisher instance per handler, reused for every record
        self.publisher = Publisher(
            amqp_uri,
            exchange=exchange_name,
            routing_key=routing_key,
            serializer=serializer,
            content_type=content_type
        )
        super(PublisherHandler, self).__init__()

    def emit(self, log_record):
        try:
            # self.format() applies whichever formatter is configured,
            # e.g. a GELF formatter
            self.publisher.publish(self.format(log_record))
        except Exception:
            self.handleError(log_record)

Use it by registering it as a handler in the normal logging configuration:

LOGGING:
  formatters:
    tracer:
      (): yourlib.logging.formatters.GELFFormatter
  handlers:
    tracer:
      formatter: tracer
      (): yourlib.logging.handlers.PublisherHandler
      amqp_uri: <GELF_AMQP_URI>
      exchange_name: <GELF_EXCHANGE_NAME>
      routing_key: <GELF_ROUTING_KEY>
      serializer: json
      content_type: application/json
  loggers:
    nameko_tracer:
      level: INFO
      handlers:
        - tracer

I would personally do it this way, implementing the GELF format as a
standard logging formatter too. If there's anything required by Graylog
which is not already collected by the tracer, I'm sure a PR to
nameko-tracer would be gratefully received.
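
For illustration, such a formatter might look roughly like this -- a
sketch only, where the level mapping follows GELF's use of syslog
severities and the extra field is just an example:

import json
import logging
import socket

# map Python logging levels to the syslog severities GELF expects
SYSLOG_LEVELS = {
    logging.CRITICAL: 2,
    logging.ERROR: 3,
    logging.WARNING: 4,
    logging.INFO: 6,
    logging.DEBUG: 7,
}

class GELFFormatter(logging.Formatter):
    """ Render log records as GELF 1.1 JSON payloads.
    """
    def format(self, record):
        gelf = {
            "version": "1.1",
            "host": socket.gethostname(),
            "short_message": record.getMessage(),
            "timestamp": record.created,
            "level": SYSLOG_LEVELS.get(record.levelno, 6),
            # GELF additional fields must be prefixed with an underscore
            "_logger": record.name,
        }
        return json.dumps(gelf)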

I forgot to respond a couple days ago; this helped a lot, thank you. We've
got it working exactly how we want it now. I appreciate your input.

...and I forgot too :)

Ended up using Publisher within a DP. The DP does a handful of things:

* Builds GELF messages with a few standard extra properties -- e.g.
_serviceName, _callID, _hostName, etc.
* Logs service lifecycle events via GELF-over-AMQP
* Provides log.info(), log.debug(), log.error(), log.warn(), log.fatal()
functions to the service code.
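
In case it's useful, the shape is roughly this (simplified, with our names
and most of the log-level methods elided):

import socket

from nameko.amqp.publish import Publisher
from nameko.extensions import DependencyProvider

class GelfLogger(DependencyProvider):

    def setup(self):
        # one publisher, shared by every worker in the container
        self.publisher = Publisher(
            self.container.config["AMQP_URI"],
            exchange="gelf-log",   # our exchange name
            routing_key="gelf",
        )

    def get_dependency(self, worker_ctx):
        publisher = self.publisher
        base = {
            "version": "1.1",
            "host": socket.gethostname(),
            "_serviceName": self.container.service_name,
            "_callID": worker_ctx.call_id,
        }

        class Log:
            def info(self, msg):
                publisher.publish(dict(base, level=6, short_message=msg))
            # debug(), warn(), error() and fatal() follow the same
            # pattern with different syslog levels

        return Log()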

However, I'm wondering if I should copy the tracer's approach and use
Python's logging library. That would give config-file based setup of
targets and levels. That said, using Graylog makes me think that perhaps I
shouldn't worry about such things -- I'm quite happy for all log data to
always be sent.

(we're using Nameko due to the architectural advantages, not because we
handle hundreds of calls per second, so log data is a stream rather than a
flood!)

Well, this was all going well until it wasn't!

I think I'm being naive when creating the kombu AMQP connection/queue in
the logging dependency. Once we have several workers active, we start
getting exceptions regarding multiple writes to a file descriptor
(apologies, I'm paraphrasing).

The dependency's creating a single queue in its setup() method, which I now
realise is being shared by all workers. I think the right thing to do is to
refactor this so that a connection is created for each worker. Perhaps set
up the connection in worker_setup(), store it in a shared dictionary for
inclusion in the object returned by get_dependency(), and then clean it up
in worker_result().

I'll see how that goes!

Well, this was all going well until it wasn't!

I think I'm being naive when creating the kombu AMQP connection/queue in
the logging dependency. Once we have several workers active, we start
getting exceptions regarding multiple writes to a file descriptor
(apologies, I'm paraphrasing).

"Second simultaneous write detected"?

This is Eventlet protecting you from multiple threads writing to the same
file descriptor (probably a socket in this case) at the same time. As per
the "Writing Extensions" chapter of the Nameko documentation, whatever is
returned from get_dependency() needs to be safe for concurrent access by
multiple worker threads.

The dependency's creating a single queue in its setup() method, which I now
realise is being shared by all workers.

In AMQP you publish to *exchanges* rather than queues. RabbitMQ has a
shortcut that allows you to publish a message into a named queue, but
really what it's doing is publishing to the default exchange, which is
implicitly bound to every queue by name.
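
To illustrate with kombu (the queue name here is hypothetical):

from kombu import Connection

# "publishing to a queue" is really publishing to the default ("")
# exchange, with the queue's name as the routing key
with Connection("amqp://guest:guest@localhost:5672//") as connection:
    producer = connection.Producer()
    producer.publish(
        {"short_message": "hello"},
        exchange="",          # the default exchange
        routing_key="logs",   # delivered to the queue named "logs"
        serializer="json",
    )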

So the shared object is not the queue, but perhaps the connection being
used by your publisher.

What's weird is that the built-in Nameko Publisher and EventDispatcher use
a publisher shared between worker threads and don't have any protection
against multiple access. This is fine if the write to the socket is atomic
and I've never seen the lack of protection cause a problem.

Are you sending particularly large payloads?

I think the right thing to do is to refactor this so that a connection is
created for each worker. Perhaps set up the connection in worker_setup(),
store it in a shared dictionary for inclusion in the object returned by
get_dependency(), and then clean it up in worker_result().

This would work, but it would be slow, because you'd be creating a new
connection every time you publish a message. If you're going to do that you
may as well ignore worker_setup() and worker_teardown() and just create the
connection in get_dependency(), or lazily when you publish. A more
efficient way would be to protect the publisher with a lock, to serialise
the concurrent access.
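
A minimal sketch of that idea (assuming Eventlet; the wrapper name is
illustrative):

from eventlet.semaphore import Semaphore

class LockedPublisher:
    """ Serialise access to a publisher shared between worker threads.
    """
    def __init__(self, publisher):
        self.publisher = publisher
        self.lock = Semaphore()

    def publish(self, payload, **kwargs):
        # only one greenthread writes to the underlying socket at a time
        with self.lock:
            self.publisher.publish(payload, **kwargs)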

I'll see how that goes!

If you can post some code here and/or a stack trace we might be able to
figure out what's going wrong. It may be a platform-specific problem.
