diff options
author | pzread <netfirewall@gmail.com> | 2013-06-03 01:51:21 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-06-03 01:51:21 +0800 |
commit | 13445dbeb426347354b61bd46f738f37b25ffd60 (patch) | |
tree | e498e942970958241dcbee9eb43840ae26377cb1 | |
parent | c00472c8bfbcc86780ad25f7be687815432c4a37 (diff) | |
download | taiwan-online-judge-13445dbeb426347354b61bd46f738f37b25ffd60.tar.gz taiwan-online-judge-13445dbeb426347354b61bd46f738f37b25ffd60.tar.zst taiwan-online-judge-13445dbeb426347354b61bd46f738f37b25ffd60.zip |
Fix few bugs. Sendfile pass stress test A
-rw-r--r-- | src/py/backend_server.py | 64 | ||||
-rw-r--r-- | src/py/center_server.py | 18 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 136 | ||||
-rw-r--r-- | src/py/netio.py | 141 |
4 files changed, 197 insertions, 162 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index edfee8d..3c4f1ba 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -37,9 +37,11 @@ class BackendWorker(tornado.tcpserver.TCPServer): def start(self): sock_port = random.randrange(4096,8192) - self.listen(sock_port) self.sock_addr = ('127.0.0.1',sock_port) + self.bind(sock_port,'',socket.AF_INET,65536) + super().start() + self._conn_center() def handle_stream(self,stream,addr): @@ -55,7 +57,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): fd = stream.fileno() self._ioloop.remove_handler(fd) - sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr) + sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0)) netio.recv_pack(sock_stream,_recv_conn_info) @@ -94,14 +96,13 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._linkid = iden['linkid'] Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid) - self.center_conn = SocketConnection('center',info['center_linkid'],stream) + self.center_conn = SocketConnection('center',info['center_linkid'],stream,self.center_addr) self.center_conn.add_close_callback(__retry) Proxy.instance.add_conn(self.center_conn) - imc_register_call('','test_dst',self._test_dst) #imc_register_call('','test_dsta',self._test_dsta) - time.sleep(1) + time.sleep(2) #if int(self._linkid) == 2: self._test_call(None,'9') @@ -116,13 +117,18 @@ class BackendWorker(tornado.tcpserver.TCPServer): }),'utf-8')) netio.recv_pack(stream,___recv_info_cb) - stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.center_addr) + stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) stream.set_close_callback(__retry) - stream.connect(__send_worker_info) + stream.connect(self.center_addr,__send_worker_info) def _conn_linkid(self,linkid): def __handle_pend(conn): - retids = self._pend_mainconn_linkidmap.pop(worker_linkid) + try: + retids = self._pend_mainconn_linkidmap.pop(worker_linkid) + + except KeyError: + return + for retid in retids: imc.async.ret(retid,conn) @@ -134,17 +140,20 @@ class BackendWorker(tornado.tcpserver.TCPServer): main_stream.close() else: + sock_ip,sock_port = self.sock_addr netio.send_pack(main_stream,bytes(json.dumps({ 'conntype':'main', 'linkclass':'backend', - 'linkid':self._linkid + 'linkid':self._linkid, + 'sock_ip':sock_ip, + 'sock_port':sock_port }),'utf-8')) netio.recv_pack(main_stream,__recv_cb) def __recv_cb(data): stat = json.loads(data.decode('utf-8')) if stat == True: - conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,self._add_pend_filestream) + conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,sock_addr,self._add_pend_filestream) Proxy.instance.add_conn(conn) __handle_pend(conn) @@ -177,9 +186,9 @@ class BackendWorker(tornado.tcpserver.TCPServer): sock_addr = (ret['sock_ip'],ret['sock_port']) - main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),sock_addr) + main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) main_stream.set_close_callback(lambda conn : __handle_pend(None)) - main_stream.connect(__conn_cb) + main_stream.connect(sock_addr,__conn_cb) return imc.async.switch_top() @@ -187,33 +196,28 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) def _handle_mainconn(self,main_stream,addr,info): - def __send_back(stat): - if stat == True: - conn = SocketConnection(linkclass,linkid,main_stream,self._add_pend_filestream) - Proxy.instance.add_conn(conn) - - netio.send_pack(main_stream,bytes(json.dumps(stat),'utf-8')) - linkclass = info['linkclass'] linkid = info['linkid'] + sock_ip = info['sock_ip'] + sock_port = info['sock_port'] conn = Proxy.instance.get_conn(linkid) if conn != None: return - if linkid not in self._pend_mainconn_linkidmap: - __send_back(True) + if (linkid not in self._pend_mainconn_linkidmap) or self._linkid > linkid: + conn = SocketConnection(linkclass,linkid,main_stream,(sock_ip,sock_port),self._add_pend_filestream) + Proxy.instance.add_conn(conn) - else: - if self._linkid > linkid: - __send_back(True) - + netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8')) + + if linkid in self._pend_mainconn_linkidmap: retids = self._pend_mainconn_linkidmap.pop(linkid) for retid in retids: imc.async.ret(retid,conn) - - else: - __send_back(False) + + else: + netio.send_pack(main_stream,bytes(json.dumps(False),'utf-8')) def _handle_fileconn(self,file_stream,addr,info): try: @@ -232,7 +236,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): if str((i % 8) + 2) == self._linkid: continue - fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','archlinux-2013.05.01-dual.iso') + fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','test.py') dst = '/backend/' + str((i % 8) + 2) + '/' ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey) @@ -240,7 +244,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): pend.append(fileres) for p in pend: - print(p.wait()) + self._linkid + ' ' + p.wait() print(self._linkid) diff --git a/src/py/center_server.py b/src/py/center_server.py index 0b32610..ed2de4c 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -30,7 +30,7 @@ class Worker: 'center_linkid':center_linkid }),'utf-8')) - conn = SocketConnection(self.linkclass,self.linkid,self.main_stream) + conn = SocketConnection(self.linkclass,self.linkid,self.main_stream,self.sock_addr) conn.add_close_callback(lambda conn : self.close()) Proxy.instance.add_conn(conn) @@ -89,7 +89,7 @@ class CenterServer(tornado.tcpserver.TCPServer): fd = stream.fileno() self._ioloop.remove_handler(fd) - main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr) + main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0)) netio.recv_pack(main_stream,_recv_worker_info) @@ -139,16 +139,16 @@ class CenterServer(tornado.tcpserver.TCPServer): linkid = param try: - worker = self._worker_linkidmap[linkid] + #worker = self._worker_linkidmap[linkid] - #a = int(iden['linkid']) - #b = int(linkid) + a = int(iden['linkid']) + b = int(linkid) - #if b > a: - # worker = self._worker_linkidmap[str(a + 1)] + if b > a: + worker = self._worker_linkidmap[str(a + 1)] - #else: - # worker = self._worker_linkidmap[str(a - 1)] + else: + worker = self._worker_linkidmap[str(a - 1)] if iden['linkclass'] != 'client': sock_ip,sock_port = worker.sock_addr diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 2a25323..9f96a73 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -111,13 +111,13 @@ class Proxy: del conn.link_linkidmap[linkid] def del_conn(self,conn): - callbacks = list(self._conn_retidmap[conn.linkid].values()) - for callback in callbacks: - callback((False,'Eclose')) + waits = list(self._conn_retidmap[conn.linkid].values()) + for wait in waits: + wait['callback']((False,'Eclose')) - callbacks = list(self._conn_filekeymap[conn.linkid].values()) - for callback in callbacks: - callback('Eclose') + waits = list(self._conn_filekeymap[conn.linkid].values()) + for wait in waits: + wait['callback']('Eclose') linkids = list(conn.link_linkidmap.keys()) for linkid in linkids: @@ -153,24 +153,28 @@ class Proxy: 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } - self.call(10000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize}) + stat,ret = self.call(1000000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize}) + if stat == False: + print('err close ' + ret) + raise ConnectionError(ret) return fileresult def recvfile(self,filekey,filepath): - def _fail_cb(err): + def _callback(err = None): try: - del self._conn_filekeymap[in_conn.linkid][filekey] - + self._del_wait_filekey(in_conn.linkid,filekey) + except KeyError: return - - if not in_conn.closed(): - in_conn.abort_file(filekey) - self._send_msg_abortfile(in_conn,filekey,err) + + if err != None: + if not in_conn.closed(): + in_conn.abort_file(filekey) + self._send_msg_abortfile(in_conn,filekey,err) self._ioloop.add_callback(self._ret_sendfile,filekey,err) - + try: info = self._info_filekeymap[filekey] @@ -181,9 +185,9 @@ class Proxy: filesize = info['filesize'] in_conn = self._request_conn(src_linkid) - self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb) + self._add_wait_filekey(in_conn.linkid,filekey,filesize,_callback) - in_conn.recv_file(filekey,filesize,filepath,lambda : self._ret_sendfile(filekey)) + in_conn.recv_file(filekey,filesize,filepath,_callback) self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) return info['fileresult'] @@ -196,18 +200,18 @@ class Proxy: return dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/']) - self.call(10000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey}) + self.call(1000000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey}) def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param): - def __add_wait_caller(in_linkid): - def ___call(result): - self._ioloop.remove_timeout(timer) - self._ret_call(caller_linkid,caller_retid,result) + def __add_wait_caller(conn_linkid): + self._conn_retidmap[conn_linkid][caller_retid] = { + 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds= timeout),lambda : callback(('False','Etimeout'))), + 'callback':tornado.stack_context.wrap(lambda result : self._ret_call(caller_linkid,caller_retid,result)) + } - callback = tornado.stack_context.wrap(___call) - timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback(('False','Etimeout'))) - - self._conn_retidmap[in_linkid][caller_retid] = callback + def __del_wait_caller(conn_linkid): + wait = self._conn_retidmap[conn_linkid].pop(caller_retid) + self._ioloop.remove_timeout(wait['timer']) def __ret(result): if caller_linkid == self._linkid: @@ -245,16 +249,16 @@ class Proxy: self._auth.change_iden(old_iden) except KeyError: - result = (False,'Enoexist') + result = (False,'Enoexist1') - del self._conn_retidmap[self._linkid][caller_retid] + __del_wait_caller(self._linkid) return __ret(result) else: conn = self._request_conn(dst_linkid) if conn == None: - return __ret((False,'Enoexist')) + return __ret((False,'Enoexist2')) else: if caller_linkid == self._linkid: @@ -263,7 +267,7 @@ class Proxy: result = async.switch_top() - del self._conn_retidmap[conn.linkid][caller_retid] + __del_wait_caller(conn.linkid) return __ret(result) @@ -286,36 +290,39 @@ class Proxy: __ret_remote() def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): - def __send_fail_cb(err): + def __send_cb(err = None): try: - del self._conn_filekeymap[out_conn.linkid][filekey] + self._del_wait_filekey(out_conn.linkid,filekey) except KeyError: return - if not out_conn.closed(): - out_conn.abort_file(filekey) - self._send_msg_abortfile(out_conn,filekey,err) + if err != None: + if not out_conn.closed(): + out_conn.abort_file(filekey) + self._send_msg_abortfile(out_conn,filekey,err) self._ioloop.add_callback(self._ret_sendfile,filekey,err) - def __bridge_fail_cb(err): + def __bridge_cb(err = None): try: - del self._conn_filekeymap[in_conn.linkid][filekey] - - if not in_conn.closed(): - in_conn.abort_file(filekey) - self._send_msg_abortfile(in_conn,filekey,err) + self._del_wait_filekey(in_conn,filekey) + + if err != None: + if not in_conn.closed(): + in_conn.abort_file(filekey) + self._send_msg_abortfile(in_conn,filekey,err) except KeyError: pass - + try: - del self._conn_filekeymap[out_conn.linkid][filekey] - - if not out_conn.closed(): - out_conn.abort_file(filekey) - self._send_msg_abortfile(out_conn,filekey,err) + self._del_wait_filekey(out_conn,filekey) + + if err != None: + if not out_conn.closed(): + out_conn.abort_file(filekey) + self._send_msg_abortfile(out_conn,filekey,err) except KeyError: pass @@ -330,31 +337,28 @@ class Proxy: self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist') return - self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb) - out_conn.send_file(filekey,info['filepath'],lambda : self._ret_sendfile(filekey)) + self._add_wait_filekey(out_conn.linkid,filekey,filesize,__send_cb) + out_conn.send_file(filekey,info['filepath'],__send_cb) else: in_conn = self._request_conn(src_linkid) - self._add_wait_filekey(filekey,filesize,in_conn,out_conn,__bridge_fail_cb) + self._add_wait_filekey(in_conn.linkid,filekey,filesize,__bridge_cb) + self._add_wait_filekey(out_conn.linkid,filekey,filesize,__bridge_cb) - send_fn = out_conn.send_filedata(filekey,filesize) + send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb) in_conn.recv_filedata(filekey,filesize,send_fn) self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) - def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback): - def __call(err): - self._ioloop.remove_timeout(timer) - fail_callback(err) - - callback = tornado.stack_context.wrap(__call) - timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')) - - if in_conn != None: - self._conn_filekeymap[in_conn.linkid][filekey] = callback + def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback): + self._conn_filekeymap[conn_linkid][filekey] = { + 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')), + 'callback':tornado.stack_context.wrap(callback) + } - if out_conn != None: - self._conn_filekeymap[out_conn.linkid][filekey] = callback + def _del_wait_filekey(self,conn_linkid,filekey): + wait = self._conn_filekeymap[conn_linkid].pop(filekey) + self._ioloop.remove_timeout(wait['timer']) def _ret_sendfile(self,filekey,err = None): try: @@ -364,7 +368,7 @@ class Proxy: return self._ioloop.remove_timeout(info['timer']) - + fileresult = info['fileresult'] if err == None: fileresult.ret_result('Success') @@ -484,9 +488,9 @@ class Proxy: @async.caller def __call(): try: - self._conn_filekeymap[conn.linkid][filekey](err) + self._conn_filekeymap[conn.linkid][filekey]['callback'](err) - except: + except KeyError: pass filekey = msg['filekey'] diff --git a/src/py/netio.py b/src/py/netio.py index df19627..e148cd6 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -1,11 +1,10 @@ import os +import traceback import json import struct import socket from collections import deque -import time - import tornado.ioloop import tornado.stack_context @@ -23,7 +22,7 @@ def recv_pack(stream,callback): stream.read_bytes(8,_recv_size) class SocketStream: - def __init__(self,sock,addr): + def __init__(self,sock): self.DATA_BUF = 0 self.DATA_NOBUF = 1 self.DATA_FILE = 2 @@ -43,9 +42,7 @@ class SocketStream: self._sock.setblocking(False) self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR) - self.addr = addr - - def connect(self,callback): + def connect(self,addr,callback): if self._closed == True: raise ConnectionError @@ -56,7 +53,7 @@ class SocketStream: self._ioloop.update_handler(self._sock.fileno(),self._stat) self._conning = True - self._sock.connect(self.addr) + self._sock.connect(addr) except BlockingIOError: pass @@ -122,6 +119,10 @@ class SocketStream: self._close_callback(self) def _handle_event(self,fd,evt): + if evt & tornado.ioloop.IOLoop.ERROR: + self.close() + return + if evt & tornado.ioloop.IOLoop.READ: while len(self._read_queue) > 0: iocb = self._read_queue[0] @@ -271,7 +272,7 @@ class SocketStream: self._ioloop.update_handler(fd,stat) class SocketConnection(Connection): - def __init__(self,linkclass,linkid,main_stream,add_pend_filestream_fn = None): + def __init__(self,linkclass,linkid,main_stream,file_addr,add_pend_filestream_fn = None): super().__init__(linkclass,linkid) self._ioloop = tornado.ioloop.IOLoop.current() @@ -279,6 +280,7 @@ class SocketConnection(Connection): self.main_stream = main_stream self.main_stream.set_close_callback(lambda conn : self.close()) + self.file_addr = file_addr self.add_pend_filestream = add_pend_filestream_fn self._start_ping() @@ -291,73 +293,69 @@ class SocketConnection(Connection): def send_file(self,filekey,filepath,callback): def _conn_cb(): - self._add_sendfile(filekey,_fail_cb) + self._add_wait_filekey(filekey,_callback) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', 'filekey':filekey }),'utf-8')) - file_stream.sendfile(fd,_done_cb) - - def _done_cb(): - file_stream.set_close_callback(None) - file_stream.close() - - os.close(fd) - - callback() + file_stream.sendfile(fd,_callback) - def _fail_cb(): + def _callback(err = None): try: - del self._sendfile_filekeymap[filekey] + self._del_wait_filekey(filekey) - except: + except KeyError: return + file_stream.set_close_callback(None) file_stream.close() os.close(fd) + if err == None: + callback() + if self._closed == True: raise ConnectionError fd = os.open(filepath,os.O_RDONLY) filesize = os.fstat(fd).st_size - file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) - file_stream.set_close_callback(lambda stream : _fail_cb()) - file_stream.connect(_conn_cb) + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.connect(self.file_addr,_conn_cb) def recv_file(self,filekey,filesize,filepath,callback): def _conn_cb(stream): nonlocal file_stream file_stream = stream - file_stream.set_close_callback(lambda stream : _fail_cb()) - self._add_sendfile(filekey,_fail_cb) - - file_stream.recvfile(fd,filesize,_done_cb) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + self._add_wait_filekey(filekey,_callback) - def _done_cb(): - file_stream.set_close_callback(None) - file_stream.close() + file_stream.recvfile(fd,filesize,_callback) - os.close(fd) - print(time.perf_counter() - st) - - callback() - - def _fail_cb(): + def _callback(err = None): try: - del self._sendfile_filekeymap[filekey] + self._del_wait_filekey(filekey) - except: + except KeyError: return + file_stream.set_close_callback(None) file_stream.close() - os.close(fd) - os.remove(filepath) + + if err == None: + callback() + + else: + try: + os.remove(filepath) + + except FileNotFoundError: + pass if self._closed == True: raise ConnectionError @@ -367,41 +365,64 @@ class SocketConnection(Connection): self.add_pend_filestream(filekey,_conn_cb) fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) - st = time.perf_counter() - - def send_filedata(self,filekey,filesize): + def send_filedata(self,filekey,filesize,callback): def _conn_cb(): - self._add_sendfile(filekey,lambda : file_stream.close()) + self._add_wait_filekey(filekey,_callback) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', 'filekey':filekey }),'utf-8')) - old_gr.switch() + imc.async.ret(retid) + + def _callback(err = None): + try: + self._del_wait_filekey(filekey) + + except KeyError: + return + + file_stream.set_close_callback(None) + file_stream.close() + + if err == None: + callback() def _send_cb(data): - file_stream.write(data) + def __done_cb(): + nonlocal filesize + + filesize -= len(data) + if filesize == 0: + _callback() + + file_stream.write(data,__done_cb) if self._closed == True: raise ConnectionError - file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) - old_gr = imc.async.current() - file_stream.connect(_conn_cb) - imc.async.switchtop() + retid = imc.async.get_retid() + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.connect(self.file_addr,_conn_cb) + imc.async.switch_top() return _send_cb - def recv_filedata(self,filekey,filesize,callback): + def recv_filedata(self,filekey,filesize,send_fn): def _conn_cb(stream): nonlocal file_stream file_stream = stream - self._add_sendfile(filekey,lambda : file_stream.close()) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + self._add_wait_filekey(filekey,_callback) - file_stream.read_bytes(filesize,callback,nonbuf = True) + file_stream.read_bytes(filesize,send_fn,nonbuf = True) + + def _callback(err = None): + file_stream.close() if self._closed == True: raise ConnectionError @@ -412,7 +433,7 @@ class SocketConnection(Connection): def abort_file(self,filekey): try: - self._sendfile_filekeymap[filekey]() + self._sendfile_filekeymap[filekey]('Eabort') except KeyError: pass @@ -448,11 +469,18 @@ class SocketConnection(Connection): self._closed = True self.main_stream.close() + callbacks = list(self._sendfile_filekeymap.values()) + for callback in callbacks: + callback('Eclose') + super().close() - def _add_sendfile(self,filekey,fail_cb): + def _add_wait_filekey(self,filekey,fail_cb): self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) + def _del_wait_filekey(self,filekey): + del self._sendfile_filekeymap[filekey] + def _start_ping(self): def __check(): try: @@ -463,7 +491,6 @@ class SocketConnection(Connection): self._ping_delay += 1 if self._ping_delay > 10: - print(self.linkid) self.close() self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000) |