/*
libfmail: Socket Based IPC mechanism

Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
// Builds a SocketIPC from an existing Socket pointer `s`.
// NOTE(review): the constructor body is not visible in this listing, so what
// is done with `s` cannot be confirmed from here.
34 SocketIPC::SocketIPC(Socket
*s
){
38 /* Create a socket IPC from uri, create
39 * sockets and parse host and port. */
// The uri is matched against the pattern below. Its three capture groups
// (a run of word, digit or dot characters; the digits following ':'; the
// word characters following any '/') are stored into shost, port and options.
// NOTE(review): the result of PartialMatch() is not used here, so a uri that
// does not match the pattern is not detected at this point.
40 SocketIPC::SocketIPC(char *uri
){
43 pcrecpp::RE
re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
45 re
.PartialMatch(uri
, &shost
, &port
, &options
);
// NOTE(review): host aliases the internal buffer of shost. If shost is a
// local std::string (its declaration is not visible in this listing), host
// dangles once the constructor returns -- TODO confirm.
46 host
= (char*)shost
.c_str();
// Create an INET socket and hand it the parsed host as its address.
48 sock
= Socket::CreateSocket(SOCKET_INET
, 0);
50 sock
->setAddress(host
);
54 /* Connect to remote process for IPC */
// Calls sock->Connect() from inside a loop of at most 20 iterations and
// stores each result in ret.
// NOTE(review): how ret is tested, whether the loop waits between attempts
// and what the function returns are not visible in this listing.
55 int SocketIPC::RequestIPC(){
59 for (i
= 0; i
< 20; i
++){
60 ret
= sock
->Connect();
68 /* Wait for incoming IPC */
// Stores the result of sock->Accept() in auxsock.
// NOTE(review): any setup before Accept() and the return value are not
// visible in this listing.
69 int SocketIPC::ListenIPC(){
73 auxsock
= sock
->Accept();
77 /* Close IPC Session */
// NOTE(review): only the signature is visible in this listing; the body
// cannot be described from here.
78 int SocketIPC::CloseIPC(){
/* Lousy parser, just works.
 * TODO: May not work with fragmented buffers.
 * TODO: Write something more robust.
 */
// Polls sock for readability, reads up to 255 bytes into buffer and feeds
// them one character at a time through a numbered state machine. Going by
// the case labels below the states are:
//   0  skip input until the '[' that follows MSG
//   1  collect characters until ']' (message header length)
//   2  skip input until the next '['
//   3  collect characters until ']' and convert them to argc with atoi
//   4  copy characters of the message name, then create the IPCMessage
//   5  check the parameter count
//   6  skip input until the '[' that follows PARAM
//   7  collect characters until ']' (parameter length)
//   8  copy characters of the parameter, then push it onto the message
// NOTE(review): many lines of this function are missing from this listing
// (the switch header, the state transitions, the updates of j and l, the
// queueing of the finished message and every return statement), so only the
// visible steps are described here.
89 int SocketIPC::ParseLoose(){
90 char buffer
[255], abuffer
[255];
91 int ret
, state
, onParse
;
92 int i
, j
, r
, l
, argc
, argn
;
93 IPCMessage
*msg
= NULL
;
95 //printf("Loose Parsing\n");
97 /* Initialize our state variables */
102 ret
= sock
->Poll(-1, SOCKET_POLL_READ
);
104 if (ret
& SOCKET_POLL_READ
){
105 /* Clear our read buffer */
106 memset(buffer
, 0, 255);
108 /* Read incoming data */
// NOTE(review): the read size equals the buffer size, so a full read leaves
// no terminating NUL in buffer. r is later used as the loop bound, which
// presumably means Read() returns the byte count -- TODO confirm.
109 r
= sock
->Read(buffer
, 255);
110 }else if(ret
& SOCKET_POLL_ERROR
){
111 printf("Socket Error");
114 //printf("Timeout\n");
119 /* Check if we have more data */
123 //printf("IPC: Buffer(%i): %s\n", state, buffer);
126 /* Consume the buffer */
127 for (i
= 0; i
< r
; i
++){
129 case 0: /* PARSES: MSG[ */
130 /* Once we reached the data marker change state */
131 if (buffer
[i
] == '['){
134 memset(abuffer
, 0, 255);
137 case 1: /* PARSES: \d] */
138 if (buffer
[i
] == ']'){
140 //printf("IPC: MSG Header Len %i\n", l);
143 /* Copy character to our temporary buffer */
// NOTE(review): abuffer holds 255 bytes and no bound on j is visible at any
// of the abuffer[j] writes in this function -- TODO confirm one exists.
144 abuffer
[j
] = buffer
[i
];
147 case 2: /* PARSES: [ */
148 /* Once we reached the data marker change state */
149 if (buffer
[i
] == '['){
152 memset(abuffer
, 0, 255);
155 case 3: /* PARSES: \d] */
156 if (buffer
[i
] == ']'){
157 argc
= atoi(abuffer
);
159 //printf("IPC: MSG Argc %i\n", argc);
163 memset(abuffer
, 0, 255);
165 /* Copy character to our temporary buffer */
166 abuffer
[j
] = buffer
[i
];
170 case 4: /* Reads l chars */
171 /* Copy character to our temporary buffer */
172 abuffer
[j
] = buffer
[i
];
176 //printf("IPC: msg header: %s\n", abuffer);
// The collected name becomes a new heap-allocated IPCMessage.
// NOTE(review): who releases msg is not visible in this listing.
177 msg
= new IPCMessage(abuffer
);
182 case 5: /* Check Param Count */
183 //printf("IPC: argc(%i) argn(%i)\n", argc, argn);
191 case 6: /* PARSES: PARAM[ */
192 /* Once we reached the data marker change state */
193 if (buffer
[i
] == '['){
196 memset(abuffer
, 0, 255);
199 case 7: /* PARSES: \d] */
200 if (buffer
[i
] == ']'){
202 //printf("IPC: arg len: %i\n", l);
205 memset(abuffer
, 0, 255);
207 /* Copy character to our temporary buffer */
208 abuffer
[j
] = buffer
[i
];
212 case 8: /* Reads l chars */
213 /* Copy character to our temporary buffer */
214 abuffer
[j
] = buffer
[i
];
218 //printf("IPC: param: %s\n", abuffer);
219 msg
->PushParam(abuffer
);
237 //printf("IPC Message Pushed\n");
244 /* Not in use, old parsing function.
245 * It is too strict and has some problems */
// Reads up to 255 bytes from auxsock (not sock) and walks them one character
// at a time, comparing against the literal letters M, S, G and '[', then
// collecting decimal digits into tbuff, and later doing the same for the
// letters P, A, R, A, M and '['. Message and parameter text is copied into a
// malloc'd msgdata buffer of mlen + 1 bytes.
// NOTE(review): the state variable, the conversions of tbuff into mlen and
// margc, the uses of xbuff and every return statement are missing from this
// listing, so only the visible steps are described here.
246 int SocketIPC::FetchMessage2(){
247 char buffer
[255], xbuff
[255], tbuff
[20], *msgdata
;
249 int i
, t
, r
, mlen
, margc
;
252 //printf("IPC: Parsing Message\n");
264 //printf("\tReading Buffer\n");
266 memset(buffer
, 0, 255);
267 r
= auxsock
->Read(buffer
, 255);
268 //printf("\tBuffer Contents: %s\n", buffer);
275 //printf("IPC: buffer contents: %s\n", buffer);
277 for (i
= 0; i
< r
; i
++){
280 if (buffer
[i
] == 'M')
284 if (buffer
[i
] == 'S')
288 if (buffer
[i
] == 'G')
292 if (buffer
[i
] == '['){
294 memset(tbuff
, 0, 20);
// 48 and 57 are the ASCII codes of '0' and '9': only digits are collected.
// NOTE(review): tbuff holds 20 bytes and no bound on t is visible here.
299 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
300 tbuff
[t
] = buffer
[i
];
310 if (buffer
[i
] == '['){
312 memset(tbuff
, 0, 20);
317 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
318 tbuff
[t
] = buffer
[i
];
328 msgdata
= (char*)malloc(sizeof(char) * mlen
+ 1);
335 msgdata
[t
] = buffer
[i
];
344 printf("Msg Header: %s\n", msgdata
);
345 msg
= new IPCMessage(msgdata
);
// NOTE(review): the comparison with xbuff suggests msgdata may sometimes
// point at xbuff instead of a malloc'd block; the guarded statement
// (presumably a free) is not visible in this listing -- TODO confirm.
346 if (msgdata
!= xbuff
)
355 if (buffer
[i
] == 'P')
359 if (buffer
[i
] == 'A')
363 if (buffer
[i
] == 'R')
367 if (buffer
[i
] == 'A')
371 if (buffer
[i
] == 'M')
375 if (buffer
[i
] == '['){
377 memset(tbuff
, 0, 20);
382 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
383 tbuff
[t
] = buffer
[i
];
396 msgdata
= (char*)malloc(sizeof(char) * mlen
+ 1);
404 msgdata
[t
] = buffer
[i
];
414 msg
->PushParam(msgdata
);
415 if (msgdata
!= xbuff
)
// The message is treated as complete once it carries margc parameters.
417 if (msg
->ParamCount() == margc
){
437 printf("IPC Message Pushed\n");
// Tests whether msgQueue currently holds at least one message.
// NOTE(review): the statements around this test and the values returned are
// not visible in this listing.
442 int SocketIPC::PeekMessage(){
446 if (msgQueue
.size() > 0)
452 int SocketIPC::SendMessage(IPCMessage
*msg
){
453 return PushMessage(msg
);
// Fetches the message at the front of msgQueue. When the queue already holds
// a message it is taken directly; otherwise the function loops for as long
// as the queue is empty and then takes the front element.
// NOTE(review): the body of the wait loop, the removal of the message from
// the queue and the return statements are not visible in this listing. As
// the TODO below says, the visible loop condition has no timeout.
456 IPCMessage
*SocketIPC::ReciveMessage(){
459 //printf("IPC: Recive Message\n");
460 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
462 if (msgQueue
.size() > 0){
463 //printf("\tMSG Already on Queue\n");
464 msg
= msgQueue
.front();
469 //printf("\tPolling for Incoming Message\n");
471 /* TODO: Add a timeout or max try limit */
472 while(msgQueue
.size() < 1){
476 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
478 msg
= msgQueue
.front();
481 //printf("IPC: Message Retrieved\n");
// Serialises msg into one malloc'd character buffer: a header of the form
// MSG[<name length>][<param count>]<name>, followed by one
// PARAM[<length>]<value> item for each parameter popped from msg.
// The buffer size is accumulated in len from msg->GetPayload() plus 15 bytes
// per parameter.
// NOTE(review): the initial value of len, the use of the out-parameter r,
// the branch that precedes the len < 1000 test and the return statement are
// not visible in this listing.
485 char *makemsg(IPCMessage
*msg
, int *r
){
492 len
+= msg
->GetPayload();
493 len
+= msg
->ParamCount() * 15;
495 ret
= (char*)malloc(sizeof(char) * len
);
// NOTE(review): strlen() returns size_t but is printed with %i, which
// expects int; the argument and format types do not match.
498 sprintf(ret
, "MSG[%i][%i]%s", strlen(msg
->GetMessageName()), msg
->ParamCount(), msg
->GetMessageName());
500 offset
= strlen(ret
);
501 j
= msg
->ParamCount();
// Parameters are taken with PopParam(); presumably this removes them from
// msg -- TODO confirm against IPCMessage.
503 for (i
= 0; i
< j
; i
++){
504 tmp
= msg
->PopParam();
510 }else if(len
< 1000){
// NOTE(review): casting ret to int truncates the pointer wherever int is
// narrower than a pointer (for example 64-bit builds); plain pointer
// arithmetic, ret + offset, gives the intended address portably.
// The %i and strlen() mismatch noted above applies here as well.
515 sprintf((char*)((int)ret
+offset
), "PARAM[%i]%s", strlen(tmp
), tmp
);
524 /* TODO: Better memory management for IPCMessage */
// Writes msg to sock in pieces: a MSG[<l>][<param count>] header, then the
// message name, then for every parameter returned by PopParam() a PARAM[<l>]
// header followed by the parameter text. Each Write() is preceded by a
// Poll() for writability with -1 as its first argument.
// NOTE(review): the declarations of l, r and blen, the assignments that set
// l before each write, the conditions guarding the error printfs and the
// return statements are not visible in this listing.
525 int SocketIPC::PushMessage(IPCMessage
*msg
){
526 char *tmp
, buffer
[50];
// NOTE(review): tmp receives the buffer built by makemsg() and is reassigned
// a few lines below. Whether that buffer is sent or released in between is
// not visible here. makemsg() also calls PopParam() on msg, which may leave
// nothing for the PopParam() loop further down -- TODO confirm.
529 tmp
= makemsg(msg
, &l
);
530 //printf("IPCMSG: %s\n", tmp);
538 //printf("IPC: Pushing Message\n");
540 tmp
= msg
->GetMessageName();
543 sprintf(buffer
, "MSG[%i][%i]", l
, msg
->ParamCount());
544 blen
= strlen(buffer
);
546 //printf("\tBegin Header: %s%s\n", buffer, tmp);
548 sock
->Poll(-1, SOCKET_POLL_WRITE
);
549 r
= sock
->Write(buffer
, blen
);
552 printf("\tError Writting Header\n");
555 sock
->Poll(-1, SOCKET_POLL_WRITE
);
556 r
= sock
->Write(tmp
, l
);
// The same error text is printed after each of the four writes in this
// function, including the ones that send the name and the parameter data.
558 printf("\tError Writting Header\n");
561 while((tmp
= msg
->PopParam())){
564 sprintf(buffer
, "PARAM[%i]", l
);
565 blen
= strlen(buffer
);
567 //printf("\tBegin param: %s%s\n", buffer, tmp);
569 sock
->Poll(-1, SOCKET_POLL_WRITE
);
570 r
= sock
->Write(buffer
, blen
);
572 printf("\tError Writting Header\n");
574 sock
->Poll(-1, SOCKET_POLL_WRITE
);
575 r
= sock
->Write(tmp
, l
);
577 printf("\tError Writting Header\n");
583 //printf("IPC Message Send OK!\n");
// Reads the element at the front of msgQueue into msg.
// NOTE(review): no check that the queue is non-empty is visible before the
// front() call, and the removal from the queue and the return statement are
// missing from this listing.
588 IPCMessage
*SocketIPC::PopMessage(){
591 msg
= msgQueue
.front();
597 int SocketIPC::RawRead(char *buff
, int size
){
598 return sock
->Read(buff
, size
);
601 int SocketIPC::RawWrite(char *buff
, int size
){
602 return sock
->Write(buff
, size
);