diff options
author | pzread <netfirewall@gmail.com> | 2013-06-09 13:34:55 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-06-09 13:34:55 +0800 |
commit | 77dd822815744579b05da117efb14f43b7088038 (patch) | |
tree | ea2388767586a740d06bde30c4e29c940f9a91c4 /src | |
parent | 872567a4cf3bff7d9d310f5e66f465f5523d58d9 (diff) | |
parent | 8cf636373548c8e3484a137268ddd041d12bbe4a (diff) | |
download | taiwan-online-judge-77dd822815744579b05da117efb14f43b7088038.tar.gz taiwan-online-judge-77dd822815744579b05da117efb14f43b7088038.tar.zst taiwan-online-judge-77dd822815744579b05da117efb14f43b7088038.zip |
Merge branch '2.0'
Conflicts:
README.md
Diffstat (limited to 'src')
-rw-r--r-- | src/js/imc.js | 220 | ||||
-rw-r--r-- | src/py/asyncdb.py | 331 | ||||
-rw-r--r-- | src/py/backend_server.py | 331 | ||||
-rw-r--r-- | src/py/center_server.py | 235 | ||||
-rw-r--r-- | src/py/imc/__init__.py | 1 | ||||
-rw-r--r-- | src/py/imc/async.py | 121 | ||||
-rw-r--r-- | src/py/imc/auth.py | 79 | ||||
-rw-r--r-- | src/py/imc/nonblock.py | 61 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 559 | ||||
-rw-r--r-- | src/py/netio.py | 511 | ||||
-rw-r--r-- | src/py/tojauth.py | 179 | ||||
-rwxr-xr-x | src/py/user.py | 336 | ||||
-rw-r--r-- | src/test/wstest.css | 81 | ||||
-rw-r--r-- | src/test/wstest.html | 97 | ||||
-rw-r--r-- | src/test/wstest.js | 178 |
15 files changed, 3320 insertions, 0 deletions
diff --git a/src/js/imc.js b/src/js/imc.js new file mode 100644 index 0000000..8e444a1 --- /dev/null +++ b/src/js/imc.js @@ -0,0 +1,220 @@ +var __extend__ = function(child,parent){ + child.prototype.__super__ = parent; +}; + +var imc = new function(){ + this.Connection = function(linkid){ + var that = this; + + that.link_linkidmap = {}; + that.close_callback = []; + that.linkid = linkid; + + that.send_msg = function(data){}; + that.start_recv = function(recv_callback){}; + + that.close = function(){ + var i; + + for(i = 0;i < that.close_callback.length;i++){ + that.close_callback[i](that); + } + }; + }; + + this.Proxy = function(linkid,auth,connect_linkid){ + var MSGTYPE_CALL = 'call'; + var MSGTYPE_RET = 'ret'; + + var that = this; + var caller_retid_count = 0; + var conn_linkidmap = {}; + var conn_retidmap = {}; + var call_pathmap = {}; + + var route_call = function(caller_retid,timeout,idendesc,dst,func_name,param,callback){ + var i; + var part; + var dst_linkid; + var dst_path; + var caller_linkid; + var func; + + var _add_wait_caller = function(conn_linkid){ + conn_retidmap[conn_linkid][caller_retid] = { + 'timeout':timeout, + 'callback':callback + } + }; + + part = dst.split('/'); + dst_linkid = part[2]; + dst_path = part.slice(3).join('/'); + + iden = auth.get_iden(idendesc); + + caller_linkid = iden.linkid + if(caller_retid.split('/')[0] != caller_linkid){ + return false; + } + + if(dst_linkid == linkid){ + if((func = call_pathmap[dst_path + func_name]) != undefined){ + _add_wait_caller(linkid); + + func(iden,param,function(data){ + if(linkid in conn_retidmap && caller_retid in conn_retidmap[linkid]){ + delete conn_retidmap[linkid][caller_retid]; + callback({'stat':true,'data':data}); + } + }); + }else{ + callback({'stat':false,'data':'Enoexist'}); + } + }else{ + that.request_conn(dst_linkid,function(conn){ + if(caller_linkid == linkid){ + _add_wait_caller(conn.linkid); + } + + send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param); + }); + } + }; + + var recv_dispatch = function(conn,data){ + msgo = JSON.parse(data); + if(msgo.type == MSGTYPE_CALL){ + recv_msg_call(conn,msgo); + }else if(msgo.type == MSGTYPE_RET){ + recv_msg_ret(conn,msgo); + } + }; + + var send_msg_call = function(conn,caller_retid,timeout,idendesc,dst,func_name,param){ + msg = { + 'type':MSGTYPE_CALL, + 'caller_retid':caller_retid, + 'timeout':timeout, + 'idendesc':idendesc, + 'dst':dst, + 'func_name':func_name, + 'param':param + }; + + conn.send_msg(JSON.stringify(msg)); + }; + var recv_msg_call = function(conn,msg){ + var caller_retid = msg.caller_retid; + var timeout = msg.timeout; + var idendesc = msg.idendesc; + var dst = msg.dst; + var func_name = msg.func_name; + var param = msg.param; + + route_call(caller_retid,timeout,idendesc,dst,func_name,param,function(result){ + that.request_conn(caller_retid,function(conn){ + var iden; + + iden = auth.get_iden(idendesc); + send_msg_ret(conn,iden.linkid,caller_retid,result); + }); + }); + }; + + var send_msg_ret = function(conn,caller_linkid,caller_retid,result){ + msg = { + 'type':MSGTYPE_RET, + 'caller_linkid':caller_linkid, + 'caller_retid':caller_retid, + 'result':result + }; + + conn.send_msg(JSON.stringify(msg)); + }; + var recv_msg_ret = function(conn,msg){ + var caller_linkid = msg['caller_linkid']; + var caller_retid = msg['caller_retid']; + var result = msg['result']; + + if(caller_linkid == linkid){ + if(conn.linkid in conn_retidmap && caller_retid in conn_retidmap[conn.linkid]){ + wait = conn_retidmap[conn.linkid][caller_retid]; + delete conn_retidmap[conn.linkid][caller_retid]; + + wait.callback(result); + } + }else{ + request_conn(caller_linkid,function(conn){ + send_msg_ret(conn,caller_linkid,caller_retid,result); + }); + } + }; + + that.add_conn = function(conn){ + conn_linkidmap[conn.linkid] = conn; + conn_retidmap[conn.linkid] = {}; + conn.start_recv(recv_dispatch); + }; + that.link_conn = function(linkid,conn){ + conn.link_linkidmap[linkid] = true; + conn_linkidmap[linkid] = conn; + }; + that.unlink_conn = function(linkid){ + conn = conn_linkidmap[linkid]; + delete conn_linkidmap[linkid]; + delete conn.link_linkidmap[linkid]; + }; + that.del_conn = function(conn){ + delete conn_linkidmap[conn.linkid]; + }; + that.request_conn = function(linkid,callback){ + var _conn_cb = function(conn){ + if(conn != null && conn.linkid != linkid){ + that.link_conn(linkid,conn); + } + + callback(conn); + }; + + conn = conn_linkidmap[linkid]; + if(conn != undefined){ + _conn_cb(conn); + }else{ + connect_linkid(linkid,_conn_cb); + } + }; + + that.call = function(idendesc,timeout,dst,func_name,param,callback){ + caller_retid = linkid + '/' + caller_retid_count; + caller_retid_count += 1; + + route_call(caller_retid,timeout,idendesc,dst,func_name,param,callback); + }; + + that.register_call = function(path,func_name,func){ + call_pathmap[path + func_name] = func; + }; + + conn_retidmap[linkid] = {}; + + imc.Proxy.instance = that; + }; + + this.Auth = function(){ + var that = this; + + that.get_iden = function(idendesc){ + return JSON.parse(JSON.parse(idendesc)[0]); + }; + + imc.Auth.instance = that; + }; +}; + +function imc_call(idendesc,dst,func_name,param,callback){ + imc.Proxy.instance.call(idendesc,10000,dst,func_name,param,callback); +} +function imc_register_call(path,func_name,func){ + imc.Proxy.instance.register_call(path,func_name,func); +} diff --git a/src/py/asyncdb.py b/src/py/asyncdb.py new file mode 100644 index 0000000..1e2bf7a --- /dev/null +++ b/src/py/asyncdb.py @@ -0,0 +1,331 @@ +from collections import deque +import random + +import tornado.ioloop +import tornado.stack_context +import psycopg2 + +import imc.async + +class RestrictCursor: + def __init__(self,db,cur): + self._db = db + self._cur = cur + self._ori_cur = cur + self._in_transaction = False + + self._init_implement() + + def __iter__(self): + return self._cur + + def execute(self,sql,param = None): + self._db.execute(self._cur,sql,param) + + self.arraysize = self._cur.arraysize + self.itersize = self._cur.itersize + self.rowcount = self._cur.rowcount + self.rownumber = self._cur.rownumber + self.lastrowid = self._cur.lastrowid + self.query = self._cur.query + self.statusmessage = self._cur.statusmessage + + def begin(self): + if self._in_transaction == True: + return + + self._cur = self._db.begin_transaction() + self._init_implement() + + self._db.execute(self._cur,'BEGIN;') + + self._in_transaction = True + + def commit(self): + if self._in_transaction == False: + return + + self._db.execute(self._cur,'COMMIT;') + if self._cur.statusmessage == 'COMMIT': + ret = True + + else: + ret = False + + self._db.end_transaction(self._cur.connection) + self._cur = self._ori_cur + + self._in_transaction = False + + return ret + + def rollback(self): + if self._in_transaction == False: + return + + self._db.execute(self._cur,'ROLLBACK;') + + self._db.end_transaction(self._cur.connection) + self._cur = self._ori_cur + + self._in_transaction = False + + def upsert(self,name,cond,value = None): + cond_keys = [] + cond_vals = [] + items = cond.items() + for key,val in items: + cond_keys.append('"' + key + '"') + cond_vals.append(val) + + value_keys = [] + value_vals = [] + if value != None: + items = value.items() + for key,val in items: + value_keys.append('"' + key + '"') + value_vals.append(val) + + query_list = ['UPDATE "' + name + '" SET '] + for key in value_keys: + query_list.append(key) + query_list.append('=%s') + query_list.append(',') + + query_list[-1] = ' WHERE ' + + for key in cond_keys: + query_list.append(key) + query_list.append('=%s') + query_list.append(' AND ') + + query_list[-1] = ';' + update_query = ''.join(query_list) + + update_param = list(value_vals) + update_param.extend(cond_vals) + + query_list = ['INSERT INTO "' + name + '" ('] + for key in cond_keys: + query_list.append(key) + query_list.append(',') + for key in value_keys: + query_list.append(key) + query_list.append(',') + + count = len(cond) + if value != None: + count += len(value) + + query_list[-1] = ') VALUES (' + query_list.extend(['%s,'] * (count - 1)) + query_list.append('%s);') + insert_query = ''.join(query_list) + + insert_param = list(cond_vals) + insert_param.extend(value_vals) + + while True: + self.begin() + + if value != None: + self.execute(update_query,update_param) + + if value == None or self.rowcount == 0: + try: + self.execute(insert_query,insert_param) + + except psycopg2.IntegrityError: + self.rollback() + if value == None: + break + + else: + continue + + if self.commit() == True: + break + + def _init_implement(self): + self.fetchone = self._cur.fetchone + self.fetchmany = self._cur.fetchmany + self.fetchall = self._cur.fetchall + self.scroll = self._cur.scroll + self.cast = self._cur.cast + self.tzinfo_factory = self._cur.tzinfo_factory + + self.arraysize = 0 + self.itersize = 0 + self.rowcount = 0 + self.rownumber = 0 + self.lastrowid = None + self.query = '' + self.statusmessage = '' + +class AsyncDB: + def __init__(self,dbname,user,password): + self.OPER_CURSOR = 0 + self.OPER_EXECUTE = 1 + + self._ioloop = tornado.ioloop.IOLoop.instance() + + self._dbname = dbname + self._user = user + self._password = password + self._conn_fdmap = {} + self._free_connpool = [] + self._share_connpool = [] + self._pendoper_fdmap = {} + self._opercallback_fdmap = {} + + for i in range(8): + conn = self._create_conn() + self._free_connpool.append(conn) + + self._ioloop.add_handler(conn.fileno(), + self._oper_dispatch, + tornado.ioloop.IOLoop.ERROR) + + self._ioloop.add_callback(self._oper_dispatch,conn.fileno(),0) + + for i in range(2): + conn = self._create_conn() + self._share_connpool.append(conn) + + self._ioloop.add_handler(conn.fileno(), + self._oper_dispatch, + tornado.ioloop.IOLoop.ERROR) + + self._ioloop.add_callback(self._oper_dispatch,conn.fileno(),0) + + def cursor(self): + return RestrictCursor(self,self._cursor()) + + def execute(self,cur,sql,param = None): + fd = cur.connection.fileno() + + self._pendoper_fdmap[fd].append((self.OPER_EXECUTE,(cur,sql,param),imc.async.get_retid())) + self._ioloop.add_callback(self._oper_dispatch,fd,0) + + imc.async.switch_top() + + def begin_transaction(self): + if len(self._free_connpool) > 0: + conn = self._free_connpool.pop() + + else: + conn = self._create_conn() + self._ioloop.add_handler(conn.fileno(), + self._oper_dispatch, + tornado.ioloop.IOLoop.ERROR) + + return self._cursor(conn) + + def end_transaction(self,conn): + if len(self._free_connpool) < 16: + self._free_connpool.append(conn) + + else: + self._close_conn(conn) + + def _cursor(self,conn = None): + if conn != None: + fd = conn.fileno() + + else: + fd = self._share_connpool[random.randrange(len(self._share_connpool))].fileno() + + self._pendoper_fdmap[fd].append((self.OPER_CURSOR,None,imc.async.get_retid())) + self._ioloop.add_callback(self._oper_dispatch,fd,0) + + cur = imc.async.switch_top() + return cur + + def _create_conn(self): + conn = psycopg2.connect(database = self._dbname, + user = self._user, + password = self._password, + async = 1) + + fd = conn.fileno() + self._conn_fdmap[fd] = conn + self._pendoper_fdmap[fd] = deque() + self._opercallback_fdmap[fd] = None + + return conn + + def _close_conn(self,conn): + fd = conn.fileno() + self._conn_fdmap.pop(fd,None) + self._pendoper_fdmap.pop(fd,None) + self._opercallback_fdmap.pop(fd,None) + + conn.close() + + def _oper_dispatch(self,fd,evt): + err = None + try: + conn = self._conn_fdmap[fd] + + except KeyError: + self._ioloop.remove_handler(fd) + return + + try: + stat = conn.poll() + + except Exception as e: + err = e + + if err != None or stat == psycopg2.extensions.POLL_OK: + self._ioloop.update_handler(fd, + tornado.ioloop.IOLoop.ERROR) + + elif stat == psycopg2.extensions.POLL_READ: + self._ioloop.update_handler(fd, + tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.ERROR) + + return + + elif stat == psycopg2.extensions.POLL_WRITE: + self._ioloop.update_handler(fd, + tornado.ioloop.IOLoop.WRITE | tornado.ioloop.IOLoop.ERROR) + + return + + cb = self._opercallback_fdmap[fd] + if cb != None: + self._opercallback_fdmap[fd] = None + cb(err) + + else: + try: + oper,data,retid = self._pendoper_fdmap[fd].popleft() + + except IndexError: + return + + if oper == self.OPER_CURSOR: + def _ret_cursor(err = None): + if err == None: + imc.async.ret(retid,conn.cursor()) + + else: + imc.async.ret(retid,err = err) + + self._opercallback_fdmap[fd] = _ret_cursor + + elif oper == self.OPER_EXECUTE: + def _ret_execute(err = None): + if err == None: + imc.async.ret(retid) + + else: + imc.async.ret(retid,err = err) + + cur,sql,param = data + + cur.execute(sql,param) + self._opercallback_fdmap[fd] = _ret_execute + + self._ioloop.add_callback(self._oper_dispatch,fd,0) diff --git a/src/py/backend_server.py b/src/py/backend_server.py new file mode 100644 index 0000000..7535183 --- /dev/null +++ b/src/py/backend_server.py @@ -0,0 +1,331 @@ +#! /usr/bin/env python + +import socket +import json +import datetime +import time +import random +from multiprocessing import Process + +import tornado.ioloop +import tornado.tcpserver +import tornado.httpserver +import tornado.websocket + +from imc import auth +import imc.async +from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call + +import netio +from netio import SocketStream,SocketConnection,WebSocketConnection +from tojauth import TOJAuth + +class BackendWorker(tornado.tcpserver.TCPServer): + def __init__(self,center_addr,ws_port): + super().__init__() + + self._ioloop = tornado.ioloop.IOLoop.current() + self.center_addr = center_addr + self.sock_addr = None + self.ws_port = ws_port + + self._linkid = None + self._idendesc = None + self._pend_mainconn_linkidmap = {} + self._pend_filestream_filekeymap = {} + self._client_linkidmap = {} + + def start(self): + sock_port = random.randrange(4096,8192) + self.sock_addr = ('10.8.0.10',sock_port) + + self.bind(sock_port,'',socket.AF_INET,65536) + super().start() + + self._conn_center() + + def handle_stream(self,stream,addr): + def _recv_conn_info(data): + info = json.loads(data.decode('utf-8')) + conntype = info['conntype'] + + if conntype == 'main': + self._handle_mainconn(sock_stream,addr,info) + + elif conntype == 'file': + self._handle_fileconn(sock_stream,addr,info) + + fd = stream.fileno() + self._ioloop.remove_handler(fd) + sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0)) + + netio.recv_pack(sock_stream,_recv_conn_info) + + def add_client(self,linkid,handler): + self._client_linkidmap[linkid] = {} + + conn = netio.WebSocketConnection('client',linkid,handler) + conn.add_close_callback(lambda conn : self.del_client(conn.linkid)) + Proxy.instance.add_conn(conn) + + #imc_call_async(self._idendesc,'/center/' + self.center_conn.linkid + '/','add_client',{'backend_linkid':self._linkid,'client_linkid':linkid}) + + return conn + + def del_client(self,linkid): + del self._client_linkidmap[linkid] + + #imc_call_async(self._idendesc,'/center/' + self.center_conn.linkid + '/','del_client',linkid) + + def _conn_center(self): + def __retry(conn): + print('retry connect center') + + self.center_conn = None + self._ioloop.add_timeout(datetime.timedelta(seconds = 5),self._conn_center) + + def __send_worker_info(): + def ___recv_info_cb(data): + info = json.loads(data.decode('utf-8')) + + pubkey = open('pubkey.pem','r').read() + TOJAuth(pubkey) + + self._idendesc = info['idendesc'] + iden = TOJAuth.instance.get_iden('backend',self._linkid,self._idendesc) + self._linkid = iden['linkid'] + Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid) + + self.center_conn = SocketConnection('center',info['center_linkid'],stream,self.center_addr) + self.center_conn.add_close_callback(__retry) + Proxy.instance.add_conn(self.center_conn) + + imc_register_call('','test_dst',self._test_dst) + #imc_register_call('','test_dsta',self._test_dsta) + #time.sleep(2) + + if int(self._linkid) == 2: + self._test_call('9') + + sock_ip,sock_port = self.sock_addr + netio.send_pack(stream,bytes(json.dumps({ + 'linkclass':'backend', + 'sock_ip':sock_ip, + 'sock_port':sock_port, + 'ws_ip':'210.70.137.215', + 'ws_port':self.ws_port + }),'utf-8')) + netio.recv_pack(stream,___recv_info_cb) + + stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + stream.set_close_callback(__retry) + stream.connect(self.center_addr,__send_worker_info) + + def _conn_linkid(self,linkid): + def __handle_pend(conn): + try: + retids = self._pend_mainconn_linkidmap.pop(worker_linkid) + + except KeyError: + return + + for retid in retids: + imc.async.ret(retid,conn) + + def __conn_cb(): + conn = Proxy.instance.get_conn(worker_linkid) + if conn != None: + __handle_pend(conn) + main_stream.set_close_callback(None) + main_stream.close() + + else: + sock_ip,sock_port = self.sock_addr + netio.send_pack(main_stream,bytes(json.dumps({ + 'conntype':'main', + 'linkclass':'backend', + 'linkid':self._linkid, + 'sock_ip':sock_ip, + 'sock_port':sock_port + }),'utf-8')) + netio.recv_pack(main_stream,__recv_cb) + + def __recv_cb(data): + stat = json.loads(data.decode('utf-8')) + if stat == True: + conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,sock_addr,self._add_pend_filestream) + Proxy.instance.add_conn(conn) + __handle_pend(conn) + + else: + main_stream.set_close_callback(None) + main_stream.close() + + if self.center_conn == None: + return None + + stat,ret = imc_call(self._idendesc,'/center/' + self.center_conn.linkid + '/','lookup_linkid',linkid) + + if stat == False or ret == None: + return None + + else: + worker_linkclass = ret['worker_linkclass'] + worker_linkid = ret['worker_linkid'] + + conn = Proxy.instance.get_conn(worker_linkid) + if conn != None: + return conn + + elif worker_linkid in self._pend_mainconn_linkidmap: + self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.get_retid()) + return imc.async.switch_top() + + else: + self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.get_retid()] + + sock_addr = (ret['sock_ip'],ret['sock_port']) + + main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + main_stream.set_close_callback(lambda conn : __handle_pend(None)) + main_stream.connect(sock_addr,__conn_cb) + + return imc.async.switch_top() + + def _add_pend_filestream(self,filekey,callback): + self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) + + def _handle_mainconn(self,main_stream,addr,info): + linkclass = info['linkclass'] + linkid = info['linkid'] + sock_ip = info['sock_ip'] + sock_port = info['sock_port'] + + conn = Proxy.instance.get_conn(linkid) + if conn != None: + return + + if (linkid not in self._pend_mainconn_linkidmap) or self._linkid > linkid: + conn = SocketConnection(linkclass,linkid,main_stream,(sock_ip,sock_port),self._add_pend_filestream) + Proxy.instance.add_conn(conn) + + netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8')) + + if linkid in self._pend_mainconn_linkidmap: + retids = self._pend_mainconn_linkidmap.pop(linkid) + for retid in retids: + imc.async.ret(retid,conn) + + else: + netio.send_pack(main_stream,bytes(json.dumps(False),'utf-8')) + + def _handle_fileconn(self,file_stream,addr,info): + try: + self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream) + + except KeyError: + pass + + @imc.async.caller + def _test_call(self,param): + dst = '/backend/' + '3' + '/' + ret = imc_call_async(self._idendesc,dst,'test_dst',lambda result : print(result),'test',113) + print(ret) + + ret = imc_call(self._idendesc,'/center/1/','create_iden','client','1234',1221,TOJAuth.ROLETYPE_USER,{'uid':31}) + print(ret) + + return + + pend = [] + for i in range(0,32): + if str((i % 16) + 2) == self._linkid: + continue + + fileres = Proxy.instance.sendfile('/backend/' + str((i % 16) + 2) + '/','Fedora-18-x86_64-DVD.iso') + + dst = '/backend/' + str((i % 16) + 2) + '/' + ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey) + + pend.append(fileres) + + for p in pend: + print(self._linkid + ' ' + p.wait()) + + print(self._linkid) + + @imc.async.caller + def _test_dst(self,param,sdfsdf): + #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) + #return ret + ' Too' + + print(param) + print(sdfsdf) + print(TOJAuth.get_current_iden()) + + #Proxy.instance.rejectfile(param) + #print('recv ' + iden['linkid'] + ' > ' + self._linkid) + #fileres = Proxy.instance.recvfile(param,'data') + #print('recv ' + fileres.wait()) + + return 'ok' + + @imc.async.caller + def _test_dsta(self,iden,param): + return param + ' Too' + +class WebSocketConnHandler(tornado.websocket.WebSocketHandler): + def open(self): + pass + + def on_message(self,msg): + global backend_worker + + if hasattr(self,'backend_conn'): + self.backend_conn.recv_msg(msg) + + else: + try: + info = json.loads(msg) + self.backend_conn = backend_worker.add_client(info['client_linkid'],self) + + except Exception: + self.close() + + def on_close(self): + global backend_backend + + if hasattr(self,'backend_conn'): + self.backend_conn.close() + +def start_backend_worker(ws_port): + global backend_worker + + http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([ + ('/conn',WebSocketConnHandler) + ])) + http_serv.listen(ws_port) + + backend_worker = BackendWorker(('10.8.0.10',5730),ws_port) + backend_worker.start() + + tornado.ioloop.IOLoop.instance().start() + +if __name__ == '__main__': + worker_list = [] + + worker_list.append(Process(target = start_backend_worker,args = (81, ))) + worker_list.append(Process(target = start_backend_worker,args = (82, ))) + #worker_list.append(Process(target = start_backend_worker,args = (181, ))) + #worker_list.append(Process(target = start_backend_worker,args = (182, ))) + #worker_list.append(Process(target = start_backend_worker,args = (183, ))) + #worker_list.append(Process(target = start_backend_worker,args = (184, ))) + #worker_list.append(Process(target = start_backend_worker,args = (185, ))) + #worker_list.append(Process(target = start_backend_worker,args = (186, ))) + + for proc in worker_list: + proc.start() + + for proc in worker_list: + proc.join() + diff --git a/src/py/center_server.py b/src/py/center_server.py new file mode 100644 index 0000000..f54a57c --- /dev/null +++ b/src/py/center_server.py @@ -0,0 +1,235 @@ +#! /usr/bin/env python + +import random +import json +import uuid +import socket + +import tornado.ioloop +import tornado.tcpserver +import tornado.httpserver +import tornado.web + +import imc.async +from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call + +import netio +from netio import SocketStream,SocketConnection +from tojauth import TOJAuth + +class Worker: + def __init__(self,main_stream,linkclass,linkid,idendesc,worker_info,center_linkid): + self.main_stream = main_stream + self.linkclass = linkclass + self.linkid = linkid + self.idendesc = idendesc + self.sock_addr = (worker_info['sock_ip'],worker_info['sock_port']) + + netio.send_pack(self.main_stream,bytes(json.dumps({ + 'idendesc':self.idendesc, + 'center_linkid':center_linkid + }),'utf-8')) + + conn = SocketConnection(self.linkclass,self.linkid,self.main_stream,self.sock_addr) + conn.add_close_callback(lambda conn : self.close()) + Proxy.instance.add_conn(conn) + + def close(self): + pass + +class BackendWorker(Worker): + def __init__(self,main_stream,linkid,idendesc,worker_info,center_linkid): + global center_serv + + super().__init__(main_stream,'backend',linkid,idendesc,worker_info,center_linkid) + self.ws_addr = (worker_info['ws_ip'],worker_info['ws_port']) + + center_serv.add_backend_worker(self) + + def close(self): + global center_serv + + center_serv.del_backend_worker(self) + print('disconnect') + +class CenterServer(tornado.tcpserver.TCPServer): + def __init__(self): + super().__init__() + + self._ioloop = tornado.ioloop.IOLoop.instance() + self._linkid_usemap = {} + self._worker_linkidmap = {} + self._backend_clientmap = {} + self._backend_workerlist = [] + + pubkey = open('pubkey.pem','r').read() + privkey = open('privkey.pem','r').read() + TOJAuth(pubkey,privkey) + + self._linkid = self._create_linkid() + + self._idendesc = TOJAuth.instance.create_iden('center',self._linkid,1,TOJAuth.ROLETYPE_TOJ) + Proxy('center',self._linkid,TOJAuth.instance,self._idendesc) + + imc_register_call('','lookup_linkid',self._lookup_linkid) + imc_register_call('','create_iden',self._create_iden) + imc_register_call('','add_client',self._add_client) + imc_register_call('','del_client',self._del_client) + + imc_register_call('','test_dst',self._test_dst) + imc_register_call('','test_dstb',self._test_dstb) + + def handle_stream(self,stream,addr): + def _recv_worker_info(data): + worker_info = json.loads(data.decode('utf-8')) + + linkclass = worker_info['linkclass'] + if linkclass == 'backend': + linkid = self._create_linkid() + idendesc = TOJAuth.instance.create_iden('backend',linkid,1,TOJAuth.ROLETYPE_TOJ) + BackendWorker(main_stream,linkid,idendesc,worker_info,self._linkid) + + fd = stream.fileno() + self._ioloop.remove_handler(fd) + main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0)) + + netio.recv_pack(main_stream,_recv_worker_info) + + def add_backend_worker(self,backend): + backend_linkid = backend.linkid + + self._worker_linkidmap[backend_linkid] = backend + self._backend_clientmap[backend_linkid] = {} + self._backend_workerlist.append(backend) + + def del_backend_worker(self,backend): + backend_linkid = backend.linkid + + del self._worker_linkidmap[backend_linkid] + del self._backend_clientmap[backend_linkid] + self._backend_workerlist.remove(backend) + + def dispatch_client(self): + size = len(self._backend_workerlist) + if size == 0: + return None + + linkid = self._create_linkid() + idendesc = TOJAuth.instance.create_iden('client',linkid,2,TOJAuth.ROLETYPE_GUEST) + backend = self._backend_workerlist[random.randrange(size)] + ws_ip,ws_port = backend.ws_addr + + return (idendesc,backend.linkid,ws_ip,ws_port) + + def _create_linkid(self): + linkid = uuid.uuid1() + while linkid in self._linkid_usemap: + linkid = uuid.uuid1() + + linkid = str(linkid) + self._linkid_usemap[linkid] = True + + linkid = str(len(self._linkid_usemap)) + + return linkid + + @imc.async.caller + def _lookup_linkid(self,linkid): + try: + worker = self._worker_linkidmap[linkid] + + #a = int(iden['linkid']) + #b = int(linkid) + + #if b > a: + # worker = self._worker_linkidmap[str(a + 1)] + + #else: + # worker = self._worker_linkidmap[str(a - 1)] + + if TOJAuth.get_current_iden()['linkclass'] != 'client': + sock_ip,sock_port = worker.sock_addr + return { + 'worker_linkclass':worker.linkclass, + 'worker_linkid':worker.linkid, + 'sock_ip':sock_ip, + 'sock_port':sock_port + } + + except KeyError: + return None + + @imc.async.caller + @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE) + def _create_iden(self,linkclass,linkid,idenid,roletype,payload): + return TOJAuth.instance.create_iden(linkclass,linkid,idenid,roletype,payload) + + @imc.async.caller + def _add_client(self,param): + backend_linkid = iden['linkid'] + client_linkid = param['client_linkid'] + + self._backend_clientmap[backend_linkid][client_linkid] = True + conn = Proxy.instance.get_conn(backend_linkid) + Proxy.instance.link_conn(client_linkid,conn) + + print(client_linkid); + + @imc.async.caller + def _del_client(self,param): + backend_linkid = iden['linkid'] + client_linkid = param + + del self._backend_clientmap[backend_linkid][client_linkid] + conn = Proxy.instance.get_conn(client_linkid) + Proxy.instance.unlink_conn(client_linkid) + + + + + @imc.async.caller + def _test_dst(self,param): + linkidlist = [] + clientmaps = self._backend_clientmap.values() + for clientmap in clientmaps: + linkids = clientmap.keys() + for linkid in linkids: + linkidlist.append(linkid) + + return linkidlist + + @imc.async.caller + def _test_dstb(self,param): + return param + ' World' + +class WebConnHandler(tornado.web.RequestHandler): + def set_default_headers(self): + self.set_header('Access-Control-Allow-Origin','*') + + def post(self): + global center_serv + + data = center_serv.dispatch_client() + if data == None: + self.write('Eno_backend') + else: + client_idendesc,backend_linkid,ip,port = data + self.write(json.dumps({ + 'client_idendesc':client_idendesc, + 'backend_linkid':backend_linkid, + 'ip':ip, + 'port':port + })) + +if __name__ == '__main__': + global center_serv + + center_serv = CenterServer() + center_serv.listen(5730) + + http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([ + ('/conn',WebConnHandler), + ])) + http_serv.listen(83) + + tornado.ioloop.IOLoop.instance().start() diff --git a/src/py/imc/__init__.py b/src/py/imc/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/py/imc/__init__.py @@ -0,0 +1 @@ + diff --git a/src/py/imc/async.py b/src/py/imc/async.py new file mode 100644 index 0000000..c0df338 --- /dev/null +++ b/src/py/imc/async.py @@ -0,0 +1,121 @@ +import traceback +import uuid +import ssl + +import tornado.stack_context +from Crypto.Hash import SHA512 +from greenlet import greenlet + +from imc import auth + +gr_idmap = {} +ret_idmap = {} +gr_main = greenlet.getcurrent() + +def switch_top(): + global gr_main + + assert greenlet.getcurrent() != gr_main + + old_iden = auth.current_iden + old_contexts = tornado.stack_context._state.contexts + auth.current_iden = None + + result = gr_main.switch(None) + + tornado.stack_context._state.contexts = old_contexts + auth.current_iden = old_iden + + return result + +def caller(f): + def wrapper(*args,**kwargs): + global gr_main + global gr_idmap + global ret_idmap + + def _call(*args,**kwargs): + ret = f(*args,**kwargs) + retids = gr_idmap[grid] + for retid in retids: + del ret_idmap[retid] + + del gr_idmap[grid] + + return (True,ret) + + try: + gr = greenlet(_call) + grid = id(gr) + gr_idmap[grid] = set() + old_iden = auth.current_iden + old_contexts = tornado.stack_context._state.contexts + + result = gr.switch(*args,**kwargs) + + tornado.stack_context._state.contexts = old_contexts + auth.current_iden = old_iden + + if result == None: + return (False,None) + + if gr.dead == False: + gr.parent = gr_main + + return result + + except TypeError as err: + traceback.print_stack() + print(err) + return (False,'Eparameter') + + except Exception as err: + traceback.print_stack() + print(err) + return (False,'Einternal') + + return wrapper + +def get_retid(): + global gr_idmap + global ret_idmap + + gr = greenlet.getcurrent() + grid = id(gr) + retid = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest() + + gr_idmap[grid].add(retid) + ret_idmap[retid] = gr + + return retid + +def ret(retid,value = None,err = None): + global gr_main + global gr_idmap + global ret_idmap + + assert greenlet.getcurrent() == gr_main + + try: + gr = ret_idmap.pop(retid) + gr_idmap[id(gr)].remove(retid) + + except KeyError: + return + + try: + old_iden = auth.current_iden + old_contexts = tornado.stack_context._state.contexts + + if err == None: + gr.switch(value) + + else: + gr.throw(err) + + tornado.stack_context._state.contexts = old_contexts + auth.current_iden = old_iden + + except TypeError as err: + traceback.print_stack() + print(err) diff --git a/src/py/imc/auth.py b/src/py/imc/auth.py new file mode 100644 index 0000000..03c15dc --- /dev/null +++ b/src/py/imc/auth.py @@ -0,0 +1,79 @@ +import time +import json +import binascii +import contextlib + +import tornado.stack_context +from Crypto.PublicKey import RSA +from Crypto.Hash import SHA512 +from Crypto.Signature import PKCS1_v1_5 + +current_iden = None + +class Auth: + def __init__(self): + global current_iden + + self._cache_hashmap = {} + current_iden = None + + @staticmethod + def get_current_iden(): + global current_iden + + return current_iden + + @staticmethod + def change_current_iden(iden): + @contextlib.contextmanager + def context(): + global current_iden + + old_iden = current_iden + current_iden = iden + + try: + yield + + finally: + current_iden = old_iden + + return tornado.stack_context.StackContext(context) + + def set_signkey(self,key): + self._signer = PKCS1_v1_5.new(RSA.importKey(key)) + + def set_verifykey(self,key): + self._verifier = PKCS1_v1_5.new(RSA.importKey(key)) + + def sign_iden(self,iden): + data = json.dumps(iden) + sign = binascii.hexlify(self._sign(bytes(data,'utf-8'))).decode('utf-8') + + return json.dumps([data,sign]) + + def get_iden(self,idendesc): + pair = json.loads(idendesc) + data = pair[0] + sign = pair[1] + + if self._verify(bytes(data,'utf-8'),binascii.unhexlify(sign)): + return json.loads(data) + + else: + return None + + def _sign(self,data): + return self._signer.sign(SHA512.new(data)) + + def _verify(self,data,sig): + h = SHA512.new(data) + if h in self._cache_hashmap: + return True + + if self._verifier.verify(h,sig) == True: + self._cache_hashmap[h] = True + + return True + else: + return False diff --git a/src/py/imc/nonblock.py b/src/py/imc/nonblock.py new file mode 100644 index 0000000..f68673d --- /dev/null +++ b/src/py/imc/nonblock.py @@ -0,0 +1,61 @@ +import types +from greenlet import greenlet + +gr_waitmap = {} + +gr_main = greenlet.getcurrent() + +def current(): + return greenlet.getcurrent() + +def switchtop(): + global gr_main + + return gr_main.switch(None) + +def callee(f): + def wrapper(*args,**kwargs): + kwargs['_grid'] = str(id(greenlet.getcurrent())) + return f(*args,**kwargs) + + return wrapper + +def caller(f): + def wrapper(*args,**kwargs): + global gr_waitmap + + try: + gr = greenlet(lambda *args,**kwargs : (str(id(greenlet.getcurrent())),f(*args,**kwargs))) + grid = str(id(gr)) + gr_waitmap[grid] = gr + + result = gr.switch(*args,**kwargs) + if result == None: + return (False,None) + + ret_grid,ret = result + del gr_waitmap[grid] + + if greenlet.getcurrent() == gr_main: + return (False,None) + + else: + while ret_grid != grid: + ret_grid,ret = gr_main.switch() + + return (True,ret) + + except Exception: + return (False,'Einternal') + + return wrapper + +def retcall(grid,value): + global gr_waitmap + + try: + gr = gr_waitmap[grid] + gr.switch(value) + + except Exception: + pass diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py new file mode 100755 index 0000000..00476f0 --- /dev/null +++ b/src/py/imc/proxy.py @@ -0,0 +1,559 @@ +import json +import uuid +import os +import datetime +import ssl + +from Crypto.Hash import SHA512 +import tornado.ioloop +import tornado.stack_context + +from imc import async +from imc.auth import Auth + +class Connection: + def __init__(self,linkclass,linkid): + self.linkclass = linkclass + self.linkid = linkid + self.link_linkidmap = {} + self._close_callback = [] + self._closed = False + + def send_msg(self,data): + pass + + def send_file(self,filekey,filepath,callback): + pass + + def recv_file(self,filekey,filesize,filepath,callback): + pass + + def send_filedata(self,filekey,filesize,callback): + pass + + def recv_filedata(self,filekey,filesize,send_fn): + pass + + def start_recv(self,recv_callback): + pass + + def abort_file(self,filekey): + pass + + def add_close_callback(self,callback): + self._close_callback.append(tornado.stack_context.wrap(callback)) + + def close(self): + self._closed = True + + for callback in self._close_callback: + callback(self) + + def closed(self): + return self._closed + +class FileResult(): + def __init__(self,filekey): + self.filekey = filekey + self._retid = None + self._result = None + + def ret_result(self,res): + if self._result != None: + return + + self._result = res + if self._retid != None: + async.ret(self._retid) + + def wait(self): + if self._result == None: + self._retid = async.get_retid() + async.switch_top() + + return self._result + +class Proxy: + def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None): + self.MSGTYPE_CALL = 'call' + self.MSGTYPE_RET = 'ret' + self.MSGTYPE_SENDFILE = 'sendfile' + self.MSGTYPE_ABORTFILE = 'abortfile' + + self._ioloop = tornado.ioloop.IOLoop.instance() + self._linkclass = linkclass + self._linkid = linkid + self._auth = auth_instance + self._idendesc = idendesc + + if conn_linkid_fn == None: + self._conn_linkid_fn = lambda : None + else: + self._conn_linkid_fn = conn_linkid_fn + + self._conn_linkidmap = {} + self._conn_retidmap = {self._linkid:{}} + self._conn_filekeymap = {self._linkid:{}} + self._call_pathmap = {} + + self._info_filekeymap = {} + + Proxy.instance = self + + self.register_call('imc/','pend_recvfile',self._pend_recvfile) + self.register_call('imc/','reject_sendfile',self._reject_sendfile) + + def add_conn(self,conn): + assert conn.linkid not in self._conn_linkidmap + + self._conn_linkidmap[conn.linkid] = conn + self._conn_retidmap[conn.linkid] = {} + self._conn_filekeymap[conn.linkid] = {} + + conn.add_close_callback(self._conn_close_cb) + conn.start_recv(self._recv_dispatch) + + def link_conn(self,linkid,conn): + assert conn.linkid in self._conn_linkidmap + + conn.link_linkidmap[linkid] = True + self._conn_linkidmap[linkid] = conn + + def unlink_conn(self,linkid): + assert linkid in self._conn_linkidmap + + conn = self._conn_linkidmap.pop(linkid) + del conn.link_linkidmap[linkid] + + def del_conn(self,conn): + waits = list(self._conn_retidmap[conn.linkid].values()) + for wait in waits: + wait['callback']((False,'Eclose')) + + waits = list(self._conn_filekeymap[conn.linkid].values()) + for wait in waits: + wait['callback']('Eclose') + + linkids = list(conn.link_linkidmap.keys()) + for linkid in linkids: + self.unlink_conn(linkid) + + del self._conn_linkidmap[conn.linkid] + del self._conn_retidmap[conn.linkid] + del self._conn_filekeymap[conn.linkid] + + def get_conn(self,linkid): + try: + return self._conn_linkidmap[linkid] + + except KeyError: + return None + + def register_call(self,path,func_name,func): + self._call_pathmap[''.join([path,func_name])] = func + + def call(self,idendesc,dst,func_name,timeout,*args): + return self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args)) + + def call_async(self,idendesc,dst,func_name,timeout,callback,*args): + @async.caller + def _call(): + result = self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args)) + if callback != None: + callback(result) + + self._ioloop.add_callback(tornado.stack_context.wrap(_call)) + + def sendfile(self,dst_link,filepath): + filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest() + filesize = os.stat(filepath).st_size + + fileresult = FileResult(filekey) + + self._info_filekeymap[filekey] = { + 'filesize':filesize, + 'filepath':filepath, + 'fileresult':fileresult, + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) + } + + stat,ret = self.call(self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize},655360) + if stat == False: + raise ConnectionError(ret) + + return fileresult + + def recvfile(self,filekey,filepath): + def _callback(err = None): + try: + self._del_wait_filekey(in_conn.linkid,filekey) + + except KeyError: + return + + if err != None: + if not in_conn.closed(): + in_conn.abort_file(filekey) + self._send_msg_abortfile(in_conn,filekey,err) + + self._ioloop.add_callback(self._ret_sendfile,filekey,err) + + try: + info = self._info_filekeymap[filekey] + + except KeyError: + return + + src_linkid = info['src_linkid'] + filesize = info['filesize'] + + in_conn = self._request_conn(src_linkid) + self._add_wait_filekey(in_conn.linkid,filekey,filesize,_callback) + + in_conn.recv_file(filekey,filesize,filepath,_callback) + self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + + return info['fileresult'] + + def rejectfile(self,filekey): + try: + info = self._info_filekeymap.pop(filekey) + + except KeyError: + return + + dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/']) + self.call(self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey},65536) + + def _route_call(self,in_conn,caller_retid,idendesc,dst,func_name,timeout,param): + def __add_wait_caller(conn_linkid): + callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_linkid,caller_retid,result)) + self._conn_retidmap[conn_linkid][caller_retid] = { + 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback((False,'Etimeout'))), + 'callback':callback + } + + def __del_wait_caller(conn_linkid): + wait = self._conn_retidmap[conn_linkid].pop(caller_retid) + self._ioloop.remove_timeout(wait['timer']) + + def __ret(result): + if caller_linkid == self._linkid: + return result + + else: + conn = self._request_conn(caller_linkid) + if conn != None: + self._send_msg_ret(conn,caller_linkid,caller_retid,result) + + if in_conn != None: + in_linkclass = in_conn.linkclass + in_linkid = in_conn.linkid + + else: + in_linkclass = self._linkclass + in_linkid = self._linkid + + iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc) + if iden == None: + return __ret(False,'Eilliden') + + try: + dst_part = dst.split('/',3) + dst_linkid = dst_part[2] + dst_path = dst_part[3] + + except Exception: + return __ret(False,'Enoexist') + + caller_linkid = iden['linkid'] + + if dst_linkid == self._linkid: + __add_wait_caller(self._linkid) + + try: + with Auth.change_current_iden(iden): + result = self._call_pathmap[''.join([dst_path,func_name])](*param) + + except KeyError: + result = (False,'Enoexist') + + __del_wait_caller(self._linkid) + + return __ret(result) + + else: + conn = self._request_conn(dst_linkid) + if conn == None: + return __ret((False,'Enoexist')) + + else: + if caller_linkid == self._linkid: + __add_wait_caller(conn.linkid) + self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param) + + result = async.switch_top() + + __del_wait_caller(conn.linkid) + + return __ret(result) + + else: + self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param) + + return + + def _ret_call(self,caller_linkid,caller_retid,result): + @async.caller + def __ret_remote(): + conn = self._request_conn(caller_linkid) + if conn != None: + self._send_msg_ret(conn,caller_linkid,caller_retid,result) + + if caller_linkid == self._linkid: + async.ret(caller_retid,result) + + else: + __ret_remote() + + def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): + def __send_cb(err = None): + try: + self._del_wait_filekey(out_conn.linkid,filekey) + + except KeyError: + return + + if err != None: + if not out_conn.closed(): + out_conn.abort_file(filekey) + self._send_msg_abortfile(out_conn,filekey,err) + + self._ioloop.add_callback(self._ret_sendfile,filekey,err) + + def __bridge_cb(err = None): + try: + self._del_wait_filekey(in_conn,filekey) + + if err != None: + if not in_conn.closed(): + in_conn.abort_file(filekey) + self._send_msg_abortfile(in_conn,filekey,err) + + except KeyError: + pass + + try: + self._del_wait_filekey(out_conn,filekey) + + if err != None: + if not out_conn.closed(): + out_conn.abort_file(filekey) + self._send_msg_abortfile(out_conn,filekey,err) + + except KeyError: + pass + + if src_linkid == self._linkid: + try: + info = self._info_filekeymap[filekey] + if info['filesize'] != filesize: + self._ioloop.add_callback(self._ret_sendfile,filekey,'Efilesize') + + except KeyError: + self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist') + return + + self._add_wait_filekey(out_conn.linkid,filekey,filesize,__send_cb) + out_conn.send_file(filekey,info['filepath'],__send_cb) + + else: + in_conn = self._request_conn(src_linkid) + self._add_wait_filekey(in_conn.linkid,filekey,filesize,__bridge_cb) + self._add_wait_filekey(out_conn.linkid,filekey,filesize,__bridge_cb) + + send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb) + in_conn.recv_filedata(filekey,filesize,send_fn) + + self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + + def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback): + callback = tornado.stack_context.wrap(callback) + self._conn_filekeymap[conn_linkid][filekey] = { + 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = min(filesize,1000)),lambda : callback('Etimeout')), + 'callback':callback + } + + def _del_wait_filekey(self,conn_linkid,filekey): + wait = self._conn_filekeymap[conn_linkid].pop(filekey) + self._ioloop.remove_timeout(wait['timer']) + + def _ret_sendfile(self,filekey,err = None): + try: + info = self._info_filekeymap.pop(filekey) + + except KeyError: + return + + self._ioloop.remove_timeout(info['timer']) + + fileresult = info['fileresult'] + if err == None: + fileresult.ret_result('Success') + + else: + fileresult.ret_result(err) + + def _request_conn(self,linkid): + try: + return self._conn_linkidmap[linkid] + + except KeyError: + conn = self._conn_linkid_fn(linkid) + + if conn != None and conn.linkid != linkid: + self.link_conn(linkid,conn) + + return conn + + def _conn_close_cb(self,conn): + self.del_conn(conn) + print('connection close') + + def _recv_dispatch(self,conn,data): + try: + msg = json.loads(data.decode('utf-8')) + + except: + return + + msg_type = msg['type'] + + if msg_type == self.MSGTYPE_CALL: + self._recv_msg_call(conn,msg) + + elif msg_type == self.MSGTYPE_RET: + self._recv_msg_ret(conn,msg) + + elif msg_type == self.MSGTYPE_SENDFILE: + self._recv_msg_sendfile(conn,msg) + + elif msg_type == self.MSGTYPE_ABORTFILE: + self._recv_msg_abortfile(conn,msg) + + def _send_msg_call(self,conn,caller_retid,idendesc,dst,func_name,timeout,param): + msg = { + 'type':self.MSGTYPE_CALL, + 'caller_retid':caller_retid, + 'idendesc':idendesc, + 'dst':dst, + 'func_name':func_name, + 'timeout':timeout, + 'param':param + } + + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_call(self,conn,msg): + @async.caller + def __call(): + self._route_call(conn,caller_retid,idendesc,dst,func_name,timeout,param) + + caller_retid = msg['caller_retid'] + idendesc = msg['idendesc'] + dst = msg['dst'] + func_name = msg['func_name'] + timeout = msg['timeout'] + param = msg['param'] + + __call() + + def _send_msg_ret(self,conn,caller_linkid,caller_retid,result): + stat,data = result + msg = { + 'type':self.MSGTYPE_RET, + 'caller_linkid':caller_linkid, + 'caller_retid':caller_retid, + 'result':{'stat':stat,'data':data} + } + + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_ret(self,conn,msg): + caller_linkid = msg['caller_linkid'] + caller_retid = msg['caller_retid'] + data = msg['result'] + result = (data['stat'],data['data']) + + self._ret_call(caller_linkid,caller_retid,result) + + def _send_msg_sendfile(self,conn,src_linkid,filekey,filesize): + msg = { + 'type':self.MSGTYPE_SENDFILE, + 'src_linkid':src_linkid, + 'filekey':filekey, + 'filesize':filesize + } + + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_sendfile(self,conn,msg): + @async.caller + def __call(): + self._route_sendfile(conn,src_linkid,filekey,filesize) + + src_linkid = msg['src_linkid'] + filekey = msg['filekey'] + filesize = msg['filesize'] + + __call() + + def _send_msg_abortfile(self,conn,filekey,err): + msg = { + 'type':self.MSGTYPE_ABORTFILE, + 'filekey':filekey, + 'error':err + } + + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_abortfile(self,conn,msg): + @async.caller + def __call(): + try: + self._conn_filekeymap[conn.linkid][filekey]['callback'](err) + + except KeyError: + pass + + filekey = msg['filekey'] + err = msg['error'] + + __call() + + @async.caller + def _pend_recvfile(self,iden,param): + filekey = param['filekey'] + filesize = param['filesize'] + + self._info_filekeymap[filekey] = { + 'src_linkclass':iden['linkclass'], + 'src_linkid':iden['linkid'], + 'filesize':filesize, + 'fileresult':FileResult(filekey), + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) + } + + @async.caller + def _reject_sendfile(self,iden,param): + filekey = param['filekey'] + self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject') + +def imc_call(idendesc,dst,func_name,*args): + return Proxy.instance.call(idendesc,dst,func_name,65536,*args) + +def imc_call_async(idendesc,dst,func_name,callback,*args): + Proxy.instance.call_async(idendesc,dst,func_name,65536,callback,*args) + +def imc_register_call(path,func_name,func): + Proxy.instance.register_call(path,func_name,func) diff --git a/src/py/netio.py b/src/py/netio.py new file mode 100644 index 0000000..b71ab0b --- /dev/null +++ b/src/py/netio.py @@ -0,0 +1,511 @@ +import os +import traceback +import json +import struct +import socket +from collections import deque + +import tornado.ioloop +import tornado.stack_context + +import imc.async +from imc.proxy import Connection + +def send_pack(stream,data): + stream.write(struct.pack('l',len(data)) + data) + +def recv_pack(stream,callback): + def _recv_size(data): + size, = struct.unpack('l',data) + stream.read_bytes(size,lambda data : callback(data)) + + stream.read_bytes(8,_recv_size) + +class SocketStream: + def __init__(self,sock): + self.DATA_BUF = 0 + self.DATA_NOBUF = 1 + self.DATA_FILE = 2 + + self._ioloop = tornado.ioloop.IOLoop.current() + self._sock = sock + + self._conning = False + self._closed = False + self._conn_callback = None + self._close_callback = None + + self._read_queue = deque() + self._write_queue = deque() + self._stat = tornado.ioloop.IOLoop.ERROR + + self._sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1) + self._sock.setblocking(False) + self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR) + + def connect(self,addr,callback): + if self._closed == True: + raise ConnectionError + + try: + self._conn_callback = tornado.stack_context.wrap(callback) + + self._stat |= tornado.ioloop.IOLoop.WRITE + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + self._conning = True + self._sock.connect(addr) + + except BlockingIOError: + pass + + def read_bytes(self,size,callback = None,nonbuf = False): + if self._closed == True: + raise ConnectionError + + if nonbuf == False: + self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)]) + + else: + self._read_queue.append([self.DATA_NOBUF,size,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.READ + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def write(self,buf,callback = None): + if self._closed == True: + raise ConnectionError + + self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.WRITE + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def sendfile(self,fd,callback = None): + if self._closed == True: + raise ConnectionError + + size = os.fstat(fd).st_size + + self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.WRITE + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def recvfile(self,fd,size,callback = None): + if self._closed == True: + raise ConnectionError + + self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) + + self._stat |= tornado.ioloop.IOLoop.READ + self._ioloop.update_handler(self._sock.fileno(),self._stat) + + def set_close_callback(self,callback): + if callback == None: + self._close_callback = None + + else: + self._close_callback = tornado.stack_context.wrap(callback) + + def close(self): + if self._closed == True: + return + + self._closed = True + self._ioloop.remove_handler(self._sock.fileno()) + self._sock.close() + + if self._close_callback != None: + self._close_callback(self) + + def _handle_event(self,fd,evt): + if evt & tornado.ioloop.IOLoop.ERROR: + print(os.strerror(self._sock.getsockopt(socket.SOL_SOCKET,socket.SO_ERROR))) + self.close() + return + + if evt & tornado.ioloop.IOLoop.READ: + while len(self._read_queue) > 0: + iocb = self._read_queue[0] + datatype = iocb[0] + + if datatype == self.DATA_BUF: + size = iocb[1] + + try: + while True: + buf = self._sock.recv(size) + if len(buf) == 0: + self.close() + return + + iocb[2].extend(buf) + size -= len(buf) + + if size == 0: + if iocb[3] != None: + iocb[3](iocb[2]) + + self._read_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + except Exception: + self.close() + return + + elif datatype == self.DATA_NOBUF: + size = iocb[1] + + try: + while True: + buf = self._sock.recv(size) + if len(buf) == 0: + self.close() + return + + iocb[2](buf) + size -= len(buf) + + if size == 0: + self._read_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + except Exception: + self.close() + return + + elif datatype == self.DATA_FILE: + size = iocb[1] + + try: + while True: + buf = self._sock.recv(min(size,65536)) + if len(buf) == 0: + self.close() + return + + os.write(iocb[2],buf) + size -= len(buf) + + if size == 0: + if iocb[3] != None: + iocb[3]() + + self._read_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + except Exception: + self.close() + return + + if evt & tornado.ioloop.IOLoop.WRITE: + if self._conning == True: + self._conning = False + + if self._conn_callback != None: + self._conn_callback() + + while len(self._write_queue) > 0: + iocb = self._write_queue[0] + datatype = iocb[0] + + if datatype == self.DATA_BUF: + off = iocb[1] + buf = iocb[2] + + try: + while True: + ret = self._sock.send(buf[off:]) + if ret == 0: + self.close() + return + + off += ret + + if off == len(buf): + if iocb[3] != None: + iocb[3]() + + self._write_queue.popleft() + break + + except BlockingIOError: + iocb[1] = off + break + + except Exception: + self.close() + return + + elif datatype == self.DATA_FILE: + size = iocb[1] + filefd = iocb[2] + sockfd = self._sock.fileno() + + try: + while True: + ret = os.sendfile(sockfd,filefd,None,min(size,65536)) + if ret == 0: + self.close() + return + + size -= ret + + if size == 0: + if iocb[3] != None: + iocb[3]() + + self._write_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + + except Exception: + self.close() + return + + if self._closed == True: + return + + stat = tornado.ioloop.IOLoop.ERROR + if len(self._read_queue) > 0: + stat |= tornado.ioloop.IOLoop.READ + + if len(self._write_queue) > 0: + stat |= tornado.ioloop.IOLoop.WRITE + + if stat != self._stat: + self._stat = stat + self._ioloop.update_handler(fd,stat) + +class SocketConnection(Connection): + def __init__(self,linkclass,linkid,main_stream,file_addr,add_pend_filestream_fn = None): + super().__init__(linkclass,linkid) + + self._ioloop = tornado.ioloop.IOLoop.current() + self._sendfile_filekeymap = {} + + self.main_stream = main_stream + self.main_stream.set_close_callback(lambda conn : self.close()) + self.file_addr = file_addr + self.add_pend_filestream = add_pend_filestream_fn + + def send_msg(self,data): + if self._closed == True: + raise ConnectionError + + self.main_stream.write(struct.pack('l',len(data)) + data) + + def send_file(self,filekey,filepath,callback): + def _conn_cb(): + self._add_wait_filekey(filekey,_callback) + + send_pack(file_stream,bytes(json.dumps({ + 'conntype':'file', + 'filekey':filekey + }),'utf-8')) + + file_stream.sendfile(fd,_callback) + + def _callback(err = None): + try: + self._del_wait_filekey(filekey) + + except KeyError: + return + + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) + + callback(err) + + if self._closed == True: + raise ConnectionError + + fd = os.open(filepath,os.O_RDONLY) + filesize = os.fstat(fd).st_size + + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.connect(self.file_addr,_conn_cb) + + def recv_file(self,filekey,filesize,filepath,callback): + def _conn_cb(stream): + nonlocal file_stream + + file_stream = stream + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + self._add_wait_filekey(filekey,_callback) + + file_stream.recvfile(fd,filesize,_callback) + + def _callback(err = None): + try: + self._del_wait_filekey(filekey) + + except KeyError: + return + + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) + + if err != None: + try: + os.remove(filepath) + + except FileNotFoundError: + pass + + callback(err) + + if self._closed == True: + raise ConnectionError + + file_stream = None + + self.add_pend_filestream(filekey,_conn_cb) + fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) + + def send_filedata(self,filekey,filesize,callback): + def _conn_cb(): + self._add_wait_filekey(filekey,_callback) + + send_pack(file_stream,bytes(json.dumps({ + 'conntype':'file', + 'filekey':filekey + }),'utf-8')) + + imc.async.ret(retid) + + def _callback(err = None): + try: + self._del_wait_filekey(filekey) + + except KeyError: + return + + file_stream.set_close_callback(None) + file_stream.close() + + callback(err) + + def _send_cb(data): + def __done_cb(): + nonlocal filesize + + filesize -= len(data) + if filesize == 0: + _callback() + + file_stream.write(data,__done_cb) + + if self._closed == True: + raise ConnectionError + + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + + retid = imc.async.get_retid() + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.connect(self.file_addr,_conn_cb) + imc.async.switch_top() + + return _send_cb + + def recv_filedata(self,filekey,filesize,send_fn): + def _conn_cb(stream): + nonlocal file_stream + + file_stream = stream + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + self._add_wait_filekey(filekey,_callback) + + file_stream.read_bytes(filesize,send_fn,nonbuf = True) + + def _callback(err = None): + file_stream.close() + + if self._closed == True: + raise ConnectionError + + file_stream = None + + self.add_pend_filestream(filekey,_conn_cb) + + def abort_file(self,filekey): + try: + self._sendfile_filekeymap[filekey]('Eabort') + + except KeyError: + pass + + def start_recv(self,recv_callback): + def _recv_size(data): + size, = struct.unpack('l',data) + self.main_stream.read_bytes(size,_recv_data) + self.main_stream.read_bytes(8,_recv_size) + + def _recv_data(data): + self._recv_callback(self,data) + + self._recv_callback = tornado.stack_context.wrap(recv_callback) + self.main_stream.read_bytes(8,_recv_size) + + def close(self): + if self._closed == True: + return + + traceback.print_stack() + + self._closed = True + self.main_stream.close() + + callbacks = list(self._sendfile_filekeymap.values()) + for callback in callbacks: + callback('Eclose') + + super().close() + + def _add_wait_filekey(self,filekey,fail_cb): + self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) + + def _del_wait_filekey(self,filekey): + del self._sendfile_filekeymap[filekey] + +class WebSocketConnection(Connection): + def __init__(self,linkclass,linkid,handler): + super().__init__(linkclass,linkid) + + self._ioloop = tornado.ioloop.IOLoop.current() + self.handler = handler + + def send_msg(self,data): + if self._closed == True: + raise ConnectionError + + self.handler.write_message(data,True) + + def recv_msg(self,data): + if self._closed == True: + raise ConnectionError + + self._recv_callback(self,data) + + def start_recv(self,recv_callback): + self._recv_callback = tornado.stack_context.wrap(recv_callback) diff --git a/src/py/tojauth.py b/src/py/tojauth.py new file mode 100644 index 0000000..701095b --- /dev/null +++ b/src/py/tojauth.py @@ -0,0 +1,179 @@ +from imc.auth import Auth +import config +from asyncdb import AsyncDB + +class TOJAuth(Auth): + ACCESS_READ = 0x1 + ACCESS_WRITE = 0x2 + ACCESS_CREATE = 0x4 + ACCESS_DELETE = 0x8 + ACCESS_SETPER = 0x10 + ACCESS_EXECUTE = 0x20 + + ROLETYPE_USER = 1 + ROLETYPE_3RD = 2 + ROLETYPE_MOD = 3 + ROLETYPE_TOJ = 4 + ROLETYPE_GROUP = 5 + ROLETYPE_GUEST = 6 + + auth_accessid = 1 + + def __init__(self, pubkey, privkey = None): + super().__init__() + + self.set_verifykey(pubkey) + if privkey != None: + self.set_signkey(privkey) + + TOJAuth.instance = self + TOJAuth.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER, + config.CORE_DBPASSWORD) + + def create_iden(self, linkclass, linkid, idenid, roletype, payload = {}): + iden = payload + iden.update({ + 'linkclass' : linkclass, + 'linkid' : linkid, + 'idenid' : idenid, + 'roletype' : roletype + }) + + return self.sign_iden(iden) + + def get_iden(self, conn_linkclass, conn_linkid, idendesc): + iden = super().get_iden(idendesc) + if iden == None: + return None + + if conn_linkclass == 'client' and conn_linkid != iden['linkid']: + return None + + return iden + + @staticmethod + def check_access(accessid, access_mask): + def wrapper(f): + def wrapfunc(*args): + idenid = TOJAuth.get_current_iden()['idenid'] + ok = False + + cur = TOJAuth.instance.db.cursor() + + if not ok: + sqlstr = ('SELECT "owner_idenid" FROM "ACCESS" WHERE ' + '"accessid"=%s;') + sqlarr = (accessid, ) + cur.execute(sqlstr, sqlarr) + for data in cur: + owner_idenid = data[0] + if owner_idenid == idenid: + ok = True + + if not ok: + sqlstr = ('SELECT "ACCESS_ROLE"."permission" FROM "ACCESS_ROLE"' + ' INNER JOIN "IDEN_ROLE" ON "ACCESS_ROLE"."roleid" = ' + '"IDEN_ROLE"."roleid" WHERE "ACCESS_ROLE"."accessid"=%s' + ' AND "IDEN_ROLE"."idenid"=%s;') + sqlarr = (accessid, idenid) + cur.execute(sqlstr, sqlarr) + + for data in cur: + permission = data[0] + if (permission & access_mask) == access_mask: + ok = True + break + + if ok: + return f(*args); + else: + raise Exception('TOJAuth.check_access() : PERMISSION DENIED') + + return wrapfunc + + return wrapper + + def create_access(self, owner_idenid): + self.check_access( + self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0) + + cur = self.db.cursor() + sqlstr = ('INSERT INTO "ACCESS" ("owner_idenid") VALUES (%s) ' + 'RETURNING "accessid";') + sqlarr = (owner_idenid, ) + cur.execute(sqlstr, sqlarr) + + for data in cur: + accessid = data[0] + return accessid + + def set_access_list(self, accessid, roleid, permission): + self.check_access(accessid, self.ACCESS_SETPER)(lambda x:x)(0) + + cur = self.db.cursor() + table = 'ACCESS_ROLE' + cond = { + 'accessid' : accessid, + 'roleid' : roleid + } + value = { + 'permission' : permission + } + cur.upsert(table, cond, value) + + def del_access_list(self, accessid, roleid): + self.check_access(accessid, self.ACCESS_SETPER)(lambda x:x)(0) + + cur = self.db.cursor() + sqlstr = ('DELETE FROM "ACCESS_ROLE" WHERE "accessid"=%s ' + 'AND "roleid"=%s;') + sqlarr = (accessid, roleid) + cur.execute(sqlstr, sqlarr) + + def create_role(self, rolename, roletype): + self.check_access( + self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0) + + cur = self.db.cursor() + sqlstr = ('INSERT INTO "ROLE" ("rolename", "roletype") VALUES (%s, %s)' + ' RETURNING "roleid";') + sqlarr = (rolename, roletype) + cur.execute(sqlstr, sqlarr) + for data in cur: + roleid = data[0] + + if(roleid != None): + self.set_role_relation(roleid, roleid) + + return roleid + + def set_role_relation(self, idenid, roleid): + self.check_access( + self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0) + + cur = self.db.cursor() + table = 'IDEN_ROLE' + cond = { + 'idenid' : idenid, + 'roleid' : roleid + } + cur.upsert(table, cond) + + def del_role_relation(self, idenid, roleid): + self.check_access( + self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0) + + cur = self.db.cursor() + sqlstr = ('DELETE FROM "IDEN_ROLE" WHERE "idenid"=%s ' + 'AND "roleid"=%s;') + sqlarr = (idenid, roleid) + cur.execute(sqlstr, sqlarr) + + def set_owner(self, idenid, accessid): + self.check_access(accessid, self.ACCESS_SETPER)(lambda x:x)(0) + + cur = self.db.cursor() + sqlstr = ('UPDATE "ACCESS" SET "owner_idenid"=%s WHERE "accessid"=%s;') + sqlarr = (idenid, accessid) + cur.execute(sqlstr, sqlarr) + diff --git a/src/py/user.py b/src/py/user.py new file mode 100755 index 0000000..45e195a --- /dev/null +++ b/src/py/user.py @@ -0,0 +1,336 @@ +import psycopg2 +from Crypto.Hash import SHA512 + +from tojauth import TOJAuth +from asyncdb import AsyncDB +import imc.proxy +import config + +class User: + auth_accessid = 2 + + USERNAME_LEN_MIN = 5 + USERNAME_LEN_MAX = 50 + PASSWORD_LEN_MIN = 5 + PASSWORD_LEN_MAX = 50 + NICKNAME_LEN_MIN = 1 + NICKNAME_LEN_MAX = 50 + EMAIL_LEN_MIN = 5 + EMAIL_LEN_MAX = 100 + AVATAR_LEN_MIN = 0 + AVATAR_LEN_MAX = 200 + ABOUTME_LEN_MIN = 0 + ABOUTME_LEN_MAX = 1000 + + def __init__(self, mod_iden): + User.instance = self + User.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER, + config.CORE_DBPASSWORD) + User.mod_iden = mod_iden + + @imc.async.caller + def register(self, username, password, nickname, email, avatar, aboutme): + if( + type(username) != str or + type(password) != str or + type(nickname) != str or + type(email) != str or + type(avatar) != str or + type(aboutme) != str + ): + return 'Eparameter' + + if len(username) < self.USERNAME_LEN_MIN: + return 'Eusername_too_short' + elif len(username) > self.USERNAME_LEN_MAX: + return 'Eusername_too_long' + elif len(password) < self.PASSWORD_LEN_MIN: + return 'Epassword_too_short' + elif len(password) > self.PASSWORD_LEN_MAX: + return 'Epassword_too_long' + elif len(nickname) < self.NICKNAME_LEN_MIN: + return 'Enickname_too_short' + elif len(nickname) > self.NICKNAME_LEN_MAX: + return 'Enickname_too_long' + elif len(email) < self.EMAIL_LEN_MIN: + return 'Eemail_too_short' + elif len(email) > self.EMAIL_LEN_MAX: + return 'Eemail_too_long' + elif len(avatar) < self.AVATAR_LEN_MIN: + return 'Eavatar_too_short' + elif len(avatar) > self.AVATAR_LEN_MAX: + return 'Eavatar_too_long' + elif len(aboutme) < self.ABOUTME_LEN_MIN: + return 'Eaboutme_too_short' + elif len(aboutme) > self.ABOUTME_LEN_MAX: + return 'Eaboutme_too_long' + + passhash = self._password_hash(password) + + with TOJAuth.change_current_iden(self.mod_iden): + try: + uid = self._create_user( + username, passhash, nickname, email, avatar, aboutme) + except psycopg2.IntegrityError: + return 'Eusername_already_exists' + + return {'uid' : uid} + + @TOJAuth.check_access(auth_accessid, TOJAuth.ACCESS_EXECUTE) + def _create_user(self, username, passhash, nickname, email, avatar, + aboutme): + roleid = TOJAuth.instance.create_role(username, TOJAuth.ROLETYPE_USER) + + cur = self.db.cursor() + sqlstr = ('INSERT INTO "USER" ("username", "passhash", "nickname", ' + '"email", "avatar", "aboutme", "idenid") ' + 'VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING "uid";') + sqlarr = (username, passhash, nickname, email, avatar, aboutme, roleid) + cur.execute(sqlstr, sqlarr) + + for data in cur: + uid = data[0] + return uid + + @imc.async.caller + def login(self, username, password): + if( + type(username) != str or + type(password) != str + ): + return 'Eparameter' + + uid = self.get_uid_by_username(username) + if uid == None: + return 'Eno_such_uid' + + passhash = self._password_hash(password) + + cur = self.db.cursor() + sqlstr = ('SELECT "idenid" FROM "USER" WHERE "uid" = %s ' + 'AND "passhash" = %s;') + sqlarr = (uid, passhash) + cur.execute(sqlstr, sqlarr) + + idenid = None + for data in cur: + idenid = data[0] + + if idenid == None: + return 'Ewrong_password' + + linkclass = TOJAuth.get_current_iden()['linkclass'] + linkid = TOJAuth.get_current_iden()['linkid'] + + with TOJAuth.change_current_iden(self.mod_iden): + idendesc = TOJAuth.instance.create_iden( + linkclass, linkid, idenid, TOJAuth.ROLETYPE_USER, {'uid' : uid} + ) + + ret = { + 'idendesc' : idendesc, + 'uid' : uid, + 'hash' : self._uid_passhash_hash(uid, passhash) + } + + return ret + + @imc.async.caller + def cookie_login(self, uid, uphash): + if( + type(uid) != int or + type(uphash) != str + ): + return 'Eparameter' + + idenid = None + real_uphash = None + + cur = self.db.cursor() + sqlstr = ('SELECT "idenid", "passhash" FROM "USER" WHERE "uid" = %s;') + sqlarr = (uid, ) + cur.execute(sqlstr, sqlarr) + + for data in cur: + idenid = data[0] + real_uphash = self._uid_passhash_hash(uid, data[1]) + + if idenid == None: + return 'Eno_such_uid' + + if real_uphash != uphash: + return 'Ewrong_uphash' + + linkclass = TOJAuth.get_current_iden()['linkclass'] + linkid = TOJAuth.get_current_iden()['linkid'] + + with TOJAuth.change_current_iden(self.mod_iden): + idendesc = TOJAuth.instance.create_iden( + linkclass, linkid, idenid, TOJAuth.ROLETYPE_USER, {'uid' : uid} + ) + + ret = { + 'idendesc' : idendesc, + 'uid' : uid, + 'hash' : uphash + } + + return ret + + @imc.async.caller + def get_user_info(self, uid): + if( + type(uid) != int + ): + return 'Eparameter' + + ret = self._get_user_info_by_uid(uid) + if ret == None: + return 'Eno_such_uid' + + return ret + + @imc.async.caller + def set_user_info(self, uid, nickname, email, avatar, aboutme): + if( + type(uid) != int or + type(nickname) != str or + type(email) != str or + type(avatar) != str or + type(aboutme) != str + ): + return 'Eparameter' + + if len(nickname) < self.NICKNAME_LEN_MIN: + return 'Enickname_too_short' + elif len(nickname) > self.NICKNAME_LEN_MAX: + return 'Enickname_too_long' + elif len(email) < self.EMAIL_LEN_MIN: + return 'Eemail_too_short' + elif len(email) > self.EMAIL_LEN_MAX: + return 'Eemail_too_long' + elif len(avatar) < self.AVATAR_LEN_MIN: + return 'Eavatar_too_short' + elif len(avatar) > self.AVATAR_LEN_MAX: + return 'Eavatar_too_long' + elif len(aboutme) < self.ABOUTME_LEN_MIN: + return 'Eaboutme_too_short' + elif len(aboutme) > self.ABOUTME_LEN_MAX: + return 'Eaboutme_too_long' + + idenid = self.get_idenid_by_uid(uid) + if idenid == None: + return 'Eno_such_uid' + + if idenid != TOJAuth.get_current_iden()['idenid']: + TOJAuth.check_access( + self.auth_accessid, TOJAuth.ACCESS_EXECUTE)(lambda x:x)(0) + + cur = self.db.cursor() + sqlstr = ('UPDATE "USER" SET "nickname" = %s, "email" = %s, ' + '"avatar" = %s, "aboutme" = %s WHERE "uid" = %s;') + sqlarr = (nickname, email, avatar, aboutme, uid) + cur.execute(sqlstr, sqlarr) + + @imc.async.caller + def change_user_password(self, uid, old_password, new_password): + if( + type(uid) != int or + type(old_password) != str or + type(new_password) != str + ): + return 'Eparameter' + + if len(new_password) < self.PASSWORD_LEN_MIN: + return 'Epassword_too_short' + elif len(new_password) > self.PASSWORD_LEN_MAX: + return 'Epassword_too_long' + + idenid = self.get_idenid_by_uid(uid) + if idenid == None: + return 'Eno_such_uid' + + if idenid != TOJAuth.get_current_iden()['idenid']: + TOJAuth.check_access( + self.auth_accessid, TOJAuth.ACCESS_EXECUTE)(lambda x:x)(0) + + old_passhash = self._password_hash(old_password) + + cur = self.db.cursor() + sqlstr = ('SELECT "idenid" FROM "USER" WHERE "uid" = %s ' + 'AND "passhash" = %s;') + sqlarr = (uid, old_passhash) + cur.execute(sqlstr, sqlarr) + + idenid = None + for data in cur: + idenid = data[0] + + if idenid == None: + return 'Ewrong_old_password' + + new_passhash = self._password_hash(new_password) + + sqlstr = ('UPDATE "USER" SET "passhash" = %s WHERE "uid" = %s;') + sqlarr = (new_passhash, uid) + cur.execute(sqlstr, sqlarr) + + @imc.async.caller + def oauth_login(self): + raise NotImplementedError + + def _password_hash(self, password): + h = SHA512.new(bytes(password + config.USER_PASSHASH_SALT, 'utf-8')) + return h.hexdigest() + + def _uid_passhash_hash(self, uid, passhash): + return self._password_hash( + 'GENGJIAN_WEISUO_KING^^' + str(uid) + '@E__E@' + passhash + 'Yo!') + + def _get_user_info_by_uid(self, uid): + cur = self.db.cursor() + sqlstr = ('SELECT * FROM "USER" WHERE "uid" = %s;') + sqlarr = (uid, ) + cur.execute(sqlstr, sqlarr) + + ret = None + for data in cur: + ret = {} + ret['uid'] = data[0] + ret['username'] = data[1] + ret['nickname'] = data[3] + ret['email'] = data[4] + ret['avatar'] = data[5] + ret['aboutme'] = data[6] + + return ret + + def get_idenid_by_uid(self, uid): + cur = self.db.cursor() + sqlstr = ('SELECT "idenid" FROM "USER" WHERE "uid" = %s;') + sqlarr = (uid, ) + cur.execute(sqlstr, sqlarr) + + ret = None + for data in cur: + ret = data[0] + + return ret + + def get_uid_by_username(self, username): + cur = self.db.cursor() + sqlstr = ('SELECT "uid" FROM "USER" WHERE "username" = %s;') + sqlarr = (username, ) + cur.execute(sqlstr, sqlarr) + + uid = None + for data in cur: + uid = data[0] + + return uid + + def does_username_exist(self, username): + uid = self.get_uid_by_username(username) + + return uid != None + diff --git a/src/test/wstest.css b/src/test/wstest.css new file mode 100644 index 0000000..39b9fad --- /dev/null +++ b/src/test/wstest.css @@ -0,0 +1,81 @@ +body{ + overflow-y:scroll; +} + +div.head ul.right_navbar{ + margin-right:0px; + position:absolute; + top:0px; + left:auto; + right:0px; +} +div.head ul.right_navbar div.notice{ + height:100%; + padding:5px 0px 5px 0px; +} +div.head ul.right_navbar div.notice > div.box{ + width:30px; + padding:5px 0px 5px 0px; + background-color:#656765; + font-size:16px; + font-weight:bold; + text-align:center; + cursor:pointer; +} +div.head ul.right_navbar div:hover.notice > div.box{ + color:white; +} +div.head ul.right_navbar div.notice_h > div.box{ + background:#86C166; + color:white; +} +div.head ul.right_navbar div.menu{ + width:80px; + padding:10px 0px 10px 0px; + text-align:center; + cursor:pointer; +} + +div.panel_container{ + width:0px; + position:absolute; + top:40px; + right:0px; + border-left:#DDD 1px solid; + overflow:hidden; + + transition:width 200ms; +} +div.panel_container_a{ + width:240px; +} +div.panel_container > div.menu_container{ + width:240px; + display:none; +} +div.panel_container > div.menu_container > ul.menu > li > a{ + height:60px; + font-weight:bold; + line-height:60px; +} + +div.panel_container > div.notice_container{ + width:240px; + display:none; +} +div.panel_container > div.notice_container > ul.notice > li > a > div{ + height:60px; + color:black; +} + +div.modal{ + width:978px; + margin-left:-489px; +} + +div.panel_box{ + width:322px; + position:absolute; + top:0px; + right:0px; +} diff --git a/src/test/wstest.html b/src/test/wstest.html new file mode 100644 index 0000000..a17a0dc --- /dev/null +++ b/src/test/wstest.html @@ -0,0 +1,97 @@ +<!DOCTYPE HTML> +<html> +<head> +<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" /> +<meta name="viewport" content="width=device-width, initial-scale=1.0"> + +<link href="/bootstrap/css/bootstrap.min.css" rel="stylesheet"> +<link href="/bootstrap/css/bootstrap-responsive.min.css" rel="stylesheet"> + +<link href="/wstest.css" rel="stylesheet"> + +<script src="/jquery-2.0.0.min.js"></script> +<script src="/bootstrap/js/bootstrap.min.js"></script> +<script type="text/javascript" src="/imc.js"></script> +<script type="text/javascript" src="/wstest.js"></script> + +<script type="text/javascript"> + +$(document).ready(function(){ + conn_backend(); + + index.init(); +}); + +</script> +</head> +<body> + +<div id="index_head" class="navbar navbar-inverse navbar-fixed-top head"> + <div class="navbar-inner"> + <div class="container"> + <a class="brand span2" href="#">Taiwan OJ</a> + <ul class="nav bar"> + <li class="active"><a href="#">Profile</a></li> + <li><a href="#">Edit Profile</a></li> + </ul> + <p class="navbar-text span3 title">Taiwan Online Judge Test Page</p> + </div> + + <ul id="index_head_rightnavbar" class="nav bar right_navbar"> + <li><a id="index_head_nickname" href="#">Elisha</a></li> + <li><div id="index_head_notice" class="notice notice_h"><div class="box">2</div></div></li> + <li><div id="index_head_menu" class="menu"><i class="icon-chevron-left icon-white"></i></div></li> + </ul> + </div> +</div> + +<div id="index_panel" class="panel_container"> + <div class="menu_container"> + <ul id="index_panel_menu" class="nav nav-list menu"> + <li><a href="#"><i class="icon-home"></i> Home</a></li> + <li class="active"><a href="#"><i class="icon-exclamation-sign"></i> Test</a></li> + <li><a href="#"><i class="icon-th-large"></i> Square</a></li> + <ul class="nav nav-list sq_list"> + <li class="nav-header">Coming</li> + <li><a href="#">Test1</a></li> + <li><a href="#">Test2</a></li> + <li class="nav-header">Running</li> + <li><a href="#">Normal Test</a></li> + <li class="nav-header">Ended</li> + <li><a href="#">End Test</a></li> + </ul> + + <li style="margin-top:30px;"><a href="#"><i class="icon-circle-arrow-left"></i> Logout</a></li> + </ul> + </div> + + <div class="notice_container"> + <ul id="index_panel_menu" class="nav nav-list notice"> + <li><a href="#"><div><h5>Problem:573</h5>Wrong Answer</div></a></li> + <li><a href="#"><div><h5>Problem:573</h5>Runtime Error</div></a></li> + </ul> + </div> +</div> + +<div class="container" style="padding:60px 0px 32px 0px;"> + <div class="row"> + <div class="span2 slide"> + + </div> + <div class="span10 main"> + <button class="btn btn-primary" onclick="$('div.test_modal').modal('show');">Test Dialog</button> + <div class="modal hide fade test_modal" tabindex="-1" role="dialog" aria-hidden="true"> + <div class="modal-header"> + <button type="button" class="close" data-dismiss="modal" aria-hidden="true">×</button> + <h3>Test Dialog</h3> + </div> + <div class="modal-body"> + Hello, my name is Elisha. + </div> + </div> + </div> + </div> +</div> + +</body> +</html> diff --git a/src/test/wstest.js b/src/test/wstest.js new file mode 100644 index 0000000..54cead6 --- /dev/null +++ b/src/test/wstest.js @@ -0,0 +1,178 @@ +'use strict' + +var count = 0; +var last = 0; +var data = new ArrayBuffer(1024); + +var linkid = null; +var idendesc = null; + +function test_display(iden,param,callback){ + imc_call(idendesc,'/center/1/','test_dst','',function(result){ + console.log(result); + }); +} + +var WebSocketConnection = function(linkid,ws){ + var that = this; + var reader = new FileReader; + + that.__super__(linkid); + + that.send_msg = function(data){ + ws.send(new Blob([data],{'type':'application/octet-stream'})) + }; + that.start_recv = function(recv_callback){ + ws.onmessage = function(e){ + reader.onload = function(e){ + recv_callback(that,e.target.result); + }; + reader.readAsText(e.data); + } + }; + + ws.onclose = function(e){ + console.log('close'); + that.close(); + + setTimeout(conn_backend,5000); + }; +};__extend__(WebSocketConnection,imc.Connection); + +function conn_backend(ip,port){ + $.post('http://toj.tfcis.org:83/conn',{},function(res){ + var reto; + var iden; + var linkid; + var ws; + + if(res[0] != 'E'){ + reto = JSON.parse(res) + idendesc = reto.client_idendesc; + iden = JSON.parse(JSON.parse(idendesc)[0]); + linkid = iden.linkid; + + ws = new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn'); + ws.onopen = function(){ + var i; + var conn; + + console.log('open'); + + console.log(linkid); + ws.send(JSON.stringify({ + 'client_linkid':linkid + })); + + conn = new WebSocketConnection(reto.backend_linkid,ws); + + new imc.Auth(); + new imc.Proxy(linkid,imc.Auth.instance,function(linkid,callback){ + callback(conn); + }); + imc.Proxy.instance.add_conn(conn); + + imc_register_call('','test_display',test_display); + + + test_display(idendesc,'',function(result){ + console.log(result); + }); + }; + }else{ + setTimeout(conn_backend,5000); + } + }); +} + +function perf(){ + $('#speed').text((count - last) + '/s'); + last = count; + setTimeout(perf,1000); +} + +var index = new function(){ + this.init = function(){ + var j_navbar = $('#index_head ul.right_navbar'); + var j_navbar_menu = $('#index_head_menu'); + var j_navbar_notice = $('#index_head_notice'); + var j_panel = $('#index_panel'); + var j_panel_menu = $('#index_panel_menu'); + var j_panel_notice = $('#index_panel_notice'); + + var _in_area = function(target,id){ + return target.id == id || $(target).parents('#' + id).length > 0; + }; + var _show_panel = function(){ + var j_i; + + if(!j_panel.hasClass('panel_container_a')){ + j_i = j_navbar_menu.find('i'); + j_i.removeClass('icon-chevron-left'); + j_i.addClass('icon-chevron-down'); + + j_panel.addClass('panel_container_a'); + } + }; + var _hide_panel = function(){ + var j_i; + + if(j_panel.hasClass('panel_container_a')){ + j_i = j_navbar_menu.find('i'); + j_i.removeClass('icon-chevron-down'); + j_i.addClass('icon-chevron-left'); + + j_panel.removeClass('panel_container_a'); + } + }; + var _show_menu = function(){ + _hide_notice(); + j_panel.find('div.menu_container').show(); + _show_panel(); + }; + var _hide_menu = function(){ + j_panel.find('div.menu_container').hide(); + }; + var _show_notice = function(){ + _hide_menu(); + j_panel.find('div.notice_container').show(); + _show_panel(); + }; + var _hide_notice = function(){ + j_panel.find('div.notice_container').hide(); + }; + + $(window).on('resize',function(e){ + j_panel.css('min-height',($(window).height() - 40 + 'px')); + }); + j_panel.css('min-height',($(window).height() - 40 + 'px')); + + $(window).on('mouseover',function(e){ + var target = e.target; + + console.log(e.target); + if(target == null || + _in_area(target,'index_panel') || + (target.parentNode.id == 'index_head' && $(target).hasClass('navbar-inner'))){ + return; + } + + if(_in_area(target,'index_head_menu')){ + if(!j_panel.hasClass('panel_container_a')){ + _show_menu(); + } + }else{ + if(!_in_area(target,'index_head_rightnavbar')){ + _hide_panel(); + } + } + }); + + j_navbar_menu.on('click',function(e){ + _show_menu(); + }); + j_navbar_notice.on('click',function(e){ + _show_notice(); + }); + }; +}; |