Dynamic dependency injection from an @event_handler method

Hey guys,

We are trying to implement a data pipeline in which services communicate
with each other using nameko.
One of the services queries a config service and passes the config object on
using the dispatch() method.

Another method needs to parse the config and use it to connect to an outside
dependency.

# Some storage service (google-storage, s3, etc.)
from storage import *
from nameko.events import EventDispatcher, event_handler
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc


class Storage(DependencyProvider):

    def __init__(self, json_key, project_name, bucket_name):
        self.json_key = json_key
        self.project_name = project_name
        self.bucket_name = bucket_name

    def setup(self):
        credentials = service_account.Credentials.from_service_account_file(self.json_key)
        self.storage_client = storage.Client(project=self.project_name, credentials=credentials)

    def get_dependency(self, worker_ctx):
        bucket = self.storage_client.bucket(self.bucket_name)
        return bucket


class SendConfig:
    name = "send_config"

    dispatch = EventDispatcher()

    # dispatch config
    @rpc
    def dispatch_method(self, payload):
        self.dispatch('config', payload)


class ListFiles:

    name = "list_files"

    def list_files(self):
        # collect every blob path instead of returning after the first one
        return [blob.path for blob in self.bucket.list_blobs()]

    # get config and try to connect to the storage
    # (the event type must match the one dispatched by send_config)
    @event_handler("send_config", 'config')
    def print_config(self, payload):
        # Dependency injection
        self.bucket = Storage(payload["json_key"], payload["project_name"], payload["bucket"])
        # printing the results from storage
        print(self.list_files())

When I run it, I can see that the setup method is never called, so
get_dependency() never returns the bucket. Is there a way to trigger the
injection manually with the received config?

Thanks,
Yevgeniy

DependencyProviders must be declared on the service class, not in service
methods. Dependency injection happens when a new worker is spawned, before
the service method is executed.
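
To illustrate the ordering, here is a toy model of that lifecycle. It is not
nameko's code: ToyProvider, ToyService and run_worker are hypothetical
stand-ins, and setup is folded into run_worker for brevity (in nameko it runs
once when the container starts). It only shows why a provider created inside
a method is never set up or injected.

```python
class ToyProvider:
    """Stand-in for a DependencyProvider (hypothetical, not nameko's class)."""

    def __init__(self, value):
        self.value = value
        self.ready = False

    def setup(self):
        self.ready = True

    def get_dependency(self, worker_ctx):
        # Only a provider that has been set up can hand out its dependency.
        return self.value if self.ready else None


class ToyService:
    # Declared on the class: the toy container can discover it.
    bucket = ToyProvider("bucket-from-class")

    def handler(self):
        # Created inside the method: nobody calls setup() on it.
        late = ToyProvider("bucket-from-method")
        return self.bucket, late.get_dependency(None)


def run_worker(service_cls, method_name):
    """Toy container: set up class-level providers, inject, then call the method."""
    providers = {
        attr: value
        for attr, value in vars(service_cls).items()
        if isinstance(value, ToyProvider)
    }
    for provider in providers.values():
        provider.setup()
    service = service_cls()
    for attr, provider in providers.items():
        # Injection replaces the provider with its dependency on the instance.
        setattr(service, attr, provider.get_dependency(worker_ctx=None))
    return getattr(service, method_name)()


result = run_worker(ToyService, "handler")
```

Here result pairs the injected class-level value with None for the provider
built inside the method.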

If you need to dynamically configure your storage client based on a payload
received by another service method, you'll need to do it inside the service
method.
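
A minimal sketch of doing it inside the service method, assuming the payload
carries the same keys as in the question (json_key, project_name, bucket).
The names bucket_from_config, list_paths and make_gcs_client are
hypothetical. The factory argument stands in for whatever builds the real
storage client (for example, the credentials and storage.Client calls from
the question's setup()). The nameko part appears only in comments, and the
Fake* classes exist only so the sketch runs without nameko or credentials.

```python
def bucket_from_config(payload, make_client):
    """Build a bucket handle from a config payload, at call time.

    `make_client(json_key, project_name)` is a hypothetical factory that
    returns an object with a `.bucket(name)` method.
    """
    client = make_client(payload["json_key"], payload["project_name"])
    return client.bucket(payload["bucket"])


def list_paths(bucket):
    """Return the path of every blob in the bucket."""
    return [blob.path for blob in bucket.list_blobs()]


# Inside the nameko service, the handler would call these helpers directly,
# with no DependencyProvider involved (sketch only, not executed here;
# make_gcs_client is a hypothetical factory for the real client):
#
#     class ListFiles:
#         name = "list_files"
#
#         @event_handler("send_config", "config")
#         def print_config(self, payload):
#             bucket = bucket_from_config(payload, make_gcs_client)
#             print(list_paths(bucket))


# In-memory stand-ins so the sketch can run without a storage backend.
class FakeBlob:
    def __init__(self, path):
        self.path = path


class FakeBucket:
    def __init__(self, name):
        self.name = name

    def list_blobs(self):
        return [FakeBlob("/b/%s/o/a.log" % self.name)]


class FakeClient:
    def __init__(self, json_key, project_name):
        self.json_key = json_key
        self.project_name = project_name

    def bucket(self, name):
        return FakeBucket(name)


payload = {"json_key": "key.json", "project_name": "demo", "bucket": "logs"}
paths = list_paths(bucket_from_config(payload, FakeClient))
```

Building the client per event keeps the handler stateless, at the cost of
reconnecting each time; caching clients by config would be a possible
refinement.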

Thanks, Matt.

···

On Friday, December 22, 2017 at 7:04:38 AM UTC-5, Matt Yule-Bennett wrote:

DependencyProviders must be declared on the service class, not in service
methods. Dependency injection happens when a new worker is spawned, before
the service method is executed.

If you need to dynamically configure your storage client based on a
payload received by another service method, you'll need to do it inside the
service method.

On Thursday, December 21, 2017 at 10:58:44 PM UTC, Yevgeniy Viller wrote:
