aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-06-03 01:51:21 +0800
committerpzread <netfirewall@gmail.com>2013-06-03 01:51:21 +0800
commit13445dbeb426347354b61bd46f738f37b25ffd60 (patch)
treee498e942970958241dcbee9eb43840ae26377cb1
parentc00472c8bfbcc86780ad25f7be687815432c4a37 (diff)
downloadtaiwan-online-judge-13445dbeb426347354b61bd46f738f37b25ffd60.tar.gz
taiwan-online-judge-13445dbeb426347354b61bd46f738f37b25ffd60.tar.zst
taiwan-online-judge-13445dbeb426347354b61bd46f738f37b25ffd60.zip
Fix few bugs. Sendfile pass stress test A
-rw-r--r--src/py/backend_server.py64
-rw-r--r--src/py/center_server.py18
-rwxr-xr-xsrc/py/imc/proxy.py136
-rw-r--r--src/py/netio.py141
4 files changed, 197 insertions, 162 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index edfee8d..3c4f1ba 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -37,9 +37,11 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def start(self):
sock_port = random.randrange(4096,8192)
- self.listen(sock_port)
self.sock_addr = ('127.0.0.1',sock_port)
+ self.bind(sock_port,'',socket.AF_INET,65536)
+ super().start()
+
self._conn_center()
def handle_stream(self,stream,addr):
@@ -55,7 +57,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
fd = stream.fileno()
self._ioloop.remove_handler(fd)
- sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr)
+ sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0))
netio.recv_pack(sock_stream,_recv_conn_info)
@@ -94,14 +96,13 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._linkid = iden['linkid']
Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid)
- self.center_conn = SocketConnection('center',info['center_linkid'],stream)
+ self.center_conn = SocketConnection('center',info['center_linkid'],stream,self.center_addr)
self.center_conn.add_close_callback(__retry)
Proxy.instance.add_conn(self.center_conn)
-
imc_register_call('','test_dst',self._test_dst)
#imc_register_call('','test_dsta',self._test_dsta)
- time.sleep(1)
+ time.sleep(2)
#if int(self._linkid) == 2:
self._test_call(None,'9')
@@ -116,13 +117,18 @@ class BackendWorker(tornado.tcpserver.TCPServer):
}),'utf-8'))
netio.recv_pack(stream,___recv_info_cb)
- stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.center_addr)
+ stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
stream.set_close_callback(__retry)
- stream.connect(__send_worker_info)
+ stream.connect(self.center_addr,__send_worker_info)
def _conn_linkid(self,linkid):
def __handle_pend(conn):
- retids = self._pend_mainconn_linkidmap.pop(worker_linkid)
+ try:
+ retids = self._pend_mainconn_linkidmap.pop(worker_linkid)
+
+ except KeyError:
+ return
+
for retid in retids:
imc.async.ret(retid,conn)
@@ -134,17 +140,20 @@ class BackendWorker(tornado.tcpserver.TCPServer):
main_stream.close()
else:
+ sock_ip,sock_port = self.sock_addr
netio.send_pack(main_stream,bytes(json.dumps({
'conntype':'main',
'linkclass':'backend',
- 'linkid':self._linkid
+ 'linkid':self._linkid,
+ 'sock_ip':sock_ip,
+ 'sock_port':sock_port
}),'utf-8'))
netio.recv_pack(main_stream,__recv_cb)
def __recv_cb(data):
stat = json.loads(data.decode('utf-8'))
if stat == True:
- conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,self._add_pend_filestream)
+ conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,sock_addr,self._add_pend_filestream)
Proxy.instance.add_conn(conn)
__handle_pend(conn)
@@ -177,9 +186,9 @@ class BackendWorker(tornado.tcpserver.TCPServer):
sock_addr = (ret['sock_ip'],ret['sock_port'])
- main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),sock_addr)
+ main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
main_stream.set_close_callback(lambda conn : __handle_pend(None))
- main_stream.connect(__conn_cb)
+ main_stream.connect(sock_addr,__conn_cb)
return imc.async.switch_top()
@@ -187,33 +196,28 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback)
def _handle_mainconn(self,main_stream,addr,info):
- def __send_back(stat):
- if stat == True:
- conn = SocketConnection(linkclass,linkid,main_stream,self._add_pend_filestream)
- Proxy.instance.add_conn(conn)
-
- netio.send_pack(main_stream,bytes(json.dumps(stat),'utf-8'))
-
linkclass = info['linkclass']
linkid = info['linkid']
+ sock_ip = info['sock_ip']
+ sock_port = info['sock_port']
conn = Proxy.instance.get_conn(linkid)
if conn != None:
return
- if linkid not in self._pend_mainconn_linkidmap:
- __send_back(True)
+ if (linkid not in self._pend_mainconn_linkidmap) or self._linkid > linkid:
+ conn = SocketConnection(linkclass,linkid,main_stream,(sock_ip,sock_port),self._add_pend_filestream)
+ Proxy.instance.add_conn(conn)
- else:
- if self._linkid > linkid:
- __send_back(True)
-
+ netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8'))
+
+ if linkid in self._pend_mainconn_linkidmap:
retids = self._pend_mainconn_linkidmap.pop(linkid)
for retid in retids:
imc.async.ret(retid,conn)
-
- else:
- __send_back(False)
+
+ else:
+ netio.send_pack(main_stream,bytes(json.dumps(False),'utf-8'))
def _handle_fileconn(self,file_stream,addr,info):
try:
@@ -232,7 +236,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
if str((i % 8) + 2) == self._linkid:
continue
- fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','archlinux-2013.05.01-dual.iso')
+ fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','test.py')
dst = '/backend/' + str((i % 8) + 2) + '/'
ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey)
@@ -240,7 +244,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
pend.append(fileres)
for p in pend:
- print(p.wait())
+ self._linkid + ' ' + p.wait()
print(self._linkid)
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 0b32610..ed2de4c 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -30,7 +30,7 @@ class Worker:
'center_linkid':center_linkid
}),'utf-8'))
- conn = SocketConnection(self.linkclass,self.linkid,self.main_stream)
+ conn = SocketConnection(self.linkclass,self.linkid,self.main_stream,self.sock_addr)
conn.add_close_callback(lambda conn : self.close())
Proxy.instance.add_conn(conn)
@@ -89,7 +89,7 @@ class CenterServer(tornado.tcpserver.TCPServer):
fd = stream.fileno()
self._ioloop.remove_handler(fd)
- main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr)
+ main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0))
netio.recv_pack(main_stream,_recv_worker_info)
@@ -139,16 +139,16 @@ class CenterServer(tornado.tcpserver.TCPServer):
linkid = param
try:
- worker = self._worker_linkidmap[linkid]
+ #worker = self._worker_linkidmap[linkid]
- #a = int(iden['linkid'])
- #b = int(linkid)
+ a = int(iden['linkid'])
+ b = int(linkid)
- #if b > a:
- # worker = self._worker_linkidmap[str(a + 1)]
+ if b > a:
+ worker = self._worker_linkidmap[str(a + 1)]
- #else:
- # worker = self._worker_linkidmap[str(a - 1)]
+ else:
+ worker = self._worker_linkidmap[str(a - 1)]
if iden['linkclass'] != 'client':
sock_ip,sock_port = worker.sock_addr
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 2a25323..9f96a73 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -111,13 +111,13 @@ class Proxy:
del conn.link_linkidmap[linkid]
def del_conn(self,conn):
- callbacks = list(self._conn_retidmap[conn.linkid].values())
- for callback in callbacks:
- callback((False,'Eclose'))
+ waits = list(self._conn_retidmap[conn.linkid].values())
+ for wait in waits:
+ wait['callback']((False,'Eclose'))
- callbacks = list(self._conn_filekeymap[conn.linkid].values())
- for callback in callbacks:
- callback('Eclose')
+ waits = list(self._conn_filekeymap[conn.linkid].values())
+ for wait in waits:
+ wait['callback']('Eclose')
linkids = list(conn.link_linkidmap.keys())
for linkid in linkids:
@@ -153,24 +153,28 @@ class Proxy:
'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
}
- self.call(10000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize})
+ stat,ret = self.call(1000000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize})
+ if stat == False:
+ print('err close ' + ret)
+ raise ConnectionError(ret)
return fileresult
def recvfile(self,filekey,filepath):
- def _fail_cb(err):
+ def _callback(err = None):
try:
- del self._conn_filekeymap[in_conn.linkid][filekey]
-
+ self._del_wait_filekey(in_conn.linkid,filekey)
+
except KeyError:
return
-
- if not in_conn.closed():
- in_conn.abort_file(filekey)
- self._send_msg_abortfile(in_conn,filekey,err)
+
+ if err != None:
+ if not in_conn.closed():
+ in_conn.abort_file(filekey)
+ self._send_msg_abortfile(in_conn,filekey,err)
self._ioloop.add_callback(self._ret_sendfile,filekey,err)
-
+
try:
info = self._info_filekeymap[filekey]
@@ -181,9 +185,9 @@ class Proxy:
filesize = info['filesize']
in_conn = self._request_conn(src_linkid)
- self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb)
+ self._add_wait_filekey(in_conn.linkid,filekey,filesize,_callback)
- in_conn.recv_file(filekey,filesize,filepath,lambda : self._ret_sendfile(filekey))
+ in_conn.recv_file(filekey,filesize,filepath,_callback)
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
return info['fileresult']
@@ -196,18 +200,18 @@ class Proxy:
return
dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/'])
- self.call(10000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey})
+ self.call(1000000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey})
def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param):
- def __add_wait_caller(in_linkid):
- def ___call(result):
- self._ioloop.remove_timeout(timer)
- self._ret_call(caller_linkid,caller_retid,result)
+ def __add_wait_caller(conn_linkid):
+ self._conn_retidmap[conn_linkid][caller_retid] = {
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds= timeout),lambda : callback(('False','Etimeout'))),
+ 'callback':tornado.stack_context.wrap(lambda result : self._ret_call(caller_linkid,caller_retid,result))
+ }
- callback = tornado.stack_context.wrap(___call)
- timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback(('False','Etimeout')))
-
- self._conn_retidmap[in_linkid][caller_retid] = callback
+ def __del_wait_caller(conn_linkid):
+ wait = self._conn_retidmap[conn_linkid].pop(caller_retid)
+ self._ioloop.remove_timeout(wait['timer'])
def __ret(result):
if caller_linkid == self._linkid:
@@ -245,16 +249,16 @@ class Proxy:
self._auth.change_iden(old_iden)
except KeyError:
- result = (False,'Enoexist')
+ result = (False,'Enoexist1')
- del self._conn_retidmap[self._linkid][caller_retid]
+ __del_wait_caller(self._linkid)
return __ret(result)
else:
conn = self._request_conn(dst_linkid)
if conn == None:
- return __ret((False,'Enoexist'))
+ return __ret((False,'Enoexist2'))
else:
if caller_linkid == self._linkid:
@@ -263,7 +267,7 @@ class Proxy:
result = async.switch_top()
- del self._conn_retidmap[conn.linkid][caller_retid]
+ __del_wait_caller(conn.linkid)
return __ret(result)
@@ -286,36 +290,39 @@ class Proxy:
__ret_remote()
def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
- def __send_fail_cb(err):
+ def __send_cb(err = None):
try:
- del self._conn_filekeymap[out_conn.linkid][filekey]
+ self._del_wait_filekey(out_conn.linkid,filekey)
except KeyError:
return
- if not out_conn.closed():
- out_conn.abort_file(filekey)
- self._send_msg_abortfile(out_conn,filekey,err)
+ if err != None:
+ if not out_conn.closed():
+ out_conn.abort_file(filekey)
+ self._send_msg_abortfile(out_conn,filekey,err)
self._ioloop.add_callback(self._ret_sendfile,filekey,err)
- def __bridge_fail_cb(err):
+ def __bridge_cb(err = None):
try:
- del self._conn_filekeymap[in_conn.linkid][filekey]
-
- if not in_conn.closed():
- in_conn.abort_file(filekey)
- self._send_msg_abortfile(in_conn,filekey,err)
+ self._del_wait_filekey(in_conn,filekey)
+
+ if err != None:
+ if not in_conn.closed():
+ in_conn.abort_file(filekey)
+ self._send_msg_abortfile(in_conn,filekey,err)
except KeyError:
pass
-
+
try:
- del self._conn_filekeymap[out_conn.linkid][filekey]
-
- if not out_conn.closed():
- out_conn.abort_file(filekey)
- self._send_msg_abortfile(out_conn,filekey,err)
+ self._del_wait_filekey(out_conn,filekey)
+
+ if err != None:
+ if not out_conn.closed():
+ out_conn.abort_file(filekey)
+ self._send_msg_abortfile(out_conn,filekey,err)
except KeyError:
pass
@@ -330,31 +337,28 @@ class Proxy:
self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist')
return
- self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb)
- out_conn.send_file(filekey,info['filepath'],lambda : self._ret_sendfile(filekey))
+ self._add_wait_filekey(out_conn.linkid,filekey,filesize,__send_cb)
+ out_conn.send_file(filekey,info['filepath'],__send_cb)
else:
in_conn = self._request_conn(src_linkid)
- self._add_wait_filekey(filekey,filesize,in_conn,out_conn,__bridge_fail_cb)
+ self._add_wait_filekey(in_conn.linkid,filekey,filesize,__bridge_cb)
+ self._add_wait_filekey(out_conn.linkid,filekey,filesize,__bridge_cb)
- send_fn = out_conn.send_filedata(filekey,filesize)
+ send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb)
in_conn.recv_filedata(filekey,filesize,send_fn)
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
- def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback):
- def __call(err):
- self._ioloop.remove_timeout(timer)
- fail_callback(err)
-
- callback = tornado.stack_context.wrap(__call)
- timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout'))
-
- if in_conn != None:
- self._conn_filekeymap[in_conn.linkid][filekey] = callback
+ def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback):
+ self._conn_filekeymap[conn_linkid][filekey] = {
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')),
+ 'callback':tornado.stack_context.wrap(callback)
+ }
- if out_conn != None:
- self._conn_filekeymap[out_conn.linkid][filekey] = callback
+ def _del_wait_filekey(self,conn_linkid,filekey):
+ wait = self._conn_filekeymap[conn_linkid].pop(filekey)
+ self._ioloop.remove_timeout(wait['timer'])
def _ret_sendfile(self,filekey,err = None):
try:
@@ -364,7 +368,7 @@ class Proxy:
return
self._ioloop.remove_timeout(info['timer'])
-
+
fileresult = info['fileresult']
if err == None:
fileresult.ret_result('Success')
@@ -484,9 +488,9 @@ class Proxy:
@async.caller
def __call():
try:
- self._conn_filekeymap[conn.linkid][filekey](err)
+ self._conn_filekeymap[conn.linkid][filekey]['callback'](err)
- except:
+ except KeyError:
pass
filekey = msg['filekey']
diff --git a/src/py/netio.py b/src/py/netio.py
index df19627..e148cd6 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -1,11 +1,10 @@
import os
+import traceback
import json
import struct
import socket
from collections import deque
-import time
-
import tornado.ioloop
import tornado.stack_context
@@ -23,7 +22,7 @@ def recv_pack(stream,callback):
stream.read_bytes(8,_recv_size)
class SocketStream:
- def __init__(self,sock,addr):
+ def __init__(self,sock):
self.DATA_BUF = 0
self.DATA_NOBUF = 1
self.DATA_FILE = 2
@@ -43,9 +42,7 @@ class SocketStream:
self._sock.setblocking(False)
self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR)
- self.addr = addr
-
- def connect(self,callback):
+ def connect(self,addr,callback):
if self._closed == True:
raise ConnectionError
@@ -56,7 +53,7 @@ class SocketStream:
self._ioloop.update_handler(self._sock.fileno(),self._stat)
self._conning = True
- self._sock.connect(self.addr)
+ self._sock.connect(addr)
except BlockingIOError:
pass
@@ -122,6 +119,10 @@ class SocketStream:
self._close_callback(self)
def _handle_event(self,fd,evt):
+ if evt & tornado.ioloop.IOLoop.ERROR:
+ self.close()
+ return
+
if evt & tornado.ioloop.IOLoop.READ:
while len(self._read_queue) > 0:
iocb = self._read_queue[0]
@@ -271,7 +272,7 @@ class SocketStream:
self._ioloop.update_handler(fd,stat)
class SocketConnection(Connection):
- def __init__(self,linkclass,linkid,main_stream,add_pend_filestream_fn = None):
+ def __init__(self,linkclass,linkid,main_stream,file_addr,add_pend_filestream_fn = None):
super().__init__(linkclass,linkid)
self._ioloop = tornado.ioloop.IOLoop.current()
@@ -279,6 +280,7 @@ class SocketConnection(Connection):
self.main_stream = main_stream
self.main_stream.set_close_callback(lambda conn : self.close())
+ self.file_addr = file_addr
self.add_pend_filestream = add_pend_filestream_fn
self._start_ping()
@@ -291,73 +293,69 @@ class SocketConnection(Connection):
def send_file(self,filekey,filepath,callback):
def _conn_cb():
- self._add_sendfile(filekey,_fail_cb)
+ self._add_wait_filekey(filekey,_callback)
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
'filekey':filekey
}),'utf-8'))
- file_stream.sendfile(fd,_done_cb)
-
- def _done_cb():
- file_stream.set_close_callback(None)
- file_stream.close()
-
- os.close(fd)
-
- callback()
+ file_stream.sendfile(fd,_callback)
- def _fail_cb():
+ def _callback(err = None):
try:
- del self._sendfile_filekeymap[filekey]
+ self._del_wait_filekey(filekey)
- except:
+ except KeyError:
return
+ file_stream.set_close_callback(None)
file_stream.close()
os.close(fd)
+ if err == None:
+ callback()
+
if self._closed == True:
raise ConnectionError
fd = os.open(filepath,os.O_RDONLY)
filesize = os.fstat(fd).st_size
- file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
- file_stream.set_close_callback(lambda stream : _fail_cb())
- file_stream.connect(_conn_cb)
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.connect(self.file_addr,_conn_cb)
def recv_file(self,filekey,filesize,filepath,callback):
def _conn_cb(stream):
nonlocal file_stream
file_stream = stream
- file_stream.set_close_callback(lambda stream : _fail_cb())
- self._add_sendfile(filekey,_fail_cb)
-
- file_stream.recvfile(fd,filesize,_done_cb)
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ self._add_wait_filekey(filekey,_callback)
- def _done_cb():
- file_stream.set_close_callback(None)
- file_stream.close()
+ file_stream.recvfile(fd,filesize,_callback)
- os.close(fd)
- print(time.perf_counter() - st)
-
- callback()
-
- def _fail_cb():
+ def _callback(err = None):
try:
- del self._sendfile_filekeymap[filekey]
+ self._del_wait_filekey(filekey)
- except:
+ except KeyError:
return
+ file_stream.set_close_callback(None)
file_stream.close()
-
os.close(fd)
- os.remove(filepath)
+
+ if err == None:
+ callback()
+
+ else:
+ try:
+ os.remove(filepath)
+
+ except FileNotFoundError:
+ pass
if self._closed == True:
raise ConnectionError
@@ -367,41 +365,64 @@ class SocketConnection(Connection):
self.add_pend_filestream(filekey,_conn_cb)
fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
- st = time.perf_counter()
-
- def send_filedata(self,filekey,filesize):
+ def send_filedata(self,filekey,filesize,callback):
def _conn_cb():
- self._add_sendfile(filekey,lambda : file_stream.close())
+ self._add_wait_filekey(filekey,_callback)
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
'filekey':filekey
}),'utf-8'))
- old_gr.switch()
+ imc.async.ret(retid)
+
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(filekey)
+
+ except KeyError:
+ return
+
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
+ if err == None:
+ callback()
def _send_cb(data):
- file_stream.write(data)
+ def __done_cb():
+ nonlocal filesize
+
+ filesize -= len(data)
+ if filesize == 0:
+ _callback()
+
+ file_stream.write(data,__done_cb)
if self._closed == True:
raise ConnectionError
- file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
- old_gr = imc.async.current()
- file_stream.connect(_conn_cb)
- imc.async.switchtop()
+ retid = imc.async.get_retid()
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.connect(self.file_addr,_conn_cb)
+ imc.async.switch_top()
return _send_cb
- def recv_filedata(self,filekey,filesize,callback):
+ def recv_filedata(self,filekey,filesize,send_fn):
def _conn_cb(stream):
nonlocal file_stream
file_stream = stream
- self._add_sendfile(filekey,lambda : file_stream.close())
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ self._add_wait_filekey(filekey,_callback)
- file_stream.read_bytes(filesize,callback,nonbuf = True)
+ file_stream.read_bytes(filesize,send_fn,nonbuf = True)
+
+ def _callback(err = None):
+ file_stream.close()
if self._closed == True:
raise ConnectionError
@@ -412,7 +433,7 @@ class SocketConnection(Connection):
def abort_file(self,filekey):
try:
- self._sendfile_filekeymap[filekey]()
+ self._sendfile_filekeymap[filekey]('Eabort')
except KeyError:
pass
@@ -448,11 +469,18 @@ class SocketConnection(Connection):
self._closed = True
self.main_stream.close()
+ callbacks = list(self._sendfile_filekeymap.values())
+ for callback in callbacks:
+ callback('Eclose')
+
super().close()
- def _add_sendfile(self,filekey,fail_cb):
+ def _add_wait_filekey(self,filekey,fail_cb):
self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
+ def _del_wait_filekey(self,filekey):
+ del self._sendfile_filekeymap[filekey]
+
def _start_ping(self):
def __check():
try:
@@ -463,7 +491,6 @@ class SocketConnection(Connection):
self._ping_delay += 1
if self._ping_delay > 10:
- print(self.linkid)
self.close()
self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000)