diff options
author | pzread <netfirewall@gmail.com> | 2013-05-31 01:04:19 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-31 01:04:19 +0800 |
commit | 42463e2fc315928c3b14bd77de5c6741edade365 (patch) | |
tree | 629a46aba852314d1e38006d3fc55cb6f1b82709 | |
parent | 05c48532dc67c44d12682de3ece293e6f2a412f3 (diff) | |
download | taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.gz taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.zst taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.zip |
Add cancel sendfile
-rw-r--r-- | src/py/backend_server.py | 8 | ||||
-rw-r--r-- | src/py/center_server.py | 2 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 97 | ||||
-rw-r--r-- | src/py/netio.py | 9 |
4 files changed, 83 insertions, 33 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 2ac0f2b..85e49b6 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -92,7 +92,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._idendesc = info['idendesc'] iden = TOJAuth.instance.get_iden('backend',self._linkid,self._idendesc) self._linkid = iden['linkid'] - Proxy('backend',self._linkid,TOJAuth.instance,self._conn_linkid) + Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid) self.center_conn = SocketConnection('center',info['center_linkid'],stream) self.center_conn.add_close_callback(__retry) @@ -225,8 +225,8 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,iden,param): - param = '5' - filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso') + param = '3' + filekey = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso') dst = '/backend/' + param + '/' ret = imc_call(self._idendesc,dst,'test_dst',filekey) @@ -236,7 +236,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) #return ret + ' Too' - Proxy.instance.recvfile(param,'data') + Proxy.instance.cancelfile(param) return 'ok' diff --git a/src/py/center_server.py b/src/py/center_server.py index 513d946..2ad6dfd 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -68,7 +68,7 @@ class CenterServer(tornado.tcpserver.TCPServer): self._linkid = self._create_linkid() self._idendesc = self._create_idendesc('center',self._linkid) - Proxy('center',self._linkid,TOJAuth.instance) + Proxy('center',self._linkid,TOJAuth.instance,self._idendesc) imc_register_call('','lookup_linkid',self._lookup_linkid) imc_register_call('','add_client',self._add_client) diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 65589c3..948041a 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -36,7 +36,7 @@ class Connection: return self._closed class Proxy: - def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None): + def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None): self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' self.MSGTYPE_SENDFILE = 'sendfile' @@ -46,6 +46,7 @@ class Proxy: self._linkclass = linkclass self._linkid = linkid self._auth = auth_instance + self._idendesc = idendesc if conn_linkid_fn == None: self._conn_linkid_fn = lambda : None @@ -65,6 +66,7 @@ class Proxy: Proxy.instance = self self.register_call('imc/','pend_recvfile',self._pend_recvfile) + self.register_call('imc/','cancel_sendfile',self._cancel_sendfile) def add_conn(self,conn): assert conn.linkid not in self._conn_linkidmap @@ -133,12 +135,12 @@ class Proxy: caller_retid = ''.join([self._linkid,'/',caller_grid]) return self._route_call(None,caller_retid,timeout,idendesc,dst,func_name,param) - def sendfile(self,idendesc,dst_link,filepath): + def sendfile(self,dst_link,filepath): @async.callee def _call(_grid): self.call(_grid, 10000, - idendesc, + self._idendesc, dst_link + 'imc/','pend_recvfile', {'filekey':filekey,'filesize':filesize}) @@ -147,7 +149,8 @@ class Proxy: self._info_filekeymap[filekey] = { 'filesize':filesize, - 'filepath':filepath + 'filepath':filepath, + 'callback':None } _call() @@ -155,6 +158,11 @@ class Proxy: return filekey def recvfile(self,filekey,filepath): + def _handle_cb(err = None): + info = self._info_filekeymap.pop(filekey) + + print('recv done') + def _fail_cb(err): try: del self._conn_filekeymap[in_conn.linkid][filekey] @@ -166,21 +174,40 @@ class Proxy: in_conn.abort_file(filekey) self._send_msg_abortfile(in_conn,filekey,err) + _handle_cb(err) + try: - info = self._info_filekeymap.pop(filekey) - src_linkid = info['src_linkid'] - filesize = info['filesize'] + info = self._info_filekeymap[filekey] - in_conn = self._request_conn(src_linkid) - self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb) + except KeyError: + return - in_conn.recv_file(filekey,filesize,filepath) - self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + src_linkid = info['src_linkid'] + filesize = info['filesize'] + + in_conn = self._request_conn(src_linkid) + self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb) + + in_conn.recv_file(filekey,filesize,filepath,_handle_cb) + self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + + def cancelfile(self,filekey): + @async.callee + def _call(_grid): + self.call(_grid, + 10000, + self._idendesc, + dst_link + 'imc/','cancel_sendfile', + {'filekey':filekey}) + + try: + info = self._info_filekeymap.pop(filekey) except KeyError: - pass + return - return + dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/']) + _call() def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param): def __add_wait_caller(in_linkid): @@ -253,7 +280,12 @@ class Proxy: return def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): - def _send_fail(err): + def __handle_cb(err = None): + info = self._info_filekeymap.pop(filekey) + + print('send done') + + def __send_fail_cb(err): try: del self._conn_filekeymap[out_conn.linkid][filekey] @@ -264,7 +296,9 @@ class Proxy: out_conn.abort_file(filekey) self._send_msg_abortfile(out_conn,filekey,err) - def _bridge_fail(err): + __handle_cb(err) + + def __bridge_fail_cb(err): try: del self._conn_filekeymap[in_conn.linkid][filekey] @@ -285,26 +319,27 @@ class Proxy: except KeyError: pass + __handle_cb(err) + if src_linkid == self._linkid: try: - info = self._info_filekeymap.pop(filekey) + info = self._info_filekeymap[filekey] assert info['filesize'] == filesize - self._add_wait_filekey(filekey,filesize,None,out_conn,_send_fail) - - out_conn.send_file(filekey,info['filepath']) - except KeyError: - pass + return except AssertionError: - pass + return + + self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb) + out_conn.send_file(filekey,info['filepath'],__handle_cb) else: print('test start') in_conn = self._request_conn(src_linkid) - self._add_wait_filekey(filekey,filesize,in_conn,out_conn,_bridge_fail) + self._add_wait_filekey(filekey,filesize,in_conn,out_conn,__bridge_fail_cb) send_fn = out_conn.send_filedata(filekey,filesize) in_conn.recv_filedata(filekey,filesize,send_fn) @@ -482,9 +517,23 @@ class Proxy: filesize = param['filesize'] self._info_filekeymap[filekey] = { + 'src_linkclass':iden['linkclass'], 'src_linkid':iden['linkid'], - 'filesize':filesize + 'filesize':filesize, + 'callback':None } + + @async.caller + def _cancel_sendfile(self,iden,param): + filekey = param['filekey'] + + try: + info = self._info_filekeymap.pop(filekey) + + except KeyError: + return + + print('cancel') @async.callee def imc_call(idendesc,dst,func_name,param,_grid): diff --git a/src/py/netio.py b/src/py/netio.py index 2910207..df19627 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -289,7 +289,7 @@ class SocketConnection(Connection): self.main_stream.write(struct.pack('l',len(data)) + data) - def send_file(self,filekey,filepath): + def send_file(self,filekey,filepath,callback): def _conn_cb(): self._add_sendfile(filekey,_fail_cb) @@ -306,7 +306,7 @@ class SocketConnection(Connection): os.close(fd) - print('send done') + callback() def _fail_cb(): try: @@ -328,7 +328,7 @@ class SocketConnection(Connection): file_stream.set_close_callback(lambda stream : _fail_cb()) file_stream.connect(_conn_cb) - def recv_file(self,filekey,filesize,filepath): + def recv_file(self,filekey,filesize,filepath,callback): def _conn_cb(stream): nonlocal file_stream @@ -343,9 +343,10 @@ class SocketConnection(Connection): file_stream.close() os.close(fd) - print('recv done') print(time.perf_counter() - st) + callback() + def _fail_cb(): try: del self._sendfile_filekeymap[filekey] |