aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/netio.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/netio.py')
-rwxr-xr-xsrc/py/netio.py81
1 files changed, 43 insertions, 38 deletions
diff --git a/src/py/netio.py b/src/py/netio.py
index aa2f9b2..045b94c 100755
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -42,11 +42,18 @@ class SocketStream:
self._sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1)
self._sock.setblocking(False)
self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR)
+
+ def _check_close(f):
+ def wrap(self,*args):
+ if self._closed == True:
+ raise ConnectionError
- def connect(self,addr,callback):
- if self._closed == True:
- raise ConnectionError
+ return f(self,*args)
+
+ return wrap
+ @_check_close
+ def connect(self,addr,callback):
try:
self._conn_callback = tornado.stack_context.wrap(callback)
@@ -59,10 +66,8 @@ class SocketStream:
except BlockingIOError:
pass
+ @_check_close
def read_bytes(self,size,callback = None,nonbuf = False):
- if self._closed == True:
- raise ConnectionError
-
if nonbuf == False:
self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)])
@@ -72,19 +77,15 @@ class SocketStream:
self._stat |= tornado.ioloop.IOLoop.READ
self._ioloop.update_handler(self._sock.fileno(),self._stat)
+ @_check_close
def write(self,buf,callback = None):
- if self._closed == True:
- raise ConnectionError
-
self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)])
self._stat |= tornado.ioloop.IOLoop.WRITE
self._ioloop.update_handler(self._sock.fileno(),self._stat)
+ @_check_close
def sendfile(self,fd,callback = None):
- if self._closed == True:
- raise ConnectionError
-
size = os.fstat(fd).st_size
self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
@@ -92,10 +93,8 @@ class SocketStream:
self._stat |= tornado.ioloop.IOLoop.WRITE
self._ioloop.update_handler(self._sock.fileno(),self._stat)
+ @_check_close
def recvfile(self,fd,size,callback = None):
- if self._closed == True:
- raise ConnectionError
-
self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
self._stat |= tornado.ioloop.IOLoop.READ
@@ -118,7 +117,7 @@ class SocketStream:
if self._close_callback != None:
self._close_callback(self)
-
+
def _handle_event(self,fd,evt):
if evt & tornado.ioloop.IOLoop.ERROR:
print(os.strerror(self._sock.getsockopt(socket.SOL_SOCKET,socket.SO_ERROR)))
@@ -305,12 +304,20 @@ class SocketConnection(Connection):
self.file_addr = file_addr
self.add_pend_filestream = add_pend_filestream_fn
- def send_msg(self,data):
- if self._closed == True:
- raise ConnectionError
+ def _check_close(f):
+ def wrap(self,*args):
+ if self._closed == True:
+ raise ConnectionError
+ return f(self,*args)
+
+ return wrap
+
+ @_check_close
+ def send_msg(self,data):
self.main_stream.write(struct.pack('l',len(data)) + data)
+ @_check_close
def send_file(self,filekey,filepath,callback):
def _conn_cb():
self._add_wait_filekey(filekey,_callback)
@@ -335,22 +342,20 @@ class SocketConnection(Connection):
callback(err)
- if self._closed == True:
- raise ConnectionError
-
fd = os.open(filepath,os.O_RDONLY)
filesize = os.fstat(fd).st_size
file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
- file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.set_close_callback(lambda stream : _callback('Eabort'))
file_stream.connect(self.file_addr,_conn_cb)
+ @_check_close
def recv_file(self,filekey,filesize,filepath,callback):
def _conn_cb(stream):
nonlocal file_stream
file_stream = stream
- file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.set_close_callback(lambda stream : _callback('Eabort'))
self._add_wait_filekey(filekey,_callback)
file_stream.recvfile(fd,filesize,_callback)
@@ -375,14 +380,12 @@ class SocketConnection(Connection):
callback(err)
- if self._closed == True:
- raise ConnectionError
-
file_stream = None
self.add_pend_filestream(filekey,_conn_cb)
fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
+ @_check_close
def send_filedata(self,filekey,filesize,callback):
def _conn_cb():
self._add_wait_filekey(filekey,_callback)
@@ -416,38 +419,41 @@ class SocketConnection(Connection):
file_stream.write(data,__done_cb)
- if self._closed == True:
- raise ConnectionError
-
file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
retid = imc.async.get_retid()
- file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.set_close_callback(lambda stream : _callback('Eabort'))
file_stream.connect(self.file_addr,_conn_cb)
imc.async.switch_top()
return _send_cb
+ @_check_close
def recv_filedata(self,filekey,filesize,send_fn):
def _conn_cb(stream):
nonlocal file_stream
file_stream = stream
- file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.set_close_callback(lambda stream : _callback('Eabort'))
self._add_wait_filekey(filekey,_callback)
file_stream.read_bytes(filesize,send_fn,nonbuf = True)
def _callback(err = None):
- file_stream.close()
+ try:
+ self._del_wait_filekey(filekey)
- if self._closed == True:
- raise ConnectionError
+ except KeyError:
+ return
+
+ file_stream.set_close_callback(None)
+ file_stream.close()
file_stream = None
self.add_pend_filestream(filekey,_conn_cb)
+ @_check_close
def abort_file(self,filekey):
try:
self._sendfile_filekeymap[filekey]('Eabort')
@@ -455,6 +461,7 @@ class SocketConnection(Connection):
except KeyError:
pass
+ @_check_close
def start_recv(self,recv_callback):
def _recv_size(data):
size, = struct.unpack('l',data)
@@ -471,14 +478,12 @@ class SocketConnection(Connection):
if self._closed == True:
return
- traceback.print_stack()
-
self._closed = True
self.main_stream.close()
callbacks = list(self._sendfile_filekeymap.values())
for callback in callbacks:
- callback('Eclose')
+ callback('Eabort')
super().close()