author     pzread <netfirewall@gmail.com>    2013-07-29 01:55:58 +0800
committer  pzread <netfirewall@gmail.com>    2013-07-29 01:55:58 +0800
commit     e6e1c5fa269cac26fa15ac15c2c24be05f2ac72c (patch)
tree       ada51b6e8685bfb44f08ba4f02ffa734e91cdb53
parent     456b6867161c1f1295993c61b3726ff44a3c809e (diff)
Add websocket sendfile
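
This commit introduces a second WebSocket connection type ('file') alongside the existing 'main' connection: the sender splits a Blob into four parallel streams and pushes it in 512 KiB slices, sending the next slice only after the backend replies to the previous one. Below is a minimal sketch of that client-side path, simplified from the send_file() added to src/js/com.js; the helper name and the done() callback wiring here are illustrative, not the committed API.

    // Sketch of the 4-way parallel upload added in src/js/com.js.
    // fileAddr is the backend's "ip:port" file endpoint, filekey the key
    // negotiated beforehand over the main connection.
    function sendFileOverWebSockets(fileAddr, filekey, blob, done) {
        var STREAMS = 4;            // the commit splits every file into 4 streams
        var CHUNK = 524288;         // 512 KiB per ws.send(), as in send_file()
        var partsize = Math.ceil(blob.size / STREAMS);
        var finished = 0;

        for (var i = 0; i < STREAMS; i++) {
            (function (idx) {
                var ws = new WebSocket('ws://' + fileAddr + '/conn');
                var off = idx * partsize;
                var end = Math.min(blob.size, off + partsize);

                ws.onopen = function () {
                    // announce which pending filekey this stream belongs to
                    ws.send(JSON.stringify({conntype: 'file', filekey: filekey}));
                    // tell the backend where this stream starts inside the file
                    ws.send(new Blob([JSON.stringify({off: off})],
                                     {type: 'application/octet-stream'}));
                };
                // every server reply triggers the next slice of this stream
                ws.onmessage = function () {
                    if (off >= end) {
                        ws.close();
                        if (++finished === STREAMS) done();
                    } else {
                        ws.send(blob.slice(off, Math.min(end, off + CHUNK)));
                        off += CHUNK;
                    }
                };
            })(i);
        }
    }

On the backend the four streams are matched back to a single pending transfer through the new pend_filestream()/add_filestream() helpers in backend_server.py, keyed by the same filekey.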
-rw-r--r--  src/css/index.less                    13
-rw-r--r--  src/js/com.js                        143
-rw-r--r--  src/js/imc.js                        220
-rw-r--r--  src/pmod/pmod_test/css/manage.less    23
-rw-r--r--  src/pmod/pmod_test/html/manage.html   91
-rw-r--r--  src/pmod/pmod_test/js/pmod_test.js   140
-rw-r--r--  src/pmod/pmod_test/py/pmod_test.py   186
-rwxr-xr-x  src/py/backend_server.py             211
-rwxr-xr-x  src/py/center_server.py               15
-rw-r--r--  src/py/com.py                         12
-rw-r--r--  src/py/dispatch.py                   110
-rw-r--r--  src/py/imc/async.py                    4
-rwxr-xr-x  src/py/imc/blobclient.py             287
-rwxr-xr-x  src/py/imc/blobhandle.py              76
-rwxr-xr-x  src/py/imc/blobserver.py             341
-rwxr-xr-x  src/py/imc/blobtable.py               12
-rwxr-xr-x  src/py/imc/proxy.py                  106
-rw-r--r--  src/py/mod.py                          2
-rwxr-xr-x  src/py/netio.py                      257
-rw-r--r--  src/py/spup.c                        268
-rw-r--r--  src/py/square.py                       9
21 files changed, 1986 insertions, 540 deletions
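
At the IMC layer the transfer is driven by a filekey handshake: Proxy.sendfile() registers the blob under a random filekey and calls imc/pend_recvfile on the destination, the receiver answers over the main connection with a MSGTYPE_SENDFILE message, and route_sendfile() then streams the blob through the file connection; either side can cancel via imc/abort_sendfile. From application code the flow looks roughly like the test hook wired into com.js below ('test/' and 'test_dst' are the demo endpoints used in this commit, not a stable interface).

    // Usage sketch mirroring the #test_fs change handler added to com.js.
    $('#test_fs').on('change', function () {
        var blob = this.files[0];

        // 1. negotiate a filekey with the backend (imc/pend_recvfile under the hood)
        com.sendfile_backend(blob,
            function (filekey) {
                // 2. hand the filekey to whichever backend call should consume it;
                //    the backend then runs Proxy.instance.recvfile(filekey, path)
                com.call_backend('test/', 'test_dst', function (result) {
                    console.log(result);   // result of the consuming call
                }, filekey);
            },
            function (result) {
                console.log(result);       // 'Success' or an E* error once the transfer ends
            });
    });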
diff --git a/src/css/index.less b/src/css/index.less index 4cd496a..04f75eb 100644 --- a/src/css/index.less +++ b/src/css/index.less @@ -17,20 +17,18 @@ body{ overflow:hidden; &.active{ - height:41px; + height:33px; opacity:1; display:block; } } .index_tag{ - height:32px; - line-height:32px; + height:33px; + line-height:33px; opacity:0.2; transition:opacity @fast; &.active,&:hover{ - height:41px; - line-height:41px; opacity:1; } @@ -84,7 +82,7 @@ body{ div.tagblock{ width:100%; - height:41px; + height:33px; box-shadow:0px 3px 2px -2px fade(@black,10%); } div.menu{ @@ -92,7 +90,6 @@ body{ li > a{ height:48px; padding-left:32px; - padding-right:-32px; font-size:@NormalFontSize; font-weight:bold; line-height:48px; @@ -114,7 +111,7 @@ body{ div.tagblock{ width:100%; - height:41px; + height:33px; box-shadow:0px 3px 2px -2px fade(@black,10%); } div.notice{ diff --git a/src/js/com.js b/src/js/com.js index 1c8af3c..1a6d732 100644 --- a/src/js/com.js +++ b/src/js/com.js @@ -10,14 +10,75 @@ var ACCESS_DELETE = 0x8; var ACCESS_SETPER = 0x10; var ACCESS_EXECUTE = 0x20; -var WebSocketConnection = function(link,ws){ +var WebSocketConnection = function(link,ws,file_addr){ var that = this; + var sendfile_filekeymap = {}; that.__super__(link); that.send_msg = function(data){ ws.send(new Blob([data],{'type':'application/octet-stream'})) }; + that.send_file = function(filekey,blob,callback){ + var i; + var file_ws = new Array(4); + var filesize = blob.size; + var partsize = Math.ceil(filesize / 4); + var count = 0; + + function _callback(err){ + if(!(filekey in sendfile_filekeymap)){ + return; + } + + delete sendfile_filekeymap[filekey]; + + for(i = 0;i < 4;i++){ + if(file_ws[i] != undefined){ + file_ws[i].close(); + } + } + + callback(err); + } + + for(i = 0;i < 4;i++){ + file_ws[i] = new WebSocket('ws://' + file_addr + '/conn'); + file_ws[i].onopen = function(idx){return function(){ + var ws = file_ws[idx]; + var off = idx * partsize; + var end = Math.min(filesize,off + partsize); + + ws.onmessage = function(e){ + if(off >= end){ + count += 1; + if(count == 4){ + _callback(); + } + }else{ + ws.send(blob.slice(off,Math.min(end,off + 524288), + 'application/octet-stream')); + off += 524288; + } + }; + + console.log('file open ' + off); + + ws.send(JSON.stringify({ + 'conntype':'file', + 'filekey':filekey + })); + + ws.send(new Blob([JSON.stringify({'off':off})], + {'type':'application/octet-stream'})); + }}(i); + } + }; + that.abort_file = function(filekey,err){ + if(filekey in sendfile_filekeymap){ + sendfile_filekeymap[filekey]('Eabort'); + } + }; that.start_recv = function(recv_callback){ ws.onmessage = function(e){ var reader = new FileReader; @@ -197,7 +258,7 @@ var com = new function(){ }else{ return false; } - } + }; $.fn.tagbox = function(option){ var tagbox = this.data('tagbox'); @@ -206,7 +267,16 @@ var com = new function(){ } return tagbox; - } + }; + $.fn.codebox = function(option){ + var codebox = this.data('codebox'); + + if(option != undefined){ + codebox = that.create_codebox(this,option.mode,option.readonly); + } + + return codebox; + }; }; that.url_push = function(url){ @@ -533,7 +603,7 @@ var com = new function(){ ex($('[exheight="true"]'),'height'); ex($('[exminheight="true"]'),'min-height'); - $('.modal-body').css('max-height',(winheight * 0.5) + 'px'); + $('.modal-body').css('max-height',(winheight * 0.9 - 192) + 'px'); }; that.get_cookie = function(){ var ret; @@ -994,6 +1064,41 @@ var com = new function(){ var reto; var idendesc; var ws; + var addr; + + function x(idx){ + var tws = 
new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn'); + var offset; + var end; + var blob; + + tws.onmessage = function(){ + console.log(offset) + + if(offset < end){ + console.log(offset); + tws.send(blob.slice(offset,Math.min(end,offset + 524288 * 1),'application/octet-stream')); + offset += (524288 * 1); + }else{ + console.log(new Date().getTime()); + } + }; + tws.onopen = function(){ + tws.send('filestream'); + }; + $('#test_fs').on('change',function(e){ + blob = this.files[0]; + offset = Math.floor(blob.size / 4) * idx; + if(idx != 3){ + end = offset + Math.floor(blob.size / 4); + }else{ + end = blob.size; + } + + console.log(new Date().getTime()); + tws.send('start'); + }); + } if(res[0] != 'E'){ reto = JSON.parse(res) @@ -1001,8 +1106,9 @@ var com = new function(){ that.link = reto.client_link; that.backend_link = reto.backend_link; idendesc = reto.client_idendesc; + addr = reto.ip + ':' + reto.port; - ws = new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn'); + ws = new WebSocket('ws://' + addr + '/conn'); ws.onopen = function(){ var i; var conn; @@ -1011,10 +1117,11 @@ var com = new function(){ console.log('open'); ws.send(JSON.stringify({ + 'conntype':'main', 'client_link':that.link })); - conn = new WebSocketConnection(reto.backend_link,ws); + conn = new WebSocketConnection(reto.backend_link,ws,addr); new imc.Auth(); new imc.Proxy(that.link,imc.Auth.instance,function(link,callback){ @@ -1037,12 +1144,30 @@ var com = new function(){ }else{ that.conn_callback.fire(); } + + $('#test_fs').on('change',function(e){ + var blob = this.files[0]; + + console.log(new Date().getTime()); + + that.sendfile_backend(blob,function(filekey){ + that.call_backend('test/','test_dst',function(result){ + console.log(result); + },filekey); + console.log(filekey); + },function(result){ + console.log(result); + }); + + }); + }; }else{ setTimeout(that.conn_backend,5000); } }); } + that.call_backend = function(path,func_name,callback){ var i; var params = new Array() @@ -1053,5 +1178,9 @@ var com = new function(){ } imc.Proxy.instance.call.apply(undefined,params); - } + }; + that.sendfile_backend = function(blob,filekey_callback,result_callback){ + return imc.Proxy.instance.sendfile(that.backend_link,blob, + filekey_callback,result_callback); + }; }; diff --git a/src/js/imc.js b/src/js/imc.js index ae81684..05ce6be 100644 --- a/src/js/imc.js +++ b/src/js/imc.js @@ -15,6 +15,7 @@ var imc = new function(){ that.link = link; that.send_msg = function(data){}; + that.send_file = function(filekey,blob,callback){}; that.start_recv = function(recv_callback){}; that.close = function(){ @@ -29,14 +30,18 @@ var imc = new function(){ that.Proxy = function(self_link,auth,conn_link){ var MSGTYPE_CALL = 'call'; var MSGTYPE_RET = 'ret'; + var MSGTYPE_SENDFILE = 'sendfile'; + var MSGTYPE_ABORTFILE = 'abortfile'; var that = this; var caller_retid_count = 0; var conn_linkmap = {}; var conn_retidmap = {}; var callpath_root = {'child':{},'name':{},'filt':[]}; + var info_filekeymap = {}; + var conn_filekeymap = {}; - var walk_path = function(path,create){ + function walk_path(path,create){ var i; var parts; var part; @@ -67,9 +72,9 @@ var imc = new function(){ } return cnode; - }; + } - var route_call = function(caller_link,caller_retid,idendesc,dst,func_name,timeout,callback,param){ + function route_call(caller_link,caller_retid,idendesc,dst,func_name,timeout,callback,param){ var i; var j; var part; @@ -80,12 +85,12 @@ var imc = new function(){ var dpart; var func; - var _add_wait_caller = function(conn_link){ 
+ function _add_wait_caller(conn_link){ conn_retidmap[conn_link][caller_retid] = { 'timeout':timeout, 'callback':callback } - }; + } part = dst.split('/'); dst_link = part.slice(0,3).join('/') + '/' @@ -114,13 +119,20 @@ var imc = new function(){ _add_wait_caller(self_link); func.apply(undefined,[function(data){ - if(self_link in conn_retidmap && caller_retid in conn_retidmap[self_link]){ + if(self_link in conn_retidmap && + caller_retid in conn_retidmap[self_link]){ + delete conn_retidmap[self_link][caller_retid]; - callback({'stat':true,'data':data}); + + if(callback != null && callback != undefined){ + callback({'stat':true,'data':data}); + } } }].concat(param)); }else{ - callback({'stat':false,'data':'Enoexist'}); + if(callback != null && callback != undefined){ + callback({'stat':false,'data':'Enoexist'}); + } } }else{ that.request_conn(dst_link,function(conn){ @@ -128,22 +140,109 @@ var imc = new function(){ _add_wait_caller(conn.link); } - send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param); + send_msg_call(conn,caller_link,caller_retid,idendesc,dst, + func_name,timeout,param); }); } - }; + } + function ret_call(conn_link,caller_link,caller_retid,result){ + var wait; + + if(conn_link in conn_retidmap && + caller_retid in conn_retidmap[conn_link]){ + + wait = conn_retidmap[conn_link][caller_retid]; + delete conn_retidmap[conn_link][caller_retid]; + }else{ + return; + } + + if(caller_link == self_link){ + wait.callback(result); + }else{ + request_conn(caller_link,function(conn){ + send_msg_ret(conn,caller_link,caller_retid,result); + }); + } + } + function route_sendfile(out_conn,src_link,filekey,filesize){ + var info; + + function _send_cb(err){ + if(del_wait_filekey(out_conn,filekey)){ + return; + } + + if(err != undefined){ + out_conn.abort_file(filekey,err); + send_msg_abortfile(out_conn,filekey,err); + } + + ret_sendfile(filekey,err); + } + + if(src_link != self_link){ + //TODO + return; + } + + if((info = info_filekeymap[filekey]) == undefined){ + return; + } + + info.callback = _send_cb; + add_wait_filekey(out_conn.link,filekey,info.blob.size,_send_cb); + out_conn.send_file(filekey,info.blob,_send_cb); + } + function ret_sendfile(filekey,err){ + var info; + + if((info = info_filekeymap[filekey]) == undefined){ + return false; + } + + delete info_filekeymap[filekey]; + + if(err == undefined){ + info.result_callback('Success'); + }else{ + info.result_callback(err); + } + + return true; + } + function add_wait_filekey(conn_link,filekey,filesize,callback){ + conn_filekeymap[conn_link][filekey] = { + 'callback':callback + }; + } + function del_wait_filekey(conn_link,filekey){ + if(conn_link in conn_filekeymap && + filekey in conn_filekeymap[conn_link]){ - var recv_dispatch = function(conn,data){ + delete conn_filekeymap[conn_link][filekey]; + + return true; + }else{ + return false; + } + } + + function recv_dispatch(conn,data){ var msgo = JSON.parse(data); if(msgo.type == MSGTYPE_CALL){ recv_msg_call(conn,msgo); }else if(msgo.type == MSGTYPE_RET){ recv_msg_ret(conn,msgo); + }else if(msgo.type == MSGTYPE_SENDFILE){ + recv_msg_sendfile(conn,msgo); + }else if(msgo.type == MSGTYPE_ABORTFILE){ + recv_msg_abortfile(conn,msgo); } - }; + } - var send_msg_call = function(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param){ + function send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param){ var msg = { 'type':MSGTYPE_CALL, 'caller_link':caller_link, @@ -156,8 +255,8 @@ var imc = new function(){ }; 
conn.send_msg(JSON.stringify(msg)); - }; - var recv_msg_call = function(conn,msg){ + } + function recv_msg_call(conn,msg){ var caller_link = msg.caller_link var caller_retid = msg.caller_retid; var timeout = msg.timeout; @@ -171,9 +270,9 @@ var imc = new function(){ send_msg_ret(conn,caller_link,caller_retid,result); }); },param); - }; + } - var send_msg_ret = function(conn,caller_link,caller_retid,result){ + function send_msg_ret(conn,caller_link,caller_retid,result){ var msg = { 'type':MSGTYPE_RET, 'caller_link':caller_link, @@ -182,30 +281,43 @@ var imc = new function(){ }; conn.send_msg(JSON.stringify(msg)); - }; - var recv_msg_ret = function(conn,msg){ + } + function recv_msg_ret(conn,msg){ var caller_link = msg['caller_link']; var caller_retid = msg['caller_retid']; var result = msg['result']; - var wait; - if(caller_link == self_link){ - if(conn.link in conn_retidmap && caller_retid in conn_retidmap[conn.link]){ - wait = conn_retidmap[conn.link][caller_retid]; - delete conn_retidmap[conn.link][caller_retid]; + ret_call(conn.link,caller_link,caller_retid,result); + } - wait.callback(result); - } - }else{ - request_conn(caller_link,function(conn){ - send_msg_ret(conn,caller_link,caller_retid,result); - }); + function recv_msg_sendfile(conn,msg){ + route_sendfile(conn,msg.src_link,msg.filekey,msg.filesize); + } + + function send_msg_abortfile(conn,filekey,err){ + var msg = { + 'type':MSGTYPE_ABORTFILE, + 'filekey':filekey, + 'error':err + }; + + conn.send_msg(JSON.stringify(msg)); + } + function recv_msg_abortfile(conn,msg){ + var filekey = msg.filekey; + var err = msg.error; + + if(conn.link in conn_filekeymap && + filekey in conn_filekeymap[conn.link]){ + + conn_filekeymap[conn.link][filekey].callback(err); } - }; + } that.add_conn = function(conn){ conn_linkmap[conn.link] = conn; conn_retidmap[conn.link] = {}; + conn_filekeymap[conn.link] = {}; conn.start_recv(recv_dispatch); }; that.link_conn = function(link,conn){ @@ -218,7 +330,19 @@ var imc = new function(){ delete conn.link_linkmap[link]; }; that.del_conn = function(conn){ + retids = conn_retidmap[conn.link]; + for(retid in retids){ + ret_call(conn.link,caller_link,caller_retid,result); + } + + filekeys = conn_filekeymap[conn.link]; + for(filekey in filekeys){ + filekeys[filekey].callback('Eclose'); + } + + delete conn_retidmap[conn.link]; delete conn_linkmap[conn.link]; + delete conn_filekeymap[conn.link]; }; that.request_conn = function(link,callback){ var conn = conn_linkmap[link]; @@ -251,6 +375,32 @@ var imc = new function(){ route_call(self_link,caller_retid,imc.Auth.get_current_idendesc(),dst,func_name,timeout,callback,params); }; + that.sendfile = function(dst_link, + blob, + filekey_callback, + result_callback){ + + var filekey = self_link + '_' + Math.random(); + + info_filekeymap[filekey] = { + 'blob':blob, + 'result_callback':result_callback, + 'callback':function(err){ + if(ret_sendfile(filekey,err) && err != undefined){ + that.call(dst_link + 'imc/','abort_sendfile',65536,null,filekey,err); + } + } + }; + + that.call(dst_link + 'imc/','pend_recvfile',65536,function(result){ + filekey_callback(filekey); + },self_link,filekey,blob.size); + }; + that.abortfile = function(filekey){ + if((info = info_filekeymap[filekey]) != undefined){ + info.callback('Eabort'); + } + }; that.register_call = function(path,func_name,func){ var cnode; @@ -284,7 +434,15 @@ var imc = new function(){ cnode.filt.remove(func); }; + that.register_call('imc/','abort_sendfile', + function(callback,filekey,err){ + + callback('Success'); + 
ret_sendfile(filekey,'Eabort'); + }); + conn_retidmap[self_link] = {}; + conn_filekeymap[self_link] = {}; imc.Proxy.instance = that; }; diff --git a/src/pmod/pmod_test/css/manage.less b/src/pmod/pmod_test/css/manage.less index d0b1d59..3c7fd79 100644 --- a/src/pmod/pmod_test/css/manage.less +++ b/src/pmod/pmod_test/css/manage.less @@ -1,14 +1,25 @@ +@import 'mixin.less'; +@import 'color.less'; + #index_page{ div.create_mode,div.set_mode{ - div.content{ - height:256px; + div.block{ + margin-bottom:@MediumPad; + + div.data{ + height:256px; + } } } table.table{ - td.oper{ - div.btn-group{ - position:absolute; + tr.item{ + height:48px; + + td.oper{ + div.btn-group{ + position:absolute; + } } - } + } } } diff --git a/src/pmod/pmod_test/html/manage.html b/src/pmod/pmod_test/html/manage.html index e3b1816..b4ab4d8 100644 --- a/src/pmod/pmod_test/html/manage.html +++ b/src/pmod/pmod_test/html/manage.html @@ -12,13 +12,48 @@ <select name="testmode"></select> </div> </div> - <div class="row-fluid"> - <div class="span13"> - <label>題目內容</label> - <div class="content"></div> + + <div class="block content"> + <div class="row-fluid"> + <div class="input-prepend span13"> + <span class="add-on">題目內容</span> + <input class="span2 title" type="text" value="內容" placeholder="區塊標題"> + </div> + </div> + <div class="row-fluid"> + <div class="span13"> + <div class="data"></div> + </div> </div> </div> - </div> + + <div class="block format"> + <div class="row-fluid"> + <div class="input-prepend span13"> + <span class="add-on">格式說明</span> + <input class="span2 title" type="text" value="I/O格式" placeholder="區塊標題"> + </div> + </div> + <div class="row-fluid"> + <div class="span13"> + <div class="data"></div> + </div> + </div> + </div> + + <div class="block testdata"> + <div class="row-fluid"> + <div class="input-prepend span13"> + <span class="add-on">範例資料</span> + <input class="span2 title" type="text" value="範例測資" placeholder="區塊標題"> + </div> + </div> + <div class="row-fluid"> + <div class="span13"> + <div class="data"></div> + </div> + </div> + </div> </div> <div class="modal-footer"> <button class="btn btn-primary submit">確定</button> <button class="btn cancel">取消</button> @@ -36,10 +71,46 @@ <select name="testmode"></select> </div> </div> - <div class="row-fluid"> - <div class="span13"> - <label>題目內容</label> - <div class="content"></div> + + <div class="block content"> + <div class="row-fluid"> + <div class="input-prepend span13"> + <span class="add-on">題目內容</span> + <input class="span2 title" type="text" placeholder="區塊標題"> + </div> + </div> + <div class="row-fluid"> + <div class="span13"> + <div class="data"></div> + </div> + </div> + </div> + + <div class="block format"> + <div class="row-fluid"> + <div class="input-prepend span13"> + <span class="add-on">格式說明</span> + <input class="span2 title" type="text" placeholder="區塊標題"> + </div> + </div> + <div class="row-fluid"> + <div class="span13"> + <div class="data"></div> + </div> + </div> + </div> + + <div class="block testdata"> + <div class="row-fluid"> + <div class="input-prepend span13"> + <span class="add-on">範例資料</span> + <input class="span2 title" type="text" placeholder="區塊標題"> + </div> + </div> + <div class="row-fluid"> + <div class="span13"> + <div class="data"></div> + </div> </div> </div> </div> @@ -97,7 +168,7 @@ <div class="row"> <div class="span3"> - sdfsdfs + </div> <div class="span4"> <h3>模式</h3> diff --git a/src/pmod/pmod_test/js/pmod_test.js b/src/pmod/pmod_test/js/pmod_test.js index 3d43c54..93565b4 100644 --- a/src/pmod/pmod_test/js/pmod_test.js 
+++ b/src/pmod/pmod_test/js/pmod_test.js @@ -86,7 +86,7 @@ var pmod_test = function(proid,pro_node){ } } function _mode_create(modeid,testmodeid){ - var j_item = $('<tr><td class="id"></td><td class="testmode"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>') + var j_item = $('<tr class="item"><td class="id"></td><td class="testmode"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>') _mode_set(j_item,modeid,testmodeid); @@ -160,7 +160,7 @@ var pmod_test = function(proid,pro_node){ }); } function _testmode_create(testmodeid,testmodename){ - var j_item = $('<tr><td class="id"></td><td class="name"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>') + var j_item = $('<tr class="item"><td class="id"></td><td class="name"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>') _testmode_set(j_item,testmodeid,testmodename); @@ -198,6 +198,79 @@ var pmod_test = function(proid,pro_node){ function _update(){ _testmode_update().done(_mode_update); } + function _mix_content(j_box){ + var content_title = j_box.find('div.content input.title').val(); + var content = j_box.find('div.content div.data').data('codebox').getValue(); + var format_title = j_box.find('div.format input.title').val(); + var format = j_box.find('div.format div.data').data('codebox').getValue(); + var testdata_title = j_box.find('div.testdata input.title').val(); + var testdata = j_box.find('div.testdata div.data').data('codebox').getValue(); + + console.log(content_title); + + return '<!--content_title_start--><h4>' + content_title + '</h4><!--content_title_end-->' + + '<!--content_start-->' + content + '<!--content_end-->' + + '<!--format_title_start--><h4>' + format_title + '</h4><!--format_title_end-->' + + '<!--format_start-->' + format + '<!--format_end-->' + + '<!--testdata_title_start--><h4>' + testdata_title + '</h4><!--testdata_title_end-->' + + '<!--testdata_start-->' + testdata + '<!--testdata_end-->'; + } + function _parse_content(j_box,mix_content){ + var part; + var content_title; + var content; + var format_title; + var format; + var testdata_title; + var testdata; + + console.log(mix_content); + part = mix_content.match(/<!--content_title_start--><h4>([\s\S.]*)<\/h4><!--content_title_end-->/); + if(part != null){ + content_title = part[1]; + }else{ + content_title = ''; + } + part = mix_content.match(/<!--content_start-->([\s\S.]*)<!--content_end-->/); + if(part != null){ + content = part[1]; + }else{ + content = ''; + } + + part = mix_content.match(/<!--format_title_start--><h4>([\s\S.]*)<\/h4><!--format_title_end-->/); + if(part != null){ + format_title = part[1]; + }else{ + format_title = ''; + } + part = mix_content.match(/<!--format_start-->([\s\S.]*)<!--format_end-->/); + if(part != null){ + format = part[1]; + }else{ + format = ''; + } + + part = mix_content.match(/<!--testdata_title_start--><h4>([\s\S.]*)<\/h4><!--testdata_title_end-->/); + if(part != null){ + testdata_title = part[1]; + }else{ + testdata_title = ''; + } + part 
= mix_content.match(/<!--testdata_start-->([\s\S.]*)<!--testdata_end-->/); + if(part != null){ + testdata = part[1]; + }else{ + testdata = ''; + } + + j_box.find('div.content input.title').val(content_title); + j_box.find('div.content div.data').codebox().setValue(content); + j_box.find('div.format input.title').val(format_title); + j_box.find('div.format div.data').codebox().setValue(format); + j_box.find('div.testdata input.title').val(testdata_title); + j_box.find('div.testdata div.data').codebox().setValue(testdata); + } if(direct == 'in'){ com.loadpage('/toj/pmod/pmod_test/html/manage.html').done(function(){ @@ -205,17 +278,37 @@ var pmod_test = function(proid,pro_node){ j_testmode_list = j_index_page.find('table.testmode > tbody'); j_create_mode = j_index_page.find('div.create_mode'); - com.create_codebox(j_create_mode.find('div.content'),'text/html'); + j_create_mode.find('div.content div.data').codebox({'mode':'text/html'}); + j_create_mode.find('div.format div.data').codebox({'mode':'text/html'}); + j_create_mode.find('div.testdata div.data').codebox({'mode':'text/html'}); j_create_mode.on('shown',function(e){ - j_create_mode.find('div.content').data('codebox').refresh(); + var i; + var codeboxs; + + codeboxs = j_create_mode.find('div.block div.data'); + for(i = 0;i < codeboxs.length;i++){ + $(codeboxs[i]).data('codebox').refresh(); + } }); j_create_mode.on('hide',function(e){ - j_create_mode.find('div.content').data('codebox').setValue(''); + var i; + var codeboxs; + + j_create_mode.find('div.content input.title').val('內容'); + j_create_mode.find('div.format input.title').val('I/O格式'); + j_create_mode.find('div.testdata input.title').val('範例測資'); + + codeboxs = j_create_mode.find('div.block div.data'); + for(i = 0;i < codeboxs.length;i++){ + $(codeboxs[i]).data('codebox').setValue(''); + } }); j_create_mode.find('button.submit').on('click',function(e){ - var content = j_create_mode.find('div.content').data('codebox').getValue(); var testmodeid = parseInt(j_create_mode.find('[name="testmode"]').val()); + var mix_content; + + mix_content = _mix_content(j_create_mode); com.call_backend(callpath,'add_mode',function(result){ if(com.is_callerr(result)){ @@ -226,7 +319,7 @@ var pmod_test = function(proid,pro_node){ _update(); } - },content,testmodeid); + },mix_content,testmodeid); }); j_create_mode.find('button.cancel').on('click',function(e){ j_create_mode.modal('hide'); @@ -237,16 +330,20 @@ var pmod_test = function(proid,pro_node){ }); j_set_mode = j_index_page.find('div.set_mode'); - com.create_codebox(j_set_mode.find('div.content'),'text/html'); + j_set_mode.find('div.content div.data').codebox({'mode':'text/html'}); + j_set_mode.find('div.format div.data').codebox({'mode':'text/html'}); + j_set_mode.find('div.testdata div.data').codebox({'mode':'text/html'}); j_set_mode.on('show',function(e){ com.call_backend(callpath,'get_mode',function(result){ var data = result.data; + var parse_content; if(com.is_callerr(result)){ index.add_alert('','警告','管理發生錯誤'); }else{ - j_set_mode.find('div.content').data('codebox').setValue(data.content); + parse_content = _parse_content(j_set_mode,data.content); + if(data.testmodeid == null){ j_set_mode.find('[name="testmode"]').val(0); }else{ @@ -256,20 +353,37 @@ var pmod_test = function(proid,pro_node){ },set_mode_id); }); j_set_mode.on('shown',function(e){ - j_set_mode.find('div.content').data('codebox').refresh(); + var i; + var codeboxs; + + codeboxs = j_set_mode.find('div.block div.data'); + for(i = 0;i < codeboxs.length;i++){ + 
$(codeboxs[i]).data('codebox').refresh(); + } }); j_set_mode.on('hide',function(e){ + var i; + var codeboxs; + set_mode_id = null; - j_set_mode.find('div.content').data('codebox').setValue(''); + + j_set_mode.find('div.block input.title').val(''); + + codeboxs = j_set_mode.find('div.block div.data'); + for(i = 0;i < codeboxs.length;i++){ + $(codeboxs[i]).data('codebox').setValue(''); + } }); j_set_mode.find('button.submit').on('click',function(e){ - var content = j_set_mode.find('div.content').data('codebox').getValue(); var testmodeid = parseInt(j_set_mode.find('[name="testmode"]').val()); + var mix_content; if(testmodeid == 0){ testmodeid = null; } + mix_content = _mix_content(j_set_mode); + com.call_backend(callpath,'set_mode',function(result){ if(com.is_callerr(result)){ index.add_alert('','警告','管理發生錯誤'); @@ -279,7 +393,7 @@ var pmod_test = function(proid,pro_node){ _update(); } - },set_mode_id,content,testmodeid); + },set_mode_id,mix_content,testmodeid); }); j_set_mode.find('button.cancel').on('click',function(e){ j_set_mode.modal('hide'); diff --git a/src/pmod/pmod_test/py/pmod_test.py b/src/pmod/pmod_test/py/pmod_test.py index 33e4a0e..ac96f19 100644 --- a/src/pmod/pmod_test/py/pmod_test.py +++ b/src/pmod/pmod_test/py/pmod_test.py @@ -46,9 +46,19 @@ class pmod_test(Problem): Proxy.instance.register_call( self._reg_path, 'list_testmode', self.list_testmode) Proxy.instance.register_call( - self._reg_path, 'set_testdata', self.set_testdata) + self._reg_path, 'create_testdata', self.create_testdata) + Proxy.instance.register_call( + self._reg_path, 'delete_testdata', self.delete_testdata) Proxy.instance.register_call( self._reg_path, 'get_testdata', self.get_testdata) + Proxy.instance.register_call( + self._reg_path, 'list_testdata', self.list_testdata) + Proxy.instance.register_call( + self._reg_path, 'set_testdata', self.set_testdata) + Proxy.instance.register_call( + self._reg_path, 'set_testmode_testdata', self.set_testdata) + Proxy.instance.register_call( + self._reg_path, 'get_testmode_testdata', self.get_testdata) def unload(self, force): Proxy.instance.unregister_call( @@ -72,9 +82,19 @@ class pmod_test(Problem): Proxy.instance.unregister_call( self._reg_path, 'get_testmode') Proxy.instance.unregister_call( - self._reg_path, 'set_testdata') + self._reg_path, 'create_testdata') + Proxy.instance.unregister_call( + self._reg_path, 'delete_testdata') Proxy.instance.unregister_call( self._reg_path, 'get_testdata') + Proxy.instance.unregister_call( + self._reg_path, 'list_testdata') + Proxy.instance.unregister_call( + self._reg_path, 'set_testdata') + Proxy.instance.unregister_call( + self._reg_path, 'set_testmode_testdata') + Proxy.instance.unregister_call( + self._reg_path, 'get_testmode_testdata') @staticmethod @TOJAuth.check_access(mod.ProblemMg._accessid, TOJAuth.ACCESS_CREATE) @@ -398,7 +418,161 @@ class pmod_test(Problem): return testmode_list @imc.async.caller - def set_testdata(self, testmodeid, testdata): + def create_testdata(self, info, filekey, expire = None): + if expire != None: + expire = com.isoptime(expire) + if expire == None: + return 'Eparameter' + + if( + type(info) != str or + type(filekey) != str + ): + return 'Eparameter' + + testid = self._create_testdata(info, filekey, expire) + + if testid == None: + return 'Eupload' + + return {'testid': testid} + + def _create_testdata(self, info, filekey, expire): + TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE) + + # Upload file + + blobname = 'TEST_BLOBNAME' + + with 
TOJAuth.change_current_iden(self._idendesc): + testid = mod.TestdataMg.instance._add_testdata( + blobname, expire, self._proid, info) + + return testid + + @imc.async.caller + def delete_testdata(self, testid): + if( + type(testid) != int + ): + return 'Eparameter' + + with TOJAuth.change_current_iden(self._idendesc): + test = mod.TestdataMg.instance._get_testdata(testid) + + if test == None: + return 'Etestid' + + if test['proid'] != self._proid: + return 'Eother_proid' + + self._delete_testdata(testid) + + return 'Success' + + def _delete_testdata(self, testid): + TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_DELETE) + + with TOJAuth.change_current_iden(self._idendesc): + mod.TestdataMg.instance._del_testdata(testid) + + @imc.async.caller + def get_testdata(self, testid): + if( + type(testid) != int + ): + return 'Eparameter' + + TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE) + + with TOJAuth.change_current_iden(self._idendesc): + test = mod.TestdataMg.instance._get_testdata(testid) + + if test == None: + return 'Etestid' + + if test['proid'] != self._proid: + return 'Eother_proid' + + del test['blobname'] + del test['proid'] + + return test + + @imc.async.caller + def list_testdata(self): + testdata_list = self._list_testdata() + + return testdata_list + + def _list_testdata(self): + TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE) + + with TOJAuth.change_current_iden(self._idendesc): + testdata_list = mod.TestdataMg.instance._list_testdata(self._proid) + + for test in testdata_list: + del test['blobname'] + del test['proid'] + + return testdata_list + + @imc.async.caller + def set_testdata(self, testid, info, filekey = None, expire = None): + if expire != None: + expire = com.isoptime(expire) + if expire == None: + return 'Eparameter' + + if( + type(testid) != int or + type(info) != str or + (filekey != None and type(filekey) != str) + ): + return 'Eparameter' + + with TOJAuth.change_current_iden(self._idendesc): + test = mod.TestdataMg.instance._get_testdata(testid) + + if test == None: + return 'Etestid' + + if test['proid'] != self._proid: + return 'Eother_proid' + + result = self._set_testdata(testid, info, filekey, expire) + + if result == None: + return 'Efailed' + + if result == False: + return 'Eupload' + + return 'Success' + + def _set_testdata(self, testid, info, filekey, expire): + TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE) + + with TOJAuth.change_current_iden(self._idendesc): + test = mod.TestdataMg.instance._get_testdata(testid) + + blobname = test['blobname'] + if test['proid'] != self._proid: + return None + + if filekey != None: + # Upload file + # Update blob 'blobname' + # If failed return False + + with TOJAuth.change_current_iden(self._idendesc): + mod.TestdataMg.instance._update_testdata( + testid, blobname, expire, self._proid, info) + + return True + + @imc.async.caller + def set_testmode_testdata(self, testmodeid, testdata): if( type(testmodeid) != int or type(testdata) != list @@ -432,7 +606,7 @@ class pmod_test(Problem): return 'Success' - def _set_testdata(self, testmodeid, testdata): + def _set_testmode_testdata(self, testmodeid, testdata): TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE) cur = self.db.cursor() @@ -472,7 +646,7 @@ class pmod_test(Problem): cur.execute(sqlstr, sqlarr) @imc.async.caller - def get_testdata(self, testmodeid): + def get_testmode_testdata(self, testmodeid): if( type(testmodeid) != int ): @@ -485,7 +659,7 @@ class pmod_test(Problem): return testdata - 
def _get_testdata(self, testmodeid): + def _get_testmode_testdata(self, testmodeid): TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE) cur = self.db.cursor() diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 782b000..2e56b52 100755 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -1,5 +1,6 @@ #! /usr/bin/env python +import os import traceback import sys import socket @@ -7,6 +8,8 @@ import json import datetime import time import random +import uuid +from collections import deque from multiprocessing import Process import tornado.ioloop @@ -16,14 +19,15 @@ import tornado.websocket import imc.async from imc.proxy import Proxy,Connection +from imc.blobclient import BlobClient import mod import netio -from netio import SocketStream,SocketConnection,WebSocketConnection +from netio import SocketStream,SocketConnection +from netio import WebSocketStream,WebSocketConnection from tojauth import TOJAuth from test_blob import TOJBlobTable,TOJBlobHandle -from imc.blobclient import BlobClient class StdLogger(object): def __init__(self,callback): @@ -48,7 +52,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._link = None self._idendesc = None self._pend_mainconn_linkmap = {} - self._pend_filestream_filekeymap = {} + self._pend_filekeymap = {} self._client_linkmap = {} def start(self): @@ -66,7 +70,12 @@ class BackendWorker(tornado.tcpserver.TCPServer): def handle_stream(self,stream,addr): def _recv_conn_info(data): info = json.loads(data.decode('utf-8')) - conntype = info['conntype'] + + try: + conntype = info['conntype'] + + except KeyError: + socket_stream.close() if conntype == 'main': self._handle_mainconn(sock_stream,addr,info) @@ -74,13 +83,16 @@ class BackendWorker(tornado.tcpserver.TCPServer): elif conntype == 'file': self._handle_fileconn(sock_stream,addr,info) + else: + socket_stream.close() + fd = stream.fileno() self._ioloop.remove_handler(fd) sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0)) netio.recv_pack(sock_stream,_recv_conn_info) - def add_client(self,link,handler): + def add_client(self,link,main_stream): @imc.async.caller def _call(): with TOJAuth.change_current_iden(self._idendesc): @@ -88,14 +100,13 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._client_linkmap[link] = {} - conn = netio.WebSocketConnection(link,handler) + conn = netio.WebSocketConnection(link,main_stream,self.pend_filestream, + self.del_pend_filestream) conn.add_close_callback(lambda conn : self.del_client(conn.link)) Proxy.instance.add_conn(conn) _call() - return conn - def del_client(self,link): @imc.async.caller def _call(): @@ -105,6 +116,34 @@ class BackendWorker(tornado.tcpserver.TCPServer): del self._client_linkmap[link] _call() + + def pend_filestream(self,streamtype,filekey,callback,count = 1): + assert(filekey not in self._pend_filekeymap) + + self._pend_filekeymap[filekey] = { + 'streamtype':streamtype, + 'count':count, + 'stream':[], + 'callback':tornado.stack_context.wrap(callback) + } + + def add_filestream(self,streamtype,filekey,stream): + try: + pend = self._pend_filekeymap[filekey] + + except KeyError: + raise + + assert(pend['streamtype'] == streamtype) + + pend['count'] -= 1 + if pend['count'] == 0: + self._pend_filekeymap.pop(filekey) + + pend['callback'](stream) + + def del_pend_filestream(self,filekey): + self._pend_filekeymap.pop(filekey,None) def _conn_center(self): def __retry(conn): @@ -123,14 +162,17 @@ class BackendWorker(tornado.tcpserver.TCPServer): 
self._link = info['worker_link'] Proxy(self._link,TOJAuth.instance,self._idendesc,self._conn_link) - self.center_conn = SocketConnection(info['center_link'],stream,self.center_addr,self._add_pend_filestream) + self.center_conn = SocketConnection(info['center_link'],stream, + self.center_addr, + self.pend_filestream, + self.del_pend_filestream) self.center_conn.add_close_callback(__retry) Proxy.instance.add_conn(self.center_conn) - self._init_blobclient() + #self._init_blobclient() #Proxy.instance.register_call('test/','get_client_list',self._test_get_client_list) - #Proxy.instance.register_call('test/','test_dst',self._test_dst) + Proxy.instance.register_call('test/','test_dst',self._test_dst) #Proxy.instance.register_filter('test/',self._test_filter) try: @@ -143,8 +185,8 @@ class BackendWorker(tornado.tcpserver.TCPServer): except Exception as e: print(e) - #if self._link == '/backend/2/': - # self._test_call(None) + if self._link == '/backend/2/': + self._test_call(None) sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -171,19 +213,52 @@ class BackendWorker(tornado.tcpserver.TCPServer): 'blobtmp/' + str(self.ws_port - 79), TOJBlobTable(self.ws_port - 79), TOJBlobHandle) - - blobclient.open_container('test','ACTIVE') - try: + + print(self.ws_port, "open cantainer test") + print(blobclient.open_container('test','ACTIVE')) + # if False: + if self.ws_port == 81: handle = blobclient.open( 'test','testblob', TOJBlobHandle.WRITE | TOJBlobHandle.CREATE ) - except: - pass - print(handle._fileno) - handle.write(bytes('Hello Data','utf-8'),0) - handle.commit(False); + print(handle._fileno) + handle.write(bytes('Hello Data','utf-8'),0) + print('create commit:', handle.commit(False)) + handle.close() + print("#########################################################") + # print("wait for 3 secs...") + # time.sleep(3) + # try: + # handle = blobclient.open( + # 'test', 'testblob', + # TOJBlobHandle.CREATE + # ) + # except ValueError as e: + # print("catch ValueError:", str(e)) + # print("#########################################################") + # print("wait for 3 secs...") + # time.sleep(3) + # handle = blobclient.open( + # 'test', 'testblob', + # TOJBlobHandle.WRITE + # ) + # handle.write(bytes('Hello new line\n','utf-8'),30) + # print('write commit:', handle.commit(False)) + # handle.close() + # print("#########################################################") + # print("wait for 3 secs...") + # time.sleep(3) + # handle = blobclient.open( + # 'test', 'testblob', + # TOJBlobHandle.WRITE | TOJBlobHandle.DELETE + # ) + # handle.delete() + # print('delete commit:', handle.commit(False)) + # handle.close() + blobclient.clean() + blobclient.show_status() def _conn_link(self,link): def __handle_pend(conn): @@ -216,7 +291,9 @@ class BackendWorker(tornado.tcpserver.TCPServer): def __recv_cb(data): stat = json.loads(data.decode('utf-8')) if stat == True: - conn = SocketConnection(worker_link,main_stream,sock_addr,self._add_pend_filestream) + conn = SocketConnection(worker_link,main_stream,sock_addr, + self.pend_filestream, + self.del_pend_filestream) Proxy.instance.add_conn(conn) __handle_pend(conn) @@ -255,9 +332,6 @@ class BackendWorker(tornado.tcpserver.TCPServer): return imc.async.switch_top() - def _add_pend_filestream(self,filekey,callback): - self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) - def _handle_mainconn(self,main_stream,addr,info): link = info['link'] sock_ip = info['sock_ip'] @@ -268,7 +342,9 @@ class 
BackendWorker(tornado.tcpserver.TCPServer): return if (link not in self._pend_mainconn_linkmap) or self._link > link: - conn = SocketConnection(link,main_stream,(sock_ip,sock_port),self._add_pend_filestream) + conn = SocketConnection(link,main_stream,(sock_ip,sock_port), + self.pend_filestream, + self.del_pend_filestream) Proxy.instance.add_conn(conn) netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8')) @@ -283,10 +359,10 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _handle_fileconn(self,file_stream,addr,info): try: - self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream) + self.add_filestream('socket',info['filekey'],file_stream) - except KeyError: - pass + except Exception: + file_stream.close() def _get_link(self,linkclass,uid = 0): if linkclass == 'center': @@ -319,6 +395,10 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,param): with TOJAuth.change_current_iden(self._idendesc): + ret = Proxy.instance.call('/backend/3/test/','test_dst',1000,'Hello') + print(ret) + + ''' st = time.perf_counter() for i in range(0,2): dst = '/backend/' + str((i % 2) + 2) + '/' @@ -332,33 +412,17 @@ class BackendWorker(tornado.tcpserver.TCPServer): print(time.perf_counter() - st) print(self._link) - - return - - pend = [] - for i in range(0,32): - if str((i % 16) + 2) == self._link: - continue - - fileres = Proxy.instance.sendfile('/backend/' + str((i % 16) + 2) + '/','Fedora-18-x86_64-DVD.iso') - - dst = '/backend/' + str((i % 16) + 2) + '/' - ret = Proxy.instance.call(self._idendesc,dst,'test_dst',fileres.filekey) - - pend.append(fileres) - - for p in pend: - print(self._link + ' ' + p.wait()) - - print(self._link) + ''' @imc.async.caller def _test_dst(self,filekey): print(filekey) - self._ioloop.add_timeout(datetime.timedelta(milliseconds = 2000),lambda : Proxy.instance.abortfile(filekey)) - #Proxy.instance.abortfile(filekey) fileres = Proxy.instance.recvfile(filekey,'data') + + #self._ioloop.add_timeout(datetime.timedelta(milliseconds = 500),lambda : Proxy.instance.abortfile(filekey)) + #Proxy.instance.abortfile(filekey) + #fileres = Proxy.instance.recvfile(filekey,'data') #print('recv ' + fileres.wait()) print(fileres.wait()) @@ -375,24 +439,47 @@ class WebSocketConnHandler(tornado.websocket.WebSocketHandler): def on_message(self,msg): global backend_worker - if hasattr(self,'backend_conn'): - self.backend_conn.recv_msg(msg) - + if hasattr(self,'conntype'): + self.stream.recv_msg(msg) + else: - try: - info = json.loads(msg) - print(info) - self.backend_conn = backend_worker.add_client(info['client_link'],self) + info = json.loads(msg) + self.conntype = info['conntype'] + self.stream = WebSocketStream(self) + + if self.conntype == 'main': + self._handle_mainconn(self.stream,info) - except Exception: - self.close() + elif self.conntype == 'file': + self._handle_fileconn(self.stream,info) + + else: + self.stream.close() def on_close(self): - global backend_backend + if hasattr(self,'conntype'): + self.stream.close() + + def _handle_mainconn(self,main_stream,info): + global backend_worker + + try: + backend_worker.add_client(info['client_link'],main_stream) - if hasattr(self,'backend_conn'): - self.backend_conn.close() + except Exception: + main_stream.close() + def _handle_fileconn(self,file_stream,info): + global backend_worker + + try: + backend_worker.add_filestream('websocket',info['filekey'], + file_stream) + print('test') + + except Exception as err: + file_stream.close() + def start_backend_worker(ws_port): global 
backend_worker diff --git a/src/py/center_server.py b/src/py/center_server.py index c6e81b0..fed7bb5 100755 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -4,6 +4,7 @@ import random import json import uuid import socket +import time import tornado.ioloop import tornado.tcpserver @@ -142,13 +143,13 @@ class CenterServer(tornado.tcpserver.TCPServer): @imc.async.caller def _init_blobserver(self): - BlobServer(Proxy.instance, - TOJAuth.instance, - self._idendesc, - self._link, - 'blobtmp/1', - TOJBlobTable(1), - TOJBlobHandle) + blobserver = BlobServer(Proxy.instance, + TOJAuth.instance, + self._idendesc, + self._link, + 'blobtmp/1', + TOJBlobTable(1), + TOJBlobHandle) def _create_link(self,linkclass): linkid = uuid.uuid1() diff --git a/src/py/com.py b/src/py/com.py new file mode 100644 index 0000000..91b21a6 --- /dev/null +++ b/src/py/com.py @@ -0,0 +1,12 @@ +import datetime + +def isoptime(time): + try: + if time[-1] == 'Z': + return datetime.datetime.strptime(time,'%Y-%m-%dT%H:%M:%S.%fZ') + + else: + return datetime.datetime.strptime(time,'%Y-%m-%dT%H:%M:%S.%f') + + except ValueError: + return None diff --git a/src/py/dispatch.py b/src/py/dispatch.py new file mode 100644 index 0000000..9484a5c --- /dev/null +++ b/src/py/dispatch.py @@ -0,0 +1,110 @@ +import json + +import config +import imc.async +from tojauth import TOJAuth +from asyncdb import AsyncDB + +class Data: + def __init__(self,dataid,datatype,source,target,status,data,gid,gcount): + self.dataid = dataid + self.datatype = datatype + self.source = source + self.target = target + self.data = data + self.gid = gid + self,gcount = gcount + +class DispatchMg: + _accessid = -1 + + def __init__(self,mod_idendesc,get_link_fn): + self.DATATYPE_CHALLENGE = 1 + self.DATATYPE_STATUS = 2 + + self.DATASTSTUS_PEND = 1 + self.DATASTSTUS_WAIT = 2 + self.DATASTSTUS_DONE = 3 + + self._idendesc = mod_idendesc + self.get_link = get_link_fn + self.db = AsyncDB(config.CORE_DBNAME,config.CORE_DBUSER, + config.CORE_DBPASSWORD) + + self.collector_namemap = {} + + @imc.async.caller + @TOJAuth.check_access(_accessid,TOJAuth.ACCESS_WRITE) + def _add_challenge(self,source,target,data,gid = None,gcount = 1): + cur = self.db.cursor() + + cur.execute(('INSERT INTO "DATA_POOL" ' + '("type","source","target","gid","gcount","status","data") ' + 'VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING "dataid";'), + (self.DATATYPE_CHALLENGE,source,target,gid,gcount, + self.DATASTSTUS_PEND,json.dumps(data,'utf-8'))) + + if cur.rowcount == 0: + return 'Efailed' + + dataid = cur.fetchone()[0] + + data = Data(dataid,self.DATATYPE_CHALLENGE,source,target, + self.DATASTATUS_PEND,data,gid,gcount) + + return {'dataid':dataid} + + @imc.async.caller + def _register_challenge_collector(self,name): + link = TOJAuth.get_current_iden() + linkclass = link.split('/')[1] + if linkclass != 'backend': + return 'Efailed' + + return self._register_collector(link,''.join(['challenge/',name])) + + @imc.async.caller + def _register_status_collector(self,name): + link = TOJAuth.get_current_iden() + linkclass = link.split('/')[1] + if linkclass != 'backend': + return 'Efailed' + + return self._register_collector(link,''.join(['status/',name])) + + @imc.async.caller + def _unregister_challenge_collector(self,name): + link = TOJAuth.get_current_iden(); + linkclass = link.split('/')[1] + if linkclass != 'backend': + return 'Efailed' + + return self._unregister_collector(link,''.join(['challenge/',name])) + + @imc.async.caller + def _unregister_status_collector(self,name): + link = 
TOJAuth.get_current_iden(); + linkclass = link.split('/')[1] + if linkclass != 'backend': + return 'Efailed' + + return self._unregister_collector(link,''.join(['challenge/',name])) + + def _register_collector(self,link,name): + if name not in self.collector_namemap: + self.collector_namemap[name] = {} + + self.collector_namemap[name][link] = {} + + return 'Success' + + def _unregister_collector(self,link,name): + if name not in self.collector_namemap: + return 'Success' + + self.collector_namemap[name].pop(link) + + return 'Success' + + def _dispatch_data(self,datatype,target): + pass diff --git a/src/py/imc/async.py b/src/py/imc/async.py index a9dd048..1642ad3 100644 --- a/src/py/imc/async.py +++ b/src/py/imc/async.py @@ -101,7 +101,7 @@ def ret(retid,value = None,err = None): global gr_idmap global ret_idmap - assert greenlet.getcurrent() == gr_main + #assert greenlet.getcurrent() == gr_main try: gr = ret_idmap.pop(retid) @@ -122,7 +122,7 @@ def ret(retid,value = None,err = None): except Exception as err: traceback.print_stack() - print(err) + print(type(err)) finally: tornado.stack_context._state.contexts = old_contexts diff --git a/src/py/imc/blobclient.py b/src/py/imc/blobclient.py index 8a7baea..1def7b6 100755 --- a/src/py/imc/blobclient.py +++ b/src/py/imc/blobclient.py @@ -1,13 +1,13 @@ #! /usr/bin/env python import os +import uuid +from collections import Counter from imc.auth import Auth import imc.async from imc.proxy import Proxy -from collections import Counter - class BlobClient: def __init__(self, proxy, auth, idendesc, link, serverlink, location, cachetable, BlobHandle): @@ -26,42 +26,53 @@ class BlobClient: self.connect_server() self._proxy.register_call('blobclient/', 'get_update', self.get_update) + self._proxy.register_call('blobclient/', 'get_request', + self.get_request) + + self.clean() +##################debug code########################## + print('blobclient: init') + # self.show_status() + + def show_status(self): + print('blobclient: containers=>\n', self._containers) + print('blobclient: blobs=>\n', self._cachetable.get_blob_list()) + print('blobclient: opencounts=>\n', self._opencounts) + print('blobclient: deltags=>\n', self._deltags) +###################################################### def __del__(self): self._proxy.unregister_call('blobclient/', 'get_update') + self._proxy.unregister_call('blobclient/', 'get_request') def _server_call(self, func, *args): + if not self._is_connected: + if not self.connect_server(): + pass + server = self._server + 'blobserver/' with Auth.change_current_iden(self._idendesc): - for i in range(5): - sta, ret = self._proxy.call(server, func, 10000, - self._link, *args) - if sta or (not sta and ret == 'Enoexist'): - break + sta, ret = self._proxy.call(server, func, 10000, + self._link, *args) + if not sta: + self._is_connected = False return (sta, ret) def _client_call(self, otherclient, func, *args): otherclient += 'blobclient/' with TOJAuth.change_current_iden(self._idendesc): - for i in range(5): - sta, ret = self._proxy.call(otherclient, func, 10000, - self._link, *args) - if sta or (not sta and ret == 'Enoexist'): - break + sta, ret = self._proxy.call(otherclient, func, 10000, + self._link, *args) return (sta, ret) def connect_server(self, serverlink=None): if serverlink: self._server = serverlink - sta, ret = self._server_call('connect_client', - self._cachetable.get_blob_list()) - if sta: - if ret: - self._is_connected = True - else: - pass - else: - pass + server = self._server + 'blobserver/' + with 
Auth.change_current_iden(self._idendesc): + sta, ret = self._proxy.call(server, 'connect_client', 10000, + self._link) + self._is_connected = sta if self._is_connected: # TODO: pass @@ -72,7 +83,7 @@ class BlobClient: if not sta: # TODO: # pend operation when client can't imc call server - return None + return 'Edisconnected' if ret: self._containers[container] = method return True @@ -94,114 +105,94 @@ class BlobClient: # TODO: # periodically call this function to clean old data and do something else # ex: send pending operation - def sync(self): - for blobname_rev in self._deltags: - self.del_real_blob(blobname_rev) + def clean(self): + del_list = list(self._deltags) + for rev in del_list: + self.del_real_blob(rev) + del_list = [] + for blob, rev in (self._cachetable.get_blob_list().items()): + if not self.blob_exists(rev): + del_list.append(blob) + for container, name in del_list: + self.del_blob(container, name) # for container in self._containers: # if self._containers[container] == 'ALWAYS': # for blob in self._cachetable.get_blob_list(container): # self.update(blob) @imc.async.caller - def get_update(self, blobname, info, filekey=None): - if info is None: - self.del_blob(blobname) - elif filekey is not None: - rev = info['rev'] - if self.recv_blob(filekey, blobname, rev).wait() == 'Success': - self.update_blob(blobname, info) - sta, ret = self._server_call('recv_update_result', - blobname, "Success", rev) - if not sta: - # TODO: - pass - else: - self.update_blob(blobname, info) - return self._link - - def update(self, blobname): - cacherev = self._cachetable.get_blob_info(blobname, 'rev') - if cacherev == None: - cacherev = 0 + def get_update(self, container, name): + self.update(container, name) + # pass - sta, ret = self._server_call('check_blob', blobname, cacherev) + def update(self, container, name, force=False): + info = self._cachetable.get_blob_info(container, name) + if info and not force and self.blob_exists(info['rev']): + cacherev = info['rev'] + cachesha1 = info['sha1'] + else: + cacherev = None + cachesha1 = None + sta, ret = self._server_call('check_blob', container, name, + cacherev, cachesha1) if not sta: # TODO: # pend operation when client can't imc call server - pass - elif ret == 'up_to_date': - pass - elif ret == 'no_exist': - self._cachetable.del_blob(blobname) + return 'Efailed' + elif ret[0] == 'up_to_date': + return info + elif ret[0] == 'no_exist': + self._cachetable.del_blob(container, name) if cacherev: - self.del_real_blob(''.join([blobname, '_', str(cacherev)])) + self.del_real_blob(cacherev) + return None else: - info = ret['info'] - rev = info['rev'] - for i in range(4): - rst = self.recv_blob(ret['filekey'], blobname, rev).wait() - sta, ret = self._server_call('recv_update_result', blobname, - rst, rev, True) - - if 'Success' == ret: - self.update_blob(blobname, info) - break + filekey, info = ret + if filekey: + rst = self.recv_blob(filekey, info['rev']).wait() + else: + if self.copy_blob(cacherev, info['rev']): + rst = 'Success' + else: + rst = None + if 'Success' == rst: + self.update_blob(info) + return info + else: + return 'Efailed' def commit(self, commit_info, force_flag, blobhandle): - filekey = None - if not commit_info['deltag'] and commit_info['written']: - result = self.send_blob(blobhandle._tmpfile) - filekey = result.filekey - sta, ret = self._server_call('recv_commit', commit_info, - force_flag, filekey) + sta, ret = self._server_call('recv_commit', commit_info, force_flag) if not sta: # TODO: # pend operation when client can't imc 
call server + # local commit return False else: - # TODO: - # if commit success , copy tmpfile to location if ret: - blobhandle.copy_tmp() - - # TODO: - # opencounts ? - def send_blob(self, blobpath, otherclient=None): - if otherclient is None: - return self._proxy.sendfile(self._server, blobpath) - else: - return self._proxy.sendfile(otherclient, blobpath) - - def recv_blob(self, filekey, blobname, rev): - blobpath = os.path.join(self._location, blobname + '_' + str(rev)) - return self._proxy.recvfile(filekey, blobpath) - - def update_blob(self, blobname, info): - rev = self._cachetable.get_blob_info(blobname, 'rev') - blobname_rev = ''.join([blobname, '_', str(rev)]) - self.del_real_blob(blobname_rev) - self._cachetable.update_blob(blobname, info) - - def del_blob(self, blobname): - rev = self._cachetable.get_blob_info(blobname, 'rev') - blobname_rev = ''.join([blobname, '_', str(rev)]) - self._cachetable.del_blob(blobname) - self.del_real_blob(blobname_rev) - - def del_real_blob(self, blobname_rev): - if self._opencounts[blobname_rev] == 0: - path = os.path.join(self._location, blobname_rev) - self.BlobHandle.del_blob(path) - else: - self._deltags.add(blobname_rev) + if ret['rev']: + if blobhandle.copy_tmp(ret['rev']): + self.update_blob(ret) + return True + else: + pass + else: + self.del_blob(ret['container'], ret['name']) + return True + else: + return False - def open(self, container, blobname, flag): + def open(self, container, name, flag): if container not in self._containers: raise Exception("this container isn't open") - blob = ''.join([container, '_', blobname]) - self.update(blob) - info = self._cachetable.get_blob_info(blob) + if (flag & self.BlobHandle.CREATE and + not flag & self.BlobHandle.WRITE): + raise ValueError("invalid flag") + info = self.update(container, name) + info = 'Efailed' + if info == 'Efailed': + info = self._cachetable.get_blob_info(container, name) if info is None: if (not flag & self.BlobHandle.WRITE or not flag & self.BlobHandle.CREATE): @@ -209,21 +200,83 @@ class BlobClient: "add a create flag") else: info = {'container': container, - 'rev': 0, + 'name': name, + 'rev': None, + 'sha1': None, 'metadata': '', 'size': None, 'commit_time': None} + else: + self._opencounts[info['rev']] += 1 + handle = self.BlobHandle(info, flag, self) + return handle + + def close(self, rev): + if self._opencounts[rev] > 0: + self._opencounts[rev] -= 1 + + # @imc.async.caller + # def send_to_other(self, info, otherclient): + # pass + + @imc.async.caller + def get_request(self, target): + result = self.send_blob(target) + if result: + return result.filekey + else: + return None + + # def request_blob(self, container, name): + # sta, ret = self._server_call('get_request', container, name) + # if not sta: + # return False + # else: + # return ret + + def send_blob(self, blobpath, otherclient=None): try: - handle = self.BlobHandle(blob, info, flag, self) - except ValueError: - raise + if otherclient: + ret = self._proxy.sendfile(otherclient, blobpath) + else: + ret = self._proxy.sendfile(self._server, blobpath) + except ConnectionError: + return 'Efailtosend' else: - blob = ''.join(blob, '_', str(handle.get_rev())) - self._opencounts[blob] += 1 - return handle + return ret + + def recv_blob(self, filekey, filename): + blobpath = os.path.join(self._location, filename) + return self._proxy.recvfile(filekey, blobpath) + + def update_blob(self, info): + rev = self._cachetable.get_blob_info(info['container'], + info['name'], 'rev') + if rev: + self.del_real_blob(rev) + 
self._cachetable.update_blob(info) + + def del_blob(self, container, name): + rev = self._cachetable.get_blob_info(container, name, 'rev') + self._cachetable.del_blob(container, name) + if rev: + self.del_real_blob(rev) + + def del_real_blob(self, rev): + self._deltags.add(rev) + if self._opencounts[rev] == 0: + self._deltags.remove(rev) + path = os.path.join(self._location, rev) + self.BlobHandle.del_blob(path) - def close(self, blobhandle): - blob = ''.join([blobhandle._name, '_', - str(blobhandle.get_rev())]) - self._opencounts[blob] -= 1 + def copy_blob(self, rev, newrev): + path = os.path.join(self._location, rev) + newpath = os.path.join(self._location, newrev) + return self.BlobHandle.copy_file(path, newpath) + def vertify_blob(self, rev, sha1): + blobpath = os.path.join(self._location, rev) + return self.BlobHandle._sha1(blobpath) == sha1 + + def blob_exists(self, rev): + return self.BlobHandle.file_exists(os.path.join(self._location, rev)) diff --git a/src/py/imc/blobhandle.py b/src/py/imc/blobhandle.py index 40ff523..683026e 100755 --- a/src/py/imc/blobhandle.py +++ b/src/py/imc/blobhandle.py @@ -1,6 +1,7 @@ #! /usr/bin/env python from abc import abstractmethod +import os class BlobHandle: READ = 0x1 @@ -8,36 +9,28 @@ class BlobHandle: CREATE = 0x4 DELETE = 0x8 WRITEMETA = 0x10 - def __init__(self, name, info, flag, blobclient): - self._name = name + def __init__(self, info, flag, blobclient): self._info = info self._flag = flag self._blobclient = blobclient self._location = self._blobclient._location self._is_closed = False self._deltag = False - self._written = False self._createtag = False - self._need_commit = False self._tmpfile = None - self._blobpath = ''.join([self._location, self._name, - '_', str(self.get_rev())]) + if info['rev']: + self._blobpath = os.path.join(self._location, self._info['rev']) + else: + self._blobpath = None if flag & BlobHandle.CREATE: - if not flag & BlobHandle.WRITE: - raise ValueError("invalid flag") - else: - self._need_commit = True - self._createtag = True - self._written = True + self._createtag = True if flag & BlobHandle.WRITE: self._tmpfile = self.gen_tmp() def __del__(self): - self.del_tmp() - self._blobclient.close(self) - - def create(self): - self._create(self.location + self._name) + if self._flag & BlobHandle.WRITE: + self.del_tmp() + self.close() def read(self, length, offset): if self._is_closed: @@ -51,9 +44,7 @@ class BlobHandle: raise Exception("This Blob is closed") if not self._flag & BlobHandle.WRITE: raise Exception("Permission Denied") - self._need_commit = True written_bytes = self._write(data, offset) - self._written = bool(written_bytes) self._info['size'] = self._get_size() return written_bytes @@ -62,20 +53,18 @@ class BlobHandle: raise Exception("This Blob is closed") if not self._flag & BlobHandle.DELETE: raise Exception("Permission Denied") - self._need_commit = True + self._info['name'] = newname def delete(self, deltag=True): if self._is_closed: raise Exception("This Blob is closed") if not self._flag & BlobHandle.DELETE: raise Exception("Permission Denied") - self._need_commit = True self._deltag = deltag def close(self): self._is_closed = True - if self._flag != BlobHandle.READ: - self._blobclient.close(self) + self._blobclient.close(self._info['rev']) def get_metadata(self): if self._is_closed: @@ -86,9 +75,8 @@ class BlobHandle: if self._is_closed: raise Exception("This Blob is closed") if not self._flag & BlobHandle.WRITEMETA: - raise Exception("Permission Deniedd") + raise Exception("Permission Denied") 
self._info['metadata'] = metadata - self._need_commit = True def get_rev(self): if self._is_closed: @@ -105,29 +93,29 @@ class BlobHandle: raise Exception("This Blob is closed") return self._info['size'] + def get_sha1(self): + if self._is_closed: + raise Exception("This Blob is closed") + return self._info['sha1'] + def commit(self, flag): if self._is_closed: raise Exception("This Blob is closed") - if not self._need_commit: - return False + self._info['sha1'] = self._sha1(self._tmpfile) commit_info = dict() - commit_info['blobname'] = self._name + commit_info['info'] = self._info + commit_info['target'] = self._tmpfile if self._deltag: commit_info['deltag'] = True + commit_info['createtag'] = False else: commit_info['deltag'] = False commit_info['createtag'] = self._createtag - commit_info['info'] = self._info - commit_info['written'] = self._written return self._blobclient.commit(commit_info, flag, self) - def copy_tmp(self): - BlobHandle.copy_file( - self._tmpfile, - ''.join([self._location, self._info['container'], '_', - self._name, '_', str(self._info['rev'])]) - ) - pass + def copy_tmp(self, rev): + path = os.path.join(self._location, rev) + return self.copy_file(self._tmpfile, path) @abstractmethod def gen_tmp(self): @@ -152,7 +140,15 @@ class BlobHandle: @staticmethod @abstractmethod + def _sha1(file): + # calculate the sha1 of file + # return string + pass + + @staticmethod + @abstractmethod def copy_file(source, dest): + # return Boolean pass @staticmethod @@ -160,4 +156,8 @@ class BlobHandle: def del_blob(blobpath): pass - + @staticmethod + @abstractmethod + def file_exists(path): + # return Boolean + pass diff --git a/src/py/imc/blobserver.py b/src/py/imc/blobserver.py index b876833..1ec7de8 100755 --- a/src/py/imc/blobserver.py +++ b/src/py/imc/blobserver.py @@ -1,17 +1,16 @@ #! 
/usr/bin/env python import os +import uuid +from collections import Counter from imc.auth import Auth import imc.async from imc.proxy import Proxy -from collections import Counter - class BlobServer: def __init__(self, proxy, auth, idendesc, link, location, blobtable, BlobHandle): - self._proxy = proxy self._auth = auth self._idendesc = idendesc @@ -19,7 +18,9 @@ class BlobServer: self._location = location self._blobtable = blobtable self.BlobHandle = BlobHandle - self._clients = {} + self._clients = set() + self._opencounts = Counter() + self._deltags = set() self._containers = dict.fromkeys(self._blobtable.get_container_list(), dict()) self._proxy.register_call('blobserver/', 'connect_client', @@ -30,55 +31,55 @@ class BlobServer: self.close_container) self._proxy.register_call('blobserver/', 'check_blob', self.check_blob) - self._proxy.register_call('blobserver/', 'recv_update_result', - self.recv_update_result) self._proxy.register_call('blobserver/', 'recv_commit', self.recv_commit) + self.clean() +##################debug code########################## + print('blobserver: init') + self.show_status() + + def show_status(self): + print('blobserver: containers =>\n', self._containers) + print('blobserver: blobs =>\n', self._blobtable.get_blob_list()) +###################################################### + def __del__(self): self._proxy.unregister_call('blobserver/', 'connect_client') self._proxy.unregister_call('blobserver/', 'open_container') self._proxy.unregister_call('blobserver/', 'close_container') self._proxy.unregister_call('blobserver/', 'check_blob') - self._proxy.unregister_call('blobserver/', 'recv_update_result') self._proxy.unregister_call('blobserver/', 'recv_commit') - def _client_call(self, client, func, timeout=10000, *args): + def _client_call(self, client, func, *args, timeout=10000): client += 'blobclient/' with Auth.change_current_iden(self._idendesc): - for i in range(5): - sta, ret = self._proxy.call(client, func, timeout, *args) - if sta or (not sta and ret == 'Enoexist'): - break + sta, ret = self._proxy.call(client, func, timeout, *args) + if not sta: + self.disconnect_client(client) return (sta, ret) def _client_call_async(self, client, func, callback, - timeout=10000, *args, **kwargs): + *args, timeout=10000): client += 'blobclient/' with Auth.change_current_iden(self._idendesc): - for i in range(5): - sta, ret = self._proxy.call_async(client, func, timeout, - callback, *args) - if sta or (not sta and ret == 'Enoexist'): - break - return (sta, ret) + self._proxy.call_async(client, func, timeout, callback, *args) @imc.async.caller - def connect_client(self, client, cache_list): + def connect_client(self, client): if client not in self._clients: - self._clients.update({client: cache_list}) - else: - self._clients[client] = cache_list + self._clients.add(client) def disconnect_client(self, client): try: - self._clients.pop[client] - except ValueError: - raise Exception("this client doesn't exist") + self._clients.remove(client) + except KeyError: + print("client", client, "doesn't exist") def create_container(self, container): - self._blobtable.create_container(container) - self._containers[container] = dict() + if container not in self._containers: + self._blobtable.create_container(container) + self._containers[container] = dict() def del_container(self, container): try: @@ -101,163 +102,175 @@ class BlobServer: try: self._containers[container].pop(client) except KeyError: - raise - - def update_blob(self, blobname, info): - 
self._blobtable.update_blob(blobname, info) - - def del_blob(self, blobname): - rev = self._blobtable.get_blob_info(blobname, 'rev') - blobname_rev = ''.join([blobname, '_', str(rev)]) - self._blobtable.del_blob(blobname) - self.del_real_blob(blobname_rev) - - def del_real_blob(self, blobname_rev): - blobpath = self._location + blobname_rev - self.BlobHandle.del_blob(blobpath) - - def send_blob(self, client, blobname): - rev = str(self._blobtable.get_blob_info(blobname, 'rev')) - blobpath = os.path.join(self._location, blobname + '_' + rev) - return self._proxy.sendfile(client, blobpath) + return False + else: + return True - def recv_blob(self, filekey, blobname, rev): - blobpath = os.path.join(self._location, blobname + '_' + str(rev)) - ret = self._proxy.recvfile(filekey, blobpath) + def clean(self): + del_list = list(self._deltags) + for rev in del_list: + self.del_real_blob(rev) + del_list = [] + for blob, rev in (self._blobtable.get_blob_list().items()): + if not self.blob_exists(rev): + del_list.append(blob) + for container, name in del_list: + self.del_blob(container, name) - return ret @imc.async.caller - def check_blob(self, client, blobname, cacherev): - rev = self._blobtable.get_blob_info(blobname, 'rev') - if rev is None: - return 'no_exist' - elif cacherev < rev: - result = self.send_blob(client, blobname) - response = {'filekey': result.filekey, - 'info': self._blobtable.get_blob_info(blobname)} - return response + def check_blob(self, client, container, name, cacherev, cachesha1): + info = self._blobtable.get_blob_info(container, name) + if info is None: + return ('no_exist', None) + elif cacherev != info['rev']: + if cachesha1 != info['sha1']: + result = self.send_blob(container, name, client) + return (result.filekey, info) + else: + return (None, info) else: - return 'up_to_date' + return ('up_to_date', None) - @imc.async.caller - def recv_update_result(self, client, blobname, result, - cacherev, retry=False): - if client not in self._clients: - return None - else: - if result == 'Success': - self._clients[client].append({blobname: cacherev}) - return 'Success' - elif retry: - result = self.send_blob(client, blobname) - response = {'filekey': result.filekey, - 'info': self._blobtable.get_blob_info(blobname)} - return response - else: - return 'Finish' + def get_update_list(self, container): + clients = set() + for client, method in (self._containers[container].items()): + if method == "ACTIVE": + clients.add(client) + return clients - def send_update(self, clients, blobname, info, written): - result_table = dict.fromkeys(clients) + def send_update(self, client, container, name): def recv_result(result): - nonlocal result_table - nonlocal blobname - nonlocal info - sta, client = result + nonlocal client + nonlocal container + nonlocal name + sta, ret = result # TODO: # limit retry if not sta: - self._client_call_async(client, 'get_update', - recv_result, - blobname, info, - result_table[client].filekey) - else: - if result_table[client] is None: - result_table.pop(client) - elif result_table[client].wait() != 'Success': - result_table[client] = self.send_blob(client, blobname) - self._client_call_async(client, 'get_update', - recv_result, - blobname, info, - result_table[client].filekey) - else: - result_table.pop(client) + # self._client_call_async(client, 'get_update', + # recv_result, container, name) + self.disconnect_client(client) + self._containers[container].pop(client) - for client in clients: - if written: - result_table[client] = self.send_blob(client, 
blobname) - else: - result_table[client] = None - sta, ret = self._client_call(client, 'get_update', - recv_result, - blobname, info, - result_table[client].filekey) - if not sta: - # TODO: - pass + self._client_call_async(client, 'get_update', + recv_result, container, name) @imc.async.caller - def recv_commit(self, client, commit_info, force_flag, filekey=None): - blobname = commit_info['blobname'] - info = commit_info['info'] - rev = self._blobtable.get_blob_info(blobname, 'rev') - if rev is None: - if commit_info['createtag']: - rev = 0 - else: + def recv_commit(self, client, commit_info, force_flag): + clientinfo = commit_info['info'] + container = clientinfo['container'] + name = clientinfo['name'] + info = self._blobtable.get_blob_info(container, name) + if info is None: + if not commit_info['createtag']: +##################debug code########################## + print("blob doesn't exist, please add createtag") +##################debug code########################## return False - elif info['rev'] < rev and not force_flag: + elif info['rev'] != clientinfo['rev'] and not force_flag: +##################debug code########################## + print("revision conflict") +##################debug code########################## return False if commit_info['deltag']: - self.del_blob(blobname) - clients = set() - for needed_client, method in ( - self._containers[info['container']].items() - ): - if method == "ACTIVE": - clients.add(needed_client) - clients.discard(client) - self.send_update(clients, blobname, None, False) - result = True + clientinfo['rev'] = None + self.del_blob(container, name) + return clientinfo else: - info['rev'] = rev + 1 - if commit_info['written']: - status = self.recv_blob(filekey, blobname, rev + 1) + clientinfo['rev'] = str(uuid.uuid4()) + if not info or info['sha1'] != clientinfo['sha1']: + filekey = self.request_blob(client, commit_info['target']) + if (filekey == 'Efailtosend' or + filekey == 'Edisconnected'): + return False + else: + filekey = None + if filekey: + status = self.recv_blob(filekey, clientinfo['rev']) result = status.wait() - if rev: - self.del_real_blob(''.join([blobname, '_', str(rev)])) else: - result = True - if result: - self.update_blob(blobname, info) - clients = set() - for needed_client, method in ( - self._containers[info['container']].items()): - if method == "ACTIVE": - clients.add(needed_client) - clients.discard(client) - self.send_update(clients, blobname, - info, commit_info['written']) + if info: + result = self.copy_blob(info['rev'], clientinfo['rev']) + else: + result = False + if result: + if clientinfo['rev']: + self.update_blob(clientinfo) + if info: + self.del_real_blob(info['rev']) + clients = self.get_update_list(container) + clients.discard(client) + for cli in clients: + self.send_update(cli, container, name) + return clientinfo + else: + return False - return True + # @imc.async.caller + # def get_request(self, client, container, name): + # result = self.send_blob(client, container, name) + # if result: + # return result.filekey + # else: + # return None + + def request_blob(self, client, target): + sta, ret = self._client_call(client, 'get_request', target) + if not sta: + return 'Edisconnected' else: - return False - - - -################### Testing Code ####################### -''' -if __name__ == '__main__': - global blob_serv - - blob_serv = BlobServer() - blob_serv.listen(5730) + return ret - #http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([ - # ('/conn',WebConnHandler), - #])) - 
#http_serv.listen(83) - - tornado.ioloop.IOLoop.instance().start() -''' + def send_blob(self, container, name, client): + rev = self._blobtable.get_blob_info(container, name, 'rev') + if not rev: + return False + else: + def send_finish(result): + nonlocal rev + if self._opencounts[rev] > 0: + self._opencounts[rev] -= 1 + + blobpath = os.path.join(self._location, rev) + try: + ret = self._proxy.sendfile(client, blobpath) + except ConnectionError: + return False + else: + self._opencounts[rev] += 1 + ret.wait_async(send_finish) + return ret + + def recv_blob(self, filekey, filename): + blobpath = os.path.join(self._location, filename) + return self._proxy.recvfile(filekey, blobpath) + + def update_blob(self, info): + self._blobtable.update_blob(info) + + def del_blob(self, container, name): + rev = self._blobtable.get_blob_info(container, name, 'rev') + self._blobtable.del_blob(container, name) + if rev: + self.del_real_blob(rev) + + def del_real_blob(self, rev): + self._deltags.add(rev) + if self._opencounts[rev] == 0: + self._deltags.remove(rev) + path = os.path.join(self._location, rev) + self.BlobHandle.del_blob(path) + + def copy_blob(self, rev, newrev): + path = os.path.join(self._location, rev) + newpath = os.path.join(self._location, newrev) + return self.BlobHandle.copy_file(path, newpath) + + def vertify_blob(self, rev, sha1): + blobpath = os.path.join(self._location, rev) + return self.BlobHandle._sha1(blobpath) == sha1 + + def blob_exists(self, rev): + return self.BlobHandle.file_exists(os.path.join(self._location, rev)) diff --git a/src/py/imc/blobtable.py b/src/py/imc/blobtable.py index 34e2d01..e6cd7cd 100755 --- a/src/py/imc/blobtable.py +++ b/src/py/imc/blobtable.py @@ -18,7 +18,7 @@ class BlobTable: # client # server @abstractmethod - def get_blob_info(self, blobname, attr=None): + def get_blob_info(self, container, name, attr=None): # if the blobname doesn't exist, return None if attr is None: # return blob info @@ -46,19 +46,23 @@ class BlobTable: # client # server @abstractmethod - def update_blob(self, blobname, info): + def update_blob(self, info): pass # client # server @abstractmethod - def del_blob(self, blobname): + def del_blob(self, container, name): + # return the info + # if the blob doesn't exist, return None pass """ info: - rev (int) container (str) + name (str) + rev (str) + sha1 (str) metadata (str) size (???) commit_time (???) 
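An aside on the blobtable.py hunk above: it only reshapes the abstract BlobTable interface around (container, name) keys and string revisions, so no concrete backend appears in this commit. The sketch below is illustrative and untested; InMemoryBlobTable and its plain-dict storage are assumptions, not project code. It merely mirrors the abstract methods, the get_container_list/create_container calls the server makes, and the (container, name) -> rev mapping that clean() walks.

# Illustrative sketch only -- InMemoryBlobTable is a hypothetical name,
# not part of this commit; it just mirrors the revised BlobTable contract.
from collections import defaultdict

class InMemoryBlobTable:
    def __init__(self):
        # container -> {name: info}, where info carries the fields listed
        # in the docstring (container, name, rev, sha1, metadata, size, ...)
        self._containers = defaultdict(dict)

    def get_container_list(self):
        return list(self._containers)

    def create_container(self, container):
        self._containers[container]    # defaultdict creates it on access

    def get_blob_list(self):
        # (container, name) -> rev, the shape clean() iterates over
        return {(c, n): info['rev']
                for c, blobs in self._containers.items()
                for n, info in blobs.items()}

    def get_blob_info(self, container, name, attr=None):
        # return None when the blob doesn't exist, the whole info dict when
        # attr is omitted, or just the requested attribute otherwise
        info = self._containers.get(container, {}).get(name)
        if info is None:
            return None
        return dict(info) if attr is None else info.get(attr)

    def update_blob(self, info):
        self._containers[info['container']][info['name']] = dict(info)

    def del_blob(self, container, name):
        # return the removed info, or None if the blob doesn't exist
        return self._containers.get(container, {}).pop(name, None)

Returning the removed info from del_blob follows the new comment on the abstract method; the callers in this commit only rely on the deletion side effect.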
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 5846bd2..960c82a 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -37,7 +37,7 @@ class Connection: def start_recv(self,recv_callback): pass - def abort_file(self,filekey): + def abort_file(self,filekey,err): pass def add_close_callback(self,callback): @@ -54,10 +54,12 @@ class Connection: class FileResult(): def __init__(self,filekey): - self.filekey = filekey + self._ioloop = tornado.ioloop.IOLoop.instance() self._retid = None self._result = None + self.filekey = filekey + def ret_result(self,res): if self._result != None: return @@ -73,6 +75,16 @@ class FileResult(): return self._result + def wait_async(self,callback): + @async.caller + def _call(): + result = self.wait() + + if callback != None: + callback(result) + + self._ioloop.add_callback(tornado.stack_context.wrap(_call)) + class Proxy: def __init__(self,link,auth,idendesc,conn_link_fn = lambda link : None): self.MSGTYPE_CALL = 'call' @@ -124,7 +136,8 @@ class Proxy: def del_conn(self,conn): waits = list(self._conn_retidmap[conn.link].values()) for wait in waits: - wait['callback']((False,'Eclose')) + self._ret_call(wait['caller_link'],wait['caller_retid'], + (False,'Eclose')) waits = list(self._conn_filekeymap[conn.link].values()) for wait in waits: @@ -175,10 +188,11 @@ class Proxy: self._ioloop.add_callback(tornado.stack_context.wrap(_call)) def sendfile(self,dst_link,filepath): - def _abort_cb(): - if self._ret_sendfile(filekey,'Eabort'): + def _callback(err): + if self._ret_sendfile(filekey,err) and err != None: with Auth.change_current_iden(self._idendesc,self._auth): - self.call(dst_link + 'imc/','abort_sendfile',65536,filekey) + self.call(dst_link + 'imc/','abort_sendfile',65536, + filekey,err) filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest() filesize = os.stat(filepath).st_size @@ -189,8 +203,9 @@ class Proxy: 'filesize':filesize, 'filepath':filepath, 'fileresult':fileresult, - 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')), - 'abort_callback':tornado.stack_context.wrap(_abort_cb) + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1), + lambda : self._ret_sendfile(filekey,'Etimeout')), + 'callback':tornado.stack_context.wrap(_callback) } with Auth.change_current_iden(self._idendesc,self._auth): @@ -211,7 +226,7 @@ class Proxy: if err != None: if not in_conn.closed(): - in_conn.abort_file(filekey) + in_conn.abort_file(filekey,err) self._send_msg_abortfile(in_conn,filekey,err) self._ioloop.add_callback(self._ret_sendfile,filekey,err) @@ -224,7 +239,7 @@ class Proxy: in_conn = self._request_conn(src_link) if filekey in self._info_filekeymap: - info['abort_callback'] = tornado.stack_context.wrap(lambda : _callback('Eabort')) + info['callback'] = tornado.stack_context.wrap(_callback) self._add_wait_filekey(in_conn.link,filekey,filesize,_callback) in_conn.recv_file(filekey,filesize,filepath,_callback) @@ -234,7 +249,7 @@ class Proxy: def abortfile(self,filekey): try: - self._info_filekeymap[filekey]['abort_callback']() + self._info_filekeymap[filekey]['callback']('Eabort') except: pass @@ -274,10 +289,13 @@ class Proxy: def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param): def __add_wait_caller(conn_link): - callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_link,caller_retid,result)) self._conn_retidmap[conn_link][caller_retid] = { - 
'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback((False,'Etimeout'))), - 'callback':callback + 'timer':self._ioloop.add_timeout(datetime.timedelta( + milliseconds = timeout), + lambda : self._ret_call(caller_link,caller_retid, + (False,'Etimeout'))), + 'caller_link':caller_link, + 'caller_retid':caller_retid } def __del_wait_caller(conn_link): @@ -333,8 +351,8 @@ class Proxy: except KeyError: return __ret((False,'Enoexist')) - #except Exception: - # return __ret((False,'Einternal')) + except Exception: + return __ret((False,'Einternal')) if Auth.get_current_idendesc() == idendesc: result = func(*param) @@ -355,7 +373,9 @@ class Proxy: else: if caller_link == self._link: __add_wait_caller(conn.link) - self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param) + + self._send_msg_call(conn,caller_link,caller_retid,idendesc, + dst,func_name,timeout,param) result = async.switch_top() @@ -364,7 +384,8 @@ class Proxy: return __ret(result) else: - self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param) + self._send_msg_call(conn,caller_link,caller_retid,idendesc, + dst,func_name,timeout,param) return @@ -391,7 +412,7 @@ class Proxy: if err != None: if not out_conn.closed(): - out_conn.abort_file(filekey) + out_conn.abort_file(filekey,err) self._send_msg_abortfile(out_conn,filekey,err) self._ioloop.add_callback(self._ret_sendfile,filekey,err) @@ -402,7 +423,7 @@ class Proxy: if err != None: if not in_conn.closed(): - in_conn.abort_file(filekey) + in_conn.abort_file(filekey,err) self._send_msg_abortfile(in_conn,filekey,err) except KeyError: @@ -413,7 +434,7 @@ class Proxy: if err != None: if not out_conn.closed(): - out_conn.abort_file(filekey) + out_conn.abort_file(filekey,err) self._send_msg_abortfile(out_conn,filekey,err) except KeyError: @@ -425,13 +446,19 @@ class Proxy: if info['filesize'] != filesize: raise ValueError - except (KeyError,ValueError): + except KeyError: + self._send_msg_abortfile(out_conn,filekey,'Enoexist') + + return + + except ValueError: self._send_msg_abortfile(out_conn,filekey,'Enoexist') self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist') + return - info['abort_callback'] = tornado.stack_context.wrap(lambda : __send_cb('Eabort')) - self._add_wait_filekey(out_conn.link,filekey,filesize,__send_cb) + info['callback'] = tornado.stack_context.wrap(__send_cb) + self._add_wait_filekey(out_conn.link,filekey,__send_cb) out_conn.send_file(filekey,info['filepath'],__send_cb) else: @@ -440,8 +467,10 @@ class Proxy: self._send_msg_abortfile(out_conn,filekey,'Enoexist') else: - self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb) - self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb) + self._add_wait_filekey(in_conn.link,filekey,filesize, + __bridge_cb) + self._add_wait_filekey(out_conn.link,filekey,filesize, + __bridge_cb) send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb) in_conn.recv_filedata(filekey,filesize,send_fn) @@ -450,8 +479,11 @@ class Proxy: def _add_wait_filekey(self,conn_link,filekey,filesize,callback): callback = tornado.stack_context.wrap(callback) + self._conn_filekeymap[conn_link][filekey] = { - 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = max(filesize,10000)),lambda : callback('Etimeout')), + 'timer':self._ioloop.add_timeout( + datetime.timedelta(milliseconds = max(filesize,10000)), + lambda : callback('Etimeout')), 'callback':callback } @@ -607,20 +639,26 @@ class Proxy: @async.caller 
def _pend_recvfile(self,src_link,filekey,filesize): - def __abort_cb(): - if self._ret_sendfile(filekey,'Eabort'): + def __abort_cb(err): + if self._ret_sendfile(filekey,err): with Auth.change_current_iden(self._idendesc,self._auth): - self.call(src_link + 'imc/','abort_sendfile',65536,filekey) + self.call(src_link + 'imc/','abort_sendfile',65536, + filekey,err) self._info_filekeymap[filekey] = { 'src_link':src_link, 'filesize':filesize, 'fileresult':FileResult(filekey), - 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')), - 'abort_callback':tornado.stack_context.wrap(__abort_cb) + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1), + lambda : self._ret_sendfile(filekey,'Etimeout')), + 'callback':tornado.stack_context.wrap(__abort_cb) } + + return 'Success' @async.caller - def _abort_sendfile(self,filekey): + def _abort_sendfile(self,filekey,err): if filekey in self._info_filekeymap: - self._ioloop.add_callback(self._ret_sendfile,filekey,'Eabort') + self._ioloop.add_callback(self._ret_sendfile,filekey,err) + + return 'Success' diff --git a/src/py/mod.py b/src/py/mod.py index af24577..73c3e1e 100644 --- a/src/py/mod.py +++ b/src/py/mod.py @@ -26,7 +26,7 @@ def unload(name): del mod_list[name] def load_sqmod(sqmodname): - instance = import_module(''.join(['/srv/py/sqmod/',sqmodname,'/py/',sqmodname])) + instance = import_module(''.join(['/srv/http/toj/sqmod/',sqmodname,'/py/',sqmodname])) return getattr(instance,sqmodname) diff --git a/src/py/netio.py b/src/py/netio.py index b896452..dac8273 100755 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -1,5 +1,5 @@ import os -import traceback +import math import json import struct import socket @@ -26,6 +26,7 @@ class SocketStream: self.DATA_BUF = 0 self.DATA_NOBUF = 1 self.DATA_FILE = 2 + self.STREAMTYPE = 'socket' self._ioloop = tornado.ioloop.IOLoop.current() self._sock = sock @@ -294,20 +295,27 @@ class SocketStream: self._ioloop.update_handler(fd,stat) class SocketConnection(Connection): - def __init__(self,link,main_stream,file_addr,add_pend_filestream_fn = None): + def __init__(self, + link, + main_stream, + file_addr, + pend_filestream_fn = None, + del_pend_filestream_fn = None): + super().__init__(link) self._ioloop = tornado.ioloop.IOLoop.current() self._sendfile_filekeymap = {} self.main_stream = main_stream - self.main_stream.set_close_callback(lambda conn : self.close()) + self.main_stream.set_close_callback(lambda stream : self.close()) self.file_addr = file_addr - self.add_pend_filestream = add_pend_filestream_fn + self.pend_filestream = pend_filestream_fn + self.del_pend_filestream = del_pend_filestream_fn def _check_close(f): def wrap(self,*args): - if self._closed == True: + if self.closed(): raise ConnectionError return f(self,*args) @@ -343,7 +351,13 @@ class SocketConnection(Connection): callback(err) - fd = os.open(filepath,os.O_RDONLY) + try: + fd = os.open(filepath,os.O_RDONLY) + + except FileNotFoundError: + callback('Eabort') + return + filesize = os.fstat(fd).st_size file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) @@ -357,7 +371,6 @@ class SocketConnection(Connection): file_stream = stream file_stream.set_close_callback(lambda stream : _callback('Eabort')) - self._add_wait_filekey(filekey,_callback) file_stream.recvfile(fd,filesize,_callback) @@ -368,8 +381,13 @@ class SocketConnection(Connection): except KeyError: return - file_stream.set_close_callback(None) - file_stream.close() + if file_stream == None: + 
self.del_pend_filestream(filekey) + + else: + file_stream.set_close_callback(None) + file_stream.close() + os.close(fd) if err != None: @@ -383,7 +401,8 @@ class SocketConnection(Connection): file_stream = None - self.add_pend_filestream(filekey,_conn_cb) + self._add_wait_filekey(filekey,_callback) + self.pend_filestream(self.STREAMTYPE,filekey,_conn_cb) fd = os.open(filepath,os.O_WRONLY | os.O_CREAT) @_check_close @@ -436,7 +455,6 @@ class SocketConnection(Connection): file_stream = stream file_stream.set_close_callback(lambda stream : _callback('Eabort')) - self._add_wait_filekey(filekey,_callback) file_stream.read_bytes(filesize,send_fn,nonbuf = True) @@ -446,18 +464,23 @@ class SocketConnection(Connection): except KeyError: return + + if file_stream == None: + self.del_pend_filestream(filekey) - file_stream.set_close_callback(None) - file_stream.close() + else: + file_stream.set_close_callback(None) + file_stream.close() file_stream = None - self.add_pend_filestream(filekey,_conn_cb) + self._add_wait_filekey(filekey,_callback) + self.pend_filestream(self.STREAMTYPE,filekey,_conn_cb) @_check_close - def abort_file(self,filekey): + def abort_file(self,filekey,err): try: - self._sendfile_filekeymap[filekey]('Eabort') + self._sendfile_filekeymap[filekey](err) except KeyError: pass @@ -476,42 +499,216 @@ class SocketConnection(Connection): self.main_stream.read_bytes(8,_recv_size) def close(self): - if self._closed == True: + if self.closed(): return - self._closed = True + super().close() self.main_stream.close() callbacks = list(self._sendfile_filekeymap.values()) for callback in callbacks: callback('Eabort') - super().close() - def _add_wait_filekey(self,filekey,fail_cb): self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) def _del_wait_filekey(self,filekey): del self._sendfile_filekeymap[filekey] +class WebSocketStream: + def __init__(self,handler): + self._closed = False + self._handler = handler + self._recv_callback = None + self._close_callback = None + + def _check_close(f): + def wrap(self,*args): + if self._closed == True: + raise ConnectionError + + return f(self,*args) + + return wrap + + def set_close_callback(self,callback): + if callback == None: + self._close_callback = None + + else: + self._close_callback = tornado.stack_context.wrap(callback) + + def close(self): + if self._closed == True: + return + + self._closed = True + try: + self._handler.close() + + except Exception: + pass + + if self._close_callback != None: + self._close_callback(self) + + @_check_close + def set_recv_callback(self,callback): + self._recv_callback = tornado.stack_context.wrap(callback) + + @_check_close + def send_msg(self,data): + self._handler.write_message(data,True) + + @_check_close + def recv_msg(self,data): + if self._recv_callback != None: + self._recv_callback(data) + + @_check_close + def recv_file(self,fd,filesize,callback = None): + def _recv_info(data): + nonlocal off + nonlocal partsize + + info = json.loads(data.decode('utf-8')) + off = info['off'] + + partsize = max(0,min(math.ceil(filesize / 4),filesize - off)) + self.set_recv_callback(_recv_cb) + self.send_msg('Success') + + def _recv_cb(data): + nonlocal count + + if count + len(data) > partsize: + if callback != None: + callback('Eillegal') + + return + + self.send_msg('Success') + + os.pwrite(fd,data,off + count) + count += len(data) + + if count == partsize: + if callback != None: + callback() + + count = 0 + off = 0 + partsize = 0 + + self.set_recv_callback(_recv_info) + class 
WebSocketConnection(Connection): - def __init__(self,link,handler): + def __init__(self, + link, + main_stream, + pend_filestream_fn = None, + del_pend_filestream_fn = None): + self.STREAMTYPE = 'websocket' + super().__init__(link) self._ioloop = tornado.ioloop.IOLoop.current() - self.handler = handler + self._sendfile_filekeymap = {} - def send_msg(self,data): - if self._closed == True: - raise ConnectionError + self.main_stream = main_stream + self.main_stream.set_close_callback(lambda stream : self.close()) + self.pend_filestream = pend_filestream_fn + self.del_pend_filestream = del_pend_filestream_fn + + def _check_close(f): + def wrap(self,*args): + if self._closed == True: + raise ConnectionError - self.handler.write_message(data,True) + return f(self,*args) - def recv_msg(self,data): + return wrap + + def close(self): if self._closed == True: - raise ConnectionError + return - self._recv_callback(self,data) + self.main_stream.close() + super().close() + + @_check_close + def send_msg(self,data): + self.main_stream.send_msg(data) + @_check_close def start_recv(self,recv_callback): - self._recv_callback = tornado.stack_context.wrap(recv_callback) + self.main_stream.set_recv_callback( + lambda data : recv_callback(self,data)) + + @_check_close + def recv_file(self,filekey,filesize,filepath,callback): + def _conn_cb(stream): + nonlocal file_streams + + file_streams.append(stream) + stream.recv_file(fd,filesize,_stream_cb) + stream.set_close_callback(lambda stream : _stream_cb('Eabort')) + + def _stream_cb(err = None): + nonlocal count + + count += 1 + + if err != None: + _callback(err) + + if count == 4: + _callback() + + def _callback(err = None): + try: + self._del_wait_filekey(filekey) + + except KeyError: + return + + self.del_pend_filestream(filekey) + + for stream in file_streams: + stream.set_close_callback(None) + stream.close() + + os.close(fd) + + if err != None: + try: + os.remove(filepath) + + except FileNotFoundError: + pass + + callback(err) + + file_streams = [] + count = 0 + + self._add_wait_filekey(filekey,_callback) + self.pend_filestream(self.STREAMTYPE,filekey,_conn_cb,4) + + fd = os.open(filepath,os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + os.ftruncate(fd,filesize); + + @_check_close + def abort_file(self,filekey,err): + try: + self._sendfile_filekeymap[filekey](err) + + except KeyError: + pass + + def _add_wait_filekey(self,filekey,fail_cb): + self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb) + + def _del_wait_filekey(self,filekey): + del self._sendfile_filekeymap[filekey] + diff --git a/src/py/spup.c b/src/py/spup.c new file mode 100644 index 0000000..749112f --- /dev/null +++ b/src/py/spup.c @@ -0,0 +1,268 @@ +#include<unistd.h> +#include<malloc.h> +#include<pthread.h> +#include<semaphore.h> +#include<sys/syscall.h> +#include<sys/types.h> +#include<sys/eventfd.h> +#include<linux/aio_abi.h> + +#include<Python.h> + +#define AIO_MAX_EVENT 4096 +#define AIO_MAX_GETEVENT 64 + +struct aio_pack{ + struct aio_pack *next; + + int id; + struct iocb *iocb; + char *data; + char *buf; +}; + +static int io_setup(unsigned int nr_events,aio_context_t *ctx_idp); +static int io_submit(aio_context_t ctx_id,long nr,struct iocb **iocbpp); +static int io_destroy(aio_context_t ctx_id); +static PyObject* spup_apply_mask(PyObject *self,PyObject *args); +static PyObject* spup_aio_get_evtfd(PyObject *self,PyObject *args); +static PyObject* spup_aio_read(PyObject *self,PyObject *args); +static PyObject* spup_aio_write(PyObject *self,PyObject *args); +static PyObject* 
spup_aio_getevents(PyObject *self,PyObject *args); +static void* spup_aio_handler(void *arg); + +static PyMethodDef method[] = { + {"apply_mask",spup_apply_mask,METH_VARARGS,"apply_mask"}, + + {"aio_get_evtfd",spup_aio_get_evtfd,METH_NOARGS,"aio_get_evtfd"}, + {"aio_read",spup_aio_read,METH_VARARGS,"aio_read"}, + {"aio_write",spup_aio_write,METH_VARARGS,"aio_write"}, + {"aio_getevents",spup_aio_getevents,METH_VARARGS,"aio_getevents"}, + {NULL,NULL,0,NULL} +}; +static struct PyModuleDef module = { + PyModuleDef_HEAD_INIT, + "spup", + NULL, + -1, + method +}; +static PyObject *spup_error; +static aio_context_t spup_aio_ctx; +static int spup_aio_evtfd; +static int spup_aio_id; +static pthread_t spup_aio_thread; +static struct aio_pack *spup_aio_queue[AIO_MAX_EVENT]; +static int spup_aio_head; +static int spup_aio_tail; +static sem_t spup_aio_sem; + +PyMODINIT_FUNC PyInit_spup(void){ + PyObject *m; + + m = PyModule_Create(&module); + if(m == NULL){ + return NULL; + } + + spup_error = PyErr_NewException("spup.error",NULL,NULL); + Py_INCREF(spup_error); + PyModule_AddObject(m,"error",spup_error); + + memset(&spup_aio_ctx,0,sizeof(spup_aio_ctx)); + io_setup(AIO_MAX_EVENT,&spup_aio_ctx); + spup_aio_evtfd = eventfd(0,0); + spup_aio_id = 0; + pthread_create(&spup_aio_thread,NULL,spup_aio_handler,NULL); + spup_aio_head = 0; + spup_aio_tail = 0; + sem_init(&spup_aio_sem,0,0); + + return m; +} + +static int io_setup(unsigned int nr_events,aio_context_t *ctx_idp){ + return syscall(SYS_io_setup,nr_events,ctx_idp); +} +static int io_submit(aio_context_t ctx_id,long nr,struct iocb **iocbpp){ + return syscall(SYS_io_submit,ctx_id,nr,iocbpp); +} +static int io_getevents(aio_context_t ctx_id, + long min_nr, + long nr, + struct io_event *events, + struct timespec *timeout){ + + return syscall(SYS_io_getevents,ctx_id,min_nr,nr,events,timeout); +} +static int io_destroy(aio_context_t ctx_id){ + return syscall(SYS_io_destroy,ctx_id); +} + +static PyObject* spup_apply_mask(PyObject *self,PyObject *args){ + int i; + int j; + + char *mask; + int mask_len; + char *data; + int data_len; + unsigned int mask_val; + char *tmp; + PyObject *ret; + + PyArg_ParseTuple(args,"y#y#",&mask,&mask_len,&data,&data_len); + if(mask_len < 4){ + Py_INCREF(Py_None); + return Py_None; + } + + tmp = malloc(data_len); + if(tmp == NULL){ + Py_INCREF(Py_None); + return Py_None; + } + + mask_val = *(unsigned int*)mask; + for(i = 0;(i + 4) <= data_len;i += 4){ + *((unsigned int*)(tmp + i)) = *((unsigned int*)(data + i)) ^ mask_val; + } + for(j = 0;i < data_len;i++,j++){ + tmp[i] = data[i] ^ mask[j]; + } + + ret = PyBytes_FromStringAndSize(tmp,data_len); + free(tmp); + + return ret; +} + +static PyObject* spup_aio_get_evtfd(PyObject *self,PyObject *args){ + return PyLong_FromLong(spup_aio_evtfd); +} +static PyObject* spup_aio_read(PyObject *self,PyObject *args){ + Py_INCREF(Py_None); + return Py_None; +} +static PyObject* spup_aio_write(PyObject *self,PyObject *args){ + int fd; + char *data; + int data_len; + Py_ssize_t len; + unsigned long off; + + struct aio_pack *pack; + struct iocb *iocb; + char *buf; + + PyArg_ParseTuple(args,"iy#nk",&fd,&data,&data_len,&len,&off); + if(len > data_len){ + Py_INCREF(Py_None); + return Py_None; + } + + pack = malloc(sizeof(*pack)); + iocb = malloc(sizeof(*iocb)); + buf = memalign(512,len); + if(pack == NULL || iocb == NULL || buf == NULL){ + if(pack != NULL){ + free(pack); + } + if(iocb != NULL){ + free(iocb); + } + if(buf != NULL){ + free(buf); + } + + Py_INCREF(Py_None); + return Py_None; + } + + 
//memcpy(buf,data,len); + + Py_INCREF(Py_None); + return Py_None; + + spup_aio_id += 1; + pack->id = spup_aio_id; + pack->iocb = iocb; + pack->buf = buf; + + memset(iocb,0,sizeof(*iocb)); + iocb->aio_lio_opcode = IOCB_CMD_PWRITE; + iocb->aio_reqprio = 0; + iocb->aio_fildes = fd; + iocb->aio_buf = (unsigned long)buf; + iocb->aio_nbytes = len; + iocb->aio_offset = off; + iocb->aio_flags = IOCB_FLAG_RESFD; + iocb->aio_resfd = spup_aio_evtfd; + iocb->aio_data = (unsigned long)pack; + + spup_aio_queue[spup_aio_tail] = pack; + spup_aio_tail = (spup_aio_tail + 1) % AIO_MAX_EVENT; + sem_post(&spup_aio_sem); + + return PyLong_FromLong(pack->id); +} +static PyObject* spup_aio_getevents(PyObject *self,PyObject *args){ + int i; + + PyObject *ret; + struct io_event *evts; + int nr; + struct aio_pack *pack; + char *buf; + + evts = malloc(sizeof(*evts) * AIO_MAX_GETEVENT); + if(evts == NULL){ + Py_INCREF(Py_None); + return Py_None; + } + + nr = io_getevents(spup_aio_ctx,1,AIO_MAX_GETEVENT,evts,NULL); + for(i = 0;i < nr;i++){ + pack = (struct aio_pack*)evts[i].data; + buf = pack->buf; + + printf(" %lld\n",evts[i].res); + + free(buf); + free(pack); + free((struct iocb*)evts[i].obj); + } + + free(evts); + return ret; +} + +static void* spup_aio_handler(void *arg){ + int count; + struct aio_pack *pack; + struct iocb *iocbs[64]; + + while(1){ + sem_wait(&spup_aio_sem); + + count = 0; + while(1){ + pack = spup_aio_queue[spup_aio_head]; + iocbs[count] = pack->iocb; + + spup_aio_head = (spup_aio_head + 1) % AIO_MAX_EVENT; + count++; + + if(count >= 64){ + break; + } + if(sem_trywait(&spup_aio_sem)){ + break; + } + } + + printf("%d\n",io_submit(spup_aio_ctx,count,iocbs)); + } + + return NULL; +} diff --git a/src/py/square.py b/src/py/square.py index 6ce0d85..677c9cc 100644 --- a/src/py/square.py +++ b/src/py/square.py @@ -35,6 +35,8 @@ class SquareMg: self.get_link = get_link_fn self._sqmod_list = {} + Proxy.instance.register_filter('sq/', self.sqmod_filter) + Proxy.instance.register_call( 'core/square/', 'list_category', self.list_category) Proxy.instance.register_call( @@ -55,6 +57,8 @@ class SquareMg: 'core/square/', 'list_sqmod', self.list_sqmod) def unload(self): + Proxy.instance.unregister_filter('sq/', self.sqmod_filter) + Proxy.instance.unregister_call( 'core/square/', 'list_category') Proxy.instance.unregister_call( @@ -652,6 +656,11 @@ class SquareMg: sqmodname = self.get_sqmodname_by_sqmodid(sqmodid) return sqmodname != None + def sqmod_filter(self, res_path, dst_func): + sqid = int(res_path[0]) + with TOJAuth.change_current_iden(self._idendesc): + self.load_square(sqid) + class Square: def unload(self, Force): pass |
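A closing aside on the square.py hunk: sqmod_filter is registered for the 'sq/' prefix so that a square module gets loaded lazily before a call into it is dispatched. The sketch below is illustrative only; Proxy.register_filter itself is not shown in this commit, so the FilterRegistry class, the way the destination string is split into res_path and dst_func, and the print-based filter are all assumptions about the shape of that hook, not the project's actual imc.proxy code.

# Hypothetical sketch of a prefix-based filter hook in the spirit of
# Proxy.instance.register_filter('sq/', fn); names and splitting rules
# are assumptions, not taken from this repository.
class FilterRegistry:
    def __init__(self):
        # path prefix -> callable(res_path, dst_func)
        self._filters = {}

    def register_filter(self, prefix, fn):
        self._filters[prefix] = fn

    def unregister_filter(self, prefix, fn=None):
        self._filters.pop(prefix, None)

    def run_filters(self, dst):
        # dst looks like 'sq/42/list_item'; hand each matching filter the
        # path components after its prefix plus the destination function name.
        for prefix, fn in self._filters.items():
            if dst.startswith(prefix):
                parts = dst[len(prefix):].split('/')
                fn(parts[:-1], parts[-1])

registry = FilterRegistry()
registry.register_filter('sq/',
                         lambda res_path, dst_func: print('load square', res_path[0]))
registry.run_filters('sq/42/list_item')    # prints: load square 42

Under these assumptions, res_path[0] is the square id as a string, which matches the int(res_path[0]) conversion done in sqmod_filter above.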