aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-30 17:41:45 +0800
committerpzread <netfirewall@gmail.com>2013-05-30 17:41:45 +0800
commit05c48532dc67c44d12682de3ece293e6f2a412f3 (patch)
tree164d67f8cc1c3009867e98b9852887be3575941d
parent4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e (diff)
downloadtaiwan-online-judge-05c48532dc67c44d12682de3ece293e6f2a412f3.tar.gz
taiwan-online-judge-05c48532dc67c44d12682de3ece293e6f2a412f3.tar.zst
taiwan-online-judge-05c48532dc67c44d12682de3ece293e6f2a412f3.zip
Add fault tolerance to sendfile
-rw-r--r--src/py/backend_server.py13
-rwxr-xr-xsrc/py/imc/proxy.py109
-rw-r--r--src/py/netio.py121
3 files changed, 215 insertions, 28 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 2e91382..2ac0f2b 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -104,7 +104,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
time.sleep(1)
if int(self._linkid) == 2:
- self._test_call(None,'4')
+ self._test_call(None,'9')
sock_ip,sock_port = self.sock_addr
netio.send_pack(stream,bytes(json.dumps({
@@ -225,16 +225,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
@imc.async.caller
def _test_call(self,iden,param):
- print('start cold test')
-
- filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
-
- dst = '/backend/' + param + '/'
- ret = imc_call(self._idendesc,dst,'test_dst',filekey)
-
- time.sleep(10)
- print('start warm test')
-
+ param = '5'
filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
dst = '/backend/' + param + '/'
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 8897a7e..65589c3 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -1,6 +1,7 @@
import json
import uuid
import os
+import datetime
import tornado.ioloop
import tornado.stack_context
@@ -14,6 +15,7 @@ class Connection:
self.linkid = linkid
self.link_linkidmap = {}
self._close_callback = []
+ self._closed = False
def send_msg(self,data):
pass
@@ -25,15 +27,20 @@ class Connection:
self._close_callback.append(tornado.stack_context.wrap(callback))
def close(self):
+ self._closed = True
+
for callback in self._close_callback:
callback(self)
+ def closed(self):
+ return self._closed
+
class Proxy:
def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None):
self.MSGTYPE_CALL = 'call'
self.MSGTYPE_RET = 'ret'
self.MSGTYPE_SENDFILE = 'sendfile'
- self.MSGTYPE_RECVFILE = 'recvfile'
+ self.MSGTYPE_ABORTFILE = 'abortfile'
self._ioloop = tornado.ioloop.IOLoop.instance()
self._linkclass = linkclass
@@ -91,6 +98,15 @@ class Proxy:
for linkid,retid in wait_ret:
self._ret_call(linkid,retid,(False,'Eclose'))
+ fail_map = self._conn_filekeymap[conn.linkid]
+ fails = fail_map.values()
+ fail_cb = []
+ for callback in fails:
+ fail_cb.append(callback)
+
+ for callback in fail_cb:
+ callback('Eclose')
+
linkids = conn.link_linkidmap.keys()
link_del = []
for linkid in linkids:
@@ -139,12 +155,25 @@ class Proxy:
return filekey
def recvfile(self,filekey,filepath):
+ def _fail_cb(err):
+ try:
+ del self._conn_filekeymap[in_conn.linkid][filekey]
+
+ except KeyError:
+ return
+
+ if not in_conn.closed():
+ in_conn.abort_file(filekey)
+ self._send_msg_abortfile(in_conn,filekey,err)
+
try:
info = self._info_filekeymap.pop(filekey)
src_linkid = info['src_linkid']
filesize = info['filesize']
in_conn = self._request_conn(src_linkid)
+ self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb)
+
in_conn.recv_file(filekey,filesize,filepath)
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
@@ -224,12 +253,45 @@ class Proxy:
return
def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
+ def _send_fail(err):
+ try:
+ del self._conn_filekeymap[out_conn.linkid][filekey]
+
+ except KeyError:
+ return
+
+ if not out_conn.closed():
+ out_conn.abort_file(filekey)
+ self._send_msg_abortfile(out_conn,filekey,err)
+
+ def _bridge_fail(err):
+ try:
+ del self._conn_filekeymap[in_conn.linkid][filekey]
+
+ if not in_conn.closed():
+ in_conn.abort_file(filekey)
+ self._send_msg_abortfile(in_conn,filekey,err)
+
+ except KeyError:
+ pass
+
+ try:
+ del self._conn_filekeymap[out_conn.linkid][filekey]
+
+ if not out_conn.closed():
+ out_conn.abort_file(filekey)
+ self._send_msg_abortfile(out_conn,filekey,err)
+
+ except KeyError:
+ pass
+
if src_linkid == self._linkid:
try:
info = self._info_filekeymap.pop(filekey)
assert info['filesize'] == filesize
- self._conn_filekeymap[out_conn.linkid][filekey] = {}
+ self._add_wait_filekey(filekey,filesize,None,out_conn,_send_fail)
+
out_conn.send_file(filekey,info['filepath'])
except KeyError:
@@ -242,6 +304,8 @@ class Proxy:
print('test start')
in_conn = self._request_conn(src_linkid)
+ self._add_wait_filekey(filekey,filesize,in_conn,out_conn,_bridge_fail)
+
send_fn = out_conn.send_filedata(filekey,filesize)
in_conn.recv_filedata(filekey,filesize,send_fn)
@@ -286,10 +350,28 @@ class Proxy:
elif msg_type == self.MSGTYPE_SENDFILE:
self._recv_msg_sendfile(conn,msg)
+ elif msg_type == self.MSGTYPE_ABORTFILE:
+ self._recv_msg_abortfile(conn,msg)
+
def _conn_close_cb(self,conn):
self.del_conn(conn)
print('connection close')
+
+ def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback):
+ def __call(err):
+ self._ioloop.remove_timeout(timer)
+ fail_callback(err)
+
+ callback = tornado.stack_context.wrap(__call)
+
+ timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout'))
+
+ if in_conn != None:
+ self._conn_filekeymap[in_conn.linkid][filekey] = callback
+ if out_conn != None:
+ self._conn_filekeymap[out_conn.linkid][filekey] = callback
+
def _check_waitcaller(self):
wait_maps = self._conn_retidmap.values()
for wait_map in wait_maps:
@@ -371,6 +453,29 @@ class Proxy:
__call()
+ def _send_msg_abortfile(self,conn,filekey,err):
+ msg = {
+ 'type':self.MSGTYPE_ABORTFILE,
+ 'filekey':filekey,
+ 'error':err
+ }
+
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_abortfile(self,conn,msg):
+ @async.caller
+ def __call():
+ try:
+ self._conn_filekeymap[conn.linkid][filekey](err)
+
+ except:
+ pass
+
+ filekey = msg['filekey']
+ err = msg['error']
+
+ __call()
+
@async.caller
def _pend_recvfile(self,iden,param):
filekey = param['filekey']
diff --git a/src/py/netio.py b/src/py/netio.py
index 8385b58..2910207 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -157,6 +157,9 @@ class SocketStream:
try:
while True:
buf = self._sock.recv(size)
+ if len(buf) == 0:
+ self.close()
+ return
iocb[2](buf)
size -= len(buf)
@@ -175,6 +178,9 @@ class SocketStream:
try:
while True:
buf = self._sock.recv(min(size,65536))
+ if len(buf) == 0:
+ self.close()
+ return
os.write(iocb[2],buf)
size -= len(buf)
@@ -207,7 +213,12 @@ class SocketStream:
try:
while True:
- off += self._sock.send(buf[off:])
+ ret = self._sock.send(buf[off:])
+ if ret == 0:
+ self.close()
+ return
+
+ off += ret
if off == len(buf):
if iocb[3] != None:
@@ -227,7 +238,12 @@ class SocketStream:
try:
while True:
- size -= os.sendfile(sockfd,filefd,None,min(size,65536))
+ ret = os.sendfile(sockfd,filefd,None,min(size,65536))
+ if ret == 0:
+ self.close()
+ return
+
+ size -= ret
if size == 0:
if iocb[3] != None:
@@ -259,7 +275,7 @@ class SocketConnection(Connection):
super().__init__(linkclass,linkid)
self._ioloop = tornado.ioloop.IOLoop.current()
- self._stream_filekeymap = {}
+ self._sendfile_filekeymap = {}
self.main_stream = main_stream
self.main_stream.set_close_callback(lambda conn : self.close())
@@ -268,11 +284,14 @@ class SocketConnection(Connection):
self._start_ping()
def send_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
self.main_stream.write(struct.pack('l',len(data)) + data)
def send_file(self,filekey,filepath):
def _conn_cb():
- self._stream_filekeymap[filekey] = file_stream
+ self._add_sendfile(filekey,_fail_cb)
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
@@ -282,25 +301,68 @@ class SocketConnection(Connection):
file_stream.sendfile(fd,_done_cb)
def _done_cb():
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
os.close(fd)
+
print('send done')
+ def _fail_cb():
+ try:
+ del self._sendfile_filekeymap[filekey]
+
+ except:
+ return
+
+ file_stream.close()
+ os.close(fd)
+
+ if self._closed == True:
+ raise ConnectionError
+
fd = os.open(filepath,os.O_RDONLY)
filesize = os.fstat(fd).st_size
file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+ file_stream.set_close_callback(lambda stream : _fail_cb())
file_stream.connect(_conn_cb)
def recv_file(self,filekey,filesize,filepath):
- def _conn_cb(file_stream):
- self._stream_filekeymap[filekey] = file_stream
+ def _conn_cb(stream):
+ nonlocal file_stream
+
+ file_stream = stream
+ file_stream.set_close_callback(lambda stream : _fail_cb())
+ self._add_sendfile(filekey,_fail_cb)
+
file_stream.recvfile(fd,filesize,_done_cb)
def _done_cb():
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
os.close(fd)
print('recv done')
print(time.perf_counter() - st)
+ def _fail_cb():
+ try:
+ del self._sendfile_filekeymap[filekey]
+
+ except:
+ return
+
+ file_stream.close()
+
+ os.close(fd)
+ os.remove(filepath)
+
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = None
+
self.add_pend_filestream(filekey,_conn_cb)
fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
@@ -308,10 +370,7 @@ class SocketConnection(Connection):
def send_filedata(self,filekey,filesize):
def _conn_cb():
- nonlocal file_stream
-
- file_stream = stream
- self._stream_filekeymap[filekey] = file_stream
+ self._add_sendfile(filekey,lambda : file_stream.close())
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
@@ -323,22 +382,40 @@ class SocketConnection(Connection):
def _send_cb(data):
file_stream.write(data)
- file_stream = None
- stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
old_gr = imc.async.current()
- stream.connect(_conn_cb)
+ file_stream.connect(_conn_cb)
imc.async.switchtop()
return _send_cb
def recv_filedata(self,filekey,filesize,callback):
- def _conn_cb(file_stream):
- self._stream_filekeymap[filekey] = file_stream
+ def _conn_cb(stream):
+ nonlocal file_stream
+
+ file_stream = stream
+ self._add_sendfile(filekey,lambda : file_stream.close())
+
file_stream.read_bytes(filesize,callback,nonbuf = True)
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = None
+
self.add_pend_filestream(filekey,_conn_cb)
+ def abort_file(self,filekey):
+ try:
+ self._sendfile_filekeymap[filekey]()
+
+ except KeyError:
+ pass
+
def start_recv(self,recv_callback):
def _recv_size(data):
size, = struct.unpack('l',data)
@@ -364,10 +441,17 @@ class SocketConnection(Connection):
except AttributeError:
pass
+ if self._closed == True:
+ return
+
+ self._closed = True
self.main_stream.close()
super().close()
+ def _add_sendfile(self,filekey,fail_cb):
+ self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
+
def _start_ping(self):
def __check():
try:
@@ -378,6 +462,7 @@ class SocketConnection(Connection):
self._ping_delay += 1
if self._ping_delay > 10:
+ print(self.linkid)
self.close()
self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000)
@@ -392,9 +477,15 @@ class WebSocketConnection(Connection):
self.handler = handler
def send_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
self.handler.write_message(data,True)
def recv_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
self._recv_callback(self,data)
def start_recv(self,recv_callback):