aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/imc/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-xsrc/py/imc/proxy.py109
1 files changed, 70 insertions, 39 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 6b95674..8f750c0 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -97,7 +97,7 @@ class Proxy:
Proxy.instance = self
self.register_call('imc/','pend_recvfile',self._pend_recvfile)
- self.register_call('imc/','reject_sendfile',self._reject_sendfile)
+ self.register_call('imc/','abort_sendfile',self._abort_sendfile)
def add_conn(self,conn):
assert conn.link not in self._conn_linkmap
@@ -153,6 +153,14 @@ class Proxy:
child,name,filt = self._walk_path(path,True)
filt.append(func)
+ def unregister_call(self,path,func_name):
+ child,name,filt = self._walk_path(path,True)
+ del name[func_name]
+
+ def unregister_filter(self,path,func):
+ child,name,filt = self._walk_path(path,True)
+ filt.remove(func)
+
def call(self,dst,func_name,timeout,*args):
return self._route_call(None,self._link,async.get_retid(),Auth.get_current_idendesc(),dst,func_name,timeout,list(args))
@@ -167,6 +175,11 @@ class Proxy:
self._ioloop.add_callback(tornado.stack_context.wrap(_call))
def sendfile(self,dst_link,filepath):
+ def _abort_cb():
+ if self._ret_sendfile(filekey,'Eabort'):
+ with Auth.change_current_iden(self._idendesc,self._auth):
+ self.call(dst_link + 'imc/','abort_sendfile',65536,filekey)
+
filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest()
filesize = os.stat(filepath).st_size
@@ -176,14 +189,15 @@ class Proxy:
'filesize':filesize,
'filepath':filepath,
'fileresult':fileresult,
- 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')),
+ 'abort_callback':tornado.stack_context.wrap(_abort_cb)
}
with Auth.change_current_iden(self._idendesc,self._auth):
stat,ret = self.call(dst_link + 'imc/','pend_recvfile',65536,self._link,filekey,filesize)
if stat == False:
- raise ConnectionError(ret)
+ self._ret_sendfile(filekey,'Enoexist')
return fileresult
@@ -202,32 +216,28 @@ class Proxy:
self._ioloop.add_callback(self._ret_sendfile,filekey,err)
- try:
- info = self._info_filekeymap[filekey]
-
- except KeyError:
- return
+ info = self._info_filekeymap[filekey]
src_link = info['src_link']
filesize = info['filesize']
in_conn = self._request_conn(src_link)
- self._add_wait_filekey(in_conn.link,filekey,filesize,_callback)
- in_conn.recv_file(filekey,filesize,filepath,_callback)
- self._send_msg_sendfile(in_conn,src_link,filekey,filesize)
+ if filekey in self._info_filekeymap:
+ info['abort_callback'] = tornado.stack_context.wrap(lambda : _callback('Eabort'))
+ self._add_wait_filekey(in_conn.link,filekey,filesize,_callback)
+
+ in_conn.recv_file(filekey,filesize,filepath,_callback)
+ self._send_msg_sendfile(in_conn,src_link,filekey,filesize)
return info['fileresult']
- def rejectfile(self,filekey):
+ def abortfile(self,filekey):
try:
- info = self._info_filekeymap.pop(filekey)
+ self._info_filekeymap[filekey]['abort_callback']()
- except KeyError:
- return
-
- with Auth.change_current_iden(self._idendesc,self._auth):
- self.call(info['src_link'] + 'imc/','reject_sendfile',65536,filekey)
+ except:
+ pass
def _walk_path(self,path,create = False):
parts = path.split('/')[:-1]
@@ -254,6 +264,13 @@ class Proxy:
break
return (child,name,filt)
+
+ def _json_handler(self,o):
+ if isinstance(o,datetime.datetime):
+ return o.isoformat();
+
+ else:
+ return None
def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param):
def __add_wait_caller(conn_link):
@@ -403,24 +420,30 @@ class Proxy:
try:
info = self._info_filekeymap[filekey]
if info['filesize'] != filesize:
- self._ioloop.add_callback(self._ret_sendfile,filekey,'Efilesize')
+ raise ValueError
- except KeyError:
+ except (KeyError,ValueError):
+ self._send_msg_abortfile(out_conn,filekey,'Enoexist')
self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist')
return
+ info['abort_callback'] = tornado.stack_context.wrap(lambda : __send_cb('Eabort'))
self._add_wait_filekey(out_conn.link,filekey,filesize,__send_cb)
out_conn.send_file(filekey,info['filepath'],__send_cb)
else:
in_conn = self._request_conn(src_link)
- self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb)
- self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb)
+ if in_conn == None:
+ self._send_msg_abortfile(out_conn,filekey,'Enoexist')
- send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb)
- in_conn.recv_filedata(filekey,filesize,send_fn)
+ else:
+ self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb)
+ self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb)
- self._send_msg_sendfile(in_conn,src_link,filekey,filesize)
+ send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb)
+ in_conn.recv_filedata(filekey,filesize,send_fn)
+
+ self._send_msg_sendfile(in_conn,src_link,filekey,filesize)
def _add_wait_filekey(self,conn_link,filekey,filesize,callback):
callback = tornado.stack_context.wrap(callback)
@@ -433,21 +456,22 @@ class Proxy:
wait = self._conn_filekeymap[conn_link].pop(filekey)
self._ioloop.remove_timeout(wait['timer'])
- def _ret_sendfile(self,filekey,err = None):
+ def _ret_sendfile(self,filekey,err):
try:
info = self._info_filekeymap.pop(filekey)
except KeyError:
- return
+ return False
self._ioloop.remove_timeout(info['timer'])
- fileresult = info['fileresult']
if err == None:
- fileresult.ret_result('Success')
-
+ info['fileresult'].ret_result('Success')
+
else:
- fileresult.ret_result(err)
+ info['fileresult'].ret_result(err)
+
+ return True
def _request_conn(self,link):
try:
@@ -498,7 +522,7 @@ class Proxy:
'param':param
}
- conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+ conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8'))
def _recv_msg_call(self,conn,msg):
@async.caller
@@ -523,8 +547,8 @@ class Proxy:
'caller_retid':caller_retid,
'result':{'stat':stat,'data':data}
}
-
- conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8'))
def _recv_msg_ret(self,conn,msg):
caller_link = msg['caller_link']
@@ -542,7 +566,7 @@ class Proxy:
'filesize':filesize
}
- conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+ conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8'))
def _recv_msg_sendfile(self,conn,msg):
@async.caller
@@ -562,7 +586,7 @@ class Proxy:
'error':err
}
- conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+ conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8'))
def _recv_msg_abortfile(self,conn,msg):
@async.caller
@@ -580,16 +604,23 @@ class Proxy:
@async.caller
def _pend_recvfile(self,src_link,filekey,filesize):
+ def __abort_cb():
+ if self._ret_sendfile(filekey,'Eabort'):
+ with Auth.change_current_iden(self._idendesc,self._auth):
+ self.call(src_link + 'imc/','abort_sendfile',65536,filekey)
+
self._info_filekeymap[filekey] = {
'src_link':src_link,
'filesize':filesize,
'fileresult':FileResult(filekey),
- 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')),
+ 'abort_callback':tornado.stack_context.wrap(__abort_cb)
}
@async.caller
- def _reject_sendfile(self,filekey):
- self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject')
+ def _abort_sendfile(self,filekey):
+ if filekey in self._info_filekeymap:
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Eabort')
def imc_call(dst,func_name,*args):
return Proxy.instance.call(dst,func_name,65536,*args)