Changed smtp for new worker classes
[fmail.git] / src / socketipc.cpp
blob8a5087e7cc8e3f6b9208bd7fd602c075b11bb5b9
1 /*
2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include <stdio.h>
22 #include <string.h>
23 #include <malloc.h>
25 #include <pcrecpp.h>
27 #include <queue>
29 #include <libfmail/socket.h>
30 #include <libfmail/ipcmsg.h>
31 #include <libfmail/ipc.h>
32 #include <libfmail/socketipc.h>
35 /* SocketIPC State Machine */
36 class SISM {
37 public:
38 int state;
39 int opts;
41 char * buffer;
42 int blen;
43 int bpos;
45 char * obuffer;
46 int olen;
47 int opos;
49 SISM(){
50 state = 0;
51 buffer = obuffer = NULL;
52 bpos = blen = 0;
53 olen = opos = 0;
56 void setBuffer(char * buf, int len){
57 buffer = buf;
58 blen = len;
59 bpos = 0;
62 void setOBuffer(char * buf, int len){
63 obuffer = buf;
64 olen = len;
65 opos = 0;
68 int Parse(){
69 int onrun;
71 onrun = 1;
73 while(onrun){
74 if ((bpos > blen) || (buffer == NULL))
75 return -1;
77 switch(state){
78 case 0:/* PARSES: MSG[ */
79 /* Once we reached the data marker change state */
80 if (buffer[bpos] == '['){
81 state = 1;
82 onrun = 0;
84 break;
85 case 1:/* PARSES: \d] */
86 if (buffer[bpos] == ']'){
87 state = 2;
88 onrun = 0;
89 } else{
90 /* Copy character to our temporary buffer */
91 obuffer[opos] = buffer[bpos];
92 opos++;
94 case 2:/* PARSES: [ */
95 /* Once we reached the data marker change state */
96 if (buffer[bpos] == '['){
97 state = 3;
98 onrun = 0;
100 break;
101 case 3:/* PARSES: \d] */
102 if (buffer[bpos] == ']'){
103 state = 4;
104 onrun = 0;
105 } else{
106 /* Copy character to our temporary buffer */
107 obuffer[opos] = buffer[bpos];
108 opos++;
110 break;
111 case 4:/* Reads l chars */
112 /* Copy character to our temporary buffer */
113 obuffer[opos] = buffer[bpos];
114 opos++;
116 if (opos == olen){
117 state = 5;
118 onrun = 0;
120 break;
121 case 5:/* Check Param Count */
122 if (olen == 0){
123 state = 10;
124 } else{
125 state = 6;
127 onrun = 0;
128 break;
129 case 6:/* PARSES: PARAM[ */
130 /* Once we reached the data marker change state */
131 if (buffer[bpos] == '['){
132 state = 7;
134 break;
135 case 7:/* PARSES: \d] */
136 if (buffer[bpos] == ']'){
137 state = 8;
138 onrun = 0;
139 } else{
140 /* Copy character to our temporary buffer */
141 obuffer[opos] = buffer[bpos];
142 opos++;
144 break;
145 case 8:/* Reads l chars */
146 /* Copy character to our temporary buffer */
147 obuffer[opos] = buffer[bpos];
148 opos++;
150 if (opos == olen){
151 state = 9;
152 onrun = 0;
154 break;
155 default:
156 onrun = 0;
157 break;
160 bpos++;
161 if (opos > olen)
162 return -2;
165 return state;
169 SocketIPC::SocketIPC(Socket * s){
170 sock = s;
173 /* Create a socket IPC from uri, create
174 * sockets and parse host and port. */
175 SocketIPC::SocketIPC(char * uri){
176 string shost;
177 string options;
178 pcrecpp::RE re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
180 re.PartialMatch(uri, &shost, &port, &options);
181 host = shost.c_str();
183 sock = Socket::CreateSocket(SOCKET_INET, 0);
184 sock->setPort(port);
185 sock->setAddress(host);
186 auxsock = NULL;
189 /* Connect to remote process for IPC */
190 int SocketIPC::RequestIPC(){
191 int ret;
192 int i;
194 for (i = 0; i < 20; i++){
195 ret = sock->Connect();
196 if (ret == 0)
197 break;
200 return ret;
203 /* Wait for incoming IPC */
204 int SocketIPC::ListenIPC(){
205 sock->Bind();
206 sock->Listen(10);
208 auxsock = sock->Accept();
209 return 0;
212 /* Close IPC Session */
213 int SocketIPC::CloseIPC(){
214 sock->Close();
215 if (auxsock)
216 auxsock->Close();
217 return 0;
220 /* Lousy Parser, just works
221 * TODO: May not work with fragmented buffers.
222 * TODO: Write something more robust.
223 * */
224 int SocketIPC::ParseLoose(){
225 char buffer[255], abuffer[255];
226 int ret, state, onParse;
227 int l, r, argc, argn;
228 IPCMessage * msg = NULL;
229 SISM parser;
231 //printf("Loose Parsing\n");
233 /* Initialize our state variables */
234 state = 0;
235 onParse = 1;
236 argc = 0;
237 argn = 0;
238 l = 0;
240 while(onParse){
241 switch (state){
242 case -1:
243 case 0:
244 ret = sock->Poll(100000, SOCKET_POLL_READ | SOCKET_POLL_WRITE | SOCKET_POLL_ERROR);
246 if (ret & SOCKET_POLL_READ){
247 memset(buffer, 0, 255);
248 r = sock->Read(buffer, 255);
250 if (r > 0)
251 parser.setBuffer(buffer, r);
252 } else{
253 return 0;
255 break;
256 case 1:
257 //printf("S1\n");
258 memset(abuffer, 0, 255);
259 parser.setOBuffer(abuffer, 255);
260 break;
261 case 2:
262 l = atoi(abuffer);
263 //printf("S2: %i\n", l);
264 memset(abuffer, 0, 255);
265 parser.setOBuffer(abuffer, 255);
266 break;
267 case 4:
268 argc = atoi(abuffer);
269 //printf("S4: %i\n", argc);
270 memset(abuffer, 0, 255);
271 parser.setOBuffer(abuffer, l);
272 break;
273 case 5:
274 msg = new IPCMessage(abuffer);
275 //printf("New IPC Message: %s\n", abuffer);
276 if (argc){
277 memset(abuffer, 0, 255);
278 parser.setOBuffer(abuffer, 255);
279 argn++;
280 } else{
281 parser.setOBuffer(NULL, 0);
283 break;
284 case 8:
285 l = atoi(abuffer);
286 memset(abuffer, 0, 255);
287 parser.setOBuffer(abuffer, l);
288 break;
289 case 9:
290 msg->PushParam(abuffer);
291 //printf("\tParam: %s\n", abuffer);
292 parser.state = 5;
293 if (argn < argc){
294 memset(abuffer, 0, 255);
295 parser.setOBuffer(abuffer, 255);
296 argn++;
297 } else{
298 parser.setOBuffer(NULL, 0);
300 break;
301 case 10:
302 onParse = 0;
303 parser.state = -1;
304 break;
305 default:
306 break;
308 state = parser.Parse();
311 if (msg != NULL){
312 msgQueue.push(msg);
313 return 1;
315 return 0;
318 int SocketIPC::PeekMessage(){
319 if(ParseLoose())
320 return 1;
322 if (msgQueue.size() > 0)
323 return 1;
325 return 0;
328 int SocketIPC::SendMessage(IPCMessage * msg){
329 return PushMessage(msg);
332 IPCMessage * SocketIPC::ReciveMessage(){
333 IPCMessage * msg;
335 //printf("IPC: Recive Message\n");
336 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
338 if (msgQueue.size() > 0){
339 //printf("\tMSG Already on Queue\n");
340 msg = msgQueue.front();
341 msgQueue.pop();
342 return msg;
345 //printf("\tPolling for Incoming Message\n");
347 /* TODO: Add a timeout or max try limit */
348 while(msgQueue.size() < 1){
349 ParseLoose();
352 //printf("\tmsgQueue Size: %i\n", msgQueue.size());
354 msg = msgQueue.front();
355 msgQueue.pop();
357 //printf("IPC: Message Retrieved\n");
358 return msg;
361 char *makemsg(IPCMessage * msg, int * r){
362 char * ret, * tmp;
363 int len, i, j;
364 size_t offset;
366 len = 40;
368 len += msg->GetPayload();
369 len += msg->ParamCount() * 15;
371 ret = (char *)malloc(sizeof(char) * len);
372 if (!ret)
373 return 0;
374 sprintf(ret, "MSG[%i][%i]%s", strlen(msg->GetMessageName()), msg->ParamCount(), msg->GetMessageName());
376 offset = strlen(ret);
377 j = msg->ParamCount();
379 for (i = 0; i < j; i++){
380 tmp = msg->PopParam();
381 len = strlen(tmp);
382 if (len < 10){
383 len++;
384 } else if(len < 100){
385 len += 2;
386 } else if(len < 1000){
387 len += 3;
389 len += 7;
391 sprintf((char *)((int)ret+offset), "PARAM[%i]%s", strlen(tmp), tmp);
392 offset += len;
393 free(tmp);
396 *r = offset;
397 return ret;
400 /* TODO: Better memory management for IPCMessage */
401 int SocketIPC::PushMessage(IPCMessage * msg){
402 char * tmp;
403 int l;
405 tmp = makemsg(msg, &l);
406 //printf("IPCMSG: %s\n", tmp);
408 sock->Write(tmp, l);
409 free(tmp);
411 return 0;
414 IPCMessage * SocketIPC::PopMessage(){
415 IPCMessage * msg;
417 msg = msgQueue.front();
418 msgQueue.pop();
420 return msg;
423 int SocketIPC::RawRead(char * buff, int size){
424 return sock->Read(buff, size);
427 int SocketIPC::RawWrite(char * buff, int size){
428 return sock->Write(buff, size);