#! /usr/bin/env python import random import json import uuid import socket import time import tornado.ioloop import tornado.tcpserver import tornado.httpserver import tornado.web import imc.async from imc.proxy import Proxy,Connection import netio from netio import SocketStream,SocketConnection from tojauth import TOJAuth from test_blob import TOJBlobTable,TOJBlobHandle from imc.blobserver import BlobServer class Worker: def __init__(self,main_stream,link,idendesc,worker_info,center_link): global center_serv self.main_stream = main_stream self.link = link self.idendesc = idendesc self.sock_addr = (worker_info['sock_ip'],worker_info['sock_port']) netio.send_pack(self.main_stream,bytes(json.dumps({ 'idendesc':self.idendesc, 'worker_link':self.link, 'center_link':center_link }),'utf-8')) conn = SocketConnection(self.link,self.main_stream,self.sock_addr,center_serv.add_pend_filestream) conn.add_close_callback(lambda conn : self.close()) Proxy.instance.add_conn(conn) def close(self): pass class BackendWorker(Worker): def __init__(self,main_stream,link,idendesc,worker_info,center_link): global center_serv super().__init__(main_stream,link,idendesc,worker_info,center_link) self.ws_addr = (worker_info['ws_ip'],worker_info['ws_port']) center_serv.add_backend_worker(self) def close(self): global center_serv center_serv.del_backend_worker(self) print('disconnect') class CenterServer(tornado.tcpserver.TCPServer): def __init__(self): super().__init__() self._ioloop = tornado.ioloop.IOLoop.instance() self._pend_filestream_filekeymap = {} self._linkid_usemap = {} self._worker_linkmap = {} self._client_linkmap = {} self._client_backendmap = {} self._backend_workerlist = [] self._uid_clientmap = {} pubkey = open('pubkey.pem','r').read() privkey = open('privkey.pem','r').read() TOJAuth(pubkey,privkey) self._link = self._create_link('center') self._idendesc = TOJAuth.instance.create_iden(self._link,1,TOJAuth.ROLETYPE_TOJ) Proxy(self._link,TOJAuth.instance,self._idendesc) self._init_blobserver() Proxy.instance.register_call('core/','lookup_link',self._lookup_link) Proxy.instance.register_call('core/','create_iden',self._create_iden) Proxy.instance.register_call('core/','add_client',self._add_client) Proxy.instance.register_call('core/','del_client',self._del_client) Proxy.instance.register_call('core/','get_uid_clientlink',self._get_uid_clientlink) def handle_stream(self,stream,addr): def _recv_conn_info(data): info = json.loads(data.decode('utf-8')) conntype = info['conntype'] if conntype == 'main': self._handle_mainconn(sock_stream,addr,info) elif conntype == 'file': self._handle_fileconn(sock_stream,addr,info) fd = stream.fileno() self._ioloop.remove_handler(fd) sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0)) netio.recv_pack(sock_stream,_recv_conn_info) def add_backend_worker(self,backend): backend_link = backend.link self._worker_linkmap[backend_link] = backend self._backend_workerlist.append(backend) self._client_backendmap[backend_link] = set() def del_backend_worker(self,backend): backend_link = backend.link del self._worker_linkmap[backend_link] self._backend_workerlist.remove(backend) for link in self._client_backendmap[backend_link]: del self._client_linkmap[link] self._client_backendmap[backend_link].remove(linkid) Proxy.instance.unlink_conn(link) del self._client_backendmap[backend_link] def dispatch_client(self): size = len(self._backend_workerlist) if size == 0: return None link = self._create_link('client') idendesc = TOJAuth.instance.create_iden(link,2,TOJAuth.ROLETYPE_GUEST) backend = self._backend_workerlist[random.randrange(size)] ws_ip,ws_port = backend.ws_addr return (link,idendesc,backend.link,ws_ip,ws_port) def add_pend_filestream(self,filekey,callback): self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) @imc.async.caller def _init_blobserver(self): blobserver = BlobServer(Proxy.instance, TOJAuth.instance, self._idendesc, self._link, 'blobtmp/1', TOJBlobTable(1), TOJBlobHandle) def _create_link(self,linkclass): linkid = uuid.uuid1() while linkid in self._linkid_usemap: linkid = uuid.uuid1() linkid = str(linkid) self._linkid_usemap[linkid] = True return ''.join(['/',linkclass,'/',str(len(self._linkid_usemap)),'/']) @imc.async.caller def _lookup_link(self,link): try: #a = int(TOJAuth.get_current_iden()['linkid']) #b = int(linkid) #if b > a: # worker = self._worker_linkidmap[str(a + 1)] #else: # worker = self._worker_linkidmap[str(a - 1)] if TOJAuth.get_current_iden()['link'].split('/')[1] != 'client': linkclass,linkid = link.split('/')[1:3] if linkclass == 'client': backend_link = self._client_linkmap[link]['backend_link'] worker = self._worker_linkmap[backend_link] sock_ip,sock_port = worker.sock_addr return { 'worker_link':worker.link, 'sock_ip':sock_ip, 'sock_port':sock_port } else: worker = self._worker_linkmap[link] sock_ip,sock_port = worker.sock_addr return { 'worker_link':worker.link, 'sock_ip':sock_ip, 'sock_port':sock_port } return None except KeyError: return None def _handle_mainconn(self,main_stream,addr,info): linkclass = info['linkclass'] if linkclass == 'backend': link = self._create_link('backend') idendesc = TOJAuth.instance.create_iden(link,1,TOJAuth.ROLETYPE_TOJ) BackendWorker(main_stream,link,idendesc,info,self._link) def _handle_fileconn(self,file_stream,addr,info): try: self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream) except KeyError: pass @imc.async.caller @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE) def _create_iden(self,link,idenid,roletype,payload): linkclass = link.split('/')[1] if linkclass == 'client' and roletype == TOJAuth.ROLETYPE_USER: uid = payload['uid'] client = self._client_linkmap[link] client['uid'] = uid if uid not in self._uid_clientmap: self._uid_clientmap[uid] = {link:client} else: self._uid_clientmap[uid][link] = client return TOJAuth.instance.create_iden(link,idenid,roletype,payload) @imc.async.caller @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE) def _add_client(self,client_link,backend_link): self._client_linkmap[client_link] = { 'client_link':client_link, 'backend_link':backend_link, 'uid':None } self._client_backendmap[backend_link].add(client_link) conn = Proxy.instance.get_conn(backend_link) Proxy.instance.link_conn(client_link,conn) print(client_link); @imc.async.caller @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE) def _del_client(self,client_link,backend_link): uid = self._client_linkmap[client_link]['uid'] if uid != None: clients = self._uid_clientmap[uid] del clients[client_link] if len(clients) == 0: del self._uid_clientmap[uid] del self._client_linkmap[client_link] self._client_backendmap[backend_link].remove(client_link) Proxy.instance.unlink_conn(client_link) @imc.async.caller def _get_uid_clientlink(self,uid): if uid == 0: return list(self._client_linkmap.keys()) else: if uid in self._uid_clientmap: clients = self._uid_clientmap[uid] return list(clients.keys()) return [] @imc.async.caller def _test_get_client_list(self,talk,talk2): return list(self._client_linkmap.items()) class WebConnHandler(tornado.web.RequestHandler): def set_default_headers(self): self.set_header('Access-Control-Allow-Origin','*') def post(self): global center_serv data = center_serv.dispatch_client() if data == None: self.write('Eno_backend') else: client_link,client_idendesc,backend_link,ip,port = data self.write(json.dumps({ 'client_link':client_link, 'client_idendesc':client_idendesc, 'backend_link':backend_link, 'ip':ip, 'port':port })) if __name__ == '__main__': global center_serv center_serv = CenterServer() center_serv.listen(5730) http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([ ('/conn',WebConnHandler), ])) http_serv.listen(83) tornado.ioloop.IOLoop.instance().start()