diff options
author | pzread <netfirewall@gmail.com> | 2013-06-03 21:40:37 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-06-03 21:40:37 +0800 |
commit | 9fa7badc787ec364d58f65b95355c8725ad75a9c (patch) | |
tree | 45c37afd8ec06e093c5fb257259110fe86fd770c | |
parent | 13445dbeb426347354b61bd46f738f37b25ffd60 (diff) | |
download | taiwan-online-judge-9fa7badc787ec364d58f65b95355c8725ad75a9c.tar.gz taiwan-online-judge-9fa7badc787ec364d58f65b95355c8725ad75a9c.tar.zst taiwan-online-judge-9fa7badc787ec364d58f65b95355c8725ad75a9c.zip |
IMC framework 1.0 Alpha 1
-rw-r--r-- | src/py/backend_server.py | 19 | ||||
-rw-r--r-- | src/py/center_server.py | 14 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 36 | ||||
-rw-r--r-- | src/py/netio.py | 20 |
4 files changed, 65 insertions, 24 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 3c4f1ba..087551a 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -37,7 +37,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): def start(self): sock_port = random.randrange(4096,8192) - self.sock_addr = ('127.0.0.1',sock_port) + self.sock_addr = ('10.8.0.6',sock_port) self.bind(sock_port,'',socket.AF_INET,65536) super().start() @@ -104,8 +104,8 @@ class BackendWorker(tornado.tcpserver.TCPServer): #imc_register_call('','test_dsta',self._test_dsta) time.sleep(2) - #if int(self._linkid) == 2: - self._test_call(None,'9') + if int(self._linkid) == 2: + self._test_call(None,'9') sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -228,15 +228,15 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,iden,param): - param = '6' pend = [] - for i in range(0,8): + for i in range(0,3): if str((i % 8) + 2) == self._linkid: continue - fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','test.py') + fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + + '/','test.py') dst = '/backend/' + str((i % 8) + 2) + '/' ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey) @@ -244,7 +244,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): pend.append(fileres) for p in pend: - self._linkid + ' ' + p.wait() + print(self._linkid + ' ' + p.wait()) print(self._linkid) @@ -253,7 +253,8 @@ class BackendWorker(tornado.tcpserver.TCPServer): #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) #return ret + ' Too' - fileres = Proxy.instance.recvfile(param,'data') + Proxy.instance.rejectfile(param) + #fileres = Proxy.instance.recvfile(param,'data') #print(fileres.wait()) return 'ok' @@ -294,7 +295,7 @@ def start_backend_worker(ws_port): ])) http_serv.listen(ws_port) - backend_worker = BackendWorker(('localhost',5730),ws_port) + backend_worker = BackendWorker(('10.8.0.6',5730),ws_port) backend_worker.start() tornado.ioloop.IOLoop.instance().start() diff --git a/src/py/center_server.py b/src/py/center_server.py index ed2de4c..d0bd429 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -139,16 +139,16 @@ class CenterServer(tornado.tcpserver.TCPServer): linkid = param try: - #worker = self._worker_linkidmap[linkid] + worker = self._worker_linkidmap[linkid] - a = int(iden['linkid']) - b = int(linkid) + #a = int(iden['linkid']) + #b = int(linkid) - if b > a: - worker = self._worker_linkidmap[str(a + 1)] + #if b > a: + # worker = self._worker_linkidmap[str(a + 1)] - else: - worker = self._worker_linkidmap[str(a - 1)] + #else: + # worker = self._worker_linkidmap[str(a - 1)] if iden['linkclass'] != 'client': sock_ip,sock_port = worker.sock_addr diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 9f96a73..8f37b62 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -22,9 +22,24 @@ class Connection: def send_msg(self,data): pass + def send_file(self,filekey,filepath,callback): + pass + + def recv_file(self,filekey,filesize,filepath,callback): + pass + + def send_filedata(self,filekey,filesize,callback): + pass + + def recv_filedata(self,filekey,filesize,send_fn): + pass + def start_recv(self,recv_callback): pass + def abort_file(self,filekey): + pass + def add_close_callback(self,callback): self._close_callback.append(tornado.stack_context.wrap(callback)) @@ -153,9 +168,8 @@ class Proxy: 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } - stat,ret = self.call(1000000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize}) + stat,ret = self.call(65536,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize}) if stat == False: - print('err close ' + ret) raise ConnectionError(ret) return fileresult @@ -200,7 +214,7 @@ class Proxy: return dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/']) - self.call(1000000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey}) + self.call(65536,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey}) def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param): def __add_wait_caller(conn_linkid): @@ -249,7 +263,7 @@ class Proxy: self._auth.change_iden(old_iden) except KeyError: - result = (False,'Enoexist1') + result = (False,'Enoexist') __del_wait_caller(self._linkid) @@ -258,7 +272,7 @@ class Proxy: else: conn = self._request_conn(dst_linkid) if conn == None: - return __ret((False,'Enoexist2')) + return __ret((False,'Enoexist')) else: if caller_linkid == self._linkid: @@ -351,9 +365,10 @@ class Proxy: self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) def _add_wait_filekey(self,conn_linkid,filekey,filesize,callback): + callback = tornado.stack_context.wrap(callback) self._conn_filekeymap[conn_linkid][filekey] = { 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')), - 'callback':tornado.stack_context.wrap(callback) + 'callback':callback } def _del_wait_filekey(self,conn_linkid,filekey): @@ -393,7 +408,12 @@ class Proxy: print('connection close') def _recv_dispatch(self,conn,data): - msg = json.loads(data.decode('utf-8')) + try: + msg = json.loads(data.decode('utf-8')) + + except: + return + msg_type = msg['type'] if msg_type == self.MSGTYPE_CALL: @@ -517,7 +537,7 @@ class Proxy: self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject') def imc_call(idendesc,dst,func_name,param): - return Proxy.instance.call(1000000,idendesc,dst,func_name,param) + return Proxy.instance.call(65536,idendesc,dst,func_name,param) def imc_call_async(idendesc,dst,func_name,param,callback = None): @async.caller diff --git a/src/py/netio.py b/src/py/netio.py index e148cd6..4fe61bd 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -152,6 +152,10 @@ class SocketStream: iocb[1] = size break + except Exception: + self.close() + return + elif datatype == self.DATA_NOBUF: size = iocb[1] @@ -173,6 +177,10 @@ class SocketStream: iocb[1] = size break + except Exception: + self.close() + return + elif datatype == self.DATA_FILE: size = iocb[1] @@ -197,6 +205,10 @@ class SocketStream: iocb[1] = size break + except Exception: + self.close() + return + if evt & tornado.ioloop.IOLoop.WRITE: if self._conning == True: self._conning = False @@ -232,6 +244,10 @@ class SocketStream: iocb[1] = off break + except Exception: + self.close() + return + elif datatype == self.DATA_FILE: size = iocb[1] filefd = iocb[2] @@ -257,6 +273,10 @@ class SocketStream: iocb[1] = size break + except Exception: + self.close() + return + if self._closed == True: return |