2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
35 /* SocketIPC State Machine */
51 buffer
= obuffer
= NULL
;
56 void setBuffer(char * buf
, int len
){
62 void setOBuffer(char * buf
, int len
){
74 if ((bpos
> blen
) || (buffer
== NULL
))
78 case 0:/* PARSES: MSG[ */
79 /* Once we reached the data marker change state */
80 if (buffer
[bpos
] == '['){
85 case 1:/* PARSES: \d] */
86 if (buffer
[bpos
] == ']'){
90 /* Copy character to our temporary buffer */
91 obuffer
[opos
] = buffer
[bpos
];
94 case 2:/* PARSES: [ */
95 /* Once we reached the data marker change state */
96 if (buffer
[bpos
] == '['){
101 case 3:/* PARSES: \d] */
102 if (buffer
[bpos
] == ']'){
106 /* Copy character to our temporary buffer */
107 obuffer
[opos
] = buffer
[bpos
];
111 case 4:/* Reads l chars */
112 /* Copy character to our temporary buffer */
113 obuffer
[opos
] = buffer
[bpos
];
121 case 5:/* Check Param Count */
129 case 6:/* PARSES: PARAM[ */
130 /* Once we reached the data marker change state */
131 if (buffer
[bpos
] == '['){
135 case 7:/* PARSES: \d] */
136 if (buffer
[bpos
] == ']'){
140 /* Copy character to our temporary buffer */
141 obuffer
[opos
] = buffer
[bpos
];
145 case 8:/* Reads l chars */
146 /* Copy character to our temporary buffer */
147 obuffer
[opos
] = buffer
[bpos
];
169 SocketIPC::SocketIPC(Socket
* s
){
173 /* Create a socket IPC from uri, create
174 * sockets and parse host and port. */
175 SocketIPC::SocketIPC(char * uri
){
178 pcrecpp::RE
re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
180 re
.PartialMatch(uri
, &shost
, &port
, &options
);
181 host
= shost
.c_str();
183 sock
= Socket::CreateSocket(SOCKET_INET
, 0);
185 sock
->setAddress(host
);
189 /* Connect to remote process for IPC */
190 int SocketIPC::RequestIPC(){
194 for (i
= 0; i
< 20; i
++){
195 ret
= sock
->Connect();
203 /* Wait for incoming IPC */
204 int SocketIPC::ListenIPC(){
208 auxsock
= sock
->Accept();
212 /* Close IPC Session */
213 int SocketIPC::CloseIPC(){
220 /* Lousy Parser, just works
221 * TODO: May not work with fragmented buffers.
222 * TODO: Write something more robust.
224 int SocketIPC::ParseLoose(){
225 char buffer
[255], abuffer
[255];
226 int ret
, state
, onParse
;
227 int l
, r
, argc
, argn
;
228 IPCMessage
* msg
= NULL
;
231 //printf("Loose Parsing\n");
233 /* Initialize our state variables */
244 ret
= sock
->Poll(100000, SOCKET_POLL_READ
| SOCKET_POLL_WRITE
| SOCKET_POLL_ERROR
);
246 if (ret
& SOCKET_POLL_READ
){
247 memset(buffer
, 0, 255);
248 r
= sock
->Read(buffer
, 255);
251 parser
.setBuffer(buffer
, r
);
258 memset(abuffer
, 0, 255);
259 parser
.setOBuffer(abuffer
, 255);
263 //printf("S2: %i\n", l);
264 memset(abuffer
, 0, 255);
265 parser
.setOBuffer(abuffer
, 255);
268 argc
= atoi(abuffer
);
269 //printf("S4: %i\n", argc);
270 memset(abuffer
, 0, 255);
271 parser
.setOBuffer(abuffer
, l
);
274 msg
= new IPCMessage(abuffer
);
275 //printf("New IPC Message: %s\n", abuffer);
277 memset(abuffer
, 0, 255);
278 parser
.setOBuffer(abuffer
, 255);
281 parser
.setOBuffer(NULL
, 0);
286 memset(abuffer
, 0, 255);
287 parser
.setOBuffer(abuffer
, l
);
290 msg
->PushParam(abuffer
);
291 //printf("\tParam: %s\n", abuffer);
294 memset(abuffer
, 0, 255);
295 parser
.setOBuffer(abuffer
, 255);
298 parser
.setOBuffer(NULL
, 0);
308 state
= parser
.Parse();
318 int SocketIPC::PeekMessage(){
322 if (msgQueue
.size() > 0)
328 int SocketIPC::SendMessage(IPCMessage
* msg
){
329 return PushMessage(msg
);
332 IPCMessage
* SocketIPC::ReciveMessage(){
335 //printf("IPC: Recive Message\n");
336 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
338 if (msgQueue
.size() > 0){
339 //printf("\tMSG Already on Queue\n");
340 msg
= msgQueue
.front();
345 //printf("\tPolling for Incoming Message\n");
347 /* TODO: Add a timeout or max try limit */
348 while(msgQueue
.size() < 1){
352 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
354 msg
= msgQueue
.front();
357 //printf("IPC: Message Retrieved\n");
361 char *makemsg(IPCMessage
* msg
, int * r
){
368 len
+= msg
->GetPayload();
369 len
+= msg
->ParamCount() * 15;
371 ret
= (char *)malloc(sizeof(char) * len
);
374 sprintf(ret
, "MSG[%i][%i]%s", strlen(msg
->GetMessageName()), msg
->ParamCount(), msg
->GetMessageName());
376 offset
= strlen(ret
);
377 j
= msg
->ParamCount();
379 for (i
= 0; i
< j
; i
++){
380 tmp
= msg
->PopParam();
384 } else if(len
< 100){
386 } else if(len
< 1000){
391 sprintf((char *)((int)ret
+offset
), "PARAM[%i]%s", strlen(tmp
), tmp
);
400 /* TODO: Better memory management for IPCMessage */
401 int SocketIPC::PushMessage(IPCMessage
* msg
){
405 tmp
= makemsg(msg
, &l
);
406 //printf("IPCMSG: %s\n", tmp);
414 IPCMessage
* SocketIPC::PopMessage(){
417 msg
= msgQueue
.front();
423 int SocketIPC::RawRead(char * buff
, int size
){
424 return sock
->Read(buff
, size
);
427 int SocketIPC::RawWrite(char * buff
, int size
){
428 return sock
->Write(buff
, size
);