fce3aee3f843a0576fb00a0f8fdf271d032c299c
[vlp.git] / src / net / lognet.cpp
1 #include "genint1.h"
2 #include "comm.h"
3
4 #include <sys/socket.h>
5 #include <sys/un.h>
6 #include <netinet/in.h>
7 #include <errno.h>
8 #include <netdb.h>
9 #include <arpa/inet.h>
10
11 #include <sys/types.h>
12 #include <sys/time.h>
13 #include <stdlib.h>
14 #include <stdio.h>
15 #include <fcntl.h>
16 #include <signal.h>
17 #include <sys/stat.h>
18 #include <string.h>
19 #include <qlist.h>
20 #include <qfile.h>
21 #include <qstring.h>
22 #include <qstringlist.h>
23 #include <unistd.h>
24
25 #include <libconfig.h>
26 #include "AppConfiguration.h"
27
28 #define REMOTE_PATH "REMOTE"
29 #define MAXLINKS 30
30 #define LOGPORT 3600
31 #define CODEPORT 3700
32 #define CODEPORT1 3800
33 #define MAXINTERP 10
34 #define FILE_BUFFER_SIZE 2048
35  
36
37
38 // ************** Interpreter slot *******************
39 class INTlink
40 {
41  public:
42   int sock,ID;
43   bool connected;
44   INTlink();
45
46 };
47
48 INTlink::INTlink()
49 {
50  connected=FALSE;
51  sock=0;
52 }
53
54
55 // ********************  Network slot ********************
56 class NETlink
57 {
58  public:
59  int sock;
60  bool connected,code_transmit;
61  char addr[255];
62
63  int node_number;
64  int aliases[5];
65
66  FILE *CodeFile;
67  char CodeName[255];
68  long CodeSize;
69
70
71  NETlink();
72 };
73
74 NETlink::NETlink()
75 {
76  int i;
77  for(i=0;i<5;i++) aliases[i]=-1;
78  connected=FALSE;
79  sock=0;
80  code_transmit=FALSE;
81 }
82
83
84
85
86 //********************** NET Module ****************************
87
88 class NETMOD
89 {
90 public:
91  int kernel_sock,listen_sock;
92  bool all_connected,local_mode;
93  int to_connect,MyNode;
94  char kername[256];
95
96
97  QList<INTlink> Interpreters; // List of the Interpeter slots
98  QList<NETlink> Links;       //  List of the Network slots
99
100  NETMOD(char*);
101
102  void load_config(char*);    
103  void write_at_console(char*);
104  void send_to_kernel(MESSAGE*);
105  void sock_reopen(NETlink*);
106  void send_connect_info(NETlink*);
107  void send_accept_info(NETlink*);
108  void send_to_node(NETlink*,MESSAGE*);
109  void send_to_int(MESSAGE*);
110  void send_code_ack(NETlink*);
111  void send_to_all(MESSAGE *);
112
113  NETlink *findNETlink(int node);
114  INTlink *findINTlink(int id);
115  void transmit_file(int ton, char *fname, int fromINT);
116  void propagate_msg(MESSAGE *msg);
117  void check_node(int,int);
118
119  void run();
120  void exit_sequence();
121  void disconnect_seq();
122  void connect_seq(char*);
123  void accept_connection();
124  void get_internal();
125  void remote_messages(); 
126  void check_links();
127  void get_message(NETlink*);
128  void conn_info(int);
129
130  void doitall(); // 2010
131 };
132
133
134 NETMOD::NETMOD(char *kernel_name)
135 {
136  int i,len,on;
137  struct sockaddr_in svr;
138  struct sockaddr_un svr1;
139  MESSAGE m;
140  char s[256];
141
142
143  Links.clear();
144  Interpreters.clear();
145
146  bzero(&svr, sizeof(svr)); 
147  listen_sock = socket(AF_INET, SOCK_STREAM, 0);
148  svr.sin_family = AF_INET;
149  svr.sin_addr.s_addr = INADDR_ANY;
150  svr.sin_port = htons(LOGPORT);
151  bind(listen_sock, (struct sockaddr*)&svr, sizeof(svr));
152  listen(listen_sock,5);
153  fcntl(listen_sock, F_SETFL,O_NONBLOCK | fcntl(listen_sock, F_GETFL,0));
154
155  to_connect=0;
156  all_connected=FALSE;
157  load_config("vlp.cfg");
158
159  kernel_sock = socket(AF_UNIX,SOCK_STREAM,0);
160  bzero(&svr1,sizeof(svr1));
161  svr1.sun_family = AF_UNIX;
162  strcpy(svr1.sun_path,kernel_name);
163  strcpy(kername,kernel_name);
164  len = strlen(svr1.sun_path)+sizeof(svr1.sun_family);
165  i = connect(kernel_sock,(struct sockaddr*)&svr1,len);
166  if (i==0)
167  fcntl(kernel_sock,F_SETFL, O_NONBLOCK|fcntl(kernel_sock,F_GETFL,0));
168  on=1;
169  setsockopt(kernel_sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
170  m.msg_type = MSG_NET;
171  m.param.pword[0] = NET_NODE;
172  m.param.pword[1] = MyNode;
173  send_to_kernel(&m);
174
175  // if (regme) regme_sequence();
176
177  if (to_connect > 0){
178  write_at_console("Connecting remote VLPs...");  
179  while (!all_connected) check_links();
180                     }
181  sprintf(s,"Local node number %d",MyNode);
182  write_at_console(s);
183 }
184
185 // #####################  Load configuration ##########################
186
187 void NETMOD::load_config(char *fname)
188 {
189   config_t cfg;
190   const char *str;
191   int on,k=0;
192   NETlink *pomlink;
193
194   config_init(&cfg);
195   
196   
197   /* Hack for checking if file exists without using external libs.*/
198   FILE * file = fopen(fname, "r");
199   if (!file) {
200     fprintf(stderr, "Error: Cannot load configuration file %s!\n", fname);
201     write_at_console("Cannot load configuration file!");
202     exit(3);
203   }
204   /* File exists, so file has been locked. Release it. */
205   fclose(file);
206   
207   /* Read the file. If there is an error, report it and exit. */
208   if(!config_read_file(&cfg, fname)) 
209   {
210     AppConfiguration::error(&cfg);
211     config_destroy(&cfg);
212     exit(3);/* from original code. */
213   }
214   
215   if(config_lookup_int(&cfg, "node_number", &MyNode))
216   {
217     if (MyNode==-1) {
218       write_at_console("Node number must be specified");
219       config_destroy(&cfg);
220       exit(1);
221     };
222   }
223   else
224   {
225     AppConfiguration::error(&cfg);
226     config_destroy(&cfg);
227     exit(1);
228   }
229
230   if(config_lookup_string(&cfg, "host", &str)) {
231     k++;
232     pomlink = new NETlink;
233     strcpy(pomlink->addr, str);
234     pomlink->connected = FALSE;
235     pomlink->sock = socket(AF_INET, SOCK_STREAM, 0); 
236     fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,F_GETFL,0));
237     on=1; 
238     setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on)); 
239     Links.append(pomlink); 
240     to_connect++;
241   }
242   else {
243     AppConfiguration::error(&cfg);
244   }
245
246   config_destroy(&cfg);
247
248   if (k==0) all_connected=TRUE;
249 }
250
251
252
253 void NETMOD::write_at_console(char *s)
254 {
255  MESSAGE msg;
256
257  msg.msg_type = MSG_NET;
258  msg.param.pword[0] = NET_CSWRITELN;
259  strcpy(msg.param.pstr,s);
260  send_to_kernel(&msg);
261 }
262
263 void NETMOD::send_to_kernel(MESSAGE *msg)
264 {
265  write(kernel_sock,msg,sizeof(MESSAGE));
266 }
267
268 void NETMOD::send_to_node(NETlink *lnk, MESSAGE *msg)
269 {
270  msg->msg_type = MSG_NET;
271 // msg2netmsg(msg);
272 if (lnk->sock)
273  write(lnk->sock,msg,sizeof(MESSAGE));
274 }
275
276 void NETMOD::send_to_int(MESSAGE *msg)
277 {
278  INTlink *pomlink;
279  
280  pomlink = findINTlink(msg->param.pword[5]);
281  if (pomlink!=NULL) write(pomlink->sock,msg,sizeof(MESSAGE));
282 }
283
284
285
286 void NETMOD::accept_connection()
287 {
288  unsigned int sz;
289  int nsock, on;
290  struct sockaddr_in svr;
291  fd_set rset,wset;
292  struct timeval tout = {0,0};
293  NETlink *pomlink;
294
295  FD_ZERO(&rset);FD_ZERO(&wset);
296  FD_SET(listen_sock,&rset);
297  
298  if (select(listen_sock+1,&rset,&wset,0,(struct timeval *)&tout)>0)
299   if (FD_ISSET(listen_sock,&rset))
300  {
301  
302 /* accept connection on listen socket */
303  sz = sizeof(svr);
304  bzero(&svr, sizeof(svr));
305  nsock = accept(listen_sock, (struct sockaddr*)&svr, &sz);   
306
307  if (nsock>0)
308   {
309     
310    /* i<0 someone wants to connect us */
311       
312        pomlink = new NETlink;
313        strcpy(pomlink->addr,inet_ntoa(svr.sin_addr));
314        pomlink->sock = nsock;
315        pomlink->connected = TRUE;
316        fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock, 
317            F_GETFL,0));
318        on=1;
319        setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
320        Links.append(pomlink);
321   } /* nsock > 0 */
322 } /* ISSET */ 
323 }
324
325
326 void NETMOD::check_node(int n, int sc)
327 {
328  MESSAGE m;
329  NETlink *pomlink;
330  
331  m.msg_type = MSG_NET;
332  m.param.pword[0] = NET_NODE_EXIST;
333  
334  pomlink = Links.first();
335  m.param.pword[1] = 0;
336  while (pomlink!=NULL)
337  {
338   if ( pomlink->node_number==n )
339    {
340      m.param.pword[1] = 1;
341      strcpy(m.param.pstr,pomlink->addr);
342      break;
343    }
344   pomlink = Links.next();
345  }
346  write(sc,&m,sizeof(MESSAGE));
347 }
348
349
350 // ************* Internal message from kernel or INT *******************
351
352 void NETMOD::get_internal()
353 {
354  int nr,nrset;
355  MESSAGE msg;
356  int si, sj;
357  fd_set readset,writeset;
358  struct timeval tout={0,0};
359  INTlink *pomlink;
360  struct sockaddr_un svr;
361
362  
363  
364  FD_ZERO(&readset);FD_ZERO(&writeset);
365  FD_SET(kernel_sock,&readset);
366  nrset=kernel_sock;
367
368  pomlink = Interpreters.first();
369  while (pomlink!=NULL)
370  {
371   FD_SET(pomlink->sock,&readset);
372   if (nrset<pomlink->sock) nrset=pomlink->sock;
373   pomlink=Interpreters.next();
374  }
375
376  if (select(nrset+1,&readset,&writeset,0,(struct timeval *)&tout)>0)
377  { 
378  
379 /* Check request sockets */
380  pomlink = Interpreters.first();
381  while (pomlink!=NULL)
382  {
383    if (FD_ISSET(pomlink->sock,&readset))
384    {
385      nr = read(pomlink->sock,&msg,sizeof(MESSAGE));
386      if (nr>0)
387       {
388           if (msg.msg_type == MSG_NET)
389              switch(msg.param.pword[0])
390                {
391                  case NET_PROPAGATE:propagate_msg(&msg);break;
392                  case NET_NODE_EXIST:check_node(msg.param.pword[1],pomlink->sock);break;
393                  case NET_GET_INFO: conn_info(pomlink->sock);break;
394                  case NET_NODES_NUM: msg.param.pword[0]=NET_NODES_NUM_RESPONSE;
395                                      msg.param.pword[1]=Links.count();
396                                      write(pomlink->sock,&msg,sizeof(MESSAGE));
397                                      break; 
398                }/* switch */
399       }
400    } /* ISSET */
401    pomlink=Interpreters.next();
402  } // while
403  
404 /* Check internal socket */
405  if (FD_ISSET(kernel_sock,&readset))
406  {
407   nr = read(kernel_sock, &msg, sizeof(MESSAGE));
408   if (nr>0)
409   {
410    if (msg.msg_type == MSG_NET)
411    {
412      switch(msg.param.pword[0])
413      { 
414        case NET_TRANSMIT_CODE:
415                  transmit_file(msg.param.pword[2],msg.param.pstr,msg.param.pword[1]);  
416                  break;
417        case NET_EXIT: { disconnect_seq();exit_sequence();}
418                    break;
419        case NET_GET_INFO: conn_info(kernel_sock);break;                          
420        case NET_PROPAGATE:propagate_msg(&msg);break;
421        case NET_DISCONNECT:disconnect_seq();
422                    break;            
423        case NET_NODE_EXIST: check_node(msg.param.pword[1],kernel_sock);break;
424        case NET_CONNECT_TO: connect_seq(msg.param.pstr);
425
426         } /* end switch */
427     } /* MSg_NET */
428
429    if (msg.msg_type == MSG_VLP)
430     switch(msg.param.pword[0])
431     {
432      case VLP_REGINT:
433         {
434            pomlink = new INTlink;
435            pomlink->sock = socket(AF_UNIX,SOCK_STREAM,0);
436            bzero(&svr,sizeof(svr));
437            svr.sun_family = AF_UNIX;
438            strcpy(svr.sun_path,msg.param.pstr);
439            si = strlen(svr.sun_path)+sizeof(svr.sun_family);
440            sj = connect(pomlink->sock,(struct sockaddr*)&svr,si);
441            if (sj==0)
442             fcntl(pomlink->sock,F_SETFL, O_NONBLOCK|
443                    fcntl(pomlink->sock,F_GETFL,0));
444            int on=1;
445            setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
446            pomlink->ID = msg.param.pword[1];
447            pomlink->connected=TRUE;
448            Interpreters.append(pomlink);
449
450          };break;
451      case VLP_INTERPRETER_DOWN:
452         {
453           pomlink = findINTlink(msg.param.pword[1]);
454           if (pomlink!=NULL)
455           {
456            close(pomlink->sock);
457            Interpreters.remove(pomlink);
458           } 
459          };break;
460             
461      break;
462     } /* MSg_VLP */
463    }
464   }/* ISSET */
465  } /* select >0 */  
466
467
468 }
469
470 void NETMOD::get_message(NETlink *lnk)
471 {
472  int nr;
473  MESSAGE msg;
474  char pomstr[80];
475  int rdbt,rd,sz,j,psock;
476  struct sockaddr_in svr;
477  unsigned char buffer[FILE_BUFFER_SIZE];
478  protdescr proto;
479
480 if (lnk->connected)
481 {
482   nr = read(lnk->sock,&msg, sizeof(MESSAGE));
483   if (nr>0)
484    {
485 //     netmsg2msg(&msg);
486      if (msg.msg_type == MSG_NET)
487       {
488         switch(msg.param.pword[0])
489         {
490          case NET_CCD_START:
491                      
492                      lnk->code_transmit = TRUE;
493                      sprintf(pomstr,"%s/%s",REMOTE_PATH,msg.param.pstr);
494                      strcat(pomstr,".ccd");
495                      lnk->CodeFile = fopen(pomstr,"wb");
496                      if ( lnk->CodeFile == NULL) { write_at_console("Cannot open file\n");
497                      lnk->code_transmit=FALSE;}
498                      lnk->CodeSize=msg.param.pword[1];  
499                      psock = socket(AF_INET, SOCK_STREAM, 0); 
500                      bzero(&svr, sizeof(svr));
501                      svr.sin_family = AF_INET;
502                      svr.sin_port = htons(CODEPORT);
503                      svr.sin_addr.s_addr = inet_addr(lnk->addr);
504                      j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
505                      if ( j==0) 
506                      {
507                       //fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));
508                        sz=0;
509                        while (sz<lnk->CodeSize)
510                        {
511                          rd = read(psock,&buffer,sizeof(buffer));
512                          rdbt = fwrite(&buffer,sizeof(unsigned char),rd,lnk->CodeFile);
513                          sz=sz+rd; 
514                         }
515                        close(psock);
516                        fclose(lnk->CodeFile);
517                       }
518                      
519                      break;
520
521          case NET_PCD_START:
522                      
523                      lnk->code_transmit = TRUE;
524                      sprintf(pomstr,"%s/%s",REMOTE_PATH,msg.param.pstr);
525                      strcat(pomstr,".pcd");
526                      lnk->CodeFile = fopen(pomstr,"wb");
527                      if ( lnk->CodeFile == NULL) { write_at_console("Cannot open file\n");
528                      lnk->code_transmit=FALSE;}
529                      lnk->CodeSize=msg.param.pword[1];  
530                      psock = socket(AF_INET, SOCK_STREAM, 0); 
531                      bzero(&svr, sizeof(svr));
532                      svr.sin_family = AF_INET;
533                      svr.sin_port = htons(CODEPORT1);
534                      svr.sin_addr.s_addr = inet_addr(lnk->addr);
535                      j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
536                      if ( j==0) 
537                      {
538                       //fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));
539                        sz=0;
540                        while (sz<lnk->CodeSize)
541                        {
542                          rd = read(psock,&proto,sizeof(proto));
543                          rdbt = fwrite(&proto,sizeof(unsigned char),rd,lnk->CodeFile);
544                          sz=sz+rd; 
545                         }
546                        close(psock);
547                        fclose(lnk->CodeFile);
548                       }
549                      
550                      break;
551
552          case NET_CONNECT:
553                      
554                      sprintf(pomstr,"Node: %d Addr: %s",msg.param.pword[1],
555                      lnk->addr);
556                      lnk->node_number = msg.param.pword[1];
557                      write_at_console(pomstr);
558                      send_accept_info(lnk);
559                      break;
560          case NET_ACCEPT:
561                     
562                     sprintf(pomstr,"Node: %d Addr: %s",msg.param.pword[1],
563                     lnk->addr);
564                     lnk->node_number = msg.param.pword[1];
565                     write_at_console(pomstr);
566                     break;
567          case NET_DISCONNECT:
568                     
569                     sprintf(pomstr,"Node: %d disconnected",msg.param.pword[1]);
570                     write_at_console(pomstr);
571                     ::close(lnk->sock);
572                     Links.remove(lnk);
573                     delete(lnk);
574                     break; 
575          case NET_PROPAGATE:
576                
577                if (msg.param.pword[1] == MSG_VLP) send_to_kernel(&msg);
578                else if (msg.param.pword[1] == MSG_INT) send_to_int(&msg);
579                break;            
580
581                 } /* end switch */
582         
583        }   
584      } /* nr > 0 */
585
586 } /* end if used */
587 }
588
589
590
591 void NETMOD::remote_messages()
592 {
593  int max;
594  fd_set rset,wset;
595  struct timeval tout={0,0};
596  NETlink *pomlink;
597  
598  FD_ZERO(&rset);FD_ZERO(&wset);
599  max=0;
600  
601  pomlink = Links.first();
602  while (pomlink!=NULL)
603  {
604   if (pomlink->connected)
605   {
606     FD_SET(pomlink->sock,&rset);
607     if  (max<pomlink->sock) max=pomlink->sock;
608   }
609   pomlink=Links.next();
610  }
611  
612  if (select(max+1,&rset,&wset,0,(struct timeval *)&tout)>0)
613  {
614   pomlink=Links.first();
615   while (pomlink!=NULL)
616   {
617    if (FD_ISSET(pomlink->sock,&rset)) get_message(pomlink);
618    pomlink=Links.next();
619   }
620  }  
621
622 }
623
624
625 /******************************    2010 ********************************************/
626 void NETMOD::doitall() {
627  unsigned int sz;
628  int nsock,max=0,on, nr, si, sj;
629  struct sockaddr_in svr;
630  fd_set rset;
631  NETlink *pomlink;
632  INTlink *pomlink2;
633  struct sockaddr_un svr2;
634  MESSAGE msg;
635
636
637  FD_ZERO(&rset);
638  FD_SET(listen_sock,&rset);
639  max = listen_sock;
640  FD_SET(kernel_sock,&rset);
641  if (max<kernel_sock) { max=kernel_sock;}
642  pomlink2 = Interpreters.first();
643  while (pomlink2!=NULL)
644  {
645   FD_SET(pomlink2->sock,&rset);
646   if (max<pomlink2->sock) max=pomlink2->sock;
647   pomlink2=Interpreters.next();
648  }
649  pomlink = Links.first();
650  while (pomlink!=NULL)
651  {
652   if (pomlink->connected)
653   {
654     FD_SET(pomlink->sock,&rset);
655     if  (max<pomlink->sock) max=pomlink->sock;
656   }
657   pomlink=Links.next();
658  }
659   
660  // odczyt
661  if (select(max+1,&rset,0,0,NULL) > 0) {
662     // accept connection
663        if (FD_ISSET(listen_sock,&rset))
664         {
665           /* accept connection on listen socket */
666           sz = sizeof(svr);
667           bzero(&svr, sizeof(svr));
668           nsock = accept(listen_sock, (struct sockaddr*)&svr, &sz);   
669           if (nsock>0)
670           {   
671             pomlink = new NETlink;
672             strcpy(pomlink->addr,inet_ntoa(svr.sin_addr));
673             pomlink->sock = nsock;
674             pomlink->connected = TRUE;
675             fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,F_GETFL,0));
676             on=1;
677             setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
678             Links.append(pomlink);
679            } /* nsock > 0 */
680         } /* ISSET */ 
681
682     // get internal message
683        /* Check request sockets */
684        pomlink2 = Interpreters.first();
685        while (pomlink2!=NULL)
686        {
687          if (FD_ISSET(pomlink2->sock,&rset))
688          {
689            nr = read(pomlink2->sock,&msg,sizeof(MESSAGE));
690            if (nr>0)
691            {
692              if (msg.msg_type == MSG_NET)
693                switch(msg.param.pword[0])
694                {
695                  case NET_PROPAGATE:propagate_msg(&msg); break;
696                  case NET_NODE_EXIST:check_node(msg.param.pword[1],pomlink2->sock);break;
697                  case NET_GET_INFO: conn_info(pomlink2->sock);break;
698                  case NET_NODES_NUM: msg.param.pword[0]=NET_NODES_NUM_RESPONSE;
699                                      msg.param.pword[1]=Links.count();
700                                      write(pomlink2->sock,&msg,sizeof(MESSAGE));
701                                      break; 
702                }/* switch */
703            }
704          } /* ISSET */
705          pomlink2=Interpreters.next();
706        } // while
707        /* Check internal socket */
708        if (FD_ISSET(kernel_sock,&rset))
709        {
710           nr = read(kernel_sock, &msg, sizeof(MESSAGE));
711           if (nr>0)
712           {
713             if (msg.msg_type == MSG_NET)
714             {
715               switch(msg.param.pword[0])
716               { 
717                  case NET_TRANSMIT_CODE:
718                  
719                  transmit_file(msg.param.pword[2],msg.param.pstr,msg.param.pword[1]);  
720                  
721                  break;
722                  case NET_EXIT: {  disconnect_seq();exit_sequence();}
723                  break;
724                  case NET_GET_INFO: conn_info(kernel_sock);break;                          
725                  case NET_PROPAGATE:propagate_msg(&msg);break;
726                  case NET_DISCONNECT:disconnect_seq();
727                    break;            
728                  case NET_NODE_EXIST: check_node(msg.param.pword[1],kernel_sock);break;
729                  case NET_CONNECT_TO: connect_seq(msg.param.pstr);
730               } /* end switch */
731             } /* MSG_NET */
732             if (msg.msg_type == MSG_VLP)
733               switch(msg.param.pword[0])
734               {
735                  case VLP_REGINT:
736                  {
737                     
738                     pomlink2 = new INTlink;
739                     pomlink2->sock = socket(AF_UNIX,SOCK_STREAM,0);
740                     bzero(&svr2,sizeof(svr2));
741                     svr2.sun_family = AF_UNIX;
742                     strcpy(svr2.sun_path,msg.param.pstr);
743                     si = strlen(svr2.sun_path)+sizeof(svr2.sun_family);
744                     sj = connect(pomlink2->sock,(struct sockaddr*)&svr2,si);
745                     if (sj==0)
746                       fcntl(pomlink2->sock,F_SETFL, O_NONBLOCK|fcntl(pomlink2->sock,F_GETFL,0));
747                     int on=1;
748                     setsockopt(pomlink2->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
749                     pomlink2->ID = msg.param.pword[1];
750                     pomlink2->connected=TRUE;
751                     Interpreters.append(pomlink2);
752                     
753                  };break;
754                  case VLP_INTERPRETER_DOWN:
755                  { 
756                    
757                    pomlink2 = findINTlink(msg.param.pword[1]);
758                    if (pomlink2!=NULL)
759                    {
760                      close(pomlink2->sock);
761                      Interpreters.remove(pomlink2);
762                    } 
763                  };break;                       
764               } /* MSg_VLP */
765            } // nr > 0
766        }/* ISSET */
767
768     // get remote message
769
770     pomlink=Links.first();
771     while (pomlink!=NULL)
772     {
773       if (FD_ISSET(pomlink->sock,&rset)) get_message(pomlink);
774       pomlink=Links.next();
775     }
776  } // select 
777
778
779 }
780 /******************************    END 2010 ********************************************/
781
782
783
784 void NETMOD::propagate_msg(MESSAGE *msg)
785 {
786  char ss[255];
787  NETlink *pomlink;
788  
789  pomlink = findNETlink(msg->param.pword[4]);
790  if ((pomlink!=NULL)&&(pomlink->connected))
791   send_to_node(pomlink,msg);
792  else { 
793        if (msg->param.pword[1]==MSG_INT)
794        {
795         send_to_int(msg);
796         }
797        else
798        {
799        sprintf(ss,"Not connected to Node %d",msg->param.pword[4]);  
800        write_at_console(ss);
801        }
802       }      
803
804 }
805
806
807 void NETMOD::connect_seq(char *a)
808 {
809  NETlink *pom;
810  struct sockaddr_in svr;
811  int j,on;
812
813
814  pom=Links.first();
815  while (pom!=NULL)
816  {
817   if (strcmp(pom->addr,a)==0) return;
818   pom=Links.next();
819   }
820      pom = new NETlink;
821      strcpy(pom->addr,a);
822      pom->connected = FALSE;
823      pom->sock = socket(AF_INET, SOCK_STREAM, 0); 
824      bzero(&svr, sizeof(svr));
825      svr.sin_family = AF_INET;
826      svr.sin_port = htons(LOGPORT);
827      svr.sin_addr.s_addr = inet_addr(pom->addr);
828      j = connect(pom->sock, (struct sockaddr*)&svr, sizeof(svr));
829      if ( j==0) 
830           { pom->connected = TRUE;
831            fcntl(pom->sock, F_SETFL,O_NONBLOCK | fcntl(pom->sock, 
832            F_GETFL,0));
833            on=1;
834            setsockopt(pom->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
835            send_connect_info(pom);
836            Links.append(pom); 
837           }
838       else write_at_console("Connection failed");
839 }
840
841 void NETMOD::check_links()
842 {
843
844 int j=1;
845 struct sockaddr_in svr;
846 NETlink *pomlink;
847
848 /* connect to all other nodes */
849 if (!all_connected)
850 {
851  pomlink=Links.first();
852  while (pomlink!=NULL)
853  {
854  if ( !(pomlink->connected) )
855  {
856    bzero(&svr, sizeof(svr));
857    svr.sin_family = AF_INET;
858    svr.sin_port = htons(LOGPORT);
859    svr.sin_addr.s_addr = inet_addr(pomlink->addr);
860   
861    j = connect(pomlink->sock, (struct sockaddr*)&svr, sizeof(svr));
862    if ( j==0) 
863           { pomlink->connected = TRUE;
864            fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock, 
865            F_GETFL,0));
866            send_connect_info(pomlink);
867
868            }
869    if (errno == ECONNREFUSED) 
870            sock_reopen(pomlink);
871  } // not connected
872   pomlink=Links.next();
873  } //while
874
875 }//if
876 all_connected=TRUE;
877 pomlink=Links.first();
878 while(pomlink!=NULL)
879 {
880  if (pomlink->connected==FALSE) {all_connected=FALSE;break;}
881  pomlink=Links.next();
882 }
883
884 }
885
886 void NETMOD::sock_reopen(NETlink *lnk)
887 {
888  int on=1;
889
890  close(lnk->sock);
891  lnk->sock = socket(AF_INET, SOCK_STREAM, 0); 
892  fcntl(lnk->sock, F_SETFL,O_NONBLOCK | fcntl(lnk->sock, 
893            F_GETFL,0));
894  setsockopt(lnk->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
895 }
896
897
898 // **************** Acknowledges *************************
899
900 void NETMOD::send_connect_info(NETlink *lnk)
901 {
902  MESSAGE m;
903
904  m.param.pword[0] = NET_CONNECT;
905  m.param.pword[1] = MyNode;
906  m.msg_type = MSG_NET;
907 // msg2netmsg(&m);
908  if (lnk->sock)
909  write(lnk->sock,&m,sizeof(MESSAGE));
910 }
911
912 void NETMOD::send_accept_info(NETlink *lnk)
913 {
914  MESSAGE m;
915
916  m.param.pword[0] = NET_ACCEPT;
917  m.param.pword[1] = MyNode;
918  m.msg_type = MSG_NET;
919 // msg2netmsg(&m);
920  if (lnk->sock)
921  write(lnk->sock,&m,sizeof(MESSAGE));
922 }
923
924 void NETMOD::send_code_ack(NETlink *lnk)
925 {
926 MESSAGE m;
927
928  m.param.pword[0] = NET_CODESTREAM_OK;
929  m.msg_type = MSG_NET;
930 // msg2netmsg(&m);
931  if (lnk->sock)
932  write(lnk->sock,&m,sizeof(MESSAGE));
933 }
934
935
936 void NETMOD::send_to_all(MESSAGE *msg)
937 {
938  NETlink *pomlink;
939  pomlink=Links.first();
940  while (pomlink!=NULL)
941  {
942   write(pomlink->sock,msg,sizeof(MESSAGE));
943   pomlink=Links.next();
944  }
945 }
946
947
948
949
950 void NETMOD::run()
951 {
952   while(1)
953  {
954   /*accept_connection();
955   get_internal();
956   remote_messages(); */
957   // 2010
958   doitall();
959  }
960 }
961
962 void NETMOD::exit_sequence()
963 {
964  NETlink *pomlink;
965
966  ::close(kernel_sock);
967  ::close(listen_sock);
968  unlink(kername);
969  pomlink = Links.first();
970  while (pomlink!=NULL)
971  {
972   ::close(pomlink->sock);
973   pomlink=Links.next();
974   }
975  exit(0);
976 }
977
978 void NETMOD::disconnect_seq()
979 {
980  MESSAGE m;
981  NETlink *p;
982
983  bzero(&m,sizeof(MESSAGE));
984  m.msg_type = MSG_NET;
985  m.param.pword[0] = NET_DISCONNECT;
986  m.param.pword[1] = MyNode;
987
988  p=Links.first();
989  while(p!=NULL)
990  {
991   send_to_node(p,&m);
992   p=Links.next();
993  }
994  p=Links.first();
995  while(p!=NULL)
996  {
997   ::close(p->sock);
998   p=Links.next();
999  }
1000  Links.clear();
1001 }
1002
1003 NETlink *NETMOD::findNETlink(int node)
1004 {
1005  NETlink *pomlink;
1006  pomlink=Links.first();
1007  while(pomlink!=NULL)
1008  {
1009   if (pomlink->node_number == node) return(pomlink);
1010   pomlink=Links.next();
1011  } 
1012  return(pomlink);
1013 }
1014
1015 INTlink *NETMOD::findINTlink(int id)
1016 {
1017  INTlink *pomlink;
1018  pomlink=Interpreters.first();
1019  while(pomlink!=NULL)
1020  {
1021   if (pomlink->ID == id) return(pomlink);
1022   pomlink=Interpreters.next();
1023  } 
1024  return(pomlink);
1025 }
1026
1027
1028 /* ----------------    Sending code to a remote node -------------- */
1029
1030 void NETMOD::transmit_file(int ton, char *fname, int fromINT)
1031 {
1032  FILE *f;
1033  MESSAGE msg;
1034  char fn[80];
1035  char b[255];
1036  unsigned char buffer[FILE_BUFFER_SIZE];
1037  protdescr proto;
1038  int i,tsock,sock;
1039  unsigned int sz;
1040  NETlink *outlink;
1041  struct sockaddr_in svr;
1042  fd_set rset,wset;
1043
1044
1045 // **************** CCD FILE
1046
1047  strcpy(fn,fname);
1048  strcat(fn,".ccd");
1049  f = fopen(fn,"rb");
1050  if (f!=NULL)
1051  {
1052  fseek(f,0,SEEK_END);
1053  msg.param.pword[1] = ftell(f);
1054  fclose(f);
1055  f = fopen(fn,"rb");
1056  
1057  strcpy(b,rindex(fname,'/'));
1058  for(i=0;i<strlen(b);i++)
1059    b[i] = b[i+1];
1060  msg.param.pword[0] = NET_CCD_START;
1061  strcpy(msg.param.pstr,b);
1062
1063  outlink = findNETlink(ton);
1064  if (outlink==NULL) exit(1);
1065  bzero(&svr,sizeof(svr));
1066  sock = socket(AF_INET,SOCK_STREAM,0);
1067  svr.sin_family = AF_INET;
1068  svr.sin_addr.s_addr = INADDR_ANY;
1069  svr.sin_port = htons(CODEPORT);
1070  bind(sock, (struct sockaddr*)&svr, sizeof(svr));
1071  listen(sock,5);   
1072  send_to_node(outlink, &msg);
1073  sz=sizeof(svr);
1074  FD_ZERO(&rset);FD_ZERO(&wset);
1075  FD_SET(sock,&rset);
1076  if (select(sock+1,&rset,&wset,0,0))
1077   if (FD_ISSET(sock,&rset))
1078     tsock = accept(sock, (struct sockaddr*)&svr,&sz );
1079  if (tsock>0)
1080  {
1081   close(sock);
1082   while (!feof(f))
1083    {
1084     i = fread(&buffer,1,sizeof(buffer),f);
1085     write(tsock,&buffer,i);
1086     FD_ZERO(&rset);FD_ZERO(&wset);
1087     FD_SET(tsock,&wset);
1088     select(tsock+1,&rset,&wset,0,0);
1089     }
1090   close(tsock);
1091  }
1092  fclose(f);
1093  } // f!=NULL
1094   else
1095      {
1096     sprintf(b,"Cannot open file to send %s\n",fname);
1097     write_at_console(b);
1098    }
1099
1100
1101 // *************** PCD FILE
1102  
1103  strcpy(fn,fname);
1104  strcat(fn,".pcd");
1105  f = fopen(fn,"r");
1106   if (f!=NULL)
1107  {
1108  fseek(f,0,SEEK_END);
1109  msg.param.pword[1] = ftell(f);
1110  fclose(f);
1111  f = fopen(fn,"rb");
1112  
1113  strcpy(b,rindex(fname,'/'));
1114  for(i=0;i<strlen(b);i++)
1115    b[i] = b[i+1];
1116  msg.param.pword[0] = NET_PCD_START;
1117  strcpy(msg.param.pstr,b);
1118
1119  outlink = findNETlink(ton);
1120  if (outlink==NULL) exit(1);
1121  bzero(&svr,sizeof(svr));
1122  sock = socket(AF_INET,SOCK_STREAM,0);
1123  svr.sin_family = AF_INET;
1124  svr.sin_addr.s_addr = INADDR_ANY;
1125  svr.sin_port = htons(CODEPORT1);
1126  bind(sock, (struct sockaddr*)&svr, sizeof(svr));
1127  listen(sock,5);   
1128  send_to_node(outlink, &msg);
1129  sz=sizeof(svr);
1130  FD_ZERO(&rset);FD_ZERO(&wset);
1131  FD_SET(sock,&rset);
1132  if (select(sock+1,&rset,&wset,0,0))
1133   if (FD_ISSET(sock,&rset))
1134     tsock = accept(sock, (struct sockaddr*)&svr,&sz );
1135  if (tsock>0)
1136  {
1137   close(sock);
1138   while (!feof(f))
1139    {
1140     i = fread(&proto,1,sizeof(proto),f);
1141     write(tsock,&proto,i);
1142     FD_ZERO(&rset);FD_ZERO(&wset);
1143     FD_SET(tsock,&wset);
1144     select(tsock+1,&rset,&wset,0,0);
1145     }
1146   close(tsock);
1147  }
1148  fclose(f);
1149  } // f!=NULL
1150   else
1151    {
1152     sprintf(b,"Cannot open file to send %s\n",fname);
1153     write_at_console(b);
1154    }
1155  
1156
1157  msg.msg_type = MSG_NET; 
1158  msg.param.pword[0] = NET_TRANSMITTED;
1159  msg.param.pword[1] = fromINT;
1160  send_to_kernel(&msg);
1161  
1162
1163 }
1164
1165
1166
1167 void NETMOD::conn_info(int sk)
1168 {
1169  NETlink *pom;
1170  MESSAGE m;
1171  int k=0;
1172  char poms[255];
1173
1174  m.msg_type = MSG_NET;
1175  m.param.pword[0] = NET_INFO;
1176  strcpy(m.param.pstr,"");
1177  pom=Links.first();
1178  while (pom!=NULL)
1179  {
1180   sprintf(poms,"%d=%s;",pom->node_number,pom->addr);
1181   strcat(m.param.pstr,poms);
1182   k++;
1183   if (k==12)
1184   {
1185     m.param.pword[1]=12;
1186     write(sk,&m,sizeof(MESSAGE));
1187     k=0;
1188    }
1189   pom=Links.next();
1190  }
1191  if (k>0)
1192   {
1193    m.param.pword[1]=k;
1194    write(sk,&m,sizeof(MESSAGE));
1195    }
1196   m.msg_type = MSG_NET;
1197   m.param.pword[0] = NET_INFO_END;
1198   write(sk,&m,sizeof(MESSAGE));
1199 }
1200
1201 int main(int argc,char **argv)
1202 {
1203  NETMOD netter(argv[1]);
1204  netter.run();
1205  return 0;
1206 }