aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/netio.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/netio.py')
-rw-r--r--src/py/netio.py141
1 files changed, 84 insertions, 57 deletions
diff --git a/src/py/netio.py b/src/py/netio.py
index df19627..e148cd6 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -1,11 +1,10 @@
import os
+import traceback
import json
import struct
import socket
from collections import deque
-import time
-
import tornado.ioloop
import tornado.stack_context
@@ -23,7 +22,7 @@ def recv_pack(stream,callback):
stream.read_bytes(8,_recv_size)
class SocketStream:
- def __init__(self,sock,addr):
+ def __init__(self,sock):
self.DATA_BUF = 0
self.DATA_NOBUF = 1
self.DATA_FILE = 2
@@ -43,9 +42,7 @@ class SocketStream:
self._sock.setblocking(False)
self._ioloop.add_handler(sock.fileno(),self._handle_event,tornado.ioloop.IOLoop.ERROR)
- self.addr = addr
-
- def connect(self,callback):
+ def connect(self,addr,callback):
if self._closed == True:
raise ConnectionError
@@ -56,7 +53,7 @@ class SocketStream:
self._ioloop.update_handler(self._sock.fileno(),self._stat)
self._conning = True
- self._sock.connect(self.addr)
+ self._sock.connect(addr)
except BlockingIOError:
pass
@@ -122,6 +119,10 @@ class SocketStream:
self._close_callback(self)
def _handle_event(self,fd,evt):
+ if evt & tornado.ioloop.IOLoop.ERROR:
+ self.close()
+ return
+
if evt & tornado.ioloop.IOLoop.READ:
while len(self._read_queue) > 0:
iocb = self._read_queue[0]
@@ -271,7 +272,7 @@ class SocketStream:
self._ioloop.update_handler(fd,stat)
class SocketConnection(Connection):
- def __init__(self,linkclass,linkid,main_stream,add_pend_filestream_fn = None):
+ def __init__(self,linkclass,linkid,main_stream,file_addr,add_pend_filestream_fn = None):
super().__init__(linkclass,linkid)
self._ioloop = tornado.ioloop.IOLoop.current()
@@ -279,6 +280,7 @@ class SocketConnection(Connection):
self.main_stream = main_stream
self.main_stream.set_close_callback(lambda conn : self.close())
+ self.file_addr = file_addr
self.add_pend_filestream = add_pend_filestream_fn
self._start_ping()
@@ -291,73 +293,69 @@ class SocketConnection(Connection):
def send_file(self,filekey,filepath,callback):
def _conn_cb():
- self._add_sendfile(filekey,_fail_cb)
+ self._add_wait_filekey(filekey,_callback)
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
'filekey':filekey
}),'utf-8'))
- file_stream.sendfile(fd,_done_cb)
-
- def _done_cb():
- file_stream.set_close_callback(None)
- file_stream.close()
-
- os.close(fd)
-
- callback()
+ file_stream.sendfile(fd,_callback)
- def _fail_cb():
+ def _callback(err = None):
try:
- del self._sendfile_filekeymap[filekey]
+ self._del_wait_filekey(filekey)
- except:
+ except KeyError:
return
+ file_stream.set_close_callback(None)
file_stream.close()
os.close(fd)
+ if err == None:
+ callback()
+
if self._closed == True:
raise ConnectionError
fd = os.open(filepath,os.O_RDONLY)
filesize = os.fstat(fd).st_size
- file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
- file_stream.set_close_callback(lambda stream : _fail_cb())
- file_stream.connect(_conn_cb)
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.connect(self.file_addr,_conn_cb)
def recv_file(self,filekey,filesize,filepath,callback):
def _conn_cb(stream):
nonlocal file_stream
file_stream = stream
- file_stream.set_close_callback(lambda stream : _fail_cb())
- self._add_sendfile(filekey,_fail_cb)
-
- file_stream.recvfile(fd,filesize,_done_cb)
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ self._add_wait_filekey(filekey,_callback)
- def _done_cb():
- file_stream.set_close_callback(None)
- file_stream.close()
+ file_stream.recvfile(fd,filesize,_callback)
- os.close(fd)
- print(time.perf_counter() - st)
-
- callback()
-
- def _fail_cb():
+ def _callback(err = None):
try:
- del self._sendfile_filekeymap[filekey]
+ self._del_wait_filekey(filekey)
- except:
+ except KeyError:
return
+ file_stream.set_close_callback(None)
file_stream.close()
-
os.close(fd)
- os.remove(filepath)
+
+ if err == None:
+ callback()
+
+ else:
+ try:
+ os.remove(filepath)
+
+ except FileNotFoundError:
+ pass
if self._closed == True:
raise ConnectionError
@@ -367,41 +365,64 @@ class SocketConnection(Connection):
self.add_pend_filestream(filekey,_conn_cb)
fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
- st = time.perf_counter()
-
- def send_filedata(self,filekey,filesize):
+ def send_filedata(self,filekey,filesize,callback):
def _conn_cb():
- self._add_sendfile(filekey,lambda : file_stream.close())
+ self._add_wait_filekey(filekey,_callback)
send_pack(file_stream,bytes(json.dumps({
'conntype':'file',
'filekey':filekey
}),'utf-8'))
- old_gr.switch()
+ imc.async.ret(retid)
+
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(filekey)
+
+ except KeyError:
+ return
+
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
+ if err == None:
+ callback()
def _send_cb(data):
- file_stream.write(data)
+ def __done_cb():
+ nonlocal filesize
+
+ filesize -= len(data)
+ if filesize == 0:
+ _callback()
+
+ file_stream.write(data,__done_cb)
if self._closed == True:
raise ConnectionError
- file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+ file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
- old_gr = imc.async.current()
- file_stream.connect(_conn_cb)
- imc.async.switchtop()
+ retid = imc.async.get_retid()
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ file_stream.connect(self.file_addr,_conn_cb)
+ imc.async.switch_top()
return _send_cb
- def recv_filedata(self,filekey,filesize,callback):
+ def recv_filedata(self,filekey,filesize,send_fn):
def _conn_cb(stream):
nonlocal file_stream
file_stream = stream
- self._add_sendfile(filekey,lambda : file_stream.close())
+ file_stream.set_close_callback(lambda stream : _callback('Eclose'))
+ self._add_wait_filekey(filekey,_callback)
- file_stream.read_bytes(filesize,callback,nonbuf = True)
+ file_stream.read_bytes(filesize,send_fn,nonbuf = True)
+
+ def _callback(err = None):
+ file_stream.close()
if self._closed == True:
raise ConnectionError
@@ -412,7 +433,7 @@ class SocketConnection(Connection):
def abort_file(self,filekey):
try:
- self._sendfile_filekeymap[filekey]()
+ self._sendfile_filekeymap[filekey]('Eabort')
except KeyError:
pass
@@ -448,11 +469,18 @@ class SocketConnection(Connection):
self._closed = True
self.main_stream.close()
+ callbacks = list(self._sendfile_filekeymap.values())
+ for callback in callbacks:
+ callback('Eclose')
+
super().close()
- def _add_sendfile(self,filekey,fail_cb):
+ def _add_wait_filekey(self,filekey,fail_cb):
self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
+ def _del_wait_filekey(self,filekey):
+ del self._sendfile_filekeymap[filekey]
+
def _start_ping(self):
def __check():
try:
@@ -463,7 +491,6 @@ class SocketConnection(Connection):
self._ping_delay += 1
if self._ping_delay > 10:
- print(self.linkid)
self.close()
self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000)