vlp-10 Using coding style in loggraph.
[vlp.git] / src / net / lognet.cpp
1 #include "genint1.h"
2 #include "comm.h"
3
4 #include <sys/socket.h>
5 #include <sys/un.h>
6 #include <netinet/in.h>
7 #include <errno.h>
8 #include <netdb.h>
9 #include <arpa/inet.h>
10
11 #include <sys/types.h>
12 #include <sys/time.h>
13 #include <stdlib.h>
14 #include <stdio.h>
15 #include <fcntl.h>
16 #include <signal.h>
17 #include <sys/stat.h>
18 #include <string.h>
19 #include <qlist.h>
20 #include <qfile.h>
21 #include <qstring.h>
22 #include <qstringlist.h>
23 #include <unistd.h>
24
25 #include <libconfig.h>
26
27 #define REMOTE_PATH "REMOTE"
28 #define MAXLINKS 30
29 #define LOGPORT 3600
30 #define CODEPORT 3700
31 #define CODEPORT1 3800
32 #define MAXINTERP 10
33 #define FILE_BUFFER_SIZE 2048
34
35
36
37 /*************** Interpreter slot ********************/
38 class INTlink {
39 public:
40         int sock;
41         int ID;
42         bool connected;
43         INTlink();
44 };
45
46 INTlink::INTlink()
47 {
48         connected = FALSE;
49         sock = 0;
50 }
51
52 /*********************  Network slot ********************/
53 class NETlink {
54 public:
55         int sock;
56         bool connected;
57         bool code_transmit;
58         char addr[255];
59
60         int node_number;
61         int aliases[5];
62
63         FILE *CodeFile;
64         char CodeName[255];
65         long CodeSize;
66
67         NETlink();
68 };
69
70 NETlink::NETlink()
71 {
72         int i;
73         for(i = 0; i < 5; i++) 
74                 aliases[i] = -1;
75
76         connected = FALSE;
77         sock = 0;
78         code_transmit = FALSE;
79 }
80
81 /*********************** NET Module *****************************/
82
83 class NETMOD {
84 public:
85         int kernel_sock;
86         int listen_sock;
87         bool all_connected;
88         bool local_mode;
89         int to_connect;
90         int MyNode;
91         char kername[256];
92
93         /* List of the Interpeter slots */
94         QList<INTlink> Interpreters;
95         /* List of the Network slots */ 
96         QList<NETlink> Links;
97
98         NETMOD(char*);
99
100         void load_config(char*);    
101         void write_at_console(char*);
102         void send_to_kernel(MESSAGE*);
103         void sock_reopen(NETlink*);
104         void send_connect_info(NETlink*);
105         void send_accept_info(NETlink*);
106         void send_to_node(NETlink*, MESSAGE*);
107         void send_to_int(MESSAGE*);
108         void send_code_ack(NETlink*);
109         void send_to_all(MESSAGE *);
110
111         NETlink *findNETlink(int node);
112         INTlink *findINTlink(int id);
113         void transmit_file(int ton, char *fname, int fromINT);
114         void propagate_msg(MESSAGE *msg);
115         void check_node(int, int);
116
117         void run();
118         void exit_sequence();
119         void disconnect_seq();
120         void connect_seq(char*);
121         void accept_connection();
122         void get_internal();
123         void remote_messages(); 
124         void check_links();
125         void get_message(NETlink*);
126         void conn_info(int);
127
128         /* 2010 */
129         void doitall();
130 };
131
132 NETMOD::NETMOD(char *kernel_name)
133 {
134         int i;
135         int len;
136         int on;
137         struct sockaddr_in svr;
138         struct sockaddr_un svr1;
139         MESSAGE m;
140         char s[256];
141
142         Links.clear();
143         Interpreters.clear();
144
145         bzero(&svr, sizeof(svr)); 
146         listen_sock = socket(AF_INET, SOCK_STREAM, 0);
147         svr.sin_family = AF_INET;
148         svr.sin_addr.s_addr = INADDR_ANY;
149         svr.sin_port = htons(LOGPORT);
150         bind(listen_sock, (struct sockaddr*)&svr, sizeof(svr));
151         listen(listen_sock,5);
152         fcntl(listen_sock, F_SETFL, O_NONBLOCK | fcntl(listen_sock, F_GETFL, 
153                                                                         0));
154
155         to_connect = 0;
156         all_connected = FALSE;
157         load_config("vlp.cfg");
158
159         kernel_sock = socket(AF_UNIX, SOCK_STREAM, 0);
160         bzero(&svr1, sizeof(svr1));
161         svr1.sun_family = AF_UNIX;
162         strcpy(svr1.sun_path, kernel_name);
163         strcpy(kername, kernel_name);
164         len = strlen(svr1.sun_path) + sizeof(svr1.sun_family);
165         i = connect(kernel_sock, (struct sockaddr*)&svr1, len);
166         if (i == 0)
167         fcntl(kernel_sock, F_SETFL, O_NONBLOCK|fcntl(kernel_sock, F_GETFL, 0));
168         on = 1;
169         setsockopt(kernel_sock, IPPROTO_TCP, TCP_NODELAY, (char*)&on,
170                                                                 sizeof(on));
171         m.msg_type = MSG_NET;
172         m.param.pword[0] = NET_NODE;
173         m.param.pword[1] = MyNode;
174         send_to_kernel(&m);
175
176         /* if (regme) regme_sequence();*/
177
178         if (to_connect > 0) {
179                 write_at_console("Connecting remote VLPs...");  
180                 while (!all_connected) {
181                         check_links();
182                 }
183         }
184         sprintf(s, "Local node number %d", MyNode);
185         write_at_console(s);
186 }
187
188 /*#####################  Load configuration ##########################*/
189
190 void NETMOD::load_config(char *fname)
191 {
192         config_t cfg;
193         config_setting_t *setting;
194         int on;
195         int k = 0;
196         NETlink *pomlink;
197
198         config_init(&cfg);
199
200         /* Hack for checking if file exists without using external libs.*/
201         FILE * file = fopen(fname, "rt");
202         if (!file) {
203                 fprintf(stderr, "Error: Cannot load configuration file %s!\n",
204                                                                         fname);
205                 write_at_console("Cannot load configuration file!");
206                 fclose(file);
207                 exit(3);
208         }
209
210         /* Read the file. If there is an error, report it and exit. */
211         if (!config_read(&cfg, file))  {
212                 fprintf(stderr, "%s: In file %s, line %d\n",
213                                                 config_error_text(&cfg),
214                                                 config_error_file(&cfg),
215                                                 config_error_line(&cfg));
216                 config_destroy(&cfg);
217                 fclose(file);
218                 /* from original code. */
219                 exit(3);
220         }
221
222         setting = config_lookup(&cfg, "node_number");
223         if (setting) {
224                 MyNode = config_setting_get_int(setting);
225         }
226         /* else */
227         if (!setting || MyNode == -1) {
228                 fprintf(stderr, "%s! In file %s, '%s' was not found.\n",
229                                                 "Error", fname, "node_number");
230                 write_at_console("Node number must be specified");
231                 config_destroy(&cfg);
232                 fclose(file);
233                 exit(1);
234         }
235
236         setting = config_lookup(&cfg, "host");  
237         if (setting) {
238                 k++;
239                 pomlink = new NETlink;
240
241                 switch(config_setting_type(setting)) {
242                 /* TODO: Deprecated. Made for back compatibility. */
243                 case CONFIG_TYPE_STRING:
244                         strncpy(pomlink->addr,
245                                 config_setting_get_string(setting), 255);
246                         break;
247                 case CONFIG_TYPE_ARRAY:
248                         strncpy(pomlink->addr,
249                                 config_setting_get_string_elem(setting, 0),
250                                                                         255);
251                         break;
252                 default:
253                         fprintf(stderr, "%s! In file %s, bad entry type for %s."
254                                                 " Will not be read.\n",
255                                                 "Fatal error", fname, "host");
256                         config_destroy(&cfg);
257                         fclose(file);
258                         exit(1);
259                 }
260                 pomlink->connected = FALSE;
261                 pomlink->sock = socket(AF_INET, SOCK_STREAM, 0); 
262                 fcntl(pomlink->sock, F_SETFL,O_NONBLOCK |
263                                         fcntl(pomlink->sock, F_GETFL, 0));
264                 on = 1; 
265                 setsockopt(pomlink->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&on,
266                                                                 sizeof(on)); 
267                 Links.append(pomlink); 
268                 to_connect++;
269         } else {
270                 fprintf(stderr, "%s! In file %s, '%s' was not found.\n",
271                                                 "Warning", fname, "host");
272         }
273
274         config_destroy(&cfg);
275         fclose(file);
276
277         if (k == 0) {
278                 all_connected = TRUE;
279         }
280 }
281
282 void NETMOD::write_at_console(char *s)
283 {
284         MESSAGE msg;
285
286         msg.msg_type = MSG_NET;
287         msg.param.pword[0] = NET_CSWRITELN;
288         strcpy(msg.param.pstr, s);
289         send_to_kernel(&msg);
290 }
291
292 void NETMOD::send_to_kernel(MESSAGE *msg)
293 {
294         write(kernel_sock, msg, sizeof(MESSAGE));
295 }
296
297 void NETMOD::send_to_node(NETlink *lnk, MESSAGE *msg)
298 {
299         msg->msg_type = MSG_NET;
300         /* msg2netmsg(msg);*/
301         if (lnk->sock) {
302                 write(lnk->sock, msg, sizeof(MESSAGE));
303         }
304 }
305
306 void NETMOD::send_to_int(MESSAGE *msg)
307 {
308         INTlink *pomlink;
309
310         pomlink = findINTlink(msg->param.pword[5]);
311         if (pomlink != NULL) {
312                 write(pomlink->sock, msg, sizeof(MESSAGE));
313         }
314 }
315
316
317
318 void NETMOD::accept_connection()
319 {
320         unsigned int sz;
321         int nsock;
322         int on;
323         struct sockaddr_in svr;
324         fd_set rset;
325         fd_set wset;
326         struct timeval tout = {0, 0};
327         NETlink *pomlink;
328
329         FD_ZERO(&rset);
330         FD_ZERO(&wset);
331         FD_SET(listen_sock, &rset);
332
333         if (select(listen_sock + 1, &rset, &wset, 0, (struct timeval *)&tout) >
334                                                                         0) {
335                 if (FD_ISSET(listen_sock,&rset)) {
336                         /* accept connection on listen socket */
337                         sz = sizeof(svr);
338                         bzero(&svr, sizeof(svr));
339                         nsock = accept(listen_sock, (struct sockaddr*)&svr,
340                                                                         &sz);
341
342                         if (nsock > 0) {
343
344                                 /* i<0 someone wants to connect us */
345                                 pomlink = new NETlink;
346                                 strcpy(pomlink->addr, inet_ntoa(svr.sin_addr));
347                                 pomlink->sock = nsock;
348                                 pomlink->connected = TRUE;
349                                 fcntl(pomlink->sock, F_SETFL, O_NONBLOCK |
350                                         fcntl(pomlink->sock, F_GETFL,0));
351                                 on = 1;
352                                 setsockopt(pomlink->sock, IPPROTO_TCP,
353                                         TCP_NODELAY, (char*)&on, sizeof(on));
354                                 Links.append(pomlink);
355                         } /* nsock > 0 */
356                 } /* ISSET */
357         }
358 }
359
360
361 void NETMOD::check_node(int n, int sc)
362 {
363         MESSAGE m;
364         NETlink *pomlink;
365
366         m.msg_type = MSG_NET;
367         m.param.pword[0] = NET_NODE_EXIST;
368
369         pomlink = Links.first();
370         m.param.pword[1] = 0;
371         while (pomlink != NULL) {
372                 if (pomlink->node_number == n) {
373                         m.param.pword[1] = 1;
374                         strcpy(m.param.pstr, pomlink->addr);
375                         break;
376                 }
377                 pomlink = Links.next();
378         }
379         write(sc, &m, sizeof(MESSAGE));
380 }
381
382
383 /************** Internal message from kernel or INT ********************/
384
385 void NETMOD::get_internal()
386 {
387         int nr;
388         int nrset;
389         MESSAGE msg;
390         int si;
391         int sj;
392         fd_set readset;
393         fd_set writeset;
394         struct timeval tout={0, 0};
395         INTlink *pomlink;
396         struct sockaddr_un svr;
397
398         FD_ZERO(&readset);
399         FD_ZERO(&writeset);
400         FD_SET(kernel_sock, &readset);
401         nrset = kernel_sock;
402
403         pomlink = Interpreters.first();
404         while (pomlink != NULL) {
405                 FD_SET(pomlink->sock, &readset);
406                 if (nrset < pomlink->sock) {
407                         nrset = pomlink->sock;
408                 }
409                 pomlink = Interpreters.next();
410         }
411
412         if (select(nrset + 1, &readset, &writeset, 0, (struct timeval *)&tout) >
413                                                                         0) {
414
415                 /* Check request sockets */
416                 pomlink = Interpreters.first();
417                 while (pomlink != NULL) {
418                         if (FD_ISSET(pomlink->sock, &readset)) {
419                                 nr = read(pomlink->sock, &msg, sizeof(MESSAGE));
420                                 if (nr > 0) {
421                                         if (msg.msg_type == MSG_NET) {
422                                                 switch(msg.param.pword[0]) {
423                                                 case NET_PROPAGATE:
424                                                         propagate_msg(&msg);
425                                                         break;
426                                                 case NET_NODE_EXIST:
427                                                         check_node(msg.param.pword[1], pomlink->sock);
428                                                         break;
429                                                 case NET_GET_INFO: 
430                                                         conn_info(pomlink->sock);
431                                                         break;
432                                                 case NET_NODES_NUM:
433                                                         msg.param.pword[0] = NET_NODES_NUM_RESPONSE;
434                                                         msg.param.pword[1] = Links.count();
435                                                         write(pomlink->sock, &msg, sizeof(MESSAGE));
436                                                         break;
437                                                 }/* switch */
438                                         }
439                                 }
440                         } /* ISSET */
441                         pomlink=Interpreters.next();
442                 } // while
443
444                 /* Check internal socket */
445                 if (FD_ISSET(kernel_sock,&readset)) {
446                         nr = read(kernel_sock, &msg, sizeof(MESSAGE));
447                         if (nr > 0) {
448                                 if (msg.msg_type == MSG_NET) {
449                                         switch(msg.param.pword[0]) { 
450                                         case NET_TRANSMIT_CODE:
451                                                 transmit_file(msg.param.pword[2],
452                                                         msg.param.pstr, 
453                                                         msg.param.pword[1]);
454                                                 break;
455                                         case NET_EXIT:
456                                                 disconnect_seq();
457                                                 exit_sequence();
458                                                 break;
459                                         case NET_GET_INFO:
460                                                 conn_info(kernel_sock);
461                                                 break;
462                                         case NET_PROPAGATE:
463                                                 propagate_msg(&msg);
464                                                 break;
465                                         case NET_DISCONNECT:
466                                                 disconnect_seq();
467                                                 break;
468                                         case NET_NODE_EXIST:
469                                                 check_node(msg.param.pword[1], kernel_sock);
470                                                 break;
471                                         case NET_CONNECT_TO:
472                                                 connect_seq(msg.param.pstr);
473                                         } /* end switch */
474                                 } /* MSg_NET */
475
476                                 if (msg.msg_type == MSG_VLP) {
477                                         switch(msg.param.pword[0]) {
478                                         case VLP_REGINT: {
479                                                 pomlink = new INTlink;
480                                                 pomlink->sock = socket(AF_UNIX, SOCK_STREAM, 0);
481                                                 bzero(&svr,sizeof(svr));
482                                                 svr.sun_family = AF_UNIX;
483                                                 strcpy(svr.sun_path, msg.param.pstr);
484                                                 si = strlen(svr.sun_path) + sizeof(svr.sun_family);
485                                                 sj = connect(pomlink->sock, (struct sockaddr*)&svr, si);
486                                                 if (sj == 0)
487                                                         fcntl(pomlink->sock, F_SETFL, O_NONBLOCK | fcntl(pomlink->sock, F_GETFL, 0));
488                                                 int on = 1;
489                                                 setsockopt(pomlink->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(on));
490                                                 pomlink->ID = msg.param.pword[1];
491                                                 pomlink->connected=TRUE;
492                                                 Interpreters.append(pomlink);
493                                                 };
494                                                 break;
495                                         case VLP_INTERPRETER_DOWN: {
496                                                 pomlink = findINTlink(msg.param.pword[1]);
497                                                 if (pomlink != NULL) {
498                                                         close(pomlink->sock);
499                                                         Interpreters.remove(pomlink);
500                                                 }
501                                                 };
502                                                 break;
503                                         break;
504                                         } /* MSg_VLP */
505                                 }
506                         }
507                 }/* ISSET */
508         } /* select >0 */
509 }
510
511 void NETMOD::get_message(NETlink *lnk)
512 {
513         int nr;
514         MESSAGE msg;
515         char pomstr[80];
516         int rdbt;
517         int rd;
518         int sz;
519         int j;
520         int psock;
521         struct sockaddr_in svr;
522         unsigned char buffer[FILE_BUFFER_SIZE];
523         protdescr proto;
524
525         if (lnk->connected) {
526                 nr = read(lnk->sock, &msg, sizeof(MESSAGE));
527                 if (nr > 0) {
528                         /* netmsg2msg(&msg); */
529                         if (msg.msg_type == MSG_NET) {
530                                 switch(msg.param.pword[0]) {
531                                 case NET_CCD_START:
532                                         lnk->code_transmit = TRUE;
533                                         sprintf(pomstr, "%s/%s", REMOTE_PATH, msg.param.pstr);
534                                         strcat(pomstr,".ccd");
535                                         lnk->CodeFile = fopen(pomstr, "wb");
536                                         if (lnk->CodeFile == NULL) {
537                                                 write_at_console("Cannot open file\n");
538                                                 lnk->code_transmit = FALSE;
539                                         }
540                                         lnk->CodeSize = msg.param.pword[1];
541                                         psock = socket(AF_INET, SOCK_STREAM, 0); 
542                                         bzero(&svr, sizeof(svr));
543                                         svr.sin_family = AF_INET;
544                                         svr.sin_port = htons(CODEPORT);
545                                         svr.sin_addr.s_addr = inet_addr(lnk->addr);
546                                         j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
547                                         if (j == 0) {
548                                                 /*fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));*/
549                                                 sz = 0;
550                                                 while (sz < lnk->CodeSize) {
551                                                         rd = read(psock, &buffer, sizeof(buffer));
552                                                         rdbt = fwrite(&buffer, sizeof(unsigned char), rd, lnk->CodeFile);
553                                                         sz = sz + rd;
554                                                 }
555                                                 close(psock);
556                                                 fclose(lnk->CodeFile);
557                                         }
558                                         break;
559                                 case NET_PCD_START:
560                                         lnk->code_transmit = TRUE;
561                                         sprintf(pomstr, "%s/%s", REMOTE_PATH, msg.param.pstr);
562                                         strcat(pomstr, ".pcd");
563                                         lnk->CodeFile = fopen(pomstr, "wb");
564                                         if (lnk->CodeFile == NULL) {
565                                                 write_at_console("Cannot open file\n");
566                                                 lnk->code_transmit = FALSE;
567                                         }
568                                         lnk->CodeSize = msg.param.pword[1];
569                                         psock = socket(AF_INET, SOCK_STREAM, 0); 
570                                         bzero(&svr, sizeof(svr));
571                                         svr.sin_family = AF_INET;
572                                         svr.sin_port = htons(CODEPORT1);
573                                         svr.sin_addr.s_addr = inet_addr(lnk->addr);
574                                         j = connect(psock, (struct sockaddr*)&svr, sizeof(svr));
575                                         if (j == 0) {
576                                                 /*fcntl(psock, F_SETFL,O_NONBLOCK | fcntl(psock, F_GETFL,0));*/
577                                                 sz = 0;
578                                                 while (sz < lnk->CodeSize) {
579                                                         rd = read(psock, &proto, sizeof(proto));
580                                                         rdbt = fwrite(&proto, sizeof(unsigned char), rd, lnk->CodeFile);
581                                                         sz = sz + rd;
582                                                 }
583                                                 close(psock);
584                                                 fclose(lnk->CodeFile);
585                                         }
586                                         break;
587                                 case NET_CONNECT:
588                                         sprintf(pomstr, "Node: %d Addr: %s",
589                                                 msg.param.pword[1], lnk->addr);
590                                         lnk->node_number = msg.param.pword[1];
591                                         write_at_console(pomstr);
592                                         send_accept_info(lnk);
593                                         break;
594                                 case NET_ACCEPT:
595                                         sprintf(pomstr, "Node: %d Addr: %s",
596                                                 msg.param.pword[1], lnk->addr);
597                                         lnk->node_number = msg.param.pword[1];
598                                         write_at_console(pomstr);
599                                         break;
600                                 case NET_DISCONNECT:
601                                         sprintf(pomstr,"Node: %d disconnected",
602                                                 msg.param.pword[1]);
603                                         write_at_console(pomstr);
604                                         ::close(lnk->sock);
605                                         Links.remove(lnk);
606                                         delete lnk;
607                                         break;
608                                 case NET_PROPAGATE:
609                                         if (msg.param.pword[1] == MSG_VLP) {
610                                                 send_to_kernel(&msg);
611                                         } else if (msg.param.pword[1] == MSG_INT) {
612                                                 send_to_int(&msg);
613                                         }
614                                         break;
615                                 } /* end switch */
616                         }   
617                 } /* nr > 0 */
618
619         } /* end if used */
620 }
621
622
623
624 void NETMOD::remote_messages()
625 {
626         int max;
627         fd_set rset, wset;
628         struct timeval tout={0, 0};
629         NETlink *pomlink;
630
631         FD_ZERO(&rset);
632         FD_ZERO(&wset);
633         max = 0;
634
635         pomlink = Links.first();
636         while (pomlink != NULL) {
637                 if (pomlink->connected) {
638                         FD_SET(pomlink->sock, &rset);
639                         if (max < pomlink->sock){
640                                 max = pomlink->sock;
641                         }
642                 }
643                 pomlink=Links.next();
644         }
645
646         if (select(max + 1, &rset, &wset, 0, (struct timeval *)&tout) > 0) {
647                 pomlink=Links.first();
648                 while (pomlink!=NULL) {
649                         if (FD_ISSET(pomlink->sock,&rset)) {
650                                 get_message(pomlink);
651                         }
652                         pomlink=Links.next();
653                 }
654         }
655 }
656
657 /******************************2010********************************************/
658 void NETMOD::doitall() {
659         unsigned int sz;
660         int nsock,max = 0, on, nr, si, sj;
661         struct sockaddr_in svr;
662         fd_set rset;
663         NETlink *pomlink;
664         INTlink *pomlink2;
665         struct sockaddr_un svr2;
666         MESSAGE msg;
667
668         FD_ZERO(&rset);
669         FD_SET(listen_sock, &rset);
670         max = listen_sock;
671         FD_SET(kernel_sock, &rset);
672         if (max < kernel_sock) {
673                 max=kernel_sock;
674         }
675         pomlink2 = Interpreters.first();
676         while (pomlink2 != NULL) {
677                 FD_SET(pomlink2->sock, &rset);
678                 if (max < pomlink2->sock) {
679                         max=pomlink2->sock;
680                 }
681                 pomlink2 = Interpreters.next();
682         }
683         pomlink = Links.first();
684         while (pomlink != NULL) {
685                 if (pomlink->connected) {
686                         FD_SET(pomlink->sock, &rset);
687                         if (max < pomlink->sock) {
688                                 max = pomlink->sock;
689                         }
690                 }
691                 pomlink=Links.next();
692         }
693
694         /* odczyt */
695         if (select(max + 1, &rset, 0, 0, NULL) > 0) {
696                 /* accept connection */
697                 if (FD_ISSET(listen_sock,&rset)) {
698                         /* accept connection on listen socket */
699                         sz = sizeof(svr);
700                         bzero(&svr, sizeof(svr));
701                         nsock = accept(listen_sock, (struct sockaddr*)&svr, &sz);
702                         if (nsock > 0) {
703                                 pomlink = new NETlink;
704                                 strcpy(pomlink->addr, inet_ntoa(svr.sin_addr));
705                                 pomlink->sock = nsock;
706                                 pomlink->connected = TRUE;
707                                 fcntl(pomlink->sock, F_SETFL,O_NONBLOCK |
708                                         fcntl(pomlink->sock, F_GETFL, 0));
709                                 on = 1;
710                                 setsockopt(pomlink->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(on));
711                                 Links.append(pomlink);
712                         } /* nsock > 0 */
713                 } /* ISSET */
714
715                 /* get internal message*/
716                 /* Check request sockets */
717                 pomlink2 = Interpreters.first();
718                 while (pomlink2 != NULL) {
719                         if (FD_ISSET(pomlink2->sock,&rset)) {
720                                 nr = read(pomlink2->sock, &msg, sizeof(MESSAGE));
721                                 if (nr > 0) {
722                                         if (msg.msg_type == MSG_NET) {
723                                                 switch(msg.param.pword[0]) {
724                                                 case NET_PROPAGATE:
725                                                         propagate_msg(&msg);
726                                                         break;
727                                                 case NET_NODE_EXIST:
728                                                         check_node(msg.param.pword[1], pomlink2->sock);
729                                                         break;
730                                                 case NET_GET_INFO:
731                                                         conn_info(pomlink2->sock);
732                                                         break;
733                                                 case NET_NODES_NUM:
734                                                         msg.param.pword[0] = NET_NODES_NUM_RESPONSE;
735                                                         msg.param.pword[1] = Links.count();
736                                                         write(pomlink2->sock, &msg, sizeof(MESSAGE));
737                                                         break;
738                                                 }/* switch */
739                                         }
740                                 }
741                         }/* ISSET */
742                         pomlink2 = Interpreters.next();
743                 }/* while */
744
745                 /* Check internal socket */
746                 if (FD_ISSET(kernel_sock, &rset)) {
747                         nr = read(kernel_sock, &msg, sizeof(MESSAGE));
748                         if (nr > 0) {
749                                 if (msg.msg_type == MSG_NET) {
750                                         switch(msg.param.pword[0]) { 
751                                         case NET_TRANSMIT_CODE:
752                                                 transmit_file(msg.param.pword[2], msg.param.pstr, msg.param.pword[1]);  
753                                                 break;
754                                         case NET_EXIT:
755                                                 disconnect_seq();
756                                                 exit_sequence();
757                                                 break;
758                                         case NET_GET_INFO:
759                                                 conn_info(kernel_sock);
760                                                 break;
761                                         case NET_PROPAGATE:
762                                                 propagate_msg(&msg);
763                                                 break;
764                                         case NET_DISCONNECT:
765                                                 disconnect_seq();
766                                                 break;
767                                         case NET_NODE_EXIST:
768                                                 check_node(msg.param.pword[1], kernel_sock);
769                                                 break;
770                                         case NET_CONNECT_TO:
771                                                 connect_seq(msg.param.pstr);
772                                         } /* end switch */
773                                 } /* MSG_NET */
774                                 if (msg.msg_type == MSG_VLP) {
775                                         switch(msg.param.pword[0]) {
776                                         case VLP_REGINT: {
777                                                 pomlink2 = new INTlink;
778                                                 pomlink2->sock = socket(AF_UNIX, SOCK_STREAM, 0);
779                                                 bzero(&svr2, sizeof(svr2));
780                                                 svr2.sun_family = AF_UNIX;
781                                                 strcpy(svr2.sun_path, msg.param.pstr);
782                                                 si = strlen(svr2.sun_path) + sizeof(svr2.sun_family);
783                                                 sj = connect(pomlink2->sock,(struct sockaddr*)&svr2, si);
784                                                 if (sj == 0)
785                                                         fcntl(pomlink2->sock,F_SETFL, O_NONBLOCK | fcntl(pomlink2->sock, F_GETFL, 0));
786                                                 int on = 1;
787                                                 setsockopt(pomlink2->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(on));
788                                                 pomlink2->ID = msg.param.pword[1];
789                                                 pomlink2->connected = TRUE;
790                                                 Interpreters.append(pomlink2);
791                                                 };
792                                                 break;
793                                         case VLP_INTERPRETER_DOWN: {
794                                                 pomlink2 = findINTlink(msg.param.pword[1]);
795                                                 if (pomlink2 != NULL) {
796                                                         close(pomlink2->sock);
797                                                         Interpreters.remove(pomlink2);
798                                                 }
799                                                 };
800                                                 break;
801                                         } /* MSg_VLP */
802                                 }
803                         } /* nr > 0 */
804                 }/* ISSET */
805
806                 /* get remote message*/
807
808                 pomlink=Links.first();
809                 while (pomlink != NULL) {
810                         if (FD_ISSET(pomlink->sock, &rset))
811                                 get_message(pomlink);
812                         pomlink=Links.next();
813                 }
814         } // select 
815 }
816 /****************************** END 2010 **************************************/
817
818
819
820 void NETMOD::propagate_msg(MESSAGE *msg)
821 {
822         char ss[255];
823         NETlink *pomlink;
824
825         pomlink = findNETlink(msg->param.pword[4]);
826         if ((pomlink != NULL) && (pomlink->connected))
827                 send_to_node(pomlink,msg);
828         else {
829                 if (msg->param.pword[1] == MSG_INT) {
830                         send_to_int(msg);
831                 } else {
832                         sprintf(ss, "Not connected to Node %d",
833                                                         msg->param.pword[4]);  
834                         write_at_console(ss);
835                 }
836         }
837 }
838
839 void NETMOD::connect_seq(char *a)
840 {
841         NETlink *pom;
842         struct sockaddr_in svr;
843         int j, on;
844
845         pom = Links.first();
846         while (pom != NULL) {
847                 if (strcmp(pom->addr, a) == 0)
848                         return;
849                 pom = Links.next();
850         }
851         pom = new NETlink;
852         strcpy(pom->addr, a);
853         pom->connected = FALSE;
854         pom->sock = socket(AF_INET, SOCK_STREAM, 0); 
855         bzero(&svr, sizeof(svr));
856         svr.sin_family = AF_INET;
857         svr.sin_port = htons(LOGPORT);
858         svr.sin_addr.s_addr = inet_addr(pom->addr);
859         j = connect(pom->sock, (struct sockaddr*)&svr, sizeof(svr));
860         if (j == 0) {
861                 pom->connected = TRUE;
862                 fcntl(pom->sock, F_SETFL, O_NONBLOCK | fcntl(pom->sock, F_GETFL,
863                                                                         0));
864                 on = 1;
865                 setsockopt(pom->sock, IPPROTO_TCP, TCP_NODELAY, (char*)&on,
866                                                                 sizeof(on));
867                 send_connect_info(pom);
868                 Links.append(pom); 
869         } else {
870                 write_at_console("Connection failed");
871         }
872 }
873
874 void NETMOD::check_links()
875 {
876         int j = 1;
877         struct sockaddr_in svr;
878         NETlink *pomlink;
879
880         /* connect to all other nodes */
881         if (!all_connected) {
882                 pomlink = Links.first();
883                 while (pomlink != NULL) {
884                         if (!(pomlink->connected)) {
885                                 bzero(&svr, sizeof(svr));
886                                 svr.sin_family = AF_INET;
887                                 svr.sin_port = htons(LOGPORT);
888                                 svr.sin_addr.s_addr = inet_addr(pomlink->addr);
889
890                                 j = connect(pomlink->sock,
891                                         (struct sockaddr*)&svr, sizeof(svr));
892                                 if (j == 0) {
893                                         pomlink->connected = TRUE;
894                                         fcntl(pomlink->sock, F_SETFL, O_NONBLOCK
895                                                 | fcntl(pomlink->sock, F_GETFL,
896                                                                         0));
897                                         send_connect_info(pomlink);
898                                 }
899                                 if (errno == ECONNREFUSED) {
900                                         sock_reopen(pomlink);
901                                 }
902                         } /* not connected */
903                         pomlink = Links.next();
904                 } /* while */
905         } /* if */
906         all_connected = TRUE;
907         pomlink = Links.first();
908         while(pomlink != NULL) {
909                 if (pomlink->connected == FALSE) {
910                         all_connected = FALSE;
911                         break;
912                 }
913                 pomlink = Links.next();
914         }
915 }
916
917 void NETMOD::sock_reopen(NETlink *lnk)
918 {
919         int on = 1;
920
921         close(lnk->sock);
922         lnk->sock = socket(AF_INET, SOCK_STREAM, 0);
923         fcntl(lnk->sock, F_SETFL,O_NONBLOCK | fcntl(lnk->sock, F_GETFL, 0));
924         setsockopt(lnk->sock, IPPROTO_TCP,TCP_NODELAY, (char*)&on, sizeof(on));
925 }
926
927 /***************** Acknowledges **************************/
928
929 void NETMOD::send_connect_info(NETlink *lnk)
930 {
931         MESSAGE m;
932
933         m.param.pword[0] = NET_CONNECT;
934         m.param.pword[1] = MyNode;
935         m.msg_type = MSG_NET;
936         /* msg2netmsg(&m);*/
937         if (lnk->sock)
938                 write(lnk->sock, &m, sizeof(MESSAGE));
939 }
940
941 void NETMOD::send_accept_info(NETlink *lnk)
942 {
943         MESSAGE m;
944
945         m.param.pword[0] = NET_ACCEPT;
946         m.param.pword[1] = MyNode;
947         m.msg_type = MSG_NET;
948         /* msg2netmsg(&m);*/
949         if (lnk->sock)
950                 write(lnk->sock, &m, sizeof(MESSAGE));
951 }
952
953 void NETMOD::send_code_ack(NETlink *lnk)
954 {
955         MESSAGE m;
956
957         m.param.pword[0] = NET_CODESTREAM_OK;
958         m.msg_type = MSG_NET;
959         /* msg2netmsg(&m);*/
960         if (lnk->sock)
961                 write(lnk->sock, &m, sizeof(MESSAGE));
962 }
963
964 void NETMOD::send_to_all(MESSAGE *msg)
965 {
966         NETlink *pomlink;
967         pomlink = Links.first();
968         while (pomlink != NULL) {
969                 write(pomlink->sock, msg, sizeof(MESSAGE));
970                 pomlink = Links.next();
971         }
972 }
973
974 void NETMOD::run()
975 {
976         while(1) {
977                 /*
978                 accept_connection();
979                 get_internal();
980                 remote_messages();
981                 */
982                 /* 2010*/
983                 doitall();
984         }
985 }
986
987 void NETMOD::exit_sequence()
988 {
989         NETlink *pomlink;
990
991         ::close(kernel_sock);
992         ::close(listen_sock);
993         unlink(kername);
994         pomlink = Links.first();
995         while (pomlink != NULL) {
996                 ::close(pomlink->sock);
997                 pomlink = Links.next();
998         }
999         exit(0);
1000 }
1001
1002 void NETMOD::disconnect_seq()
1003 {
1004         MESSAGE m;
1005         NETlink *p;
1006
1007         bzero(&m, sizeof(MESSAGE));
1008         m.msg_type = MSG_NET;
1009         m.param.pword[0] = NET_DISCONNECT;
1010         m.param.pword[1] = MyNode;
1011
1012         p = Links.first();
1013         while(p != NULL) {
1014                 send_to_node(p, &m);
1015                 p=Links.next();
1016         }
1017         p = Links.first();
1018         while(p != NULL) {
1019                 ::close(p->sock);
1020                 p = Links.next();
1021         }
1022         Links.clear();
1023 }
1024
1025 NETlink *NETMOD::findNETlink(int node)
1026 {
1027         NETlink *pomlink;
1028         pomlink = Links.first();
1029         while(pomlink != NULL) {
1030                 if (pomlink->node_number == node)
1031                         return pomlink;
1032
1033                 pomlink = Links.next();
1034         } 
1035         return pomlink;
1036 }
1037
1038 INTlink *NETMOD::findINTlink(int id)
1039 {
1040         INTlink *pomlink;
1041         pomlink = Interpreters.first();
1042         while(pomlink != NULL) {
1043                 if (pomlink->ID == id)
1044                         return pomlink;
1045                 pomlink = Interpreters.next();
1046         }
1047         return pomlink;
1048 }
1049
1050
1051 /* ----------------    Sending code to a remote node -------------- */
1052
1053 void NETMOD::transmit_file(int ton, char *fname, int fromINT)
1054 {
1055         FILE *f;
1056         MESSAGE msg;
1057         char fn[80];
1058         char b[255];
1059         unsigned char buffer[FILE_BUFFER_SIZE];
1060         protdescr proto;
1061         int i,tsock,sock;
1062         unsigned int sz;
1063         NETlink *outlink;
1064         struct sockaddr_in svr;
1065         fd_set rset;
1066         fd_set wset;
1067
1068         /***************** CCD FILE*/
1069         strcpy(fn, fname);
1070         strcat(fn, ".ccd");
1071         f = fopen(fn, "rb");
1072         if (f != NULL) {
1073                 fseek(f, 0, SEEK_END);
1074                 msg.param.pword[1] = ftell(f);
1075                 fclose(f);
1076                 f = fopen(fn, "rb");
1077
1078                 strcpy(b, rindex(fname, '/'));
1079                 for(i = 0; i < strlen(b); i++)
1080                         b[i] = b[i + 1];
1081
1082                 msg.param.pword[0] = NET_CCD_START;
1083                 strcpy(msg.param.pstr, b);
1084
1085                 outlink = findNETlink(ton);
1086                 if (outlink == NULL)
1087                         exit(1);
1088                 bzero(&svr, sizeof(svr));
1089                 sock = socket(AF_INET, SOCK_STREAM, 0);
1090                 svr.sin_family = AF_INET;
1091                 svr.sin_addr.s_addr = INADDR_ANY;
1092                 svr.sin_port = htons(CODEPORT);
1093                 bind(sock, (struct sockaddr*)&svr, sizeof(svr));
1094                 listen(sock, 5);
1095                 send_to_node(outlink, &msg);
1096                 sz = sizeof(svr);
1097                 FD_ZERO(&rset);
1098                 FD_ZERO(&wset);
1099                 FD_SET(sock, &rset);
1100                 if (select(sock + 1, &rset, &wset, 0, 0))
1101                         if (FD_ISSET(sock,&rset))
1102                                 tsock = accept(sock, (struct sockaddr*)&svr,
1103                                                                         &sz);
1104                 if (tsock > 0) {
1105                         close(sock);
1106                         while (!feof(f)) {
1107                                 i = fread(&buffer, 1, sizeof(buffer), f);
1108                                 write(tsock, &buffer, i);
1109                                 FD_ZERO(&rset);
1110                                 FD_ZERO(&wset);
1111                                 FD_SET(tsock, &wset);
1112                                 select(tsock + 1, &rset, &wset, 0, 0);
1113                         }
1114                         close(tsock);
1115                 }
1116                 fclose(f);
1117         } // f!=NULL
1118         else {
1119                 sprintf(b, "Cannot open file to send %s\n", fname);
1120                 write_at_console(b);
1121         }
1122
1123
1124         /**************** PCD FILE */
1125
1126         strcpy(fn, fname);
1127         strcat(fn, ".pcd");
1128         f = fopen(fn, "r");
1129         if (f != NULL) {
1130                 fseek(f, 0, SEEK_END);
1131                 msg.param.pword[1] = ftell(f);
1132                 fclose(f);
1133                 f = fopen(fn, "rb");
1134
1135                 strcpy(b, rindex(fname, '/'));
1136                 for(i = 0; i < strlen(b); i++)
1137                         b[i] = b[i + 1];
1138                 msg.param.pword[0] = NET_PCD_START;
1139                 strcpy(msg.param.pstr, b);
1140
1141                 outlink = findNETlink(ton);
1142                 if (outlink == NULL)
1143                         exit(1);
1144                 bzero(&svr, sizeof(svr));
1145                 sock = socket(AF_INET, SOCK_STREAM, 0);
1146                 svr.sin_family = AF_INET;
1147                 svr.sin_addr.s_addr = INADDR_ANY;
1148                 svr.sin_port = htons(CODEPORT1);
1149                 bind(sock, (struct sockaddr*)&svr, sizeof(svr));
1150                 listen(sock, 5);
1151                 send_to_node(outlink, &msg);
1152                 sz = sizeof(svr);
1153                 FD_ZERO(&rset);
1154                 FD_ZERO(&wset);
1155                 FD_SET(sock, &rset);
1156                 if (select(sock + 1, &rset, &wset, 0, 0))
1157                         if (FD_ISSET(sock, &rset))
1158                                 tsock = accept(sock, (struct sockaddr*)&svr,
1159                                                                         &sz);
1160                 if (tsock>0) {
1161                         close(sock);
1162                         while (!feof(f)) {
1163                                 i = fread(&proto, 1, sizeof(proto), f);
1164                                 write(tsock, &proto, i);
1165                                 FD_ZERO(&rset);
1166                                 FD_ZERO(&wset);
1167                                 FD_SET(tsock, &wset);
1168                                 select(tsock + 1, &rset, &wset, 0, 0);
1169                         }
1170                         close(tsock);
1171                 }
1172                 fclose(f);
1173         } // f!=NULL
1174         else {
1175                 sprintf(b, "Cannot open file to send %s\n", fname);
1176                 write_at_console(b);
1177         }
1178
1179         msg.msg_type = MSG_NET; 
1180         msg.param.pword[0] = NET_TRANSMITTED;
1181         msg.param.pword[1] = fromINT;
1182         send_to_kernel(&msg);
1183 }
1184
1185 void NETMOD::conn_info(int sk)
1186 {
1187         NETlink *pom;
1188         MESSAGE m;
1189         int k = 0;
1190         char poms[255];
1191
1192         m.msg_type = MSG_NET;
1193         m.param.pword[0] = NET_INFO;
1194         strcpy(m.param.pstr, "");
1195         pom = Links.first();
1196         while (pom != NULL) {
1197                 sprintf(poms, "%d=%s;", pom->node_number, pom->addr);
1198                 strcat(m.param.pstr, poms);
1199                 k++;
1200                 if (k == 12) {
1201                         m.param.pword[1] = 12;
1202                         write(sk, &m, sizeof(MESSAGE));
1203                         k = 0;
1204                 }
1205                 pom = Links.next();
1206         }
1207         if (k > 0) {
1208                 m.param.pword[1] = k;
1209                 write(sk, &m, sizeof(MESSAGE));
1210         }
1211         m.msg_type = MSG_NET;
1212         m.param.pword[0] = NET_INFO_END;
1213         write(sk, &m, sizeof(MESSAGE));
1214 }
1215
1216 int main(int argc,char **argv)
1217 {
1218         NETMOD netter(argv[1]);
1219         netter.run();
1220         return 0;
1221 }