2 FancyMail: Queue Manager
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 /* TODO: Add multiple Queues support */
24 #include <libfmail/socketipc.h>
31 if ((v
>= 'A') && (v
<= 'Z'))
37 if ((v
>= 'a') && (v
<= 'z'))
52 Subscriber(const char * uri
){
56 void Alert(char * qname
){
60 ipc
= IPC::CreateIPC((char *)subs_uri
.c_str());
63 msg
= new IPCMessage("alert");
64 msg
->PushParam(qname
);
65 ipc
->SendMessage(msg
);
73 /* TODO: Change to a state-based machine and add error handling */
74 class QueueHandler
: public ProtocolHandler
{
75 Semaphore qlock
;/* Message Queue Semaphore*/
76 queue
<string
*> msg_queue
;/* Message Queue */
77 Semaphore slock
;/* Subscriber Queue Semaphore */
78 queue
<Subscriber
*> subs_queue
;/* Subscribe Queue */
81 QueueHandler(const char * qpath
, const char * name
){
82 /* Initialize Queue Path */
83 sprintf(path
, "%s/%s/", qpath
, name
);
85 /* Initialize semaphore value to 1 */
89 int Handle(Socket
* s
){
90 char buffer
[255], * queue_name
;
100 /* Start IPC in the socket */
101 ipc
= new SocketIPC(s
);
104 /* Get our message */
105 msg
= ipc
->ReciveMessage();
112 if (!strcmp(msg
->GetMessageName(), "slot")){
113 /* TODO: check that queue exists */
114 /* We received a slot request; get the queue it is for */
115 queue_name
= msg
->PopParam();
118 /* Get current time*/
120 gtime
= gmtime(&ctime
);
122 /* Generate a Random number */
125 /* Construct a unique filename for the slot */
126 sprintf(buffer
, "%s%i%i%i%i%i", path
, gtime
->tm_yday
, gtime
->tm_hour
, gtime
->tm_min
, gtime
->tm_sec
, r
);
128 /* Send positive response, with the new slot name */
129 msg
= new IPCMessage("ok");
130 msg
->PushParam(buffer
);
131 ipc
->SendMessage(msg
);
134 } else if (!strcmp(msg
->GetMessageName(), "push")){
135 /* Get the slot name */
136 slot
= new string(msg
->PopParam());
138 /* Decrement semaphore */
141 /* Push the slot to the queue */
142 msg_queue
.push(slot
);
144 /* Increment semaphore */
148 } else if (!strcmp(msg
->GetMessageName(), "pop")){
149 /* Decrement semaphore */
152 /* Check if there is anything in the queue */
153 if (msg_queue
.size() == 0){
156 /* If there are messages in the queue, pop them */
157 slot
= msg_queue
.front();
162 /* Increment semaphore */
167 /* Send a positive response with the message filename */
168 msg
= new IPCMessage("ok");
169 msg
->PushParam((char *)slot
->c_str());
171 /* There are no messages in the queue; return an error */
172 msg
= new IPCMessage("error");
174 ipc
->SendMessage(msg
);
176 } else if (!strcmp(msg
->GetMessageName(), "quit")){
180 } else if (!strcmp(msg
->GetMessageName(), "subscribe")){
182 } else if (!strcmp(msg
->GetMessageName(), "unsubscribe")){
191 class SimpleLoad
: public LoadHandler
{
193 int Dispatch(Socket
* sock
, ProtocolHandler
* ph
){
195 return ph
->Handle(sock
);
202 QueueHandler
* handler
;
204 int ret
, port
, tcount
;
205 std::string conf_handler
, qname
, qpath
;
206 Configuration
conf("queue.conf");
208 /* Load configuration */
209 port
= conf
.getInt("port", 14001);
210 conf_handler
= conf
.getString("loadhandler", "simple");
212 /* Create socket and bind to QUEUE port */
213 s
= Socket::CreateSocket(SOCKET_INET
, 0);
214 s
->setAddress("127.0.0.1");
219 printf("Error: Unable to bind to port\n");
225 /* Read Queue configuration */
226 qname
= conf
.getString("queue.name", "incoming");
227 qpath
= conf
.getString("queue.path", "/tmp/fmail/queue");
229 /* Create our Queue handler and Threaded LoadHandler */
230 handler
= new QueueHandler(qpath
.c_str(), qname
.c_str());
232 if (conf_handler
== "thread"){
233 tcount
= conf
.getInt("thread.count", 10);
234 lh
= new ThreadLoad(handler
, tcount
);
236 lh
= new SimpleLoad();
239 /* TODO: Exit gracefully, don't serve forever. */
241 /* Wait for incoming connections */
242 ret
= s
->Poll(1000000, SOCKET_POLL_READ
| SOCKET_POLL_ERROR
);
244 if (ret
& SOCKET_POLL_READ
){
245 /* Accept client and dispatch */
247 lh
->Dispatch(cl
, handler
);
248 } else if(ret
& SOCKET_POLL_ERROR
){ //error