Started fixing IPC problems with SMTP and QUEUE Server
[fmail.git] / src / socketipc.cpp
blob2fb47df6a53f14ccfd87b5a9ff5e70c7f3a4c7a3
1 /*
2 libfmail: Socket Based IPC mechanism
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>

#include <pcrecpp.h>

#include <queue>

#include <libfmail/socket.h>
#include <libfmail/ipcmsg.h>
#include <libfmail/ipc.h>
#include <libfmail/socketipc.h>

34 SocketIPC::SocketIPC(Socket *s){
35 sock = s;
38 SocketIPC::SocketIPC(char *uri){
39 char *str;
40 string shost;
41 string options;
42 pcrecpp::RE re("([\\w\\d\\.]+)\\:(\\d*)[/]*(\\w*)");
44 re.PartialMatch(uri, &shost, &port, &options);
45 host = (char*)shost.c_str();
47 printf("Started socket on: %s\n", uri);
48 printf("Host: %s\nPort: %i\n", host, port);
49 sock = Socket::CreateSocket(SOCKET_INET, 0);
50 sock->setPort(port);
51 sock->setAddress(host);
52 auxsock = NULL;
55 int SocketIPC::RequestIPC(){
56 int ret;
57 int i;
59 for (i = 0; i < 20; i++){
60 ret = sock->Connect();
61 if (ret == 0)
62 break;
65 if (ret)
66 printf("Unable to connect to IPC\n");
68 return ret;
71 int SocketIPC::ListenIPC(){
72 sock->Bind();
73 sock->Listen(10);
75 auxsock = sock->Accept();
76 return 0;
79 int SocketIPC::CloseIPC(){
80 sock->Close();
81 if (auxsock)
82 auxsock->Close();
85 /* TODO: Change to tree/graph based state machine */
86 int SocketIPC::FetchMessage2(){
87 char buffer[255], xbuff[255], tbuff[20], *msgdata;
88 int state, onParse;
89 int i, t, r, mlen, margc;
90 IPCMessage *msg;
92 //printf("IPC: Parsing Message\n");
93 state = 0;
94 onParse = 1;
96 mlen = 0;
97 margc = 0;
98 msg = NULL;
100 if (auxsock == NULL)
101 auxsock = sock;
103 while (onParse){
104 //printf("\tReading Buffer\n");
105 memset(buffer, 0, 255);
106 r = auxsock->Read(buffer, 255);
107 //printf("\tBuffer Contents: %s\n", buffer);
109 if (r < 1){
110 return 0;
111 state = -1;
112 onParse = 0;
115 for (i = 0; i < r; i++){
116 switch (state){
117 case 0:
118 if (buffer[i] == 'M')
119 state++;
120 break;
121 case 1:
122 if (buffer[i] == 'S')
123 state++;
124 break;
125 case 2:
126 if (buffer[i] == 'G')
127 state++;
128 break;
129 case 3:
130 if (buffer[i] == '['){
131 state++;
132 memset(tbuff, 0, 20);
133 t = 0;
135 break;
136 case 4:
137 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
138 tbuff[t] = buffer[i];
139 t++;
140 if (t == 20)
141 state = -1;
142 }else{
143 mlen = atoi(tbuff);
144 state++;
146 break;
147 case 5:
148 if (buffer[i] == '['){
149 state++;
150 memset(tbuff, 0, 20);
151 t = 0;
153 break;
154 case 6:
155 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
156 tbuff[t] = buffer[i];
157 t++;
158 if (t == 20)
159 state = -1;
160 }else{
161 margc = atoi(tbuff);
162 state++;
163 if (mlen < 255){
164 msgdata = xbuff;
165 }else{
166 msgdata = (char*)malloc(sizeof(char) * mlen + 1);
168 t = 0;
170 break;
171 case 7:
172 if (t < mlen){
173 msgdata[t] = buffer[i];
174 t++;
175 if (t == mlen){
176 msgdata[t] = 0;
177 state++;
180 break;
181 case 8:
182 printf("Msg Header: %s\n", msgdata);
183 msg = new IPCMessage(msgdata);
184 if (msgdata != xbuff)
185 free(msgdata);
186 state++;
188 if (margc == 0)
189 state = 20;
190 i--;
191 break;
192 case 9:
193 if (buffer[i] == 'P')
194 state++;
195 break;
196 case 10:
197 if (buffer[i] == 'A')
198 state++;
199 break;
200 case 11:
201 if (buffer[i] == 'R')
202 state++;
203 break;
204 case 12:
205 if (buffer[i] == 'A')
206 state++;
207 break;
208 case 13:
209 if (buffer[i] == 'M')
210 state++;
211 break;
212 case 14:
213 if (buffer[i] == '['){
214 state++;
215 memset(tbuff, 0, 20);
216 t = 0;
218 break;
219 case 15:
220 if ((buffer[i] >= 48) && (buffer[i] <= 57)){
221 tbuff[t] = buffer[i];
222 t++;
223 if (t == 20)
224 state = -1;
225 }else{
226 mlen = atoi(tbuff);
227 state++;
229 break;
230 case 16:
231 if (mlen < 255){
232 msgdata = xbuff;
233 }else{
234 msgdata = (char*)malloc(sizeof(char) * mlen + 1);
236 t = 0;
237 state++;
238 i--;
239 break;
240 case 17:
241 if (t < mlen){
242 msgdata[t] = buffer[i];
243 t++;
244 if (t == mlen){
245 msgdata[t] = 0;
246 state++;
247 i--;
250 break;
251 case 18:
252 msg->PushParam(msgdata);
253 if (msgdata != xbuff)
254 free(msgdata);
255 if (msg->ParamCount() == margc){
256 state++;
257 }else{
258 state = 9;
260 onParse = 0;
261 break;
262 default:
263 onParse = 0;
264 break;
269 if (state == -1){
270 if (msg)
271 delete msg;
272 return 0;
275 printf("IPC Message Pushed\n");
276 msgQueue.push(msg);
277 return 1;
281 int FetchMessage(){
282 char buffer[255], *param;
283 int i, j, k, s, r;
284 IPCMessage *msg;
286 memset(buffer, 0, 255);
287 r = auxsock->Read(buffer, 255);
289 if (r < 3)
290 return 0;
292 printf("buffer: %s\n", buffer);
293 m = odk_regex_match(re, buffer, 0);
295 if (m == NULL)
296 return 0;
297 printf("Regex Matched!!\n");
298 if (buffer[0] != 'M')
299 return 0;
301 param = odk_submatch_copy(buffer, m, 1);
302 i = atoi(param);
303 free(param);
305 param = odk_submatch_copy(buffer, m, 2);
306 j = atoi(param);
307 free(param);
309 memset(buffer, 0, 255);
310 r = auxsock->Read(buffer, 255);
312 if (r != i)
313 return 0;
315 msg = new IPCMessage(buffer);
316 for (i = 0; i < j; i++){
317 auxsock->Read(buffer, 255);
318 m = odk_regex_match(re, buffer, 0);
320 if ((m == NULL) || (buffer[0] != 'P'))
321 return 0;
322 param = odk_submatch_copy(buffer, m, 1);
323 k = atoi(param);
324 free(param);
326 if ((k < 1) || (k > 32000))
327 return 0;
329 param = (char*)malloc(sizeof(char) * k+1);
331 memset(param, 0, k+1);
332 s = 0;
334 while (s < k){
335 r = auxsock->Read(buffer, 255);
336 memcpy((char*)((int)param+s), buffer, r);
337 s += r;
339 msg->PushParam(param);
342 msgQueue.push(msg);
343 return 1;
347 int SocketIPC::PeekMessage(){
349 if(FetchMessage2())
350 return 1;
352 if (msgQueue.size() > 0)
353 return 1;
355 return 0;
358 int SocketIPC::SendMessage(IPCMessage *msg){
359 return PushMessage(msg);
362 IPCMessage *SocketIPC::ReciveMessage(){
363 int ret;
364 IPCMessage *msg;
366 printf("IPC: Recive Message\n");
367 printf("\tmsgQueue Size: %i\n", msgQueue.size());
369 if (msgQueue.size() > 0){
370 printf("\tMSG Already on Queue\n");
371 msg = msgQueue.front();
372 msgQueue.pop();
373 return msg;
376 printf("\tPolling for Incoming Message\n");
378 while(msgQueue.size() < 1){
379 //ret = sock->Poll(1000000, SOCKET_POLL_READ);
381 if (ret == SOCKET_POLL_READ){
382 printf("\tGot Data, Parsing\n");
383 FetchMessage2();
385 FetchMessage2();
388 printf("\tmsgQueue Size: %i\n", msgQueue.size());
390 msg = msgQueue.front();
391 msgQueue.pop();
393 printf("IPC: Message Retrieved\n");
394 return msg;
397 /* TODO: Better memory management for IPCMessage */
398 int SocketIPC::PushMessage(IPCMessage *msg){
399 char *tmp, buffer[50];
400 int l, blen;
402 printf("IPC: Pushing Message\n");
404 tmp = msg->GetMessageName();
405 l = strlen(tmp);
407 sprintf(buffer, "MSG[%i][%i]", l, msg->ParamCount());
408 blen = strlen(buffer);
410 printf("\tBegin Header: %s%s\n", buffer, tmp);
412 sock->Write(buffer, blen);
413 sock->Write(tmp, l);
415 while(tmp = msg->PopParam()){
416 l = strlen(tmp);
418 sprintf(buffer, "PARAM[%i]", l);
419 blen = strlen(buffer);
421 printf("\tBegin param: %s%s\n", buffer, tmp);
423 sock->Write(buffer, blen);
424 sock->Write(tmp, l);
426 free(tmp);
429 printf("IPC Message Send OK!\n");
431 return 1;
434 IPCMessage *SocketIPC::PopMessage(){
435 IPCMessage *msg;
437 msg = msgQueue.front();
438 msgQueue.pop();
440 return msg;
443 int SocketIPC::RawRead(char *buff, int size){
444 return sock->Read(buff, size);
447 int SocketIPC::RawWrite(char *buff, int size){
448 return sock->Write(buff, size);