diff options
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-x | src/py/imc/proxy.py | 109 |
1 files changed, 70 insertions, 39 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 6b95674..8f750c0 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -97,7 +97,7 @@ class Proxy: Proxy.instance = self self.register_call('imc/','pend_recvfile',self._pend_recvfile) - self.register_call('imc/','reject_sendfile',self._reject_sendfile) + self.register_call('imc/','abort_sendfile',self._abort_sendfile) def add_conn(self,conn): assert conn.link not in self._conn_linkmap @@ -153,6 +153,14 @@ class Proxy: child,name,filt = self._walk_path(path,True) filt.append(func) + def unregister_call(self,path,func_name): + child,name,filt = self._walk_path(path,True) + del name[func_name] + + def unregister_filter(self,path,func): + child,name,filt = self._walk_path(path,True) + filt.remove(func) + def call(self,dst,func_name,timeout,*args): return self._route_call(None,self._link,async.get_retid(),Auth.get_current_idendesc(),dst,func_name,timeout,list(args)) @@ -167,6 +175,11 @@ class Proxy: self._ioloop.add_callback(tornado.stack_context.wrap(_call)) def sendfile(self,dst_link,filepath): + def _abort_cb(): + if self._ret_sendfile(filekey,'Eabort'): + with Auth.change_current_iden(self._idendesc,self._auth): + self.call(dst_link + 'imc/','abort_sendfile',65536,filekey) + filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest() filesize = os.stat(filepath).st_size @@ -176,14 +189,15 @@ class Proxy: 'filesize':filesize, 'filepath':filepath, 'fileresult':fileresult, - 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')), + 'abort_callback':tornado.stack_context.wrap(_abort_cb) } with Auth.change_current_iden(self._idendesc,self._auth): stat,ret = self.call(dst_link + 'imc/','pend_recvfile',65536,self._link,filekey,filesize) if stat == False: - raise ConnectionError(ret) + self._ret_sendfile(filekey,'Enoexist') return fileresult @@ -202,32 +216,28 @@ class Proxy: self._ioloop.add_callback(self._ret_sendfile,filekey,err) - try: - info = self._info_filekeymap[filekey] - - except KeyError: - return + info = self._info_filekeymap[filekey] src_link = info['src_link'] filesize = info['filesize'] in_conn = self._request_conn(src_link) - self._add_wait_filekey(in_conn.link,filekey,filesize,_callback) - in_conn.recv_file(filekey,filesize,filepath,_callback) - self._send_msg_sendfile(in_conn,src_link,filekey,filesize) + if filekey in self._info_filekeymap: + info['abort_callback'] = tornado.stack_context.wrap(lambda : _callback('Eabort')) + self._add_wait_filekey(in_conn.link,filekey,filesize,_callback) + + in_conn.recv_file(filekey,filesize,filepath,_callback) + self._send_msg_sendfile(in_conn,src_link,filekey,filesize) return info['fileresult'] - def rejectfile(self,filekey): + def abortfile(self,filekey): try: - info = self._info_filekeymap.pop(filekey) + self._info_filekeymap[filekey]['abort_callback']() - except KeyError: - return - - with Auth.change_current_iden(self._idendesc,self._auth): - self.call(info['src_link'] + 'imc/','reject_sendfile',65536,filekey) + except: + pass def _walk_path(self,path,create = False): parts = path.split('/')[:-1] @@ -254,6 +264,13 @@ class Proxy: break return (child,name,filt) + + def _json_handler(self,o): + if isinstance(o,datetime.datetime): + return o.isoformat(); + + else: + return None def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param): def __add_wait_caller(conn_link): @@ -403,24 +420,30 @@ class Proxy: try: info = self._info_filekeymap[filekey] if info['filesize'] != filesize: - self._ioloop.add_callback(self._ret_sendfile,filekey,'Efilesize') + raise ValueError - except KeyError: + except (KeyError,ValueError): + self._send_msg_abortfile(out_conn,filekey,'Enoexist') self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist') return + info['abort_callback'] = tornado.stack_context.wrap(lambda : __send_cb('Eabort')) self._add_wait_filekey(out_conn.link,filekey,filesize,__send_cb) out_conn.send_file(filekey,info['filepath'],__send_cb) else: in_conn = self._request_conn(src_link) - self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb) - self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb) + if in_conn == None: + self._send_msg_abortfile(out_conn,filekey,'Enoexist') - send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb) - in_conn.recv_filedata(filekey,filesize,send_fn) + else: + self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb) + self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb) - self._send_msg_sendfile(in_conn,src_link,filekey,filesize) + send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb) + in_conn.recv_filedata(filekey,filesize,send_fn) + + self._send_msg_sendfile(in_conn,src_link,filekey,filesize) def _add_wait_filekey(self,conn_link,filekey,filesize,callback): callback = tornado.stack_context.wrap(callback) @@ -433,21 +456,22 @@ class Proxy: wait = self._conn_filekeymap[conn_link].pop(filekey) self._ioloop.remove_timeout(wait['timer']) - def _ret_sendfile(self,filekey,err = None): + def _ret_sendfile(self,filekey,err): try: info = self._info_filekeymap.pop(filekey) except KeyError: - return + return False self._ioloop.remove_timeout(info['timer']) - fileresult = info['fileresult'] if err == None: - fileresult.ret_result('Success') - + info['fileresult'].ret_result('Success') + else: - fileresult.ret_result(err) + info['fileresult'].ret_result(err) + + return True def _request_conn(self,link): try: @@ -498,7 +522,7 @@ class Proxy: 'param':param } - conn.send_msg(bytes(json.dumps(msg),'utf-8')) + conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8')) def _recv_msg_call(self,conn,msg): @async.caller @@ -523,8 +547,8 @@ class Proxy: 'caller_retid':caller_retid, 'result':{'stat':stat,'data':data} } - - conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8')) def _recv_msg_ret(self,conn,msg): caller_link = msg['caller_link'] @@ -542,7 +566,7 @@ class Proxy: 'filesize':filesize } - conn.send_msg(bytes(json.dumps(msg),'utf-8')) + conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8')) def _recv_msg_sendfile(self,conn,msg): @async.caller @@ -562,7 +586,7 @@ class Proxy: 'error':err } - conn.send_msg(bytes(json.dumps(msg),'utf-8')) + conn.send_msg(bytes(json.dumps(msg,default = self._json_handler),'utf-8')) def _recv_msg_abortfile(self,conn,msg): @async.caller @@ -580,16 +604,23 @@ class Proxy: @async.caller def _pend_recvfile(self,src_link,filekey,filesize): + def __abort_cb(): + if self._ret_sendfile(filekey,'Eabort'): + with Auth.change_current_iden(self._idendesc,self._auth): + self.call(src_link + 'imc/','abort_sendfile',65536,filekey) + self._info_filekeymap[filekey] = { 'src_link':src_link, 'filesize':filesize, 'fileresult':FileResult(filekey), - 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')), + 'abort_callback':tornado.stack_context.wrap(__abort_cb) } @async.caller - def _reject_sendfile(self,filekey): - self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject') + def _abort_sendfile(self,filekey): + if filekey in self._info_filekeymap: + self._ioloop.add_callback(self._ret_sendfile,filekey,'Eabort') def imc_call(dst,func_name,*args): return Proxy.instance.call(dst,func_name,65536,*args) |