Websocket with Nameko

Hello,
I want to implement a websocket server that also listens for RPC calls using nameko.
The primary aim is to emit events on the websocket connections after an RPC is called in the same service.
I went through the websocket test code.
The problem I am facing is that the websocket hub module redefines "rpc", so it clashes with the regular nameko rpc decorator.
So far I am able to spawn a websocket server and receive events in the HTML example code.

But I want RPC methods (implemented on the websocket server side) to be called from the backend and to emit events to the frontend (HTML client).

I am not able to figure out how to implement these RPC calls: normally I would use nameko.rpc, but here rpc is shadowed by the one from the websocket hub.

Ideally I want something like this:

{Browser-based websocket client} <--> {Nameko WS server + RPC server} <-- {Backend Nameko RPC client}

Any pointers would help.

Hi @rarindam,

Can you share a code sample so I can understand your problem exactly? In any case, if it's just a problem of redefining "rpc", why not import the websocket rpc in your service file as ws_rpc (or some other name) to avoid shadowing rpc? Something like this:

from nameko.rpc import rpc
from nameko.web.websocket import rpc as ws_rpc

Hi,
Thank you for replying.

  1. I tried that route. The issue is the following:
    a. Combining a standard Python websocket library with nameko requires monkey patching. When I do this, the standard websocket fails, although the nameko RPC calls work.

  2. I found that the solution is to use nameko's own websocket hub implementation. That way I am using the nameko runtime together with its own websocket implementation, which seems to work fine.

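from nameko.events import event_handler
from nameko.extensions import DependencyProvider
from nameko.web.websocket import WebSocketHubProvider, rpc


# Assumption: ContainerIdentifier was not shown in the post. A minimal
# custom dependency provider such as this one would fit.
class ContainerIdentifier(DependencyProvider):
    def get_dependency(self, worker_ctx):
        return id(self.container)


class WebsocketService(object):
    name = "websockets"

    container_id = ContainerIdentifier()
    websocket_hub = WebSocketHubProvider()

    # Push every "evt_received" event from the "event_listener" service
    # to all sockets subscribed to 'test_channel'.
    @event_handler("event_listener", "evt_received")
    def handle_event(self, payload):
        self.websocket_hub.broadcast('test_channel', 'ping', {
            'value': "payload {} from {}".format(payload, self.container_id),
        })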
  3. Another way of achieving the same thing is to use a combination of Python websockets and kombu messaging:

from kombu import Exchange, Queue
from nameko.messaging import Publisher, consume

MY_ROUTING_KEY = "my_routing_key"
my_exchange = Exchange(name="my_exchange")
my_queue = Queue(exchange=my_exchange, routing_key=MY_ROUTING_KEY, name=MY_ROUTING_KEY)


class WebsocketService(object):
    # name, container_id and websocket_hub are declared as in option 2

    # Forward every message consumed from my_queue to the websocket clients.
    @consume(queue=my_queue)
    def update_ws(self, payload):
        self.websocket_hub.broadcast('test_channel', 'ping', {
            'value': "payload {} from {}".format(payload, self.container_id),
        })

Hi @rarindam,

I am glad to hear that you found a way to achieve your goal.

I have followed option (2) many times and it has worked well for me.

Finally, the only tricky part is that you need to implement a websocket heartbeat, especially if your nameko service is deployed behind a reverse proxy like nginx.
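
A heartbeat here usually means two things: the browser sends a small message on a timer so that the proxy never sees an idle connection (nginx closes a proxied connection after `proxy_read_timeout`, 60 seconds by default), and the server notices sockets that have stopped pinging. Below is a minimal, stdlib-only sketch of the server-side bookkeeping. `HeartbeatTracker` and its method names are hypothetical and not part of nameko; the idea would be to call `touch(socket_id)` from a websocket rpc method such as `ping`, and to periodically close the sockets returned by `stale()`.

```python
import time


class HeartbeatTracker(object):
    """Remembers when each socket was last heard from (hypothetical helper)."""

    def __init__(self, timeout=30.0, clock=time.time):
        self.timeout = timeout    # seconds of silence before a socket is stale
        self.clock = clock        # injectable so the class can be tested
        self.last_seen = {}       # socket_id -> timestamp of last heartbeat

    def touch(self, socket_id):
        """Record a heartbeat for socket_id."""
        self.last_seen[socket_id] = self.clock()

    def forget(self, socket_id):
        """Stop tracking a socket, e.g. after it has been closed."""
        self.last_seen.pop(socket_id, None)

    def stale(self):
        """Return the ids of sockets silent for longer than the timeout."""
        now = self.clock()
        return sorted(
            socket_id for socket_id, seen in self.last_seen.items()
            if now - seen > self.timeout
        )
```

The timeout should be comfortably larger than the client's ping interval, and the ping interval shorter than the proxy's idle timeout.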

@rarindam @spyrosmarko just like this~

code

# -*- coding: utf-8 -*-


# author: forcemain@163.com

import uuid


from nameko.rpc import rpc
from jinja2 import Template
from nameko.web.handlers import http
from werkzeug.wrappers import Response
from nameko.web.websocket import rpc as ws_rpc
from nameko.dependency_providers import Config
from nameko.web.websocket import WebSocketHubProvider


ws_html_template = '''
<!DOCTYPE html>
<html>
<head>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>
<style>
    #console{
        padding:5px;
        border:1px solid black;
    }
    #console p {
        margin:0;
    }
    .event {
        color:#999;
    }
    .warning{
        color: orange;
    }
</style>
<title>Nameko Websocket Test</title>
</head>
<body>
<div id="wrapper">
    <h1>Nameko Websocket Test</h1>
    <button id="disconnect">Disconnect</button>
    <div id="container">
        <div id="console"></div>
    </div>
</div>
<script type="text/javascript">
    function connect(){
        var socket;
        var host = "ws://{{ws_server_address}}/ws";
        try{
            socket = new WebSocket(host);
            message('<p class="event">Socket Status: '+socket.readyState);
            socket.onopen = function(){
                message('<p class="event">Socket Status: '+socket.readyState+' (open)');
                subscribe();
            }
            socket.onmessage = function(msg){
                message('<p class="message">Received: '+msg.data);
            }
            socket.onclose = function(){
                message('<p class="event">Socket Status: '+socket.readyState+' (Closed)');
            }
        } catch(exception){
            message('<p>Error'+exception);
        }
        function subscribe() {
            try{
                var json_msg = `{
                    "method": "subscribe",
                    "data": {}
                }`;
                socket.send(json_msg);
                message('<p class="event">Sent: '+json_msg);
            } catch(exception){
                message('<p class="warning">Error: '+exception);
            }
        }
        function message(msg){
            $('#console').append(msg+'</p>');
        }
        $('#disconnect').click(function(){
            socket.close();
        });
    }
    $(document).ready(function() {
        if(!("WebSocket" in window)){
            $('<p>This demo requires browser websocket support</p>').appendTo('#container');
            return
        }
        connect();
    });
</script>
</body>
</html>
'''


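# Service definition
class ServiceA(object):
    # Service name
    name = 'service_a'

    # A suggestion: turn this into a service dependency
    uuid = str(uuid.uuid4())

    # Service dependency - config object
    config = Config()
    # Service dependency - connection management
    ws_hub = WebSocketHubProvider()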
    @http('GET', '/')
    def index(self, request):
        ws_server_address = self.config.get('WS_SERVER_ADDRESS', '127.0.0.1:8000')
        data = Template(ws_html_template).render({
            'ws_server_address': ws_server_address
        })
        headers = {'Content-Type': 'text/html'}
        return Response(data, status=200, headers=headers)

    # Broadcast method
    @rpc
    def broadcast(self, event, data):
        channel = self.uuid
        self.ws_hub.broadcast(channel, event, data)
        return {'code': '0', 'message': 'success'}

    # Unicast method
    @rpc
    def unicast(self, socket_id, event, data):
        self.ws_hub.unicast(socket_id, event, data)
        return {'code': '0', 'message': 'success'}

    # Unsubscribe
    @rpc
    def unsubscribe(self, socket_id):
        channel = self.uuid
        self.ws_hub.unsubscribe(socket_id, channel)
        return {'code': '0', 'message': 'success'}

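    # Close connection
    @rpc
    def close(self, socket_id):
        # Note: this reaches into the hub's private _server attribute
        client = self.ws_hub._server.sockets.get(socket_id)
        if client is None:
            # Unknown or already closed socket: nothing to do
            return {'code': '1', 'message': 'unknown socket_id'}
        client.socket.close()
        return {'code': '0', 'message': 'success'}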

    # WS handler
    @ws_rpc
    def subscribe(self, socket_id):
        channel = self.uuid
        self.ws_hub.subscribe(socket_id, channel)
        return 'subscribe to {}'.format(channel)
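
The test page sends `{"method": "subscribe", "data": {}}` as a text frame. For a non-browser client or a test script, the same frames can be built and decoded in Python. This is a minimal stdlib-only sketch: `build_request` and `classify_frame` are hypothetical helper names, and the reply layout (`type`, `event`, `success`, `error`, `correlation_id`) is an assumption based on my reading of nameko's websocket module, so check it against the frames your server actually sends.

```python
import json
import uuid


def build_request(method, data=None, correlation_id=None):
    """Build the JSON text frame that calls a websocket rpc method."""
    frame = {
        "method": method,
        "data": data or {},
        # Assumed to be echoed back in the reply so it can be matched up.
        "correlation_id": correlation_id or str(uuid.uuid4()),
    }
    return json.dumps(frame)


def classify_frame(raw):
    """Return (kind, key, payload) for a frame received from the server.

    kind is 'event', 'result', 'error' or 'unknown' (assumed layout).
    """
    msg = json.loads(raw)
    if msg.get("type") == "event":
        return ("event", msg.get("event"), msg.get("data"))
    if msg.get("type") == "result":
        if msg.get("success"):
            return ("result", msg.get("correlation_id"), msg.get("data"))
        return ("error", msg.get("correlation_id"), msg.get("error"))
    return ("unknown", None, msg)
```

With a helper like this, a client can tell replies to its own calls (such as `subscribe`) apart from events pushed by `broadcast` or `unicast`.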

usage

In [1]: n.rpc.service_a.broadcast('service_a.broadcast', {'container_id': 'd4ee0618-2238-4436-b550-54261f5535e1'})
Out[1]: {u'code': u'0', u'message': u'success'}
In [2]: n.rpc.service_a.unicast('45bd8018-7e99-49d0-8b80-6aab6387da2d', 'service_a.unicast', {'container_id': 'd4ee0618-2238-4436-b550-54261f5535e1'})
Out[2]: {u'code': u'0', u'message': u'success'}
In [3]: n.rpc.service_a.unsubscribe('45bd8018-7e99-49d0-8b80-6aab6387da2d')
Out[3]: {u'code': u'0', u'message': u'success'}
In [4]: n.rpc.service_a.close('45bd8018-7e99-49d0-8b80-6aab6387da2d')
Out[4]: {u'code': u'0', u'message': u'success'}
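
The calls above are made from the nameko shell. From an ordinary backend process, the same RPC methods could be reached with nameko's standalone proxy. Here is a minimal sketch, assuming a RabbitMQ broker at the placeholder URI below. `notify_frontend` and its `proxy_factory` parameter are made-up names; the parameter exists only so the function can be exercised without a broker.

```python
# Assumed broker address; replace it with your own AMQP URI.
AMQP_CONFIG = {"AMQP_URI": "amqp://guest:guest@localhost:5672/"}


def notify_frontend(event, data, proxy_factory=None, config=AMQP_CONFIG):
    """Ask service_a to broadcast `event` to every subscribed websocket."""
    if proxy_factory is None:
        # Imported lazily so this module loads even where nameko is absent.
        from nameko.standalone.rpc import ClusterRpcProxy as proxy_factory
    with proxy_factory(config) as cluster_rpc:
        # Same call as In [1] above, made from outside the nameko shell.
        return cluster_rpc.service_a.broadcast(event, data)
```

Calling `notify_frontend('service_a.broadcast', {'container_id': '...'})` from the backend should then show up in every browser that has subscribed through the page served by the service.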

webpage

Hi @forcemain, yes, I think your sample is correct. Nice one!