aboutsummaryrefslogtreecommitdiffstats
path: root/judge/judge_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'judge/judge_server.cpp')
-rw-r--r--judge/judge_server.cpp519
1 files changed, 519 insertions, 0 deletions
diff --git a/judge/judge_server.cpp b/judge/judge_server.cpp
new file mode 100644
index 0000000..2bea17c
--- /dev/null
+++ b/judge/judge_server.cpp
@@ -0,0 +1,519 @@
+#ifdef HAVE_CONFIG_H
+# include<judge_conf.h>
+#endif
+
+#include<stdio.h>
+#include<stdlib.h>
+#include<string.h>
+#include<unistd.h>
+#include<fcntl.h>
+#include<dlfcn.h>
+#include<signal.h>
+#include<limits.h>
+#include<pthread.h>
+#include<semaphore.h>
+#include<errno.h>
+#include<sys/types.h>
+#include<sys/socket.h>
+#include<sys/epoll.h>
+#include<sys/eventfd.h>
+#include<sys/sendfile.h>
+#include<sys/mman.h>
+#include<netinet/in.h>
+#include<arpa/inet.h>
+#include<map>
+#include<queue>
+
+#include<judge_def.h>
+#include"netio.h"
+#include"tpool.h"
+#include"judge.h"
+#include"center_com.h"
+#include"judgm_line.h"
+#include"judgm_lib.h"
+#include"judge_server.h"
+
+server_epevdata::server_epevdata(int fd,int type,void *data){
+ this->fd = fd;
+ this->type = type;
+ this->data = data;
+}
+
+
+server_conn::server_conn(int fd):netio(fd){
+ this->recv_dispatch_fn = new netio_iofn<server_conn>(this,&server_conn::recv_dispatch);
+ this->recv_setid_fn = new netio_iofn<server_conn>(this,&server_conn::recv_setid);
+ this->recv_submit_fn = new netio_iofn<server_conn>(this,&server_conn::recv_submit);
+ this->recv_setpro_fn = new netio_iofn<server_conn>(this,&server_conn::recv_setpro);
+ this->recv_sendpro_fn = new netio_iofn<server_conn>(this,&server_conn::recv_sendpro);
+ this->done_sendpro_fn = new netio_iofn<server_conn>(this,&server_conn::done_sendpro);
+ this->recv_setjmod_fn = new netio_iofn<server_conn>(this,&server_conn::recv_setjmod);
+ this->recv_sendjmod_fn = new netio_iofn<server_conn>(this,&server_conn::recv_sendjmod);
+ this->done_sendjmod_fn = new netio_iofn<server_conn>(this,&server_conn::done_sendjmod);
+ this->recv_sendcode_fn = new netio_iofn<server_conn>(this,&server_conn::recv_sendcode);
+ this->done_sendcode_fn = new netio_iofn<server_conn>(this,&server_conn::done_sendcode);
+ this->tp_unpackjmod_thfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackjmod_th);
+ this->tp_unpackjmod_cbfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackjmod_cb);
+
+ send_setid();
+}
+char* server_conn::create_combuf(int code,int size,int &len,void **data){
+ char *buf;
+ center_com_header *header;
+
+ buf = new char[sizeof(center_com_header) + size];
+ header = (center_com_header*)buf;
+ header->code = code;
+ header->size = size;
+ len = sizeof(center_com_header) + size;
+ *data = (void*)(buf + sizeof(center_com_header));
+
+ return buf;
+}
+int server_conn::send_setid(){
+ char *write_buf;
+ int write_len;
+ center_com_setid *setid;
+
+ write_buf = create_combuf(CENTER_COMCODE_SETID,sizeof(center_com_setid),write_len,(void**)&setid);
+ setid->id = server_id;
+ writebytes(write_buf,write_len,NULL,NULL);
+
+ return 0;
+}
+int server_conn::send_setinfo(){
+ char *write_buf;
+ int write_len;
+ center_com_setinfo *setinfo;
+
+ write_buf = create_combuf(CENTER_COMCODE_SETINFO,sizeof(center_com_setinfo),write_len,(void**)&setinfo);
+ setinfo->avail = server_avail;
+ writebytes(write_buf,write_len,NULL,NULL);
+
+ return 0;
+}
+int server_conn::send_result(int subid,char *res_data,size_t res_len){
+ char *write_buf;
+ int write_len;
+ center_com_result *result;
+
+ if(res_len > JUDGE_RES_DATAMAX){
+ return -1;
+ }
+
+ write_buf = create_combuf(CENTER_COMCODE_RESULT,sizeof(center_com_result) + res_len,write_len,(void**)&result);
+ result->subid = subid;
+ memcpy((void*)(write_buf + sizeof(center_com_header) + sizeof(center_com_result)),res_data,res_len);
+ writebytes(write_buf,write_len,NULL,NULL);
+
+ return 0;
+}
+int server_conn::send_reqpro(int proid,int cacheid){
+ char *write_buf;
+ int write_len;
+ center_com_reqpro *reqpro;
+
+ write_buf = create_combuf(CENTER_COMCODE_REQPRO,sizeof(center_com_reqpro),write_len,(void**)&reqpro);
+ reqpro->proid = proid;
+ reqpro->cacheid = cacheid;
+ writebytes(write_buf,write_len,NULL,NULL);
+}
+int server_conn::send_setpro(std::vector<std::pair<int,int> > &pro_list,int type){
+ int i;
+ int count;
+ char *write_buf;
+ int write_len;
+ center_com_setpro *setpro;
+ judge_pro_info *pro_info;
+
+ count = pro_list.size();
+ write_buf = create_combuf(CENTER_COMCODE_SETPRO,sizeof(center_com_setpro) * count,write_len,(void**)&setpro);
+
+ for(i = 0;i < count;i++){
+ setpro[i].proid = pro_list[i].first;
+ setpro[i].cacheid = pro_list[i].second;
+ setpro[i].type = type;
+ }
+ writebytes(write_buf,write_len,NULL,NULL);
+
+ return 0;
+}
+int server_conn::send_setjmod(char **jmod_name,int *cacheid,int type,int count){
+ int i;
+
+ char *write_buf;
+ int write_len;
+ center_com_setjmod *setjmod;
+
+ write_buf = create_combuf(CENTER_COMCODE_SETJMOD,sizeof(center_com_setjmod) * count,write_len,(void**)&setjmod);
+ for(i = 0;i < count;i++){
+ setjmod[i].jmod_name[0] = '\0';
+ strncat(setjmod[i].jmod_name,jmod_name[i],sizeof(setjmod[i].jmod_name));
+ setjmod[i].cacheid = cacheid[i];
+ setjmod[i].type = type;
+ }
+ writebytes(write_buf,write_len,NULL,NULL);
+
+ return 0;
+}
+int server_conn::send_reqcode(int subid){
+ char *write_buf;
+ int write_len;
+ center_com_reqcode *reqcode;
+
+ write_buf = create_combuf(CENTER_COMCODE_REQCODE,sizeof(center_com_reqcode),write_len,(void**)&reqcode);
+ reqcode->subid = subid;
+ writebytes(write_buf,write_len,NULL,NULL);
+
+ return 0;
+}
+int server_conn::readidle(){
+ readbytes(new center_com_header,sizeof(center_com_header),recv_dispatch_fn,NULL);
+ return 0;
+}
+void server_conn::recv_dispatch(void *buf,size_t len,void *data){
+ center_com_header *header;
+ char *readbuf;
+
+ header = (center_com_header*)buf;
+ readbuf = new char[header->size];
+ printf("code:%d size:%d\n",header->code,header->size);
+ switch(header->code){
+ case CENTER_COMCODE_SETID:
+ readbytes(readbuf,header->size,recv_setid_fn,NULL);
+ break;
+ case CENTER_COMCODE_SUBMIT:
+ readbytes(readbuf,header->size,recv_submit_fn,NULL);
+ break;
+ case CENTER_COMCODE_SETPRO:
+ readbytes(readbuf,header->size,recv_setpro_fn,NULL);
+ break;
+ case CENTER_COMCODE_SENDPRO:
+ readbytes(readbuf,header->size,recv_sendpro_fn,NULL);
+ break;
+ case CENTER_COMCODE_SETJMOD:
+ readbytes(readbuf,header->size,recv_setjmod_fn,NULL);
+ break;
+ case CENTER_COMCODE_SENDJMOD:
+ readbytes(readbuf,header->size,recv_sendjmod_fn,NULL);
+ break;
+ case CENTER_COMCODE_SENDCODE:
+ readbytes(readbuf,header->size,recv_sendcode_fn,NULL);
+ break;
+ }
+
+ delete header;
+}
+void server_conn::recv_setid(void *buf,size_t len,void *data){
+ center_com_setid *setid;
+
+ setid = (center_com_setid*)buf;
+ server_id = setid->id;
+ printf("server_id:%d\n",server_id);
+
+ //judge server init
+ send_setinfo();
+
+ delete setid;
+}
+void server_conn::recv_submit(void *buf,size_t len,void *data){
+ center_com_submit *sub;
+
+ sub = (center_com_submit*)buf;
+ judge_manage_submit(sub->subid,sub->proid,sub->lang,(char*)((char*)buf + sizeof(center_com_submit)),len - sizeof(center_com_submit));
+
+ delete sub;
+}
+void server_conn::recv_setpro(void *buf,size_t len,void *data){
+ int ret;
+ int i;
+ int count;
+ center_com_setpro *setpro;
+
+ char tpath[PATH_MAX + 1];
+ FILE *f;
+ int cacheid;
+ std::vector<std::pair<int,int> > pro_list;
+
+ count = len / sizeof(center_com_setpro);
+ setpro = (center_com_setpro*)buf;
+ for(i = 0;i < count;i++){
+ if(setpro[i].type == 0){
+ ret = judge_manage_updatepro(setpro[i].proid,setpro[i].cacheid,true,NULL);
+ if(ret == 0){
+ if(server_fileconn == NULL){
+ server_fileconn = server_connect();
+ }
+ server_fileconn->send_reqpro(setpro[i].proid,setpro[i].cacheid);
+ }else if(ret == 1){
+ pro_list.push_back(std::make_pair(setpro[i].proid,setpro[i].cacheid));
+ }
+ }else if(setpro[i].type == 1){
+
+ }
+ }
+
+ if(!pro_list.empty()){
+ this->send_setpro(pro_list,0);
+ }
+
+ delete setpro;
+}
+void server_conn::recv_sendpro(void *buf,size_t len,void *data){
+ center_com_sendpro *sendpro;
+ judge_pro_info *pro_info;
+ char tpath[PATH_MAX + 1];
+ int fd;
+
+ sendpro = (center_com_sendpro*)buf;
+
+ if(judge_manage_updatepro(sendpro->proid,sendpro->cacheid,false,&pro_info) == 0){
+ snprintf(tpath,sizeof(tpath),"tmp/propack/%d_%d.tar.bz2",sendpro->proid,sendpro->cacheid);
+ fd = open(tpath,O_WRONLY | O_CREAT,0644);
+ readfile(fd,sendpro->filesize,done_sendpro_fn,pro_info);
+ }
+
+ delete sendpro;
+}
+void server_conn::done_sendpro(void *buf,size_t len,void *data){
+ judge_pro_info *pro_info;
+
+ close(*(int*)buf);
+
+ pro_info = (judge_pro_info*)data;
+ judge_manage_done_updatepro(pro_info);
+}
+void server_conn::recv_setjmod(void *buf,size_t len,void *data){
+ int i;
+ int count;
+ center_com_setjmod *setjmod;
+
+ char tpath[PATH_MAX + 1];
+ FILE *f;
+ int cacheid;
+ std::vector<char*> sl_jmod_name;
+ std::vector<int> sl_cacheid;
+
+ char *write_buf;
+ int write_len;
+ center_com_reqjmod *reqjmod;
+
+ count = len / sizeof(center_com_setjmod);
+ setjmod = (center_com_setjmod*)buf;
+ for(i = 0;i < count;i++){
+ if(setjmod[i].type == 0){
+ snprintf(tpath,sizeof(tpath),"tmp/jmod/%s/cacheinfo",setjmod[i].jmod_name);
+ f = fopen(tpath,"r");
+ if(f != NULL){
+ fscanf(f,"%d",&cacheid);
+ fclose(f);
+
+ if(cacheid == setjmod[i].cacheid){
+ sl_jmod_name.push_back(setjmod[i].jmod_name);
+ sl_cacheid.push_back(setjmod[i].cacheid);
+ continue;
+ }
+ }
+
+ if(server_fileconn == NULL){
+ server_fileconn = server_connect();
+ }
+
+ write_buf = create_combuf(CENTER_COMCODE_REQJMOD,sizeof(center_com_reqjmod),write_len,(void**)&reqjmod);
+ reqjmod->jmod_name[0] = '\0';
+ strncat(reqjmod->jmod_name,setjmod[i].jmod_name,sizeof(reqjmod->jmod_name));
+ server_fileconn->writebytes(write_buf,write_len,NULL,NULL);
+ }else if(setjmod[i].type == 1){
+
+ }
+ }
+
+ if(!sl_jmod_name.empty()){
+ this->send_setjmod(&sl_jmod_name[0],&sl_cacheid[0],0,sl_jmod_name.size());
+ }
+
+ delete setjmod;
+}
+void server_conn::recv_sendjmod(void *buf,size_t len,void *data){
+ center_com_sendjmod *sendjmod;
+ char tpath[PATH_MAX + 1];
+ int fd;
+
+ sendjmod = (center_com_sendjmod*)buf;
+ snprintf(tpath,sizeof(tpath),"tmp/jmodpack/%s.tar.bz2",sendjmod->jmod_name);
+ fd = open(tpath,O_WRONLY | O_CREAT,0644);
+ readfile(fd,sendjmod->filesize,done_sendjmod_fn,sendjmod);
+}
+void server_conn::done_sendjmod(void *buf,size_t len,void *data){
+ center_com_sendjmod *sendjmod;
+
+ close(*(int*)buf);
+
+ sendjmod = (center_com_sendjmod*)data;
+ server_packtp->add(tp_unpackjmod_thfn,sendjmod,tp_unpackjmod_cbfn,sendjmod);
+}
+void server_conn::recv_sendcode(void *buf,size_t len,void *data){
+ center_com_sendcode *sendcode;
+ char tpath[PATH_MAX + 1];
+ int fd;
+
+ sendcode = (center_com_sendcode*)buf;
+ snprintf(tpath,sizeof(tpath),"tmp/codepack/%d.tar.bz2",sendcode->subid);
+ fd = open(tpath,O_WRONLY | O_CREAT,0644);
+ readfile(fd,sendcode->filesize,done_sendcode_fn,sendcode);
+}
+void server_conn::done_sendcode(void *buf,size_t len,void *data){
+ center_com_sendcode *sendcode;
+
+ close(*(int*)buf);
+
+ sendcode = (center_com_sendcode*)data;
+ judge_manage_done_code(sendcode->subid);
+
+ delete sendcode;
+}
+void server_conn::tp_unpackjmod_th(void *data){
+ center_com_sendjmod *sendjmod;
+ char pack_path[PATH_MAX + 1];
+ char dir_path[PATH_MAX + 1];
+ char tpath[PATH_MAX + 1];
+ FILE *f;
+
+ sendjmod = (center_com_sendjmod*)data;
+
+ snprintf(pack_path,sizeof(pack_path),"tmp/jmodpack/%s.tar.bz2",sendjmod->jmod_name);
+ snprintf(dir_path,sizeof(dir_path),"tmp/jmod/%s",sendjmod->jmod_name);
+ mkdir(dir_path,0755);
+ tool_cleardir(dir_path);
+ tool_unpack(pack_path,dir_path);
+
+ snprintf(tpath,sizeof(tpath),"tmp/jmod/%s/cacheinfo",sendjmod->jmod_name);
+ f = fopen(tpath,"w");
+ fprintf(f,"%d",sendjmod->cacheid);
+ fclose(f);
+}
+void server_conn::tp_unpackjmod_cb(void *data){
+ center_com_sendjmod *sendjmod;
+ char *jmod_name;
+
+ sendjmod = (center_com_sendjmod*)data;
+ jmod_name = sendjmod->jmod_name;
+ send_setjmod(&jmod_name,&sendjmod->cacheid,0,1);
+
+ delete sendjmod;
+}
+
+
+
+int judge_server_addtpool(tpool *tpinfo){
+ server_addepev(tpinfo->fd,EPOLLIN | EPOLLET,SERVER_EPEV_TPOOL,tpinfo);
+ return 0;
+}
+int judge_server_setpro(std::vector<std::pair<int,int> > &pro_list){
+ server_mainconn->send_setpro(pro_list,0);
+ return 0;
+}
+int judge_server_reqcode(int subid){
+ if(server_codeconn == NULL){
+ server_codeconn = server_connect();
+ }
+ server_codeconn->send_reqcode(subid);
+
+ return 0;
+}
+int judge_server_result(int subid,char *res_data,int res_len){
+ server_mainconn->send_result(subid,res_data,res_len);
+ return 0;
+}
+
+static int server_addepev(int fd,unsigned int flag,int type,void *data){
+ server_epevdata *epevdata;
+ epoll_event epev;
+
+ epevdata = new server_epevdata(fd,type,data);
+ epev.events = flag;
+ epev.data.ptr = epevdata;
+ epoll_ctl(server_epfd,EPOLL_CTL_ADD,fd,&epev);
+
+ return 0;
+}
+static int server_delepev(server_epevdata *epevdata){
+ epoll_ctl(server_epfd,EPOLL_CTL_DEL,epevdata->fd,NULL);
+ delete epevdata;
+ return 0;
+}
+static server_conn* server_connect(){
+ int cfd;
+ sockaddr_in caddr;
+ epoll_event epev;
+ server_conn *cinfo;
+
+ cfd = socket(AF_INET,SOCK_STREAM | SOCK_NONBLOCK,6);
+ caddr.sin_family = AF_INET;
+ caddr.sin_port = htons(SERVER_JUDGE_PORT);
+ //caddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ caddr.sin_addr.s_addr = inet_addr("10.8.0.2");
+
+ cinfo = new server_conn(cfd);
+ server_addepev(cfd,EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET,SERVER_EPEV_JUDGECLIENT,cinfo);
+ connect(cfd,(sockaddr*)&caddr,sizeof(caddr));
+
+ return cinfo;
+}
+int main(){
+ int i;
+
+ epoll_event epev;
+ epoll_event epevs[SERVER_EPOLL_MAXEVENT];
+ int nevs;
+
+ int ev_flag;
+ server_epevdata *epevdata;
+ server_conn *cinfo;
+ tpool *tpinfo;
+
+ signal(SIGPIPE,SIG_IGN);
+ server_epfd = epoll_create1(0);
+
+ server_id = 0;
+ server_avail = JUDGE_THREAD_JUDGEMAX;
+ server_mainconn = server_connect();
+ server_fileconn = NULL;
+ server_codeconn = NULL;
+
+ judge_manage_init();
+
+ server_packtp = new tpool(4);
+ judge_server_addtpool(server_packtp);
+ server_packtp->start();
+
+
+ while(true){
+ nevs = epoll_wait(server_epfd,epevs,SERVER_EPOLL_MAXEVENT,-1);
+ for(i = 0;i < nevs;i++){
+ ev_flag = epevs[i].events;
+ epevdata = (server_epevdata*)epevs[i].data.ptr;
+
+ if(epevdata->type == SERVER_EPEV_JUDGECLIENT){
+ cinfo = (server_conn*)epevdata->data;
+ if(ev_flag & EPOLLIN){
+ cinfo->readio();
+ }
+ if(ev_flag & EPOLLOUT){
+ cinfo->writeio();
+ }
+
+ server_packtp->done();
+ }else if(epevdata->type = SERVER_EPEV_TPOOL){
+ tpinfo = (tpool*)epevdata->data;
+ if(ev_flag & EPOLLIN){
+ tpinfo->done();
+ }
+ }
+ }
+ }
+
+ close(server_epfd);
+
+ return 0;
+}