diff options
Diffstat (limited to 'src/py/netio.py')
-rw-r--r-- | src/py/netio.py | 121 |
1 files changed, 106 insertions, 15 deletions
diff --git a/src/py/netio.py b/src/py/netio.py index 8385b58..2910207 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -157,6 +157,9 @@ class SocketStream: try: while True: buf = self._sock.recv(size) + if len(buf) == 0: + self.close() + return iocb[2](buf) size -= len(buf) @@ -175,6 +178,9 @@ class SocketStream: try: while True: buf = self._sock.recv(min(size,65536)) + if len(buf) == 0: + self.close() + return os.write(iocb[2],buf) size -= len(buf) @@ -207,7 +213,12 @@ class SocketStream: try: while True: - off += self._sock.send(buf[off:]) + ret = self._sock.send(buf[off:]) + if ret == 0: + self.close() + return + + off += ret if off == len(buf): if iocb[3] != None: @@ -227,7 +238,12 @@ class SocketStream: try: while True: - size -= os.sendfile(sockfd,filefd,None,min(size,65536)) + ret = os.sendfile(sockfd,filefd,None,min(size,65536)) + if ret == 0: + self.close() + return + + size -= ret if size == 0: if iocb[3] != None: @@ -259,7 +275,7 @@ class SocketConnection(Connection): super().__init__(linkclass,linkid) self._ioloop = tornado.ioloop.IOLoop.current() - self._stream_filekeymap = {} + self._sendfile_filekeymap = {} self.main_stream = main_stream self.main_stream.set_close_callback(lambda conn : self.close()) @@ -268,11 +284,14 @@ class SocketConnection(Connection): self._start_ping() def send_msg(self,data): + if self._closed == True: + raise ConnectionError + self.main_stream.write(struct.pack('l',len(data)) + data) def send_file(self,filekey,filepath): def _conn_cb(): - self._stream_filekeymap[filekey] = file_stream + self._add_sendfile(filekey,_fail_cb) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', @@ -282,25 +301,68 @@ class SocketConnection(Connection): file_stream.sendfile(fd,_done_cb) def _done_cb(): + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) + print('send done') + def _fail_cb(): + try: + del self._sendfile_filekeymap[filekey] + + except: + return + + file_stream.close() + os.close(fd) + + if self._closed == True: + raise ConnectionError + fd = os.open(filepath,os.O_RDONLY) filesize = os.fstat(fd).st_size file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + file_stream.set_close_callback(lambda stream : _fail_cb()) file_stream.connect(_conn_cb) def recv_file(self,filekey,filesize,filepath): - def _conn_cb(file_stream): - self._stream_filekeymap[filekey] = file_stream + def _conn_cb(stream): + nonlocal file_stream + + file_stream = stream + file_stream.set_close_callback(lambda stream : _fail_cb()) + self._add_sendfile(filekey,_fail_cb) + file_stream.recvfile(fd,filesize,_done_cb) def _done_cb(): + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) print('recv done') print(time.perf_counter() - st) + def _fail_cb(): + try: + del self._sendfile_filekeymap[filekey] + + except: + return + + file_stream.close() + + os.close(fd) + os.remove(filepath) + + if self._closed == True: + raise ConnectionError + + file_stream = None + self.add_pend_filestream(filekey,_conn_cb) fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) @@ -308,10 +370,7 @@ class SocketConnection(Connection): def send_filedata(self,filekey,filesize): def _conn_cb(): - nonlocal file_stream - - file_stream = stream - self._stream_filekeymap[filekey] = file_stream + self._add_sendfile(filekey,lambda : file_stream.close()) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', @@ -323,22 +382,40 @@ class SocketConnection(Connection): def _send_cb(data): file_stream.write(data) - file_stream = None - stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + if self._closed == True: + raise ConnectionError + + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) old_gr = imc.async.current() - stream.connect(_conn_cb) + file_stream.connect(_conn_cb) imc.async.switchtop() return _send_cb def recv_filedata(self,filekey,filesize,callback): - def _conn_cb(file_stream): - self._stream_filekeymap[filekey] = file_stream + def _conn_cb(stream): + nonlocal file_stream + + file_stream = stream + self._add_sendfile(filekey,lambda : file_stream.close()) + file_stream.read_bytes(filesize,callback,nonbuf = True) + if self._closed == True: + raise ConnectionError + + file_stream = None + self.add_pend_filestream(filekey,_conn_cb) + def abort_file(self,filekey): + try: + self._sendfile_filekeymap[filekey]() + + except KeyError: + pass + def start_recv(self,recv_callback): def _recv_size(data): size, = struct.unpack('l',data) @@ -364,10 +441,17 @@ class SocketConnection(Connection): except AttributeError: pass + if self._closed == True: + return + + self._closed = True self.main_stream.close() super().close() + def _add_sendfile(self,filekey,fail_cb): + self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) + def _start_ping(self): def __check(): try: @@ -378,6 +462,7 @@ class SocketConnection(Connection): self._ping_delay += 1 if self._ping_delay > 10: + print(self.linkid) self.close() self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000) @@ -392,9 +477,15 @@ class WebSocketConnection(Connection): self.handler = handler def send_msg(self,data): + if self._closed == True: + raise ConnectionError + self.handler.write_message(data,True) def recv_msg(self,data): + if self._closed == True: + raise ConnectionError + self._recv_callback(self,data) def start_recv(self,recv_callback): |