2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
/* Construct a SocketIPC around an already existing Socket.
 *
 * s: pointer to the Socket this IPC endpoint is built from.
 *
 * NOTE(review): the constructor body is not visible in this listing, so how
 * `s` is stored (and who owns it afterwards) cannot be stated from here. */
34 SocketIPC::SocketIPC(Socket
*s
){
38 /* Create a socket IPC from uri, create
39 * sockets and parse host and port. */
/* Construct a SocketIPC from a URI string.
 *
 * uri: text that the regular expression below splits into a host part,
 *      a numeric port after a colon, and a trailing word used as options.
 *
 * Visible steps: match the URI, store the captures in shost / port /
 * options, point `host` at the characters of shost, print the URI, host and
 * port, create a SOCKET_INET socket and give it the host as its address. */
40 SocketIPC::SocketIPC(char *uri
){
/* Capture groups, in order: (1) one or more word characters, digits or dots;
 * (2) zero or more digits following a literal colon; (3) zero or more word
 * characters after any number of slashes. */
44 pcrecpp::RE
re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
/* NOTE(review): the result of PartialMatch is not checked in the visible
 * lines, so a URI that does not match leaves the outputs as they were. */
46 re
.PartialMatch(uri
, &shost
, &port
, &options
);
/* NOTE(review): `host` aliases the internal buffer returned by c_str(); it
 * is only valid while shost is alive and unmodified. The declaration of
 * shost is not visible here -- confirm its lifetime covers every use of
 * host. */
47 host
= (char*)shost
.c_str();
49 printf("Started socket on: %s\n", uri
);
50 printf("Host: %s\nPort: %i\n", host
, port
);
/* The meaning of the second argument (0) is defined by
 * Socket::CreateSocket, which is not part of this file. */
51 sock
= Socket::CreateSocket(SOCKET_INET
, 0);
53 sock
->setAddress(host
);
57 /* Connect to remote process for IPC */
/* Client side of the IPC handshake: call sock->Connect() inside a loop that
 * runs at most 20 iterations and report "Unable to connect to IPC" on the
 * failure path.
 *
 * NOTE(review): the test on `ret`, any delay between attempts and the
 * returned values are not visible in this listing. */
58 int SocketIPC::RequestIPC(){
/* Bounded retry: i counts the connection attempts, 0 through 19. */
62 for (i
= 0; i
< 20; i
++){
63 ret
= sock
->Connect();
69 printf("Unable to connect to IPC\n");
74 /* Wait for incoming IPC */
/* Server side of the IPC handshake: accept a connection on `sock` and keep
 * the resulting socket in `auxsock`.
 *
 * NOTE(review): whatever precedes Accept() (bind/listen) and the return
 * value are not visible in this listing. */
75 int SocketIPC::ListenIPC(){
79 auxsock
= sock
->Accept();
83 /* Close IPC Session */
/* Ends the IPC session.
 * NOTE(review): only the signature is visible in this listing; which sockets
 * are closed and what is returned must be confirmed against the full file. */
84 int SocketIPC::CloseIPC(){
91 /* Loose parser: crude, but it works.
92 * TODO: May not work with fragmented buffers.
93 * TODO: Write something more robust. */
/* Read bytes from `sock` and decode them with a small state machine.
 *
 * The text being decoded has the shape produced by the format strings used
 * elsewhere in this file: "MSG[<len>][<argc>]<name>" followed by
 * "PARAM[<len>]<value>" once per parameter.
 *
 * States, as labelled by the case comments below:
 *   0  skip input up to the first '['
 *   1  collect characters up to ']'            (message name length, l)
 *   2  skip input up to the next '['
 *   3  collect characters up to ']'            (argc, via atoi)
 *   4  copy l characters, then create the IPCMessage from them
 *   5  compare argc with argn
 *   6  skip input up to '['                    (start of a PARAM length)
 *   7  collect characters up to ']'            (parameter length, l)
 *   8  copy l characters, then PushParam them onto the message
 *
 * NOTE(review): the switch statement, the state transitions, the updates of
 * j / l / argn and the return values are not visible in this listing.
 * NOTE(review): no bound on j is visible before the writes to abuffer[j];
 * abuffer holds 255 bytes, so a length field larger than that needs a check
 * somewhere -- confirm. */
95 int SocketIPC::ParseLoose(){
96 char buffer
[255], abuffer
[255];
97 int ret
, state
, onParse
;
98 int i
, j
, r
, l
, argc
, argn
;
99 IPCMessage
*msg
= NULL
;
101 //printf("Loose Parsing\n");
103 /* Initialize our state variables */
/* The first argument of Poll is -1; presumably that means "no timeout" --
 * confirm against Socket::Poll. */
108 ret
= sock
->Poll(-1, SOCKET_POLL_READ
);
110 if (ret
& SOCKET_POLL_READ
){
111 /* Clear our read buffer */
112 memset(buffer
, 0, 255);
114 /* Read incoming data */
115 r
= sock
->Read(buffer
, 255);
116 }else if(ret
& SOCKET_POLL_ERROR
){
117 printf("Socket Error");
120 //printf("Timeout\n");
125 /* Check if we have more data */
/* NOTE(review): buffer is printed with %s. It was zeroed before the read,
 * but the read is allowed to fill all 255 bytes, in which case no
 * terminating NUL remains inside the array. */
129 printf("IPC: Buffer(%i): %s\n", state
, buffer
);
132 /* Consume the buffer */
133 for (i
= 0; i
< r
; i
++){
135 case 0: /* PARSES: MSG[ */
136 /* Once we reached the data marker change state */
137 if (buffer
[i
] == '['){
140 memset(abuffer
, 0, 255);
143 case 1: /* PARSES: \d] */
144 if (buffer
[i
] == ']'){
146 printf("IPC: MSG Header Len %i\n", l
);
149 /* Copy character to our temporary buffer */
150 abuffer
[j
] = buffer
[i
];
153 case 2: /* PARSES: [ */
154 /* Once we reached the data marker change state */
155 if (buffer
[i
] == '['){
158 memset(abuffer
, 0, 255);
161 case 3: /* PARSES: \d] */
162 if (buffer
[i
] == ']'){
/* abuffer holds the characters collected since the last '['; atoi turns
 * them into the parameter count. */
163 argc
= atoi(abuffer
);
165 printf("IPC: MSG Argc %i\n", argc
);
169 memset(abuffer
, 0, 255);
171 /* Copy character to our temporary buffer */
172 abuffer
[j
] = buffer
[i
];
176 case 4: /* Reads l chars */
177 /* Copy character to our temporary buffer */
178 abuffer
[j
] = buffer
[i
];
182 printf("IPC: msg header: %s\n", abuffer
);
/* The message object is created from the collected name. */
183 msg
= new IPCMessage(abuffer
);
188 case 5: /* Check Param Count */
189 printf("IPC: argc(%i) argn(%i)\n", argc
, argn
);
197 case 6: /* PARSES: PARAM[ */
198 /* Once we reached the data marker change state */
199 if (buffer
[i
] == '['){
202 memset(abuffer
, 0, 255);
205 case 7: /* PARSES: \d] */
206 if (buffer
[i
] == ']'){
208 printf("IPC: arg len: %i\n", l
);
211 memset(abuffer
, 0, 255);
213 /* Copy character to our temporary buffer */
214 abuffer
[j
] = buffer
[i
];
218 case 8: /* Reads l chars */
219 /* Copy character to our temporary buffer */
220 abuffer
[j
] = buffer
[i
];
224 printf("IPC: param: %s\n", abuffer
);
/* The collected parameter text is appended to the message under
 * construction. */
225 msg
->PushParam(abuffer
);
243 printf("IPC Message Pushed\n");
250 /* Not in use, old parsing function.
251 * It is too strict and has some problems */
/* Older, stricter parser (described above as no longer in use).
 *
 * Reads up to 255 bytes from `auxsock` and walks them one byte at a time,
 * matching the literal characters M, S, G and '[' , then decimal digits,
 * then a second '[' with digits, then copying message bytes into `msgdata`
 * to build an IPCMessage. Parameters follow the same pattern behind the
 * literal characters P, A, R, A, M and '['. Once the message reports as many
 * parameters as `margc`, the "IPC Message Pushed" line is printed.
 *
 * NOTE(review): the state variable, its transitions, the conversion of the
 * digit buffers into mlen / margc, the declaration of `msg` and the return
 * values are not visible in this listing.
 * NOTE(review): this function reads from auxsock, whereas ParseLoose reads
 * from sock. */
252 int SocketIPC::FetchMessage2(){
253 char buffer
[255], xbuff
[255], tbuff
[20], *msgdata
;
255 int i
, t
, r
, mlen
, margc
;
258 //printf("IPC: Parsing Message\n");
270 //printf("\tReading Buffer\n");
272 memset(buffer
, 0, 255);
273 r
= auxsock
->Read(buffer
, 255);
274 //printf("\tBuffer Contents: %s\n", buffer);
/* NOTE(review): printed with %s; if the read filled all 255 bytes there is
 * no terminating NUL inside buffer. */
281 printf("IPC: buffer contents: %s\n", buffer
);
283 for (i
= 0; i
< r
; i
++){
/* Literal match of the "MSG[" prefix, one character per step. */
286 if (buffer
[i
] == 'M')
290 if (buffer
[i
] == 'S')
294 if (buffer
[i
] == 'G')
298 if (buffer
[i
] == '['){
300 memset(tbuff
, 0, 20);
/* 48 and 57 are the ASCII codes of '0' and '9': only decimal digits are
 * collected. NOTE(review): no bound on t is visible; tbuff holds 20 bytes. */
305 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
306 tbuff
[t
] = buffer
[i
];
/* Second bracketed numeric field of the header. */
316 if (buffer
[i
] == '['){
318 memset(tbuff
, 0, 20);
323 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
324 tbuff
[t
] = buffer
[i
];
/* One extra byte beyond mlen is allocated. NOTE(review): the result of
 * malloc is not checked in the visible lines. */
334 msgdata
= (char*)malloc(sizeof(char) * mlen
+ 1);
341 msgdata
[t
] = buffer
[i
];
350 printf("Msg Header: %s\n", msgdata
);
351 msg
= new IPCMessage(msgdata
);
/* msgdata is compared with the stack array xbuff; presumably msgdata may
 * point at xbuff on some path and is released only when it does not --
 * the guarded statement is not visible, confirm. */
352 if (msgdata
!= xbuff
)
/* Literal match of the "PARAM[" prefix, one character per step. */
361 if (buffer
[i
] == 'P')
365 if (buffer
[i
] == 'A')
369 if (buffer
[i
] == 'R')
373 if (buffer
[i
] == 'A')
377 if (buffer
[i
] == 'M')
381 if (buffer
[i
] == '['){
383 memset(tbuff
, 0, 20);
388 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
389 tbuff
[t
] = buffer
[i
];
402 msgdata
= (char*)malloc(sizeof(char) * mlen
+ 1);
410 msgdata
[t
] = buffer
[i
];
420 msg
->PushParam(msgdata
);
421 if (msgdata
!= xbuff
)
/* The message is treated as complete once it holds margc parameters. */
423 if (msg
->ParamCount() == margc
){
443 printf("IPC Message Pushed\n");
/* Tests whether msgQueue currently holds at least one message.
 * NOTE(review): the statements guarded by the test and the values returned
 * are not visible in this listing. */
448 int SocketIPC::PeekMessage(){
453 if (msgQueue
.size() > 0)
/* Send a message to the peer.
 *
 * msg: the message to transmit.
 * Returns whatever PushMessage(msg) returns; this method adds no behaviour
 * of its own. */
459 int SocketIPC::SendMessage(IPCMessage
*msg
){
460 return PushMessage(msg
);
/* Obtain the next incoming message.
 *
 * If msgQueue already holds a message, its front element is taken.
 * Otherwise the function loops for as long as msgQueue is empty and then
 * takes the front element.
 *
 * NOTE(review): the body of the wait loop (presumably the call that reads
 * and parses incoming data), the removal of the element from the queue and
 * the return statement are not visible in this listing. As the TODO below
 * says, the wait has no timeout or retry limit. */
463 IPCMessage
*SocketIPC::ReciveMessage(){
466 //printf("IPC: Recive Message\n");
467 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
/* Fast path: a message is already queued. */
469 if (msgQueue
.size() > 0){
470 //printf("\tMSG Already on Queue\n");
471 msg
= msgQueue
.front();
476 //printf("\tPolling for Incoming Message\n");
478 /* TODO: Add a timeout or max try limit */
/* Slow path: keep going until something has been queued. */
479 while(msgQueue
.size() < 1){
483 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
485 msg
= msgQueue
.front();
488 //printf("IPC: Message Retrieved\n");
/* Serialise an IPCMessage into a malloc'ed character buffer.
 *
 * msg: message to serialise; its parameters are fetched with PopParam().
 * r:   output parameter. NOTE(review): no use of r is visible in this
 *      listing -- presumably it receives the length of the result; confirm.
 *
 * Layout written: "MSG[<name length>][<param count>]<name>" followed by
 * "PARAM[<length>]<value>" for each parameter.
 *
 * NOTE(review): the declarations of ret and tmp, the initial value of len,
 * the branch that precedes "else if(len < 1000)", the update of offset
 * inside the loop and the return statement are not visible here. */
492 char *makemsg(IPCMessage
*msg
, int *r
){
494 int len
, i
, j
, offset
;
/* Size estimate: the payload plus 15 bytes of framing per parameter.
 * NOTE(review): these are "+=" operations, so len must have been given a
 * starting value in a line that is not visible -- confirm it also covers
 * the "MSG[..][..]" prefix, the name and the terminating NUL. */
498 len
+= msg
->GetPayload();
499 len
+= msg
->ParamCount() * 15;
/* NOTE(review): the result of malloc is not checked in the visible lines. */
501 ret
= (char*)malloc(sizeof(char) * len
);
/* NOTE(review): strlen returns size_t but is formatted with %i, which
 * expects int; the two differ in width on common 64-bit platforms. */
502 sprintf(ret
, "MSG[%i][%i]%s", strlen(msg
->GetMessageName()), msg
->ParamCount(), msg
->GetMessageName());
/* offset marks where the next piece of text is appended. */
504 offset
= strlen(ret
);
/* The count is captured before the loop because the loop pops parameters. */
505 j
= msg
->ParamCount();
507 for (i
= 0; i
< j
; i
++){
508 tmp
= msg
->PopParam();
514 }else if(len
< 1000){
/* NOTE(review): the destination is computed by casting the pointer ret to
 * int, adding offset and casting back. Where int is narrower than a
 * pointer this drops address bits; plain pointer arithmetic on ret would
 * not need the cast. */
519 sprintf((char*)((int)ret
+offset
), "PARAM[%i]%s", strlen(tmp
), tmp
);
528 /* TODO: Better memory management for IPCMessage */
/* Transmit a message over `sock`.
 *
 * msg: message to send; its parameters are consumed with PopParam().
 *
 * Visible sequence: build a serialised copy with makemsg, then write the
 * "MSG[<l>][<param count>]" prefix, write the message name, and for every
 * parameter write a "PARAM[<l>]" prefix followed by the parameter text.
 * Each Write is preceded by a Poll for SOCKET_POLL_WRITE.
 *
 * NOTE(review): the declarations of l, r and blen, the assignments that give
 * l its value before each use, the tests on r and the return values are not
 * visible in this listing.
 * NOTE(review): makemsg fetches every parameter with PopParam(); if
 * PopParam removes the parameter from the message, the while loop below
 * finds nothing left to send -- confirm. */
529 int SocketIPC::PushMessage(IPCMessage
*msg
){
/* buffer only ever receives the short bracketed prefixes, never payload. */
530 char *tmp
, buffer
[50];
/* NOTE(review): tmp is overwritten further down with the message name;
 * whether the buffer returned by makemsg is used or freed before that
 * happens is not visible here. */
533 tmp
= makemsg(msg
, &l
);
534 //printf("IPCMSG: %s\n", tmp);
543 printf("IPC: Pushing Message\n");
545 tmp
= msg
->GetMessageName();
548 sprintf(buffer
, "MSG[%i][%i]", l
, msg
->ParamCount());
549 blen
= strlen(buffer
);
551 printf("\tBegin Header: %s%s\n", buffer
, tmp
);
/* Header prefix. */
553 sock
->Poll(-1, SOCKET_POLL_WRITE
);
554 r
= sock
->Write(buffer
, blen
);
557 printf("\tError Writting Header\n");
/* Message name, l bytes. */
560 sock
->Poll(-1, SOCKET_POLL_WRITE
);
561 r
= sock
->Write(tmp
, l
);
563 printf("\tError Writting Header\n");
/* One prefix/value pair per remaining parameter; the loop ends when
 * PopParam() yields a null pointer. */
566 while((tmp
= msg
->PopParam())){
569 sprintf(buffer
, "PARAM[%i]", l
);
570 blen
= strlen(buffer
);
572 printf("\tBegin param: %s%s\n", buffer
, tmp
);
574 sock
->Poll(-1, SOCKET_POLL_WRITE
);
575 r
= sock
->Write(buffer
, blen
);
/* The same "Header" wording is printed for parameter writes as well. */
577 printf("\tError Writting Header\n");
579 sock
->Poll(-1, SOCKET_POLL_WRITE
);
580 r
= sock
->Write(tmp
, l
);
582 printf("\tError Writting Header\n");
588 printf("IPC Message Send OK!\n");
/* Take the message at the front of msgQueue.
 * NOTE(review): the emptiness check (if any), the removal of the element
 * from the queue and the return statement are not visible in this
 * listing. */
593 IPCMessage
*SocketIPC::PopMessage(){
596 msg
= msgQueue
.front();
/* Read raw bytes from the underlying socket, bypassing message parsing.
 *
 * buff: destination buffer.
 * size: value passed through as the second argument of sock->Read.
 * Returns the result of sock->Read(buff, size) unchanged. */
602 int SocketIPC::RawRead(char *buff
, int size
){
603 return sock
->Read(buff
, size
);
/* Write raw bytes to the underlying socket, bypassing message framing.
 *
 * buff: source buffer.
 * size: value passed through as the second argument of sock->Write.
 * Returns the result of sock->Write(buff, size) unchanged. */
606 int SocketIPC::RawWrite(char *buff
, int size
){
607 return sock
->Write(buff
, size
);