I need to use a gRPC server interceptor to collect some metrics; I'm extending grpc.ServerInterceptor, if that helps. With the basic gRPC server examples this is very straightforward, but how would I implement it using Nameko?
My server code:
import socket
import time
import traceback
from nameko.rpc import rpc
from nameko_grpc.entrypoint import Grpc
from pred_pb2 import PredictReply
from pred_pb2_grpc import PredictorStub
from word_predictor import RwGenerator
# Docker command for running a local RabbitMQ instance:
# docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3
# Entrypoint decorator implementing the Predictor stub's gRPC endpoints.
# NOTE(review): this module-level name shadows the 'grpc' package, so any
# later 'grpc.StatusCode' reference resolves to this entrypoint object, not
# the grpc package — confirm and consider renaming (e.g. 'grpc_entrypoint').
grpc = Grpc.implementing(PredictorStub)
class PredictorService:
    """Nameko service exposing the gRPC Predictor endpoints plus an AMQP health check.

    The random-word predictor model is built eagerly in ``__init__`` so that
    per-request latency stays low.
    """

    name = "Predictor"  # Nameko service name (used for routing)
    predictor = None    # class-level default; replaced per instance in __init__

    @staticmethod
    def is_iterable(a):
        """Return True if *a* supports iteration, False otherwise (EAFP probe)."""
        try:
            iter(a)
        except TypeError:
            return False
        else:
            return True

    def __init__(self):
        """Load the corpus and prepare the model up front to reduce prediction lag."""
        path = "corpus.txt"
        # 'with' guarantees the file handle is closed even if reading fails
        # (the original opened the file and never closed it).
        with open(path, encoding="ascii", errors="surrogateescape") as corpus_file:
            s = corpus_file.read().lower()
        self.predictor = RwGenerator(s)
        self.predictor.prepare_data(None)

    # Unary-unary: five predicted words in a single PredictReply.
    @grpc
    def make_prediction(self, request, context):
        """Return five predicted words in one PredictReply."""
        if self.is_iterable(request):
            # NOTE(review): 'grpc' here is the module-level nameko entrypoint,
            # not the grpc package, so 'grpc.StatusCode' raises AttributeError
            # if this branch ever runs. Fix by adding 'from grpc import
            # StatusCode' at the top of the file (grpcio is already a
            # dependency of nameko_grpc) — TODO confirm.
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Sent a stream, was expecting a unary request")
            return PredictReply()
        print("Unary-Unary Prediction")
        words = self.predictor.predict_completions(None, 5)
        return PredictReply(word1=words[0], word2=words[1], word3=words[2], word4=words[3], word5=words[4])

    # Unary-stream: five predicted words, one PredictReply per word.
    @grpc
    def unary_stream_prediction(self, request, context):
        """Stream five predicted words back, one PredictReply each."""
        if self.is_iterable(request):
            # NOTE(review): same 'grpc' shadowing problem as make_prediction.
            # Also, because this method is a generator, 'return PredictReply()'
            # does not send a reply — it just ends the stream empty; confirm
            # whether an empty reply should be yielded instead.
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Sent a stream, was expecting a unary request")
            return PredictReply()
        print("Unary-Stream Prediction")
        words = self.predictor.predict_completions(None, 5)
        for word in words:
            yield PredictReply(word1=word)

    # Stream-unary: consumes a stream of inputs, replies once with five words.
    @grpc
    def stream_unary_prediction(self, request_iterator, context):
        """Consume the request stream and return five words in one PredictReply."""
        print("Stream-Unary Prediction")
        words = self.predictor.predict_completions(None, 5)
        return PredictReply(word1=words[0], word2=words[1], word3=words[2], word4=words[3], word5=words[4])

    # Stream-stream: consumes a stream of inputs, streams five words back
    # (the original comment said 8, but predict_completions is asked for 5).
    @grpc
    def stream_stream_prediction(self, request_iterator, context):
        """Consume the request stream and stream five predicted words back."""
        print("Stream-Stream Prediction")
        words = self.predictor.predict_completions(None, 5)
        for word in words:
            print(word)
            yield PredictReply(word1=word)

    # Liveness check, easily invoked from a nameko shell over AMQP.
    @rpc
    def hello(self, name):
        """Return a greeting containing this host's IP and the caller's name."""
        return "hello, " + socket.gethostbyname(socket.gethostname()) + " {}!".format(name)