diff options
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-x | src/py/imc/proxy.py | 246 |
1 files changed, 121 insertions, 125 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 00476f0..c9d57bc 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -12,10 +12,9 @@ from imc import async from imc.auth import Auth class Connection: - def __init__(self,linkclass,linkid): - self.linkclass = linkclass - self.linkid = linkid - self.link_linkidmap = {} + def __init__(self,link): + self.link = link + self.link_linkmap = {} self._close_callback = [] self._closed = False @@ -74,26 +73,22 @@ class FileResult(): return self._result class Proxy: - def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None): + def __init__(self,link,auth_instance,idendesc,conn_link_fn = lambda : None): self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' self.MSGTYPE_SENDFILE = 'sendfile' self.MSGTYPE_ABORTFILE = 'abortfile' self._ioloop = tornado.ioloop.IOLoop.instance() - self._linkclass = linkclass - self._linkid = linkid + self._link = link self._auth = auth_instance self._idendesc = idendesc - if conn_linkid_fn == None: - self._conn_linkid_fn = lambda : None - else: - self._conn_linkid_fn = conn_linkid_fn + self._conn_link_fn = conn_link_fn - self._conn_linkidmap = {} - self._conn_retidmap = {self._linkid:{}} - self._conn_filekeymap = {self._linkid:{}} + self._conn_linkmap = {} + self._conn_retidmap = {self._link:{}} + self._conn_filekeymap = {self._link:{}} self._call_pathmap = {} self._info_filekeymap = {} @@ -104,47 +99,47 @@ class Proxy: self.register_call('imc/','reject_sendfile',self._reject_sendfile) def add_conn(self,conn): - assert conn.linkid not in self._conn_linkidmap + assert conn.link not in self._conn_linkmap - self._conn_linkidmap[conn.linkid] = conn - self._conn_retidmap[conn.linkid] = {} - self._conn_filekeymap[conn.linkid] = {} + self._conn_linkmap[conn.link] = conn + self._conn_retidmap[conn.link] = {} + self._conn_filekeymap[conn.link] = {} conn.add_close_callback(self._conn_close_cb) conn.start_recv(self._recv_dispatch) - def link_conn(self,linkid,conn): - assert conn.linkid in self._conn_linkidmap + def link_conn(self,link,conn): + assert conn.link in self._conn_linkmap - conn.link_linkidmap[linkid] = True - self._conn_linkidmap[linkid] = conn + conn.link_linkmap[link] = True + self._conn_linkmap[link] = conn - def unlink_conn(self,linkid): - assert linkid in self._conn_linkidmap + def unlink_conn(self,link): + assert link in self._conn_linkmap - conn = self._conn_linkidmap.pop(linkid) - del conn.link_linkidmap[linkid] + conn = self._conn_linkmap.pop(link) + del conn.link_linkmap[link] def del_conn(self,conn): - waits = list(self._conn_retidmap[conn.linkid].values()) + waits = list(self._conn_retidmap[conn.link].values()) for wait in waits: wait['callback']((False,'Eclose')) - waits = list(self._conn_filekeymap[conn.linkid].values()) + waits = list(self._conn_filekeymap[conn.link].values()) for wait in waits: wait['callback']('Eclose') - linkids = list(conn.link_linkidmap.keys()) - for linkid in linkids: - self.unlink_conn(linkid) + links = list(conn.link_linkmap.keys()) + for link in links: + self.unlink_conn(link) - del self._conn_linkidmap[conn.linkid] - del self._conn_retidmap[conn.linkid] - del self._conn_filekeymap[conn.linkid] + del self._conn_linkmap[conn.link] + del self._conn_retidmap[conn.link] + del self._conn_filekeymap[conn.link] - def get_conn(self,linkid): + def get_conn(self,link): try: - return self._conn_linkidmap[linkid] + return self._conn_linkmap[link] except KeyError: return None @@ -152,13 +147,14 @@ class Proxy: def register_call(self,path,func_name,func): self._call_pathmap[''.join([path,func_name])] = func - def call(self,idendesc,dst,func_name,timeout,*args): - return self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args)) + def call(self,dst,func_name,timeout,*args): + return self._route_call(None,self._link,async.get_retid(),Auth.get_current_idendesc(),dst,func_name,timeout,list(args)) - def call_async(self,idendesc,dst,func_name,timeout,callback,*args): + def call_async(self,dst,func_name,timeout,callback,*args): @async.caller def _call(): - result = self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args)) + result = self._route_call(None,self._link,async.get_retid(),Auth.get_current_idendesc(),dst,func_name,timeout,list(args)) + if callback != None: callback(result) @@ -177,7 +173,9 @@ class Proxy: 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } - stat,ret = self.call(self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize},655360) + with Auth.change_current_iden(self._idendesc): + stat,ret = self.call(dst_link + 'imc/','pend_recvfile',65536,self._link,filekey,filesize) + if stat == False: raise ConnectionError(ret) @@ -186,7 +184,7 @@ class Proxy: def recvfile(self,filekey,filepath): def _callback(err = None): try: - self._del_wait_filekey(in_conn.linkid,filekey) + self._del_wait_filekey(in_conn.link,filekey) except KeyError: return @@ -204,14 +202,14 @@ class Proxy: except KeyError: return - src_linkid = info['src_linkid'] + src_link = info['src_link'] filesize = info['filesize'] - in_conn = self._request_conn(src_linkid) - self._add_wait_filekey(in_conn.linkid,filekey,filesize,_callback) + in_conn = self._request_conn(src_link) + self._add_wait_filekey(in_conn.link,filekey,filesize,_callback) in_conn.recv_file(filekey,filesize,filepath,_callback) - self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + self._send_msg_sendfile(in_conn,src_link,filekey,filesize) return info['fileresult'] @@ -222,104 +220,105 @@ class Proxy: except KeyError: return - dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/']) - self.call(self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey},65536) + with Auth.change_current_iden(self._idendesc): + self.call(info['src_link'] + 'imc/','reject_sendfile',65536,filekey) - def _route_call(self,in_conn,caller_retid,idendesc,dst,func_name,timeout,param): - def __add_wait_caller(conn_linkid): - callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_linkid,caller_retid,result)) - self._conn_retidmap[conn_linkid][caller_retid] = { + def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param): + def __add_wait_caller(conn_link): + callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_link,caller_retid,result)) + self._conn_retidmap[conn_link][caller_retid] = { 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback((False,'Etimeout'))), 'callback':callback } - def __del_wait_caller(conn_linkid): - wait = self._conn_retidmap[conn_linkid].pop(caller_retid) + def __del_wait_caller(conn_link): + wait = self._conn_retidmap[conn_link].pop(caller_retid) self._ioloop.remove_timeout(wait['timer']) def __ret(result): - if caller_linkid == self._linkid: + if caller_link == self._link: return result else: - conn = self._request_conn(caller_linkid) + conn = self._request_conn(caller_link) if conn != None: - self._send_msg_ret(conn,caller_linkid,caller_retid,result) + self._send_msg_ret(conn,caller_link,caller_retid,result) if in_conn != None: - in_linkclass = in_conn.linkclass - in_linkid = in_conn.linkid + in_link = in_conn.link else: - in_linkclass = self._linkclass - in_linkid = self._linkid + in_link = self._link - iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc) - if iden == None: - return __ret(False,'Eilliden') + #iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc) + #if iden == None: + # return __ret((False,'Eilliden')) try: dst_part = dst.split('/',3) - dst_linkid = dst_part[2] + dst_link = ''.join(['/',dst_part[1],'/',dst_part[2],'/']) dst_path = dst_part[3] except Exception: - return __ret(False,'Enoexist') - - caller_linkid = iden['linkid'] + return __ret((False,'Enoexist')) - if dst_linkid == self._linkid: - __add_wait_caller(self._linkid) + if dst_link == self._link: + __add_wait_caller(self._link) try: - with Auth.change_current_iden(iden): + if Auth.get_current_idendesc() == idendesc: result = self._call_pathmap[''.join([dst_path,func_name])](*param) + else: + with Auth.change_current_iden(idendesc): + result = self._call_pathmap[''.join([dst_path,func_name])](*param) + except KeyError: result = (False,'Enoexist') - __del_wait_caller(self._linkid) + __del_wait_caller(self._link) return __ret(result) else: - conn = self._request_conn(dst_linkid) + conn = self._request_conn(dst_link) if conn == None: + print(dst_link) return __ret((False,'Enoexist')) else: - if caller_linkid == self._linkid: - __add_wait_caller(conn.linkid) - self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param) + if caller_link == self._link: + __add_wait_caller(conn.link) + self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param) result = async.switch_top() - __del_wait_caller(conn.linkid) + __del_wait_caller(conn.link) return __ret(result) else: - self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param) + self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param) return - def _ret_call(self,caller_linkid,caller_retid,result): + def _ret_call(self,caller_link,caller_retid,result): @async.caller def __ret_remote(): - conn = self._request_conn(caller_linkid) + conn = self._request_conn(caller_link) if conn != None: - self._send_msg_ret(conn,caller_linkid,caller_retid,result) + self._send_msg_ret(conn,caller_link,caller_retid,result) - if caller_linkid == self._linkid: + if caller_link == self._link: async.ret(caller_retid,result) else: __ret_remote() - def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): + def _route_sendfile(self,out_conn,src_link,filekey,filesize): def __send_cb(err = None): try: - self._del_wait_filekey(out_conn.linkid,filekey) + self._del_wait_filekey(out_conn.link,filekey) except KeyError: return @@ -333,7 +332,7 @@ class Proxy: def __bridge_cb(err = None): try: - self._del_wait_filekey(in_conn,filekey) + self._del_wait_filekey(in_conn.link,filekey) if err != None: if not in_conn.closed(): @@ -344,7 +343,7 @@ class Proxy: pass try: - self._del_wait_filekey(out_conn,filekey) + self._del_wait_filekey(out_conn.link,filekey) if err != None: if not out_conn.closed(): @@ -354,7 +353,7 @@ class Proxy: except KeyError: pass - if src_linkid == self._linkid: + if src_link == self._link: try: info = self._info_filekeymap[filekey] if info['filesize'] != filesize: @@ -364,28 +363,28 @@ class Proxy: self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist') return - self._add_wait_filekey(out_conn.linkid,filekey,filesize,__send_cb) + self._add_wait_filekey(out_conn.link,filekey,filesize,__send_cb) out_conn.send_file(filekey,info['filepath'],__send_cb) else: - in_conn = self._request_conn(src_linkid) - self._add_wait_filekey(in_conn.linkid,filekey,filesize,__bridge_cb) - self._add_wait_filekey(out_conn.linkid,filekey,filesize,__bridge_cb) + in_conn = self._request_conn(src_link) + self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb) + self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb) send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb) in_conn.recv_filedata(filekey,filesize,send_fn) - self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + self._send_msg_sendfile(in_conn,src_link,filekey,filesize) - def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback): + def _add_wait_filekey(self,conn_link,filekey,filesize,callback): callback = tornado.stack_context.wrap(callback) - self._conn_filekeymap[conn_linkid][filekey] = { - 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = min(filesize,1000)),lambda : callback('Etimeout')), + self._conn_filekeymap[conn_link][filekey] = { + 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = max(filesize,10000)),lambda : callback('Etimeout')), 'callback':callback } - def _del_wait_filekey(self,conn_linkid,filekey): - wait = self._conn_filekeymap[conn_linkid].pop(filekey) + def _del_wait_filekey(self,conn_link,filekey): + wait = self._conn_filekeymap[conn_link].pop(filekey) self._ioloop.remove_timeout(wait['timer']) def _ret_sendfile(self,filekey,err = None): @@ -404,15 +403,15 @@ class Proxy: else: fileresult.ret_result(err) - def _request_conn(self,linkid): + def _request_conn(self,link): try: - return self._conn_linkidmap[linkid] + return self._conn_linkmap[link] except KeyError: - conn = self._conn_linkid_fn(linkid) + conn = self._conn_link_fn(link) - if conn != None and conn.linkid != linkid: - self.link_conn(linkid,conn) + if conn != None and conn.link != link: + self.link_conn(link,conn) return conn @@ -441,9 +440,10 @@ class Proxy: elif msg_type == self.MSGTYPE_ABORTFILE: self._recv_msg_abortfile(conn,msg) - def _send_msg_call(self,conn,caller_retid,idendesc,dst,func_name,timeout,param): + def _send_msg_call(self,conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param): msg = { 'type':self.MSGTYPE_CALL, + 'caller_link':caller_link, 'caller_retid':caller_retid, 'idendesc':idendesc, 'dst':dst, @@ -457,8 +457,9 @@ class Proxy: def _recv_msg_call(self,conn,msg): @async.caller def __call(): - self._route_call(conn,caller_retid,idendesc,dst,func_name,timeout,param) + self._route_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param) + caller_link = msg['caller_link'] caller_retid = msg['caller_retid'] idendesc = msg['idendesc'] dst = msg['dst'] @@ -468,11 +469,11 @@ class Proxy: __call() - def _send_msg_ret(self,conn,caller_linkid,caller_retid,result): + def _send_msg_ret(self,conn,caller_link,caller_retid,result): stat,data = result msg = { 'type':self.MSGTYPE_RET, - 'caller_linkid':caller_linkid, + 'caller_link':caller_link, 'caller_retid':caller_retid, 'result':{'stat':stat,'data':data} } @@ -480,17 +481,17 @@ class Proxy: conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_ret(self,conn,msg): - caller_linkid = msg['caller_linkid'] + caller_link = msg['caller_link'] caller_retid = msg['caller_retid'] data = msg['result'] result = (data['stat'],data['data']) - self._ret_call(caller_linkid,caller_retid,result) + self._ret_call(caller_link,caller_retid,result) - def _send_msg_sendfile(self,conn,src_linkid,filekey,filesize): + def _send_msg_sendfile(self,conn,src_link,filekey,filesize): msg = { 'type':self.MSGTYPE_SENDFILE, - 'src_linkid':src_linkid, + 'src_link':src_link, 'filekey':filekey, 'filesize':filesize } @@ -500,9 +501,9 @@ class Proxy: def _recv_msg_sendfile(self,conn,msg): @async.caller def __call(): - self._route_sendfile(conn,src_linkid,filekey,filesize) + self._route_sendfile(conn,src_link,filekey,filesize) - src_linkid = msg['src_linkid'] + src_link = msg['src_link'] filekey = msg['filekey'] filesize = msg['filesize'] @@ -521,7 +522,7 @@ class Proxy: @async.caller def __call(): try: - self._conn_filekeymap[conn.linkid][filekey]['callback'](err) + self._conn_filekeymap[conn.link][filekey]['callback'](err) except KeyError: pass @@ -532,28 +533,23 @@ class Proxy: __call() @async.caller - def _pend_recvfile(self,iden,param): - filekey = param['filekey'] - filesize = param['filesize'] - + def _pend_recvfile(self,src_link,filekey,filesize): self._info_filekeymap[filekey] = { - 'src_linkclass':iden['linkclass'], - 'src_linkid':iden['linkid'], + 'src_link':src_link, 'filesize':filesize, 'fileresult':FileResult(filekey), 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } @async.caller - def _reject_sendfile(self,iden,param): - filekey = param['filekey'] + def _reject_sendfile(self,filekey): self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject') -def imc_call(idendesc,dst,func_name,*args): - return Proxy.instance.call(idendesc,dst,func_name,65536,*args) +def imc_call(dst,func_name,*args): + return Proxy.instance.call(dst,func_name,65536,*args) -def imc_call_async(idendesc,dst,func_name,callback,*args): - Proxy.instance.call_async(idendesc,dst,func_name,65536,callback,*args) +def imc_call_async(dst,func_name,callback,*args): + Proxy.instance.call_async(dst,func_name,65536,callback,*args) def imc_register_call(path,func_name,func): Proxy.instance.register_call(path,func_name,func) |