Experimental usage.
[vlp.git] / src / net / lognet.cpp
1 #include "genint1.h"
2 #include "comm.h"
3
4 #include <sys/socket.h>
5 #include <sys/un.h>
6 #include <netinet/in.h>
7 #include <errno.h>
8 #include <netdb.h>
9 #include <arpa/inet.h>
10
11 #include <sys/types.h>
12 #include <sys/time.h>
13 #include <stdlib.h>
14 #include <stdio.h>
15 #include <fcntl.h>
16 #include <signal.h>
17 #include <sys/stat.h>
18 #include <string.h>
19 #include <qlist.h>
20 #include <qfile.h>
21 #include <qstring.h>
22 #include <qstringlist.h>
23 #include <unistd.h>
24
25 #include <libconfig.h>
26 #include "AppConfiguration.h"
27
28 #define REMOTE_PATH "REMOTE"
29 #define MAXLINKS 30
30 #define LOGPORT 3600
31 #define CODEPORT 3700
32 #define CODEPORT1 3800
33 #define MAXINTERP 10
34 #define FILE_BUFFER_SIZE 2048
35  
36
37
38 // ************** Interpreter slot *******************
39 class INTlink
40 {
41  public:
42   int sock,ID;
43   bool connected;
44   INTlink();
45
46 };
47
48 INTlink::INTlink()
49 {
50  connected=FALSE;
51  sock=0;
52 }
53
54
55 // ********************  Network slot ********************
56 class NETlink
57 {
58  public:
59  int sock;
60  bool connected,code_transmit;
61  char addr[255];
62
63  int node_number;
64  int aliases[5];
65
66  FILE *CodeFile;
67  char CodeName[255];
68  long CodeSize;
69
70
71  NETlink();
72 };
73
74 NETlink::NETlink()
75 {
76  int i;
77  for(i=0;i<5;i++) aliases[i]=-1;
78  connected=FALSE;
79  sock=0;
80  code_transmit=FALSE;
81 }
82
83
84
85
86 //********************** NET Module ****************************
87
88 class NETMOD
89 {
90 public:
91  int kernel_sock,listen_sock;
92  bool all_connected,local_mode;
93  int to_connect,MyNode;
94  char kername[256];
95
96
97  QList<INTlink> Interpreters; // List of the Interpeter slots
98  QList<NETlink> Links;       //  List of the Network slots
99
100  NETMOD(char*);
101
102  void load_config(char*);    
103  void write_at_console(char*);
104  void send_to_kernel(MESSAGE*);
105  void sock_reopen(NETlink*);
106  void send_connect_info(NETlink*);
107  void send_accept_info(NETlink*);
108  void send_to_node(NETlink*,MESSAGE*);
109  void send_to_int(MESSAGE*);
110  void send_code_ack(NETlink*);
111  void send_to_all(MESSAGE *);
112
113  NETlink *findNETlink(int node);
114  INTlink *findINTlink(int id);
115  void transmit_file(int ton, char *fname, int fromINT);
116  void propagate_msg(MESSAGE *msg);
117  void check_node(int,int);
118
119  void run();
120  void exit_sequence();
121  void disconnect_seq();
122  void connect_seq(char*);
123  void accept_connection();
124  void get_internal();
125  void remote_messages(); 
126  void check_links();
127  void get_message(NETlink*);
128  void conn_info(int);
129
130  void doitall(); // 2010
131 };
132
133
134 NETMOD::NETMOD(char *kernel_name)
135 {
136  int i,len,on;
137  struct sockaddr_in svr;
138  struct sockaddr_un svr1;
139  MESSAGE m;
140  char s[256];
141
142
143  Links.clear();
144  Interpreters.clear();
145
146  bzero(&svr, sizeof(svr)); 
147  listen_sock = socket(AF_INET, SOCK_STREAM, 0);
148  svr.sin_family = AF_INET;
149  svr.sin_addr.s_addr = INADDR_ANY;
150  svr.sin_port = htons(LOGPORT);
151  bind(listen_sock, (struct sockaddr*)&svr, sizeof(svr));
152  listen(listen_sock,5);
153  fcntl(listen_sock, F_SETFL,O_NONBLOCK | fcntl(listen_sock, F_GETFL,0));
154
155  to_connect=0;
156  all_connected=FALSE;
157  load_config("vlp.cfg");
158
159  kernel_sock = socket(AF_UNIX,SOCK_STREAM,0);
160  bzero(&svr1,sizeof(svr1));
161  svr1.sun_family = AF_UNIX;
162  strcpy(svr1.sun_path,kernel_name);
163  strcpy(kername,kernel_name);
164  len = strlen(svr1.sun_path)+sizeof(svr1.sun_family);
165  i = connect(kernel_sock,(struct sockaddr*)&svr1,len);
166  if (i==0)
167  fcntl(kernel_sock,F_SETFL, O_NONBLOCK|fcntl(kernel_sock,F_GETFL,0));
168  on=1;
169  setsockopt(kernel_sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
170  m.msg_type = MSG_NET;
171  m.param.pword[0] = NET_NODE;
172  m.param.pword[1] = MyNode;
173  send_to_kernel(&m);
174
175  // if (regme) regme_sequence();
176
177  if (to_connect > 0){
178  write_at_console("Connecting remote VLPs...");  
179  while (!all_connected) check_links();
180                     }
181  sprintf(s,"Local node number %d",MyNode);
182  write_at_console(s);
183 }
184
185 // #####################  Load configuration ##########################
186
187 void NETMOD::load_config(char *fname)
188 {
189   int on,k=0;
190   NETlink *pomlink;
191   
192   
193   /* Hack for checking if file exists without using external libs.*/
194   FILE * file = fopen(fname, "r");
195   if (!file) {
196     fprintf(stderr, "Error: Cannot load configuration file %s!\n", fname);
197     write_at_console("Cannot load configuration file!");
198     exit(3);
199   }
200   /* File exists, so file has been locked. Release it. */
201   fclose(file);
202   
203   AppConfiguration config(fname);
204   
205   MyNode = config.getInt("node_number");
206   if (config.error() == TRUE) {
207     write_at_console("Node number must be specified");
208     config.release();
209     exit(1);
210   }
211
212   char host[255];
213   strcpy(host, config.getString("host"));
214   if(config.error() == FALSE)
215   {
216     k++;
217     pomlink = new NETlink;
218     strcpy(pomlink->addr, host);
219     pomlink->connected = FALSE;
220     pomlink->sock = socket(AF_INET, SOCK_STREAM, 0); 
221     fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,F_GETFL,0));
222     on=1; 
223     setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on)); 
224     Links.append(pomlink); 
225     to_connect++;
226   }
227   config.release();
228
229   if (k==0) all_connected=TRUE;
230 }
231
232
233
234 void NETMOD::write_at_console(char *s)
235 {
236  MESSAGE msg;
237
238  msg.msg_type = MSG_NET;
239  msg.param.pword[0] = NET_CSWRITELN;
240  strcpy(msg.param.pstr,s);
241  send_to_kernel(&msg);
242 }
243
244 void NETMOD::send_to_kernel(MESSAGE *msg)
245 {
246  write(kernel_sock,msg,sizeof(MESSAGE));
247 }
248
249 void NETMOD::send_to_node(NETlink *lnk, MESSAGE *msg)
250 {
251  msg->msg_type = MSG_NET;
252 // msg2netmsg(msg);
253 if (lnk->sock)
254  write(lnk->sock,msg,sizeof(MESSAGE));
255 }
256
257 void NETMOD::send_to_int(MESSAGE *msg)
258 {
259  INTlink *pomlink;
260  
261  pomlink = findINTlink(msg->param.pword[5]);
262  if (pomlink!=NULL) write(pomlink->sock,msg,sizeof(MESSAGE));
263 }
264
265
266
267 void NETMOD::accept_connection()
268 {
269  unsigned int sz;
270  int nsock, on;
271  struct sockaddr_in svr;
272  fd_set rset,wset;
273  struct timeval tout = {0,0};
274  NETlink *pomlink;
275
276  FD_ZERO(&rset);FD_ZERO(&wset);
277  FD_SET(listen_sock,&rset);
278  
279  if (select(listen_sock+1,&rset,&wset,0,(struct timeval *)&tout)>0)
280   if (FD_ISSET(listen_sock,&rset))
281  {
282  
283 /* accept connection on listen socket */
284  sz = sizeof(svr);
285  bzero(&svr, sizeof(svr));
286  nsock = accept(listen_sock, (struct sockaddr*)&svr, &sz);   
287
288  if (nsock>0)
289   {
290     
291    /* i<0 someone wants to connect us */
292       
293        pomlink = new NETlink;
294        strcpy(pomlink->addr,inet_ntoa(svr.sin_addr));
295        pomlink->sock = nsock;
296        pomlink->connected = TRUE;
297        fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock, 
298            F_GETFL,0));
299        on=1;
300        setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
301        Links.append(pomlink);
302   } /* nsock > 0 */
303 } /* ISSET */ 
304 }
305
306
307 void NETMOD::check_node(int n, int sc)
308 {
309  MESSAGE m;
310  NETlink *pomlink;
311  
312  m.msg_type = MSG_NET;
313  m.param.pword[0] = NET_NODE_EXIST;
314  
315  pomlink = Links.first();
316  m.param.pword[1] = 0;
317  while (pomlink!=NULL)
318  {
319   if ( pomlink->node_number==n )
320    {
321      m.param.pword[1] = 1;
322      strcpy(m.param.pstr,pomlink->addr);
323      break;
324    }
325   pomlink = Links.next();
326  }
327  write(sc,&m,sizeof(MESSAGE));
328 }
329
330
331 // ************* Internal message from kernel or INT *******************
332
333 void NETMOD::get_internal()
334 {
335  int nr,nrset;
336  MESSAGE msg;
337  int si, sj;
338  fd_set readset,writeset;
339  struct timeval tout={0,0};
340  INTlink *pomlink;
341  struct sockaddr_un svr;
342
343  
344  
345  FD_ZERO(&readset);FD_ZERO(&writeset);
346  FD_SET(kernel_sock,&readset);
347  nrset=kernel_sock;
348
349  pomlink = Interpreters.first();
350  while (pomlink!=NULL)
351  {
352   FD_SET(pomlink->sock,&readset);
353   if (nrset<pomlink->sock) nrset=pomlink->sock;
354   pomlink=Interpreters.next();
355  }
356
357  if (select(nrset+1,&readset,&writeset,0,(struct timeval *)&tout)>0)
358  { 
359  
360 /* Check request sockets */
361  pomlink = Interpreters.first();
362  while (pomlink!=NULL)
363  {
364    if (FD_ISSET(pomlink->sock,&readset))
365    {
366      nr = read(pomlink->sock,&msg,sizeof(MESSAGE));
367      if (nr>0)
368       {
369           if (msg.msg_type == MSG_NET)
370              switch(msg.param.pword[0])
371                {
372                  case NET_PROPAGATE:propagate_msg(&msg);break;
373                  case NET_NODE_EXIST:check_node(msg.param.pword[1],pomlink->sock);break;
374                  case NET_GET_INFO: conn_info(pomlink->sock);break;
375                  case NET_NODES_NUM: msg.param.pword[0]=NET_NODES_NUM_RESPONSE;
376                                      msg.param.pword[1]=Links.count();
377                                      write(pomlink->sock,&msg,sizeof(MESSAGE));
378                                      break; 
379                }/* switch */
380       }
381    } /* ISSET */
382    pomlink=Interpreters.next();
383  } // while
384  
385 /* Check internal socket */
386  if (FD_ISSET(kernel_sock,&readset))
387  {
388   nr = read(kernel_sock, &msg, sizeof(MESSAGE));
389   if (nr>0)
390   {
391    if (msg.msg_type == MSG_NET)
392    {
393      switch(msg.param.pword[0])
394      { 
395        case NET_TRANSMIT_CODE:
396                  transmit_file(msg.param.pword[2],msg.param.pstr,msg.param.pword[1]);  
397                  break;
398        case NET_EXIT: { disconnect_seq();exit_sequence();}
399                    break;
400        case NET_GET_INFO: conn_info(kernel_sock);break;                          
401        case NET_PROPAGATE:propagate_msg(&msg);break;
402        case NET_DISCONNECT:disconnect_seq();
403                    break;            
404        case NET_NODE_EXIST: check_node(msg.param.pword[1],kernel_sock);break;
405        case NET_CONNECT_TO: connect_seq(msg.param.pstr);
406
407         } /* end switch */
408     } /* MSg_NET */
409
410    if (msg.msg_type == MSG_VLP)
411     switch(msg.param.pword[0])
412     {
413      case VLP_REGINT:
414         {
415            pomlink = new INTlink;
416            pomlink->sock = socket(AF_UNIX,SOCK_STREAM,0);
417            bzero(&svr,sizeof(svr));
418            svr.sun_family = AF_UNIX;
419            strcpy(svr.sun_path,msg.param.pstr);
420            si = strlen(svr.sun_path)+sizeof(svr.sun_family);
421            sj = connect(pomlink->sock,(struct sockaddr*)&svr,si);
422            if (sj==0)
423             fcntl(pomlink->sock,F_SETFL, O_NONBLOCK|
424                    fcntl(pomlink->sock,F_GETFL,0));
425            int on=1;
426            setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
427            pomlink->ID = msg.param.pword[1];
428            pomlink->connected=TRUE;
429            Interpreters.append(pomlink);
430
431          };break;
432      case VLP_INTERPRETER_DOWN:
433         {
434           pomlink = findINTlink(msg.param.pword[1]);
435           if (pomlink!=NULL)
436           {
437            close(pomlink->sock);
438            Interpreters.remove(pomlink);
439           } 
440          };break;
441             
442      break;
443     } /* MSg_VLP */
444    }
445   }/* ISSET */
446  } /* select >0 */  
447
448
449 }
450
451 void NETMOD::get_message(NETlink *lnk)
452 {
453  int nr;
454  MESSAGE msg;
455  char pomstr[80];
456  int rdbt,rd,sz,j,psock;
457  struct sockaddr_in svr;
458  unsigned char buffer[FILE_BUFFER_SIZE];
459  protdescr proto;
460
461 if (lnk->connected)
462 {
463   nr = read(lnk->sock,&msg, sizeof(MESSAGE));
464   if (nr>0)
465    {
466 //     netmsg2msg(&msg);
467      if (msg.msg_type == MSG_NET)
468       {
469         switch(msg.param.pword[0])
470         {
471          case NET_CCD_START:
472                      
473                      lnk->code_transmit = TRUE;
474                      sprintf(pomstr,"%s/%s",REMOTE_PATH,msg.param.pstr);
475                      strcat(pomstr,".ccd");
476                      lnk->CodeFile = fopen(pomstr,"wb");
477                      if ( lnk->CodeFile == NULL) { write_at_console("Cannot open file\n");
478                      lnk->code_transmit=FALSE;}
479                      lnk->CodeSize=msg.param.pword[1];  
480                      psock = socket(AF_INET, SOCK_STREAM, 0); 
481                      bzero(&svr, sizeof(svr));
482                      svr.sin_family = AF_INET;
483                      svr.sin_port = htons(CODEPORT);
484                      svr.sin_addr.s_addr = inet_addr(lnk->addr);
485                      j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
486                      if ( j==0) 
487                      {
488                       //fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));
489                        sz=0;
490                        while (sz<lnk->CodeSize)
491                        {
492                          rd = read(psock,&buffer,sizeof(buffer));
493                          rdbt = fwrite(&buffer,sizeof(unsigned char),rd,lnk->CodeFile);
494                          sz=sz+rd; 
495                         }
496                        close(psock);
497                        fclose(lnk->CodeFile);
498                       }
499                      
500                      break;
501
502          case NET_PCD_START:
503                      
504                      lnk->code_transmit = TRUE;
505                      sprintf(pomstr,"%s/%s",REMOTE_PATH,msg.param.pstr);
506                      strcat(pomstr,".pcd");
507                      lnk->CodeFile = fopen(pomstr,"wb");
508                      if ( lnk->CodeFile == NULL) { write_at_console("Cannot open file\n");
509                      lnk->code_transmit=FALSE;}
510                      lnk->CodeSize=msg.param.pword[1];  
511                      psock = socket(AF_INET, SOCK_STREAM, 0); 
512                      bzero(&svr, sizeof(svr));
513                      svr.sin_family = AF_INET;
514                      svr.sin_port = htons(CODEPORT1);
515                      svr.sin_addr.s_addr = inet_addr(lnk->addr);
516                      j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
517                      if ( j==0) 
518                      {
519                       //fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));
520                        sz=0;
521                        while (sz<lnk->CodeSize)
522                        {
523                          rd = read(psock,&proto,sizeof(proto));
524                          rdbt = fwrite(&proto,sizeof(unsigned char),rd,lnk->CodeFile);
525                          sz=sz+rd; 
526                         }
527                        close(psock);
528                        fclose(lnk->CodeFile);
529                       }
530                      
531                      break;
532
533          case NET_CONNECT:
534                      
535                      sprintf(pomstr,"Node: %d Addr: %s",msg.param.pword[1],
536                      lnk->addr);
537                      lnk->node_number = msg.param.pword[1];
538                      write_at_console(pomstr);
539                      send_accept_info(lnk);
540                      break;
541          case NET_ACCEPT:
542                     
543                     sprintf(pomstr,"Node: %d Addr: %s",msg.param.pword[1],
544                     lnk->addr);
545                     lnk->node_number = msg.param.pword[1];
546                     write_at_console(pomstr);
547                     break;
548          case NET_DISCONNECT:
549                     
550                     sprintf(pomstr,"Node: %d disconnected",msg.param.pword[1]);
551                     write_at_console(pomstr);
552                     ::close(lnk->sock);
553                     Links.remove(lnk);
554                     delete(lnk);
555                     break; 
556          case NET_PROPAGATE:
557                
558                if (msg.param.pword[1] == MSG_VLP) send_to_kernel(&msg);
559                else if (msg.param.pword[1] == MSG_INT) send_to_int(&msg);
560                break;            
561
562                 } /* end switch */
563         
564        }   
565      } /* nr > 0 */
566
567 } /* end if used */
568 }
569
570
571
572 void NETMOD::remote_messages()
573 {
574  int max;
575  fd_set rset,wset;
576  struct timeval tout={0,0};
577  NETlink *pomlink;
578  
579  FD_ZERO(&rset);FD_ZERO(&wset);
580  max=0;
581  
582  pomlink = Links.first();
583  while (pomlink!=NULL)
584  {
585   if (pomlink->connected)
586   {
587     FD_SET(pomlink->sock,&rset);
588     if  (max<pomlink->sock) max=pomlink->sock;
589   }
590   pomlink=Links.next();
591  }
592  
593  if (select(max+1,&rset,&wset,0,(struct timeval *)&tout)>0)
594  {
595   pomlink=Links.first();
596   while (pomlink!=NULL)
597   {
598    if (FD_ISSET(pomlink->sock,&rset)) get_message(pomlink);
599    pomlink=Links.next();
600   }
601  }  
602
603 }
604
605
606 /******************************    2010 ********************************************/
607 void NETMOD::doitall() {
608  unsigned int sz;
609  int nsock,max=0,on, nr, si, sj;
610  struct sockaddr_in svr;
611  fd_set rset;
612  NETlink *pomlink;
613  INTlink *pomlink2;
614  struct sockaddr_un svr2;
615  MESSAGE msg;
616
617
618  FD_ZERO(&rset);
619  FD_SET(listen_sock,&rset);
620  max = listen_sock;
621  FD_SET(kernel_sock,&rset);
622  if (max<kernel_sock) { max=kernel_sock;}
623  pomlink2 = Interpreters.first();
624  while (pomlink2!=NULL)
625  {
626   FD_SET(pomlink2->sock,&rset);
627   if (max<pomlink2->sock) max=pomlink2->sock;
628   pomlink2=Interpreters.next();
629  }
630  pomlink = Links.first();
631  while (pomlink!=NULL)
632  {
633   if (pomlink->connected)
634   {
635     FD_SET(pomlink->sock,&rset);
636     if  (max<pomlink->sock) max=pomlink->sock;
637   }
638   pomlink=Links.next();
639  }
640   
641  // odczyt
642  if (select(max+1,&rset,0,0,NULL) > 0) {
643     // accept connection
644        if (FD_ISSET(listen_sock,&rset))
645         {
646           /* accept connection on listen socket */
647           sz = sizeof(svr);
648           bzero(&svr, sizeof(svr));
649           nsock = accept(listen_sock, (struct sockaddr*)&svr, &sz);   
650           if (nsock>0)
651           {   
652             pomlink = new NETlink;
653             strcpy(pomlink->addr,inet_ntoa(svr.sin_addr));
654             pomlink->sock = nsock;
655             pomlink->connected = TRUE;
656             fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock,F_GETFL,0));
657             on=1;
658             setsockopt(pomlink->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
659             Links.append(pomlink);
660            } /* nsock > 0 */
661         } /* ISSET */ 
662
663     // get internal message
664        /* Check request sockets */
665        pomlink2 = Interpreters.first();
666        while (pomlink2!=NULL)
667        {
668          if (FD_ISSET(pomlink2->sock,&rset))
669          {
670            nr = read(pomlink2->sock,&msg,sizeof(MESSAGE));
671            if (nr>0)
672            {
673              if (msg.msg_type == MSG_NET)
674                switch(msg.param.pword[0])
675                {
676                  case NET_PROPAGATE:propagate_msg(&msg); break;
677                  case NET_NODE_EXIST:check_node(msg.param.pword[1],pomlink2->sock);break;
678                  case NET_GET_INFO: conn_info(pomlink2->sock);break;
679                  case NET_NODES_NUM: msg.param.pword[0]=NET_NODES_NUM_RESPONSE;
680                                      msg.param.pword[1]=Links.count();
681                                      write(pomlink2->sock,&msg,sizeof(MESSAGE));
682                                      break; 
683                }/* switch */
684            }
685          } /* ISSET */
686          pomlink2=Interpreters.next();
687        } // while
688        /* Check internal socket */
689        if (FD_ISSET(kernel_sock,&rset))
690        {
691           nr = read(kernel_sock, &msg, sizeof(MESSAGE));
692           if (nr>0)
693           {
694             if (msg.msg_type == MSG_NET)
695             {
696               switch(msg.param.pword[0])
697               { 
698                  case NET_TRANSMIT_CODE:
699                  
700                  transmit_file(msg.param.pword[2],msg.param.pstr,msg.param.pword[1]);  
701                  
702                  break;
703                  case NET_EXIT: {  disconnect_seq();exit_sequence();}
704                  break;
705                  case NET_GET_INFO: conn_info(kernel_sock);break;                          
706                  case NET_PROPAGATE:propagate_msg(&msg);break;
707                  case NET_DISCONNECT:disconnect_seq();
708                    break;            
709                  case NET_NODE_EXIST: check_node(msg.param.pword[1],kernel_sock);break;
710                  case NET_CONNECT_TO: connect_seq(msg.param.pstr);
711               } /* end switch */
712             } /* MSG_NET */
713             if (msg.msg_type == MSG_VLP)
714               switch(msg.param.pword[0])
715               {
716                  case VLP_REGINT:
717                  {
718                     
719                     pomlink2 = new INTlink;
720                     pomlink2->sock = socket(AF_UNIX,SOCK_STREAM,0);
721                     bzero(&svr2,sizeof(svr2));
722                     svr2.sun_family = AF_UNIX;
723                     strcpy(svr2.sun_path,msg.param.pstr);
724                     si = strlen(svr2.sun_path)+sizeof(svr2.sun_family);
725                     sj = connect(pomlink2->sock,(struct sockaddr*)&svr2,si);
726                     if (sj==0)
727                       fcntl(pomlink2->sock,F_SETFL, O_NONBLOCK|fcntl(pomlink2->sock,F_GETFL,0));
728                     int on=1;
729                     setsockopt(pomlink2->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
730                     pomlink2->ID = msg.param.pword[1];
731                     pomlink2->connected=TRUE;
732                     Interpreters.append(pomlink2);
733                     
734                  };break;
735                  case VLP_INTERPRETER_DOWN:
736                  { 
737                    
738                    pomlink2 = findINTlink(msg.param.pword[1]);
739                    if (pomlink2!=NULL)
740                    {
741                      close(pomlink2->sock);
742                      Interpreters.remove(pomlink2);
743                    } 
744                  };break;                       
745               } /* MSg_VLP */
746            } // nr > 0
747        }/* ISSET */
748
749     // get remote message
750
751     pomlink=Links.first();
752     while (pomlink!=NULL)
753     {
754       if (FD_ISSET(pomlink->sock,&rset)) get_message(pomlink);
755       pomlink=Links.next();
756     }
757  } // select 
758
759
760 }
761 /******************************    END 2010 ********************************************/
762
763
764
765 void NETMOD::propagate_msg(MESSAGE *msg)
766 {
767  char ss[255];
768  NETlink *pomlink;
769  
770  pomlink = findNETlink(msg->param.pword[4]);
771  if ((pomlink!=NULL)&&(pomlink->connected))
772   send_to_node(pomlink,msg);
773  else { 
774        if (msg->param.pword[1]==MSG_INT)
775        {
776         send_to_int(msg);
777         }
778        else
779        {
780        sprintf(ss,"Not connected to Node %d",msg->param.pword[4]);  
781        write_at_console(ss);
782        }
783       }      
784
785 }
786
787
788 void NETMOD::connect_seq(char *a)
789 {
790  NETlink *pom;
791  struct sockaddr_in svr;
792  int j,on;
793
794
795  pom=Links.first();
796  while (pom!=NULL)
797  {
798   if (strcmp(pom->addr,a)==0) return;
799   pom=Links.next();
800   }
801      pom = new NETlink;
802      strcpy(pom->addr,a);
803      pom->connected = FALSE;
804      pom->sock = socket(AF_INET, SOCK_STREAM, 0); 
805      bzero(&svr, sizeof(svr));
806      svr.sin_family = AF_INET;
807      svr.sin_port = htons(LOGPORT);
808      svr.sin_addr.s_addr = inet_addr(pom->addr);
809      j = connect(pom->sock, (struct sockaddr*)&svr, sizeof(svr));
810      if ( j==0) 
811           { pom->connected = TRUE;
812            fcntl(pom->sock, F_SETFL,O_NONBLOCK | fcntl(pom->sock, 
813            F_GETFL,0));
814            on=1;
815            setsockopt(pom->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
816            send_connect_info(pom);
817            Links.append(pom); 
818           }
819       else write_at_console("Connection failed");
820 }
821
822 void NETMOD::check_links()
823 {
824
825 int j=1;
826 struct sockaddr_in svr;
827 NETlink *pomlink;
828
829 /* connect to all other nodes */
830 if (!all_connected)
831 {
832  pomlink=Links.first();
833  while (pomlink!=NULL)
834  {
835  if ( !(pomlink->connected) )
836  {
837    bzero(&svr, sizeof(svr));
838    svr.sin_family = AF_INET;
839    svr.sin_port = htons(LOGPORT);
840    svr.sin_addr.s_addr = inet_addr(pomlink->addr);
841   
842    j = connect(pomlink->sock, (struct sockaddr*)&svr, sizeof(svr));
843    if ( j==0) 
844           { pomlink->connected = TRUE;
845            fcntl(pomlink->sock, F_SETFL,O_NONBLOCK | fcntl(pomlink->sock, 
846            F_GETFL,0));
847            send_connect_info(pomlink);
848
849            }
850    if (errno == ECONNREFUSED) 
851            sock_reopen(pomlink);
852  } // not connected
853   pomlink=Links.next();
854  } //while
855
856 }//if
857 all_connected=TRUE;
858 pomlink=Links.first();
859 while(pomlink!=NULL)
860 {
861  if (pomlink->connected==FALSE) {all_connected=FALSE;break;}
862  pomlink=Links.next();
863 }
864
865 }
866
867 void NETMOD::sock_reopen(NETlink *lnk)
868 {
869  int on=1;
870
871  close(lnk->sock);
872  lnk->sock = socket(AF_INET, SOCK_STREAM, 0); 
873  fcntl(lnk->sock, F_SETFL,O_NONBLOCK | fcntl(lnk->sock, 
874            F_GETFL,0));
875  setsockopt(lnk->sock,IPPROTO_TCP,TCP_NODELAY,(char*)&on,sizeof(on));
876 }
877
878
879 // **************** Acknowledges *************************
880
881 void NETMOD::send_connect_info(NETlink *lnk)
882 {
883  MESSAGE m;
884
885  m.param.pword[0] = NET_CONNECT;
886  m.param.pword[1] = MyNode;
887  m.msg_type = MSG_NET;
888 // msg2netmsg(&m);
889  if (lnk->sock)
890  write(lnk->sock,&m,sizeof(MESSAGE));
891 }
892
893 void NETMOD::send_accept_info(NETlink *lnk)
894 {
895  MESSAGE m;
896
897  m.param.pword[0] = NET_ACCEPT;
898  m.param.pword[1] = MyNode;
899  m.msg_type = MSG_NET;
900 // msg2netmsg(&m);
901  if (lnk->sock)
902  write(lnk->sock,&m,sizeof(MESSAGE));
903 }
904
905 void NETMOD::send_code_ack(NETlink *lnk)
906 {
907 MESSAGE m;
908
909  m.param.pword[0] = NET_CODESTREAM_OK;
910  m.msg_type = MSG_NET;
911 // msg2netmsg(&m);
912  if (lnk->sock)
913  write(lnk->sock,&m,sizeof(MESSAGE));
914 }
915
916
917 void NETMOD::send_to_all(MESSAGE *msg)
918 {
919  NETlink *pomlink;
920  pomlink=Links.first();
921  while (pomlink!=NULL)
922  {
923   write(pomlink->sock,msg,sizeof(MESSAGE));
924   pomlink=Links.next();
925  }
926 }
927
928
929
930
931 void NETMOD::run()
932 {
933   while(1)
934  {
935   /*accept_connection();
936   get_internal();
937   remote_messages(); */
938   // 2010
939   doitall();
940  }
941 }
942
943 void NETMOD::exit_sequence()
944 {
945  NETlink *pomlink;
946
947  ::close(kernel_sock);
948  ::close(listen_sock);
949  unlink(kername);
950  pomlink = Links.first();
951  while (pomlink!=NULL)
952  {
953   ::close(pomlink->sock);
954   pomlink=Links.next();
955   }
956  exit(0);
957 }
958
959 void NETMOD::disconnect_seq()
960 {
961  MESSAGE m;
962  NETlink *p;
963
964  bzero(&m,sizeof(MESSAGE));
965  m.msg_type = MSG_NET;
966  m.param.pword[0] = NET_DISCONNECT;
967  m.param.pword[1] = MyNode;
968
969  p=Links.first();
970  while(p!=NULL)
971  {
972   send_to_node(p,&m);
973   p=Links.next();
974  }
975  p=Links.first();
976  while(p!=NULL)
977  {
978   ::close(p->sock);
979   p=Links.next();
980  }
981  Links.clear();
982 }
983
984 NETlink *NETMOD::findNETlink(int node)
985 {
986  NETlink *pomlink;
987  pomlink=Links.first();
988  while(pomlink!=NULL)
989  {
990   if (pomlink->node_number == node) return(pomlink);
991   pomlink=Links.next();
992  } 
993  return(pomlink);
994 }
995
996 INTlink *NETMOD::findINTlink(int id)
997 {
998  INTlink *pomlink;
999  pomlink=Interpreters.first();
1000  while(pomlink!=NULL)
1001  {
1002   if (pomlink->ID == id) return(pomlink);
1003   pomlink=Interpreters.next();
1004  } 
1005  return(pomlink);
1006 }
1007
1008
1009 /* ----------------    Sending code to a remote node -------------- */
1010
1011 void NETMOD::transmit_file(int ton, char *fname, int fromINT)
1012 {
1013  FILE *f;
1014  MESSAGE msg;
1015  char fn[80];
1016  char b[255];
1017  unsigned char buffer[FILE_BUFFER_SIZE];
1018  protdescr proto;
1019  int i,tsock,sock;
1020  unsigned int sz;
1021  NETlink *outlink;
1022  struct sockaddr_in svr;
1023  fd_set rset,wset;
1024
1025
1026 // **************** CCD FILE
1027
1028  strcpy(fn,fname);
1029  strcat(fn,".ccd");
1030  f = fopen(fn,"rb");
1031  if (f!=NULL)
1032  {
1033  fseek(f,0,SEEK_END);
1034  msg.param.pword[1] = ftell(f);
1035  fclose(f);
1036  f = fopen(fn,"rb");
1037  
1038  strcpy(b,rindex(fname,'/'));
1039  for(i=0;i<strlen(b);i++)
1040    b[i] = b[i+1];
1041  msg.param.pword[0] = NET_CCD_START;
1042  strcpy(msg.param.pstr,b);
1043
1044  outlink = findNETlink(ton);
1045  if (outlink==NULL) exit(1);
1046  bzero(&svr,sizeof(svr));
1047  sock = socket(AF_INET,SOCK_STREAM,0);
1048  svr.sin_family = AF_INET;
1049  svr.sin_addr.s_addr = INADDR_ANY;
1050  svr.sin_port = htons(CODEPORT);
1051  bind(sock, (struct sockaddr*)&svr, sizeof(svr));
1052  listen(sock,5);   
1053  send_to_node(outlink, &msg);
1054  sz=sizeof(svr);
1055  FD_ZERO(&rset);FD_ZERO(&wset);
1056  FD_SET(sock,&rset);
1057  if (select(sock+1,&rset,&wset,0,0))
1058   if (FD_ISSET(sock,&rset))
1059     tsock = accept(sock, (struct sockaddr*)&svr,&sz );
1060  if (tsock>0)
1061  {
1062   close(sock);
1063   while (!feof(f))
1064    {
1065     i = fread(&buffer,1,sizeof(buffer),f);
1066     write(tsock,&buffer,i);
1067     FD_ZERO(&rset);FD_ZERO(&wset);
1068     FD_SET(tsock,&wset);
1069     select(tsock+1,&rset,&wset,0,0);
1070     }
1071   close(tsock);
1072  }
1073  fclose(f);
1074  } // f!=NULL
1075   else
1076      {
1077     sprintf(b,"Cannot open file to send %s\n",fname);
1078     write_at_console(b);
1079    }
1080
1081
1082 // *************** PCD FILE
1083  
1084  strcpy(fn,fname);
1085  strcat(fn,".pcd");
1086  f = fopen(fn,"r");
1087   if (f!=NULL)
1088  {
1089  fseek(f,0,SEEK_END);
1090  msg.param.pword[1] = ftell(f);
1091  fclose(f);
1092  f = fopen(fn,"rb");
1093  
1094  strcpy(b,rindex(fname,'/'));
1095  for(i=0;i<strlen(b);i++)
1096    b[i] = b[i+1];
1097  msg.param.pword[0] = NET_PCD_START;
1098  strcpy(msg.param.pstr,b);
1099
1100  outlink = findNETlink(ton);
1101  if (outlink==NULL) exit(1);
1102  bzero(&svr,sizeof(svr));
1103  sock = socket(AF_INET,SOCK_STREAM,0);
1104  svr.sin_family = AF_INET;
1105  svr.sin_addr.s_addr = INADDR_ANY;
1106  svr.sin_port = htons(CODEPORT1);
1107  bind(sock, (struct sockaddr*)&svr, sizeof(svr));
1108  listen(sock,5);   
1109  send_to_node(outlink, &msg);
1110  sz=sizeof(svr);
1111  FD_ZERO(&rset);FD_ZERO(&wset);
1112  FD_SET(sock,&rset);
1113  if (select(sock+1,&rset,&wset,0,0))
1114   if (FD_ISSET(sock,&rset))
1115     tsock = accept(sock, (struct sockaddr*)&svr,&sz );
1116  if (tsock>0)
1117  {
1118   close(sock);
1119   while (!feof(f))
1120    {
1121     i = fread(&proto,1,sizeof(proto),f);
1122     write(tsock,&proto,i);
1123     FD_ZERO(&rset);FD_ZERO(&wset);
1124     FD_SET(tsock,&wset);
1125     select(tsock+1,&rset,&wset,0,0);
1126     }
1127   close(tsock);
1128  }
1129  fclose(f);
1130  } // f!=NULL
1131   else
1132    {
1133     sprintf(b,"Cannot open file to send %s\n",fname);
1134     write_at_console(b);
1135    }
1136  
1137
1138  msg.msg_type = MSG_NET; 
1139  msg.param.pword[0] = NET_TRANSMITTED;
1140  msg.param.pword[1] = fromINT;
1141  send_to_kernel(&msg);
1142  
1143
1144 }
1145
1146
1147
1148 void NETMOD::conn_info(int sk)
1149 {
1150  NETlink *pom;
1151  MESSAGE m;
1152  int k=0;
1153  char poms[255];
1154
1155  m.msg_type = MSG_NET;
1156  m.param.pword[0] = NET_INFO;
1157  strcpy(m.param.pstr,"");
1158  pom=Links.first();
1159  while (pom!=NULL)
1160  {
1161   sprintf(poms,"%d=%s;",pom->node_number,pom->addr);
1162   strcat(m.param.pstr,poms);
1163   k++;
1164   if (k==12)
1165   {
1166     m.param.pword[1]=12;
1167     write(sk,&m,sizeof(MESSAGE));
1168     k=0;
1169    }
1170   pom=Links.next();
1171  }
1172  if (k>0)
1173   {
1174    m.param.pword[1]=k;
1175    write(sk,&m,sizeof(MESSAGE));
1176    }
1177   m.msg_type = MSG_NET;
1178   m.param.pword[0] = NET_INFO_END;
1179   write(sk,&m,sizeof(MESSAGE));
1180 }
1181
1182 int main(int argc,char **argv)
1183 {
1184  NETMOD netter(argv[1]);
1185  netter.run();
1186  return 0;
1187 }