aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/imc/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-xsrc/py/imc/proxy.py246
1 files changed, 121 insertions, 125 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 00476f0..c9d57bc 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -12,10 +12,9 @@ from imc import async
from imc.auth import Auth
class Connection:
- def __init__(self,linkclass,linkid):
- self.linkclass = linkclass
- self.linkid = linkid
- self.link_linkidmap = {}
+ def __init__(self,link):
+ self.link = link
+ self.link_linkmap = {}
self._close_callback = []
self._closed = False
@@ -74,26 +73,22 @@ class FileResult():
return self._result
class Proxy:
- def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None):
+ def __init__(self,link,auth_instance,idendesc,conn_link_fn = lambda : None):
self.MSGTYPE_CALL = 'call'
self.MSGTYPE_RET = 'ret'
self.MSGTYPE_SENDFILE = 'sendfile'
self.MSGTYPE_ABORTFILE = 'abortfile'
self._ioloop = tornado.ioloop.IOLoop.instance()
- self._linkclass = linkclass
- self._linkid = linkid
+ self._link = link
self._auth = auth_instance
self._idendesc = idendesc
- if conn_linkid_fn == None:
- self._conn_linkid_fn = lambda : None
- else:
- self._conn_linkid_fn = conn_linkid_fn
+ self._conn_link_fn = conn_link_fn
- self._conn_linkidmap = {}
- self._conn_retidmap = {self._linkid:{}}
- self._conn_filekeymap = {self._linkid:{}}
+ self._conn_linkmap = {}
+ self._conn_retidmap = {self._link:{}}
+ self._conn_filekeymap = {self._link:{}}
self._call_pathmap = {}
self._info_filekeymap = {}
@@ -104,47 +99,47 @@ class Proxy:
self.register_call('imc/','reject_sendfile',self._reject_sendfile)
def add_conn(self,conn):
- assert conn.linkid not in self._conn_linkidmap
+ assert conn.link not in self._conn_linkmap
- self._conn_linkidmap[conn.linkid] = conn
- self._conn_retidmap[conn.linkid] = {}
- self._conn_filekeymap[conn.linkid] = {}
+ self._conn_linkmap[conn.link] = conn
+ self._conn_retidmap[conn.link] = {}
+ self._conn_filekeymap[conn.link] = {}
conn.add_close_callback(self._conn_close_cb)
conn.start_recv(self._recv_dispatch)
- def link_conn(self,linkid,conn):
- assert conn.linkid in self._conn_linkidmap
+ def link_conn(self,link,conn):
+ assert conn.link in self._conn_linkmap
- conn.link_linkidmap[linkid] = True
- self._conn_linkidmap[linkid] = conn
+ conn.link_linkmap[link] = True
+ self._conn_linkmap[link] = conn
- def unlink_conn(self,linkid):
- assert linkid in self._conn_linkidmap
+ def unlink_conn(self,link):
+ assert link in self._conn_linkmap
- conn = self._conn_linkidmap.pop(linkid)
- del conn.link_linkidmap[linkid]
+ conn = self._conn_linkmap.pop(link)
+ del conn.link_linkmap[link]
def del_conn(self,conn):
- waits = list(self._conn_retidmap[conn.linkid].values())
+ waits = list(self._conn_retidmap[conn.link].values())
for wait in waits:
wait['callback']((False,'Eclose'))
- waits = list(self._conn_filekeymap[conn.linkid].values())
+ waits = list(self._conn_filekeymap[conn.link].values())
for wait in waits:
wait['callback']('Eclose')
- linkids = list(conn.link_linkidmap.keys())
- for linkid in linkids:
- self.unlink_conn(linkid)
+ links = list(conn.link_linkmap.keys())
+ for link in links:
+ self.unlink_conn(link)
- del self._conn_linkidmap[conn.linkid]
- del self._conn_retidmap[conn.linkid]
- del self._conn_filekeymap[conn.linkid]
+ del self._conn_linkmap[conn.link]
+ del self._conn_retidmap[conn.link]
+ del self._conn_filekeymap[conn.link]
- def get_conn(self,linkid):
+ def get_conn(self,link):
try:
- return self._conn_linkidmap[linkid]
+ return self._conn_linkmap[link]
except KeyError:
return None
@@ -152,13 +147,14 @@ class Proxy:
def register_call(self,path,func_name,func):
self._call_pathmap[''.join([path,func_name])] = func
- def call(self,idendesc,dst,func_name,timeout,*args):
- return self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args))
+ def call(self,dst,func_name,timeout,*args):
+ return self._route_call(None,self._link,async.get_retid(),Auth.get_current_idendesc(),dst,func_name,timeout,list(args))
- def call_async(self,idendesc,dst,func_name,timeout,callback,*args):
+ def call_async(self,dst,func_name,timeout,callback,*args):
@async.caller
def _call():
- result = self._route_call(None,async.get_retid(),idendesc,dst,func_name,timeout,list(args))
+ result = self._route_call(None,self._link,async.get_retid(),Auth.get_current_idendesc(),dst,func_name,timeout,list(args))
+
if callback != None:
callback(result)
@@ -177,7 +173,9 @@ class Proxy:
'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
}
- stat,ret = self.call(self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize},655360)
+ with Auth.change_current_iden(self._idendesc):
+ stat,ret = self.call(dst_link + 'imc/','pend_recvfile',65536,self._link,filekey,filesize)
+
if stat == False:
raise ConnectionError(ret)
@@ -186,7 +184,7 @@ class Proxy:
def recvfile(self,filekey,filepath):
def _callback(err = None):
try:
- self._del_wait_filekey(in_conn.linkid,filekey)
+ self._del_wait_filekey(in_conn.link,filekey)
except KeyError:
return
@@ -204,14 +202,14 @@ class Proxy:
except KeyError:
return
- src_linkid = info['src_linkid']
+ src_link = info['src_link']
filesize = info['filesize']
- in_conn = self._request_conn(src_linkid)
- self._add_wait_filekey(in_conn.linkid,filekey,filesize,_callback)
+ in_conn = self._request_conn(src_link)
+ self._add_wait_filekey(in_conn.link,filekey,filesize,_callback)
in_conn.recv_file(filekey,filesize,filepath,_callback)
- self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+ self._send_msg_sendfile(in_conn,src_link,filekey,filesize)
return info['fileresult']
@@ -222,104 +220,105 @@ class Proxy:
except KeyError:
return
- dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/'])
- self.call(self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey},65536)
+ with Auth.change_current_iden(self._idendesc):
+ self.call(info['src_link'] + 'imc/','reject_sendfile',65536,filekey)
- def _route_call(self,in_conn,caller_retid,idendesc,dst,func_name,timeout,param):
- def __add_wait_caller(conn_linkid):
- callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_linkid,caller_retid,result))
- self._conn_retidmap[conn_linkid][caller_retid] = {
+ def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param):
+ def __add_wait_caller(conn_link):
+ callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_link,caller_retid,result))
+ self._conn_retidmap[conn_link][caller_retid] = {
'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback((False,'Etimeout'))),
'callback':callback
}
- def __del_wait_caller(conn_linkid):
- wait = self._conn_retidmap[conn_linkid].pop(caller_retid)
+ def __del_wait_caller(conn_link):
+ wait = self._conn_retidmap[conn_link].pop(caller_retid)
self._ioloop.remove_timeout(wait['timer'])
def __ret(result):
- if caller_linkid == self._linkid:
+ if caller_link == self._link:
return result
else:
- conn = self._request_conn(caller_linkid)
+ conn = self._request_conn(caller_link)
if conn != None:
- self._send_msg_ret(conn,caller_linkid,caller_retid,result)
+ self._send_msg_ret(conn,caller_link,caller_retid,result)
if in_conn != None:
- in_linkclass = in_conn.linkclass
- in_linkid = in_conn.linkid
+ in_link = in_conn.link
else:
- in_linkclass = self._linkclass
- in_linkid = self._linkid
+ in_link = self._link
- iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc)
- if iden == None:
- return __ret(False,'Eilliden')
+ #iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc)
+ #if iden == None:
+ # return __ret((False,'Eilliden'))
try:
dst_part = dst.split('/',3)
- dst_linkid = dst_part[2]
+ dst_link = ''.join(['/',dst_part[1],'/',dst_part[2],'/'])
dst_path = dst_part[3]
except Exception:
- return __ret(False,'Enoexist')
-
- caller_linkid = iden['linkid']
+ return __ret((False,'Enoexist'))
- if dst_linkid == self._linkid:
- __add_wait_caller(self._linkid)
+ if dst_link == self._link:
+ __add_wait_caller(self._link)
try:
- with Auth.change_current_iden(iden):
+ if Auth.get_current_idendesc() == idendesc:
result = self._call_pathmap[''.join([dst_path,func_name])](*param)
+ else:
+ with Auth.change_current_iden(idendesc):
+ result = self._call_pathmap[''.join([dst_path,func_name])](*param)
+
except KeyError:
result = (False,'Enoexist')
- __del_wait_caller(self._linkid)
+ __del_wait_caller(self._link)
return __ret(result)
else:
- conn = self._request_conn(dst_linkid)
+ conn = self._request_conn(dst_link)
if conn == None:
+ print(dst_link)
return __ret((False,'Enoexist'))
else:
- if caller_linkid == self._linkid:
- __add_wait_caller(conn.linkid)
- self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param)
+ if caller_link == self._link:
+ __add_wait_caller(conn.link)
+ self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param)
result = async.switch_top()
- __del_wait_caller(conn.linkid)
+ __del_wait_caller(conn.link)
return __ret(result)
else:
- self._send_msg_call(conn,caller_retid,idendesc,dst,func_name,timeout,param)
+ self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param)
return
- def _ret_call(self,caller_linkid,caller_retid,result):
+ def _ret_call(self,caller_link,caller_retid,result):
@async.caller
def __ret_remote():
- conn = self._request_conn(caller_linkid)
+ conn = self._request_conn(caller_link)
if conn != None:
- self._send_msg_ret(conn,caller_linkid,caller_retid,result)
+ self._send_msg_ret(conn,caller_link,caller_retid,result)
- if caller_linkid == self._linkid:
+ if caller_link == self._link:
async.ret(caller_retid,result)
else:
__ret_remote()
- def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
+ def _route_sendfile(self,out_conn,src_link,filekey,filesize):
def __send_cb(err = None):
try:
- self._del_wait_filekey(out_conn.linkid,filekey)
+ self._del_wait_filekey(out_conn.link,filekey)
except KeyError:
return
@@ -333,7 +332,7 @@ class Proxy:
def __bridge_cb(err = None):
try:
- self._del_wait_filekey(in_conn,filekey)
+ self._del_wait_filekey(in_conn.link,filekey)
if err != None:
if not in_conn.closed():
@@ -344,7 +343,7 @@ class Proxy:
pass
try:
- self._del_wait_filekey(out_conn,filekey)
+ self._del_wait_filekey(out_conn.link,filekey)
if err != None:
if not out_conn.closed():
@@ -354,7 +353,7 @@ class Proxy:
except KeyError:
pass
- if src_linkid == self._linkid:
+ if src_link == self._link:
try:
info = self._info_filekeymap[filekey]
if info['filesize'] != filesize:
@@ -364,28 +363,28 @@ class Proxy:
self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist')
return
- self._add_wait_filekey(out_conn.linkid,filekey,filesize,__send_cb)
+ self._add_wait_filekey(out_conn.link,filekey,filesize,__send_cb)
out_conn.send_file(filekey,info['filepath'],__send_cb)
else:
- in_conn = self._request_conn(src_linkid)
- self._add_wait_filekey(in_conn.linkid,filekey,filesize,__bridge_cb)
- self._add_wait_filekey(out_conn.linkid,filekey,filesize,__bridge_cb)
+ in_conn = self._request_conn(src_link)
+ self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb)
+ self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb)
send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb)
in_conn.recv_filedata(filekey,filesize,send_fn)
- self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+ self._send_msg_sendfile(in_conn,src_link,filekey,filesize)
- def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback):
+ def _add_wait_filekey(self,conn_link,filekey,filesize,callback):
callback = tornado.stack_context.wrap(callback)
- self._conn_filekeymap[conn_linkid][filekey] = {
- 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = min(filesize,1000)),lambda : callback('Etimeout')),
+ self._conn_filekeymap[conn_link][filekey] = {
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = max(filesize,10000)),lambda : callback('Etimeout')),
'callback':callback
}
- def _del_wait_filekey(self,conn_linkid,filekey):
- wait = self._conn_filekeymap[conn_linkid].pop(filekey)
+ def _del_wait_filekey(self,conn_link,filekey):
+ wait = self._conn_filekeymap[conn_link].pop(filekey)
self._ioloop.remove_timeout(wait['timer'])
def _ret_sendfile(self,filekey,err = None):
@@ -404,15 +403,15 @@ class Proxy:
else:
fileresult.ret_result(err)
- def _request_conn(self,linkid):
+ def _request_conn(self,link):
try:
- return self._conn_linkidmap[linkid]
+ return self._conn_linkmap[link]
except KeyError:
- conn = self._conn_linkid_fn(linkid)
+ conn = self._conn_link_fn(link)
- if conn != None and conn.linkid != linkid:
- self.link_conn(linkid,conn)
+ if conn != None and conn.link != link:
+ self.link_conn(link,conn)
return conn
@@ -441,9 +440,10 @@ class Proxy:
elif msg_type == self.MSGTYPE_ABORTFILE:
self._recv_msg_abortfile(conn,msg)
- def _send_msg_call(self,conn,caller_retid,idendesc,dst,func_name,timeout,param):
+ def _send_msg_call(self,conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param):
msg = {
'type':self.MSGTYPE_CALL,
+ 'caller_link':caller_link,
'caller_retid':caller_retid,
'idendesc':idendesc,
'dst':dst,
@@ -457,8 +457,9 @@ class Proxy:
def _recv_msg_call(self,conn,msg):
@async.caller
def __call():
- self._route_call(conn,caller_retid,idendesc,dst,func_name,timeout,param)
+ self._route_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param)
+ caller_link = msg['caller_link']
caller_retid = msg['caller_retid']
idendesc = msg['idendesc']
dst = msg['dst']
@@ -468,11 +469,11 @@ class Proxy:
__call()
- def _send_msg_ret(self,conn,caller_linkid,caller_retid,result):
+ def _send_msg_ret(self,conn,caller_link,caller_retid,result):
stat,data = result
msg = {
'type':self.MSGTYPE_RET,
- 'caller_linkid':caller_linkid,
+ 'caller_link':caller_link,
'caller_retid':caller_retid,
'result':{'stat':stat,'data':data}
}
@@ -480,17 +481,17 @@ class Proxy:
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_ret(self,conn,msg):
- caller_linkid = msg['caller_linkid']
+ caller_link = msg['caller_link']
caller_retid = msg['caller_retid']
data = msg['result']
result = (data['stat'],data['data'])
- self._ret_call(caller_linkid,caller_retid,result)
+ self._ret_call(caller_link,caller_retid,result)
- def _send_msg_sendfile(self,conn,src_linkid,filekey,filesize):
+ def _send_msg_sendfile(self,conn,src_link,filekey,filesize):
msg = {
'type':self.MSGTYPE_SENDFILE,
- 'src_linkid':src_linkid,
+ 'src_link':src_link,
'filekey':filekey,
'filesize':filesize
}
@@ -500,9 +501,9 @@ class Proxy:
def _recv_msg_sendfile(self,conn,msg):
@async.caller
def __call():
- self._route_sendfile(conn,src_linkid,filekey,filesize)
+ self._route_sendfile(conn,src_link,filekey,filesize)
- src_linkid = msg['src_linkid']
+ src_link = msg['src_link']
filekey = msg['filekey']
filesize = msg['filesize']
@@ -521,7 +522,7 @@ class Proxy:
@async.caller
def __call():
try:
- self._conn_filekeymap[conn.linkid][filekey]['callback'](err)
+ self._conn_filekeymap[conn.link][filekey]['callback'](err)
except KeyError:
pass
@@ -532,28 +533,23 @@ class Proxy:
__call()
@async.caller
- def _pend_recvfile(self,iden,param):
- filekey = param['filekey']
- filesize = param['filesize']
-
+ def _pend_recvfile(self,src_link,filekey,filesize):
self._info_filekeymap[filekey] = {
- 'src_linkclass':iden['linkclass'],
- 'src_linkid':iden['linkid'],
+ 'src_link':src_link,
'filesize':filesize,
'fileresult':FileResult(filekey),
'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
}
@async.caller
- def _reject_sendfile(self,iden,param):
- filekey = param['filekey']
+ def _reject_sendfile(self,filekey):
self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject')
-def imc_call(idendesc,dst,func_name,*args):
- return Proxy.instance.call(idendesc,dst,func_name,65536,*args)
+def imc_call(dst,func_name,*args):
+ return Proxy.instance.call(dst,func_name,65536,*args)
-def imc_call_async(idendesc,dst,func_name,callback,*args):
- Proxy.instance.call_async(idendesc,dst,func_name,65536,callback,*args)
+def imc_call_async(dst,func_name,callback,*args):
+ Proxy.instance.call_async(dst,func_name,65536,callback,*args)
def imc_register_call(path,func_name,func):
Proxy.instance.register_call(path,func_name,func)