Support for init method on service

From @rizplate on Thu Mar 29 2018 21:56:23 GMT+0000 (UTC)

Can the nameko framework call an init_service method on service, where we can initialize some variables? or do we have to create extensions/dependancies

Copied from original issue: https://github.com/nameko/nameko/issues/529

From @mattbennett on Wed Apr 25 2018 15:36:57 GMT+0000 (UTC)

Sorry for the slow response to this @rizplate. The answer is that you have to create extensions to do this.

Nameko service classes are instantiated for every worker and discarded afterwards. That means if you save some state to self.whatever in one method, it won’t be there when the next worker executes. You should use dependency providers to manage state between workers.

Hi Matt,

I thought I had this sorted. I have a connect dependency for determining a connection, and then keeping it open. I understood that a dependency provider __init__, only executed once, when the service was started, and then retained state between worker call. So I have a self.connected value and a self.connectionString and a piece of logic like this:

if not self.connected:
    self.connectionString=self.connect()
    self.connected=1
return self.connectionString

And it always seems to do self.connect()

Do I need to post more code, or have I completely misunderstood this?

Regards

Steve

I understood that a dependency provider __init__ , only executed once, when the service was started

DependencyProvider.setup() and start() are only executed once, at service startup time.

__init__() is actually called twice though – once at class declaration time and once when the service class is bound to a container. See the note here about avoiding side-effects in that method.

You probably want to move your connection code into DependencyProvider.setup(). Does this explain the behaviour you’re seeing?

So any variables either global variables defined under the class statement or as self.var will persist across worker calls. Is that correct? If that is not happening then it’s a bug.

I’ll try it with .setup() and start() as well to see if it makes any difference.

If there is still a problem, I’ll start a new thread.

Thanks.

Steve.

any variables either global variables defined under the class statement or as self.var will persist across worker calls. Is that correct?

On the DependencyProvider? It is persisted, but the worker does not get access to the DependencyProvider instance, only what is returned from DependencyProvider.get_dependency(). If get_dependency() returns self you’ll get the behaviour you want.

Note that returning self is not best practice though. The value of the dependency injection pattern is in the layer of abstraction between the dependency provider and the functional part that the service actually needs. Think of the difference between code that maintains a connection pool, and the connection object itself; the pool belongs in the provider, whereas the service only needs access to a single connection.

Does it have to be named DependencyProvider.get_dependency()?

Actually, I have a series of calls to the dependency provider:

connect()
write(a)
readline()
disconnect()
getstatus()

and one or two others.

My dependencyprovider really does the work, and the worker just passes the parameter to it.

The connection object stays in dependency provider (because I don’t know how to serialise it) and I just pass stuff to it. Actually also because there is an overhead to connecting and I want to save on this overhead.

Regards

Steve.

Please post your code, or a simplified version. Not sure what you mean from just the description.

Have you read the docs on creating dependency providers?

Here’s my test code. It works if called from a shell on another machine, but it always reconnects.

Yes, I read the docs, but I think they’ve changed a lot from when I last saw them.

Here’s the code:

#!/usr/bin/env python

import subprocess
import serial  # For Arduino comms: python2.7 -m pip install pyserial
import time

from nameko.rpc import rpc, RpcProxy
from nameko.extensions import DependencyProvider

class USBConnection(DependencyProvider):
    name = "Connection_service"
    
    def __init__(self):
        self.portList2=[]
        if not hasattr(self, "connected"):
            self.portList2 = self.getPorts()
            if len(self.portList2):
                port = self.getPort()
                self.ser = self.connect(port)
            else:
                print "004 USB - No serial device found)"
        else:
            print "006 USB already connected...)"
    
    @rpc
    def getPorts(self):
    
        batcmd='ls /dev/tty*'
        portList = subprocess.check_output(batcmd, shell=True)
        portList= portList.splitlines()
        # Look for serial ports of format "ttyUSBn"
        portList2 = []
        for x in portList:
            if u'ttyUSB' in x.decode('utf-8'):
                portList2.append(x)
        
        print ("008 USB getPorts: %s") % portList2
        return portList2

    @rpc
    def getPort(self, port=0):
        
        if len(self.portList2):
            port = str(self.portList2[port].decode("utf-8"))
            print ("010 USB getPort: %s") % port
            self.ser=1
        else:
            error= u'USB not connected'
            print ("020 USB getPort: %s\n") % error
            self.err = "1"
            self.ser=0
            
        return port
    
    @rpc
    def connect(self, port):
        print ("100 USB connecting: %s") %  port
        try:
            self.conn = serial.Serial(port,9600, timeout=0) #, parity=serial.PARITY_NONE)
            time.sleep(2)  
            self.error=""
            self.ser=1
            self.connected=1
        except:
            print("130 USB connect: %s\n") % "Connect failed"
            self.ser=0
            self.connected=0
            
        return self.ser
        
    @rpc
    def write(self,A_Command):
        
        print ("210 USB write: ", self.conn, "\n")
        print ("220 USB write USBConnection '%s'" % A_Command.encode('utf-8'))
        try:
            self.conn.write(A_Command)# .encode('utf-8'))
        except:
            errMsg = "230 USB write '%s' command failed." % A_Command.encode('utf-8')
            print (errMsg)
            self.connected=0
    
    @rpc
    def writeDelayed(self,delaySecs, B_Command):
        
        time.sleep(delaySecs)
        # Clear Buffers
        self.conn.flush()
        self.conn.reset_input_buffer()
        self.conn.reset_output_buffer()
        print ("300 USB write USBConnection '%s'" % B_Command.encode('utf-8'))
        try:
            self.conn.write(A_Command.encode('utf-8'))
        except:
            errMsg = "310 USB write '%s' command failed." % B_Command.encode('utf-8')
            print (errMsg)
            connected=0
    
    @rpc
    def readline(self,A_Command):
        time.sleep(2) # wait for Arduino
        print ("405 USB readline: ", self.conn)
        try:
            self.conn.write(A_Command.encode('utf-8'))
            print ("410 USB written '%s' command." % A_Command.encode('utf-8'))
        except:
            print ("420 USB write '%s' command failed." % A_Command.encode('utf-8'))
            self.connected=0
        
        time.sleep (.4)
        a=""
        try:
            self.printConn()
            a= self.conn.readline() #.decode("utf-8") #.strip('\n').strip('\r')
            print ("430 USB readline: '%s'." % a.strip('\n').strip('\r'))
        except:
            errMsg = "440 USB readline command failed."
            print (errMsg)
            a= "read error"
        return a.strip('\n').strip('\r')
    
    @rpc
    def printConn(self):
        print (self.conn)
        return
    
class Arduino(object):
    
    name = "Arduino_service"
    usb = RpcProxy("Connection_service")
    reporting=0
    ser=0
    
    # Initialise Arduino
    @rpc
    def connect(self):
        print ("000 Arduino connect: %s\n") % "started"
        # List serial ports
        
        portList2 = self.usb.getPorts()
        if len(portList2):
            port = str(portList2[0].decode("utf-8"))
            print ("010 Arduino connect: %s\n") % port
            self.usb.connect(port)
            self.ser=1
        else:
            error= u'Arduino not connected'
            print ("020 Arduino connect: %s\n") % "Arduino NOT connected\n"
            self.ser=0

        return self.ser
    @rpc        
    def write(self,A_Command):
        if not self.usb.connected:
            print "100 Arduino Connecting..."
            self.connect()
        else:
            print "110 Arduino Connected..."
        print ("120 Arduino write '%s'" % A_Command)
        errMsg=""
        try:
            self.usb.write(A_Command)
        except:
            errMsg = "130 Arduino write '%s' command failed." % A_Command
            print (errMsg)
    
        return json.dumps(errMsg)
    @rpc
    def readline(self, A_Command):
        if not self.usb.connected:
            print "200 Arduino Connecting..."
            self.connect()
        else:
            print "210 Arduino Connected..."
        print ("220 Arduino read usb: %s" ) % A_Command
        try:
            return self.usb.readline(A_Command) #.decode("utf-8").strip('\n').strip('\r')
        except:
            errMsg = "230 Arduino readline command failed."
            print (errMsg)
            return "read error"
    
    @rpc
    def readline2(self, A_Command):
        
        ser = serial.Serial("/dev/ttyUSB1",9600, timeout=.1)  # open first serial port
        print ser.portstr       # check which port was really used

        time.sleep(4)   
        ser.write(A_Command.encode('utf-8'))      # write a string
        time.sleep(1)
        a=ser.readline()
        ser.close()             # close port
        return a.strip('\n').strip('\r')
    
    @rpc
    def disconnect(self):
        try:
            Arduino.ser.write(b'DISCONNECT#')
            print ("300 Arduino Disconnected")
        except:
            errMsg =  "Arduino write 'DISCONNECT#' command failed."
            if Arduino.reporting:
                Arduino.Log.AppendText( " " + str(errMsg) + "\n$")
            else:
                print (errMsg)
    @rpc
    def printConn(self):
        self.usb.printConn ()
        
    @rpc
    def connStatus(self):
        return self.usb.connected

This is very broken I’m afraid.

Your USBConnection class is extending DependencyProvider but being used like a service. DependencyProviders don’t have entrypoints like @rpc.

Suggest you study a simple example like those in https://github.com/nameko/nameko-examples to get a better idea. In particular, note how the Products service uses the Storage dependency to interact with Redis.

Thanks for saying it like it is.

I’ve not used Redis before. It seems to be a bit like a ram-disk. Is that right? And it’s one of the ways that the dependency info persists between calls. The others being databases or files.

So If I look at the example you gave, the steps are as follows:

  1. The app calls, eg,
    @rpc
    def get(self, product_id):
    product = self.storage.get(product_id)
    return schemas.Product().dump(product).data
  2. Which calls:
    storage.get
    But get is part of StorageWrapper. How do they connect? They don’t seem to be descended from one another.

Regards

Steve

StorageWrapper is the object returned by get_dependency.

This is Nameko’s dependency injection in action. The Storage DepedencyProvider is what is declared on the Products service class, but what is actually injected into the service when a worker fires is an instance of whatever is returned from Storage.get_dependency – in this case an instance of StorageWrapper.

This pattern allows you to keep the logic of maintaining connections (in this case just the creation of the client in Storage.setup()) separate from the logic of using those connections (in this case everything in StorageWrapper).

Hi Matt,

OK, well I really didn’t get that paradigm. I’ve searched “Dependency Injection” on Google and it’s a lot clearer. Anyhow, I’ve implemented it and it works beautifully. It’s much faster and much more error free.

I’ll try to post my code as sample code later.

Regards

Steve.

Excellent, very pleased to hear that!