diff options
author | pzread <netfirewall@gmail.com> | 2013-05-30 17:41:45 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-30 17:41:45 +0800 |
commit | 05c48532dc67c44d12682de3ece293e6f2a412f3 (patch) | |
tree | 164d67f8cc1c3009867e98b9852887be3575941d | |
parent | 4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e (diff) | |
download | taiwan-online-judge-05c48532dc67c44d12682de3ece293e6f2a412f3.tar.gz taiwan-online-judge-05c48532dc67c44d12682de3ece293e6f2a412f3.tar.zst taiwan-online-judge-05c48532dc67c44d12682de3ece293e6f2a412f3.zip |
Add fault tolerance to sendfile
-rw-r--r-- | src/py/backend_server.py | 13 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 109 | ||||
-rw-r--r-- | src/py/netio.py | 121 |
3 files changed, 215 insertions, 28 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 2e91382..2ac0f2b 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -104,7 +104,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): time.sleep(1) if int(self._linkid) == 2: - self._test_call(None,'4') + self._test_call(None,'9') sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -225,16 +225,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,iden,param): - print('start cold test') - - filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso') - - dst = '/backend/' + param + '/' - ret = imc_call(self._idendesc,dst,'test_dst',filekey) - - time.sleep(10) - print('start warm test') - + param = '5' filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso') dst = '/backend/' + param + '/' diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 8897a7e..65589c3 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -1,6 +1,7 @@ import json import uuid import os +import datetime import tornado.ioloop import tornado.stack_context @@ -14,6 +15,7 @@ class Connection: self.linkid = linkid self.link_linkidmap = {} self._close_callback = [] + self._closed = False def send_msg(self,data): pass @@ -25,15 +27,20 @@ class Connection: self._close_callback.append(tornado.stack_context.wrap(callback)) def close(self): + self._closed = True + for callback in self._close_callback: callback(self) + def closed(self): + return self._closed + class Proxy: def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None): self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' self.MSGTYPE_SENDFILE = 'sendfile' - self.MSGTYPE_RECVFILE = 'recvfile' + self.MSGTYPE_ABORTFILE = 'abortfile' self._ioloop = tornado.ioloop.IOLoop.instance() self._linkclass = linkclass @@ -91,6 +98,15 @@ class Proxy: for linkid,retid in wait_ret: self._ret_call(linkid,retid,(False,'Eclose')) + fail_map = self._conn_filekeymap[conn.linkid] + fails = fail_map.values() + fail_cb = [] + for callback in fails: + fail_cb.append(callback) + + for callback in fail_cb: + callback('Eclose') + linkids = conn.link_linkidmap.keys() link_del = [] for linkid in linkids: @@ -139,12 +155,25 @@ class Proxy: return filekey def recvfile(self,filekey,filepath): + def _fail_cb(err): + try: + del self._conn_filekeymap[in_conn.linkid][filekey] + + except KeyError: + return + + if not in_conn.closed(): + in_conn.abort_file(filekey) + self._send_msg_abortfile(in_conn,filekey,err) + try: info = self._info_filekeymap.pop(filekey) src_linkid = info['src_linkid'] filesize = info['filesize'] in_conn = self._request_conn(src_linkid) + self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb) + in_conn.recv_file(filekey,filesize,filepath) self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) @@ -224,12 +253,45 @@ class Proxy: return def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): + def _send_fail(err): + try: + del self._conn_filekeymap[out_conn.linkid][filekey] + + except KeyError: + return + + if not out_conn.closed(): + out_conn.abort_file(filekey) + self._send_msg_abortfile(out_conn,filekey,err) + + def _bridge_fail(err): + try: + del self._conn_filekeymap[in_conn.linkid][filekey] + + if not in_conn.closed(): + in_conn.abort_file(filekey) + self._send_msg_abortfile(in_conn,filekey,err) + + except KeyError: + pass + + try: + del self._conn_filekeymap[out_conn.linkid][filekey] + + if not out_conn.closed(): + out_conn.abort_file(filekey) + self._send_msg_abortfile(out_conn,filekey,err) + + except KeyError: + pass + if src_linkid == self._linkid: try: info = self._info_filekeymap.pop(filekey) assert info['filesize'] == filesize - self._conn_filekeymap[out_conn.linkid][filekey] = {} + self._add_wait_filekey(filekey,filesize,None,out_conn,_send_fail) + out_conn.send_file(filekey,info['filepath']) except KeyError: @@ -242,6 +304,8 @@ class Proxy: print('test start') in_conn = self._request_conn(src_linkid) + self._add_wait_filekey(filekey,filesize,in_conn,out_conn,_bridge_fail) + send_fn = out_conn.send_filedata(filekey,filesize) in_conn.recv_filedata(filekey,filesize,send_fn) @@ -286,10 +350,28 @@ class Proxy: elif msg_type == self.MSGTYPE_SENDFILE: self._recv_msg_sendfile(conn,msg) + elif msg_type == self.MSGTYPE_ABORTFILE: + self._recv_msg_abortfile(conn,msg) + def _conn_close_cb(self,conn): self.del_conn(conn) print('connection close') + + def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback): + def __call(err): + self._ioloop.remove_timeout(timer) + fail_callback(err) + + callback = tornado.stack_context.wrap(__call) + + timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')) + + if in_conn != None: + self._conn_filekeymap[in_conn.linkid][filekey] = callback + if out_conn != None: + self._conn_filekeymap[out_conn.linkid][filekey] = callback + def _check_waitcaller(self): wait_maps = self._conn_retidmap.values() for wait_map in wait_maps: @@ -371,6 +453,29 @@ class Proxy: __call() + def _send_msg_abortfile(self,conn,filekey,err): + msg = { + 'type':self.MSGTYPE_ABORTFILE, + 'filekey':filekey, + 'error':err + } + + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_abortfile(self,conn,msg): + @async.caller + def __call(): + try: + self._conn_filekeymap[conn.linkid][filekey](err) + + except: + pass + + filekey = msg['filekey'] + err = msg['error'] + + __call() + @async.caller def _pend_recvfile(self,iden,param): filekey = param['filekey'] diff --git a/src/py/netio.py b/src/py/netio.py index 8385b58..2910207 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -157,6 +157,9 @@ class SocketStream: try: while True: buf = self._sock.recv(size) + if len(buf) == 0: + self.close() + return iocb[2](buf) size -= len(buf) @@ -175,6 +178,9 @@ class SocketStream: try: while True: buf = self._sock.recv(min(size,65536)) + if len(buf) == 0: + self.close() + return os.write(iocb[2],buf) size -= len(buf) @@ -207,7 +213,12 @@ class SocketStream: try: while True: - off += self._sock.send(buf[off:]) + ret = self._sock.send(buf[off:]) + if ret == 0: + self.close() + return + + off += ret if off == len(buf): if iocb[3] != None: @@ -227,7 +238,12 @@ class SocketStream: try: while True: - size -= os.sendfile(sockfd,filefd,None,min(size,65536)) + ret = os.sendfile(sockfd,filefd,None,min(size,65536)) + if ret == 0: + self.close() + return + + size -= ret if size == 0: if iocb[3] != None: @@ -259,7 +275,7 @@ class SocketConnection(Connection): super().__init__(linkclass,linkid) self._ioloop = tornado.ioloop.IOLoop.current() - self._stream_filekeymap = {} + self._sendfile_filekeymap = {} self.main_stream = main_stream self.main_stream.set_close_callback(lambda conn : self.close()) @@ -268,11 +284,14 @@ class SocketConnection(Connection): self._start_ping() def send_msg(self,data): + if self._closed == True: + raise ConnectionError + self.main_stream.write(struct.pack('l',len(data)) + data) def send_file(self,filekey,filepath): def _conn_cb(): - self._stream_filekeymap[filekey] = file_stream + self._add_sendfile(filekey,_fail_cb) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', @@ -282,25 +301,68 @@ class SocketConnection(Connection): file_stream.sendfile(fd,_done_cb) def _done_cb(): + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) + print('send done') + def _fail_cb(): + try: + del self._sendfile_filekeymap[filekey] + + except: + return + + file_stream.close() + os.close(fd) + + if self._closed == True: + raise ConnectionError + fd = os.open(filepath,os.O_RDONLY) filesize = os.fstat(fd).st_size file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + file_stream.set_close_callback(lambda stream : _fail_cb()) file_stream.connect(_conn_cb) def recv_file(self,filekey,filesize,filepath): - def _conn_cb(file_stream): - self._stream_filekeymap[filekey] = file_stream + def _conn_cb(stream): + nonlocal file_stream + + file_stream = stream + file_stream.set_close_callback(lambda stream : _fail_cb()) + self._add_sendfile(filekey,_fail_cb) + file_stream.recvfile(fd,filesize,_done_cb) def _done_cb(): + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) print('recv done') print(time.perf_counter() - st) + def _fail_cb(): + try: + del self._sendfile_filekeymap[filekey] + + except: + return + + file_stream.close() + + os.close(fd) + os.remove(filepath) + + if self._closed == True: + raise ConnectionError + + file_stream = None + self.add_pend_filestream(filekey,_conn_cb) fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) @@ -308,10 +370,7 @@ class SocketConnection(Connection): def send_filedata(self,filekey,filesize): def _conn_cb(): - nonlocal file_stream - - file_stream = stream - self._stream_filekeymap[filekey] = file_stream + self._add_sendfile(filekey,lambda : file_stream.close()) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', @@ -323,22 +382,40 @@ class SocketConnection(Connection): def _send_cb(data): file_stream.write(data) - file_stream = None - stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + if self._closed == True: + raise ConnectionError + + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) old_gr = imc.async.current() - stream.connect(_conn_cb) + file_stream.connect(_conn_cb) imc.async.switchtop() return _send_cb def recv_filedata(self,filekey,filesize,callback): - def _conn_cb(file_stream): - self._stream_filekeymap[filekey] = file_stream + def _conn_cb(stream): + nonlocal file_stream + + file_stream = stream + self._add_sendfile(filekey,lambda : file_stream.close()) + file_stream.read_bytes(filesize,callback,nonbuf = True) + if self._closed == True: + raise ConnectionError + + file_stream = None + self.add_pend_filestream(filekey,_conn_cb) + def abort_file(self,filekey): + try: + self._sendfile_filekeymap[filekey]() + + except KeyError: + pass + def start_recv(self,recv_callback): def _recv_size(data): size, = struct.unpack('l',data) @@ -364,10 +441,17 @@ class SocketConnection(Connection): except AttributeError: pass + if self._closed == True: + return + + self._closed = True self.main_stream.close() super().close() + def _add_sendfile(self,filekey,fail_cb): + self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) + def _start_ping(self): def __check(): try: @@ -378,6 +462,7 @@ class SocketConnection(Connection): self._ping_delay += 1 if self._ping_delay > 10: + print(self.linkid) self.close() self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000) @@ -392,9 +477,15 @@ class WebSocketConnection(Connection): self.handler = handler def send_msg(self,data): + if self._closed == True: + raise ConnectionError + self.handler.write_message(data,True) def recv_msg(self,data): + if self._closed == True: + raise ConnectionError + self._recv_callback(self,data) def start_recv(self,recv_callback): |