Using only one message queue.
[wsti_so.git] / src / process2.c
1 #include <stdio.h>
2 /* exit.. */
3 #include <stdlib.h>
4 /* strsignal... */
5 #include <string.h>
6
7 /* open/read/write/close */
8 #include <fcntl.h>
9
10 /* Signals handling.. */
11 #include <signal.h>
12
13 #include <sys/shm.h>
14
15 /** If buffer is too small to hold entire string, it is incremented by this value */
16 #define BUFFER_STEP 16
17
18 /** Named pipe used to communicate with process1 */
19 char * read_pipe = "/tmp/process1pipe";
20
21 /** Named pipe used to communicate with process3 */
22 char * write_pipe = "/tmp/process2pipe";
23
24 /** Descriptor of input pipe */
25 int read_descriptor;
26
27 /** Descriptor of output pipe */
28 int write_descriptor;
29
30
31 /**
32  * Shared memory variables
33  */
34
35 /**
36  * Memory key for processes. Must be same between all processes to properly
37  * communicate.
38  */
39 key_t shmkey = 18912;
40 /**
41  * Id of the shared memory
42  */
43 int shmid;
44
45 /**
46  * Message shared by processes. Contains array of process IDs
47  */
48 struct message {
49         pid_t pids[3];
50 };
51
52 struct message * processes = NULL;
53
54 /**
55  * Message queue variables
56  */
57 key_t qkey = 12356;
58 int qid;
59
60 struct queue_message {
61         long mtype;
62         int signo[1];
63 };
64
65 void notify_other_processes(int signo) {
66         int i = 0;
67         struct queue_message msg;
68         msg.signo[0] = signo;
69
70         for (; i < 3; i++) {
71                 pid_t pid = processes->pids[i];
72                 // Bleh
73                 if (i != 1 && pid != 0) {
74                         msg.mtype = i+1;
75                         fprintf(stderr, "[%s] Sending message of type (%d) with value %d\n", "process1", msg.mtype, msg.signo[0]);
76                         msgsnd(qid, &msg, sizeof(msg), 0);
77                         fprintf(stderr, "[%s] Sending signal %s (%d) to PID: %d\n", "process2", strsignal(SIGUSR1), SIGUSR1, pid);
78                         kill(pid, SIGUSR1);
79                 }
80         }
81 }
82
83 /**
84  * Handler for signals.
85  */
86 void sig_handler(int signo)
87 {
88         fprintf(stderr, "[%s] Received %s!\n", "process2", strsignal(signo));
89         if (signo == SIGUSR1) {
90                 fprintf(stderr, "[%s] > Notified!\n", "process2");
91                 struct queue_message msg;
92                 /* Check queues from both other processes */
93                 if (msgrcv(qid, &msg, sizeof(int), 2, 0) > 0) {
94                         fprintf(stderr, "[%s] > Notified with value: %s!\n", "process2", strsignal(msg.signo[0]));
95                         raise(msg.signo[0]);
96                 }
97                 else if (msgrcv(qid, &msg, sizeof(int), 2, 0) > 0) {
98                         fprintf(stderr, "[%s] > Notified with value: %s!\n", "process2", strsignal(msg.signo[0]));
99                         raise(msg.signo[0]);
100                 }
101         }
102         else if (signo == SIGTERM) {
103                 fprintf(stderr, "[%s] > Signalling other processes..\n", "process2");
104                 processes->pids[1] = 0;
105                 notify_other_processes(signo);
106
107                 fprintf(stderr, "[%s] > Releasing resources\n", "process2");
108                 close(read_descriptor);
109                 close(write_descriptor);
110                 unlink(write_descriptor);
111                 exit(0);
112         }
113         else if (signo == SIGTSTP) {
114                 fprintf(stderr, "[%s] > Close reading pipe\n", "process2");
115                 close(read_descriptor);
116                 notify_other_processes(signo);
117                 fprintf(stderr, "[%s] > Close writing pipe\n", "process2");
118                 close(write_descriptor);
119                 raise (SIGSTOP);
120         }
121         else if (signo == SIGCONT) {
122                 fprintf(stderr, "[%s] > Signalling other processes..\n", "process2");
123                 notify_other_processes(signo);
124
125                 fprintf(stderr, "[%s] > Opening pipes\n", "process2");
126                 write_descriptor = open(write_pipe, O_WRONLY);
127                 read_descriptor = open(read_pipe, O_RDONLY);
128         }
129 }
130
131 /**
132  * Program grabs data from process1, calculates number of characters in each line
133  * and pass the value to process3.
134  */
135 int main(void) {
136         /**
137          * Buffer used for storing data from input pipe.
138          * Data is stored in chunks of BUFFER_STEP size.
139          * If data during reading is bigger than this value, then number of
140          * characters is saved, and buffer is cleared for reading another chunk.
141          */
142         char buffer[BUFFER_STEP];
143
144         /** Index used when iterating buffer */
145         int i = 0;
146
147         /** Stores number of bytes read from input pipe in current iteration */
148         ssize_t count = 0;
149
150         int number_of_characters = 0;
151
152         fprintf(stderr, "[%s] Init!\n", "process2");
153
154         /**
155          * Register signals handled by process
156          */
157         if (signal(SIGUSR1, sig_handler) == SIG_ERR) {
158                 fprintf(stderr, "can't catch SIGUSR1\n");
159         }
160         if (signal(SIGTERM, sig_handler) == SIG_ERR) {
161                 fprintf(stderr, "can't catch SIGTERM\n");
162         }
163         if (signal(SIGTSTP, sig_handler) == SIG_ERR) {
164                 fprintf(stderr, "can't catch SIGTSTP\n");
165         }
166         if (signal(SIGCONT, sig_handler) == SIG_ERR) {
167                 fprintf(stderr, "can't catch SIGCONT\n");
168         }
169
170         /*
171          * Register memory to share with other processes, and pass current
172          * process id to the array.
173          */
174         shmid = shmget(shmkey, sizeof(struct message), 0666);
175
176         processes = (struct message *)shmat(shmid, NULL, 0);
177         processes->pids[1] = getpid();
178
179         fprintf(stderr, "[%s] Shared pid: %d\n", "process2", getpid());
180
181         /**
182          * Register message queue to communicate with other processes
183          */
184         qid = msgget(qkey, 0666);
185
186         /* Reading from process1 */
187         read_descriptor = open(read_pipe, O_RDONLY);
188
189         /* Writing to process2 */
190         mkfifo(write_pipe, 0666);
191         write_descriptor = open(write_pipe, O_WRONLY);
192
193         while(1) {
194                 /* Read data from input pipe */
195                 count = read(read_descriptor, buffer, BUFFER_STEP);
196
197                 fprintf(stderr, "[%s] Fetched: %d bytes\n", "process2", count);
198
199                 if (count > 0) {
200                         for (i = 0; i < count; i++, number_of_characters++) {
201                                 if (buffer[i] == '\n') {
202                                         fprintf(stderr, "[%s] Calculated: %d characters. Sending...\n", "process2", number_of_characters);
203                                         write(write_descriptor, &number_of_characters, sizeof(number_of_characters));
204                                         write(write_descriptor, '\n', 1);
205                                         number_of_characters = 0;
206                                 }
207                         }
208                 }
209                 else {
210                         break;
211                 }
212         }
213
214         /* Release resources in normal program flow exit. */
215         close(read_descriptor);
216         close(write_descriptor);
217         unlink(write_descriptor);
218
219         return 0;
220 }