Hey guys,
We are trying to implement data pipeline, where services communicate with
each other using nameko.
One of the services query config service and passes the config object using
dispatch() method.
Other method needs to parse the config and use it to connect to the outside
dependency.
#Some storage service (google-storage, s3 etc....)
from storage import *
from nameko.events import EventDispatcher, event_handler
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc
class Storage(DependencyProvider):
def __init__(self, json_key, project_name, bucket_name):
self.json_key = json_key
self.project_name = project_name
self.bucket_name = bucket_name
def setup(self):
credentials = service_account.Credentials.from_service_account_file(self.json_key)
self.storage_client = storage.Client(project=self.project_name, credentials=credentials)
def get_dependency(self, worker_ctx):
bucket = self.storage_client.bucket(self.bucket_name)
return bucket
class SendConfig:
name = "send_config"
dispatch = EventDispatcher()
#dispatch config
@rpc
def dispatch_method(self, payload):
self.dispatch('config', payload)
class ListFiles:
name = "list_files"
def list_files(self):
for blob in self.bucket.list_blobs():
return blob.path
# get config and try to connect to the storage
@event_handler("send_config", 'pid_config')
def print_config(self, payload):
# Dependency ingestion
self.bucket = Storage(payload["json_key"], payload["project_name"], payload["bucket"])
# printing the results from storage
print self.list_log_files()
When I run it, I know that setup method is not fired hence it doesn't
return the bucket from get_dependency(). Is there a way to start injection
manually with received config?
Thanks,
Yevgeniy