1 #include "../head/genint1.h"
2 #include "../head/comm.h"
4 #include <sys/socket.h>
6 #include <netinet/in.h>
11 #include <sys/types.h>
// Directory prefix that get_message() puts in front of the name of a received
// code file ("<REMOTE_PATH>/<name>.ccd" / ".pcd").
22 #define REMOTE_PATH "REMOTE"
// TCP port used for the .pcd file transfer: transmit_file() binds to it and
// get_message() connects to it.
26 #define CODEPORT1 3800
// Size in bytes of the local transfer buffers declared in get_message() and
// transmit_file().
28 #define FILE_BUFFER_SIZE 2048
32 // ************** Interpreter slot *******************
49 // ******************** Network slot ********************
// NOTE(review): presumably members of the network-slot class (NETlink) — the
// class header is not visible here; confirm.
// `connected` is set TRUE by accept_connection()/connect_seq()/check_links();
// `code_transmit` is set by get_message() around reception of a code file.
54 bool connected,code_transmit;
// Set aliases[0..4] to -1.
71 for(i=0;i<5;i++) aliases[i]=-1;
80 //********************** NET Module ****************************
// kernel_sock: AF_UNIX stream socket connected to the kernel (see constructor).
// listen_sock: TCP socket listening on LOGPORT for incoming node connections.
85 int kernel_sock,listen_sock;
// all_connected: the constructor loops on check_links() until this is set.
86 bool all_connected,local_mode;
// MyNode: this node's number, read from the "node_number" config entry.
87 int to_connect,MyNode;
91 QList<INTlink> Interpreters; // List of the Interpreter slots
92 QList<NETlink> Links; // List of the Network slots
// Configuration loading and console output (console text is sent via the kernel).
96 void load_config(char*);
97 void write_at_console(char*);
// Message senders: each writes one whole MESSAGE struct to the target socket.
98 void send_to_kernel(MESSAGE*);
99 void sock_reopen(NETlink*);
100 void send_connect_info(NETlink*);
101 void send_accept_info(NETlink*);
102 void send_to_node(NETlink*,MESSAGE*);
103 void send_to_int(MESSAGE*);
104 void send_code_ack(NETlink*);
105 void send_to_all(MESSAGE *);
// Lookups by node number (Links) and by interpreter ID (Interpreters).
107 NETlink *findNETlink(int node);
108 INTlink *findINTlink(int id);
// Code-file transfer to node `ton`, message forwarding, node-existence query.
109 void transmit_file(int ton, char *fname, int fromINT);
110 void propagate_msg(MESSAGE *msg);
111 void check_node(int,int);
// Connection life-cycle.
114 void exit_sequence();
115 void disconnect_seq();
116 void connect_seq(char*);
117 void accept_connection();
// Polling of remote links and handling of one incoming remote message.
119 void remote_messages();
121 void get_message(NETlink*);
// Construct the network module: open the TCP listening socket on LOGPORT,
// load "vlp.cfg", connect to the kernel through the AF_UNIX socket named by
// kernel_name, then loop on check_links() until all_connected is set.
// @param kernel_name  path of the kernel's AF_UNIX socket; also copied to kername.
126 NETMOD::NETMOD(char *kernel_name)
129 struct sockaddr_in svr;
130 struct sockaddr_un svr1;
136 Interpreters.clear();
// Listening socket: any local address, port LOGPORT, backlog 5, non-blocking.
// NOTE(review): the results of socket()/bind()/listen() are not checked.
138 bzero(&svr, sizeof(svr));
139 listen_sock = socket(AF_INET, SOCK_STREAM, 0);
140 svr.sin_family = AF_INET;
141 svr.sin_addr.s_addr = INADDR_ANY;
142 svr.sin_port = htons(LOGPORT);
143 bind(listen_sock, (struct sockaddr*)&svr, sizeof(svr));
144 listen(listen_sock,5);
145 fcntl(listen_sock, F_SETFL,O_NONBLOCK | fcntl(listen_sock, F_GETFL,0));
// Reads MyNode and the list of remote hosts (fills Links).
149 load_config("vlp.cfg");
// Kernel connection over a UNIX-domain stream socket.
151 kernel_sock = socket(AF_UNIX,SOCK_STREAM,0);
152 bzero(&svr1,sizeof(svr1));
153 svr1.sun_family = AF_UNIX;
// NOTE(review): unbounded strcpy into sun_path and kername; kernel_name comes
// from argv[1] in main() — its length is not validated here.
154 strcpy(svr1.sun_path,kernel_name);
155 strcpy(kername,kernel_name);
156 len = strlen(svr1.sun_path)+sizeof(svr1.sun_family);
157 i = connect(kernel_sock,(struct sockaddr*)&svr1,len);
159 fcntl(kernel_sock,F_SETFL, O_NONBLOCK|fcntl(kernel_sock,F_GETFL,0));
// NOTE(review): kernel_sock is an AF_UNIX socket, so this TCP-level option
// presumably has no effect; the return value is not checked.
161 setsockopt(kernel_sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
// NET_NODE message carrying this node's number.
162 m.msg_type = MSG_NET;
163 m.param.pword[0] = NET_NODE;
164 m.param.pword[1] = MyNode;
167 // if (regme) regme_sequence();
170 write_at_console("Connecting remote VLPs...");
// Spins (no sleep visible) until check_links() reports every link connected.
171 while (!all_connected) check_links();
173 sprintf(s,"Local node number %d",MyNode);
177 // ##################### Load configuration ##########################
// Parse the configuration file `fname`. Each whitespace-delimited token is
// split at '='. Keys handled in the visible code:
//   node_number=<n>   sets MyNode
//   host=<address>    creates a non-connected NETlink and appends it to Links
//   type=...          (handling not visible here)
// Exits the process on a malformed entry or when MyNode is left at -1.
179 void NETMOD::load_config(char *fname)
182 char line1[80],*line2;
186 f = fopen(fname,"r");
// NOTE(review): "%s" has no field width, so a token longer than 79 characters
// overflows line1.
191 fscanf(f,"%s\n",line1);
192 line2 = strtok(line1,"=");
193 if (line2==NULL) { write_at_console("Bad config file\n");exit(1);}
195 if (strcmp(line2,"node_number")==0)
// NOTE(review): unlike the "host" branch, no NULL check on the value token is
// visible before atoi().
197 line2 = strtok(NULL,"=");
198 MyNode = atoi(line2);
201 if (strcmp(line2,"host")==0)
203 line2 = strtok(NULL,"=");if (line2==NULL) {exit(1);}
// New link for this host: socket created now, connected later by check_links().
205 pomlink = new NETlink;
// NOTE(review): unbounded strcpy; the size of NETlink::addr is not visible here.
206 strcpy(pomlink->addr,line2);
207 pomlink->connected = FALSE;
208 pomlink->sock = socket(AF_INET, SOCK_STREAM, 0);
209 fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,
212 setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
213 Links.append(pomlink);
218 if (strcmp(line2,"type")==0)
// NOTE(review): k presumably counts the configured hosts — confirm.
225 if (k==0) all_connected=TRUE;
227 if (MyNode==-1) {write_at_console("Node number must be specified");exit(1);};
// Send the text `s` to the kernel as a MSG_NET / NET_CSWRITELN message.
// NOTE(review): unbounded strcpy into msg.param.pstr; the size of pstr is not
// visible here.
232 void NETMOD::write_at_console(char *s)
236 msg.msg_type = MSG_NET;
237 msg.param.pword[0] = NET_CSWRITELN;
238 strcpy(msg.param.pstr,s);
239 send_to_kernel(&msg);
// Write one whole MESSAGE struct to the kernel socket.
// The result of write() is not checked (kernel_sock is non-blocking).
242 void NETMOD::send_to_kernel(MESSAGE *msg)
244 write(kernel_sock,msg,sizeof(MESSAGE));
// Send `msg` to the remote node behind `lnk`.
// Overwrites msg->msg_type with MSG_NET in the caller's message before writing.
247 void NETMOD::send_to_node(NETlink *lnk, MESSAGE *msg)
249 msg->msg_type = MSG_NET;
252 write(lnk->sock,msg,sizeof(MESSAGE));
// Deliver `msg` to the local interpreter whose ID is msg->param.pword[5].
// Nothing is written when no such interpreter is registered.
255 void NETMOD::send_to_int(MESSAGE *msg)
259 pomlink = findINTlink(msg->param.pword[5]);
260 if (pomlink!=NULL) write(pomlink->sock,msg,sizeof(MESSAGE));
// Poll the listening socket (zero-timeout select) and, if a peer is waiting,
// accept it and register it in Links as an already-connected NETlink.
265 void NETMOD::accept_connection()
269 struct sockaddr_in svr;
// {0,0} makes select() return immediately.
271 struct timeval tout = {0,0};
274 FD_ZERO(&rset);FD_ZERO(&wset);
275 FD_SET(listen_sock,&rset);
277 if (select(listen_sock+1,&rset,&wset,0,(struct timeval *)&tout)>0)
278 if (FD_ISSET(listen_sock,&rset))
281 /* accept connection on listen socket */
283 bzero(&svr, sizeof(svr));
// NOTE(review): initialisation of sz is not visible here; accept() requires it
// to hold sizeof(svr) on entry — confirm.
284 nsock = accept(listen_sock, (struct sockaddr*)&svr, &sz);
289 /* i<0 someone wants to connect us */
// The new link stores the peer's dotted-quad address and the accepted socket.
291 pomlink = new NETlink;
292 strcpy(pomlink->addr,inet_ntoa(svr.sin_addr));
293 pomlink->sock = nsock;
294 pomlink->connected = TRUE;
295 fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,
298 setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
299 Links.append(pomlink);
// Answer a "does node n exist?" query on socket `sc`.
// Replies with a MSG_NET / NET_NODE_EXIST message whose pword[1] is 1 when some
// link in Links has node_number == n, and 0 otherwise.
305 void NETMOD::check_node(int n, int sc)
310 m.msg_type = MSG_NET;
311 m.param.pword[0] = NET_NODE_EXIST;
313 pomlink = Links.first();
// Default answer: not found.
314 m.param.pword[1] = 0;
315 while (pomlink!=NULL)
317 if ( pomlink->node_number==n )
319 m.param.pword[1] = 1;break;
321 pomlink = Links.next();
323 write(sc,&m,sizeof(MESSAGE));
327 // ************* Internal message from kernel or INT *******************
// Poll (zero-timeout select) the kernel socket and every interpreter socket and
// dispatch at most one MESSAGE from each readable socket.
// Interpreter requests: propagate, node-exists, connection info, node count.
// Kernel requests: code transmission, exit, info, propagate, disconnect,
// node-exists, connect-to, plus MSG_VLP interpreter registration and removal.
329 void NETMOD::get_internal()
334 fd_set readset,writeset;
335 struct timeval tout={0,0};
337 struct sockaddr_un svr;
341 FD_ZERO(&readset);FD_ZERO(&writeset);
342 FD_SET(kernel_sock,&readset);
// Add every interpreter socket and track the highest descriptor for select().
345 pomlink = Interpreters.first();
346 while (pomlink!=NULL)
348 FD_SET(pomlink->sock,&readset);
349 if (nrset<pomlink->sock) nrset=pomlink->sock;
350 pomlink=Interpreters.next();
353 if (select(nrset+1,&readset,&writeset,0,(struct timeval *)&tout)>0)
356 /* Check request sockets */
357 pomlink = Interpreters.first();
358 while (pomlink!=NULL)
360 if (FD_ISSET(pomlink->sock,&readset))
// NOTE(review): no check that a full MESSAGE was read (nr) is visible here.
362 nr = read(pomlink->sock,&msg,sizeof(MESSAGE));
365 if (msg.msg_type == MSG_NET)
366 switch(msg.param.pword[0])
368 case NET_PROPAGATE:propagate_msg(&msg);break;
369 case NET_NODE_EXIST:check_node(msg.param.pword[1],pomlink->sock);break;
370 case NET_GET_INFO: conn_info(pomlink->sock);break;
// Reply on the same socket with the number of entries in Links.
371 case NET_NODES_NUM: msg.param.pword[0]=NET_NODES_NUM_RESPONSE;
372 msg.param.pword[1]=Links.count();
373 write(pomlink->sock,&msg,sizeof(MESSAGE));
378 pomlink=Interpreters.next();
381 /* Check internal socket */
382 if (FD_ISSET(kernel_sock,&readset))
384 nr = read(kernel_sock, &msg, sizeof(MESSAGE));
387 if (msg.msg_type == MSG_NET)
389 switch(msg.param.pword[0])
// pword[2] = target node, pstr = file name, pword[1] = requesting interpreter.
// NOTE(review): the break statements for several cases below are not visible
// here — confirm there is no unintended fall-through.
391 case NET_TRANSMIT_CODE:
392 transmit_file(msg.param.pword[2],msg.param.pstr,msg.param.pword[1]);
394 case NET_EXIT: { disconnect_seq();exit_sequence();}
396 case NET_GET_INFO: conn_info(kernel_sock);break;
397 case NET_PROPAGATE:propagate_msg(&msg);break;
398 case NET_DISCONNECT:disconnect_seq();
400 case NET_NODE_EXIST: check_node(msg.param.pword[1],kernel_sock);break;
401 case NET_CONNECT_TO: connect_seq(msg.param.pstr);
406 if (msg.msg_type == MSG_VLP)
407 switch(msg.param.pword[0])
// Register a new interpreter: connect to the AF_UNIX socket named in pstr and
// remember it under the ID given in pword[1].
411 pomlink = new INTlink;
412 pomlink->sock = socket(AF_UNIX,SOCK_STREAM,0);
413 bzero(&svr,sizeof(svr));
414 svr.sun_family = AF_UNIX;
415 strcpy(svr.sun_path,msg.param.pstr);
416 si = strlen(svr.sun_path)+sizeof(svr.sun_family);
417 sj = connect(pomlink->sock,(struct sockaddr*)&svr,si);
419 fcntl(pomlink->sock,F_SETFL, O_NONBLOCK|
420 fcntl(pomlink->sock,F_GETFL,0));
// NOTE(review): TCP_NODELAY on an AF_UNIX socket presumably has no effect.
422 setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
423 pomlink->ID = msg.param.pword[1];
424 pomlink->connected=TRUE;
425 Interpreters.append(pomlink);
// Interpreter pword[1] has terminated: close its socket and drop its slot.
428 case VLP_INTERPRETER_DOWN:
430 pomlink = findINTlink(msg.param.pword[1]);
433 close(pomlink->sock);
434 Interpreters.remove(pomlink);
// Read one MESSAGE from the remote link `lnk` and handle it.
// Visible MSG_NET handling: reception of a ".ccd" file (port CODEPORT) and of a
// ".pcd" file (port CODEPORT1) into REMOTE_PATH, connect/accept/disconnect
// notifications, and forwarding of propagated messages to the kernel or to a
// local interpreter. The case labels themselves are not visible here.
447 void NETMOD::get_message(NETlink *lnk)
452 int rdbt,rd,sz,j,psock;
453 struct sockaddr_in svr;
454 unsigned char buffer[FILE_BUFFER_SIZE];
459 nr = read(lnk->sock,&msg, sizeof(MESSAGE));
463 if (msg.msg_type == MSG_NET)
465 switch(msg.param.pword[0])
// --- .ccd file: pstr = base name, pword[1] = size in bytes ---
468 lnk->code_transmit = TRUE;
// NOTE(review): pstr comes from the remote peer and is used unvalidated in the
// output path; the size of pomstr is not visible here.
469 sprintf(pomstr,"%s/%s",REMOTE_PATH,msg.param.pstr);
470 strcat(pomstr,".ccd");
471 lnk->CodeFile = fopen(pomstr,"wb");
472 if ( lnk->CodeFile == NULL) { write_at_console("Cannot open file\n");
473 lnk->code_transmit=FALSE;}
474 lnk->CodeSize=msg.param.pword[1];
// Open a separate TCP connection back to the sender to pull the file data.
475 psock = socket(AF_INET, SOCK_STREAM, 0);
476 bzero(&svr, sizeof(svr));
477 svr.sin_family = AF_INET;
478 svr.sin_port = htons(CODEPORT);
479 svr.sin_addr.s_addr = inet_addr(lnk->addr);
480 j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
483 //fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));
// Copy from the socket into the file until CodeSize bytes have been handled.
// NOTE(review): rd is passed to fwrite() without a visible check for
// read() returning -1 or 0 — confirm.
485 while (sz<lnk->CodeSize)
487 rd = read(psock,&buffer,sizeof(buffer));
488 rdbt = fwrite(&buffer,sizeof(unsigned char),rd,lnk->CodeFile);
492 fclose(lnk->CodeFile);
// --- .pcd file: same procedure on CODEPORT1, reading into `proto` ---
497 lnk->code_transmit = TRUE;
498 sprintf(pomstr,"%s/%s",REMOTE_PATH,msg.param.pstr);
499 strcat(pomstr,".pcd");
500 lnk->CodeFile = fopen(pomstr,"wb");
501 if ( lnk->CodeFile == NULL) { write_at_console("Cannot open file\n");
502 lnk->code_transmit=FALSE;}
503 lnk->CodeSize=msg.param.pword[1];
504 psock = socket(AF_INET, SOCK_STREAM, 0);
505 bzero(&svr, sizeof(svr));
506 svr.sin_family = AF_INET;
507 svr.sin_port = htons(CODEPORT1);
508 svr.sin_addr.s_addr = inet_addr(lnk->addr);
509 j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
512 //fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));
514 while (sz<lnk->CodeSize)
516 rd = read(psock,&proto,sizeof(proto));
517 rdbt = fwrite(&proto,sizeof(unsigned char),rd,lnk->CodeFile);
521 fclose(lnk->CodeFile);
// --- peer announced itself: record its node number and answer with accept info ---
526 sprintf(pomstr,"Node: %d Addr: %s",msg.param.pword[1],
528 lnk->node_number = msg.param.pword[1];
529 write_at_console(pomstr);
530 send_accept_info(lnk);
// --- peer accepted our connection: record its node number ---
533 sprintf(pomstr,"Node: %d Addr: %s",msg.param.pword[1],
535 lnk->node_number = msg.param.pword[1];
536 write_at_console(pomstr);
// --- peer disconnected ---
539 sprintf(pomstr,"Node: %d disconnected",msg.param.pword[1]);
540 write_at_console(pomstr);
// --- propagated message: pword[1] selects kernel or interpreter delivery ---
546 if (msg.param.pword[1] == MSG_VLP) send_to_kernel(&msg);
547 else if (msg.param.pword[1] == MSG_INT) send_to_int(&msg);
// Poll all connected remote links (zero-timeout select) and call get_message()
// for each link whose socket is readable.
560 void NETMOD::remote_messages()
564 struct timeval tout={0,0};
567 FD_ZERO(&rset);FD_ZERO(&wset);
// Only links marked connected are added to the read set.
570 pomlink = Links.first();
571 while (pomlink!=NULL)
573 if (pomlink->connected)
575 FD_SET(pomlink->sock,&rset);
576 if (max<pomlink->sock) max=pomlink->sock;
578 pomlink=Links.next();
581 if (select(max+1,&rset,&wset,0,(struct timeval *)&tout)>0)
583 pomlink=Links.first();
584 while (pomlink!=NULL)
586 if (FD_ISSET(pomlink->sock,&rset)) get_message(pomlink);
587 pomlink=Links.next();
// Forward `msg` to the node numbered msg->param.pword[4].
// The message is sent only when a connected link to that node exists; the
// visible code also reports "Not connected to Node <n>" on the console.
// NOTE(review): the handling of the MSG_INT test below is not visible here.
593 void NETMOD::propagate_msg(MESSAGE *msg)
598 pomlink = findNETlink(msg->param.pword[4]);
599 if ((pomlink!=NULL)&&(pomlink->connected))
600 send_to_node(pomlink,msg);
602 if (msg->param.pword[1]==MSG_INT)
608 sprintf(ss,"Not connected to Node %d",msg->param.pword[4]);
609 write_at_console(ss);
// Connect to the node at address `a` on request.
// Returns immediately when a link with the same address already exists;
// otherwise connects to LOGPORT at that address and, on success, marks the link
// connected and sends our connect info. On failure writes "Connection failed".
616 void NETMOD::connect_seq(char *a)
619 struct sockaddr_in svr;
// Already have a link to this address: nothing to do.
626 if (strcmp(pom->addr,a)==0) return;
631 pom->connected = FALSE;
632 pom->sock = socket(AF_INET, SOCK_STREAM, 0);
633 bzero(&svr, sizeof(svr));
634 svr.sin_family = AF_INET;
635 svr.sin_port = htons(LOGPORT);
636 svr.sin_addr.s_addr = inet_addr(pom->addr);
// The socket is switched to non-blocking only after this connect() succeeds.
637 j = connect(pom->sock, (struct sockaddr*)&svr, sizeof(svr));
639 { pom->connected = TRUE;
640 fcntl(pom->sock, F_SETFL,O_NONBLOCK | fcntl(pom->sock,
643 setsockopt(pom->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
644 send_connect_info(pom);
647 else write_at_console("Connection failed");
// Try to connect every not-yet-connected link to LOGPORT at its address, then
// recompute all_connected.
// On success the link is marked connected and our connect info is sent; when
// errno is ECONNREFUSED the link's socket is recreated via sock_reopen().
650 void NETMOD::check_links()
654 struct sockaddr_in svr;
657 /* connect to all other nodes */
660 pomlink=Links.first();
661 while (pomlink!=NULL)
663 if ( !(pomlink->connected) )
665 bzero(&svr, sizeof(svr));
666 svr.sin_family = AF_INET;
667 svr.sin_port = htons(LOGPORT);
668 svr.sin_addr.s_addr = inet_addr(pomlink->addr);
// NOTE(review): sockets created by load_config() are already non-blocking, so
// this connect() may presumably report EINPROGRESS; handling of that case is
// not visible here.
670 j = connect(pomlink->sock, (struct sockaddr*)&svr, sizeof(svr));
672 { pomlink->connected = TRUE;
673 fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,
675 send_connect_info(pomlink);
678 if (errno == ECONNREFUSED)
679 sock_reopen(pomlink);
681 pomlink=Links.next();
// Second pass: any link still not connected clears all_connected.
686 pomlink=Links.first();
689 if (pomlink->connected==FALSE) {all_connected=FALSE;break;}
690 pomlink=Links.next();
// Give `lnk` a fresh non-blocking TCP socket with TCP_NODELAY set.
// NOTE(review): closing of the previous descriptor is not visible here —
// confirm it is not leaked.
695 void NETMOD::sock_reopen(NETlink *lnk)
700 lnk->sock = socket(AF_INET, SOCK_STREAM, 0);
701 fcntl(lnk->sock, F_SETFL,O_NONBLOCK | fcntl(lnk->sock,
703 setsockopt(lnk->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
707 // **************** Acknowledges *************************
// Tell the peer on `lnk` that we connected: MSG_NET / NET_CONNECT with our node
// number in pword[1]. The result of write() is not checked.
709 void NETMOD::send_connect_info(NETlink *lnk)
713 m.param.pword[0] = NET_CONNECT;
714 m.param.pword[1] = MyNode;
715 m.msg_type = MSG_NET;
718 write(lnk->sock,&m,sizeof(MESSAGE));
// Tell the peer on `lnk` that we accepted its connection: MSG_NET / NET_ACCEPT
// with our node number in pword[1]. The result of write() is not checked.
721 void NETMOD::send_accept_info(NETlink *lnk)
725 m.param.pword[0] = NET_ACCEPT;
726 m.param.pword[1] = MyNode;
727 m.msg_type = MSG_NET;
730 write(lnk->sock,&m,sizeof(MESSAGE));
// Send a MSG_NET / NET_CODESTREAM_OK message to the peer on `lnk`.
// The result of write() is not checked.
733 void NETMOD::send_code_ack(NETlink *lnk)
737 m.param.pword[0] = NET_CODESTREAM_OK;
738 m.msg_type = MSG_NET;
741 write(lnk->sock,&m,sizeof(MESSAGE));
// Write `msg` to the socket of every link in Links.
// NOTE(review): no test of pomlink->connected is visible in this loop.
745 void NETMOD::send_to_all(MESSAGE *msg)
748 pomlink=Links.first();
749 while (pomlink!=NULL)
751 write(pomlink->sock,msg,sizeof(MESSAGE));
752 pomlink=Links.next();
// Close the kernel socket, the listening socket and the socket of every link.
// `::close` names the global POSIX close() explicitly.
769 void NETMOD::exit_sequence()
773 ::close(kernel_sock);
774 ::close(listen_sock);
776 pomlink = Links.first();
777 while (pomlink!=NULL)
779 ::close(pomlink->sock);
780 pomlink=Links.next();
// Build a zeroed MSG_NET / NET_DISCONNECT message carrying our node number.
// NOTE(review): how the message is sent and how the links are torn down is not
// visible here.
785 void NETMOD::disconnect_seq()
790 bzero(&m,sizeof(MESSAGE));
791 m.msg_type = MSG_NET;
792 m.param.pword[0] = NET_DISCONNECT;
793 m.param.pword[1] = MyNode;
// Return the first link in Links whose node_number equals `node`.
// NOTE(review): callers compare the result with NULL, so the not-found result
// is presumably NULL — that return is not visible here.
// Iterates with Links.first()/next(), so it moves the list's current position.
810 NETlink *NETMOD::findNETlink(int node)
813 pomlink=Links.first();
816 if (pomlink->node_number == node) return(pomlink);
817 pomlink=Links.next();
// Return the first interpreter slot in Interpreters whose ID equals `id`.
// NOTE(review): callers compare the result with NULL, so the not-found result
// is presumably NULL — that return is not visible here.
// Iterates with Interpreters.first()/next(), so it moves the list's current position.
822 INTlink *NETMOD::findINTlink(int id)
825 pomlink=Interpreters.first();
828 if (pomlink->ID == id) return(pomlink);
829 pomlink=Interpreters.next();
835 /* ---------------- Sending code to a remote node -------------- */
// Send a program's code files to a remote node, then notify the kernel.
// For each of the two files (CCD on CODEPORT, PCD on CODEPORT1) the visible
// code: puts the file size and base name into a start message, binds a
// TCP socket on the code port, sends the start message to the target node,
// accepts the node's incoming connection and streams the file over it.
// @param ton      number of the destination node (looked up in Links)
// @param fname    path of the file to send
// @param fromINT  value echoed to the kernel in pword[1] of NET_TRANSMITTED
// Terminates the process with exit(1) when no link to `ton` exists.
837 void NETMOD::transmit_file(int ton, char *fname, int fromINT)
843 unsigned char buffer[FILE_BUFFER_SIZE];
848 struct sockaddr_in svr;
852 // **************** CCD FILE
// File size taken from the current file position.
860 msg.param.pword[1] = ftell(f);
// NOTE(review): rindex() returns NULL when fname contains no '/'; no guard is
// visible before this strcpy — confirm.
864 strcpy(b,rindex(fname,'/'));
865 for(i=0;i<strlen(b);i++)
867 msg.param.pword[0] = NET_CCD_START;
868 strcpy(msg.param.pstr,b);
870 outlink = findNETlink(ton);
871 if (outlink==NULL) exit(1);
// One-shot server socket on CODEPORT; the receiver connects back to it.
// NOTE(review): the result of bind() is not checked.
872 bzero(&svr,sizeof(svr));
873 sock = socket(AF_INET,SOCK_STREAM,0);
874 svr.sin_family = AF_INET;
875 svr.sin_addr.s_addr = INADDR_ANY;
876 svr.sin_port = htons(CODEPORT);
877 bind(sock, (struct sockaddr*)&svr, sizeof(svr));
879 send_to_node(outlink, &msg);
// Block (NULL timeout) until the receiver connects.
881 FD_ZERO(&rset);FD_ZERO(&wset);
883 if (select(sock+1,&rset,&wset,0,0))
884 if (FD_ISSET(sock,&rset))
885 tsock = accept(sock, (struct sockaddr*)&svr,&sz );
// Stream the file in buffer-sized pieces.
// NOTE(review): the result of write() is not checked, so a short write would
// go unnoticed.
891 i = fread(&buffer,1,sizeof(buffer),f);
892 write(tsock,&buffer,i);
893 FD_ZERO(&rset);FD_ZERO(&wset);
895 select(tsock+1,&rset,&wset,0,0);
903 sprintf(b,"Cannot open file to send %s\n",fname);
908 // *************** PCD FILE
// Same procedure for the PCD file, using NET_PCD_START, CODEPORT1 and `proto`.
916 msg.param.pword[1] = ftell(f);
920 strcpy(b,rindex(fname,'/'));
921 for(i=0;i<strlen(b);i++)
923 msg.param.pword[0] = NET_PCD_START;
924 strcpy(msg.param.pstr,b);
926 outlink = findNETlink(ton);
927 if (outlink==NULL) exit(1);
928 bzero(&svr,sizeof(svr));
929 sock = socket(AF_INET,SOCK_STREAM,0);
930 svr.sin_family = AF_INET;
931 svr.sin_addr.s_addr = INADDR_ANY;
932 svr.sin_port = htons(CODEPORT1);
933 bind(sock, (struct sockaddr*)&svr, sizeof(svr));
935 send_to_node(outlink, &msg);
937 FD_ZERO(&rset);FD_ZERO(&wset);
939 if (select(sock+1,&rset,&wset,0,0))
940 if (FD_ISSET(sock,&rset))
941 tsock = accept(sock, (struct sockaddr*)&svr,&sz );
947 i = fread(&proto,1,sizeof(proto),f);
948 write(tsock,&proto,i);
949 FD_ZERO(&rset);FD_ZERO(&wset);
951 select(tsock+1,&rset,&wset,0,0);
959 sprintf(b,"Cannot open file to send %s\n",fname);
// Report completion to the kernel, echoing fromINT in pword[1].
964 msg.msg_type = MSG_NET;
965 msg.param.pword[0] = NET_TRANSMITTED;
966 msg.param.pword[1] = fromINT;
967 send_to_kernel(&msg);
// Report the known links on socket `sk`.
// Sends MSG_NET / NET_INFO messages whose pstr holds "<node_number>=<addr>;"
// entries concatenated, followed by a final MSG_NET / NET_INFO_END message.
// NOTE(review): the rule for splitting entries across several NET_INFO
// messages is not visible here.
974 void NETMOD::conn_info(int sk)
981 m.msg_type = MSG_NET;
982 m.param.pword[0] = NET_INFO;
// Start with an empty entry list.
983 strcpy(m.param.pstr,"");
987 sprintf(poms,"%d=%s;",pom->node_number,pom->addr);
988 strcat(m.param.pstr,poms);
993 write(sk,&m,sizeof(MESSAGE));
1001 write(sk,&m,sizeof(MESSAGE));
// Terminator message so the receiver knows the listing is complete.
1003 m.msg_type = MSG_NET;
1004 m.param.pword[0] = NET_INFO_END;
1005 write(sk,&m,sizeof(MESSAGE));
1008 int main(int argc,char **argv)
1010 NETMOD netter(argv[1]);