2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
// Constructs a SocketIPC from an already-created Socket `s`.
// NOTE(review): the constructor body is not visible in this listing;
// presumably `s` becomes the underlying `sock` -- confirm, including who
// owns (and eventually releases) the Socket.
34 SocketIPC::SocketIPC(Socket
*s
){
// Constructs a SocketIPC from a URI string.
//
// The URI is matched against "<host>:<port>[/...]<options>": the host is a
// run of word characters, digits and dots, the port is an optional run of
// digits, and the options are an optional run of word characters. An INET
// socket is then created and its address set to the parsed host.
//
// @param uri  NUL-terminated URI; it is only read here (matched and printed).
38 SocketIPC::SocketIPC(char *uri
){
42 pcrecpp::RE
re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
// NOTE(review): the result of PartialMatch is not checked on this statement,
// so a URI that does not match leaves shost/port/options at whatever values
// they held before -- confirm whether a failure path is needed.
44 re
.PartialMatch(uri
, &shost
, &port
, &options
);
// NOTE(review): `host` aliases the internal buffer of `shost`. If `shost` is
// a local std::string and `host` outlives this constructor, `host` dangles
// once `shost` is destroyed -- confirm where both are declared.
45 host
= (char*)shost
.c_str();
47 printf("Started socket on: %s\n", uri
);
48 printf("Host: %s\nPort: %i\n", host
, port
);
// Second argument to CreateSocket is 0; its meaning is defined by
// Socket::CreateSocket (not visible here).
49 sock
= Socket::CreateSocket(SOCKET_INET
, 0);
51 sock
->setAddress(host
);
// Client side: attempts to connect `sock` to the IPC endpoint.
//
// Connect() is called inside a loop bounded at 20 iterations, and a
// diagnostic is printed when connecting is unsuccessful.
// NOTE(review): the handling of `ret` (loop exit, any delay between
// attempts, and the returned value) is on lines not visible in this listing.
55 int SocketIPC::RequestIPC(){
59 for (i
= 0; i
< 20; i
++){
60 ret
= sock
->Connect();
66 printf("Unable to connect to IPC\n");
// Server side: accepts an incoming connection on `sock` and stores the
// resulting socket in `auxsock`, which is the socket the message-fetching
// code reads from.
// NOTE(review): any bind/listen setup, error checking of Accept(), and the
// return value are on lines not visible in this listing.
71 int SocketIPC::ListenIPC(){
75 auxsock
= sock
->Accept();
// Closes the IPC channel.
// NOTE(review): the body is not visible in this listing; presumably it closes
// `sock` and/or `auxsock` -- confirm which sockets are released here.
79 int SocketIPC::CloseIPC(){
85 /* TODO: Change to tree/graph based state machine */
// Reads one chunk (up to 255 bytes) from `auxsock` and parses it one
// character at a time as an IPC message.
//
// The framing recognised here mirrors what PushMessage emits:
//   "MSG[<n>][<n>]" followed by the message header data, then for each
//   parameter "PARAM[<n>]" followed by the parameter data.
// Decimal digits between brackets are accumulated in `tbuff`; payload bytes
// are copied into `msgdata`. A new IPCMessage is created from the header
// data, parameters are appended with PushParam, and the message is treated
// as complete once ParamCount() equals `margc`.
//
// NOTE(review): the parser state variable, the conversion of `tbuff` into
// `mlen`/`margc`, the resets of `t`, and the return value are on lines not
// visible in this listing.
86 int SocketIPC::FetchMessage2(){
87 char buffer
[255], xbuff
[255], tbuff
[20], *msgdata
;
89 int i
, t
, r
, mlen
, margc
;
92 //printf("IPC: Parsing Message\n");
104 //printf("\tReading Buffer\n");
// Clear the receive buffer, then read at most 255 bytes; `r` bounds the scan.
105 memset(buffer
, 0, 255);
106 r
= auxsock
->Read(buffer
, 255);
107 //printf("\tBuffer Contents: %s\n", buffer);
// Walk the received bytes one at a time.
115 for (i
= 0; i
< r
; i
++){
// Header keyword: 'M', 'S', 'G' matched one character per step.
118 if (buffer
[i
] == 'M')
122 if (buffer
[i
] == 'S')
126 if (buffer
[i
] == 'G')
// First bracketed number: reset the digit accumulator on '['.
130 if (buffer
[i
] == '['){
132 memset(tbuff
, 0, 20);
// 48..57 are the ASCII codes for '0'..'9'.
// NOTE(review): no bound on `t` against the 20-byte `tbuff` is visible here.
137 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
138 tbuff
[t
] = buffer
[i
];
// Second bracketed number, accumulated the same way.
148 if (buffer
[i
] == '['){
150 memset(tbuff
, 0, 20);
155 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
156 tbuff
[t
] = buffer
[i
];
// Allocate mlen + 1 bytes for the header data (sizeof(char) is 1).
166 msgdata
= (char*)malloc(sizeof(char) * mlen
+ 1);
// Copy header data bytes out of the receive buffer.
173 msgdata
[t
] = buffer
[i
];
182 printf("Msg Header: %s\n", msgdata
);
183 msg
= new IPCMessage(msgdata
);
// NOTE(review): comparing against `xbuff` suggests `msgdata` may point at the
// stack buffer for some messages; the guarded statement (not visible) is
// presumably the free() for the heap case -- confirm, and confirm that
// IPCMessage copies the string it is given.
184 if (msgdata
!= xbuff
)
// Parameter keyword: 'P', 'A', 'R', 'A', 'M' matched one character per step.
193 if (buffer
[i
] == 'P')
197 if (buffer
[i
] == 'A')
201 if (buffer
[i
] == 'R')
205 if (buffer
[i
] == 'A')
209 if (buffer
[i
] == 'M')
// Bracketed parameter length.
213 if (buffer
[i
] == '['){
215 memset(tbuff
, 0, 20);
220 if ((buffer
[i
] >= 48) && (buffer
[i
] <= 57)){
221 tbuff
[t
] = buffer
[i
];
// Allocate mlen + 1 bytes for the parameter data and copy it byte by byte.
234 msgdata
= (char*)malloc(sizeof(char) * mlen
+ 1);
242 msgdata
[t
] = buffer
[i
];
252 msg
->PushParam(msgdata
);
253 if (msgdata
!= xbuff
)
// All announced parameters have been received.
255 if (msg
->ParamCount() == margc
){
275 printf("IPC Message Pushed\n");
// Regex-based message fetch (uses odk_regex_match / odk_submatch_copy).
// NOTE(review): the enclosing function header is not visible in this listing,
// and this span looks like legacy or commented-out code -- confirm whether it
// is compiled at all before relying on anything below.
282 char buffer[255], *param;
286 memset(buffer, 0, 255);
287 r = auxsock->Read(buffer, 255);
// NOTE(review): if Read fills all 255 bytes, `buffer` has no terminating NUL
// when printed with %s.
292 printf("buffer: %s\n", buffer);
293 m = odk_regex_match(re, buffer, 0);
297 printf("Regex Matched!!\n");
// A message header is expected to start with 'M'.
298 if (buffer[0] != 'M')
// Sub-matches 1 and 2 of the header are extracted as strings; how they are
// converted and used is on lines not visible here.
301 param = odk_submatch_copy(buffer, m, 1);
305 param = odk_submatch_copy(buffer, m, 2);
309 memset(buffer, 0, 255);
310 r = auxsock->Read(buffer, 255);
315 msg = new IPCMessage(buffer);
// One iteration per expected parameter (`j` is set on lines not visible).
316 for (i = 0; i < j; i++){
317 auxsock->Read(buffer, 255);
318 m = odk_regex_match(re, buffer, 0);
// A parameter header must match the regex and start with 'P'.
320 if ((m == NULL) || (buffer[0] != 'P'))
322 param = odk_submatch_copy(buffer, m, 1);
// Sanity bound on the announced parameter length `k`.
326 if ((k < 1) || (k > 32000))
// Allocates and zeroes k + 1 bytes.
329 param = (char*)malloc(sizeof(char) * k+1);
331 memset(param, 0, k+1);
335 r = auxsock->Read(buffer, 255);
// NOTE(review): casting `param` to int truncates the pointer wherever int is
// narrower than a pointer (e.g. LP64); plain pointer arithmetic `param + s`
// expresses the same offset portably. No check that s + r <= k is visible.
336 memcpy((char*)((int)param+s), buffer, r);
339 msg->PushParam(param);
// Reports whether a message is available without removing it: the visible
// check tests whether `msgQueue` holds at least one entry.
// NOTE(review): the values returned on each branch, and any socket polling
// done before the check, are on lines not visible in this listing.
347 int SocketIPC::PeekMessage(){
352 if (msgQueue
.size() > 0)
// Sends `msg` over the IPC channel. This forwards directly to PushMessage
// and returns its result unchanged.
358 int SocketIPC::SendMessage(IPCMessage
*msg
){
359 return PushMessage(msg
);
// Returns the next received message.
//
// If `msgQueue` already holds a message, the front entry is taken
// immediately. Otherwise the function loops until the queue is non-empty,
// parsing incoming data whenever the poll result is SOCKET_POLL_READ, and
// then takes the front entry. Progress is traced with printf.
//
// NOTE(review): removal of the entry from the queue and the return statement
// are on lines not visible in this listing.
362 IPCMessage
*SocketIPC::ReciveMessage(){
366 printf("IPC: Recive Message\n");
// NOTE(review): size() is printed with %i; if msgQueue is a standard
// container its size() returns an unsigned size type, not int -- confirm.
367 printf("\tmsgQueue Size: %i\n", msgQueue
.size());
// Fast path: something is already queued.
369 if (msgQueue
.size() > 0){
370 printf("\tMSG Already on Queue\n");
371 msg
= msgQueue
.front();
376 printf("\tPolling for Incoming Message\n");
// Slow path: wait until a message has been queued.
378 while(msgQueue
.size() < 1){
// NOTE(review): the only visible assignment to `ret` is commented out below;
// presumably a live poll (e.g. on `auxsock`) sits on the line not shown --
// confirm, otherwise `ret` is stale when tested.
379 //ret = sock->Poll(1000000, SOCKET_POLL_READ);
381 if (ret
== SOCKET_POLL_READ
){
382 printf("\tGot Data, Parsing\n");
388 printf("\tmsgQueue Size: %i\n", msgQueue
.size());
390 msg
= msgQueue
.front();
393 printf("IPC: Message Retrieved\n");
397 /* TODO: Better memory management for IPCMessage */
// Serialises `msg` and writes it to `sock`.
//
// Wire format produced here:
//   "MSG[<l>][<param count>]"  followed by the message name, then for every
//   parameter "PARAM[<l>]" followed by the parameter data.
// FetchMessage2 parses this same framing on the receiving side.
//
// @param msg  Message to send. Parameters are obtained with PopParam() until
//             it returns a null pointer.
// NOTE(review): PopParam presumably removes each parameter from `msg`, which
// would leave the message without parameters afterwards -- confirm.
// NOTE(review): the computation of `l`, the writes of the name/parameter
// payloads, and the return value are on lines not visible in this listing.
398 int SocketIPC::PushMessage(IPCMessage
*msg
){
399 char *tmp
, buffer
[50];
402 printf("IPC: Pushing Message\n");
404 tmp
= msg
->GetMessageName();
// Two int fields plus the fixed text fit within the 50-byte buffer.
407 sprintf(buffer
, "MSG[%i][%i]", l
, msg
->ParamCount());
408 blen
= strlen(buffer
);
410 printf("\tBegin Header: %s%s\n", buffer
, tmp
);
// Only the framing header is written by this call.
412 sock
->Write(buffer
, blen
);
// The assignment inside the condition is intentional: the loop ends when
// PopParam() returns null.
415 while(tmp
= msg
->PopParam()){
418 sprintf(buffer
, "PARAM[%i]", l
);
419 blen
= strlen(buffer
);
421 printf("\tBegin param: %s%s\n", buffer
, tmp
);
423 sock
->Write(buffer
, blen
);
429 printf("IPC Message Send OK!\n");
// Takes the message at the front of `msgQueue`.
// NOTE(review): no emptiness check is visible before front(); the removal
// from the queue and the return statement are on lines not visible in this
// listing -- confirm behaviour when the queue is empty.
434 IPCMessage
*SocketIPC::PopMessage(){
437 msg
= msgQueue
.front();
// Reads raw bytes from `sock`, bypassing the message framing.
//
// @param buff  Destination buffer, passed straight to Socket::Read.
// @param size  Size argument passed straight to Socket::Read.
// @return      Whatever sock->Read returns.
// Note: this reads from `sock`, whereas FetchMessage2 reads from `auxsock`.
443 int SocketIPC::RawRead(char *buff
, int size
){
444 return sock
->Read(buff
, size
);
// Writes raw bytes to `sock`, bypassing the message framing.
//
// @param buff  Source buffer, passed straight to Socket::Write.
// @param size  Size argument passed straight to Socket::Write.
// @return      Whatever sock->Write returns.
447 int SocketIPC::RawWrite(char *buff
, int size
){
448 return sock
->Write(buff
, size
);