#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
-
+#include <qthread.h>
#ifndef NO_PROTOTYPES
static void load(char *);
#endif
+
int internal_sock,graph_sock,net_sock,connected=0;
struct sockaddr_un svr;
int GraphRes=-1;
}
/* send message to net */
-void send_to_net(MESSAGE *msg)
+/* 2010 returns 1 if ok and 0 if node desn't exists */
+int send_to_net(MESSAGE *msg)
{
int k,len;
MESSAGE m;
struct sockaddr_in svr;
+ char addr[256];
-
+
k = msg->int_msg.control.receiver.node;
+
+ /* 2010 check if node exists */
+ m.msg_type = MSG_NET;
+ m.param.pword[0] = NET_NODE_EXIST;
+ m.param.pword[1] = k;
+ m.param.pword[2] = my_ctx.program_id;
+ write(net_sock,&m,sizeof(MESSAGE));
+ bzero(&m,sizeof(MESSAGE));
+ while( (m.msg_type!=MSG_NET) && (m.param.pword[0]!=NET_NODE_EXIST) )
+ read(net_sock,&m,sizeof(MESSAGE));
+ if (m.param.pword[1]!=1) return 0;
+ strcpy(addr,m.param.pstr);
+
if (RInstance[k]==-1)
{
bzero(&m,sizeof(MESSAGE));
m.param.pword[2] = k;
send_to_kernel(&m);
bzero(&m,sizeof(MESSAGE));
+
while(1)
{
read(internal_sock,&m,sizeof(MESSAGE));
if ( (m.msg_type == MSG_VLP) && (m.param.pword[0]==VLP_REMOTE_INSTANCE_HERE)) break;
}
-
+
+ /*printf("DEBUG: remote instance made with id: %d addr %s port %d\n",m.param.pword[1],addr, htons(m.param.pword[8]));*/
RInstance[k] = m.param.pword[1];
/* Make direct connection */
DirConn[k] = socket(AF_INET,SOCK_STREAM,0);
svr.sin_family = AF_INET;
- svr.sin_addr.s_addr = inet_addr(m.param.pstr);
+ svr.sin_addr.s_addr = inet_addr(addr);
svr.sin_port = htons(m.param.pword[8]);
len = connect(DirConn[k],(struct sockaddr*)&svr,sizeof(svr));
if (len!=0)
{
RInstance[k]=-1;
+
writeln_str("Cannot connect remote instance!");
}
else
{
fcntl(DirConn[k], F_SETFL,O_NONBLOCK | fcntl(DirConn[k], F_GETFL,0));
-
- }
+ }
}
-if (RInstance[k]!=-1)
-{
- write(DirConn[k],&(msg->int_msg),sizeof(message));
-}
+ if (RInstance[k]!=-1)
+ {
+
+ write(DirConn[k],&(msg->int_msg),sizeof(message));
+ }
+ return 1;
}
int r,max,i;
char s[80];
- fd_set rset,wset;
+ fd_set rset;
struct timeval tout={0,0};
-
- /* ----------- Direct connection messages -----*/
+
+ /* 2010 */
+ /* ----------- Direct connection messages -----
FD_ZERO(&DirSet);
maxDirSet=0;
for(i=0;i<255;i++)
}
}
}
- /*-----------------------------------------*/
+ -----------------------------------------*/
- FD_ZERO(&rset);FD_ZERO(&wset);
+ FD_ZERO(&rset);
FD_SET(net_sock,&rset);
- FD_SET(internal_sock,&rset);
-
+ FD_SET(internal_sock,&rset);
if (net_sock>internal_sock) max = net_sock; else max = internal_sock;
+ /* 2010 */
+ for(i=0;i<255;i++) {
+ if (DirConn[i]!=-1)
+ {
+ FD_SET(DirConn[i],&rset);
+ if (max<DirConn[i]) max=DirConn[i];
+ }
+ } /* for */
- if (select(max+1,&rset,&wset,0,(struct timeval *)&tout)>0)
+ if (select(max+1,&rset,0,0,(struct timeval *)&tout)>0)
{
+ /* 2010 */
+ for(i=0;i<255;i++)
+ {
+ if ( (DirConn[i]!=-1) && (FD_ISSET(DirConn[i],&rset)) )
+ {
+ /*printf("DEBUG: Interp has message on direct connection: type %d par %d\n",mx.control.type,mx.control.par); */
+ r=read(DirConn[i],&mx,sizeof(mx));
+ if (r>0 && r==sizeof(mx))
+ {
+ memcpy(globmsgqueue+msgtail,&mx,sizeof(message));
+ msgtail = (msgtail+1) % MAXMSGQUEUE;
+ msgready++;
+ }
+ }
+ }
+
+
+
+
if (FD_ISSET(net_sock,&rset))
{
r = read(net_sock, &m, sizeof(MESSAGE));
switch(m.param.pword[0])
{
case NET_PROPAGATE:
+ /*printf("DEBUG: cint net_sock MSG_NET NET_PROPAGATE cmd %d\n",m.param.pword[6]);*/
memcpy(globmsgqueue+msgtail,&m.int_msg,sizeof(message));
msgtail = (msgtail+1) % MAXMSGQUEUE;
msgready++;
case MSG_NET:
if (m.param.pword[0] == NET_PROPAGATE)
{
+ /*printf("DEBUG: cint internal_sock MSG_NET NET_PROPAGATE cmd %d\n",m.param.pword[6]);*/
memcpy(globmsgqueue+msgtail,&m.int_msg,sizeof(message));
msgtail = (msgtail+1) % MAXMSGQUEUE;
msgready++;
}
} /* FD_ISSET */
} /* select */
+
}
bzero(&svr,sizeof(svr));
DirConn[parent_ctx.node] = accept(sock,(struct sockaddr*)&svr,&len);
- fcntl(DirConn[parent_ctx.node], F_SETFL,O_NONBLOCK |
- fcntl(DirConn[parent_ctx.node], F_GETFL,0));
+ fcntl(DirConn[parent_ctx.node], F_SETFL,O_NONBLOCK | fcntl(DirConn[parent_ctx.node], F_GETFL,0));
}
int main(argc, argv)
setjmp(contenv); /* set label for continue long jump */
while (TRUE) /* repeat until exit() is called */
{
- get_internal();
+ get_internal();
schedule(); /* reschedule current process */
- if (ready != 0)
- {
+ if (ready != 0)
+ {
decode(); /* fetch instruction */
execute(); /* and execute it */
- }
+ }
}
return 0;