Use GRPC Server Interceptor

I need to use a gRPC server interceptor to collect some metrics. I’m extending grpc.ServerInterceptor, if that helps. With the basic gRPC server examples this is very straightforward, but how would I implement it using nameko?

My server code

import socket
import time
import traceback

from grpc import StatusCode
from nameko.rpc import rpc
from nameko_grpc.entrypoint import Grpc

from pred_pb2 import PredictReply
from pred_pb2_grpc import PredictorStub
from word_predictor import RwGenerator

# Docker Command for running local rabbitmq instance
# docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3
# implements the predictor stub and allows you to use gRPC endpoints
# Note: this name shadows the grpc module, which is why StatusCode is imported directly above
grpc = Grpc.implementing(PredictorStub)


class PredictorService:
    # Declaration of the Predictor Service

    name = "Predictor"  # Name of the service
    predictor = None  # the random word predictor model

    @staticmethod
    def is_iterable(a):
        try:
            iter(a)
        except TypeError:
            return False
        else:
            return True

    # Initializes the predictor service
    # Loads the corpus text file
    # Does the data preparation in the init so that there's less lag when sending predictions
    def __init__(self):
        path = "corpus.txt"
        # Use a context manager so the corpus file is closed after reading
        with open(path, encoding="ascii", errors="surrogateescape") as f:
            s = f.read().lower()
        self.predictor = RwGenerator(s)
        self.predictor.prepare_data(None)

    # Sends five random words back in PredictReply
    @grpc
    def make_prediction(self, request, context):
        if self.is_iterable(request):
            context.set_code(StatusCode.INVALID_ARGUMENT)
            context.set_details("Sent a stream, was expecting a unary request")
            return PredictReply()
        print("Unary-Unary Prediction")
        words = self.predictor.predict_completions(None, 5)

        return PredictReply(word1=words[0], word2=words[1], word3=words[2], word4=words[3], word5=words[4])

    # Sends five random words in stream format
    @grpc
    def unary_stream_prediction(self, request, context):
        if self.is_iterable(request):
            context.set_code(StatusCode.INVALID_ARGUMENT)
            context.set_details("Sent a stream, was expecting a unary request")
            # This method is a generator, so a returned value would be discarded; end the stream instead
            return
        print("Unary-Stream Prediction")
        words = self.predictor.predict_completions(None, 5)

        for word in words:
            yield PredictReply(word1=word)

    # Gets a stream of inputs and sends 5 words back in a single request
    @grpc
    def stream_unary_prediction(self, request_iterator, context):
        print("Stream-Unary Prediction")
        words = self.predictor.predict_completions(None, 5)
        return PredictReply(word1=words[0], word2=words[1], word3=words[2], word4=words[3], word5=words[4])

    # Gets a stream of inputs and sends 5 words in a stream
    @grpc
    def stream_stream_prediction(self, request_iterator, context):
        print("Stream-Stream Prediction")
        words = self.predictor.predict_completions(None, 5)
        for word in words:
            print(word)
            yield PredictReply(word1=word)

    # Method for checking if server is up and running and can be easily executed from nameko shell
    @rpc
    def hello(self, name):
        return "hello, " + socket.gethostbyname(socket.gethostname()) + " {}!".format(name)

Hi @ArjunBansil,

I’m not sure I understand your question. What is the grpc.ServerInterceptor code you are extending? Do you want to implement the server interceptor in Nameko, or put the server interceptor in front of your Nameko service?

I’m sorry, this is all very new to me. In the usual gRPC examples that create a server with the futures package, you can add an interceptor with the following line:

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), interceptors=(interceptor(), ))

So I have code that extends grpc.ServerInterceptor and works with grpc.server. How would I implement similar functionality using Nameko? Hopefully this clears things up.

@mattbennett

OK thanks, I understand. Unfortunately nameko-grpc doesn’t support interceptors yet, but it wouldn’t be too tricky to add.

To accept a server interceptor, the GrpcServer class in nameko_grpc.entrypoint would need to start accepting interceptors (in the same way that grpc.Server does) and then instead of this:

… we’d do something similar to grpc.Server:
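To make the general idea concrete, here is a rough, library-independent sketch of how a server can thread each call through a list of interceptors before looking up the handler. It is not the nameko-grpc or grpcio source. The only thing taken from grpcio is the shape of the `intercept_service(continuation, handler_call_details)` method; `HandlerCallDetails`, `CountingInterceptor`, `run_pipeline`, `lookup_handler` and the method path are hypothetical names made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for grpc.HandlerCallDetails (only the `method` field).
HandlerCallDetails = namedtuple("HandlerCallDetails", ["method"])


class CountingInterceptor:
    """Counts handler lookups per method.

    Has the same method shape as grpc.ServerInterceptor.intercept_service,
    but deliberately does not depend on grpcio.
    """

    def __init__(self):
        self.calls = {}

    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        self.calls[method] = self.calls.get(method, 0) + 1
        # Hand over to the next interceptor, or to the real handler lookup.
        return continuation(handler_call_details)


def run_pipeline(interceptors, lookup_handler, handler_call_details):
    """Pass the call details through each interceptor in order.

    The last continuation in the chain is `lookup_handler` itself.
    """

    def intercept_at(index, details):
        if index < len(interceptors):
            def continuation(next_details):
                return intercept_at(index + 1, next_details)

            return interceptors[index].intercept_service(continuation, details)
        return lookup_handler(details)

    return intercept_at(0, handler_call_details)


def lookup_handler(details):
    # Hypothetical: a real server would return the handler for this method.
    return "handler for " + details.method


counter = CountingInterceptor()
details = HandlerCallDetails(method="/Predictor/make_prediction")
result = run_pipeline([counter], lookup_handler, details)
```

With an empty interceptor list, `run_pipeline` falls straight through to `lookup_handler`, so a server built this way behaves the same as before when no interceptors are supplied. Note that in this shape the interceptor runs when the handler is looked up, so it can count calls directly, but timing a call would mean wrapping the handler that the continuation returns.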