X-Git-Url: https://git.dlugolecki.net.pl/?p=vlp.git;a=blobdiff_plain;f=int%2Fcint.c;h=547a77087b7c0801606b3dcfc8aa36642e5d26c2;hp=b81996c051f7b2e519f02042daeeb5e89fe80685;hb=e702805740237c3ce3bc4c5de9466d3f0d630595;hpb=9db87b545def5d31a64608f2eb082d915ad5efa4 diff --git a/int/cint.c b/int/cint.c index b81996c..547a770 100644 --- a/int/cint.c +++ b/int/cint.c @@ -13,7 +13,7 @@ #include #include #include - +#include #ifndef NO_PROTOTYPES static void load(char *); @@ -26,6 +26,7 @@ int main(); #endif + int internal_sock,graph_sock,net_sock,connected=0; struct sockaddr_un svr; int GraphRes=-1; @@ -427,14 +428,29 @@ void send_to_kernel(MESSAGE *msg) } /* send message to net */ -void send_to_net(MESSAGE *msg) +/* 2010 returns 1 if ok and 0 if node desn't exists */ +int send_to_net(MESSAGE *msg) { int k,len; MESSAGE m; struct sockaddr_in svr; + char addr[256]; - + k = msg->int_msg.control.receiver.node; + + /* 2010 check if node exists */ + m.msg_type = MSG_NET; + m.param.pword[0] = NET_NODE_EXIST; + m.param.pword[1] = k; + m.param.pword[2] = my_ctx.program_id; + write(net_sock,&m,sizeof(MESSAGE)); + bzero(&m,sizeof(MESSAGE)); + while( (m.msg_type!=MSG_NET) && (m.param.pword[0]!=NET_NODE_EXIST) ) + read(net_sock,&m,sizeof(MESSAGE)); + if (m.param.pword[1]!=1) return 0; + strcpy(addr,m.param.pstr); + if (RInstance[k]==-1) { bzero(&m,sizeof(MESSAGE)); @@ -444,34 +460,38 @@ void send_to_net(MESSAGE *msg) m.param.pword[2] = k; send_to_kernel(&m); bzero(&m,sizeof(MESSAGE)); + while(1) { read(internal_sock,&m,sizeof(MESSAGE)); if ( (m.msg_type == MSG_VLP) && (m.param.pword[0]==VLP_REMOTE_INSTANCE_HERE)) break; } - + + /*printf("DEBUG: remote instance made with id: %d addr %s port %d\n",m.param.pword[1],addr, htons(m.param.pword[8]));*/ RInstance[k] = m.param.pword[1]; /* Make direct connection */ DirConn[k] = socket(AF_INET,SOCK_STREAM,0); svr.sin_family = AF_INET; - svr.sin_addr.s_addr = inet_addr(m.param.pstr); + svr.sin_addr.s_addr = inet_addr(addr); svr.sin_port = htons(m.param.pword[8]); len = connect(DirConn[k],(struct sockaddr*)&svr,sizeof(svr)); if (len!=0) { RInstance[k]=-1; + writeln_str("Cannot connect remote instance!"); } else { fcntl(DirConn[k], F_SETFL,O_NONBLOCK | fcntl(DirConn[k], F_GETFL,0)); - - } + } } -if (RInstance[k]!=-1) -{ - write(DirConn[k],&(msg->int_msg),sizeof(message)); -} + if (RInstance[k]!=-1) + { + + write(DirConn[k],&(msg->int_msg),sizeof(message)); + } + return 1; } @@ -486,12 +506,13 @@ void get_internal() int r,max,i; char s[80]; - fd_set rset,wset; + fd_set rset; struct timeval tout={0,0}; - - /* ----------- Direct connection messages -----*/ + + /* 2010 */ + /* ----------- Direct connection messages ----- FD_ZERO(&DirSet); maxDirSet=0; for(i=0;i<255;i++) @@ -517,18 +538,44 @@ void get_internal() } } } - /*-----------------------------------------*/ + -----------------------------------------*/ - FD_ZERO(&rset);FD_ZERO(&wset); + FD_ZERO(&rset); FD_SET(net_sock,&rset); - FD_SET(internal_sock,&rset); - + FD_SET(internal_sock,&rset); if (net_sock>internal_sock) max = net_sock; else max = internal_sock; + /* 2010 */ + for(i=0;i<255;i++) { + if (DirConn[i]!=-1) + { + FD_SET(DirConn[i],&rset); + if (max0) + if (select(max+1,&rset,0,0,(struct timeval *)&tout)>0) { + /* 2010 */ + for(i=0;i<255;i++) + { + if ( (DirConn[i]!=-1) && (FD_ISSET(DirConn[i],&rset)) ) + { + /*printf("DEBUG: Interp has message on direct connection: type %d par %d\n",mx.control.type,mx.control.par); */ + r=read(DirConn[i],&mx,sizeof(mx)); + if (r>0 && r==sizeof(mx)) + { + memcpy(globmsgqueue+msgtail,&mx,sizeof(message)); + msgtail = (msgtail+1) % MAXMSGQUEUE; + msgready++; + } + } + } + + + + if (FD_ISSET(net_sock,&rset)) { r = read(net_sock, &m, sizeof(MESSAGE)); @@ -540,6 +587,7 @@ void get_internal() switch(m.param.pword[0]) { case NET_PROPAGATE: + /*printf("DEBUG: cint net_sock MSG_NET NET_PROPAGATE cmd %d\n",m.param.pword[6]);*/ memcpy(globmsgqueue+msgtail,&m.int_msg,sizeof(message)); msgtail = (msgtail+1) % MAXMSGQUEUE; msgready++; @@ -567,6 +615,7 @@ void get_internal() case MSG_NET: if (m.param.pword[0] == NET_PROPAGATE) { + /*printf("DEBUG: cint internal_sock MSG_NET NET_PROPAGATE cmd %d\n",m.param.pword[6]);*/ memcpy(globmsgqueue+msgtail,&m.int_msg,sizeof(message)); msgtail = (msgtail+1) % MAXMSGQUEUE; msgready++; @@ -578,6 +627,7 @@ void get_internal() } } /* FD_ISSET */ } /* select */ + } @@ -654,8 +704,7 @@ void send_ready() bzero(&svr,sizeof(svr)); DirConn[parent_ctx.node] = accept(sock,(struct sockaddr*)&svr,&len); - fcntl(DirConn[parent_ctx.node], F_SETFL,O_NONBLOCK | - fcntl(DirConn[parent_ctx.node], F_GETFL,0)); + fcntl(DirConn[parent_ctx.node], F_SETFL,O_NONBLOCK | fcntl(DirConn[parent_ctx.node], F_GETFL,0)); } int main(argc, argv) @@ -675,13 +724,13 @@ char **argv; setjmp(contenv); /* set label for continue long jump */ while (TRUE) /* repeat until exit() is called */ { - get_internal(); + get_internal(); schedule(); /* reschedule current process */ - if (ready != 0) - { + if (ready != 0) + { decode(); /* fetch instruction */ execute(); /* and execute it */ - } + } } return 0;