Hello,
I wanted to implement a websocket server, which listens for rpc calls using nameko.
The primary aim is to emit events on the websocket connections, after an rpc is called in the same implementation.
I went through the test websocket code.
The problem I am facing is, the websocket hub is redifinig “rpc” in its class.
Now, I am able to spawn a websocket, and also receive events from the html example code.
But I want rpc calls (implemented on the websocket server side), to be called from the backed, and emit events to the frontend (html client).
I am not able to figure out how to implement these RPC calls, as normally I would use nameko.rpc, but here the rpc is overloaded by the websocket hub.
Ideally I want something like this:
{Browser based Web socket client}<—> {Nameko WS server + RPC server}<— {Backend Nameko RPC
Client}
Any pointers would help.
Hi @rarindam,
Can you share a code sample to understand exactly your problem. In any case, if it’s just a problem of redefining “rpc”, why don’t you just import on service file, websocket rpc as ws_rpc or something else just to avoid shadowing of rpc. Something like that:
from nameko.rpc import rpc
from nameko.web.websocket import rpc as ws_rpc
Hi,
Thank you for replying.
-
I tried that route. Issue is the following:
a. Using a combination of standard python Websocket, along with nameko, needs the usage of monekypatches. When doing this, the standard websocket fails. The nameko rpc calls work.
-
I found out the solution is to use Nameko’s own websockethub implementation. In this way I am using nameko runtime, and its own websocket implementation, which is fine and seems to work.
from nameko.web.websocket import WebSocketHubProvider, rpc
from nameko.events import event_handler
class WebsocketService(object):
name = “websockets”
container_id = ContainerIdentifier()
websocket_hub = WebSocketHubProvider()
@event_handler("event_listener", "evt_received")
def handle_event(self, payload):
self.websocket_hub.broadcast('test_channel', 'ping', {
'value': "payload {} from {}".format(payload, self.container_id),
})
- Another way of achieving the same is to use combination of python websockets and kombo messaging:
from kombu import Exchange, Queue
from nameko.messaging import Publisher, consume
MY_ROUTING_KEY = “my_routing_key”
my_exchange = Exchange(name=“my_exchange”)
my_queue = Queue(exchange=my_exchange, routing_key=MY_ROUTING_KEY, name=MY_ROUTING_KEY)
@consume(queue=my_queue)
def update_ws(self, payload):
self.websocket_hub.broadcast(‘test_channel’, ‘ping’, {
‘value’: “payload {} from {}”.format(payload, self.container_id),
})
Hi @rarindam,
I am glad to hear that you found a way to achieve your goal.
I have followed many times option (2) and it has worked me well.
Finally the only tricky part is that you need to implement websocket heartbeat , especially if your nameko service gets deployed behind a reverse proxy like nginx.
@rarindam @spyrosmarko just like this~
code
#! -*- coding: utf-8 -*-
# author: forcemain@163.com
import uuid
from nameko.rpc import rpc
from jinja2 import Template
from nameko.web.handlers import http
from werkzeug.wrappers import Response
from nameko.web.websocket import rpc as ws_rpc
from nameko.dependency_providers import Config
from nameko.web.websocket import WebSocketHubProvider
ws_html_template = '''
<!DOCTYPE html>
<html>
<head>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>
<style>
#console{
padding:5px;
border:1px solid black;
}
#console p {
margin:0;
}
.event {
color:#999;
}
.warning{
color: orange;
}
</style>
<title>Nameko Websocket Test</title>
</head>
<body>
<div id="wrapper">
<h1>Nameko Websocket Test</h1>
<button id="disconnect">Disconnect</button>
<div id="container">
<div id="console">
</div>
</div>
<script type="text/javascript">
function connect(){
var socket;
var host = "ws://{{ws_server_address}}/ws";
try{
var socket = new WebSocket(host);
message('<p class="event">Socket Status: '+socket.readyState);
socket.onopen = function(){
message('<p class="event">Socket Status: '+socket.readyState+' (open)');
subscribe();
}
socket.onmessage = function(msg){
message('<p class="message">Received: '+msg.data);
}
socket.onclose = function(){
message('<p class="event">Socket Status: '+socket.readyState+' (Closed)');
}
} catch(exception){
message('<p>Error'+exception);
}
function subscribe() {
try{
var json_msg = `{
"method": "subscribe",
"data": {}
}`;
socket.send(json_msg);
message('<p class="event">Sent: '+text)
} catch(exception){
message('<p class="warning">');
}
}
function message(msg){
$('#console').append(msg+'</p>');
}
$('#disconnect').click(function(){
socket.close();
});
}
$(document).ready(function() {
if(!("WebSocket" in window)){
$('<p>This demo requires browser websocket support</p>').appendTo('#container');
return
}
connect();
});
</script>
</body>
</html>
'''
# 服务定义
class ServiceA(object):
# 服务名称
name = 'service_a'
# 一点建议: 改为服务依赖
uuid = str(uuid.uuid4())
# 服务依赖 - 配置对象
config = Config()
# 服务依赖 - 连接管理
ws_hub = WebSocketHubProvider()
@http('GET', '/')
def index(self, request):
ws_server_address = self.config.get('WS_SERVER_ADDRESS', '127.0.0.1:8000')
data = Template(ws_html_template).render({
'ws_server_address': ws_server_address
})
headers = {'Content-Type': 'text/html'}
return Response(data, status=200, headers=headers)
# 广播方法
@rpc
def broadcast(self, event, data):
channel = self.uuid
self.ws_hub.broadcast(channel, event, data)
return {'code': '0', 'message': 'success'}
# 单播方法
@rpc
def unicast(self, socket_id, event, data):
self.ws_hub.unicast(socket_id, event, data)
return {'code': '0', 'message': 'success'}
# 取消订阅
@rpc
def unsubscribe(self, socket_id):
channel = self.uuid
self.ws_hub.unsubscribe(socket_id, channel)
return {'code': '0', 'message': 'success'}
# 关闭连接
@rpc
def close(self, socket_id):
client = self.ws_hub._server.sockets.get(socket_id)
client.socket.close()
return {'code': '0', 'message': 'success'}
# WS处理
@ws_rpc
def subscribe(self, socket_id):
channel = self.uuid
self.ws_hub.subscribe(socket_id, channel)
return 'subscribe to {}'.format(channel)
usage
In [1]: n.rpc.service_a.broadcast('service_a.broadcast', {'container_id': 'd4ee0618-2238-4436-b550-54261f5535e1'})
Out[1]: {u'code': u'0', u'message': u'success'}
In [2]: n.rpc.service_a.unicast('45bd8018-7e99-49d0-8b80-6aab6387da2d', 'service_a.unicast', {'container_id': 'd4ee0618-2238-4436-b550-54261f5535e1'})
Out[2]: {u'code': u'0', u'message': u'success'}
In [3]: n.rpc.service_a.unsubscribe('45bd8018-7e99-49d0-8b80-6aab6387da2d')
Out[3]: {u'code': u'0', u'message': u'success'}
In [4]: n.rpc.service_a.close('45bd8018-7e99-49d0-8b80-6aab6387da2d')
Out[4]: {u'code': u'0', u'message': u'success'}
webpage
hi @forcemain, yes I think that your sample is correct. Nice one!