diff options
Diffstat (limited to 'src/py/netio.py')
-rw-r--r-- | src/py/netio.py | 141 |
1 files changed, 84 insertions, 57 deletions
diff --git a/src/py/netio.py b/src/py/netio.py index df19627..e148cd6 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -1,11 +1,10 @@ import os +import traceback import json import struct import socket from collections import deque -import time - import tornado.ioloop import tornado.stack_context @@ -23,7 +22,7 @@ def recv_pack(stream,callback): stream.read_bytes(8,_recv_size) class SocketStream: - def __init__(self,sock,addr): + def __init__(self,sock): self.DATA_BUF = 0 self.DATA_NOBUF = 1 self.DATA_FILE = 2 @@ -43,9 +42,7 @@ class SocketStream: self._sock.setblocking(False) self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR) - self.addr = addr - - def connect(self,callback): + def connect(self,addr,callback): if self._closed == True: raise ConnectionError @@ -56,7 +53,7 @@ class SocketStream: self._ioloop.update_handler(self._sock.fileno(),self._stat) self._conning = True - self._sock.connect(self.addr) + self._sock.connect(addr) except BlockingIOError: pass @@ -122,6 +119,10 @@ class SocketStream: self._close_callback(self) def _handle_event(self,fd,evt): + if evt & tornado.ioloop.IOLoop.ERROR: + self.close() + return + if evt & tornado.ioloop.IOLoop.READ: while len(self._read_queue) > 0: iocb = self._read_queue[0] @@ -271,7 +272,7 @@ class SocketStream: self._ioloop.update_handler(fd,stat) class SocketConnection(Connection): - def __init__(self,linkclass,linkid,main_stream,add_pend_filestream_fn = None): + def __init__(self,linkclass,linkid,main_stream,file_addr,add_pend_filestream_fn = None): super().__init__(linkclass,linkid) self._ioloop = tornado.ioloop.IOLoop.current() @@ -279,6 +280,7 @@ class SocketConnection(Connection): self.main_stream = main_stream self.main_stream.set_close_callback(lambda conn : self.close()) + self.file_addr = file_addr self.add_pend_filestream = add_pend_filestream_fn self._start_ping() @@ -291,73 +293,69 @@ class SocketConnection(Connection): def send_file(self,filekey,filepath,callback): def _conn_cb(): - self._add_sendfile(filekey,_fail_cb) + self._add_wait_filekey(filekey,_callback) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', 'filekey':filekey }),'utf-8')) - file_stream.sendfile(fd,_done_cb) - - def _done_cb(): - file_stream.set_close_callback(None) - file_stream.close() - - os.close(fd) - - callback() + file_stream.sendfile(fd,_callback) - def _fail_cb(): + def _callback(err = None): try: - del self._sendfile_filekeymap[filekey] + self._del_wait_filekey(filekey) - except: + except KeyError: return + file_stream.set_close_callback(None) file_stream.close() os.close(fd) + if err == None: + callback() + if self._closed == True: raise ConnectionError fd = os.open(filepath,os.O_RDONLY) filesize = os.fstat(fd).st_size - file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) - file_stream.set_close_callback(lambda stream : _fail_cb()) - file_stream.connect(_conn_cb) + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.connect(self.file_addr,_conn_cb) def recv_file(self,filekey,filesize,filepath,callback): def _conn_cb(stream): nonlocal file_stream file_stream = stream - file_stream.set_close_callback(lambda stream : _fail_cb()) - self._add_sendfile(filekey,_fail_cb) - - file_stream.recvfile(fd,filesize,_done_cb) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + self._add_wait_filekey(filekey,_callback) - def _done_cb(): - file_stream.set_close_callback(None) - file_stream.close() + file_stream.recvfile(fd,filesize,_callback) - os.close(fd) - print(time.perf_counter() - st) - - callback() - - def _fail_cb(): + def _callback(err = None): try: - del self._sendfile_filekeymap[filekey] + self._del_wait_filekey(filekey) - except: + except KeyError: return + file_stream.set_close_callback(None) file_stream.close() - os.close(fd) - os.remove(filepath) + + if err == None: + callback() + + else: + try: + os.remove(filepath) + + except FileNotFoundError: + pass if self._closed == True: raise ConnectionError @@ -367,41 +365,64 @@ class SocketConnection(Connection): self.add_pend_filestream(filekey,_conn_cb) fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) - st = time.perf_counter() - - def send_filedata(self,filekey,filesize): + def send_filedata(self,filekey,filesize,callback): def _conn_cb(): - self._add_sendfile(filekey,lambda : file_stream.close()) + self._add_wait_filekey(filekey,_callback) send_pack(file_stream,bytes(json.dumps({ 'conntype':'file', 'filekey':filekey }),'utf-8')) - old_gr.switch() + imc.async.ret(retid) + + def _callback(err = None): + try: + self._del_wait_filekey(filekey) + + except KeyError: + return + + file_stream.set_close_callback(None) + file_stream.close() + + if err == None: + callback() def _send_cb(data): - file_stream.write(data) + def __done_cb(): + nonlocal filesize + + filesize -= len(data) + if filesize == 0: + _callback() + + file_stream.write(data,__done_cb) if self._closed == True: raise ConnectionError - file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) - old_gr = imc.async.current() - file_stream.connect(_conn_cb) - imc.async.switchtop() + retid = imc.async.get_retid() + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + file_stream.connect(self.file_addr,_conn_cb) + imc.async.switch_top() return _send_cb - def recv_filedata(self,filekey,filesize,callback): + def recv_filedata(self,filekey,filesize,send_fn): def _conn_cb(stream): nonlocal file_stream file_stream = stream - self._add_sendfile(filekey,lambda : file_stream.close()) + file_stream.set_close_callback(lambda stream : _callback('Eclose')) + self._add_wait_filekey(filekey,_callback) - file_stream.read_bytes(filesize,callback,nonbuf = True) + file_stream.read_bytes(filesize,send_fn,nonbuf = True) + + def _callback(err = None): + file_stream.close() if self._closed == True: raise ConnectionError @@ -412,7 +433,7 @@ class SocketConnection(Connection): def abort_file(self,filekey): try: - self._sendfile_filekeymap[filekey]() + self._sendfile_filekeymap[filekey]('Eabort') except KeyError: pass @@ -448,11 +469,18 @@ class SocketConnection(Connection): self._closed = True self.main_stream.close() + callbacks = list(self._sendfile_filekeymap.values()) + for callback in callbacks: + callback('Eclose') + super().close() - def _add_sendfile(self,filekey,fail_cb): + def _add_wait_filekey(self,filekey,fail_cb): self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) + def _del_wait_filekey(self,filekey): + del self._sendfile_filekeymap[filekey] + def _start_ping(self): def __check(): try: @@ -463,7 +491,6 @@ class SocketConnection(Connection): self._ping_delay += 1 if self._ping_delay > 10: - print(self.linkid) self.close() self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000) |