diff options
Diffstat (limited to 'src/py/netio.py')
-rwxr-xr-x | src/py/netio.py | 81 |
1 files changed, 43 insertions, 38 deletions
diff --git a/src/py/netio.py b/src/py/netio.py index aa2f9b2..045b94c 100755 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -42,11 +42,18 @@ class SocketStream: self._sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1) self._sock.setblocking(False) self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR) + + def _check_close(f): + def wrap(self,*args): + if self._closed == True: + raise ConnectionError - def connect(self,addr,callback): - if self._closed == True: - raise ConnectionError + return f(self,*args) + + return wrap + @_check_close + def connect(self,addr,callback): try: self._conn_callback = tornado.stack_context.wrap(callback) @@ -59,10 +66,8 @@ class SocketStream: except BlockingIOError: pass + @_check_close def read_bytes(self,size,callback = None,nonbuf = False): - if self._closed == True: - raise ConnectionError - if nonbuf == False: self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)]) @@ -72,19 +77,15 @@ class SocketStream: self._stat |= tornado.ioloop.IOLoop.READ self._ioloop.update_handler(self._sock.fileno(),self._stat) + @_check_close def write(self,buf,callback = None): - if self._closed == True: - raise ConnectionError - self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)]) self._stat |= tornado.ioloop.IOLoop.WRITE self._ioloop.update_handler(self._sock.fileno(),self._stat) + @_check_close def sendfile(self,fd,callback = None): - if self._closed == True: - raise ConnectionError - size = os.fstat(fd).st_size self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) @@ -92,10 +93,8 @@ class SocketStream: self._stat |= tornado.ioloop.IOLoop.WRITE self._ioloop.update_handler(self._sock.fileno(),self._stat) + @_check_close def recvfile(self,fd,size,callback = None): - if self._closed == True: - raise ConnectionError - self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) self._stat |= tornado.ioloop.IOLoop.READ @@ -118,7 +117,7 @@ class SocketStream: if self._close_callback != None: self._close_callback(self) - + def _handle_event(self,fd,evt): if evt & tornado.ioloop.IOLoop.ERROR: print(os.strerror(self._sock.getsockopt(socket.SOL_SOCKET,socket.SO_ERROR))) @@ -305,12 +304,20 @@ class SocketConnection(Connection): self.file_addr = file_addr self.add_pend_filestream = add_pend_filestream_fn - def send_msg(self,data): - if self._closed == True: - raise ConnectionError + def _check_close(f): + def wrap(self,*args): + if self._closed == True: + raise ConnectionError + return f(self,*args) + + return wrap + + @_check_close + def send_msg(self,data): self.main_stream.write(struct.pack('l',len(data)) + data) + @_check_close def send_file(self,filekey,filepath,callback): def _conn_cb(): self._add_wait_filekey(filekey,_callback) @@ -335,22 +342,20 @@ class SocketConnection(Connection): callback(err) - if self._closed == True: - raise ConnectionError - fd = os.open(filepath,os.O_RDONLY) filesize = os.fstat(fd).st_size file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) - file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.set_close_callback(lambda stream : _callback('Eabort')) file_stream.connect(self.file_addr,_conn_cb) + @_check_close def recv_file(self,filekey,filesize,filepath,callback): def _conn_cb(stream): nonlocal file_stream file_stream = stream - file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.set_close_callback(lambda stream : _callback('Eabort')) self._add_wait_filekey(filekey,_callback) file_stream.recvfile(fd,filesize,_callback) @@ -375,14 +380,12 @@ class SocketConnection(Connection): callback(err) - if self._closed == True: - raise ConnectionError - file_stream = None self.add_pend_filestream(filekey,_conn_cb) fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) + @_check_close def send_filedata(self,filekey,filesize,callback): def _conn_cb(): self._add_wait_filekey(filekey,_callback) @@ -416,38 +419,41 @@ class SocketConnection(Connection): file_stream.write(data,__done_cb) - if self._closed == True: - raise ConnectionError - file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) retid = imc.async.get_retid() - file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.set_close_callback(lambda stream : _callback('Eabort')) file_stream.connect(self.file_addr,_conn_cb) imc.async.switch_top() return _send_cb + @_check_close def recv_filedata(self,filekey,filesize,send_fn): def _conn_cb(stream): nonlocal file_stream file_stream = stream - file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.set_close_callback(lambda stream : _callback('Eabort')) self._add_wait_filekey(filekey,_callback) file_stream.read_bytes(filesize,send_fn,nonbuf = True) def _callback(err = None): - file_stream.close() + try: + self._del_wait_filekey(filekey) - if self._closed == True: - raise ConnectionError + except KeyError: + return + + file_stream.set_close_callback(None) + file_stream.close() file_stream = None self.add_pend_filestream(filekey,_conn_cb) + @_check_close def abort_file(self,filekey): try: self._sendfile_filekeymap[filekey]('Eabort') @@ -455,6 +461,7 @@ class SocketConnection(Connection): except KeyError: pass + @_check_close def start_recv(self,recv_callback): def _recv_size(data): size, = struct.unpack('l',data) @@ -471,14 +478,12 @@ class SocketConnection(Connection): if self._closed == True: return - traceback.print_stack() - self._closed = True self.main_stream.close() callbacks = list(self._sendfile_filekeymap.values()) for callback in callbacks: - callback('Eclose') + callback('Eabort') super().close() |