aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/netio.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/netio.py')
-rw-r--r--src/py/netio.py121
1 files changed, 106 insertions, 15 deletions
diff --git a/src/py/netio.py b/src/py/netio.py
index 8385b58..2910207 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -157,6 +157,9 @@ class SocketStream:
try:
while True:
buf = self._sock.recv(size)
+ if len(buf) == 0:
+ self.close()
+ return
iocb[2](buf)
size -= len(buf)
@@ -175,6 +178,9 @@ class SocketStream:
try:
while True:
buf = self._sock.recv(min(size,65536))
+ if len(buf) == 0:
+ self.close()
+ return
os.write(iocb[2],buf)
size -= len(buf)
@@ -207,7 +213,12 @@ class SocketStream:
try:
while True:
- off += self._sock.send(buf[off:])
+ ret = self._sock.send(buf[off:])
+ if ret == 0:
+ self.close()
+ return
+
+ off += ret
if off == len(buf):
if iocb[3] != None:
@@ -227,7 +238,12 @@ class SocketStream:
try:
while True:
- size -= os.sendfile(sockfd,filefd,None,min(size,65536))
+ ret = os.sendfile(sockfd,filefd,None,min(size,65536))
+ if ret == 0:
+ self.close()
+ return
+
+ size -= ret
if size == 0:
if iocb[3] != None:
@@ -259,7 +275,7 @@ class SocketConnection(Connection):
super().__init__(linkclass,linkid)
self._ioloop = tornado.ioloop.IOLoop.current()
- self._stream_filekeymap = {}
+ self._sendfile_filekeymap = {}
self.main_stream = main_stream
self.main_stream.set_close_callback(lambda conn : self.close())
@@ -268,11 +284,14 @@ class SocketConnection(Connection):
self._start_ping()
def send_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
self.main_stream.write(struct.pack('l',len(data)) + data)
def send_file(self,filekey,filepath):
def _conn_cb():
- self._stream_filekeymap[filekey] = file_stream
+ self._add_sendfile(filekey,_fail_cb)
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
@@ -282,25 +301,68 @@ class SocketConnection(Connection):
file_stream.sendfile(fd,_done_cb)
def _done_cb():
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
os.close(fd)
+
print('send done')
+ def _fail_cb():
+ try:
+ del self._sendfile_filekeymap[filekey]
+
+ except:
+ return
+
+ file_stream.close()
+ os.close(fd)
+
+ if self._closed == True:
+ raise ConnectionError
+
fd = os.open(filepath,os.O_RDONLY)
filesize = os.fstat(fd).st_size
file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+ file_stream.set_close_callback(lambda stream : _fail_cb())
file_stream.connect(_conn_cb)
def recv_file(self,filekey,filesize,filepath):
- def _conn_cb(file_stream):
- self._stream_filekeymap[filekey] = file_stream
+ def _conn_cb(stream):
+ nonlocal file_stream
+
+ file_stream = stream
+ file_stream.set_close_callback(lambda stream : _fail_cb())
+ self._add_sendfile(filekey,_fail_cb)
+
file_stream.recvfile(fd,filesize,_done_cb)
def _done_cb():
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
os.close(fd)
print('recv done')
print(time.perf_counter() - st)
+ def _fail_cb():
+ try:
+ del self._sendfile_filekeymap[filekey]
+
+ except:
+ return
+
+ file_stream.close()
+
+ os.close(fd)
+ os.remove(filepath)
+
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = None
+
self.add_pend_filestream(filekey,_conn_cb)
fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
@@ -308,10 +370,7 @@ class SocketConnection(Connection):
def send_filedata(self,filekey,filesize):
def _conn_cb():
- nonlocal file_stream
-
- file_stream = stream
- self._stream_filekeymap[filekey] = file_stream
+ self._add_sendfile(filekey,lambda : file_stream.close())
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
@@ -323,22 +382,40 @@ class SocketConnection(Connection):
def _send_cb(data):
file_stream.write(data)
- file_stream = None
- stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
old_gr = imc.async.current()
- stream.connect(_conn_cb)
+ file_stream.connect(_conn_cb)
imc.async.switchtop()
return _send_cb
def recv_filedata(self,filekey,filesize,callback):
- def _conn_cb(file_stream):
- self._stream_filekeymap[filekey] = file_stream
+ def _conn_cb(stream):
+ nonlocal file_stream
+
+ file_stream = stream
+ self._add_sendfile(filekey,lambda : file_stream.close())
+
file_stream.read_bytes(filesize,callback,nonbuf = True)
+ if self._closed == True:
+ raise ConnectionError
+
+ file_stream = None
+
self.add_pend_filestream(filekey,_conn_cb)
+ def abort_file(self,filekey):
+ try:
+ self._sendfile_filekeymap[filekey]()
+
+ except KeyError:
+ pass
+
def start_recv(self,recv_callback):
def _recv_size(data):
size, = struct.unpack('l',data)
@@ -364,10 +441,17 @@ class SocketConnection(Connection):
except AttributeError:
pass
+ if self._closed == True:
+ return
+
+ self._closed = True
self.main_stream.close()
super().close()
+ def _add_sendfile(self,filekey,fail_cb):
+ self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
+
def _start_ping(self):
def __check():
try:
@@ -378,6 +462,7 @@ class SocketConnection(Connection):
self._ping_delay += 1
if self._ping_delay > 10:
+ print(self.linkid)
self.close()
self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000)
@@ -392,9 +477,15 @@ class WebSocketConnection(Connection):
self.handler = handler
def send_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
self.handler.write_message(data,True)
def recv_msg(self,data):
+ if self._closed == True:
+ raise ConnectionError
+
self._recv_callback(self,data)
def start_recv(self,recv_callback):