Applied WANG Cong Patch, client src socketipc.
[fmail.git] / src / socketipc.cpp
blob55e0b97b6fb5e595c5e6e9c9db28aabc6e02a6c7
1 /*
2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include <stdio.h>
22 #include <string.h>
23 #include <malloc.h>
25 #include <pcrecpp.h>
27 #include <queue>
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
34 SocketIPC::SocketIPC(Socket *s){
35 sock = s;
38 /* Create a socket IPC from uri, create
39 * sockets and parse host and port. */
40 SocketIPC::SocketIPC(char *uri){
41 string shost;
42 string options;
43 pcrecpp::RE re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
45 re.PartialMatch(uri, &shost, &port, &options);
46 host = (char*)shost.c_str();
48 sock = Socket::CreateSocket(SOCKET_INET, 0);
49 sock->setPort(port);
50 sock->setAddress(host);
51 auxsock = NULL;
54 /* Connect to remote process for IPC */
55 int SocketIPC::RequestIPC(){
56 int ret;
57 int i;
59 for (i = 0; i < 20; i++){
60 ret = sock->Connect();
61 if (ret == 0)
62 break;
65 return ret;
68 /* Wait for incoming IPC */
69 int SocketIPC::ListenIPC(){
70 sock->Bind();
71 sock->Listen(10);
73 auxsock = sock->Accept();
74 return 0;
77 /* Close IPC Session */
78 int SocketIPC::CloseIPC(){
79 sock->Close();
80 if (auxsock)
81 auxsock->Close();
82 return 0;
85 /* Lousy Parser, just works
86 * TODO: May not work with fragmented buffers.
87 * TODO: Write something more robust.
88 * */
89 int SocketIPC::ParseLoose(){
90 char buffer[255], abuffer[255];
91 int ret, state, onParse;
92 int i, j, r, l, argc, argn;
93 IPCMessage *msg = NULL;
95 //printf("Loose Parsing\n");
97 /* Initialize our state variables */
98 state = 0;
99 onParse = 1;
101 while(onParse){
102 ret = sock->Poll(-1, SOCKET_POLL_READ);
104 if (ret & SOCKET_POLL_READ){
105 /* Clear our read buffer */
106 memset(buffer, 0, 255);
108 /* Read incoming data */
109 r = sock->Read(buffer, 255);
110 }else if(ret & SOCKET_POLL_ERROR){
111 printf("Socket Error");
112 return 0;
113 }else{
114 //printf("Timeout\n");
115 r = 0;
116 return 0;
119 /* Check if we have more data */
120 if (r < 1){
121 onParse = 0;
122 }//else{
123 //printf("IPC: Buffer(%i): %s\n", state, buffer);
126 /* Consume the buffer */
127 for (i = 0; i < r; i++){
128 switch(state){
129 case 0: /* PARSES: MSG[ */
130 /* Once we reached the data marker change state */
131 if (buffer[i] == '['){
132 state = 1;
133 j = 0;
134 memset(abuffer, 0, 255);
136 break;
137 case 1: /* PARSES: \d] */
138 if (buffer[i] == ']'){
139 l = atoi(abuffer);
140 //printf("IPC: MSG Header Len %i\n", l);
141 state = 2;
142 }else{
143 /* Copy character to our temporary buffer */
144 abuffer[j] = buffer[i];
145 j++;
147 case 2: /* PARSES: [ */
148 /* Once we reached the data marker change state */
149 if (buffer[i] == '['){
150 state = 3;
151 j = 0;
152 memset(abuffer, 0, 255);
154 break;
155 case 3: /* PARSES: \d] */
156 if (buffer[i] == ']'){
157 argc = atoi(abuffer);
159 //printf("IPC: MSG Argc %i\n", argc);
161 j = 0;
162 state = 4;
163 memset(abuffer, 0, 255);
164 }else{
165 /* Copy character to our temporary buffer */
166 abuffer[j] = buffer[i];
167 j++;
169 break;
170 case 4: /* Reads l chars */
171 /* Copy character to our temporary buffer */
172 abuffer[j] = buffer[i];
173 j++;
175 if (j == l){
176 //printf("IPC: msg header: %s\n", abuffer);
177 msg = new IPCMessage(abuffer);
178 state = 5;
179 argn = 0;
181 break;
182 case 5: /* Check Param Count */
183 //printf("IPC: argc(%i) argn(%i)\n", argc, argn);
184 if (argn < argc){
185 state = 6;
186 }else{
187 state = -1;
188 onParse = 0;
190 break;
191 case 6: /* PARSES: PARAM[ */
192 /* Once we reached the data marker change state */
193 if (buffer[i] == '['){
194 state = 7;
195 j = 0;
196 memset(abuffer, 0, 255);
198 break;
199 case 7: /* PARSES: \d] */
200 if (buffer[i] == ']'){
201 l = atoi(abuffer);
202 //printf("IPC: arg len: %i\n", l);
203 state = 8;
204 j = 0;
205 memset(abuffer, 0, 255);
206 }else{
207 /* Copy character to our temporary buffer */
208 abuffer[j] = buffer[i];
209 j++;
211 break;
212 case 8: /* Reads l chars */
213 /* Copy character to our temporary buffer */
214 abuffer[j] = buffer[i];
215 j++;
217 if (j == l){
218 //printf("IPC: param: %s\n", abuffer);
219 msg->PushParam(abuffer);
220 argn++;
221 if (argn == argc){
222 onParse = 0;
223 }else{
224 state = 6;
227 break;
228 default:
229 onParse = 0;
230 break;
236 if (msg != NULL){
237 //printf("IPC Message Pushed\n");
238 msgQueue.push(msg);
239 return 1;
241 return 0;
244 /* Not in use, old parsing function.
245 * It is too strict and has some problems */
246 int SocketIPC::FetchMessage2(){
247 char buffer[255], xbuff[255], tbuff[20], *msgdata;
248 int state, onParse;
249 int i, t, r, mlen, margc;
250 IPCMessage *msg;
252 //printf("IPC: Parsing Message\n");
253 state = 0;
254 onParse = 1;
256 mlen = 0;
257 margc = 0;
258 msg = NULL;
260 if (auxsock == NULL)
261 auxsock = sock;
263 while (onParse){
264 //printf("\tReading Buffer\n");
266 memset(buffer, 0, 255);
267 r = auxsock->Read(buffer, 255);
268 //printf("\tBuffer Contents: %s\n", buffer);
270 if (r < 1){
271 return 0;
272 state = -1;
273 onParse = 0;
275 //printf("IPC: buffer contents: %s\n", buffer);
277 for (i = 0; i < r; i++){
278 switch (state){
279 case 0:
280 if (buffer[i] == 'M')
281 state++;
282 break;
283 case 1:
284 if (buffer[i] == 'S')
285 state++;
286 break;
287 case 2:
288 if (buffer[i] == 'G')
289 state++;
290 break;
291 case 3:
292 if (buffer[i] == '['){
293 state++;
294 memset(tbuff, 0, 20);
295 t = 0;
297 break;
298 case 4:
299 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
300 tbuff[t] = buffer[i];
301 t++;
302 if (t == 20)
303 state = -1;
304 }else{
305 mlen = atoi(tbuff);
306 state++;
308 break;
309 case 5:
310 if (buffer[i] == '['){
311 state++;
312 memset(tbuff, 0, 20);
313 t = 0;
315 break;
316 case 6:
317 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
318 tbuff[t] = buffer[i];
319 t++;
320 if (t == 20)
321 state = -1;
322 }else{
323 margc = atoi(tbuff);
324 state++;
325 if (mlen < 255){
326 msgdata = xbuff;
327 }else{
328 msgdata = (char*)malloc(sizeof(char) * mlen + 1);
330 t = 0;
332 break;
333 case 7:
334 if (t < mlen){
335 msgdata[t] = buffer[i];
336 t++;
337 if (t == mlen){
338 msgdata[t] = 0;
339 state++;
342 break;
343 case 8:
344 printf("Msg Header: %s\n", msgdata);
345 msg = new IPCMessage(msgdata);
346 if (msgdata != xbuff)
347 free(msgdata);
348 state++;
350 if (margc == 0)
351 state = 20;
352 i--;
353 break;
354 case 9:
355 if (buffer[i] == 'P')
356 state++;
357 break;
358 case 10:
359 if (buffer[i] == 'A')
360 state++;
361 break;
362 case 11:
363 if (buffer[i] == 'R')
364 state++;
365 break;
366 case 12:
367 if (buffer[i] == 'A')
368 state++;
369 break;
370 case 13:
371 if (buffer[i] == 'M')
372 state++;
373 break;
374 case 14:
375 if (buffer[i] == '['){
376 state++;
377 memset(tbuff, 0, 20);
378 t = 0;
380 break;
381 case 15:
382 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
383 tbuff[t] = buffer[i];
384 t++;
385 if (t == 20)
386 state = -1;
387 }else{
388 mlen = atoi(tbuff);
389 state++;
391 break;
392 case 16:
393 if (mlen < 255){
394 msgdata = xbuff;
395 }else{
396 msgdata = (char*)malloc(sizeof(char) * mlen + 1);
398 t = 0;
399 state++;
400 i--;
401 break;
402 case 17:
403 if (t < mlen){
404 msgdata[t] = buffer[i];
405 t++;
406 if (t == mlen){
407 msgdata[t] = 0;
408 state++;
409 i--;
412 break;
413 case 18:
414 msg->PushParam(msgdata);
415 if (msgdata != xbuff)
416 free(msgdata);
417 if (msg->ParamCount() == margc){
418 state++;
419 }else{
420 state = 9;
422 onParse = 0;
423 break;
424 default:
425 onParse = 0;
426 break;
431 if (state == -1){
432 if (msg)
433 delete msg;
434 return 0;
437 printf("IPC Message Pushed\n");
438 msgQueue.push(msg);
439 return 1;
442 int SocketIPC::PeekMessage(){
443 if(ParseLoose())
444 return 1;
446 if (msgQueue.size() > 0)
447 return 1;
449 return 0;
452 int SocketIPC::SendMessage(IPCMessage *msg){
453 return PushMessage(msg);
456 IPCMessage *SocketIPC::ReciveMessage(){
457 IPCMessage *msg;
459 //printf("IPC: Recive Message\n");
460 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
462 if (msgQueue.size() > 0){
463 //printf("\tMSG Already on Queue\n");
464 msg = msgQueue.front();
465 msgQueue.pop();
466 return msg;
469 //printf("\tPolling for Incoming Message\n");
471 /* TODO: Add a timeout or max try limit */
472 while(msgQueue.size() < 1){
473 ParseLoose();
476 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
478 msg = msgQueue.front();
479 msgQueue.pop();
481 //printf("IPC: Message Retrieved\n");
482 return msg;
485 char *makemsg(IPCMessage *msg, int *r){
486 char *ret, *tmp;
487 int len, i, j;
488 size_t offset;
490 len = 40;
492 len += msg->GetPayload();
493 len += msg->ParamCount() * 15;
495 ret = (char*)malloc(sizeof(char) * len);
496 if (!ret)
497 return 0;
498 sprintf(ret, "MSG[%i][%i]%s", strlen(msg->GetMessageName()), msg->ParamCount(), msg->GetMessageName());
500 offset = strlen(ret);
501 j = msg->ParamCount();
503 for (i = 0; i < j; i++){
504 tmp = msg->PopParam();
505 len = strlen(tmp);
506 if (len < 10){
507 len++;
508 }else if(len < 100){
509 len += 2;
510 }else if(len < 1000){
511 len += 3;
513 len += 7;
515 sprintf((char*)((int)ret+offset), "PARAM[%i]%s", strlen(tmp), tmp);
516 offset += len;
517 free(tmp);
520 *r = offset;
521 return ret;
524 /* TODO: Better memory management for IPCMessage */
525 int SocketIPC::PushMessage(IPCMessage *msg){
526 char *tmp, buffer[50];
527 int l, blen, r;
529 tmp = makemsg(msg, &l);
530 //printf("IPCMSG: %s\n", tmp);
532 sock->Write(tmp, l);
533 free(tmp);
535 delete msg;
536 return 0;
538 //printf("IPC: Pushing Message\n");
540 tmp = msg->GetMessageName();
541 l = strlen(tmp);
543 sprintf(buffer, "MSG[%i][%i]", l, msg->ParamCount());
544 blen = strlen(buffer);
546 //printf("\tBegin Header: %s%s\n", buffer, tmp);
548 sock->Poll(-1, SOCKET_POLL_WRITE);
549 r = sock->Write(buffer, blen);
551 if (r < blen){
552 printf("\tError Writting Header\n");
555 sock->Poll(-1, SOCKET_POLL_WRITE);
556 r = sock->Write(tmp, l);
557 if (r < l){
558 printf("\tError Writting Header\n");
561 while((tmp = msg->PopParam())){
562 l = strlen(tmp);
564 sprintf(buffer, "PARAM[%i]", l);
565 blen = strlen(buffer);
567 //printf("\tBegin param: %s%s\n", buffer, tmp);
569 sock->Poll(-1, SOCKET_POLL_WRITE);
570 r = sock->Write(buffer, blen);
571 if (r < blen){
572 printf("\tError Writting Header\n");
574 sock->Poll(-1, SOCKET_POLL_WRITE);
575 r = sock->Write(tmp, l);
576 if (r < l){
577 printf("\tError Writting Header\n");
580 free(tmp);
583 //printf("IPC Message Send OK!\n");
585 return 1;
588 IPCMessage *SocketIPC::PopMessage(){
589 IPCMessage *msg;
591 msg = msgQueue.front();
592 msgQueue.pop();
594 return msg;
597 int SocketIPC::RawRead(char *buff, int size){
598 return sock->Read(buff, size);
601 int SocketIPC::RawWrite(char *buff, int size){
602 return sock->Write(buff, size);