Hello everyone.
I am trying to create receiving services that correspond to my sending services. I am using the event_handler extension to receive a dictionary of image byte data. The issue I run into is that the event handler seems to create exchanges and queues with odd additions to their names. For example, I set the exchange to image_send_exchange, but it creates image_send_exchange.events. I can see in the code that “.events” is appended explicitly for some reason (I am still experimenting to keep that from happening). I then set the queue to image_send_queue, but evt-image_send_exchange-image_send_queue--event_listen.handle_img gets created in the broker. The routing key is set to image_send_queue, which is fine, but the rest is a hassle. I am still hacking on it, but any help would be appreciated!
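To make the pattern concrete, here is a minimal sketch of how the names I am seeing appear to be composed. It is inferred only from the broker output described above and assumes the two decorator arguments are treated as a source service name and an event type. The helper functions are hypothetical illustrations, not nameko API.

```python
# Hypothetical helpers that reproduce the names observed in the broker.
# They are NOT nameko functions; they only mirror the pattern seen above.

def exchange_name(source_service):
    """Exchange as it shows up in the broker: '<source_service>.events'."""
    return f"{source_service}.events"


def queue_name(source_service, event_type, listener_service, method_name):
    """Queue as it shows up in the broker for the handler in this post."""
    return (
        f"evt-{source_service}-{event_type}"
        f"--{listener_service}.{method_name}"
    )


def routing_key(event_type):
    """The routing key is the second decorator argument, unchanged."""
    return event_type


if __name__ == "__main__":
    # Values taken from the post: decorator arguments, service name, method.
    print(exchange_name("image_send_exchange"))
    print(queue_name("image_send_exchange", "image_send_queue",
                     "event_listen", "handle_img"))
    print(routing_key("image_send_queue"))
```

If that assumption holds, the first argument would need to match the name attribute of the dispatching service and the second the event type it dispatches, rather than a literal exchange and queue name.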
Here is my code:
# Patch the standard library before anything else is imported,
# so nameko and its dependencies pick up the green versions.
import eventlet
eventlet.monkey_patch()

import os
import sys
import time

from nameko.containers import ServiceContainer
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container

AMQP_URI = 'pyamqp://guest:guest@<host_ip_addr>'
config = {
    'AMQP_URI': AMQP_URI,  # e.g. "pyamqp://guest:guest@localhost"
    'serializer': 'pickle'
}

class ServiceB:
    name = 'event_listen'

    # event_handler's two positional arguments are (source_service, event_type).
    @event_handler('image_send_exchange', 'image_send_queue')
    def handle_img(self, payload):
        # payload maps file names to raw image bytes
        for key, value in payload.items():
            with open(f'sent/{key}', 'wb') as img:
                img.write(value)
            print('Image has been written')

if __name__ == '__main__':
    try:
        print('Listener Service started: Waiting for messages')
        print('Container Started')
        while True:
            print('Remaking Container')
            container = ServiceContainer(ServiceB, config)
            print('Restarting Container')
            container.start()
            time.sleep(20)
            print('Stopping the Container')
            container.stop()
            print('Killing the Container')
            container.kill()
            time.sleep(20)
    except KeyboardInterrupt:
        print('Close Out program')
        try:
            container.stop()
            container.kill()
            sys.exit(0)
        except SystemExit:
            os._exit(0)