Clean lots of warnings
[fmail.git] / src / socketipc.cpp
blob908786a677bff6d83e918b8f85aaa2c3379c20e2
1 /*
2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include <stdio.h>
22 #include <string.h>
23 #include <malloc.h>
25 #include <pcrecpp.h>
27 #include <queue>
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
34 SocketIPC::SocketIPC(Socket *s){
35 sock = s;
38 /* Create a socket IPC from uri, create
39 * sockets and parse host and port. */
40 SocketIPC::SocketIPC(char *uri){
41 char *str;
42 string shost;
43 string options;
44 pcrecpp::RE re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
46 re.PartialMatch(uri, &shost, &port, &options);
47 host = (char*)shost.c_str();
49 printf("Started socket on: %s\n", uri);
50 printf("Host: %s\nPort: %i\n", host, port);
51 sock = Socket::CreateSocket(SOCKET_INET, 0);
52 sock->setPort(port);
53 sock->setAddress(host);
54 auxsock = NULL;
57 /* Connect to remote process for IPC */
58 int SocketIPC::RequestIPC(){
59 int ret;
60 int i;
62 for (i = 0; i < 20; i++){
63 ret = sock->Connect();
64 if (ret == 0)
65 break;
68 if (ret)
69 printf("Unable to connect to IPC\n");
71 return ret;
74 /* Wait for incoming IPC */
75 int SocketIPC::ListenIPC(){
76 sock->Bind();
77 sock->Listen(10);
79 auxsock = sock->Accept();
80 return 0;
83 /* Close IPC Session */
84 int SocketIPC::CloseIPC(){
85 sock->Close();
86 if (auxsock)
87 auxsock->Close();
88 return 0;
91 /* Lousy Parser, just works
92 * TODO: May not work with fragmented buffers.
93 * TODO: Write something more robust.
94 * */
95 int SocketIPC::ParseLoose(){
96 char buffer[255], abuffer[255];
97 int ret, state, onParse;
98 int i, j, r, l, argc, argn;
99 IPCMessage *msg = NULL;
101 //printf("Loose Parsing\n");
103 /* Initialize our state variables */
104 state = 0;
105 onParse = 1;
107 while(onParse){
108 ret = sock->Poll(-1, SOCKET_POLL_READ);
110 if (ret & SOCKET_POLL_READ){
111 /* Clear our read buffer */
112 memset(buffer, 0, 255);
114 /* Read incoming data */
115 r = sock->Read(buffer, 255);
116 }else if(ret & SOCKET_POLL_ERROR){
117 printf("Socket Error");
118 return 0;
119 }else{
120 //printf("Timeout\n");
121 r = 0;
122 return 0;
125 /* Check if we have more data */
126 if (r < 1){
127 onParse = 0;
128 }else{
129 printf("IPC: Buffer(%i): %s\n", state, buffer);
132 /* Consume the buffer */
133 for (i = 0; i < r; i++){
134 switch(state){
135 case 0: /* PARSES: MSG[ */
136 /* Once we reached the data marker change state */
137 if (buffer[i] == '['){
138 state = 1;
139 j = 0;
140 memset(abuffer, 0, 255);
142 break;
143 case 1: /* PARSES: \d] */
144 if (buffer[i] == ']'){
145 l = atoi(abuffer);
146 printf("IPC: MSG Header Len %i\n", l);
147 state = 2;
148 }else{
149 /* Copy character to our temporary buffer */
150 abuffer[j] = buffer[i];
151 j++;
153 case 2: /* PARSES: [ */
154 /* Once we reached the data marker change state */
155 if (buffer[i] == '['){
156 state = 3;
157 j = 0;
158 memset(abuffer, 0, 255);
160 break;
161 case 3: /* PARSES: \d] */
162 if (buffer[i] == ']'){
163 argc = atoi(abuffer);
165 printf("IPC: MSG Argc %i\n", argc);
167 j = 0;
168 state = 4;
169 memset(abuffer, 0, 255);
170 }else{
171 /* Copy character to our temporary buffer */
172 abuffer[j] = buffer[i];
173 j++;
175 break;
176 case 4: /* Reads l chars */
177 /* Copy character to our temporary buffer */
178 abuffer[j] = buffer[i];
179 j++;
181 if (j == l){
182 printf("IPC: msg header: %s\n", abuffer);
183 msg = new IPCMessage(abuffer);
184 state = 5;
185 argn = 0;
187 break;
188 case 5: /* Check Param Count */
189 printf("IPC: argc(%i) argn(%i)\n", argc, argn);
190 if (argn < argc){
191 state = 6;
192 }else{
193 state = -1;
194 onParse = 0;
196 break;
197 case 6: /* PARSES: PARAM[ */
198 /* Once we reached the data marker change state */
199 if (buffer[i] == '['){
200 state = 7;
201 j = 0;
202 memset(abuffer, 0, 255);
204 break;
205 case 7: /* PARSES: \d] */
206 if (buffer[i] == ']'){
207 l = atoi(abuffer);
208 printf("IPC: arg len: %i\n", l);
209 state = 8;
210 j = 0;
211 memset(abuffer, 0, 255);
212 }else{
213 /* Copy character to our temporary buffer */
214 abuffer[j] = buffer[i];
215 j++;
217 break;
218 case 8: /* Reads l chars */
219 /* Copy character to our temporary buffer */
220 abuffer[j] = buffer[i];
221 j++;
223 if (j == l){
224 printf("IPC: param: %s\n", abuffer);
225 msg->PushParam(abuffer);
226 argn++;
227 if (argn == argc){
228 onParse = 0;
229 }else{
230 state = 6;
233 break;
234 default:
235 onParse = 0;
236 break;
242 if (msg != NULL){
243 printf("IPC Message Pushed\n");
244 msgQueue.push(msg);
245 return 1;
247 return 0;
250 /* Not in use, old parsing function.
251 * It is too strict and has some problems */
252 int SocketIPC::FetchMessage2(){
253 char buffer[255], xbuff[255], tbuff[20], *msgdata;
254 int state, onParse;
255 int i, t, r, mlen, margc;
256 IPCMessage *msg;
258 //printf("IPC: Parsing Message\n");
259 state = 0;
260 onParse = 1;
262 mlen = 0;
263 margc = 0;
264 msg = NULL;
266 if (auxsock == NULL)
267 auxsock = sock;
269 while (onParse){
270 //printf("\tReading Buffer\n");
272 memset(buffer, 0, 255);
273 r = auxsock->Read(buffer, 255);
274 //printf("\tBuffer Contents: %s\n", buffer);
276 if (r < 1){
277 return 0;
278 state = -1;
279 onParse = 0;
281 printf("IPC: buffer contents: %s\n", buffer);
283 for (i = 0; i < r; i++){
284 switch (state){
285 case 0:
286 if (buffer[i] == 'M')
287 state++;
288 break;
289 case 1:
290 if (buffer[i] == 'S')
291 state++;
292 break;
293 case 2:
294 if (buffer[i] == 'G')
295 state++;
296 break;
297 case 3:
298 if (buffer[i] == '['){
299 state++;
300 memset(tbuff, 0, 20);
301 t = 0;
303 break;
304 case 4:
305 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
306 tbuff[t] = buffer[i];
307 t++;
308 if (t == 20)
309 state = -1;
310 }else{
311 mlen = atoi(tbuff);
312 state++;
314 break;
315 case 5:
316 if (buffer[i] == '['){
317 state++;
318 memset(tbuff, 0, 20);
319 t = 0;
321 break;
322 case 6:
323 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
324 tbuff[t] = buffer[i];
325 t++;
326 if (t == 20)
327 state = -1;
328 }else{
329 margc = atoi(tbuff);
330 state++;
331 if (mlen < 255){
332 msgdata = xbuff;
333 }else{
334 msgdata = (char*)malloc(sizeof(char) * mlen + 1);
336 t = 0;
338 break;
339 case 7:
340 if (t < mlen){
341 msgdata[t] = buffer[i];
342 t++;
343 if (t == mlen){
344 msgdata[t] = 0;
345 state++;
348 break;
349 case 8:
350 printf("Msg Header: %s\n", msgdata);
351 msg = new IPCMessage(msgdata);
352 if (msgdata != xbuff)
353 free(msgdata);
354 state++;
356 if (margc == 0)
357 state = 20;
358 i--;
359 break;
360 case 9:
361 if (buffer[i] == 'P')
362 state++;
363 break;
364 case 10:
365 if (buffer[i] == 'A')
366 state++;
367 break;
368 case 11:
369 if (buffer[i] == 'R')
370 state++;
371 break;
372 case 12:
373 if (buffer[i] == 'A')
374 state++;
375 break;
376 case 13:
377 if (buffer[i] == 'M')
378 state++;
379 break;
380 case 14:
381 if (buffer[i] == '['){
382 state++;
383 memset(tbuff, 0, 20);
384 t = 0;
386 break;
387 case 15:
388 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
389 tbuff[t] = buffer[i];
390 t++;
391 if (t == 20)
392 state = -1;
393 }else{
394 mlen = atoi(tbuff);
395 state++;
397 break;
398 case 16:
399 if (mlen < 255){
400 msgdata = xbuff;
401 }else{
402 msgdata = (char*)malloc(sizeof(char) * mlen + 1);
404 t = 0;
405 state++;
406 i--;
407 break;
408 case 17:
409 if (t < mlen){
410 msgdata[t] = buffer[i];
411 t++;
412 if (t == mlen){
413 msgdata[t] = 0;
414 state++;
415 i--;
418 break;
419 case 18:
420 msg->PushParam(msgdata);
421 if (msgdata != xbuff)
422 free(msgdata);
423 if (msg->ParamCount() == margc){
424 state++;
425 }else{
426 state = 9;
428 onParse = 0;
429 break;
430 default:
431 onParse = 0;
432 break;
437 if (state == -1){
438 if (msg)
439 delete msg;
440 return 0;
443 printf("IPC Message Pushed\n");
444 msgQueue.push(msg);
445 return 1;
448 int SocketIPC::PeekMessage(){
450 if(ParseLoose())
451 return 1;
453 if (msgQueue.size() > 0)
454 return 1;
456 return 0;
459 int SocketIPC::SendMessage(IPCMessage *msg){
460 return PushMessage(msg);
463 IPCMessage *SocketIPC::ReciveMessage(){
464 IPCMessage *msg;
466 //printf("IPC: Recive Message\n");
467 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
469 if (msgQueue.size() > 0){
470 //printf("\tMSG Already on Queue\n");
471 msg = msgQueue.front();
472 msgQueue.pop();
473 return msg;
476 //printf("\tPolling for Incoming Message\n");
478 /* TODO: Add a timeout or max try limit */
479 while(msgQueue.size() < 1){
480 ParseLoose();
483 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
485 msg = msgQueue.front();
486 msgQueue.pop();
488 //printf("IPC: Message Retrieved\n");
489 return msg;
492 char *makemsg(IPCMessage *msg, int *r){
493 char *ret, *tmp;
494 int len, i, j, offset;
496 len = 40;
498 len += msg->GetPayload();
499 len += msg->ParamCount() * 15;
501 ret = (char*)malloc(sizeof(char) * len);
502 sprintf(ret, "MSG[%i][%i]%s", strlen(msg->GetMessageName()), msg->ParamCount(), msg->GetMessageName());
504 offset = strlen(ret);
505 j = msg->ParamCount();
507 for (i = 0; i < j; i++){
508 tmp = msg->PopParam();
509 len = strlen(tmp);
510 if (len < 10){
511 len++;
512 }else if(len < 100){
513 len += 2;
514 }else if(len < 1000){
515 len += 3;
517 len += 7;
519 sprintf((char*)((int)ret+offset), "PARAM[%i]%s", strlen(tmp), tmp);
520 offset += len;
521 free(tmp);
524 *r = offset;
525 return ret;
528 /* TODO: Better memory management for IPCMessage */
529 int SocketIPC::PushMessage(IPCMessage *msg){
530 char *tmp, buffer[50];
531 int l, blen, r;
533 tmp = makemsg(msg, &l);
534 //printf("IPCMSG: %s\n", tmp);
536 sock->Write(tmp, l);
537 free(tmp);
539 delete msg;
541 return 0;
543 printf("IPC: Pushing Message\n");
545 tmp = msg->GetMessageName();
546 l = strlen(tmp);
548 sprintf(buffer, "MSG[%i][%i]", l, msg->ParamCount());
549 blen = strlen(buffer);
551 printf("\tBegin Header: %s%s\n", buffer, tmp);
553 sock->Poll(-1, SOCKET_POLL_WRITE);
554 r = sock->Write(buffer, blen);
556 if (r < blen){
557 printf("\tError Writting Header\n");
560 sock->Poll(-1, SOCKET_POLL_WRITE);
561 r = sock->Write(tmp, l);
562 if (r < l){
563 printf("\tError Writting Header\n");
566 while((tmp = msg->PopParam())){
567 l = strlen(tmp);
569 sprintf(buffer, "PARAM[%i]", l);
570 blen = strlen(buffer);
572 printf("\tBegin param: %s%s\n", buffer, tmp);
574 sock->Poll(-1, SOCKET_POLL_WRITE);
575 r = sock->Write(buffer, blen);
576 if (r < blen){
577 printf("\tError Writting Header\n");
579 sock->Poll(-1, SOCKET_POLL_WRITE);
580 r = sock->Write(tmp, l);
581 if (r < l){
582 printf("\tError Writting Header\n");
585 free(tmp);
588 printf("IPC Message Send OK!\n");
590 return 1;
593 IPCMessage *SocketIPC::PopMessage(){
594 IPCMessage *msg;
596 msg = msgQueue.front();
597 msgQueue.pop();
599 return msg;
602 int SocketIPC::RawRead(char *buff, int size){
603 return sock->Read(buff, size);
606 int SocketIPC::RawWrite(char *buff, int size){
607 return sock->Write(buff, size);