aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-06-09 13:34:55 +0800
committerpzread <netfirewall@gmail.com>2013-06-09 13:34:55 +0800
commit77dd822815744579b05da117efb14f43b7088038 (patch)
treeea2388767586a740d06bde30c4e29c940f9a91c4 /src
parent872567a4cf3bff7d9d310f5e66f465f5523d58d9 (diff)
parent8cf636373548c8e3484a137268ddd041d12bbe4a (diff)
downloadtaiwan-online-judge-77dd822815744579b05da117efb14f43b7088038.tar.gz
taiwan-online-judge-77dd822815744579b05da117efb14f43b7088038.tar.zst
taiwan-online-judge-77dd822815744579b05da117efb14f43b7088038.zip
Merge branch '2.0'
Conflicts: README.md
Diffstat (limited to 'src')
-rw-r--r--src/js/imc.js220
-rw-r--r--src/py/asyncdb.py331
-rw-r--r--src/py/backend_server.py331
-rw-r--r--src/py/center_server.py235
-rw-r--r--src/py/imc/__init__.py1
-rw-r--r--src/py/imc/async.py121
-rw-r--r--src/py/imc/auth.py79
-rw-r--r--src/py/imc/nonblock.py61
-rwxr-xr-xsrc/py/imc/proxy.py559
-rw-r--r--src/py/netio.py511
-rw-r--r--src/py/tojauth.py179
-rwxr-xr-xsrc/py/user.py336
-rw-r--r--src/test/wstest.css81
-rw-r--r--src/test/wstest.html97
-rw-r--r--src/test/wstest.js178
15 files changed, 3320 insertions, 0 deletions
diff --git a/src/js/imc.js b/src/js/imc.js
new file mode 100644
index 0000000..8e444a1
--- /dev/null
+++ b/src/js/imc.js
@@ -0,0 +1,220 @@
+var __extend__ = function(child,parent){
+ child.prototype.__super__ = parent;
+};
+
+var imc = new function(){
+ this.Connection = function(linkid){
+ var that = this;
+
+ that.link_linkidmap = {};
+ that.close_callback = [];
+ that.linkid = linkid;
+
+ that.send_msg = function(data){};
+ that.start_recv = function(recv_callback){};
+
+ that.close = function(){
+ var i;
+
+ for(i = 0;i < that.close_callback.length;i++){
+ that.close_callback[i](that);
+ }
+ };
+ };
+
+ this.Proxy = function(linkid,auth,connect_linkid){
+ var MSGTYPE_CALL = 'call';
+ var MSGTYPE_RET = 'ret';
+
+ var that = this;
+ var caller_retid_count = 0;
+ var conn_linkidmap = {};
+ var conn_retidmap = {};
+ var call_pathmap = {};
+
+ var route_call = function(caller_retid,timeout,idendesc,dst,func_name,param,callback){
+ var i;
+ var part;
+ var dst_linkid;
+ var dst_path;
+ var caller_linkid;
+ var func;
+
+ var _add_wait_caller = function(conn_linkid){
+ conn_retidmap[conn_linkid][caller_retid] = {
+ 'timeout':timeout,
+ 'callback':callback
+ }
+ };
+
+ part = dst.split('/');
+ dst_linkid = part[2];
+ dst_path = part.slice(3).join('/');
+
+ iden = auth.get_iden(idendesc);
+
+ caller_linkid = iden.linkid
+ if(caller_retid.split('/')[0] != caller_linkid){
+ return false;
+ }
+
+ if(dst_linkid == linkid){
+ if((func = call_pathmap[dst_path + func_name]) != undefined){
+ _add_wait_caller(linkid);
+
+ func(iden,param,function(data){
+ if(linkid in conn_retidmap && caller_retid in conn_retidmap[linkid]){
+ delete conn_retidmap[linkid][caller_retid];
+ callback({'stat':true,'data':data});
+ }
+ });
+ }else{
+ callback({'stat':false,'data':'Enoexist'});
+ }
+ }else{
+ that.request_conn(dst_linkid,function(conn){
+ if(caller_linkid == linkid){
+ _add_wait_caller(conn.linkid);
+ }
+
+ send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param);
+ });
+ }
+ };
+
+ var recv_dispatch = function(conn,data){
+ msgo = JSON.parse(data);
+ if(msgo.type == MSGTYPE_CALL){
+ recv_msg_call(conn,msgo);
+ }else if(msgo.type == MSGTYPE_RET){
+ recv_msg_ret(conn,msgo);
+ }
+ };
+
+ var send_msg_call = function(conn,caller_retid,timeout,idendesc,dst,func_name,param){
+ msg = {
+ 'type':MSGTYPE_CALL,
+ 'caller_retid':caller_retid,
+ 'timeout':timeout,
+ 'idendesc':idendesc,
+ 'dst':dst,
+ 'func_name':func_name,
+ 'param':param
+ };
+
+ conn.send_msg(JSON.stringify(msg));
+ };
+ var recv_msg_call = function(conn,msg){
+ var caller_retid = msg.caller_retid;
+ var timeout = msg.timeout;
+ var idendesc = msg.idendesc;
+ var dst = msg.dst;
+ var func_name = msg.func_name;
+ var param = msg.param;
+
+ route_call(caller_retid,timeout,idendesc,dst,func_name,param,function(result){
+ that.request_conn(caller_retid,function(conn){
+ var iden;
+
+ iden = auth.get_iden(idendesc);
+ send_msg_ret(conn,iden.linkid,caller_retid,result);
+ });
+ });
+ };
+
+ var send_msg_ret = function(conn,caller_linkid,caller_retid,result){
+ msg = {
+ 'type':MSGTYPE_RET,
+ 'caller_linkid':caller_linkid,
+ 'caller_retid':caller_retid,
+ 'result':result
+ };
+
+ conn.send_msg(JSON.stringify(msg));
+ };
+ var recv_msg_ret = function(conn,msg){
+ var caller_linkid = msg['caller_linkid'];
+ var caller_retid = msg['caller_retid'];
+ var result = msg['result'];
+
+ if(caller_linkid == linkid){
+ if(conn.linkid in conn_retidmap && caller_retid in conn_retidmap[conn.linkid]){
+ wait = conn_retidmap[conn.linkid][caller_retid];
+ delete conn_retidmap[conn.linkid][caller_retid];
+
+ wait.callback(result);
+ }
+ }else{
+ request_conn(caller_linkid,function(conn){
+ send_msg_ret(conn,caller_linkid,caller_retid,result);
+ });
+ }
+ };
+
+ that.add_conn = function(conn){
+ conn_linkidmap[conn.linkid] = conn;
+ conn_retidmap[conn.linkid] = {};
+ conn.start_recv(recv_dispatch);
+ };
+ that.link_conn = function(linkid,conn){
+ conn.link_linkidmap[linkid] = true;
+ conn_linkidmap[linkid] = conn;
+ };
+ that.unlink_conn = function(linkid){
+ conn = conn_linkidmap[linkid];
+ delete conn_linkidmap[linkid];
+ delete conn.link_linkidmap[linkid];
+ };
+ that.del_conn = function(conn){
+ delete conn_linkidmap[conn.linkid];
+ };
+ that.request_conn = function(linkid,callback){
+ var _conn_cb = function(conn){
+ if(conn != null && conn.linkid != linkid){
+ that.link_conn(linkid,conn);
+ }
+
+ callback(conn);
+ };
+
+ conn = conn_linkidmap[linkid];
+ if(conn != undefined){
+ _conn_cb(conn);
+ }else{
+ connect_linkid(linkid,_conn_cb);
+ }
+ };
+
+ that.call = function(idendesc,timeout,dst,func_name,param,callback){
+ caller_retid = linkid + '/' + caller_retid_count;
+ caller_retid_count += 1;
+
+ route_call(caller_retid,timeout,idendesc,dst,func_name,param,callback);
+ };
+
+ that.register_call = function(path,func_name,func){
+ call_pathmap[path + func_name] = func;
+ };
+
+ conn_retidmap[linkid] = {};
+
+ imc.Proxy.instance = that;
+ };
+
+ this.Auth = function(){
+ var that = this;
+
+ that.get_iden = function(idendesc){
+ return JSON.parse(JSON.parse(idendesc)[0]);
+ };
+
+ imc.Auth.instance = that;
+ };
+};
+
+function imc_call(idendesc,dst,func_name,param,callback){
+ imc.Proxy.instance.call(idendesc,10000,dst,func_name,param,callback);
+}
+function imc_register_call(path,func_name,func){
+ imc.Proxy.instance.register_call(path,func_name,func);
+}
diff --git a/src/py/asyncdb.py b/src/py/asyncdb.py
new file mode 100644
index 0000000..1e2bf7a
--- /dev/null
+++ b/src/py/asyncdb.py
@@ -0,0 +1,331 @@
+from collections import deque
+import random
+
+import tornado.ioloop
+import tornado.stack_context
+import psycopg2
+
+import imc.async
+
+class RestrictCursor:
+ def __init__(self,db,cur):
+ self._db = db
+ self._cur = cur
+ self._ori_cur = cur
+ self._in_transaction = False
+
+ self._init_implement()
+
+ def __iter__(self):
+ return self._cur
+
+ def execute(self,sql,param = None):
+ self._db.execute(self._cur,sql,param)
+
+ self.arraysize = self._cur.arraysize
+ self.itersize = self._cur.itersize
+ self.rowcount = self._cur.rowcount
+ self.rownumber = self._cur.rownumber
+ self.lastrowid = self._cur.lastrowid
+ self.query = self._cur.query
+ self.statusmessage = self._cur.statusmessage
+
+ def begin(self):
+ if self._in_transaction == True:
+ return
+
+ self._cur = self._db.begin_transaction()
+ self._init_implement()
+
+ self._db.execute(self._cur,'BEGIN;')
+
+ self._in_transaction = True
+
+ def commit(self):
+ if self._in_transaction == False:
+ return
+
+ self._db.execute(self._cur,'COMMIT;')
+ if self._cur.statusmessage == 'COMMIT':
+ ret = True
+
+ else:
+ ret = False
+
+ self._db.end_transaction(self._cur.connection)
+ self._cur = self._ori_cur
+
+ self._in_transaction = False
+
+ return ret
+
+ def rollback(self):
+ if self._in_transaction == False:
+ return
+
+ self._db.execute(self._cur,'ROLLBACK;')
+
+ self._db.end_transaction(self._cur.connection)
+ self._cur = self._ori_cur
+
+ self._in_transaction = False
+
+ def upsert(self,name,cond,value = None):
+ cond_keys = []
+ cond_vals = []
+ items = cond.items()
+ for key,val in items:
+ cond_keys.append('"' + key + '"')
+ cond_vals.append(val)
+
+ value_keys = []
+ value_vals = []
+ if value != None:
+ items = value.items()
+ for key,val in items:
+ value_keys.append('"' + key + '"')
+ value_vals.append(val)
+
+ query_list = ['UPDATE "' + name + '" SET ']
+ for key in value_keys:
+ query_list.append(key)
+ query_list.append('=%s')
+ query_list.append(',')
+
+ query_list[-1] = ' WHERE '
+
+ for key in cond_keys:
+ query_list.append(key)
+ query_list.append('=%s')
+ query_list.append(' AND ')
+
+ query_list[-1] = ';'
+ update_query = ''.join(query_list)
+
+ update_param = list(value_vals)
+ update_param.extend(cond_vals)
+
+ query_list = ['INSERT INTO "' + name + '" (']
+ for key in cond_keys:
+ query_list.append(key)
+ query_list.append(',')
+ for key in value_keys:
+ query_list.append(key)
+ query_list.append(',')
+
+ count = len(cond)
+ if value != None:
+ count += len(value)
+
+ query_list[-1] = ') VALUES ('
+ query_list.extend(['%s,'] * (count - 1))
+ query_list.append('%s);')
+ insert_query = ''.join(query_list)
+
+ insert_param = list(cond_vals)
+ insert_param.extend(value_vals)
+
+ while True:
+ self.begin()
+
+ if value != None:
+ self.execute(update_query,update_param)
+
+ if value == None or self.rowcount == 0:
+ try:
+ self.execute(insert_query,insert_param)
+
+ except psycopg2.IntegrityError:
+ self.rollback()
+ if value == None:
+ break
+
+ else:
+ continue
+
+ if self.commit() == True:
+ break
+
+ def _init_implement(self):
+ self.fetchone = self._cur.fetchone
+ self.fetchmany = self._cur.fetchmany
+ self.fetchall = self._cur.fetchall
+ self.scroll = self._cur.scroll
+ self.cast = self._cur.cast
+ self.tzinfo_factory = self._cur.tzinfo_factory
+
+ self.arraysize = 0
+ self.itersize = 0
+ self.rowcount = 0
+ self.rownumber = 0
+ self.lastrowid = None
+ self.query = ''
+ self.statusmessage = ''
+
+class AsyncDB:
+ def __init__(self,dbname,user,password):
+ self.OPER_CURSOR = 0
+ self.OPER_EXECUTE = 1
+
+ self._ioloop = tornado.ioloop.IOLoop.instance()
+
+ self._dbname = dbname
+ self._user = user
+ self._password = password
+ self._conn_fdmap = {}
+ self._free_connpool = []
+ self._share_connpool = []
+ self._pendoper_fdmap = {}
+ self._opercallback_fdmap = {}
+
+ for i in range(8):
+ conn = self._create_conn()
+ self._free_connpool.append(conn)
+
+ self._ioloop.add_handler(conn.fileno(),
+ self._oper_dispatch,
+ tornado.ioloop.IOLoop.ERROR)
+
+ self._ioloop.add_callback(self._oper_dispatch,conn.fileno(),0)
+
+ for i in range(2):
+ conn = self._create_conn()
+ self._share_connpool.append(conn)
+
+ self._ioloop.add_handler(conn.fileno(),
+ self._oper_dispatch,
+ tornado.ioloop.IOLoop.ERROR)
+
+ self._ioloop.add_callback(self._oper_dispatch,conn.fileno(),0)
+
+ def cursor(self):
+ return RestrictCursor(self,self._cursor())
+
+ def execute(self,cur,sql,param = None):
+ fd = cur.connection.fileno()
+
+ self._pendoper_fdmap[fd].append((self.OPER_EXECUTE,(cur,sql,param),imc.async.get_retid()))
+ self._ioloop.add_callback(self._oper_dispatch,fd,0)
+
+ imc.async.switch_top()
+
+ def begin_transaction(self):
+ if len(self._free_connpool) > 0:
+ conn = self._free_connpool.pop()
+
+ else:
+ conn = self._create_conn()
+ self._ioloop.add_handler(conn.fileno(),
+ self._oper_dispatch,
+ tornado.ioloop.IOLoop.ERROR)
+
+ return self._cursor(conn)
+
+ def end_transaction(self,conn):
+ if len(self._free_connpool) < 16:
+ self._free_connpool.append(conn)
+
+ else:
+ self._close_conn(conn)
+
+ def _cursor(self,conn = None):
+ if conn != None:
+ fd = conn.fileno()
+
+ else:
+ fd = self._share_connpool[random.randrange(len(self._share_connpool))].fileno()
+
+ self._pendoper_fdmap[fd].append((self.OPER_CURSOR,None,imc.async.get_retid()))
+ self._ioloop.add_callback(self._oper_dispatch,fd,0)
+
+ cur = imc.async.switch_top()
+ return cur
+
+ def _create_conn(self):
+ conn = psycopg2.connect(database = self._dbname,
+ user = self._user,
+ password = self._password,
+ async = 1)
+
+ fd = conn.fileno()
+ self._conn_fdmap[fd] = conn
+ self._pendoper_fdmap[fd] = deque()
+ self._opercallback_fdmap[fd] = None
+
+ return conn
+
+ def _close_conn(self,conn):
+ fd = conn.fileno()
+ self._conn_fdmap.pop(fd,None)
+ self._pendoper_fdmap.pop(fd,None)
+ self._opercallback_fdmap.pop(fd,None)
+
+ conn.close()
+
+ def _oper_dispatch(self,fd,evt):
+ err = None
+ try:
+ conn = self._conn_fdmap[fd]
+
+ except KeyError:
+ self._ioloop.remove_handler(fd)
+ return
+
+ try:
+ stat = conn.poll()
+
+ except Exception as e:
+ err = e
+
+ if err != None or stat == psycopg2.extensions.POLL_OK:
+ self._ioloop.update_handler(fd,
+ tornado.ioloop.IOLoop.ERROR)
+
+ elif stat == psycopg2.extensions.POLL_READ:
+ self._ioloop.update_handler(fd,
+ tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.ERROR)
+
+ return
+
+ elif stat == psycopg2.extensions.POLL_WRITE:
+ self._ioloop.update_handler(fd,
+ tornado.ioloop.IOLoop.WRITE | tornado.ioloop.IOLoop.ERROR)
+
+ return
+
+ cb = self._opercallback_fdmap[fd]
+ if cb != None:
+ self._opercallback_fdmap[fd] = None
+ cb(err)
+
+ else:
+ try:
+ oper,data,retid = self._pendoper_fdmap[fd].popleft()
+
+ except IndexError:
+ return
+
+ if oper == self.OPER_CURSOR:
+ def _ret_cursor(err = None):
+ if err == None:
+ imc.async.ret(retid,conn.cursor())
+
+ else:
+ imc.async.ret(retid,err = err)
+
+ self._opercallback_fdmap[fd] = _ret_cursor
+
+ elif oper == self.OPER_EXECUTE:
+ def _ret_execute(err = None):
+ if err == None:
+ imc.async.ret(retid)
+
+ else:
+ imc.async.ret(retid,err = err)
+
+ cur,sql,param = data
+
+ cur.execute(sql,param)
+ self._opercallback_fdmap[fd] = _ret_execute
+
+ self._ioloop.add_callback(self._oper_dispatch,fd,0)
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
new file mode 100644
index 0000000..7535183
--- /dev/null
+++ b/src/py/backend_server.py
@@ -0,0 +1,331 @@
+#! /usr/bin/env python
+
+import socket
+import json
+import datetime
+import time
+import random
+from multiprocessing import Process
+
+import tornado.ioloop
+import tornado.tcpserver
+import tornado.httpserver
+import tornado.websocket
+
+from imc import auth
+import imc.async
+from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call
+
+import netio
+from netio import SocketStream,SocketConnection,WebSocketConnection
+from tojauth import TOJAuth
+
+class BackendWorker(tornado.tcpserver.TCPServer):
+ def __init__(self,center_addr,ws_port):
+ super().__init__()
+
+ self._ioloop = tornado.ioloop.IOLoop.current()
+ self.center_addr = center_addr
+ self.sock_addr = None
+ self.ws_port = ws_port
+
+ self._linkid = None
+ self._idendesc = None
+ self._pend_mainconn_linkidmap = {}
+ self._pend_filestream_filekeymap = {}
+ self._client_linkidmap = {}
+
+ def start(self):
+ sock_port = random.randrange(4096,8192)
+ self.sock_addr = ('10.8.0.10',sock_port)
+
+ self.bind(sock_port,'',socket.AF_INET,65536)
+ super().start()
+
+ self._conn_center()
+
+ def handle_stream(self,stream,addr):
+ def _recv_conn_info(data):
+ info = json.loads(data.decode('utf-8'))
+ conntype = info['conntype']
+
+ if conntype == 'main':
+ self._handle_mainconn(sock_stream,addr,info)
+
+ elif conntype == 'file':
+ self._handle_fileconn(sock_stream,addr,info)
+
+ fd = stream.fileno()
+ self._ioloop.remove_handler(fd)
+ sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0))
+
+ netio.recv_pack(sock_stream,_recv_conn_info)
+
+ def add_client(self,linkid,handler):
+ self._client_linkidmap[linkid] = {}
+
+ conn = netio.WebSocketConnection('client',linkid,handler)
+ conn.add_close_callback(lambda conn : self.del_client(conn.linkid))
+ Proxy.instance.add_conn(conn)
+
+ #imc_call_async(self._idendesc,'/center/' + self.center_conn.linkid + '/','add_client',{'backend_linkid':self._linkid,'client_linkid':linkid})
+
+ return conn
+
+ def del_client(self,linkid):
+ del self._client_linkidmap[linkid]
+
+ #imc_call_async(self._idendesc,'/center/' + self.center_conn.linkid + '/','del_client',linkid)
+
+ def _conn_center(self):
+ def __retry(conn):
+ print('retry connect center')
+
+ self.center_conn = None
+ self._ioloop.add_timeout(datetime.timedelta(seconds = 5),self._conn_center)
+
+ def __send_worker_info():
+ def ___recv_info_cb(data):
+ info = json.loads(data.decode('utf-8'))
+
+ pubkey = open('pubkey.pem','r').read()
+ TOJAuth(pubkey)
+
+ self._idendesc = info['idendesc']
+ iden = TOJAuth.instance.get_iden('backend',self._linkid,self._idendesc)
+ self._linkid = iden['linkid']
+ Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid)
+
+ self.center_conn = SocketConnection('center',info['center_linkid'],stream,self.center_addr)
+ self.center_conn.add_close_callback(__retry)
+ Proxy.instance.add_conn(self.center_conn)
+
+ imc_register_call('','test_dst',self._test_dst)
+ #imc_register_call('','test_dsta',self._test_dsta)
+ #time.sleep(2)
+
+ if int(self._linkid) == 2:
+ self._test_call('9')
+
+ sock_ip,sock_port = self.sock_addr
+ netio.send_pack(stream,bytes(json.dumps({
+ 'linkclass':'backend',
+ 'sock_ip':sock_ip,
+ 'sock_port':sock_port,
+ 'ws_ip':'210.70.137.215',
+ 'ws_port':self.ws_port
+ }),'utf-8'))
+ netio.recv_pack(stream,___recv_info_cb)
+
+ stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ stream.set_close_callback(__retry)
+ stream.connect(self.center_addr,__send_worker_info)
+
+ def _conn_linkid(self,linkid):
+ def __handle_pend(conn):
+ try:
+ retids = self._pend_mainconn_linkidmap.pop(worker_linkid)
+
+ except KeyError:
+ return
+
+ for retid in retids:
+ imc.async.ret(retid,conn)
+
+ def __conn_cb():
+ conn = Proxy.instance.get_conn(worker_linkid)
+ if conn != None:
+ __handle_pend(conn)
+ main_stream.set_close_callback(None)
+ main_stream.close()
+
+ else:
+ sock_ip,sock_port = self.sock_addr
+ netio.send_pack(main_stream,bytes(json.dumps({
+ 'conntype':'main',
+ 'linkclass':'backend',
+ 'linkid':self._linkid,
+ 'sock_ip':sock_ip,
+ 'sock_port':sock_port
+ }),'utf-8'))
+ netio.recv_pack(main_stream,__recv_cb)
+
+ def __recv_cb(data):
+ stat = json.loads(data.decode('utf-8'))
+ if stat == True:
+ conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,sock_addr,self._add_pend_filestream)
+ Proxy.instance.add_conn(conn)
+ __handle_pend(conn)
+
+ else:
+ main_stream.set_close_callback(None)
+ main_stream.close()
+
+ if self.center_conn == None:
+ return None
+
+ stat,ret = imc_call(self._idendesc,'/center/' + self.center_conn.linkid + '/','lookup_linkid',linkid)
+
+ if stat == False or ret == None:
+ return None
+
+ else:
+ worker_linkclass = ret['worker_linkclass']
+ worker_linkid = ret['worker_linkid']
+
+ conn = Proxy.instance.get_conn(worker_linkid)
+ if conn != None:
+ return conn
+
+ elif worker_linkid in self._pend_mainconn_linkidmap:
+ self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.get_retid())
+ return imc.async.switch_top()
+
+ else:
+ self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.get_retid()]
+
+ sock_addr = (ret['sock_ip'],ret['sock_port'])
+
+ main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ main_stream.set_close_callback(lambda conn : __handle_pend(None))
+ main_stream.connect(sock_addr,__conn_cb)
+
+ return imc.async.switch_top()
+
+ def _add_pend_filestream(self,filekey,callback):
+ self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback)
+
+ def _handle_mainconn(self,main_stream,addr,info):
+ linkclass = info['linkclass']
+ linkid = info['linkid']
+ sock_ip = info['sock_ip']
+ sock_port = info['sock_port']
+
+ conn = Proxy.instance.get_conn(linkid)
+ if conn != None:
+ return
+
+ if (linkid not in self._pend_mainconn_linkidmap) or self._linkid > linkid:
+ conn = SocketConnection(linkclass,linkid,main_stream,(sock_ip,sock_port),self._add_pend_filestream)
+ Proxy.instance.add_conn(conn)
+
+ netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8'))
+
+ if linkid in self._pend_mainconn_linkidmap:
+ retids = self._pend_mainconn_linkidmap.pop(linkid)
+ for retid in retids:
+ imc.async.ret(retid,conn)
+
+ else:
+ netio.send_pack(main_stream,bytes(json.dumps(False),'utf-8'))
+
+ def _handle_fileconn(self,file_stream,addr,info):
+ try:
+ self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream)
+
+ except KeyError:
+ pass
+
+ @imc.async.caller
+ def _test_call(self,param):
+ dst = '/backend/' + '3' + '/'
+ ret = imc_call_async(self._idendesc,dst,'test_dst',lambda result : print(result),'test',113)
+ print(ret)
+
+ ret = imc_call(self._idendesc,'/center/1/','create_iden','client','1234',1221,TOJAuth.ROLETYPE_USER,{'uid':31})
+ print(ret)
+
+ return
+
+ pend = []
+ for i in range(0,32):
+ if str((i % 16) + 2) == self._linkid:
+ continue
+
+ fileres = Proxy.instance.sendfile('/backend/' + str((i % 16) + 2) + '/','Fedora-18-x86_64-DVD.iso')
+
+ dst = '/backend/' + str((i % 16) + 2) + '/'
+ ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey)
+
+ pend.append(fileres)
+
+ for p in pend:
+ print(self._linkid + ' ' + p.wait())
+
+ print(self._linkid)
+
+ @imc.async.caller
+ def _test_dst(self,param,sdfsdf):
+ #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param)
+ #return ret + ' Too'
+
+ print(param)
+ print(sdfsdf)
+ print(TOJAuth.get_current_iden())
+
+ #Proxy.instance.rejectfile(param)
+ #print('recv ' + iden['linkid'] + ' > ' + self._linkid)
+ #fileres = Proxy.instance.recvfile(param,'data')
+ #print('recv ' + fileres.wait())
+
+ return 'ok'
+
+ @imc.async.caller
+ def _test_dsta(self,iden,param):
+ return param + ' Too'
+
+class WebSocketConnHandler(tornado.websocket.WebSocketHandler):
+ def open(self):
+ pass
+
+ def on_message(self,msg):
+ global backend_worker
+
+ if hasattr(self,'backend_conn'):
+ self.backend_conn.recv_msg(msg)
+
+ else:
+ try:
+ info = json.loads(msg)
+ self.backend_conn = backend_worker.add_client(info['client_linkid'],self)
+
+ except Exception:
+ self.close()
+
+ def on_close(self):
+ global backend_backend
+
+ if hasattr(self,'backend_conn'):
+ self.backend_conn.close()
+
+def start_backend_worker(ws_port):
+ global backend_worker
+
+ http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([
+ ('/conn',WebSocketConnHandler)
+ ]))
+ http_serv.listen(ws_port)
+
+ backend_worker = BackendWorker(('10.8.0.10',5730),ws_port)
+ backend_worker.start()
+
+ tornado.ioloop.IOLoop.instance().start()
+
+if __name__ == '__main__':
+ worker_list = []
+
+ worker_list.append(Process(target = start_backend_worker,args = (81, )))
+ worker_list.append(Process(target = start_backend_worker,args = (82, )))
+ #worker_list.append(Process(target = start_backend_worker,args = (181, )))
+ #worker_list.append(Process(target = start_backend_worker,args = (182, )))
+ #worker_list.append(Process(target = start_backend_worker,args = (183, )))
+ #worker_list.append(Process(target = start_backend_worker,args = (184, )))
+ #worker_list.append(Process(target = start_backend_worker,args = (185, )))
+ #worker_list.append(Process(target = start_backend_worker,args = (186, )))
+
+ for proc in worker_list:
+ proc.start()
+
+ for proc in worker_list:
+ proc.join()
+
diff --git a/src/py/center_server.py b/src/py/center_server.py
new file mode 100644
index 0000000..f54a57c
--- /dev/null
+++ b/src/py/center_server.py
@@ -0,0 +1,235 @@
+#! /usr/bin/env python
+
+import random
+import json
+import uuid
+import socket
+
+import tornado.ioloop
+import tornado.tcpserver
+import tornado.httpserver
+import tornado.web
+
+import imc.async
+from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call
+
+import netio
+from netio import SocketStream,SocketConnection
+from tojauth import TOJAuth
+
+class Worker:
+ def __init__(self,main_stream,linkclass,linkid,idendesc,worker_info,center_linkid):
+ self.main_stream = main_stream
+ self.linkclass = linkclass
+ self.linkid = linkid
+ self.idendesc = idendesc
+ self.sock_addr = (worker_info['sock_ip'],worker_info['sock_port'])
+
+ netio.send_pack(self.main_stream,bytes(json.dumps({
+ 'idendesc':self.idendesc,
+ 'center_linkid':center_linkid
+ }),'utf-8'))
+
+ conn = SocketConnection(self.linkclass,self.linkid,self.main_stream,self.sock_addr)
+ conn.add_close_callback(lambda conn : self.close())
+ Proxy.instance.add_conn(conn)
+
+ def close(self):
+ pass
+
+class BackendWorker(Worker):
+ def __init__(self,main_stream,linkid,idendesc,worker_info,center_linkid):
+ global center_serv
+
+ super().__init__(main_stream,'backend',linkid,idendesc,worker_info,center_linkid)
+ self.ws_addr = (worker_info['ws_ip'],worker_info['ws_port'])
+
+ center_serv.add_backend_worker(self)
+
+ def close(self):
+ global center_serv
+
+ center_serv.del_backend_worker(self)
+ print('disconnect')
+
+class CenterServer(tornado.tcpserver.TCPServer):
+ def __init__(self):
+ super().__init__()
+
+ self._ioloop = tornado.ioloop.IOLoop.instance()
+ self._linkid_usemap = {}
+ self._worker_linkidmap = {}
+ self._backend_clientmap = {}
+ self._backend_workerlist = []
+
+ pubkey = open('pubkey.pem','r').read()
+ privkey = open('privkey.pem','r').read()
+ TOJAuth(pubkey,privkey)
+
+ self._linkid = self._create_linkid()
+
+ self._idendesc = TOJAuth.instance.create_iden('center',self._linkid,1,TOJAuth.ROLETYPE_TOJ)
+ Proxy('center',self._linkid,TOJAuth.instance,self._idendesc)
+
+ imc_register_call('','lookup_linkid',self._lookup_linkid)
+ imc_register_call('','create_iden',self._create_iden)
+ imc_register_call('','add_client',self._add_client)
+ imc_register_call('','del_client',self._del_client)
+
+ imc_register_call('','test_dst',self._test_dst)
+ imc_register_call('','test_dstb',self._test_dstb)
+
+ def handle_stream(self,stream,addr):
+ def _recv_worker_info(data):
+ worker_info = json.loads(data.decode('utf-8'))
+
+ linkclass = worker_info['linkclass']
+ if linkclass == 'backend':
+ linkid = self._create_linkid()
+ idendesc = TOJAuth.instance.create_iden('backend',linkid,1,TOJAuth.ROLETYPE_TOJ)
+ BackendWorker(main_stream,linkid,idendesc,worker_info,self._linkid)
+
+ fd = stream.fileno()
+ self._ioloop.remove_handler(fd)
+ main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0))
+
+ netio.recv_pack(main_stream,_recv_worker_info)
+
+ def add_backend_worker(self,backend):
+ backend_linkid = backend.linkid
+
+ self._worker_linkidmap[backend_linkid] = backend
+ self._backend_clientmap[backend_linkid] = {}
+ self._backend_workerlist.append(backend)
+
+ def del_backend_worker(self,backend):
+ backend_linkid = backend.linkid
+
+ del self._worker_linkidmap[backend_linkid]
+ del self._backend_clientmap[backend_linkid]
+ self._backend_workerlist.remove(backend)
+
+ def dispatch_client(self):
+ size = len(self._backend_workerlist)
+ if size == 0:
+ return None
+
+ linkid = self._create_linkid()
+ idendesc = TOJAuth.instance.create_iden('client',linkid,2,TOJAuth.ROLETYPE_GUEST)
+ backend = self._backend_workerlist[random.randrange(size)]
+ ws_ip,ws_port = backend.ws_addr
+
+ return (idendesc,backend.linkid,ws_ip,ws_port)
+
+ def _create_linkid(self):
+ linkid = uuid.uuid1()
+ while linkid in self._linkid_usemap:
+ linkid = uuid.uuid1()
+
+ linkid = str(linkid)
+ self._linkid_usemap[linkid] = True
+
+ linkid = str(len(self._linkid_usemap))
+
+ return linkid
+
+ @imc.async.caller
+ def _lookup_linkid(self,linkid):
+ try:
+ worker = self._worker_linkidmap[linkid]
+
+ #a = int(iden['linkid'])
+ #b = int(linkid)
+
+ #if b > a:
+ # worker = self._worker_linkidmap[str(a + 1)]
+
+ #else:
+ # worker = self._worker_linkidmap[str(a - 1)]
+
+ if TOJAuth.get_current_iden()['linkclass'] != 'client':
+ sock_ip,sock_port = worker.sock_addr
+ return {
+ 'worker_linkclass':worker.linkclass,
+ 'worker_linkid':worker.linkid,
+ 'sock_ip':sock_ip,
+ 'sock_port':sock_port
+ }
+
+ except KeyError:
+ return None
+
+ @imc.async.caller
+ @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE)
+ def _create_iden(self,linkclass,linkid,idenid,roletype,payload):
+ return TOJAuth.instance.create_iden(linkclass,linkid,idenid,roletype,payload)
+
+ @imc.async.caller
+ def _add_client(self,param):
+ backend_linkid = iden['linkid']
+ client_linkid = param['client_linkid']
+
+ self._backend_clientmap[backend_linkid][client_linkid] = True
+ conn = Proxy.instance.get_conn(backend_linkid)
+ Proxy.instance.link_conn(client_linkid,conn)
+
+ print(client_linkid);
+
+ @imc.async.caller
+ def _del_client(self,param):
+ backend_linkid = iden['linkid']
+ client_linkid = param
+
+ del self._backend_clientmap[backend_linkid][client_linkid]
+ conn = Proxy.instance.get_conn(client_linkid)
+ Proxy.instance.unlink_conn(client_linkid)
+
+
+
+
+ @imc.async.caller
+ def _test_dst(self,param):
+ linkidlist = []
+ clientmaps = self._backend_clientmap.values()
+ for clientmap in clientmaps:
+ linkids = clientmap.keys()
+ for linkid in linkids:
+ linkidlist.append(linkid)
+
+ return linkidlist
+
+ @imc.async.caller
+ def _test_dstb(self,param):
+ return param + ' World'
+
+class WebConnHandler(tornado.web.RequestHandler):
+ def set_default_headers(self):
+ self.set_header('Access-Control-Allow-Origin','*')
+
+ def post(self):
+ global center_serv
+
+ data = center_serv.dispatch_client()
+ if data == None:
+ self.write('Eno_backend')
+ else:
+ client_idendesc,backend_linkid,ip,port = data
+ self.write(json.dumps({
+ 'client_idendesc':client_idendesc,
+ 'backend_linkid':backend_linkid,
+ 'ip':ip,
+ 'port':port
+ }))
+
+if __name__ == '__main__':
+ global center_serv
+
+ center_serv = CenterServer()
+ center_serv.listen(5730)
+
+ http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([
+ ('/conn',WebConnHandler),
+ ]))
+ http_serv.listen(83)
+
+ tornado.ioloop.IOLoop.instance().start()
diff --git a/src/py/imc/__init__.py b/src/py/imc/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/py/imc/__init__.py
@@ -0,0 +1 @@
+
diff --git a/src/py/imc/async.py b/src/py/imc/async.py
new file mode 100644
index 0000000..c0df338
--- /dev/null
+++ b/src/py/imc/async.py
@@ -0,0 +1,121 @@
+import traceback
+import uuid
+import ssl
+
+import tornado.stack_context
+from Crypto.Hash import SHA512
+from greenlet import greenlet
+
+from imc import auth
+
+gr_idmap = {}
+ret_idmap = {}
+gr_main = greenlet.getcurrent()
+
+def switch_top():
+ global gr_main
+
+ assert greenlet.getcurrent() != gr_main
+
+ old_iden = auth.current_iden
+ old_contexts = tornado.stack_context._state.contexts
+ auth.current_iden = None
+
+ result = gr_main.switch(None)
+
+ tornado.stack_context._state.contexts = old_contexts
+ auth.current_iden = old_iden
+
+ return result
+
+def caller(f):
+ def wrapper(*args,**kwargs):
+ global gr_main
+ global gr_idmap
+ global ret_idmap
+
+ def _call(*args,**kwargs):
+ ret = f(*args,**kwargs)
+ retids = gr_idmap[grid]
+ for retid in retids:
+ del ret_idmap[retid]
+
+ del gr_idmap[grid]
+
+ return (True,ret)
+
+ try:
+ gr = greenlet(_call)
+ grid = id(gr)
+ gr_idmap[grid] = set()
+ old_iden = auth.current_iden
+ old_contexts = tornado.stack_context._state.contexts
+
+ result = gr.switch(*args,**kwargs)
+
+ tornado.stack_context._state.contexts = old_contexts
+ auth.current_iden = old_iden
+
+ if result == None:
+ return (False,None)
+
+ if gr.dead == False:
+ gr.parent = gr_main
+
+ return result
+
+ except TypeError as err:
+ traceback.print_stack()
+ print(err)
+ return (False,'Eparameter')
+
+ except Exception as err:
+ traceback.print_stack()
+ print(err)
+ return (False,'Einternal')
+
+ return wrapper
+
+def get_retid():
+ global gr_idmap
+ global ret_idmap
+
+ gr = greenlet.getcurrent()
+ grid = id(gr)
+ retid = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest()
+
+ gr_idmap[grid].add(retid)
+ ret_idmap[retid] = gr
+
+ return retid
+
+def ret(retid,value = None,err = None):
+ global gr_main
+ global gr_idmap
+ global ret_idmap
+
+ assert greenlet.getcurrent() == gr_main
+
+ try:
+ gr = ret_idmap.pop(retid)
+ gr_idmap[id(gr)].remove(retid)
+
+ except KeyError:
+ return
+
+ try:
+ old_iden = auth.current_iden
+ old_contexts = tornado.stack_context._state.contexts
+
+ if err == None:
+ gr.switch(value)
+
+ else:
+ gr.throw(err)
+
+ tornado.stack_context._state.contexts = old_contexts
+ auth.current_iden = old_iden
+
+ except TypeError as err:
+ traceback.print_stack()
+ print(err)
diff --git a/src/py/imc/auth.py b/src/py/imc/auth.py
new file mode 100644
index 0000000..03c15dc
--- /dev/null
+++ b/src/py/imc/auth.py
@@ -0,0 +1,79 @@
+import time
+import json
+import binascii
+import contextlib
+
+import tornado.stack_context
+from Crypto.PublicKey import RSA
+from Crypto.Hash import SHA512
+from Crypto.Signature import PKCS1_v1_5
+
+current_iden = None
+
+class Auth:
+ def __init__(self):
+ global current_iden
+
+ self._cache_hashmap = {}
+ current_iden = None
+
+ @staticmethod
+ def get_current_iden():
+ global current_iden
+
+ return current_iden
+
+ @staticmethod
+ def change_current_iden(iden):
+ @contextlib.contextmanager
+ def context():
+ global current_iden
+
+ old_iden = current_iden
+ current_iden = iden
+
+ try:
+ yield
+
+ finally:
+ current_iden = old_iden
+
+ return tornado.stack_context.StackContext(context)
+
+ def set_signkey(self,key):
+ self._signer = PKCS1_v1_5.new(RSA.importKey(key))
+
+ def set_verifykey(self,key):
+ self._verifier = PKCS1_v1_5.new(RSA.importKey(key))
+
+ def sign_iden(self,iden):
+ data = json.dumps(iden)
+ sign = binascii.hexlify(self._sign(bytes(data,'utf-8'))).decode('utf-8')
+
+ return json.dumps([data,sign])
+
+ def get_iden(self,idendesc):
+ pair = json.loads(idendesc)
+ data = pair[0]
+ sign = pair[1]
+
+ if self._verify(bytes(data,'utf-8'),binascii.unhexlify(sign)):
+ return json.loads(data)
+
+ else:
+ return None
+
+ def _sign(self,data):
+ return self._signer.sign(SHA512.new(data))
+
+ def _verify(self,data,sig):
+ h = SHA512.new(data)
+ if h in self._cache_hashmap:
+ return True
+
+ if self._verifier.verify(h,sig) == True:
+ self._cache_hashmap[h] = True
+
+ return True
+ else:
+ return False
diff --git a/src/py/imc/nonblock.py b/src/py/imc/nonblock.py
new file mode 100644
index 0000000..f68673d
--- /dev/null
+++ b/src/py/imc/nonblock.py
@@ -0,0 +1,61 @@
+import types
+from greenlet import greenlet
+
+gr_waitmap = {}
+
+gr_main = greenlet.getcurrent()
+
+def current():
+ return greenlet.getcurrent()
+
+def switchtop():
+ global gr_main
+
+ return gr_main.switch(None)
+
+def callee(f):
+ def wrapper(*args,**kwargs):
+ kwargs['_grid'] = str(id(greenlet.getcurrent()))
+ return f(*args,**kwargs)
+
+ return wrapper
+
+def caller(f):
+ def wrapper(*args,**kwargs):
+ global gr_waitmap
+
+ try:
+ gr = greenlet(lambda *args,**kwargs : (str(id(greenlet.getcurrent())),f(*args,**kwargs)))
+ grid = str(id(gr))
+ gr_waitmap[grid] = gr
+
+ result = gr.switch(*args,**kwargs)
+ if result == None:
+ return (False,None)
+
+ ret_grid,ret = result
+ del gr_waitmap[grid]
+
+ if greenlet.getcurrent() == gr_main:
+ return (False,None)
+
+ else:
+ while ret_grid != grid:
+ ret_grid,ret = gr_main.switch()
+
+ return (True,ret)
+
+ except Exception:
+ return (False,'Einternal')
+
+ return wrapper
+
+def retcall(grid,value):
+ global gr_waitmap
+
+ try:
+ gr = gr_waitmap[grid]
+ gr.switch(value)
+
+ except Exception:
+ pass
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
new file mode 100755
index 0000000..00476f0
--- /dev/null
+++ b/src/py/imc/proxy.py
@@ -0,0 +1,559 @@
+import json
+import uuid
+import os
+import datetime
+import ssl
+
+from Crypto.Hash import SHA512
+import tornado.ioloop
+import tornado.stack_context
+
+from imc import async
+from imc.auth import Auth
+
+class Connection:
+ def __init__(self,linkclass,linkid):
+ self.linkclass = linkclass
+ self.linkid = linkid
+ self.link_linkidmap = {}
+ self._close_callback = []
+ self._closed = False
+
+ def send_msg(self,data):
+ pass
+
+ def send_file(self,filekey,filepath,callback):
+ pass
+
+ def recv_file(self,filekey,filesize,filepath,callback):
+ pass
+
+ def send_filedata(self,filekey,filesize,callback):
+ pass
+
+ def recv_filedata(self,filekey,filesize,send_fn):
+ pass
+
+ def start_recv(self,recv_callback):
+ pass
+
+ def abort_file(self,filekey):
+ pass
+
+ def add_close_callback(self,callback):
+ self._close_callback.append(tornado.stack_context.wrap(callback))
+
+ def close(self):
+ self._closed = True
+
+ for callback in self._close_callback:
+ callback(self)
+
+ def closed(self):
+ return self._closed
+
+class FileResult():
+ def __init__(self,filekey):
+ self.filekey = filekey
+ self._retid = None
+ self._result = None
+
+ def ret_result(self,res):
+ if self._result != None:
+ return
+
+ self._result = res
+ if self._retid != None:
+ async.ret(self._retid)
+
+ def wait(self):
+ if self._result == None:
+ self._retid = async.get_retid()
+ async.switch_top()
+
+ return self._result
+
+class Proxy:
+ def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None):
+ self.MSGTYPE_CALL = 'call'
+ self.MSGTYPE_RET = 'ret'
+ self.MSGTYPE_SENDFILE = 'sendfile'
+ self.MSGTYPE_ABORTFILE = 'abortfile'
+
+ self._ioloop = tornado.ioloop.IOLoop.instance()
+ self._linkclass = linkclass
+ self._linkid = linkid
+ self._auth = auth_instance
+ self._idendesc = idendesc
+
+ if conn_linkid_fn == None:
+ self._conn_linkid_fn = lambda : None
+ else:
+ self._conn_linkid_fn = conn_linkid_fn
+
+ self._conn_linkidmap = {}
+ self._conn_retidmap = {self._linkid:{}}
+ self._conn_filekeymap = {self._linkid:{}}
+ self._call_pathmap = {}
+
+ self._info_filekeymap = {}
+
+ Proxy.instance = self
+
+ self.register_call('imc/','pend_recvfile',self._pend_recvfile)
+ self.register_call('imc/','reject_sendfile',self._reject_sendfile)
+
+ def add_conn(self,conn):
+ assert conn.linkid not in self._conn_linkidmap
+
+ self._conn_linkidmap[conn.linkid] = conn
+ self._conn_retidmap[conn.linkid] = {}
+ self._conn_filekeymap[conn.linkid] = {}
+
+ conn.add_close_callback(self._conn_close_cb)
+ conn.start_recv(self._recv_dispatch)
+
+ def link_conn(self,linkid,conn):
+ assert conn.linkid in self._conn_linkidmap
+
+ conn.link_linkidmap[linkid] = True
+ self._conn_linkidmap[linkid] = conn
+
+ def unlink_conn(self,linkid):
+ assert linkid in self._conn_linkidmap
+
+ conn = self._conn_linkidmap.pop(linkid)
+ del conn.link_linkidmap[linkid]
+
+ def del_conn(self,conn):
+ waits = list(self._conn_retidmap[conn.linkid].values())
+ for wait in waits:
+ wait['callback']((False,'Eclose'))
+
+ waits = list(self._conn_filekeymap[conn.linkid].values())
+ for wait in waits:
+ wait['callback']('Eclose')
+
+ linkids = list(conn.link_linkidmap.keys())
+ for linkid in linkids:
+ self.unlink_conn(linkid)
+
+ del self._conn_linkidmap[conn.linkid]
+ del self._conn_retidmap[conn.linkid]
+ del self._conn_filekeymap[conn.linkid]
+
+ def get_conn(self,linkid):
+ try:
+ return self._conn_linkidmap[linkid]
+
+ except KeyError:
+ return None
+
+ def register_call(self,path,func_name,func):
+ self._call_pathmap[''.join([path,func_name])] = func
+
+ def call(self,idendesc,dst,func_name,timeout,*args):
+ return self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args))
+
+ def call_async(self,idendesc,dst,func_name,timeout,callback,*args):
+ @async.caller
+ def _call():
+ result = self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args))
+ if callback != None:
+ callback(result)
+
+ self._ioloop.add_callback(tornado.stack_context.wrap(_call))
+
+ def sendfile(self,dst_link,filepath):
+ filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest()
+ filesize = os.stat(filepath).st_size
+
+ fileresult = FileResult(filekey)
+
+ self._info_filekeymap[filekey] = {
+ 'filesize':filesize,
+ 'filepath':filepath,
+ 'fileresult':fileresult,
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
+ }
+
+ stat,ret = self.call(self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize},655360)
+ if stat == False:
+ raise ConnectionError(ret)
+
+ return fileresult
+
+ def recvfile(self,filekey,filepath):
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(in_conn.linkid,filekey)
+
+ except KeyError:
+ return
+
+ if err != None:
+ if not in_conn.closed():
+ in_conn.abort_file(filekey)
+ self._send_msg_abortfile(in_conn,filekey,err)
+
+ self._ioloop.add_callback(self._ret_sendfile,filekey,err)
+
+ try:
+ info = self._info_filekeymap[filekey]
+
+ except KeyError:
+ return
+
+ src_linkid = info['src_linkid']
+ filesize = info['filesize']
+
+ in_conn = self._request_conn(src_linkid)
+ self._add_wait_filekey(in_conn.linkid,filekey,filesize,_callback)
+
+ in_conn.recv_file(filekey,filesize,filepath,_callback)
+ self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+
+ return info['fileresult']
+
+ def rejectfile(self,filekey):
+ try:
+ info = self._info_filekeymap.pop(filekey)
+
+ except KeyError:
+ return
+
+ dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/'])
+ self.call(self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey},65536)
+
+ def _route_call(self,in_conn,caller_retid,idendesc,dst,func_name,timeout,param):
+ def __add_wait_caller(conn_linkid):
+ callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_linkid,caller_retid,result))
+ self._conn_retidmap[conn_linkid][caller_retid] = {
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback((False,'Etimeout'))),
+ 'callback':callback
+ }
+
+ def __del_wait_caller(conn_linkid):
+ wait = self._conn_retidmap[conn_linkid].pop(caller_retid)
+ self._ioloop.remove_timeout(wait['timer'])
+
+ def __ret(result):
+ if caller_linkid == self._linkid:
+ return result
+
+ else:
+ conn = self._request_conn(caller_linkid)
+ if conn != None:
+ self._send_msg_ret(conn,caller_linkid,caller_retid,result)
+
+ if in_conn != None:
+ in_linkclass = in_conn.linkclass
+ in_linkid = in_conn.linkid
+
+ else:
+ in_linkclass = self._linkclass
+ in_linkid = self._linkid
+
+ iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc)
+ if iden == None:
+ return __ret(False,'Eilliden')
+
+ try:
+ dst_part = dst.split('/',3)
+ dst_linkid = dst_part[2]
+ dst_path = dst_part[3]
+
+ except Exception:
+ return __ret(False,'Enoexist')
+
+ caller_linkid = iden['linkid']
+
+ if dst_linkid == self._linkid:
+ __add_wait_caller(self._linkid)
+
+ try:
+ with Auth.change_current_iden(iden):
+ result = self._call_pathmap[''.join([dst_path,func_name])](*param)
+
+ except KeyError:
+ result = (False,'Enoexist')
+
+ __del_wait_caller(self._linkid)
+
+ return __ret(result)
+
+ else:
+ conn = self._request_conn(dst_linkid)
+ if conn == None:
+ return __ret((False,'Enoexist'))
+
+ else:
+ if caller_linkid == self._linkid:
+ __add_wait_caller(conn.linkid)
+ self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param)
+
+ result = async.switch_top()
+
+ __del_wait_caller(conn.linkid)
+
+ return __ret(result)
+
+ else:
+ self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param)
+
+ return
+
+ def _ret_call(self,caller_linkid,caller_retid,result):
+ @async.caller
+ def __ret_remote():
+ conn = self._request_conn(caller_linkid)
+ if conn != None:
+ self._send_msg_ret(conn,caller_linkid,caller_retid,result)
+
+ if caller_linkid == self._linkid:
+ async.ret(caller_retid,result)
+
+ else:
+ __ret_remote()
+
+ def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
+ def __send_cb(err = None):
+ try:
+ self._del_wait_filekey(out_conn.linkid,filekey)
+
+ except KeyError:
+ return
+
+ if err != None:
+ if not out_conn.closed():
+ out_conn.abort_file(filekey)
+ self._send_msg_abortfile(out_conn,filekey,err)
+
+ self._ioloop.add_callback(self._ret_sendfile,filekey,err)
+
+ def __bridge_cb(err = None):
+ try:
+ self._del_wait_filekey(in_conn,filekey)
+
+ if err != None:
+ if not in_conn.closed():
+ in_conn.abort_file(filekey)
+ self._send_msg_abortfile(in_conn,filekey,err)
+
+ except KeyError:
+ pass
+
+ try:
+ self._del_wait_filekey(out_conn,filekey)
+
+ if err != None:
+ if not out_conn.closed():
+ out_conn.abort_file(filekey)
+ self._send_msg_abortfile(out_conn,filekey,err)
+
+ except KeyError:
+ pass
+
+ if src_linkid == self._linkid:
+ try:
+ info = self._info_filekeymap[filekey]
+ if info['filesize'] != filesize:
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Efilesize')
+
+ except KeyError:
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist')
+ return
+
+ self._add_wait_filekey(out_conn.linkid,filekey,filesize,__send_cb)
+ out_conn.send_file(filekey,info['filepath'],__send_cb)
+
+ else:
+ in_conn = self._request_conn(src_linkid)
+ self._add_wait_filekey(in_conn.linkid,filekey,filesize,__bridge_cb)
+ self._add_wait_filekey(out_conn.linkid,filekey,filesize,__bridge_cb)
+
+ send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb)
+ in_conn.recv_filedata(filekey,filesize,send_fn)
+
+ self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+
+ def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback):
+ callback = tornado.stack_context.wrap(callback)
+ self._conn_filekeymap[conn_linkid][filekey] = {
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = min(filesize,1000)),lambda : callback('Etimeout')),
+ 'callback':callback
+ }
+
+ def _del_wait_filekey(self,conn_linkid,filekey):
+ wait = self._conn_filekeymap[conn_linkid].pop(filekey)
+ self._ioloop.remove_timeout(wait['timer'])
+
+ def _ret_sendfile(self,filekey,err = None):
+ try:
+ info = self._info_filekeymap.pop(filekey)
+
+ except KeyError:
+ return
+
+ self._ioloop.remove_timeout(info['timer'])
+
+ fileresult = info['fileresult']
+ if err == None:
+ fileresult.ret_result('Success')
+
+ else:
+ fileresult.ret_result(err)
+
+ def _request_conn(self,linkid):
+ try:
+ return self._conn_linkidmap[linkid]
+
+ except KeyError:
+ conn = self._conn_linkid_fn(linkid)
+
+ if conn != None and conn.linkid != linkid:
+ self.link_conn(linkid,conn)
+
+ return conn
+
+ def _conn_close_cb(self,conn):
+ self.del_conn(conn)
+ print('connection close')
+
+ def _recv_dispatch(self,conn,data):
+ try:
+ msg = json.loads(data.decode('utf-8'))
+
+ except:
+ return
+
+ msg_type = msg['type']
+
+ if msg_type == self.MSGTYPE_CALL:
+ self._recv_msg_call(conn,msg)
+
+ elif msg_type == self.MSGTYPE_RET:
+ self._recv_msg_ret(conn,msg)
+
+ elif msg_type == self.MSGTYPE_SENDFILE:
+ self._recv_msg_sendfile(conn,msg)
+
+ elif msg_type == self.MSGTYPE_ABORTFILE:
+ self._recv_msg_abortfile(conn,msg)
+
+ def _send_msg_call(self,conn,caller_retid,idendesc,dst,func_name,timeout,param):
+ msg = {
+ 'type':self.MSGTYPE_CALL,
+ 'caller_retid':caller_retid,
+ 'idendesc':idendesc,
+ 'dst':dst,
+ 'func_name':func_name,
+ 'timeout':timeout,
+ 'param':param
+ }
+
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_call(self,conn,msg):
+ @async.caller
+ def __call():
+ self._route_call(conn,caller_retid,idendesc,dst,func_name,timeout,param)
+
+ caller_retid = msg['caller_retid']
+ idendesc = msg['idendesc']
+ dst = msg['dst']
+ func_name = msg['func_name']
+ timeout = msg['timeout']
+ param = msg['param']
+
+ __call()
+
+ def _send_msg_ret(self,conn,caller_linkid,caller_retid,result):
+ stat,data = result
+ msg = {
+ 'type':self.MSGTYPE_RET,
+ 'caller_linkid':caller_linkid,
+ 'caller_retid':caller_retid,
+ 'result':{'stat':stat,'data':data}
+ }
+
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_ret(self,conn,msg):
+ caller_linkid = msg['caller_linkid']
+ caller_retid = msg['caller_retid']
+ data = msg['result']
+ result = (data['stat'],data['data'])
+
+ self._ret_call(caller_linkid,caller_retid,result)
+
+ def _send_msg_sendfile(self,conn,src_linkid,filekey,filesize):
+ msg = {
+ 'type':self.MSGTYPE_SENDFILE,
+ 'src_linkid':src_linkid,
+ 'filekey':filekey,
+ 'filesize':filesize
+ }
+
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_sendfile(self,conn,msg):
+ @async.caller
+ def __call():
+ self._route_sendfile(conn,src_linkid,filekey,filesize)
+
+ src_linkid = msg['src_linkid']
+ filekey = msg['filekey']
+ filesize = msg['filesize']
+
+ __call()
+
+ def _send_msg_abortfile(self,conn,filekey,err):
+ msg = {
+ 'type':self.MSGTYPE_ABORTFILE,
+ 'filekey':filekey,
+ 'error':err
+ }
+
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_abortfile(self,conn,msg):
+ @async.caller
+ def __call():
+ try:
+ self._conn_filekeymap[conn.linkid][filekey]['callback'](err)
+
+ except KeyError:
+ pass
+
+ filekey = msg['filekey']
+ err = msg['error']
+
+ __call()
+
+ @async.caller
+ def _pend_recvfile(self,iden,param):
+ filekey = param['filekey']
+ filesize = param['filesize']
+
+ self._info_filekeymap[filekey] = {
+ 'src_linkclass':iden['linkclass'],
+ 'src_linkid':iden['linkid'],
+ 'filesize':filesize,
+ 'fileresult':FileResult(filekey),
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
+ }
+
+ @async.caller
+ def _reject_sendfile(self,iden,param):
+ filekey = param['filekey']
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject')
+
+def imc_call(idendesc,dst,func_name,*args):
+ return Proxy.instance.call(idendesc,dst,func_name,65536,*args)
+
+def imc_call_async(idendesc,dst,func_name,callback,*args):
+ Proxy.instance.call_async(idendesc,dst,func_name,65536,callback,*args)
+
+def imc_register_call(path,func_name,func):
+ Proxy.instance.register_call(path,func_name,func)
diff --git a/src/py/netio.py b/src/py/netio.py
new file mode 100644
index 0000000..b71ab0b
--- /dev/null
+++ b/src/py/netio.py
@@ -0,0 +1,511 @@
+import os
+import traceback
+import json
+import struct
+import socket
+from collections import deque
+
+import tornado.ioloop
+import tornado.stack_context
+
+import imc.async
+from imc.proxy import Connection
+
+def send_pack(stream,data):
+ stream.write(struct.pack('l',len(data)) + data)
+
+def recv_pack(stream,callback):
+ def _recv_size(data):
+ size, = struct.unpack('l',data)
+ stream.read_bytes(size,lambda data : callback(data))
+
+ stream.read_bytes(8,_recv_size)
+
+class SocketStream:
+ def __init__(self,sock):
+ self.DATA_BUF = 0
+ self.DATA_NOBUF = 1
+ self.DATA_FILE = 2
+
+ self._ioloop = tornado.ioloop.IOLoop.current()
+ self._sock = sock
+
+ self._conning = False
+ self._closed = False
+ self._conn_callback = None
+ self._close_callback = None
+
+ self._read_queue = deque()
+ self._write_queue = deque()
+ self._stat = tornado.ioloop.IOLoop.ERROR
+
+ self._sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1)
+ self._sock.setblocking(False)
+ self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR)
+
+ def connect(self,addr,callback):
+ if self._closed == True:
+ raise ConnectionError
+
+ try:
+ self._conn_callback = tornado.stack_context.wrap(callback)
+
+ self._stat |= tornado.ioloop.IOLoop.WRITE
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ self._conning = True
+ self._sock.connect(addr)
+
+ except BlockingIOError:
+ pass
+
+ def read_bytes(self,size,callback = None,nonbuf = False):
+ if self._closed == True:
+ raise ConnectionError
+
+ if nonbuf == False:
+ self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)])
+
+ else:
+ self._read_queue.append([self.DATA_NOBUF,size,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.READ
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def write(self,buf,callback = None):
+ if self._closed == True:
+ raise ConnectionError
+
+ self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.WRITE
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def sendfile(self,fd,callback = None):
+ if self._closed == True:
+ raise ConnectionError
+
+ size = os.fstat(fd).st_size
+
+ self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.WRITE
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def recvfile(self,fd,size,callback = None):
+ if self._closed == True:
+ raise ConnectionError
+
+ self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
+
+ self._stat |= tornado.ioloop.IOLoop.READ
+ self._ioloop.update_handler(self._sock.fileno(),self._stat)
+
+ def set_close_callback(self,callback):
+ if callback == None:
+ self._close_callback = None
+
+ else:
+ self._close_callback = tornado.stack_context.wrap(callback)
+
+ def close(self):
+ if self._closed == True:
+ return
+
+ self._closed = True
+ self._ioloop.remove_handler(self._sock.fileno())
+ self._sock.close()
+
+ if self._close_callback != None:
+ self._close_callback(self)
+
+ def _handle_event(self,fd,evt):
+ if evt & tornado.ioloop.IOLoop.ERROR:
+ print(os.strerror(self._sock.getsockopt(socket.SOL_SOCKET,socket.SO_ERROR)))
+ self.close()
+ return
+
+ if evt & tornado.ioloop.IOLoop.READ:
+ while len(self._read_queue) > 0:
+ iocb = self._read_queue[0]
+ datatype = iocb[0]
+
+ if datatype == self.DATA_BUF:
+ size = iocb[1]
+
+ try:
+ while True:
+ buf = self._sock.recv(size)
+ if len(buf) == 0:
+ self.close()
+ return
+
+ iocb[2].extend(buf)
+ size -= len(buf)
+
+ if size == 0:
+ if iocb[3] != None:
+ iocb[3](iocb[2])
+
+ self._read_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ except Exception:
+ self.close()
+ return
+
+ elif datatype == self.DATA_NOBUF:
+ size = iocb[1]
+
+ try:
+ while True:
+ buf = self._sock.recv(size)
+ if len(buf) == 0:
+ self.close()
+ return
+
+ iocb[2](buf)
+ size -= len(buf)
+
+ if size == 0:
+ self._read_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ except Exception:
+ self.close()
+ return
+
+ elif datatype == self.DATA_FILE:
+ size = iocb[1]
+
+ try:
+ while True:
+ buf = self._sock.recv(min(size,65536))
+ if len(buf) == 0:
+ self.close()
+ return
+
+ os.write(iocb[2],buf)
+ size -= len(buf)
+
+ if size == 0:
+ if iocb[3] != None:
+ iocb[3]()
+
+ self._read_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ except Exception:
+ self.close()
+ return
+
+ if evt & tornado.ioloop.IOLoop.WRITE:
+ if self._conning == True:
+ self._conning = False
+
+ if self._conn_callback != None:
+ self._conn_callback()
+
+ while len(self._write_queue) > 0:
+ iocb = self._write_queue[0]
+ datatype = iocb[0]
+
+ if datatype == self.DATA_BUF:
+ off = iocb[1]
+ buf = iocb[2]
+
+ try:
+ while True:
+ ret = self._sock.send(buf[off:])
+ if ret == 0:
+ self.close()
+ return
+
+ off += ret
+
+ if off == len(buf):
+ if iocb[3] != None:
+ iocb[3]()
+
+ self._write_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = off
+ break
+
+ except Exception:
+ self.close()
+ return
+
+ elif datatype == self.DATA_FILE:
+ size = iocb[1]
+ filefd = iocb[2]
+ sockfd = self._sock.fileno()
+
+ try:
+ while True:
+ ret = os.sendfile(sockfd,filefd,None,min(size,65536))
+ if ret == 0:
+ self.close()
+ return
+
+ size -= ret
+
+ if size == 0:
+ if iocb[3] != None:
+ iocb[3]()
+
+ self._write_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
+ except Exception:
+ self.close()
+ return
+
+ if self._closed == True:
+ return
+
+ stat = tornado.ioloop.IOLoop.ERROR
+ if len(self._read_queue) > 0:
+ stat |= tornado.ioloop.IOLoop.READ
+
+ if len(self._write_queue) > 0:
+ stat |= tornado.ioloop.IOLoop.WRITE
+
+ if stat != self._stat:
+ self._stat = stat
+ self._ioloop.update_handler(fd,stat)
+
+class SocketConnection(Connection):
+ def __init__(self,linkclass,linkid,main_stream,file_addr,add_pend_filestream_fn = None):
+ super().__init__(linkclass,linkid)
+
+ self._ioloop = tornado.ioloop.IOLoop.current()
+ self._sendfile_filekeymap = {}
+
+ self.main_stream = main_stream
+ self.main_stream.set_close_callback(lambda conn : self.close())
+ self.file_addr = file_addr
+ self.add_pend_filestream = add_pend_filestream_fn
+
+ def send_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
+ self.main_stream.write(struct.pack('l',len(data)) + data)
+
+ def send_file(self,filekey,filepath,callback):
+ def _conn_cb():
+ self._add_wait_filekey(filekey,_callback)
+
+ send_pack(file_stream,bytes(json.dumps({
+ 'conntype':'file',
+ 'filekey':filekey
+ }),'utf-8'))
+
+ file_stream.sendfile(fd,_callback)
+
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(filekey)
+
+ except KeyError:
+ return
+
+ file_stream.set_close_callback(None)
+ file_stream.close()
+ os.close(fd)
+
+ callback(err)
+
+ if self._closed == True:
+ raise ConnectionError
+
+ fd = os.open(filepath,os.O_RDONLY)
+ filesize = os.fstat(fd).st_size
+
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.connect(self.file_addr,_conn_cb)
+
+ def recv_file(self,filekey,filesize,filepath,callback):
+ def _conn_cb(stream):
+ nonlocal file_stream
+
+ file_stream = stream
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ self._add_wait_filekey(filekey,_callback)
+
+ file_stream.recvfile(fd,filesize,_callback)
+
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(filekey)
+
+ except KeyError:
+ return
+
+ file_stream.set_close_callback(None)
+ file_stream.close()
+ os.close(fd)
+
+ if err != None:
+ try:
+ os.remove(filepath)
+
+ except FileNotFoundError:
+ pass
+
+ callback(err)
+
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = None
+
+ self.add_pend_filestream(filekey,_conn_cb)
+ fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
+
+ def send_filedata(self,filekey,filesize,callback):
+ def _conn_cb():
+ self._add_wait_filekey(filekey,_callback)
+
+ send_pack(file_stream,bytes(json.dumps({
+ 'conntype':'file',
+ 'filekey':filekey
+ }),'utf-8'))
+
+ imc.async.ret(retid)
+
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(filekey)
+
+ except KeyError:
+ return
+
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
+ callback(err)
+
+ def _send_cb(data):
+ def __done_cb():
+ nonlocal filesize
+
+ filesize -= len(data)
+ if filesize == 0:
+ _callback()
+
+ file_stream.write(data,__done_cb)
+
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+
+ retid = imc.async.get_retid()
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.connect(self.file_addr,_conn_cb)
+ imc.async.switch_top()
+
+ return _send_cb
+
+ def recv_filedata(self,filekey,filesize,send_fn):
+ def _conn_cb(stream):
+ nonlocal file_stream
+
+ file_stream = stream
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ self._add_wait_filekey(filekey,_callback)
+
+ file_stream.read_bytes(filesize,send_fn,nonbuf = True)
+
+ def _callback(err = None):
+ file_stream.close()
+
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = None
+
+ self.add_pend_filestream(filekey,_conn_cb)
+
+ def abort_file(self,filekey):
+ try:
+ self._sendfile_filekeymap[filekey]('Eabort')
+
+ except KeyError:
+ pass
+
+ def start_recv(self,recv_callback):
+ def _recv_size(data):
+ size, = struct.unpack('l',data)
+ self.main_stream.read_bytes(size,_recv_data)
+ self.main_stream.read_bytes(8,_recv_size)
+
+ def _recv_data(data):
+ self._recv_callback(self,data)
+
+ self._recv_callback = tornado.stack_context.wrap(recv_callback)
+ self.main_stream.read_bytes(8,_recv_size)
+
+ def close(self):
+ if self._closed == True:
+ return
+
+ traceback.print_stack()
+
+ self._closed = True
+ self.main_stream.close()
+
+ callbacks = list(self._sendfile_filekeymap.values())
+ for callback in callbacks:
+ callback('Eclose')
+
+ super().close()
+
+ def _add_wait_filekey(self,filekey,fail_cb):
+ self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
+
+ def _del_wait_filekey(self,filekey):
+ del self._sendfile_filekeymap[filekey]
+
+class WebSocketConnection(Connection):
+ def __init__(self,linkclass,linkid,handler):
+ super().__init__(linkclass,linkid)
+
+ self._ioloop = tornado.ioloop.IOLoop.current()
+ self.handler = handler
+
+ def send_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
+ self.handler.write_message(data,True)
+
+ def recv_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
+ self._recv_callback(self,data)
+
+ def start_recv(self,recv_callback):
+ self._recv_callback = tornado.stack_context.wrap(recv_callback)
diff --git a/src/py/tojauth.py b/src/py/tojauth.py
new file mode 100644
index 0000000..701095b
--- /dev/null
+++ b/src/py/tojauth.py
@@ -0,0 +1,179 @@
+from imc.auth import Auth
+import config
+from asyncdb import AsyncDB
+
+class TOJAuth(Auth):
+ ACCESS_READ = 0x1
+ ACCESS_WRITE = 0x2
+ ACCESS_CREATE = 0x4
+ ACCESS_DELETE = 0x8
+ ACCESS_SETPER = 0x10
+ ACCESS_EXECUTE = 0x20
+
+ ROLETYPE_USER = 1
+ ROLETYPE_3RD = 2
+ ROLETYPE_MOD = 3
+ ROLETYPE_TOJ = 4
+ ROLETYPE_GROUP = 5
+ ROLETYPE_GUEST = 6
+
+ auth_accessid = 1
+
+ def __init__(self, pubkey, privkey = None):
+ super().__init__()
+
+ self.set_verifykey(pubkey)
+ if privkey != None:
+ self.set_signkey(privkey)
+
+ TOJAuth.instance = self
+ TOJAuth.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER,
+ config.CORE_DBPASSWORD)
+
+ def create_iden(self, linkclass, linkid, idenid, roletype, payload = {}):
+ iden = payload
+ iden.update({
+ 'linkclass' : linkclass,
+ 'linkid' : linkid,
+ 'idenid' : idenid,
+ 'roletype' : roletype
+ })
+
+ return self.sign_iden(iden)
+
+ def get_iden(self, conn_linkclass, conn_linkid, idendesc):
+ iden = super().get_iden(idendesc)
+ if iden == None:
+ return None
+
+ if conn_linkclass == 'client' and conn_linkid != iden['linkid']:
+ return None
+
+ return iden
+
+ @staticmethod
+ def check_access(accessid, access_mask):
+ def wrapper(f):
+ def wrapfunc(*args):
+ idenid = TOJAuth.get_current_iden()['idenid']
+ ok = False
+
+ cur = TOJAuth.instance.db.cursor()
+
+ if not ok:
+ sqlstr = ('SELECT "owner_idenid" FROM "ACCESS" WHERE '
+ '"accessid"=%s;')
+ sqlarr = (accessid, )
+ cur.execute(sqlstr, sqlarr)
+ for data in cur:
+ owner_idenid = data[0]
+ if owner_idenid == idenid:
+ ok = True
+
+ if not ok:
+ sqlstr = ('SELECT "ACCESS_ROLE"."permission" FROM "ACCESS_ROLE"'
+ ' INNER JOIN "IDEN_ROLE" ON "ACCESS_ROLE"."roleid" = '
+ '"IDEN_ROLE"."roleid" WHERE "ACCESS_ROLE"."accessid"=%s'
+ ' AND "IDEN_ROLE"."idenid"=%s;')
+ sqlarr = (accessid, idenid)
+ cur.execute(sqlstr, sqlarr)
+
+ for data in cur:
+ permission = data[0]
+ if (permission & access_mask) == access_mask:
+ ok = True
+ break
+
+ if ok:
+ return f(*args);
+ else:
+ raise Exception('TOJAuth.check_access() : PERMISSION DENIED')
+
+ return wrapfunc
+
+ return wrapper
+
+ def create_access(self, owner_idenid):
+ self.check_access(
+ self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ sqlstr = ('INSERT INTO "ACCESS" ("owner_idenid") VALUES (%s) '
+ 'RETURNING "accessid";')
+ sqlarr = (owner_idenid, )
+ cur.execute(sqlstr, sqlarr)
+
+ for data in cur:
+ accessid = data[0]
+ return accessid
+
+ def set_access_list(self, accessid, roleid, permission):
+ self.check_access(accessid, self.ACCESS_SETPER)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ table = 'ACCESS_ROLE'
+ cond = {
+ 'accessid' : accessid,
+ 'roleid' : roleid
+ }
+ value = {
+ 'permission' : permission
+ }
+ cur.upsert(table, cond, value)
+
+ def del_access_list(self, accessid, roleid):
+ self.check_access(accessid, self.ACCESS_SETPER)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ sqlstr = ('DELETE FROM "ACCESS_ROLE" WHERE "accessid"=%s '
+ 'AND "roleid"=%s;')
+ sqlarr = (accessid, roleid)
+ cur.execute(sqlstr, sqlarr)
+
+ def create_role(self, rolename, roletype):
+ self.check_access(
+ self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ sqlstr = ('INSERT INTO "ROLE" ("rolename", "roletype") VALUES (%s, %s)'
+ ' RETURNING "roleid";')
+ sqlarr = (rolename, roletype)
+ cur.execute(sqlstr, sqlarr)
+ for data in cur:
+ roleid = data[0]
+
+ if(roleid != None):
+ self.set_role_relation(roleid, roleid)
+
+ return roleid
+
+ def set_role_relation(self, idenid, roleid):
+ self.check_access(
+ self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ table = 'IDEN_ROLE'
+ cond = {
+ 'idenid' : idenid,
+ 'roleid' : roleid
+ }
+ cur.upsert(table, cond)
+
+ def del_role_relation(self, idenid, roleid):
+ self.check_access(
+ self.auth_accessid, self.ACCESS_EXECUTE)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ sqlstr = ('DELETE FROM "IDEN_ROLE" WHERE "idenid"=%s '
+ 'AND "roleid"=%s;')
+ sqlarr = (idenid, roleid)
+ cur.execute(sqlstr, sqlarr)
+
+ def set_owner(self, idenid, accessid):
+ self.check_access(accessid, self.ACCESS_SETPER)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ sqlstr = ('UPDATE "ACCESS" SET "owner_idenid"=%s WHERE "accessid"=%s;')
+ sqlarr = (idenid, accessid)
+ cur.execute(sqlstr, sqlarr)
+
diff --git a/src/py/user.py b/src/py/user.py
new file mode 100755
index 0000000..45e195a
--- /dev/null
+++ b/src/py/user.py
@@ -0,0 +1,336 @@
+import psycopg2
+from Crypto.Hash import SHA512
+
+from tojauth import TOJAuth
+from asyncdb import AsyncDB
+import imc.proxy
+import config
+
+class User:
+ auth_accessid = 2
+
+ USERNAME_LEN_MIN = 5
+ USERNAME_LEN_MAX = 50
+ PASSWORD_LEN_MIN = 5
+ PASSWORD_LEN_MAX = 50
+ NICKNAME_LEN_MIN = 1
+ NICKNAME_LEN_MAX = 50
+ EMAIL_LEN_MIN = 5
+ EMAIL_LEN_MAX = 100
+ AVATAR_LEN_MIN = 0
+ AVATAR_LEN_MAX = 200
+ ABOUTME_LEN_MIN = 0
+ ABOUTME_LEN_MAX = 1000
+
+ def __init__(self, mod_iden):
+ User.instance = self
+ User.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER,
+ config.CORE_DBPASSWORD)
+ User.mod_iden = mod_iden
+
+ @imc.async.caller
+ def register(self, username, password, nickname, email, avatar, aboutme):
+ if(
+ type(username) != str or
+ type(password) != str or
+ type(nickname) != str or
+ type(email) != str or
+ type(avatar) != str or
+ type(aboutme) != str
+ ):
+ return 'Eparameter'
+
+ if len(username) < self.USERNAME_LEN_MIN:
+ return 'Eusername_too_short'
+ elif len(username) > self.USERNAME_LEN_MAX:
+ return 'Eusername_too_long'
+ elif len(password) < self.PASSWORD_LEN_MIN:
+ return 'Epassword_too_short'
+ elif len(password) > self.PASSWORD_LEN_MAX:
+ return 'Epassword_too_long'
+ elif len(nickname) < self.NICKNAME_LEN_MIN:
+ return 'Enickname_too_short'
+ elif len(nickname) > self.NICKNAME_LEN_MAX:
+ return 'Enickname_too_long'
+ elif len(email) < self.EMAIL_LEN_MIN:
+ return 'Eemail_too_short'
+ elif len(email) > self.EMAIL_LEN_MAX:
+ return 'Eemail_too_long'
+ elif len(avatar) < self.AVATAR_LEN_MIN:
+ return 'Eavatar_too_short'
+ elif len(avatar) > self.AVATAR_LEN_MAX:
+ return 'Eavatar_too_long'
+ elif len(aboutme) < self.ABOUTME_LEN_MIN:
+ return 'Eaboutme_too_short'
+ elif len(aboutme) > self.ABOUTME_LEN_MAX:
+ return 'Eaboutme_too_long'
+
+ passhash = self._password_hash(password)
+
+ with TOJAuth.change_current_iden(self.mod_iden):
+ try:
+ uid = self._create_user(
+ username, passhash, nickname, email, avatar, aboutme)
+ except psycopg2.IntegrityError:
+ return 'Eusername_already_exists'
+
+ return {'uid' : uid}
+
+ @TOJAuth.check_access(auth_accessid, TOJAuth.ACCESS_EXECUTE)
+ def _create_user(self, username, passhash, nickname, email, avatar,
+ aboutme):
+ roleid = TOJAuth.instance.create_role(username, TOJAuth.ROLETYPE_USER)
+
+ cur = self.db.cursor()
+ sqlstr = ('INSERT INTO "USER" ("username", "passhash", "nickname", '
+ '"email", "avatar", "aboutme", "idenid") '
+ 'VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING "uid";')
+ sqlarr = (username, passhash, nickname, email, avatar, aboutme, roleid)
+ cur.execute(sqlstr, sqlarr)
+
+ for data in cur:
+ uid = data[0]
+ return uid
+
+ @imc.async.caller
+ def login(self, username, password):
+ if(
+ type(username) != str or
+ type(password) != str
+ ):
+ return 'Eparameter'
+
+ uid = self.get_uid_by_username(username)
+ if uid == None:
+ return 'Eno_such_uid'
+
+ passhash = self._password_hash(password)
+
+ cur = self.db.cursor()
+ sqlstr = ('SELECT "idenid" FROM "USER" WHERE "uid" = %s '
+ 'AND "passhash" = %s;')
+ sqlarr = (uid, passhash)
+ cur.execute(sqlstr, sqlarr)
+
+ idenid = None
+ for data in cur:
+ idenid = data[0]
+
+ if idenid == None:
+ return 'Ewrong_password'
+
+ linkclass = TOJAuth.get_current_iden()['linkclass']
+ linkid = TOJAuth.get_current_iden()['linkid']
+
+ with TOJAuth.change_current_iden(self.mod_iden):
+ idendesc = TOJAuth.instance.create_iden(
+ linkclass, linkid, idenid, TOJAuth.ROLETYPE_USER, {'uid' : uid}
+ )
+
+ ret = {
+ 'idendesc' : idendesc,
+ 'uid' : uid,
+ 'hash' : self._uid_passhash_hash(uid, passhash)
+ }
+
+ return ret
+
+ @imc.async.caller
+ def cookie_login(self, uid, uphash):
+ if(
+ type(uid) != int or
+ type(uphash) != str
+ ):
+ return 'Eparameter'
+
+ idenid = None
+ real_uphash = None
+
+ cur = self.db.cursor()
+ sqlstr = ('SELECT "idenid", "passhash" FROM "USER" WHERE "uid" = %s;')
+ sqlarr = (uid, )
+ cur.execute(sqlstr, sqlarr)
+
+ for data in cur:
+ idenid = data[0]
+ real_uphash = self._uid_passhash_hash(uid, data[1])
+
+ if idenid == None:
+ return 'Eno_such_uid'
+
+ if real_uphash != uphash:
+ return 'Ewrong_uphash'
+
+ linkclass = TOJAuth.get_current_iden()['linkclass']
+ linkid = TOJAuth.get_current_iden()['linkid']
+
+ with TOJAuth.change_current_iden(self.mod_iden):
+ idendesc = TOJAuth.instance.create_iden(
+ linkclass, linkid, idenid, TOJAuth.ROLETYPE_USER, {'uid' : uid}
+ )
+
+ ret = {
+ 'idendesc' : idendesc,
+ 'uid' : uid,
+ 'hash' : uphash
+ }
+
+ return ret
+
+ @imc.async.caller
+ def get_user_info(self, uid):
+ if(
+ type(uid) != int
+ ):
+ return 'Eparameter'
+
+ ret = self._get_user_info_by_uid(uid)
+ if ret == None:
+ return 'Eno_such_uid'
+
+ return ret
+
+ @imc.async.caller
+ def set_user_info(self, uid, nickname, email, avatar, aboutme):
+ if(
+ type(uid) != int or
+ type(nickname) != str or
+ type(email) != str or
+ type(avatar) != str or
+ type(aboutme) != str
+ ):
+ return 'Eparameter'
+
+ if len(nickname) < self.NICKNAME_LEN_MIN:
+ return 'Enickname_too_short'
+ elif len(nickname) > self.NICKNAME_LEN_MAX:
+ return 'Enickname_too_long'
+ elif len(email) < self.EMAIL_LEN_MIN:
+ return 'Eemail_too_short'
+ elif len(email) > self.EMAIL_LEN_MAX:
+ return 'Eemail_too_long'
+ elif len(avatar) < self.AVATAR_LEN_MIN:
+ return 'Eavatar_too_short'
+ elif len(avatar) > self.AVATAR_LEN_MAX:
+ return 'Eavatar_too_long'
+ elif len(aboutme) < self.ABOUTME_LEN_MIN:
+ return 'Eaboutme_too_short'
+ elif len(aboutme) > self.ABOUTME_LEN_MAX:
+ return 'Eaboutme_too_long'
+
+ idenid = self.get_idenid_by_uid(uid)
+ if idenid == None:
+ return 'Eno_such_uid'
+
+ if idenid != TOJAuth.get_current_iden()['idenid']:
+ TOJAuth.check_access(
+ self.auth_accessid, TOJAuth.ACCESS_EXECUTE)(lambda x:x)(0)
+
+ cur = self.db.cursor()
+ sqlstr = ('UPDATE "USER" SET "nickname" = %s, "email" = %s, '
+ '"avatar" = %s, "aboutme" = %s WHERE "uid" = %s;')
+ sqlarr = (nickname, email, avatar, aboutme, uid)
+ cur.execute(sqlstr, sqlarr)
+
+ @imc.async.caller
+ def change_user_password(self, uid, old_password, new_password):
+ if(
+ type(uid) != int or
+ type(old_password) != str or
+ type(new_password) != str
+ ):
+ return 'Eparameter'
+
+ if len(new_password) < self.PASSWORD_LEN_MIN:
+ return 'Epassword_too_short'
+ elif len(new_password) > self.PASSWORD_LEN_MAX:
+ return 'Epassword_too_long'
+
+ idenid = self.get_idenid_by_uid(uid)
+ if idenid == None:
+ return 'Eno_such_uid'
+
+ if idenid != TOJAuth.get_current_iden()['idenid']:
+ TOJAuth.check_access(
+ self.auth_accessid, TOJAuth.ACCESS_EXECUTE)(lambda x:x)(0)
+
+ old_passhash = self._password_hash(old_password)
+
+ cur = self.db.cursor()
+ sqlstr = ('SELECT "idenid" FROM "USER" WHERE "uid" = %s '
+ 'AND "passhash" = %s;')
+ sqlarr = (uid, old_passhash)
+ cur.execute(sqlstr, sqlarr)
+
+ idenid = None
+ for data in cur:
+ idenid = data[0]
+
+ if idenid == None:
+ return 'Ewrong_old_password'
+
+ new_passhash = self._password_hash(new_password)
+
+ sqlstr = ('UPDATE "USER" SET "passhash" = %s WHERE "uid" = %s;')
+ sqlarr = (new_passhash, uid)
+ cur.execute(sqlstr, sqlarr)
+
+ @imc.async.caller
+ def oauth_login(self):
+ raise NotImplementedError
+
+ def _password_hash(self, password):
+ h = SHA512.new(bytes(password + config.USER_PASSHASH_SALT, 'utf-8'))
+ return h.hexdigest()
+
+ def _uid_passhash_hash(self, uid, passhash):
+ return self._password_hash(
+ 'GENGJIAN_WEISUO_KING^^' + str(uid) + '@E__E@' + passhash + 'Yo!')
+
+ def _get_user_info_by_uid(self, uid):
+ cur = self.db.cursor()
+ sqlstr = ('SELECT * FROM "USER" WHERE "uid" = %s;')
+ sqlarr = (uid, )
+ cur.execute(sqlstr, sqlarr)
+
+ ret = None
+ for data in cur:
+ ret = {}
+ ret['uid'] = data[0]
+ ret['username'] = data[1]
+ ret['nickname'] = data[3]
+ ret['email'] = data[4]
+ ret['avatar'] = data[5]
+ ret['aboutme'] = data[6]
+
+ return ret
+
+ def get_idenid_by_uid(self, uid):
+ cur = self.db.cursor()
+ sqlstr = ('SELECT "idenid" FROM "USER" WHERE "uid" = %s;')
+ sqlarr = (uid, )
+ cur.execute(sqlstr, sqlarr)
+
+ ret = None
+ for data in cur:
+ ret = data[0]
+
+ return ret
+
+ def get_uid_by_username(self, username):
+ cur = self.db.cursor()
+ sqlstr = ('SELECT "uid" FROM "USER" WHERE "username" = %s;')
+ sqlarr = (username, )
+ cur.execute(sqlstr, sqlarr)
+
+ uid = None
+ for data in cur:
+ uid = data[0]
+
+ return uid
+
+ def does_username_exist(self, username):
+ uid = self.get_uid_by_username(username)
+
+ return uid != None
+
diff --git a/src/test/wstest.css b/src/test/wstest.css
new file mode 100644
index 0000000..39b9fad
--- /dev/null
+++ b/src/test/wstest.css
@@ -0,0 +1,81 @@
+body{
+ overflow-y:scroll;
+}
+
+div.head ul.right_navbar{
+ margin-right:0px;
+ position:absolute;
+ top:0px;
+ left:auto;
+ right:0px;
+}
+div.head ul.right_navbar div.notice{
+ height:100%;
+ padding:5px 0px 5px 0px;
+}
+div.head ul.right_navbar div.notice > div.box{
+ width:30px;
+ padding:5px 0px 5px 0px;
+ background-color:#656765;
+ font-size:16px;
+ font-weight:bold;
+ text-align:center;
+ cursor:pointer;
+}
+div.head ul.right_navbar div:hover.notice > div.box{
+ color:white;
+}
+div.head ul.right_navbar div.notice_h > div.box{
+ background:#86C166;
+ color:white;
+}
+div.head ul.right_navbar div.menu{
+ width:80px;
+ padding:10px 0px 10px 0px;
+ text-align:center;
+ cursor:pointer;
+}
+
+div.panel_container{
+ width:0px;
+ position:absolute;
+ top:40px;
+ right:0px;
+ border-left:#DDD 1px solid;
+ overflow:hidden;
+
+ transition:width 200ms;
+}
+div.panel_container_a{
+ width:240px;
+}
+div.panel_container > div.menu_container{
+ width:240px;
+ display:none;
+}
+div.panel_container > div.menu_container > ul.menu > li > a{
+ height:60px;
+ font-weight:bold;
+ line-height:60px;
+}
+
+div.panel_container > div.notice_container{
+ width:240px;
+ display:none;
+}
+div.panel_container > div.notice_container > ul.notice > li > a > div{
+ height:60px;
+ color:black;
+}
+
+div.modal{
+ width:978px;
+ margin-left:-489px;
+}
+
+div.panel_box{
+ width:322px;
+ position:absolute;
+ top:0px;
+ right:0px;
+}
diff --git a/src/test/wstest.html b/src/test/wstest.html
new file mode 100644
index 0000000..a17a0dc
--- /dev/null
+++ b/src/test/wstest.html
@@ -0,0 +1,97 @@
+<!DOCTYPE HTML>
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
+<meta name="viewport" content="width=device-width, initial-scale=1.0">
+
+<link href="/bootstrap/css/bootstrap.min.css" rel="stylesheet">
+<link href="/bootstrap/css/bootstrap-responsive.min.css" rel="stylesheet">
+
+<link href="/wstest.css" rel="stylesheet">
+
+<script src="/jquery-2.0.0.min.js"></script>
+<script src="/bootstrap/js/bootstrap.min.js"></script>
+<script type="text/javascript" src="/imc.js"></script>
+<script type="text/javascript" src="/wstest.js"></script>
+
+<script type="text/javascript">
+
+$(document).ready(function(){
+ conn_backend();
+
+ index.init();
+});
+
+</script>
+</head>
+<body>
+
+<div id="index_head" class="navbar navbar-inverse navbar-fixed-top head">
+ <div class="navbar-inner">
+ <div class="container">
+ <a class="brand span2" href="#">Taiwan OJ</a>
+ <ul class="nav bar">
+ <li class="active"><a href="#">Profile</a></li>
+ <li><a href="#">Edit Profile</a></li>
+ </ul>
+ <p class="navbar-text span3 title">Taiwan Online Judge Test Page</p>
+ </div>
+
+ <ul id="index_head_rightnavbar" class="nav bar right_navbar">
+ <li><a id="index_head_nickname" href="#">Elisha</a></li>
+ <li><div id="index_head_notice" class="notice notice_h"><div class="box">2</div></div></li>
+ <li><div id="index_head_menu" class="menu"><i class="icon-chevron-left icon-white"></i></div></li>
+ </ul>
+ </div>
+</div>
+
+<div id="index_panel" class="panel_container">
+ <div class="menu_container">
+ <ul id="index_panel_menu" class="nav nav-list menu">
+ <li><a href="#"><i class="icon-home"></i> Home</a></li>
+ <li class="active"><a href="#"><i class="icon-exclamation-sign"></i> Test</a></li>
+ <li><a href="#"><i class="icon-th-large"></i> Square</a></li>
+ <ul class="nav nav-list sq_list">
+ <li class="nav-header">Coming</li>
+ <li><a href="#">Test1</a></li>
+ <li><a href="#">Test2</a></li>
+ <li class="nav-header">Running</li>
+ <li><a href="#">Normal Test</a></li>
+ <li class="nav-header">Ended</li>
+ <li><a href="#">End Test</a></li>
+ </ul>
+
+ <li style="margin-top:30px;"><a href="#"><i class="icon-circle-arrow-left"></i> Logout</a></li>
+ </ul>
+ </div>
+
+ <div class="notice_container">
+ <ul id="index_panel_menu" class="nav nav-list notice">
+ <li><a href="#"><div><h5>Problem:573</h5>Wrong Answer</div></a></li>
+ <li><a href="#"><div><h5>Problem:573</h5>Runtime Error</div></a></li>
+ </ul>
+ </div>
+</div>
+
+<div class="container" style="padding:60px 0px 32px 0px;">
+ <div class="row">
+ <div class="span2 slide">
+
+ </div>
+ <div class="span10 main">
+ <button class="btn btn-primary" onclick="$('div.test_modal').modal('show');">Test Dialog</button>
+ <div class="modal hide fade test_modal" tabindex="-1" role="dialog" aria-hidden="true">
+ <div class="modal-header">
+ <button type="button" class="close" data-dismiss="modal" aria-hidden="true">×</button>
+ <h3>Test Dialog</h3>
+ </div>
+ <div class="modal-body">
+ Hello, my name is Elisha.
+ </div>
+ </div>
+ </div>
+ </div>
+</div>
+
+</body>
+</html>
diff --git a/src/test/wstest.js b/src/test/wstest.js
new file mode 100644
index 0000000..54cead6
--- /dev/null
+++ b/src/test/wstest.js
@@ -0,0 +1,178 @@
+'use strict'
+
+var count = 0;
+var last = 0;
+var data = new ArrayBuffer(1024);
+
+var linkid = null;
+var idendesc = null;
+
+function test_display(iden,param,callback){
+ imc_call(idendesc,'/center/1/','test_dst','',function(result){
+ console.log(result);
+ });
+}
+
+var WebSocketConnection = function(linkid,ws){
+ var that = this;
+ var reader = new FileReader;
+
+ that.__super__(linkid);
+
+ that.send_msg = function(data){
+ ws.send(new Blob([data],{'type':'application/octet-stream'}))
+ };
+ that.start_recv = function(recv_callback){
+ ws.onmessage = function(e){
+ reader.onload = function(e){
+ recv_callback(that,e.target.result);
+ };
+ reader.readAsText(e.data);
+ }
+ };
+
+ ws.onclose = function(e){
+ console.log('close');
+ that.close();
+
+ setTimeout(conn_backend,5000);
+ };
+};__extend__(WebSocketConnection,imc.Connection);
+
+function conn_backend(ip,port){
+ $.post('http://toj.tfcis.org:83/conn',{},function(res){
+ var reto;
+ var iden;
+ var linkid;
+ var ws;
+
+ if(res[0] != 'E'){
+ reto = JSON.parse(res)
+ idendesc = reto.client_idendesc;
+ iden = JSON.parse(JSON.parse(idendesc)[0]);
+ linkid = iden.linkid;
+
+ ws = new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn');
+ ws.onopen = function(){
+ var i;
+ var conn;
+
+ console.log('open');
+
+ console.log(linkid);
+ ws.send(JSON.stringify({
+ 'client_linkid':linkid
+ }));
+
+ conn = new WebSocketConnection(reto.backend_linkid,ws);
+
+ new imc.Auth();
+ new imc.Proxy(linkid,imc.Auth.instance,function(linkid,callback){
+ callback(conn);
+ });
+ imc.Proxy.instance.add_conn(conn);
+
+ imc_register_call('','test_display',test_display);
+
+
+ test_display(idendesc,'',function(result){
+ console.log(result);
+ });
+ };
+ }else{
+ setTimeout(conn_backend,5000);
+ }
+ });
+}
+
+function perf(){
+ $('#speed').text((count - last) + '/s');
+ last = count;
+ setTimeout(perf,1000);
+}
+
+var index = new function(){
+ this.init = function(){
+ var j_navbar = $('#index_head ul.right_navbar');
+ var j_navbar_menu = $('#index_head_menu');
+ var j_navbar_notice = $('#index_head_notice');
+ var j_panel = $('#index_panel');
+ var j_panel_menu = $('#index_panel_menu');
+ var j_panel_notice = $('#index_panel_notice');
+
+ var _in_area = function(target,id){
+ return target.id == id || $(target).parents('#' + id).length > 0;
+ };
+ var _show_panel = function(){
+ var j_i;
+
+ if(!j_panel.hasClass('panel_container_a')){
+ j_i = j_navbar_menu.find('i');
+ j_i.removeClass('icon-chevron-left');
+ j_i.addClass('icon-chevron-down');
+
+ j_panel.addClass('panel_container_a');
+ }
+ };
+ var _hide_panel = function(){
+ var j_i;
+
+ if(j_panel.hasClass('panel_container_a')){
+ j_i = j_navbar_menu.find('i');
+ j_i.removeClass('icon-chevron-down');
+ j_i.addClass('icon-chevron-left');
+
+ j_panel.removeClass('panel_container_a');
+ }
+ };
+ var _show_menu = function(){
+ _hide_notice();
+ j_panel.find('div.menu_container').show();
+ _show_panel();
+ };
+ var _hide_menu = function(){
+ j_panel.find('div.menu_container').hide();
+ };
+ var _show_notice = function(){
+ _hide_menu();
+ j_panel.find('div.notice_container').show();
+ _show_panel();
+ };
+ var _hide_notice = function(){
+ j_panel.find('div.notice_container').hide();
+ };
+
+ $(window).on('resize',function(e){
+ j_panel.css('min-height',($(window).height() - 40 + 'px'));
+ });
+ j_panel.css('min-height',($(window).height() - 40 + 'px'));
+
+ $(window).on('mouseover',function(e){
+ var target = e.target;
+
+ console.log(e.target);
+ if(target == null ||
+ _in_area(target,'index_panel') ||
+ (target.parentNode.id == 'index_head' && $(target).hasClass('navbar-inner'))){
+ return;
+ }
+
+ if(_in_area(target,'index_head_menu')){
+ if(!j_panel.hasClass('panel_container_a')){
+ _show_menu();
+ }
+ }else{
+ if(!_in_area(target,'index_head_rightnavbar')){
+ _hide_panel();
+ }
+ }
+ });
+
+ j_navbar_menu.on('click',function(e){
+ _show_menu();
+ });
+ j_navbar_notice.on('click',function(e){
+ _show_notice();
+ });
+ };
+};