aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-07-29 01:55:58 +0800
committerpzread <netfirewall@gmail.com>2013-07-29 01:55:58 +0800
commite6e1c5fa269cac26fa15ac15c2c24be05f2ac72c (patch)
treeada51b6e8685bfb44f08ba4f02ffa734e91cdb53
parent456b6867161c1f1295993c61b3726ff44a3c809e (diff)
downloadtaiwan-online-judge-e6e1c5fa269cac26fa15ac15c2c24be05f2ac72c.tar.gz
taiwan-online-judge-e6e1c5fa269cac26fa15ac15c2c24be05f2ac72c.tar.zst
taiwan-online-judge-e6e1c5fa269cac26fa15ac15c2c24be05f2ac72c.zip
Add websocket sendfile
-rw-r--r--src/css/index.less13
-rw-r--r--src/js/com.js143
-rw-r--r--src/js/imc.js220
-rw-r--r--src/pmod/pmod_test/css/manage.less23
-rw-r--r--src/pmod/pmod_test/html/manage.html91
-rw-r--r--src/pmod/pmod_test/js/pmod_test.js140
-rw-r--r--src/pmod/pmod_test/py/pmod_test.py186
-rwxr-xr-xsrc/py/backend_server.py211
-rwxr-xr-xsrc/py/center_server.py15
-rw-r--r--src/py/com.py12
-rw-r--r--src/py/dispatch.py110
-rw-r--r--src/py/imc/async.py4
-rwxr-xr-xsrc/py/imc/blobclient.py287
-rwxr-xr-xsrc/py/imc/blobhandle.py76
-rwxr-xr-xsrc/py/imc/blobserver.py341
-rwxr-xr-xsrc/py/imc/blobtable.py12
-rwxr-xr-xsrc/py/imc/proxy.py106
-rw-r--r--src/py/mod.py2
-rwxr-xr-xsrc/py/netio.py257
-rw-r--r--src/py/spup.c268
-rw-r--r--src/py/square.py9
21 files changed, 1986 insertions, 540 deletions
diff --git a/src/css/index.less b/src/css/index.less
index 4cd496a..04f75eb 100644
--- a/src/css/index.less
+++ b/src/css/index.less
@@ -17,20 +17,18 @@ body{
overflow:hidden;
&.active{
- height:41px;
+ height:33px;
opacity:1;
display:block;
}
}
.index_tag{
- height:32px;
- line-height:32px;
+ height:33px;
+ line-height:33px;
opacity:0.2;
transition:opacity @fast;
&.active,&:hover{
- height:41px;
- line-height:41px;
opacity:1;
}
@@ -84,7 +82,7 @@ body{
div.tagblock{
width:100%;
- height:41px;
+ height:33px;
box-shadow:0px 3px 2px -2px fade(@black,10%);
}
div.menu{
@@ -92,7 +90,6 @@ body{
li > a{
height:48px;
padding-left:32px;
- padding-right:-32px;
font-size:@NormalFontSize;
font-weight:bold;
line-height:48px;
@@ -114,7 +111,7 @@ body{
div.tagblock{
width:100%;
- height:41px;
+ height:33px;
box-shadow:0px 3px 2px -2px fade(@black,10%);
}
div.notice{
diff --git a/src/js/com.js b/src/js/com.js
index 1c8af3c..1a6d732 100644
--- a/src/js/com.js
+++ b/src/js/com.js
@@ -10,14 +10,75 @@ var ACCESS_DELETE = 0x8;
var ACCESS_SETPER = 0x10;
var ACCESS_EXECUTE = 0x20;
-var WebSocketConnection = function(link,ws){
+var WebSocketConnection = function(link,ws,file_addr){
var that = this;
+ var sendfile_filekeymap = {};
that.__super__(link);
that.send_msg = function(data){
ws.send(new Blob([data],{'type':'application/octet-stream'}))
};
+ that.send_file = function(filekey,blob,callback){
+ var i;
+ var file_ws = new Array(4);
+ var filesize = blob.size;
+ var partsize = Math.ceil(filesize / 4);
+ var count = 0;
+
+ function _callback(err){
+ if(!(filekey in sendfile_filekeymap)){
+ return;
+ }
+
+ delete sendfile_filekeymap[filekey];
+
+ for(i = 0;i < 4;i++){
+ if(file_ws[i] != undefined){
+ file_ws[i].close();
+ }
+ }
+
+ callback(err);
+ }
+
+ for(i = 0;i < 4;i++){
+ file_ws[i] = new WebSocket('ws://' + file_addr + '/conn');
+ file_ws[i].onopen = function(idx){return function(){
+ var ws = file_ws[idx];
+ var off = idx * partsize;
+ var end = Math.min(filesize,off + partsize);
+
+ ws.onmessage = function(e){
+ if(off >= end){
+ count += 1;
+ if(count == 4){
+ _callback();
+ }
+ }else{
+ ws.send(blob.slice(off,Math.min(end,off + 524288),
+ 'application/octet-stream'));
+ off += 524288;
+ }
+ };
+
+ console.log('file open ' + off);
+
+ ws.send(JSON.stringify({
+ 'conntype':'file',
+ 'filekey':filekey
+ }));
+
+ ws.send(new Blob([JSON.stringify({'off':off})],
+ {'type':'application/octet-stream'}));
+ }}(i);
+ }
+ };
+ that.abort_file = function(filekey,err){
+ if(filekey in sendfile_filekeymap){
+ sendfile_filekeymap[filekey]('Eabort');
+ }
+ };
that.start_recv = function(recv_callback){
ws.onmessage = function(e){
var reader = new FileReader;
@@ -197,7 +258,7 @@ var com = new function(){
}else{
return false;
}
- }
+ };
$.fn.tagbox = function(option){
var tagbox = this.data('tagbox');
@@ -206,7 +267,16 @@ var com = new function(){
}
return tagbox;
- }
+ };
+ $.fn.codebox = function(option){
+ var codebox = this.data('codebox');
+
+ if(option != undefined){
+ codebox = that.create_codebox(this,option.mode,option.readonly);
+ }
+
+ return codebox;
+ };
};
that.url_push = function(url){
@@ -533,7 +603,7 @@ var com = new function(){
ex($('[exheight="true"]'),'height');
ex($('[exminheight="true"]'),'min-height');
- $('.modal-body').css('max-height',(winheight * 0.5) + 'px');
+ $('.modal-body').css('max-height',(winheight * 0.9 - 192) + 'px');
};
that.get_cookie = function(){
var ret;
@@ -994,6 +1064,41 @@ var com = new function(){
var reto;
var idendesc;
var ws;
+ var addr;
+
+ function x(idx){
+ var tws = new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn');
+ var offset;
+ var end;
+ var blob;
+
+ tws.onmessage = function(){
+ console.log(offset)
+
+ if(offset < end){
+ console.log(offset);
+ tws.send(blob.slice(offset,Math.min(end,offset + 524288 * 1),'application/octet-stream'));
+ offset += (524288 * 1);
+ }else{
+ console.log(new Date().getTime());
+ }
+ };
+ tws.onopen = function(){
+ tws.send('filestream');
+ };
+ $('#test_fs').on('change',function(e){
+ blob = this.files[0];
+ offset = Math.floor(blob.size / 4) * idx;
+ if(idx != 3){
+ end = offset + Math.floor(blob.size / 4);
+ }else{
+ end = blob.size;
+ }
+
+ console.log(new Date().getTime());
+ tws.send('start');
+ });
+ }
if(res[0] != 'E'){
reto = JSON.parse(res)
@@ -1001,8 +1106,9 @@ var com = new function(){
that.link = reto.client_link;
that.backend_link = reto.backend_link;
idendesc = reto.client_idendesc;
+ addr = reto.ip + ':' + reto.port;
- ws = new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn');
+ ws = new WebSocket('ws://' + addr + '/conn');
ws.onopen = function(){
var i;
var conn;
@@ -1011,10 +1117,11 @@ var com = new function(){
console.log('open');
ws.send(JSON.stringify({
+ 'conntype':'main',
'client_link':that.link
}));
- conn = new WebSocketConnection(reto.backend_link,ws);
+ conn = new WebSocketConnection(reto.backend_link,ws,addr);
new imc.Auth();
new imc.Proxy(that.link,imc.Auth.instance,function(link,callback){
@@ -1037,12 +1144,30 @@ var com = new function(){
}else{
that.conn_callback.fire();
}
+
+ $('#test_fs').on('change',function(e){
+ var blob = this.files[0];
+
+ console.log(new Date().getTime());
+
+ that.sendfile_backend(blob,function(filekey){
+ that.call_backend('test/','test_dst',function(result){
+ console.log(result);
+ },filekey);
+ console.log(filekey);
+ },function(result){
+ console.log(result);
+ });
+
+ });
+
};
}else{
setTimeout(that.conn_backend,5000);
}
});
}
+
that.call_backend = function(path,func_name,callback){
var i;
var params = new Array()
@@ -1053,5 +1178,9 @@ var com = new function(){
}
imc.Proxy.instance.call.apply(undefined,params);
- }
+ };
+ that.sendfile_backend = function(blob,filekey_callback,result_callback){
+ return imc.Proxy.instance.sendfile(that.backend_link,blob,
+ filekey_callback,result_callback);
+ };
};
diff --git a/src/js/imc.js b/src/js/imc.js
index ae81684..05ce6be 100644
--- a/src/js/imc.js
+++ b/src/js/imc.js
@@ -15,6 +15,7 @@ var imc = new function(){
that.link = link;
that.send_msg = function(data){};
+ that.send_file = function(filekey,blob,callback){};
that.start_recv = function(recv_callback){};
that.close = function(){
@@ -29,14 +30,18 @@ var imc = new function(){
that.Proxy = function(self_link,auth,conn_link){
var MSGTYPE_CALL = 'call';
var MSGTYPE_RET = 'ret';
+ var MSGTYPE_SENDFILE = 'sendfile';
+ var MSGTYPE_ABORTFILE = 'abortfile';
var that = this;
var caller_retid_count = 0;
var conn_linkmap = {};
var conn_retidmap = {};
var callpath_root = {'child':{},'name':{},'filt':[]};
+ var info_filekeymap = {};
+ var conn_filekeymap = {};
- var walk_path = function(path,create){
+ function walk_path(path,create){
var i;
var parts;
var part;
@@ -67,9 +72,9 @@ var imc = new function(){
}
return cnode;
- };
+ }
- var route_call = function(caller_link,caller_retid,idendesc,dst,func_name,timeout,callback,param){
+ function route_call(caller_link,caller_retid,idendesc,dst,func_name,timeout,callback,param){
var i;
var j;
var part;
@@ -80,12 +85,12 @@ var imc = new function(){
var dpart;
var func;
- var _add_wait_caller = function(conn_link){
+ function _add_wait_caller(conn_link){
conn_retidmap[conn_link][caller_retid] = {
'timeout':timeout,
'callback':callback
}
- };
+ }
part = dst.split('/');
dst_link = part.slice(0,3).join('/') + '/'
@@ -114,13 +119,20 @@ var imc = new function(){
_add_wait_caller(self_link);
func.apply(undefined,[function(data){
- if(self_link in conn_retidmap && caller_retid in conn_retidmap[self_link]){
+ if(self_link in conn_retidmap &&
+ caller_retid in conn_retidmap[self_link]){
+
delete conn_retidmap[self_link][caller_retid];
- callback({'stat':true,'data':data});
+
+ if(callback != null && callback != undefined){
+ callback({'stat':true,'data':data});
+ }
}
}].concat(param));
}else{
- callback({'stat':false,'data':'Enoexist'});
+ if(callback != null && callback != undefined){
+ callback({'stat':false,'data':'Enoexist'});
+ }
}
}else{
that.request_conn(dst_link,function(conn){
@@ -128,22 +140,109 @@ var imc = new function(){
_add_wait_caller(conn.link);
}
- send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param);
+ send_msg_call(conn,caller_link,caller_retid,idendesc,dst,
+ func_name,timeout,param);
});
}
- };
+ }
+ function ret_call(conn_link,caller_link,caller_retid,result){
+ var wait;
+
+ if(conn_link in conn_retidmap &&
+ caller_retid in conn_retidmap[conn_link]){
+
+ wait = conn_retidmap[conn_link][caller_retid];
+ delete conn_retidmap[conn_link][caller_retid];
+ }else{
+ return;
+ }
+
+ if(caller_link == self_link){
+ wait.callback(result);
+ }else{
+ request_conn(caller_link,function(conn){
+ send_msg_ret(conn,caller_link,caller_retid,result);
+ });
+ }
+ }
+ function route_sendfile(out_conn,src_link,filekey,filesize){
+ var info;
+
+ function _send_cb(err){
+ if(del_wait_filekey(out_conn,filekey)){
+ return;
+ }
+
+ if(err != undefined){
+ out_conn.abort_file(filekey,err);
+ send_msg_abortfile(out_conn,filekey,err);
+ }
+
+ ret_sendfile(filekey,err);
+ }
+
+ if(src_link != self_link){
+ //TODO
+ return;
+ }
+
+ if((info = info_filekeymap[filekey]) == undefined){
+ return;
+ }
+
+ info.callback = _send_cb;
+ add_wait_filekey(out_conn.link,filekey,info.blob.size,_send_cb);
+ out_conn.send_file(filekey,info.blob,_send_cb);
+ }
+ function ret_sendfile(filekey,err){
+ var info;
+
+ if((info = info_filekeymap[filekey]) == undefined){
+ return false;
+ }
+
+ delete info_filekeymap[filekey];
+
+ if(err == undefined){
+ info.result_callback('Success');
+ }else{
+ info.result_callback(err);
+ }
+
+ return true;
+ }
+ function add_wait_filekey(conn_link,filekey,filesize,callback){
+ conn_filekeymap[conn_link][filekey] = {
+ 'callback':callback
+ };
+ }
+ function del_wait_filekey(conn_link,filekey){
+ if(conn_link in conn_filekeymap &&
+ filekey in conn_filekeymap[conn_link]){
- var recv_dispatch = function(conn,data){
+ delete conn_filekeymap[conn_link][filekey];
+
+ return true;
+ }else{
+ return false;
+ }
+ }
+
+ function recv_dispatch(conn,data){
var msgo = JSON.parse(data);
if(msgo.type == MSGTYPE_CALL){
recv_msg_call(conn,msgo);
}else if(msgo.type == MSGTYPE_RET){
recv_msg_ret(conn,msgo);
+ }else if(msgo.type == MSGTYPE_SENDFILE){
+ recv_msg_sendfile(conn,msgo);
+ }else if(msgo.type == MSGTYPE_ABORTFILE){
+ recv_msg_abortfile(conn,msgo);
}
- };
+ }
- var send_msg_call = function(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param){
+ function send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param){
var msg = {
'type':MSGTYPE_CALL,
'caller_link':caller_link,
@@ -156,8 +255,8 @@ var imc = new function(){
};
conn.send_msg(JSON.stringify(msg));
- };
- var recv_msg_call = function(conn,msg){
+ }
+ function recv_msg_call(conn,msg){
var caller_link = msg.caller_link
var caller_retid = msg.caller_retid;
var timeout = msg.timeout;
@@ -171,9 +270,9 @@ var imc = new function(){
send_msg_ret(conn,caller_link,caller_retid,result);
});
},param);
- };
+ }
- var send_msg_ret = function(conn,caller_link,caller_retid,result){
+ function send_msg_ret(conn,caller_link,caller_retid,result){
var msg = {
'type':MSGTYPE_RET,
'caller_link':caller_link,
@@ -182,30 +281,43 @@ var imc = new function(){
};
conn.send_msg(JSON.stringify(msg));
- };
- var recv_msg_ret = function(conn,msg){
+ }
+ function recv_msg_ret(conn,msg){
var caller_link = msg['caller_link'];
var caller_retid = msg['caller_retid'];
var result = msg['result'];
- var wait;
- if(caller_link == self_link){
- if(conn.link in conn_retidmap && caller_retid in conn_retidmap[conn.link]){
- wait = conn_retidmap[conn.link][caller_retid];
- delete conn_retidmap[conn.link][caller_retid];
+ ret_call(conn.link,caller_link,caller_retid,result);
+ }
- wait.callback(result);
- }
- }else{
- request_conn(caller_link,function(conn){
- send_msg_ret(conn,caller_link,caller_retid,result);
- });
+ function recv_msg_sendfile(conn,msg){
+ route_sendfile(conn,msg.src_link,msg.filekey,msg.filesize);
+ }
+
+ function send_msg_abortfile(conn,filekey,err){
+ var msg = {
+ 'type':MSGTYPE_ABORTFILE,
+ 'filekey':filekey,
+ 'error':err
+ };
+
+ conn.send_msg(JSON.stringify(msg));
+ }
+ function recv_msg_abortfile(conn,msg){
+ var filekey = msg.filekey;
+ var err = msg.error;
+
+ if(conn.link in conn_filekeymap &&
+ filekey in conn_filekeymap[conn.link]){
+
+ conn_filekeymap[conn.link][filekey].callback(err);
}
- };
+ }
that.add_conn = function(conn){
conn_linkmap[conn.link] = conn;
conn_retidmap[conn.link] = {};
+ conn_filekeymap[conn.link] = {};
conn.start_recv(recv_dispatch);
};
that.link_conn = function(link,conn){
@@ -218,7 +330,19 @@ var imc = new function(){
delete conn.link_linkmap[link];
};
that.del_conn = function(conn){
+ retids = conn_retidmap[conn.link];
+ for(retid in retids){
+ ret_call(conn.link,caller_link,caller_retid,result);
+ }
+
+ filekeys = conn_filekeymap[conn.link];
+ for(filekey in filekeys){
+ filekeys[filekey].callback('Eclose');
+ }
+
+ delete conn_retidmap[conn.link];
delete conn_linkmap[conn.link];
+ delete conn_filekeymap[conn.link];
};
that.request_conn = function(link,callback){
var conn = conn_linkmap[link];
@@ -251,6 +375,32 @@ var imc = new function(){
route_call(self_link,caller_retid,imc.Auth.get_current_idendesc(),dst,func_name,timeout,callback,params);
};
+ that.sendfile = function(dst_link,
+ blob,
+ filekey_callback,
+ result_callback){
+
+ var filekey = self_link + '_' + Math.random();
+
+ info_filekeymap[filekey] = {
+ 'blob':blob,
+ 'result_callback':result_callback,
+ 'callback':function(err){
+ if(ret_sendfile(filekey,err) && err != undefined){
+ that.call(dst_link + 'imc/','abort_sendfile',65536,null,filekey,err);
+ }
+ }
+ };
+
+ that.call(dst_link + 'imc/','pend_recvfile',65536,function(result){
+ filekey_callback(filekey);
+ },self_link,filekey,blob.size);
+ };
+ that.abortfile = function(filekey){
+ if((info = info_filekeymap[filekey]) != undefined){
+ info.callback('Eabort');
+ }
+ };
that.register_call = function(path,func_name,func){
var cnode;
@@ -284,7 +434,15 @@ var imc = new function(){
cnode.filt.remove(func);
};
+ that.register_call('imc/','abort_sendfile',
+ function(callback,filekey,err){
+
+ callback('Success');
+ ret_sendfile(filekey,'Eabort');
+ });
+
conn_retidmap[self_link] = {};
+ conn_filekeymap[self_link] = {};
imc.Proxy.instance = that;
};
diff --git a/src/pmod/pmod_test/css/manage.less b/src/pmod/pmod_test/css/manage.less
index d0b1d59..3c7fd79 100644
--- a/src/pmod/pmod_test/css/manage.less
+++ b/src/pmod/pmod_test/css/manage.less
@@ -1,14 +1,25 @@
+@import 'mixin.less';
+@import 'color.less';
+
#index_page{
div.create_mode,div.set_mode{
- div.content{
- height:256px;
+ div.block{
+ margin-bottom:@MediumPad;
+
+ div.data{
+ height:256px;
+ }
}
}
table.table{
- td.oper{
- div.btn-group{
- position:absolute;
+ tr.item{
+ height:48px;
+
+ td.oper{
+ div.btn-group{
+ position:absolute;
+ }
}
- }
+ }
}
}
diff --git a/src/pmod/pmod_test/html/manage.html b/src/pmod/pmod_test/html/manage.html
index e3b1816..b4ab4d8 100644
--- a/src/pmod/pmod_test/html/manage.html
+++ b/src/pmod/pmod_test/html/manage.html
@@ -12,13 +12,48 @@
<select name="testmode"></select>
</div>
</div>
- <div class="row-fluid">
- <div class="span13">
- <label>題目內容</label>
- <div class="content"></div>
+
+ <div class="block content">
+ <div class="row-fluid">
+ <div class="input-prepend span13">
+ <span class="add-on">題目內容</span>
+ <input class="span2 title" type="text" value="內容" placeholder="區塊標題">
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span13">
+ <div class="data"></div>
+ </div>
</div>
</div>
- </div>
+
+ <div class="block format">
+ <div class="row-fluid">
+ <div class="input-prepend span13">
+ <span class="add-on">格式說明</span>
+ <input class="span2 title" type="text" value="I/O格式" placeholder="區塊標題">
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span13">
+ <div class="data"></div>
+ </div>
+ </div>
+ </div>
+
+ <div class="block testdata">
+ <div class="row-fluid">
+ <div class="input-prepend span13">
+ <span class="add-on">範例資料</span>
+ <input class="span2 title" type="text" value="範例測資" placeholder="區塊標題">
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span13">
+ <div class="data"></div>
+ </div>
+ </div>
+ </div> </div>
<div class="modal-footer">
<button class="btn btn-primary submit">確定</button>
<button class="btn cancel">取消</button>
@@ -36,10 +71,46 @@
<select name="testmode"></select>
</div>
</div>
- <div class="row-fluid">
- <div class="span13">
- <label>題目內容</label>
- <div class="content"></div>
+
+ <div class="block content">
+ <div class="row-fluid">
+ <div class="input-prepend span13">
+ <span class="add-on">題目內容</span>
+ <input class="span2 title" type="text" placeholder="區塊標題">
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span13">
+ <div class="data"></div>
+ </div>
+ </div>
+ </div>
+
+ <div class="block format">
+ <div class="row-fluid">
+ <div class="input-prepend span13">
+ <span class="add-on">格式說明</span>
+ <input class="span2 title" type="text" placeholder="區塊標題">
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span13">
+ <div class="data"></div>
+ </div>
+ </div>
+ </div>
+
+ <div class="block testdata">
+ <div class="row-fluid">
+ <div class="input-prepend span13">
+ <span class="add-on">範例資料</span>
+ <input class="span2 title" type="text" placeholder="區塊標題">
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span13">
+ <div class="data"></div>
+ </div>
</div>
</div>
</div>
@@ -97,7 +168,7 @@
<div class="row">
<div class="span3">
- sdfsdfs
+
</div>
<div class="span4">
<h3>模式</h3>
diff --git a/src/pmod/pmod_test/js/pmod_test.js b/src/pmod/pmod_test/js/pmod_test.js
index 3d43c54..93565b4 100644
--- a/src/pmod/pmod_test/js/pmod_test.js
+++ b/src/pmod/pmod_test/js/pmod_test.js
@@ -86,7 +86,7 @@ var pmod_test = function(proid,pro_node){
}
}
function _mode_create(modeid,testmodeid){
- var j_item = $('<tr><td class="id"></td><td class="testmode"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>')
+ var j_item = $('<tr class="item"><td class="id"></td><td class="testmode"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>')
_mode_set(j_item,modeid,testmodeid);
@@ -160,7 +160,7 @@ var pmod_test = function(proid,pro_node){
});
}
function _testmode_create(testmodeid,testmodename){
- var j_item = $('<tr><td class="id"></td><td class="name"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>')
+ var j_item = $('<tr class="item"><td class="id"></td><td class="name"></td><td class="oper"><div class="btn-group"><button class="btn btn-small set"><i class="icon-cog"></i></button><button class="btn btn-small del"><i class="icon-trash"></i></button></div></td></tr>')
_testmode_set(j_item,testmodeid,testmodename);
@@ -198,6 +198,79 @@ var pmod_test = function(proid,pro_node){
function _update(){
_testmode_update().done(_mode_update);
}
+ function _mix_content(j_box){
+ var content_title = j_box.find('div.content input.title').val();
+ var content = j_box.find('div.content div.data').data('codebox').getValue();
+ var format_title = j_box.find('div.format input.title').val();
+ var format = j_box.find('div.format div.data').data('codebox').getValue();
+ var testdata_title = j_box.find('div.testdata input.title').val();
+ var testdata = j_box.find('div.testdata div.data').data('codebox').getValue();
+
+ console.log(content_title);
+
+ return '<!--content_title_start--><h4>' + content_title + '</h4><!--content_title_end-->' +
+ '<!--content_start-->' + content + '<!--content_end-->' +
+ '<!--format_title_start--><h4>' + format_title + '</h4><!--format_title_end-->' +
+ '<!--format_start-->' + format + '<!--format_end-->' +
+ '<!--testdata_title_start--><h4>' + testdata_title + '</h4><!--testdata_title_end-->' +
+ '<!--testdata_start-->' + testdata + '<!--testdata_end-->';
+ }
+ function _parse_content(j_box,mix_content){
+ var part;
+ var content_title;
+ var content;
+ var format_title;
+ var format;
+ var testdata_title;
+ var testdata;
+
+ console.log(mix_content);
+ part = mix_content.match(/<!--content_title_start--><h4>([\s\S.]*)<\/h4><!--content_title_end-->/);
+ if(part != null){
+ content_title = part[1];
+ }else{
+ content_title = '';
+ }
+ part = mix_content.match(/<!--content_start-->([\s\S.]*)<!--content_end-->/);
+ if(part != null){
+ content = part[1];
+ }else{
+ content = '';
+ }
+
+ part = mix_content.match(/<!--format_title_start--><h4>([\s\S.]*)<\/h4><!--format_title_end-->/);
+ if(part != null){
+ format_title = part[1];
+ }else{
+ format_title = '';
+ }
+ part = mix_content.match(/<!--format_start-->([\s\S.]*)<!--format_end-->/);
+ if(part != null){
+ format = part[1];
+ }else{
+ format = '';
+ }
+
+ part = mix_content.match(/<!--testdata_title_start--><h4>([\s\S.]*)<\/h4><!--testdata_title_end-->/);
+ if(part != null){
+ testdata_title = part[1];
+ }else{
+ testdata_title = '';
+ }
+ part = mix_content.match(/<!--testdata_start-->([\s\S.]*)<!--testdata_end-->/);
+ if(part != null){
+ testdata = part[1];
+ }else{
+ testdata = '';
+ }
+
+ j_box.find('div.content input.title').val(content_title);
+ j_box.find('div.content div.data').codebox().setValue(content);
+ j_box.find('div.format input.title').val(format_title);
+ j_box.find('div.format div.data').codebox().setValue(format);
+ j_box.find('div.testdata input.title').val(testdata_title);
+ j_box.find('div.testdata div.data').codebox().setValue(testdata);
+ }
if(direct == 'in'){
com.loadpage('/toj/pmod/pmod_test/html/manage.html').done(function(){
@@ -205,17 +278,37 @@ var pmod_test = function(proid,pro_node){
j_testmode_list = j_index_page.find('table.testmode > tbody');
j_create_mode = j_index_page.find('div.create_mode');
- com.create_codebox(j_create_mode.find('div.content'),'text/html');
+ j_create_mode.find('div.content div.data').codebox({'mode':'text/html'});
+ j_create_mode.find('div.format div.data').codebox({'mode':'text/html'});
+ j_create_mode.find('div.testdata div.data').codebox({'mode':'text/html'});
j_create_mode.on('shown',function(e){
- j_create_mode.find('div.content').data('codebox').refresh();
+ var i;
+ var codeboxs;
+
+ codeboxs = j_create_mode.find('div.block div.data');
+ for(i = 0;i < codeboxs.length;i++){
+ $(codeboxs[i]).data('codebox').refresh();
+ }
});
j_create_mode.on('hide',function(e){
- j_create_mode.find('div.content').data('codebox').setValue('');
+ var i;
+ var codeboxs;
+
+ j_create_mode.find('div.content input.title').val('內容');
+ j_create_mode.find('div.format input.title').val('I/O格式');
+ j_create_mode.find('div.testdata input.title').val('範例測資');
+
+ codeboxs = j_create_mode.find('div.block div.data');
+ for(i = 0;i < codeboxs.length;i++){
+ $(codeboxs[i]).data('codebox').setValue('');
+ }
});
j_create_mode.find('button.submit').on('click',function(e){
- var content = j_create_mode.find('div.content').data('codebox').getValue();
var testmodeid = parseInt(j_create_mode.find('[name="testmode"]').val());
+ var mix_content;
+
+ mix_content = _mix_content(j_create_mode);
com.call_backend(callpath,'add_mode',function(result){
if(com.is_callerr(result)){
@@ -226,7 +319,7 @@ var pmod_test = function(proid,pro_node){
_update();
}
- },content,testmodeid);
+ },mix_content,testmodeid);
});
j_create_mode.find('button.cancel').on('click',function(e){
j_create_mode.modal('hide');
@@ -237,16 +330,20 @@ var pmod_test = function(proid,pro_node){
});
j_set_mode = j_index_page.find('div.set_mode');
- com.create_codebox(j_set_mode.find('div.content'),'text/html');
+ j_set_mode.find('div.content div.data').codebox({'mode':'text/html'});
+ j_set_mode.find('div.format div.data').codebox({'mode':'text/html'});
+ j_set_mode.find('div.testdata div.data').codebox({'mode':'text/html'});
j_set_mode.on('show',function(e){
com.call_backend(callpath,'get_mode',function(result){
var data = result.data;
+ var parse_content;
if(com.is_callerr(result)){
index.add_alert('','警告','管理發生錯誤');
}else{
- j_set_mode.find('div.content').data('codebox').setValue(data.content);
+ parse_content = _parse_content(j_set_mode,data.content);
+
if(data.testmodeid == null){
j_set_mode.find('[name="testmode"]').val(0);
}else{
@@ -256,20 +353,37 @@ var pmod_test = function(proid,pro_node){
},set_mode_id);
});
j_set_mode.on('shown',function(e){
- j_set_mode.find('div.content').data('codebox').refresh();
+ var i;
+ var codeboxs;
+
+ codeboxs = j_set_mode.find('div.block div.data');
+ for(i = 0;i < codeboxs.length;i++){
+ $(codeboxs[i]).data('codebox').refresh();
+ }
});
j_set_mode.on('hide',function(e){
+ var i;
+ var codeboxs;
+
set_mode_id = null;
- j_set_mode.find('div.content').data('codebox').setValue('');
+
+ j_set_mode.find('div.block input.title').val('');
+
+ codeboxs = j_set_mode.find('div.block div.data');
+ for(i = 0;i < codeboxs.length;i++){
+ $(codeboxs[i]).data('codebox').setValue('');
+ }
});
j_set_mode.find('button.submit').on('click',function(e){
- var content = j_set_mode.find('div.content').data('codebox').getValue();
var testmodeid = parseInt(j_set_mode.find('[name="testmode"]').val());
+ var mix_content;
if(testmodeid == 0){
testmodeid = null;
}
+ mix_content = _mix_content(j_set_mode);
+
com.call_backend(callpath,'set_mode',function(result){
if(com.is_callerr(result)){
index.add_alert('','警告','管理發生錯誤');
@@ -279,7 +393,7 @@ var pmod_test = function(proid,pro_node){
_update();
}
- },set_mode_id,content,testmodeid);
+ },set_mode_id,mix_content,testmodeid);
});
j_set_mode.find('button.cancel').on('click',function(e){
j_set_mode.modal('hide');
diff --git a/src/pmod/pmod_test/py/pmod_test.py b/src/pmod/pmod_test/py/pmod_test.py
index 33e4a0e..ac96f19 100644
--- a/src/pmod/pmod_test/py/pmod_test.py
+++ b/src/pmod/pmod_test/py/pmod_test.py
@@ -46,9 +46,19 @@ class pmod_test(Problem):
Proxy.instance.register_call(
self._reg_path, 'list_testmode', self.list_testmode)
Proxy.instance.register_call(
- self._reg_path, 'set_testdata', self.set_testdata)
+ self._reg_path, 'create_testdata', self.create_testdata)
+ Proxy.instance.register_call(
+ self._reg_path, 'delete_testdata', self.delete_testdata)
Proxy.instance.register_call(
self._reg_path, 'get_testdata', self.get_testdata)
+ Proxy.instance.register_call(
+ self._reg_path, 'list_testdata', self.list_testdata)
+ Proxy.instance.register_call(
+ self._reg_path, 'set_testdata', self.set_testdata)
+ Proxy.instance.register_call(
+ self._reg_path, 'set_testmode_testdata', self.set_testdata)
+ Proxy.instance.register_call(
+ self._reg_path, 'get_testmode_testdata', self.get_testdata)
def unload(self, force):
Proxy.instance.unregister_call(
@@ -72,9 +82,19 @@ class pmod_test(Problem):
Proxy.instance.unregister_call(
self._reg_path, 'get_testmode')
Proxy.instance.unregister_call(
- self._reg_path, 'set_testdata')
+ self._reg_path, 'create_testdata')
+ Proxy.instance.unregister_call(
+ self._reg_path, 'delete_testdata')
Proxy.instance.unregister_call(
self._reg_path, 'get_testdata')
+ Proxy.instance.unregister_call(
+ self._reg_path, 'list_testdata')
+ Proxy.instance.unregister_call(
+ self._reg_path, 'set_testdata')
+ Proxy.instance.unregister_call(
+ self._reg_path, 'set_testmode_testdata')
+ Proxy.instance.unregister_call(
+ self._reg_path, 'get_testmode_testdata')
@staticmethod
@TOJAuth.check_access(mod.ProblemMg._accessid, TOJAuth.ACCESS_CREATE)
@@ -398,7 +418,161 @@ class pmod_test(Problem):
return testmode_list
@imc.async.caller
- def set_testdata(self, testmodeid, testdata):
+ def create_testdata(self, info, filekey, expire = None):
+ if expire != None:
+ expire = com.isoptime(expire)
+ if expire == None:
+ return 'Eparameter'
+
+ if(
+ type(info) != str or
+ type(filekey) != str
+ ):
+ return 'Eparameter'
+
+ testid = self._create_testdata(info, filekey, expire)
+
+ if testid == None:
+ return 'Eupload'
+
+ return {'testid': testid}
+
+ def _create_testdata(self, info, filekey, expire):
+ TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE)
+
+ # Upload file
+
+ blobname = 'TEST_BLOBNAME'
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ testid = mod.TestdataMg.instance._add_testdata(
+ blobname, expire, self._proid, info)
+
+ return testid
+
+ @imc.async.caller
+ def delete_testdata(self, testid):
+ if(
+ type(testid) != int
+ ):
+ return 'Eparameter'
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ test = mod.TestdataMg.instance._get_testdata(testid)
+
+ if test == None:
+ return 'Etestid'
+
+ if test['proid'] != self._proid:
+ return 'Eother_proid'
+
+ self._delete_testdata(testid)
+
+ return 'Success'
+
+ def _delete_testdata(self, testid):
+ TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_DELETE)
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ mod.TestdataMg.instance._del_testdata(testid)
+
+ @imc.async.caller
+ def get_testdata(self, testid):
+ if(
+ type(testid) != int
+ ):
+ return 'Eparameter'
+
+ TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE)
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ test = mod.TestdataMg.instance._get_testdata(testid)
+
+ if test == None:
+ return 'Etestid'
+
+ if test['proid'] != self._proid:
+ return 'Eother_proid'
+
+ del test['blobname']
+ del test['proid']
+
+ return test
+
+ @imc.async.caller
+ def list_testdata(self):
+ testdata_list = self._list_testdata()
+
+ return testdata_list
+
+ def _list_testdata(self):
+ TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE)
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ testdata_list = mod.TestdataMg.instance._list_testdata(self._proid)
+
+ for test in testdata_list:
+ del test['blobname']
+ del test['proid']
+
+ return testdata_list
+
+ @imc.async.caller
+ def set_testdata(self, testid, info, filekey = None, expire = None):
+ if expire != None:
+ expire = com.isoptime(expire)
+ if expire == None:
+ return 'Eparameter'
+
+ if(
+ type(testid) != int or
+ type(info) != str or
+ (filekey != None and type(filekey) != str)
+ ):
+ return 'Eparameter'
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ test = mod.TestdataMg.instance._get_testdata(testid)
+
+ if test == None:
+ return 'Etestid'
+
+ if test['proid'] != self._proid:
+ return 'Eother_proid'
+
+ result = self._set_testdata(testid, info, filekey, expire)
+
+ if result == None:
+ return 'Efailed'
+
+ if result == False:
+ return 'Eupload'
+
+ return 'Success'
+
+ def _set_testdata(self, testid, info, filekey, expire):
+ TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE)
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ test = mod.TestdataMg.instance._get_testdata(testid)
+
+ blobname = test['blobname']
+ if test['proid'] != self._proid:
+ return None
+
+ if filekey != None:
+ # Upload file
+ # Update blob 'blobname'
+ # If failed return False
+
+ with TOJAuth.change_current_iden(self._idendesc):
+ mod.TestdataMg.instance._update_testdata(
+ testid, blobname, expire, self._proid, info)
+
+ return True
+
+ @imc.async.caller
+ def set_testmode_testdata(self, testmodeid, testdata):
if(
type(testmodeid) != int or
type(testdata) != list
@@ -432,7 +606,7 @@ class pmod_test(Problem):
return 'Success'
- def _set_testdata(self, testmodeid, testdata):
+ def _set_testmode_testdata(self, testmodeid, testdata):
TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE)
cur = self.db.cursor()
@@ -472,7 +646,7 @@ class pmod_test(Problem):
cur.execute(sqlstr, sqlarr)
@imc.async.caller
- def get_testdata(self, testmodeid):
+ def get_testmode_testdata(self, testmodeid):
if(
type(testmodeid) != int
):
@@ -485,7 +659,7 @@ class pmod_test(Problem):
return testdata
- def _get_testdata(self, testmodeid):
+ def _get_testmode_testdata(self, testmodeid):
TOJAuth.check_access_func(self._accessid, TOJAuth.ACCESS_WRITE)
cur = self.db.cursor()
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 782b000..2e56b52 100755
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -1,5 +1,6 @@
#! /usr/bin/env python
+import os
import traceback
import sys
import socket
@@ -7,6 +8,8 @@ import json
import datetime
import time
import random
+import uuid
+from collections import deque
from multiprocessing import Process
import tornado.ioloop
@@ -16,14 +19,15 @@ import tornado.websocket
import imc.async
from imc.proxy import Proxy,Connection
+from imc.blobclient import BlobClient
import mod
import netio
-from netio import SocketStream,SocketConnection,WebSocketConnection
+from netio import SocketStream,SocketConnection
+from netio import WebSocketStream,WebSocketConnection
from tojauth import TOJAuth
from test_blob import TOJBlobTable,TOJBlobHandle
-from imc.blobclient import BlobClient
class StdLogger(object):
def __init__(self,callback):
@@ -48,7 +52,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._link = None
self._idendesc = None
self._pend_mainconn_linkmap = {}
- self._pend_filestream_filekeymap = {}
+ self._pend_filekeymap = {}
self._client_linkmap = {}
def start(self):
@@ -66,7 +70,12 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def handle_stream(self,stream,addr):
def _recv_conn_info(data):
info = json.loads(data.decode('utf-8'))
- conntype = info['conntype']
+
+ try:
+ conntype = info['conntype']
+
+ except KeyError:
+ socket_stream.close()
if conntype == 'main':
self._handle_mainconn(sock_stream,addr,info)
@@ -74,13 +83,16 @@ class BackendWorker(tornado.tcpserver.TCPServer):
elif conntype == 'file':
self._handle_fileconn(sock_stream,addr,info)
+ else:
+ socket_stream.close()
+
fd = stream.fileno()
self._ioloop.remove_handler(fd)
sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0))
netio.recv_pack(sock_stream,_recv_conn_info)
- def add_client(self,link,handler):
+ def add_client(self,link,main_stream):
@imc.async.caller
def _call():
with TOJAuth.change_current_iden(self._idendesc):
@@ -88,14 +100,13 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._client_linkmap[link] = {}
- conn = netio.WebSocketConnection(link,handler)
+ conn = netio.WebSocketConnection(link,main_stream,self.pend_filestream,
+ self.del_pend_filestream)
conn.add_close_callback(lambda conn : self.del_client(conn.link))
Proxy.instance.add_conn(conn)
_call()
- return conn
-
def del_client(self,link):
@imc.async.caller
def _call():
@@ -105,6 +116,34 @@ class BackendWorker(tornado.tcpserver.TCPServer):
del self._client_linkmap[link]
_call()
+
+ def pend_filestream(self,streamtype,filekey,callback,count = 1):
+ assert(filekey not in self._pend_filekeymap)
+
+ self._pend_filekeymap[filekey] = {
+ 'streamtype':streamtype,
+ 'count':count,
+ 'stream':[],
+ 'callback':tornado.stack_context.wrap(callback)
+ }
+
+ def add_filestream(self,streamtype,filekey,stream):
+ try:
+ pend = self._pend_filekeymap[filekey]
+
+ except KeyError:
+ raise
+
+ assert(pend['streamtype'] == streamtype)
+
+ pend['count'] -= 1
+ if pend['count'] == 0:
+ self._pend_filekeymap.pop(filekey)
+
+ pend['callback'](stream)
+
+ def del_pend_filestream(self,filekey):
+ self._pend_filekeymap.pop(filekey,None)
def _conn_center(self):
def __retry(conn):
@@ -123,14 +162,17 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._link = info['worker_link']
Proxy(self._link,TOJAuth.instance,self._idendesc,self._conn_link)
- self.center_conn = SocketConnection(info['center_link'],stream,self.center_addr,self._add_pend_filestream)
+ self.center_conn = SocketConnection(info['center_link'],stream,
+ self.center_addr,
+ self.pend_filestream,
+ self.del_pend_filestream)
self.center_conn.add_close_callback(__retry)
Proxy.instance.add_conn(self.center_conn)
- self._init_blobclient()
+ #self._init_blobclient()
#Proxy.instance.register_call('test/','get_client_list',self._test_get_client_list)
- #Proxy.instance.register_call('test/','test_dst',self._test_dst)
+ Proxy.instance.register_call('test/','test_dst',self._test_dst)
#Proxy.instance.register_filter('test/',self._test_filter)
try:
@@ -143,8 +185,8 @@ class BackendWorker(tornado.tcpserver.TCPServer):
except Exception as e:
print(e)
- #if self._link == '/backend/2/':
- # self._test_call(None)
+ if self._link == '/backend/2/':
+ self._test_call(None)
sock_ip,sock_port = self.sock_addr
netio.send_pack(stream,bytes(json.dumps({
@@ -171,19 +213,52 @@ class BackendWorker(tornado.tcpserver.TCPServer):
'blobtmp/' + str(self.ws_port - 79),
TOJBlobTable(self.ws_port - 79),
TOJBlobHandle)
-
- blobclient.open_container('test','ACTIVE')
- try:
+
+ print(self.ws_port, "open cantainer test")
+ print(blobclient.open_container('test','ACTIVE'))
+ # if False:
+ if self.ws_port == 81:
handle = blobclient.open(
'test','testblob',
TOJBlobHandle.WRITE | TOJBlobHandle.CREATE
)
- except:
- pass
- print(handle._fileno)
- handle.write(bytes('Hello Data','utf-8'),0)
- handle.commit(False);
+ print(handle._fileno)
+ handle.write(bytes('Hello Data','utf-8'),0)
+ print('create commit:', handle.commit(False))
+ handle.close()
+ print("#########################################################")
+ # print("wait for 3 secs...")
+ # time.sleep(3)
+ # try:
+ # handle = blobclient.open(
+ # 'test', 'testblob',
+ # TOJBlobHandle.CREATE
+ # )
+ # except ValueError as e:
+ # print("catch ValueError:", str(e))
+ # print("#########################################################")
+ # print("wait for 3 secs...")
+ # time.sleep(3)
+ # handle = blobclient.open(
+ # 'test', 'testblob',
+ # TOJBlobHandle.WRITE
+ # )
+ # handle.write(bytes('Hello new line\n','utf-8'),30)
+ # print('write commit:', handle.commit(False))
+ # handle.close()
+ # print("#########################################################")
+ # print("wait for 3 secs...")
+ # time.sleep(3)
+ # handle = blobclient.open(
+ # 'test', 'testblob',
+ # TOJBlobHandle.WRITE | TOJBlobHandle.DELETE
+ # )
+ # handle.delete()
+ # print('delete commit:', handle.commit(False))
+ # handle.close()
+ blobclient.clean()
+ blobclient.show_status()
def _conn_link(self,link):
def __handle_pend(conn):
@@ -216,7 +291,9 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def __recv_cb(data):
stat = json.loads(data.decode('utf-8'))
if stat == True:
- conn = SocketConnection(worker_link,main_stream,sock_addr,self._add_pend_filestream)
+ conn = SocketConnection(worker_link,main_stream,sock_addr,
+ self.pend_filestream,
+ self.del_pend_filestream)
Proxy.instance.add_conn(conn)
__handle_pend(conn)
@@ -255,9 +332,6 @@ class BackendWorker(tornado.tcpserver.TCPServer):
return imc.async.switch_top()
- def _add_pend_filestream(self,filekey,callback):
- self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback)
-
def _handle_mainconn(self,main_stream,addr,info):
link = info['link']
sock_ip = info['sock_ip']
@@ -268,7 +342,9 @@ class BackendWorker(tornado.tcpserver.TCPServer):
return
if (link not in self._pend_mainconn_linkmap) or self._link > link:
- conn = SocketConnection(link,main_stream,(sock_ip,sock_port),self._add_pend_filestream)
+ conn = SocketConnection(link,main_stream,(sock_ip,sock_port),
+ self.pend_filestream,
+ self.del_pend_filestream)
Proxy.instance.add_conn(conn)
netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8'))
@@ -283,10 +359,10 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _handle_fileconn(self,file_stream,addr,info):
try:
- self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream)
+ self.add_filestream('socket',info['filekey'],file_stream)
- except KeyError:
- pass
+ except Exception:
+ file_stream.close()
def _get_link(self,linkclass,uid = 0):
if linkclass == 'center':
@@ -319,6 +395,10 @@ class BackendWorker(tornado.tcpserver.TCPServer):
@imc.async.caller
def _test_call(self,param):
with TOJAuth.change_current_iden(self._idendesc):
+ ret = Proxy.instance.call('/backend/3/test/','test_dst',1000,'Hello')
+ print(ret)
+
+ '''
st = time.perf_counter()
for i in range(0,2):
dst = '/backend/' + str((i % 2) + 2) + '/'
@@ -332,33 +412,17 @@ class BackendWorker(tornado.tcpserver.TCPServer):
print(time.perf_counter() - st)
print(self._link)
-
- return
-
- pend = []
- for i in range(0,32):
- if str((i % 16) + 2) == self._link:
- continue
-
- fileres = Proxy.instance.sendfile('/backend/' + str((i % 16) + 2) + '/','Fedora-18-x86_64-DVD.iso')
-
- dst = '/backend/' + str((i % 16) + 2) + '/'
- ret = Proxy.instance.call(self._idendesc,dst,'test_dst',fileres.filekey)
-
- pend.append(fileres)
-
- for p in pend:
- print(self._link + ' ' + p.wait())
-
- print(self._link)
+ '''
@imc.async.caller
def _test_dst(self,filekey):
print(filekey)
- self._ioloop.add_timeout(datetime.timedelta(milliseconds = 2000),lambda : Proxy.instance.abortfile(filekey))
- #Proxy.instance.abortfile(filekey)
fileres = Proxy.instance.recvfile(filekey,'data')
+
+ #self._ioloop.add_timeout(datetime.timedelta(milliseconds = 500),lambda : Proxy.instance.abortfile(filekey))
+ #Proxy.instance.abortfile(filekey)
+ #fileres = Proxy.instance.recvfile(filekey,'data')
#print('recv ' + fileres.wait())
print(fileres.wait())
@@ -375,24 +439,47 @@ class WebSocketConnHandler(tornado.websocket.WebSocketHandler):
def on_message(self,msg):
global backend_worker
- if hasattr(self,'backend_conn'):
- self.backend_conn.recv_msg(msg)
-
+ if hasattr(self,'conntype'):
+ self.stream.recv_msg(msg)
+
else:
- try:
- info = json.loads(msg)
- print(info)
- self.backend_conn = backend_worker.add_client(info['client_link'],self)
+ info = json.loads(msg)
+ self.conntype = info['conntype']
+ self.stream = WebSocketStream(self)
+
+ if self.conntype == 'main':
+ self._handle_mainconn(self.stream,info)
- except Exception:
- self.close()
+ elif self.conntype == 'file':
+ self._handle_fileconn(self.stream,info)
+
+ else:
+ self.stream.close()
def on_close(self):
- global backend_backend
+ if hasattr(self,'conntype'):
+ self.stream.close()
+
+ def _handle_mainconn(self,main_stream,info):
+ global backend_worker
+
+ try:
+ backend_worker.add_client(info['client_link'],main_stream)
- if hasattr(self,'backend_conn'):
- self.backend_conn.close()
+ except Exception:
+ main_stream.close()
+ def _handle_fileconn(self,file_stream,info):
+ global backend_worker
+
+ try:
+ backend_worker.add_filestream('websocket',info['filekey'],
+ file_stream)
+ print('test')
+
+ except Exception as err:
+ file_stream.close()
+
def start_backend_worker(ws_port):
global backend_worker
diff --git a/src/py/center_server.py b/src/py/center_server.py
index c6e81b0..fed7bb5 100755
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -4,6 +4,7 @@ import random
import json
import uuid
import socket
+import time
import tornado.ioloop
import tornado.tcpserver
@@ -142,13 +143,13 @@ class CenterServer(tornado.tcpserver.TCPServer):
@imc.async.caller
def _init_blobserver(self):
- BlobServer(Proxy.instance,
- TOJAuth.instance,
- self._idendesc,
- self._link,
- 'blobtmp/1',
- TOJBlobTable(1),
- TOJBlobHandle)
+ blobserver = BlobServer(Proxy.instance,
+ TOJAuth.instance,
+ self._idendesc,
+ self._link,
+ 'blobtmp/1',
+ TOJBlobTable(1),
+ TOJBlobHandle)
def _create_link(self,linkclass):
linkid = uuid.uuid1()
diff --git a/src/py/com.py b/src/py/com.py
new file mode 100644
index 0000000..91b21a6
--- /dev/null
+++ b/src/py/com.py
@@ -0,0 +1,12 @@
+import datetime
+
+def isoptime(time):
+ try:
+ if time[-1] == 'Z':
+ return datetime.datetime.strptime(time,'%Y-%m-%dT%H:%M:%S.%fZ')
+
+ else:
+ return datetime.datetime.strptime(time,'%Y-%m-%dT%H:%M:%S.%f')
+
+ except ValueError:
+ return None
diff --git a/src/py/dispatch.py b/src/py/dispatch.py
new file mode 100644
index 0000000..9484a5c
--- /dev/null
+++ b/src/py/dispatch.py
@@ -0,0 +1,110 @@
+import json
+
+import config
+import imc.async
+from tojauth import TOJAuth
+from asyncdb import AsyncDB
+
+class Data:
+ def __init__(self,dataid,datatype,source,target,status,data,gid,gcount):
+ self.dataid = dataid
+ self.datatype = datatype
+ self.source = source
+ self.target = target
+ self.data = data
+ self.gid = gid
+ self,gcount = gcount
+
+class DispatchMg:
+ _accessid = -1
+
+ def __init__(self,mod_idendesc,get_link_fn):
+ self.DATATYPE_CHALLENGE = 1
+ self.DATATYPE_STATUS = 2
+
+ self.DATASTSTUS_PEND = 1
+ self.DATASTSTUS_WAIT = 2
+ self.DATASTSTUS_DONE = 3
+
+ self._idendesc = mod_idendesc
+ self.get_link = get_link_fn
+ self.db = AsyncDB(config.CORE_DBNAME,config.CORE_DBUSER,
+ config.CORE_DBPASSWORD)
+
+ self.collector_namemap = {}
+
+ @imc.async.caller
+ @TOJAuth.check_access(_accessid,TOJAuth.ACCESS_WRITE)
+ def _add_challenge(self,source,target,data,gid = None,gcount = 1):
+ cur = self.db.cursor()
+
+ cur.execute(('INSERT INTO "DATA_POOL" '
+ '("type","source","target","gid","gcount","status","data") '
+ 'VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING "dataid";'),
+ (self.DATATYPE_CHALLENGE,source,target,gid,gcount,
+ self.DATASTSTUS_PEND,json.dumps(data,'utf-8')))
+
+ if cur.rowcount == 0:
+ return 'Efailed'
+
+ dataid = cur.fetchone()[0]
+
+ data = Data(dataid,self.DATATYPE_CHALLENGE,source,target,
+ self.DATASTATUS_PEND,data,gid,gcount)
+
+ return {'dataid':dataid}
+
+ @imc.async.caller
+ def _register_challenge_collector(self,name):
+ link = TOJAuth.get_current_iden()
+ linkclass = link.split('/')[1]
+ if linkclass != 'backend':
+ return 'Efailed'
+
+ return self._register_collector(link,''.join(['challenge/',name]))
+
+ @imc.async.caller
+ def _register_status_collector(self,name):
+ link = TOJAuth.get_current_iden()
+ linkclass = link.split('/')[1]
+ if linkclass != 'backend':
+ return 'Efailed'
+
+ return self._register_collector(link,''.join(['status/',name]))
+
+ @imc.async.caller
+ def _unregister_challenge_collector(self,name):
+ link = TOJAuth.get_current_iden();
+ linkclass = link.split('/')[1]
+ if linkclass != 'backend':
+ return 'Efailed'
+
+ return self._unregister_collector(link,''.join(['challenge/',name]))
+
+ @imc.async.caller
+ def _unregister_status_collector(self,name):
+ link = TOJAuth.get_current_iden();
+ linkclass = link.split('/')[1]
+ if linkclass != 'backend':
+ return 'Efailed'
+
+ return self._unregister_collector(link,''.join(['challenge/',name]))
+
+ def _register_collector(self,link,name):
+ if name not in self.collector_namemap:
+ self.collector_namemap[name] = {}
+
+ self.collector_namemap[name][link] = {}
+
+ return 'Success'
+
+ def _unregister_collector(self,link,name):
+ if name not in self.collector_namemap:
+ return 'Success'
+
+ self.collector_namemap[name].pop(link)
+
+ return 'Success'
+
+ def _dispatch_data(self,datatype,target):
+ pass
diff --git a/src/py/imc/async.py b/src/py/imc/async.py
index a9dd048..1642ad3 100644
--- a/src/py/imc/async.py
+++ b/src/py/imc/async.py
@@ -101,7 +101,7 @@ def ret(retid,value = None,err = None):
global gr_idmap
global ret_idmap
- assert greenlet.getcurrent() == gr_main
+ #assert greenlet.getcurrent() == gr_main
try:
gr = ret_idmap.pop(retid)
@@ -122,7 +122,7 @@ def ret(retid,value = None,err = None):
except Exception as err:
traceback.print_stack()
- print(err)
+ print(type(err))
finally:
tornado.stack_context._state.contexts = old_contexts
diff --git a/src/py/imc/blobclient.py b/src/py/imc/blobclient.py
index 8a7baea..1def7b6 100755
--- a/src/py/imc/blobclient.py
+++ b/src/py/imc/blobclient.py
@@ -1,13 +1,13 @@
#! /usr/bin/env python
import os
+import uuid
+from collections import Counter
from imc.auth import Auth
import imc.async
from imc.proxy import Proxy
-from collections import Counter
-
class BlobClient:
def __init__(self, proxy, auth, idendesc, link, serverlink,
location, cachetable, BlobHandle):
@@ -26,42 +26,53 @@ class BlobClient:
self.connect_server()
self._proxy.register_call('blobclient/', 'get_update',
self.get_update)
+ self._proxy.register_call('blobclient/', 'get_request',
+ self.get_request)
+
+ self.clean()
+##################debug code##########################
+ print('blobclient: init')
+ # self.show_status()
+
+ def show_status(self):
+ print('blobclient: containers=>\n', self._containers)
+ print('blobclient: blobs=>\n', self._cachetable.get_blob_list())
+ print('blobclient: opencounts=>\n', self._opencounts)
+ print('blobclient: deltags=>\n', self._deltags)
+######################################################
def __del__(self):
self._proxy.unregister_call('blobclient/', 'get_update')
+ self._proxy.unregister_call('blobclient/', 'get_request')
def _server_call(self, func, *args):
+ if not self._is_connected:
+ if not self.connect_server():
+ pass
+
server = self._server + 'blobserver/'
with Auth.change_current_iden(self._idendesc):
- for i in range(5):
- sta, ret = self._proxy.call(server, func, 10000,
- self._link, *args)
- if sta or (not sta and ret == 'Enoexist'):
- break
+ sta, ret = self._proxy.call(server, func, 10000,
+ self._link, *args)
+ if not sta:
+ self._is_connected = False
return (sta, ret)
def _client_call(self, otherclient, func, *args):
otherclient += 'blobclient/'
with TOJAuth.change_current_iden(self._idendesc):
- for i in range(5):
- sta, ret = self._proxy.call(otherclient, func, 10000,
- self._link, *args)
- if sta or (not sta and ret == 'Enoexist'):
- break
+ sta, ret = self._proxy.call(otherclient, func, 10000,
+ self._link, *args)
return (sta, ret)
def connect_server(self, serverlink=None):
if serverlink:
self._server = serverlink
- sta, ret = self._server_call('connect_client',
- self._cachetable.get_blob_list())
- if sta:
- if ret:
- self._is_connected = True
- else:
- pass
- else:
- pass
+ server = self._server + 'blobserver/'
+ with Auth.change_current_iden(self._idendesc):
+ sta, ret = self._proxy.call(server, 'connect_client', 10000,
+ self._link)
+ self._is_connected = sta
if self._is_connected:
# TODO:
pass
@@ -72,7 +83,7 @@ class BlobClient:
if not sta:
# TODO:
# pend operation when client can't imc call server
- return None
+ return 'Edisconnected'
if ret:
self._containers[container] = method
return True
@@ -94,114 +105,94 @@ class BlobClient:
# TODO:
# periodically call this function to clean old data and do something else
# ex: send pending operation
- def sync(self):
- for blobname_rev in self._deltags:
- self.del_real_blob(blobname_rev)
+ def clean(self):
+ del_list = list(self._deltags)
+ for rev in del_list:
+ self.del_real_blob(rev)
+ del_list = []
+ for blob, rev in (self._cachetable.get_blob_list().items()):
+ if not self.blob_exists(rev):
+ del_list.append(blob)
+ for container, name in del_list:
+ self.del_blob(container, name)
# for container in self._containers:
# if self._containers[container] == 'ALWAYS':
# for blob in self._cachetable.get_blob_list(container):
# self.update(blob)
@imc.async.caller
- def get_update(self, blobname, info, filekey=None):
- if info is None:
- self.del_blob(blobname)
- elif filekey is not None:
- rev = info['rev']
- if self.recv_blob(filekey, blobname, rev).wait() == 'Success':
- self.update_blob(blobname, info)
- sta, ret = self._server_call('recv_update_result',
- blobname, "Success", rev)
- if not sta:
- # TODO:
- pass
- else:
- self.update_blob(blobname, info)
- return self._link
-
- def update(self, blobname):
- cacherev = self._cachetable.get_blob_info(blobname, 'rev')
- if cacherev == None:
- cacherev = 0
+ def get_update(self, container, name):
+ self.update(container, name)
+ # pass
- sta, ret = self._server_call('check_blob', blobname, cacherev)
+ def update(self, container, name, force=False):
+ info = self._cachetable.get_blob_info(container, name)
+ if info and not force and self.blob_exists(info['rev']):
+ cacherev = info['rev']
+ cachesha1 = info['sha1']
+ else:
+ cacherev = None
+ cachesha1 = None
+ sta, ret = self._server_call('check_blob', container, name,
+ cacherev, cachesha1)
if not sta:
# TODO:
# pend operation when client can't imc call server
- pass
- elif ret == 'up_to_date':
- pass
- elif ret == 'no_exist':
- self._cachetable.del_blob(blobname)
+ return 'Efailed'
+ elif ret[0] == 'up_to_date':
+ return info
+ elif ret[0] == 'no_exist':
+ self._cachetable.del_blob(container, name)
if cacherev:
- self.del_real_blob(''.join([blobname, '_', str(cacherev)]))
+ self.del_real_blob(cacherev)
+ return None
else:
- info = ret['info']
- rev = info['rev']
- for i in range(4):
- rst = self.recv_blob(ret['filekey'], blobname, rev).wait()
- sta, ret = self._server_call('recv_update_result', blobname,
- rst, rev, True)
-
- if 'Success' == ret:
- self.update_blob(blobname, info)
- break
+ filekey, info = ret
+ if filekey:
+ rst = self.recv_blob(filekey, info['rev']).wait()
+ else:
+ if self.copy_blob(cacherev, info['rev']):
+ rst = 'Success'
+ else:
+ rst = None
+ if 'Success' == rst:
+ self.update_blob(info)
+ return info
+ else:
+ return 'Efailed'
def commit(self, commit_info, force_flag, blobhandle):
- filekey = None
- if not commit_info['deltag'] and commit_info['written']:
- result = self.send_blob(blobhandle._tmpfile)
- filekey = result.filekey
- sta, ret = self._server_call('recv_commit', commit_info,
- force_flag, filekey)
+ sta, ret = self._server_call('recv_commit', commit_info, force_flag)
if not sta:
# TODO:
# pend operation when client can't imc call server
+ # local commit
return False
else:
- # TODO:
- # if commit success , copy tmpfile to location
if ret:
- blobhandle.copy_tmp()
-
- # TODO:
- # opencounts ?
- def send_blob(self, blobpath, otherclient=None):
- if otherclient is None:
- return self._proxy.sendfile(self._server, blobpath)
- else:
- return self._proxy.sendfile(otherclient, blobpath)
-
- def recv_blob(self, filekey, blobname, rev):
- blobpath = os.path.join(self._location, blobname + '_' + str(rev))
- return self._proxy.recvfile(filekey, blobpath)
-
- def update_blob(self, blobname, info):
- rev = self._cachetable.get_blob_info(blobname, 'rev')
- blobname_rev = ''.join([blobname, '_', str(rev)])
- self.del_real_blob(blobname_rev)
- self._cachetable.update_blob(blobname, info)
-
- def del_blob(self, blobname):
- rev = self._cachetable.get_blob_info(blobname, 'rev')
- blobname_rev = ''.join([blobname, '_', str(rev)])
- self._cachetable.del_blob(blobname)
- self.del_real_blob(blobname_rev)
-
- def del_real_blob(self, blobname_rev):
- if self._opencounts[blobname_rev] == 0:
- path = os.path.join(self._location, blobname_rev)
- self.BlobHandle.del_blob(path)
- else:
- self._deltags.add(blobname_rev)
+ if ret['rev']:
+ if blobhandle.copy_tmp(ret['rev']):
+ self.update_blob(ret)
+ return True
+ else:
+ pass
+ else:
+ self.del_blob(ret['container'], ret['name'])
+ return True
+ else:
+ return False
- def open(self, container, blobname, flag):
+ def open(self, container, name, flag):
if container not in self._containers:
raise Exception("this container isn't open")
- blob = ''.join([container, '_', blobname])
- self.update(blob)
- info = self._cachetable.get_blob_info(blob)
+ if (flag & self.BlobHandle.CREATE and
+ not flag & self.BlobHandle.WRITE):
+ raise ValueError("invalid flag")
+ info = self.update(container, name)
+ info = 'Efailed'
+ if info == 'Efailed':
+ info = self._cachetable.get_blob_info(container, name)
if info is None:
if (not flag & self.BlobHandle.WRITE or
not flag & self.BlobHandle.CREATE):
@@ -209,21 +200,83 @@ class BlobClient:
"add a create flag")
else:
info = {'container': container,
- 'rev': 0,
+ 'name': name,
+ 'rev': None,
+ 'sha1': None,
'metadata': '',
'size': None,
'commit_time': None}
+ else:
+ self._opencounts[info['rev']] += 1
+ handle = self.BlobHandle(info, flag, self)
+ return handle
+
+ def close(self, rev):
+ if self._opencounts[rev] > 0:
+ self._opencounts[rev] -= 1
+
+ # @imc.async.caller
+ # def send_to_other(self, info, otherclient):
+ # pass
+
+ @imc.async.caller
+ def get_request(self, target):
+ result = self.send_blob(target)
+ if result:
+ return result.filekey
+ else:
+ return None
+
+ # def request_blob(self, container, name):
+ # sta, ret = self._server_call('get_request', container, name)
+ # if not sta:
+ # return False
+ # else:
+ # return ret
+
+ def send_blob(self, blobpath, otherclient=None):
try:
- handle = self.BlobHandle(blob, info, flag, self)
- except ValueError:
- raise
+ if otherclient:
+ ret = self._proxy.sendfile(otherclient, blobpath)
+ else:
+ ret = self._proxy.sendfile(self._server, blobpath)
+ except ConnectionError:
+ return 'Efailtosend'
else:
- blob = ''.join(blob, '_', str(handle.get_rev()))
- self._opencounts[blob] += 1
- return handle
+ return ret
+
+ def recv_blob(self, filekey, filename):
+ blobpath = os.path.join(self._location, filename)
+ return self._proxy.recvfile(filekey, blobpath)
+
+ def update_blob(self, info):
+ rev = self._cachetable.get_blob_info(info['container'],
+ info['name'], 'rev')
+ if rev:
+ self.del_real_blob(rev)
+ self._cachetable.update_blob(info)
+
+ def del_blob(self, container, name):
+ rev = self._cachetable.get_blob_info(container, name, 'rev')
+ self._cachetable.del_blob(container, name)
+ if rev:
+ self.del_real_blob(rev)
+
+ def del_real_blob(self, rev):
+ self._deltags.add(rev)
+ if self._opencounts[rev] == 0:
+ self._deltags.remove(rev)
+ path = os.path.join(self._location, rev)
+ self.BlobHandle.del_blob(path)
- def close(self, blobhandle):
- blob = ''.join([blobhandle._name, '_',
- str(blobhandle.get_rev())])
- self._opencounts[blob] -= 1
+ def copy_blob(self, rev, newrev):
+ path = os.path.join(self._location, rev)
+ newpath = os.path.join(self._location, newrev)
+ return self.BlobHandle.copy_file(path, newpath)
+ def vertify_blob(self, rev, sha1):
+ blobpath = os.path.join(self._location, rev)
+ return self.BlobHandle._sha1(blobpath) == sha1
+
+ def blob_exists(self, rev):
+ return self.BlobHandle.file_exists(os.path.join(self._location, rev))
diff --git a/src/py/imc/blobhandle.py b/src/py/imc/blobhandle.py
index 40ff523..683026e 100755
--- a/src/py/imc/blobhandle.py
+++ b/src/py/imc/blobhandle.py
@@ -1,6 +1,7 @@
#! /usr/bin/env python
from abc import abstractmethod
+import os
class BlobHandle:
READ = 0x1
@@ -8,36 +9,28 @@ class BlobHandle:
CREATE = 0x4
DELETE = 0x8
WRITEMETA = 0x10
- def __init__(self, name, info, flag, blobclient):
- self._name = name
+ def __init__(self, info, flag, blobclient):
self._info = info
self._flag = flag
self._blobclient = blobclient
self._location = self._blobclient._location
self._is_closed = False
self._deltag = False
- self._written = False
self._createtag = False
- self._need_commit = False
self._tmpfile = None
- self._blobpath = ''.join([self._location, self._name,
- '_', str(self.get_rev())])
+ if info['rev']:
+ self._blobpath = os.path.join(self._location, self._info['rev'])
+ else:
+ self._blobpath = None
if flag & BlobHandle.CREATE:
- if not flag & BlobHandle.WRITE:
- raise ValueError("invalid flag")
- else:
- self._need_commit = True
- self._createtag = True
- self._written = True
+ self._createtag = True
if flag & BlobHandle.WRITE:
self._tmpfile = self.gen_tmp()
def __del__(self):
- self.del_tmp()
- self._blobclient.close(self)
-
- def create(self):
- self._create(self.location + self._name)
+ if self._flag & BlobHandle.WRITE:
+ self.del_tmp()
+ self.close()
def read(self, length, offset):
if self._is_closed:
@@ -51,9 +44,7 @@ class BlobHandle:
raise Exception("This Blob is closed")
if not self._flag & BlobHandle.WRITE:
raise Exception("Permission Denied")
- self._need_commit = True
written_bytes = self._write(data, offset)
- self._written = bool(written_bytes)
self._info['size'] = self._get_size()
return written_bytes
@@ -62,20 +53,18 @@ class BlobHandle:
raise Exception("This Blob is closed")
if not self._flag & BlobHandle.DELETE:
raise Exception("Permission Denied")
- self._need_commit = True
+ self._info['name'] = newname
def delete(self, deltag=True):
if self._is_closed:
raise Exception("This Blob is closed")
if not self._flag & BlobHandle.DELETE:
raise Exception("Permission Denied")
- self._need_commit = True
self._deltag = deltag
def close(self):
self._is_closed = True
- if self._flag != BlobHandle.READ:
- self._blobclient.close(self)
+ self._blobclient.close(self._info['rev'])
def get_metadata(self):
if self._is_closed:
@@ -86,9 +75,8 @@ class BlobHandle:
if self._is_closed:
raise Exception("This Blob is closed")
if not self._flag & BlobHandle.WRITEMETA:
- raise Exception("Permission Deniedd")
+ raise Exception("Permission Denied")
self._info['metadata'] = metadata
- self._need_commit = True
def get_rev(self):
if self._is_closed:
@@ -105,29 +93,29 @@ class BlobHandle:
raise Exception("This Blob is closed")
return self._info['size']
+ def get_sha1(self):
+ if self._is_closed:
+ raise Exception("This Blob is closed")
+ return self._info['sha1']
+
def commit(self, flag):
if self._is_closed:
raise Exception("This Blob is closed")
- if not self._need_commit:
- return False
+ self._info['sha1'] = self._sha1(self._tmpfile)
commit_info = dict()
- commit_info['blobname'] = self._name
+ commit_info['info'] = self._info
+ commit_info['target'] = self._tmpfile
if self._deltag:
commit_info['deltag'] = True
+ commit_info['createtag'] = False
else:
commit_info['deltag'] = False
commit_info['createtag'] = self._createtag
- commit_info['info'] = self._info
- commit_info['written'] = self._written
return self._blobclient.commit(commit_info, flag, self)
- def copy_tmp(self):
- BlobHandle.copy_file(
- self._tmpfile,
- ''.join([self._location, self._info['container'], '_',
- self._name, '_', str(self._info['rev'])])
- )
- pass
+ def copy_tmp(self, rev):
+ path = os.path.join(self._location, rev)
+ return self.copy_file(self._tmpfile, path)
@abstractmethod
def gen_tmp(self):
@@ -152,7 +140,15 @@ class BlobHandle:
@staticmethod
@abstractmethod
+ def _sha1(file):
+ # calculate the sha1 of file
+ # return string
+ pass
+
+ @staticmethod
+ @abstractmethod
def copy_file(source, dest):
+ # return Boolean
pass
@staticmethod
@@ -160,4 +156,8 @@ class BlobHandle:
def del_blob(blobpath):
pass
-
+ @staticmethod
+ @abstractmethod
+ def file_exists(path):
+ # return Boolean
+ pass
diff --git a/src/py/imc/blobserver.py b/src/py/imc/blobserver.py
index b876833..1ec7de8 100755
--- a/src/py/imc/blobserver.py
+++ b/src/py/imc/blobserver.py
@@ -1,17 +1,16 @@
#! /usr/bin/env python
import os
+import uuid
+from collections import Counter
from imc.auth import Auth
import imc.async
from imc.proxy import Proxy
-from collections import Counter
-
class BlobServer:
def __init__(self, proxy, auth, idendesc, link,
location, blobtable, BlobHandle):
-
self._proxy = proxy
self._auth = auth
self._idendesc = idendesc
@@ -19,7 +18,9 @@ class BlobServer:
self._location = location
self._blobtable = blobtable
self.BlobHandle = BlobHandle
- self._clients = {}
+ self._clients = set()
+ self._opencounts = Counter()
+ self._deltags = set()
self._containers = dict.fromkeys(self._blobtable.get_container_list(),
dict())
self._proxy.register_call('blobserver/', 'connect_client',
@@ -30,55 +31,55 @@ class BlobServer:
self.close_container)
self._proxy.register_call('blobserver/', 'check_blob',
self.check_blob)
- self._proxy.register_call('blobserver/', 'recv_update_result',
- self.recv_update_result)
self._proxy.register_call('blobserver/', 'recv_commit',
self.recv_commit)
+ self.clean()
+##################debug code##########################
+ print('blobserver: init')
+ self.show_status()
+
+ def show_status(self):
+ print('blobserver: containers =>\n', self._containers)
+ print('blobserver: blobs =>\n', self._blobtable.get_blob_list())
+######################################################
+
def __del__(self):
self._proxy.unregister_call('blobserver/', 'connect_client')
self._proxy.unregister_call('blobserver/', 'open_container')
self._proxy.unregister_call('blobserver/', 'close_container')
self._proxy.unregister_call('blobserver/', 'check_blob')
- self._proxy.unregister_call('blobserver/', 'recv_update_result')
self._proxy.unregister_call('blobserver/', 'recv_commit')
- def _client_call(self, client, func, timeout=10000, *args):
+ def _client_call(self, client, func, *args, timeout=10000):
client += 'blobclient/'
with Auth.change_current_iden(self._idendesc):
- for i in range(5):
- sta, ret = self._proxy.call(client, func, timeout, *args)
- if sta or (not sta and ret == 'Enoexist'):
- break
+ sta, ret = self._proxy.call(client, func, timeout, *args)
+ if not sta:
+ self.disconnect_client(client)
return (sta, ret)
def _client_call_async(self, client, func, callback,
- timeout=10000, *args, **kwargs):
+ *args, timeout=10000):
client += 'blobclient/'
with Auth.change_current_iden(self._idendesc):
- for i in range(5):
- sta, ret = self._proxy.call_async(client, func, timeout,
- callback, *args)
- if sta or (not sta and ret == 'Enoexist'):
- break
- return (sta, ret)
+ self._proxy.call_async(client, func, timeout, callback, *args)
@imc.async.caller
- def connect_client(self, client, cache_list):
+ def connect_client(self, client):
if client not in self._clients:
- self._clients.update({client: cache_list})
- else:
- self._clients[client] = cache_list
+ self._clients.add(client)
def disconnect_client(self, client):
try:
- self._clients.pop[client]
- except ValueError:
- raise Exception("this client doesn't exist")
+ self._clients.remove(client)
+ except KeyError:
+ print("client", client, "doesn't exist")
def create_container(self, container):
- self._blobtable.create_container(container)
- self._containers[container] = dict()
+ if container not in self._containers:
+ self._blobtable.create_container(container)
+ self._containers[container] = dict()
def del_container(self, container):
try:
@@ -101,163 +102,175 @@ class BlobServer:
try:
self._containers[container].pop(client)
except KeyError:
- raise
-
- def update_blob(self, blobname, info):
- self._blobtable.update_blob(blobname, info)
-
- def del_blob(self, blobname):
- rev = self._blobtable.get_blob_info(blobname, 'rev')
- blobname_rev = ''.join([blobname, '_', str(rev)])
- self._blobtable.del_blob(blobname)
- self.del_real_blob(blobname_rev)
-
- def del_real_blob(self, blobname_rev):
- blobpath = self._location + blobname_rev
- self.BlobHandle.del_blob(blobpath)
-
- def send_blob(self, client, blobname):
- rev = str(self._blobtable.get_blob_info(blobname, 'rev'))
- blobpath = os.path.join(self._location, blobname + '_' + rev)
- return self._proxy.sendfile(client, blobpath)
+ return False
+ else:
+ return True
- def recv_blob(self, filekey, blobname, rev):
- blobpath = os.path.join(self._location, blobname + '_' + str(rev))
- ret = self._proxy.recvfile(filekey, blobpath)
+ def clean(self):
+ del_list = list(self._deltags)
+ for rev in del_list:
+ self.del_real_blob(rev)
+ del_list = []
+ for blob, rev in (self._blobtable.get_blob_list().items()):
+ if not self.blob_exists(rev):
+ del_list.append(blob)
+ for container, name in del_list:
+ self.del_blob(container, name)
- return ret
@imc.async.caller
- def check_blob(self, client, blobname, cacherev):
- rev = self._blobtable.get_blob_info(blobname, 'rev')
- if rev is None:
- return 'no_exist'
- elif cacherev < rev:
- result = self.send_blob(client, blobname)
- response = {'filekey': result.filekey,
- 'info': self._blobtable.get_blob_info(blobname)}
- return response
+ def check_blob(self, client, container, name, cacherev, cachesha1):
+ info = self._blobtable.get_blob_info(container, name)
+ if info is None:
+ return ('no_exist', None)
+ elif cacherev != info['rev']:
+ if cachesha1 != info['sha1']:
+ result = self.send_blob(container, name, client)
+ return (result.filekey, info)
+ else:
+ return (None, info)
else:
- return 'up_to_date'
+ return ('up_to_date', None)
- @imc.async.caller
- def recv_update_result(self, client, blobname, result,
- cacherev, retry=False):
- if client not in self._clients:
- return None
- else:
- if result == 'Success':
- self._clients[client].append({blobname: cacherev})
- return 'Success'
- elif retry:
- result = self.send_blob(client, blobname)
- response = {'filekey': result.filekey,
- 'info': self._blobtable.get_blob_info(blobname)}
- return response
- else:
- return 'Finish'
+ def get_update_list(self, container):
+ clients = set()
+ for client, method in (self._containers[container].items()):
+ if method == "ACTIVE":
+ clients.add(client)
+ return clients
- def send_update(self, clients, blobname, info, written):
- result_table = dict.fromkeys(clients)
+ def send_update(self, client, container, name):
def recv_result(result):
- nonlocal result_table
- nonlocal blobname
- nonlocal info
- sta, client = result
+ nonlocal client
+ nonlocal container
+ nonlocal name
+ sta, ret = result
# TODO:
# limit retry
if not sta:
- self._client_call_async(client, 'get_update',
- recv_result,
- blobname, info,
- result_table[client].filekey)
- else:
- if result_table[client] is None:
- result_table.pop(client)
- elif result_table[client].wait() != 'Success':
- result_table[client] = self.send_blob(client, blobname)
- self._client_call_async(client, 'get_update',
- recv_result,
- blobname, info,
- result_table[client].filekey)
- else:
- result_table.pop(client)
+ # self._client_call_async(client, 'get_update',
+ # recv_result, container, name)
+ self.disconnect_client(client)
+ self._containers[container].pop(client)
- for client in clients:
- if written:
- result_table[client] = self.send_blob(client, blobname)
- else:
- result_table[client] = None
- sta, ret = self._client_call(client, 'get_update',
- recv_result,
- blobname, info,
- result_table[client].filekey)
- if not sta:
- # TODO:
- pass
+ self._client_call_async(client, 'get_update',
+ recv_result, container, name)
@imc.async.caller
- def recv_commit(self, client, commit_info, force_flag, filekey=None):
- blobname = commit_info['blobname']
- info = commit_info['info']
- rev = self._blobtable.get_blob_info(blobname, 'rev')
- if rev is None:
- if commit_info['createtag']:
- rev = 0
- else:
+ def recv_commit(self, client, commit_info, force_flag):
+ clientinfo = commit_info['info']
+ container = clientinfo['container']
+ name = clientinfo['name']
+ info = self._blobtable.get_blob_info(container, name)
+ if info is None:
+ if not commit_info['createtag']:
+##################debug code##########################
+ print("blob doesn't exist, please add createtag")
+##################debug code##########################
return False
- elif info['rev'] < rev and not force_flag:
+ elif info['rev'] != clientinfo['rev'] and not force_flag:
+##################debug code##########################
+ print("revision conflict")
+##################debug code##########################
return False
if commit_info['deltag']:
- self.del_blob(blobname)
- clients = set()
- for needed_client, method in (
- self._containers[info['container']].items()
- ):
- if method == "ACTIVE":
- clients.add(needed_client)
- clients.discard(client)
- self.send_update(clients, blobname, None, False)
- result = True
+ clientinfo['rev'] = None
+ self.del_blob(container, name)
+ return clientinfo
else:
- info['rev'] = rev + 1
- if commit_info['written']:
- status = self.recv_blob(filekey, blobname, rev + 1)
+ clientinfo['rev'] = str(uuid.uuid4())
+ if not info or info['sha1'] != clientinfo['sha1']:
+ filekey = self.request_blob(client, commit_info['target'])
+ if (filekey == 'Efailtosend' or
+ filekey == 'Edisconnected'):
+ return False
+ else:
+ filekey = None
+ if filekey:
+ status = self.recv_blob(filekey, clientinfo['rev'])
result = status.wait()
- if rev:
- self.del_real_blob(''.join([blobname, '_', str(rev)]))
else:
- result = True
- if result:
- self.update_blob(blobname, info)
- clients = set()
- for needed_client, method in (
- self._containers[info['container']].items()):
- if method == "ACTIVE":
- clients.add(needed_client)
- clients.discard(client)
- self.send_update(clients, blobname,
- info, commit_info['written'])
+ if info:
+ result = self.copy_blob(info['rev'], clientinfo['rev'])
+ else:
+ result = False
+ if result:
+ if clientinfo['rev']:
+ self.update_blob(clientinfo)
+ if info:
+ self.del_real_blob(info['rev'])
+ clients = self.get_update_list(container)
+ clients.discard(client)
+ for cli in clients:
+ self.send_update(cli, container, name)
+ return clientinfo
+ else:
+ return False
- return True
+ # @imc.async.caller
+ # def get_request(self, client, container, name):
+ # result = self.send_blob(client, container, name)
+ # if result:
+ # return result.filekey
+ # else:
+ # return None
+
+ def request_blob(self, client, target):
+ sta, ret = self._client_call(client, 'get_request', target)
+ if not sta:
+ return 'Edisconnected'
else:
- return False
-
-
-
-################### Testing Code #######################
-'''
-if __name__ == '__main__':
- global blob_serv
-
- blob_serv = BlobServer()
- blob_serv.listen(5730)
+ return ret
- #http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([
- # ('/conn',WebConnHandler),
- #]))
- #http_serv.listen(83)
-
- tornado.ioloop.IOLoop.instance().start()
-'''
+ def send_blob(self, container, name, client):
+ rev = self._blobtable.get_blob_info(container, name, 'rev')
+ if not rev:
+ return False
+ else:
+ def send_finish(result):
+ nonlocal rev
+ if self._opencounts[rev] > 0:
+ self._opencounts[rev] -= 1
+
+ blobpath = os.path.join(self._location, rev)
+ try:
+ ret = self._proxy.sendfile(client, blobpath)
+ except ConnectionError:
+ return False
+ else:
+ self._opencounts[rev] += 1
+ ret.wait_async(send_finish)
+ return ret
+
+ def recv_blob(self, filekey, filename):
+ blobpath = os.path.join(self._location, filename)
+ return self._proxy.recvfile(filekey, blobpath)
+
+ def update_blob(self, info):
+ self._blobtable.update_blob(info)
+
+ def del_blob(self, container, name):
+ rev = self._blobtable.get_blob_info(container, name, 'rev')
+ self._blobtable.del_blob(container, name)
+ if rev:
+ self.del_real_blob(rev)
+
+ def del_real_blob(self, rev):
+ self._deltags.add(rev)
+ if self._opencounts[rev] == 0:
+ self._deltags.remove(rev)
+ path = os.path.join(self._location, rev)
+ self.BlobHandle.del_blob(path)
+
+ def copy_blob(self, rev, newrev):
+ path = os.path.join(self._location, rev)
+ newpath = os.path.join(self._location, newrev)
+ return self.BlobHandle.copy_file(path, newpath)
+
+ def vertify_blob(self, rev, sha1):
+ blobpath = os.path.join(self._location, rev)
+ return self.BlobHandle._sha1(blobpath) == sha1
+
+ def blob_exists(self, rev):
+ return self.BlobHandle.file_exists(os.path.join(self._location, rev))
diff --git a/src/py/imc/blobtable.py b/src/py/imc/blobtable.py
index 34e2d01..e6cd7cd 100755
--- a/src/py/imc/blobtable.py
+++ b/src/py/imc/blobtable.py
@@ -18,7 +18,7 @@ class BlobTable:
# client
# server
@abstractmethod
- def get_blob_info(self, blobname, attr=None):
+ def get_blob_info(self, container, name, attr=None):
# if the blobname doesn't exist, return None
if attr is None:
# return blob info
@@ -46,19 +46,23 @@ class BlobTable:
# client
# server
@abstractmethod
- def update_blob(self, blobname, info):
+ def update_blob(self, info):
pass
# client
# server
@abstractmethod
- def del_blob(self, blobname):
+ def del_blob(self, container, name):
+ # return the info
+ # if the blob doesn't exist, return None
pass
"""
info:
- rev (int)
container (str)
+ name (str)
+ rev (str)
+ sha1 (str)
metadata (str)
size (???)
commit_time (???)
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 5846bd2..960c82a 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -37,7 +37,7 @@ class Connection:
def start_recv(self,recv_callback):
pass
- def abort_file(self,filekey):
+ def abort_file(self,filekey,err):
pass
def add_close_callback(self,callback):
@@ -54,10 +54,12 @@ class Connection:
class FileResult():
def __init__(self,filekey):
- self.filekey = filekey
+ self._ioloop = tornado.ioloop.IOLoop.instance()
self._retid = None
self._result = None
+ self.filekey = filekey
+
def ret_result(self,res):
if self._result != None:
return
@@ -73,6 +75,16 @@ class FileResult():
return self._result
+ def wait_async(self,callback):
+ @async.caller
+ def _call():
+ result = self.wait()
+
+ if callback != None:
+ callback(result)
+
+ self._ioloop.add_callback(tornado.stack_context.wrap(_call))
+
class Proxy:
def __init__(self,link,auth,idendesc,conn_link_fn = lambda link : None):
self.MSGTYPE_CALL = 'call'
@@ -124,7 +136,8 @@ class Proxy:
def del_conn(self,conn):
waits = list(self._conn_retidmap[conn.link].values())
for wait in waits:
- wait['callback']((False,'Eclose'))
+ self._ret_call(wait['caller_link'],wait['caller_retid'],
+ (False,'Eclose'))
waits = list(self._conn_filekeymap[conn.link].values())
for wait in waits:
@@ -175,10 +188,11 @@ class Proxy:
self._ioloop.add_callback(tornado.stack_context.wrap(_call))
def sendfile(self,dst_link,filepath):
- def _abort_cb():
- if self._ret_sendfile(filekey,'Eabort'):
+ def _callback(err):
+ if self._ret_sendfile(filekey,err) and err != None:
with Auth.change_current_iden(self._idendesc,self._auth):
- self.call(dst_link + 'imc/','abort_sendfile',65536,filekey)
+ self.call(dst_link + 'imc/','abort_sendfile',65536,
+ filekey,err)
filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest()
filesize = os.stat(filepath).st_size
@@ -189,8 +203,9 @@ class Proxy:
'filesize':filesize,
'filepath':filepath,
'fileresult':fileresult,
- 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')),
- 'abort_callback':tornado.stack_context.wrap(_abort_cb)
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),
+ lambda : self._ret_sendfile(filekey,'Etimeout')),
+ 'callback':tornado.stack_context.wrap(_callback)
}
with Auth.change_current_iden(self._idendesc,self._auth):
@@ -211,7 +226,7 @@ class Proxy:
if err != None:
if not in_conn.closed():
- in_conn.abort_file(filekey)
+ in_conn.abort_file(filekey,err)
self._send_msg_abortfile(in_conn,filekey,err)
self._ioloop.add_callback(self._ret_sendfile,filekey,err)
@@ -224,7 +239,7 @@ class Proxy:
in_conn = self._request_conn(src_link)
if filekey in self._info_filekeymap:
- info['abort_callback'] = tornado.stack_context.wrap(lambda : _callback('Eabort'))
+ info['callback'] = tornado.stack_context.wrap(_callback)
self._add_wait_filekey(in_conn.link,filekey,filesize,_callback)
in_conn.recv_file(filekey,filesize,filepath,_callback)
@@ -234,7 +249,7 @@ class Proxy:
def abortfile(self,filekey):
try:
- self._info_filekeymap[filekey]['abort_callback']()
+ self._info_filekeymap[filekey]['callback']('Eabort')
except:
pass
@@ -274,10 +289,13 @@ class Proxy:
def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param):
def __add_wait_caller(conn_link):
- callback = tornado.stack_context.wrap(lambda result : self._ret_call(caller_link,caller_retid,result))
self._conn_retidmap[conn_link][caller_retid] = {
- 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback((False,'Etimeout'))),
- 'callback':callback
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(
+ milliseconds = timeout),
+ lambda : self._ret_call(caller_link,caller_retid,
+ (False,'Etimeout'))),
+ 'caller_link':caller_link,
+ 'caller_retid':caller_retid
}
def __del_wait_caller(conn_link):
@@ -333,8 +351,8 @@ class Proxy:
except KeyError:
return __ret((False,'Enoexist'))
- #except Exception:
- # return __ret((False,'Einternal'))
+ except Exception:
+ return __ret((False,'Einternal'))
if Auth.get_current_idendesc() == idendesc:
result = func(*param)
@@ -355,7 +373,9 @@ class Proxy:
else:
if caller_link == self._link:
__add_wait_caller(conn.link)
- self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param)
+
+ self._send_msg_call(conn,caller_link,caller_retid,idendesc,
+ dst,func_name,timeout,param)
result = async.switch_top()
@@ -364,7 +384,8 @@ class Proxy:
return __ret(result)
else:
- self._send_msg_call(conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param)
+ self._send_msg_call(conn,caller_link,caller_retid,idendesc,
+ dst,func_name,timeout,param)
return
@@ -391,7 +412,7 @@ class Proxy:
if err != None:
if not out_conn.closed():
- out_conn.abort_file(filekey)
+ out_conn.abort_file(filekey,err)
self._send_msg_abortfile(out_conn,filekey,err)
self._ioloop.add_callback(self._ret_sendfile,filekey,err)
@@ -402,7 +423,7 @@ class Proxy:
if err != None:
if not in_conn.closed():
- in_conn.abort_file(filekey)
+ in_conn.abort_file(filekey,err)
self._send_msg_abortfile(in_conn,filekey,err)
except KeyError:
@@ -413,7 +434,7 @@ class Proxy:
if err != None:
if not out_conn.closed():
- out_conn.abort_file(filekey)
+ out_conn.abort_file(filekey,err)
self._send_msg_abortfile(out_conn,filekey,err)
except KeyError:
@@ -425,13 +446,19 @@ class Proxy:
if info['filesize'] != filesize:
raise ValueError
- except (KeyError,ValueError):
+ except KeyError:
+ self._send_msg_abortfile(out_conn,filekey,'Enoexist')
+
+ return
+
+ except ValueError:
self._send_msg_abortfile(out_conn,filekey,'Enoexist')
self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist')
+
return
- info['abort_callback'] = tornado.stack_context.wrap(lambda : __send_cb('Eabort'))
- self._add_wait_filekey(out_conn.link,filekey,filesize,__send_cb)
+ info['callback'] = tornado.stack_context.wrap(__send_cb)
+ self._add_wait_filekey(out_conn.link,filekey,__send_cb)
out_conn.send_file(filekey,info['filepath'],__send_cb)
else:
@@ -440,8 +467,10 @@ class Proxy:
self._send_msg_abortfile(out_conn,filekey,'Enoexist')
else:
- self._add_wait_filekey(in_conn.link,filekey,filesize,__bridge_cb)
- self._add_wait_filekey(out_conn.link,filekey,filesize,__bridge_cb)
+ self._add_wait_filekey(in_conn.link,filekey,filesize,
+ __bridge_cb)
+ self._add_wait_filekey(out_conn.link,filekey,filesize,
+ __bridge_cb)
send_fn = out_conn.send_filedata(filekey,filesize,__bridge_cb)
in_conn.recv_filedata(filekey,filesize,send_fn)
@@ -450,8 +479,11 @@ class Proxy:
def _add_wait_filekey(self,conn_link,filekey,filesize,callback):
callback = tornado.stack_context.wrap(callback)
+
self._conn_filekeymap[conn_link][filekey] = {
- 'timer':self._ioloop.add_timeout(datetime.timedelta(milliseconds = max(filesize,10000)),lambda : callback('Etimeout')),
+ 'timer':self._ioloop.add_timeout(
+ datetime.timedelta(milliseconds = max(filesize,10000)),
+ lambda : callback('Etimeout')),
'callback':callback
}
@@ -607,20 +639,26 @@ class Proxy:
@async.caller
def _pend_recvfile(self,src_link,filekey,filesize):
- def __abort_cb():
- if self._ret_sendfile(filekey,'Eabort'):
+ def __abort_cb(err):
+ if self._ret_sendfile(filekey,err):
with Auth.change_current_iden(self._idendesc,self._auth):
- self.call(src_link + 'imc/','abort_sendfile',65536,filekey)
+ self.call(src_link + 'imc/','abort_sendfile',65536,
+ filekey,err)
self._info_filekeymap[filekey] = {
'src_link':src_link,
'filesize':filesize,
'fileresult':FileResult(filekey),
- 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile(filekey,'Etimeout')),
- 'abort_callback':tornado.stack_context.wrap(__abort_cb)
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),
+ lambda : self._ret_sendfile(filekey,'Etimeout')),
+ 'callback':tornado.stack_context.wrap(__abort_cb)
}
+
+ return 'Success'
@async.caller
- def _abort_sendfile(self,filekey):
+ def _abort_sendfile(self,filekey,err):
if filekey in self._info_filekeymap:
- self._ioloop.add_callback(self._ret_sendfile,filekey,'Eabort')
+ self._ioloop.add_callback(self._ret_sendfile,filekey,err)
+
+ return 'Success'
diff --git a/src/py/mod.py b/src/py/mod.py
index af24577..73c3e1e 100644
--- a/src/py/mod.py
+++ b/src/py/mod.py
@@ -26,7 +26,7 @@ def unload(name):
del mod_list[name]
def load_sqmod(sqmodname):
- instance = import_module(''.join(['/srv/py/sqmod/',sqmodname,'/py/',sqmodname]))
+ instance = import_module(''.join(['/srv/http/toj/sqmod/',sqmodname,'/py/',sqmodname]))
return getattr(instance,sqmodname)
diff --git a/src/py/netio.py b/src/py/netio.py
index b896452..dac8273 100755
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -1,5 +1,5 @@
import os
-import traceback
+import math
import json
import struct
import socket
@@ -26,6 +26,7 @@ class SocketStream:
self.DATA_BUF = 0
self.DATA_NOBUF = 1
self.DATA_FILE = 2
+ self.STREAMTYPE = 'socket'
self._ioloop = tornado.ioloop.IOLoop.current()
self._sock = sock
@@ -294,20 +295,27 @@ class SocketStream:
self._ioloop.update_handler(fd,stat)
class SocketConnection(Connection):
- def __init__(self,link,main_stream,file_addr,add_pend_filestream_fn = None):
+ def __init__(self,
+ link,
+ main_stream,
+ file_addr,
+ pend_filestream_fn = None,
+ del_pend_filestream_fn = None):
+
super().__init__(link)
self._ioloop = tornado.ioloop.IOLoop.current()
self._sendfile_filekeymap = {}
self.main_stream = main_stream
- self.main_stream.set_close_callback(lambda conn : self.close())
+ self.main_stream.set_close_callback(lambda stream : self.close())
self.file_addr = file_addr
- self.add_pend_filestream = add_pend_filestream_fn
+ self.pend_filestream = pend_filestream_fn
+ self.del_pend_filestream = del_pend_filestream_fn
def _check_close(f):
def wrap(self,*args):
- if self._closed == True:
+ if self.closed():
raise ConnectionError
return f(self,*args)
@@ -343,7 +351,13 @@ class SocketConnection(Connection):
callback(err)
- fd = os.open(filepath,os.O_RDONLY)
+ try:
+ fd = os.open(filepath,os.O_RDONLY)
+
+ except FileNotFoundError:
+ callback('Eabort')
+ return
+
filesize = os.fstat(fd).st_size
file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
@@ -357,7 +371,6 @@ class SocketConnection(Connection):
file_stream = stream
file_stream.set_close_callback(lambda stream : _callback('Eabort'))
- self._add_wait_filekey(filekey,_callback)
file_stream.recvfile(fd,filesize,_callback)
@@ -368,8 +381,13 @@ class SocketConnection(Connection):
except KeyError:
return
- file_stream.set_close_callback(None)
- file_stream.close()
+ if file_stream == None:
+ self.del_pend_filestream(filekey)
+
+ else:
+ file_stream.set_close_callback(None)
+ file_stream.close()
+
os.close(fd)
if err != None:
@@ -383,7 +401,8 @@ class SocketConnection(Connection):
file_stream = None
- self.add_pend_filestream(filekey,_conn_cb)
+ self._add_wait_filekey(filekey,_callback)
+ self.pend_filestream(self.STREAMTYPE,filekey,_conn_cb)
fd = os.open(filepath,os.O_WRONLY | os.O_CREAT)
@_check_close
@@ -436,7 +455,6 @@ class SocketConnection(Connection):
file_stream = stream
file_stream.set_close_callback(lambda stream : _callback('Eabort'))
- self._add_wait_filekey(filekey,_callback)
file_stream.read_bytes(filesize,send_fn,nonbuf = True)
@@ -446,18 +464,23 @@ class SocketConnection(Connection):
except KeyError:
return
+
+ if file_stream == None:
+ self.del_pend_filestream(filekey)
- file_stream.set_close_callback(None)
- file_stream.close()
+ else:
+ file_stream.set_close_callback(None)
+ file_stream.close()
file_stream = None
- self.add_pend_filestream(filekey,_conn_cb)
+ self._add_wait_filekey(filekey,_callback)
+ self.pend_filestream(self.STREAMTYPE,filekey,_conn_cb)
@_check_close
- def abort_file(self,filekey):
+ def abort_file(self,filekey,err):
try:
- self._sendfile_filekeymap[filekey]('Eabort')
+ self._sendfile_filekeymap[filekey](err)
except KeyError:
pass
@@ -476,42 +499,216 @@ class SocketConnection(Connection):
self.main_stream.read_bytes(8,_recv_size)
def close(self):
- if self._closed == True:
+ if self.closed():
return
- self._closed = True
+ super().close()
self.main_stream.close()
callbacks = list(self._sendfile_filekeymap.values())
for callback in callbacks:
callback('Eabort')
- super().close()
-
def _add_wait_filekey(self,filekey,fail_cb):
self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
def _del_wait_filekey(self,filekey):
del self._sendfile_filekeymap[filekey]
+class WebSocketStream:
+ def __init__(self,handler):
+ self._closed = False
+ self._handler = handler
+ self._recv_callback = None
+ self._close_callback = None
+
+ def _check_close(f):
+ def wrap(self,*args):
+ if self._closed == True:
+ raise ConnectionError
+
+ return f(self,*args)
+
+ return wrap
+
+ def set_close_callback(self,callback):
+ if callback == None:
+ self._close_callback = None
+
+ else:
+ self._close_callback = tornado.stack_context.wrap(callback)
+
+ def close(self):
+ if self._closed == True:
+ return
+
+ self._closed = True
+ try:
+ self._handler.close()
+
+ except Exception:
+ pass
+
+ if self._close_callback != None:
+ self._close_callback(self)
+
+ @_check_close
+ def set_recv_callback(self,callback):
+ self._recv_callback = tornado.stack_context.wrap(callback)
+
+ @_check_close
+ def send_msg(self,data):
+ self._handler.write_message(data,True)
+
+ @_check_close
+ def recv_msg(self,data):
+ if self._recv_callback != None:
+ self._recv_callback(data)
+
+ @_check_close
+ def recv_file(self,fd,filesize,callback = None):
+ def _recv_info(data):
+ nonlocal off
+ nonlocal partsize
+
+ info = json.loads(data.decode('utf-8'))
+ off = info['off']
+
+ partsize = max(0,min(math.ceil(filesize / 4),filesize - off))
+ self.set_recv_callback(_recv_cb)
+ self.send_msg('Success')
+
+ def _recv_cb(data):
+ nonlocal count
+
+ if count + len(data) > partsize:
+ if callback != None:
+ callback('Eillegal')
+
+ return
+
+ self.send_msg('Success')
+
+ os.pwrite(fd,data,off + count)
+ count += len(data)
+
+ if count == partsize:
+ if callback != None:
+ callback()
+
+ count = 0
+ off = 0
+ partsize = 0
+
+ self.set_recv_callback(_recv_info)
+
class WebSocketConnection(Connection):
- def __init__(self,link,handler):
+ def __init__(self,
+ link,
+ main_stream,
+ pend_filestream_fn = None,
+ del_pend_filestream_fn = None):
+ self.STREAMTYPE = 'websocket'
+
super().__init__(link)
self._ioloop = tornado.ioloop.IOLoop.current()
- self.handler = handler
+ self._sendfile_filekeymap = {}
- def send_msg(self,data):
- if self._closed == True:
- raise ConnectionError
+ self.main_stream = main_stream
+ self.main_stream.set_close_callback(lambda stream : self.close())
+ self.pend_filestream = pend_filestream_fn
+ self.del_pend_filestream = del_pend_filestream_fn
+
+ def _check_close(f):
+ def wrap(self,*args):
+ if self._closed == True:
+ raise ConnectionError
- self.handler.write_message(data,True)
+ return f(self,*args)
- def recv_msg(self,data):
+ return wrap
+
+ def close(self):
if self._closed == True:
- raise ConnectionError
+ return
- self._recv_callback(self,data)
+ self.main_stream.close()
+ super().close()
+
+ @_check_close
+ def send_msg(self,data):
+ self.main_stream.send_msg(data)
+ @_check_close
def start_recv(self,recv_callback):
- self._recv_callback = tornado.stack_context.wrap(recv_callback)
+ self.main_stream.set_recv_callback(
+ lambda data : recv_callback(self,data))
+
+ @_check_close
+ def recv_file(self,filekey,filesize,filepath,callback):
+ def _conn_cb(stream):
+ nonlocal file_streams
+
+ file_streams.append(stream)
+ stream.recv_file(fd,filesize,_stream_cb)
+ stream.set_close_callback(lambda stream : _stream_cb('Eabort'))
+
+ def _stream_cb(err = None):
+ nonlocal count
+
+ count += 1
+
+ if err != None:
+ _callback(err)
+
+ if count == 4:
+ _callback()
+
+ def _callback(err = None):
+ try:
+ self._del_wait_filekey(filekey)
+
+ except KeyError:
+ return
+
+ self.del_pend_filestream(filekey)
+
+ for stream in file_streams:
+ stream.set_close_callback(None)
+ stream.close()
+
+ os.close(fd)
+
+ if err != None:
+ try:
+ os.remove(filepath)
+
+ except FileNotFoundError:
+ pass
+
+ callback(err)
+
+ file_streams = []
+ count = 0
+
+ self._add_wait_filekey(filekey,_callback)
+ self.pend_filestream(self.STREAMTYPE,filekey,_conn_cb,4)
+
+ fd = os.open(filepath,os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
+ os.ftruncate(fd,filesize);
+
+ @_check_close
+ def abort_file(self,filekey,err):
+ try:
+ self._sendfile_filekeymap[filekey](err)
+
+ except KeyError:
+ pass
+
+ def _add_wait_filekey(self,filekey,fail_cb):
+ self._sendfile_filekeymap[filekey] = tornado.stack_context.wrap(fail_cb)
+
+ def _del_wait_filekey(self,filekey):
+ del self._sendfile_filekeymap[filekey]
+
diff --git a/src/py/spup.c b/src/py/spup.c
new file mode 100644
index 0000000..749112f
--- /dev/null
+++ b/src/py/spup.c
@@ -0,0 +1,268 @@
+#include<unistd.h>
+#include<malloc.h>
+#include<pthread.h>
+#include<semaphore.h>
+#include<sys/syscall.h>
+#include<sys/types.h>
+#include<sys/eventfd.h>
+#include<linux/aio_abi.h>
+
+#include<Python.h>
+
+#define AIO_MAX_EVENT 4096
+#define AIO_MAX_GETEVENT 64
+
+struct aio_pack{
+ struct aio_pack *next;
+
+ int id;
+ struct iocb *iocb;
+ char *data;
+ char *buf;
+};
+
+static int io_setup(unsigned int nr_events,aio_context_t *ctx_idp);
+static int io_submit(aio_context_t ctx_id,long nr,struct iocb **iocbpp);
+static int io_destroy(aio_context_t ctx_id);
+static PyObject* spup_apply_mask(PyObject *self,PyObject *args);
+static PyObject* spup_aio_get_evtfd(PyObject *self,PyObject *args);
+static PyObject* spup_aio_read(PyObject *self,PyObject *args);
+static PyObject* spup_aio_write(PyObject *self,PyObject *args);
+static PyObject* spup_aio_getevents(PyObject *self,PyObject *args);
+static void* spup_aio_handler(void *arg);
+
+static PyMethodDef method[] = {
+ {"apply_mask",spup_apply_mask,METH_VARARGS,"apply_mask"},
+
+ {"aio_get_evtfd",spup_aio_get_evtfd,METH_NOARGS,"aio_get_evtfd"},
+ {"aio_read",spup_aio_read,METH_VARARGS,"aio_read"},
+ {"aio_write",spup_aio_write,METH_VARARGS,"aio_write"},
+ {"aio_getevents",spup_aio_getevents,METH_VARARGS,"aio_getevents"},
+ {NULL,NULL,0,NULL}
+};
+static struct PyModuleDef module = {
+ PyModuleDef_HEAD_INIT,
+ "spup",
+ NULL,
+ -1,
+ method
+};
+static PyObject *spup_error;
+static aio_context_t spup_aio_ctx;
+static int spup_aio_evtfd;
+static int spup_aio_id;
+static pthread_t spup_aio_thread;
+static struct aio_pack *spup_aio_queue[AIO_MAX_EVENT];
+static int spup_aio_head;
+static int spup_aio_tail;
+static sem_t spup_aio_sem;
+
+PyMODINIT_FUNC PyInit_spup(void){
+ PyObject *m;
+
+ m = PyModule_Create(&module);
+ if(m == NULL){
+ return NULL;
+ }
+
+ spup_error = PyErr_NewException("spup.error",NULL,NULL);
+ Py_INCREF(spup_error);
+ PyModule_AddObject(m,"error",spup_error);
+
+ memset(&spup_aio_ctx,0,sizeof(spup_aio_ctx));
+ io_setup(AIO_MAX_EVENT,&spup_aio_ctx);
+ spup_aio_evtfd = eventfd(0,0);
+ spup_aio_id = 0;
+ pthread_create(&spup_aio_thread,NULL,spup_aio_handler,NULL);
+ spup_aio_head = 0;
+ spup_aio_tail = 0;
+ sem_init(&spup_aio_sem,0,0);
+
+ return m;
+}
+
+static int io_setup(unsigned int nr_events,aio_context_t *ctx_idp){
+ return syscall(SYS_io_setup,nr_events,ctx_idp);
+}
+static int io_submit(aio_context_t ctx_id,long nr,struct iocb **iocbpp){
+ return syscall(SYS_io_submit,ctx_id,nr,iocbpp);
+}
+static int io_getevents(aio_context_t ctx_id,
+ long min_nr,
+ long nr,
+ struct io_event *events,
+ struct timespec *timeout){
+
+ return syscall(SYS_io_getevents,ctx_id,min_nr,nr,events,timeout);
+}
+static int io_destroy(aio_context_t ctx_id){
+ return syscall(SYS_io_destroy,ctx_id);
+}
+
+static PyObject* spup_apply_mask(PyObject *self,PyObject *args){
+ int i;
+ int j;
+
+ char *mask;
+ int mask_len;
+ char *data;
+ int data_len;
+ unsigned int mask_val;
+ char *tmp;
+ PyObject *ret;
+
+ PyArg_ParseTuple(args,"y#y#",&mask,&mask_len,&data,&data_len);
+ if(mask_len < 4){
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ tmp = malloc(data_len);
+ if(tmp == NULL){
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ mask_val = *(unsigned int*)mask;
+ for(i = 0;(i + 4) <= data_len;i += 4){
+ *((unsigned int*)(tmp + i)) = *((unsigned int*)(data + i)) ^ mask_val;
+ }
+ for(j = 0;i < data_len;i++,j++){
+ tmp[i] = data[i] ^ mask[j];
+ }
+
+ ret = PyBytes_FromStringAndSize(tmp,data_len);
+ free(tmp);
+
+ return ret;
+}
+
+static PyObject* spup_aio_get_evtfd(PyObject *self,PyObject *args){
+ return PyLong_FromLong(spup_aio_evtfd);
+}
+static PyObject* spup_aio_read(PyObject *self,PyObject *args){
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+static PyObject* spup_aio_write(PyObject *self,PyObject *args){
+ int fd;
+ char *data;
+ int data_len;
+ Py_ssize_t len;
+ unsigned long off;
+
+ struct aio_pack *pack;
+ struct iocb *iocb;
+ char *buf;
+
+ PyArg_ParseTuple(args,"iy#nk",&fd,&data,&data_len,&len,&off);
+ if(len > data_len){
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ pack = malloc(sizeof(*pack));
+ iocb = malloc(sizeof(*iocb));
+ buf = memalign(512,len);
+ if(pack == NULL || iocb == NULL || buf == NULL){
+ if(pack != NULL){
+ free(pack);
+ }
+ if(iocb != NULL){
+ free(iocb);
+ }
+ if(buf != NULL){
+ free(buf);
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ //memcpy(buf,data,len);
+
+ Py_INCREF(Py_None);
+ return Py_None;
+
+ spup_aio_id += 1;
+ pack->id = spup_aio_id;
+ pack->iocb = iocb;
+ pack->buf = buf;
+
+ memset(iocb,0,sizeof(*iocb));
+ iocb->aio_lio_opcode = IOCB_CMD_PWRITE;
+ iocb->aio_reqprio = 0;
+ iocb->aio_fildes = fd;
+ iocb->aio_buf = (unsigned long)buf;
+ iocb->aio_nbytes = len;
+ iocb->aio_offset = off;
+ iocb->aio_flags = IOCB_FLAG_RESFD;
+ iocb->aio_resfd = spup_aio_evtfd;
+ iocb->aio_data = (unsigned long)pack;
+
+ spup_aio_queue[spup_aio_tail] = pack;
+ spup_aio_tail = (spup_aio_tail + 1) % AIO_MAX_EVENT;
+ sem_post(&spup_aio_sem);
+
+ return PyLong_FromLong(pack->id);
+}
+static PyObject* spup_aio_getevents(PyObject *self,PyObject *args){
+ int i;
+
+ PyObject *ret;
+ struct io_event *evts;
+ int nr;
+ struct aio_pack *pack;
+ char *buf;
+
+ evts = malloc(sizeof(*evts) * AIO_MAX_GETEVENT);
+ if(evts == NULL){
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ nr = io_getevents(spup_aio_ctx,1,AIO_MAX_GETEVENT,evts,NULL);
+ for(i = 0;i < nr;i++){
+ pack = (struct aio_pack*)evts[i].data;
+ buf = pack->buf;
+
+ printf(" %lld\n",evts[i].res);
+
+ free(buf);
+ free(pack);
+ free((struct iocb*)evts[i].obj);
+ }
+
+ free(evts);
+ return ret;
+}
+
+static void* spup_aio_handler(void *arg){
+ int count;
+ struct aio_pack *pack;
+ struct iocb *iocbs[64];
+
+ while(1){
+ sem_wait(&spup_aio_sem);
+
+ count = 0;
+ while(1){
+ pack = spup_aio_queue[spup_aio_head];
+ iocbs[count] = pack->iocb;
+
+ spup_aio_head = (spup_aio_head + 1) % AIO_MAX_EVENT;
+ count++;
+
+ if(count >= 64){
+ break;
+ }
+ if(sem_trywait(&spup_aio_sem)){
+ break;
+ }
+ }
+
+ printf("%d\n",io_submit(spup_aio_ctx,count,iocbs));
+ }
+
+ return NULL;
+}
diff --git a/src/py/square.py b/src/py/square.py
index 6ce0d85..677c9cc 100644
--- a/src/py/square.py
+++ b/src/py/square.py
@@ -35,6 +35,8 @@ class SquareMg:
self.get_link = get_link_fn
self._sqmod_list = {}
+ Proxy.instance.register_filter('sq/', self.sqmod_filter)
+
Proxy.instance.register_call(
'core/square/', 'list_category', self.list_category)
Proxy.instance.register_call(
@@ -55,6 +57,8 @@ class SquareMg:
'core/square/', 'list_sqmod', self.list_sqmod)
def unload(self):
+ Proxy.instance.unregister_filter('sq/', self.sqmod_filter)
+
Proxy.instance.unregister_call(
'core/square/', 'list_category')
Proxy.instance.unregister_call(
@@ -652,6 +656,11 @@ class SquareMg:
sqmodname = self.get_sqmodname_by_sqmodid(sqmodid)
return sqmodname != None
+ def sqmod_filter(self, res_path, dst_func):
+ sqid = int(res_path[0])
+ with TOJAuth.change_current_iden(self._idendesc):
+ self.load_square(sqid)
+
class Square:
def unload(self, Force):
pass