1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
17 #include <TableConfig.h>
18 #include <SessionImpl.h>
19 #include <SqlFactory.h>
20 #include <AbsSqlConnection.h>
21 #include <AbsSqlStatement.h>
22 #include <SqlNwConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <SqlNwStatement.h>
25 #include <SqlConnection.h>
26 #include <SqlStatement.h>
27 #include <SqlFactory.h>
30 #include <SqlLogStatement.h> //for BindSqlField
31 #include <SqlNetworkHandler.h>
34 typedef struct FailedStmtObject
{
39 typedef struct item ITEM
;
47 // please dont touch the following code for queIterator
48 typedef class queueIterator
54 queueIterator(ITEM
*hd
) { head
= iter
= hd
; processed
= NULL
; }
55 inline void *next(ITEM
*hd
)
57 if (head
== NULL
) { head
= iter
= hd
; }
58 if (head
!= hd
) head
= hd
;
60 if (iter
== NULL
&& processed
) {
61 if (processed
->next
!= NULL
) {
62 processed
= processed
->next
;
65 } else { return NULL
; }
68 printDebug(DM_CacheServer
, "Processed ITEM: %X", processed
);
78 // array of msg indexes processed, First index for first thread and so on.
79 // As and when the msg is read from the queue by each of the threads
80 // respcective slot is filled with that index
81 long long *processedMsgIndexArray
;
82 long long minProcessedMsgIndex
;
84 long long lastFreedIndex
;
90 nItems
= 0; head
= NULL
; processedMsgIndexArray
= NULL
;
91 qIndex
= 0; qIter
= NULL
; minProcessedMsgIndex
= 0;
94 int size
= sizeof (long long) * asySites
;
95 processedMsgIndexArray
= (long long *) malloc(size
);
96 memset(processedMsgIndexArray
, 0, size
);
97 qIter
= (QITER
**) malloc(sizeof (QITER
*) * asySites
);
98 for (int i
= 0; i
< asySites
; i
++) qIter
[i
] = new QITER(head
);
101 int push(void *log
, int len
)
103 // log includes size of long (msgType) + size of (Msg data);
104 // 2nd parameter len is the size of (Msg data) excluding size of long
106 // long long for Msg Index
107 // int for size of the msg data
109 // len bytes for msg data
110 int logSize
= sizeof(long long) + sizeof(int) + sizeof(long) + len
;
111 ITEM
*item
= (ITEM
*) malloc(sizeof(ITEM
) - sizeof(void *)
112 + os::align(logSize
));
114 char *ptr
= (char *) &item
->data
;
115 *(long long *) ptr
= ++qIndex
; ptr
+= sizeof (long long);
116 *(int*) ptr
= len
; ptr
+= sizeof (int);
117 int sizeOfMsg
= len
+ sizeof(long);
118 memcpy(ptr
, log
, sizeOfMsg
);
119 if (head
== NULL
) { nItems
++; head
= item
; return 0; }
121 while (p
->next
!= NULL
) p
= p
->next
;
126 int size() { return nItems
; }
127 void *readNextMessage(int thrIndex
)
129 if (head
== NULL
) return NULL
;
130 else return qIter
[thrIndex
]->next(head
);
132 inline void updateProcessedIndex(int thrInd
, int processedIndex
)
134 processedMsgIndexArray
[thrInd
] = processedIndex
;
136 inline long long findMinIndexForFree(int asySites
)
138 long long minIndex
= processedMsgIndexArray
[0];
139 for (int i
=1; i
< asySites
; i
++) {
140 if (minIndex
> processedMsgIndexArray
[i
]) {
141 minIndex
= processedMsgIndexArray
[i
];
146 void freeMessagesFromQueue(int asySites
)
148 long long minIndex
= findMinIndexForFree(asySites
);
149 if (minIndex
<= lastFreedIndex
) return;
151 ITEM
*freeFrom
= head
;
152 ITEM
*freeUptoThis
= NULL
;
154 while (elem
!= NULL
) {
155 ind
= * (long long *) &elem
->data
;
156 if (ind
== minIndex
) {
163 ITEM
*toFree
= elem
= freeFrom
;
164 while (elem
!= freeUptoThis
) {
167 if (toFree
) { ::free(toFree
); nItems
--; }
168 printDebug(DM_CacheServer
, "FREED %X", toFree
);
170 if (elem
) { ::free(elem
); nItems
--; }
171 printDebug(DM_CacheServer
, "FREED %X", elem
);
172 lastFreedIndex
= minIndex
;
176 typedef class queue QUE
;
178 class ThreadInputData
182 ThreadInputData() { thrIndex
= 0; }
185 void *startThread(void *p
);
189 printf("Usage: csqlasyncserver \n");
190 printf("Description: Start the csql Async server.\n");
194 DbRetVal
processMessage(void *str
, int len
, void *conn
, void *hashBucketPtr
,
195 SqlApiImplType flag
, List
*prepareFailList
);
196 void *freeMsgFromQueue(void *p
);
197 DbRetVal
handlePrepare(void *str
, void *conn
, void *stmtBuckets
,
198 SqlApiImplType flag
, List
*prepareFailList
);
199 DbRetVal
handleCommit(void *str
, int len
, void *conn
, void *stmtBuckets
,
200 List
*prepareFailList
);
201 DbRetVal
handleFree(void *str
, void *stmtBuckets
, List
*prepareFailList
);
202 DbRetVal
writeToConfResFile(void *data
, int len
, void *stmtBuckets
, char *dsn
);
208 ThreadInputData
**thrInput
;
209 pthread_t freeThrId
= 0;
211 static void sigTermHandler(int sig
)
213 printf("Received signal %d\nStopping the server\n", sig
);
214 os::msgctl(msgKey
, IPC_RMID
, NULL
);
218 int main(int argc
, char **argv
)
221 while ((c
= getopt(argc
, argv
, "?")) != EOF
) {
223 case '?' : { opt
= 10; break; } //print help
228 if (opt
== 10) { printUsage(); return 0; }
230 os::signal(SIGINT
, sigTermHandler
);
231 os::signal(SIGTERM
, sigTermHandler
);
233 Conf::config
.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
234 msgKey
= os::msgget(Conf::config
.getMsgKey(), 0666);
236 printf("Message Queue creation failed\n");
240 //Only single cache async updation is supported hence hard coded.
243 // Create and Initialize repl server queue
244 que
= new queue(asyncSites
);
246 pthread_t
*thrId
=new pthread_t
[asyncSites
];
247 int thrInfoSize
= sizeof(ThreadInputData
*) * asyncSites
;
248 thrInput
= (ThreadInputData
**) malloc(thrInfoSize
);
251 if(Conf::config
.useCache() && Conf::config
.getCacheMode()==ASYNC_MODE
) {
252 thrInput
[i
] = new ThreadInputData();
253 thrInput
[i
]->thrIndex
= i
;
254 pthread_create(&thrId
[i
], NULL
, &startThread
, thrInput
[i
]);
257 pthread_create(&freeThrId
, NULL
, freeMsgFromQueue
, (void *) asyncSites
);
258 struct timeval timeout
;
259 int msgSize
= Conf::config
.getAsyncMsgMax();
265 os::select(0, 0, 0, 0, &timeout
);
266 printDebug(DM_CacheServer
, "waiting for message");
268 // pick messages from message que with key msgKey
269 long size
= os::msgrcv(msgKey
, str
, msgSize
, 0, 0666|IPC_NOWAIT
);
270 printDebug(DM_CacheServer
, "Received msg size = %d", size
);
271 if (size
== -1 || srvStop
) break;
272 // push the received msg to the repl server queue
273 que
->push(str
, size
);
277 printf("Replication Server Exiting\n");
281 void *startThread(void *thrInfo
)
284 DbRetVal proMsgRetVal
= OK
;
286 ThreadInputData
*thrInput
= (ThreadInputData
*)thrInfo
;
287 List prepareFailList
;
288 SqlApiImplType flag
= CSqlAdapter
;
289 int thrInd
= thrInput
->thrIndex
;
290 printDebug(DM_CacheServer
, "SqlAdapter Thread created");
291 AbsSqlConnection
*conn
= SqlFactory::createConnection(flag
);
293 void *stmtBuckets
= malloc (STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
294 memset(stmtBuckets
, 0, STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
295 printDebug(DM_CacheServer
, "stmtbuckets: %x", stmtBuckets
);
297 struct timeval timeout
, tval
;
300 rv
= conn
->connect(I_USER
, I_PASS
);
302 printError(rv
, "Unable to connect to peer site");
305 os::select(0, 0, 0, 0, &timeout
);
309 if (proMsgRetVal
!= ErrNoConnection
) {
311 msg
= que
->readNextMessage(thrInd
);
313 if (msg
!= NULL
) break;
316 os::select(0, 0, 0, 0, &tval
);
318 long long index
= *(long long *) msg
;
319 printDebug(DM_CacheServer
, "Received message with index: %lld",
321 int length
= *(int *)((char *)msg
+sizeof(long long));
322 char *msgptr
= (char *)msg
+ sizeof(long long) + sizeof(int);
323 printDebug(DM_CacheServer
, "entering process message");
324 proMsgRetVal
= processMessage(msgptr
, length
, conn
, stmtBuckets
,
325 flag
, &prepareFailList
);
326 if (proMsgRetVal
== ErrNoConnection
) break;
327 printDebug(DM_CacheServer
, "Processed message with index: %lld",
329 //store processed index in the processed index array
330 que
->updateProcessedIndex(thrInd
, index
);
331 printDebug(DM_CacheServer
, "Updated processed index %lld", index
);
337 DbRetVal
processMessage(void *str
, int len
, void *conn
, void *stmtBuckets
,
338 SqlApiImplType flag
, List
*prepareFailList
)
340 long type
= *(long *) str
;
341 printDebug(DM_CacheServer
, "type = %d\n", type
);
342 char *data
= (char *) str
+ sizeof(long);
343 if (type
== 1) return handlePrepare(data
, conn
, stmtBuckets
, flag
,
345 else if (type
== 2) return handleCommit(data
, len
, conn
, stmtBuckets
,
347 else if (type
== 3) return handleFree(data
, stmtBuckets
, prepareFailList
);
350 void *freeMsgFromQueue(void *nAsync
)
352 int asySites
= (int)(long)nAsync
;
354 printDebug(DM_CacheServer
, "Waiting for free the q elements");
356 que
->freeMessagesFromQueue(asySites
);
359 os::select(0, 0, 0, 0, &tval
);
364 DbRetVal
handlePrepare(void *data
, void *conn
, void *stmtBuckets
,
365 SqlApiImplType flag
, List
*prepareFailList
)
368 AbsSqlConnection
*con
= (AbsSqlConnection
*)conn
;
369 AbsSqlStatement
*stmt
= SqlFactory::createStatement(flag
);
370 stmt
->setConnection(con
);
371 char *ptr
= (char *) data
;
372 int length
= *(int *) ptr
; ptr
+= sizeof(int);
373 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
374 int stmtId
= *(int *) ptr
; ptr
+= sizeof(int);
375 char *tblName
= ptr
; ptr
+= IDENTIFIER_LENGTH
;
376 char *stmtstr
= (char *)data
+ 3 * sizeof(int) + IDENTIFIER_LENGTH
;
379 unsigned int mode
= TableConf::config
.getTableMode(tblName
);
380 bool isCached
= TableConf::config
.isTableCached(mode
);
382 if ((flag
== CSqlAdapter
) && !isCached
) {
383 FailStmt
*fst
= new FailStmt();
384 fst
->stmtId
= stmtId
;
385 fst
->eType
= ErrNotCached
;
386 prepareFailList
->append(fst
);
390 printDebug(DM_CacheServer
, "stmt str: %s", stmtstr
);
391 rv
= stmt
->prepare(stmtstr
);
393 FailStmt
*fst
= new FailStmt();
394 fst
->stmtId
= stmtId
;
396 prepareFailList
->append(fst
);
400 recovery
.setStmtBucket(stmtBuckets
);
401 recovery
.addToHashTable(stmtId
, stmt
, stmtstr
);
402 printDebug(DM_CacheServer
, "returning from prepare");
406 DbRetVal
handleCommit(void *data
, int len
, void *conn
, void *stmtBuckets
,
407 List
*prepareFailList
)
410 AbsSqlConnection
*con
= (AbsSqlConnection
*)conn
;
411 // get dsn if adapter to write into conflict resolution file
412 char *dsstring
= NULL
;
413 SqlOdbcConnection
*adCon
= (SqlOdbcConnection
*) con
;
414 dsstring
= adCon
->dsString
;
415 char *ptr
= (char *) data
;
416 int datalen
= *(int *) ptr
; ptr
+= sizeof(int);
417 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
418 FailStmt
*elem
= NULL
;
419 rv
= con
->beginTrans();
421 printError(rv
, "Begin trans failed");
425 recovery
.setStmtBucket(stmtBuckets
);
426 while ((ptr
- (char *)data
) < len
) {
427 int stmtId
= *(int *)ptr
;
429 AbsSqlStatement
*stmt
= recovery
.getStmtFromHashTable(stmtId
);
430 printDebug(DM_CacheServer
, "commit: stmtId: %d", stmtId
);
431 printDebug(DM_CacheServer
, "commit: stmtbuckets: %x", stmtBuckets
);
432 printDebug(DM_CacheServer
, "commit: stmt: %x", stmt
);
433 ExecType type
= (ExecType
) (*(int *) ptr
);
435 if (type
== SETPARAM
) {
436 int parampos
= *(int *) ptr
;
438 int isNull
= *(int *) ptr
;
440 DataType dataType
= (DataType
) ( *(int *) ptr
);
442 int length
= *(int *) ptr
;
447 SqlStatement::setParamValues(stmt
, parampos
,
448 dataType
, length
, (char *)value
);
449 } else { if (stmt
!= NULL
) stmt
->setNull(parampos
); }
451 // start executing and committing for all active connections
453 if (stmt
!= NULL
) rv
= stmt
->execute(rows
);
455 printError(rv
, "Execute failed with return value %d", rv
);
456 if (rv
== ErrNoConnection
) return rv
;
458 // write to conflict resolution file
459 writeToConfResFile(data
, len
, stmtBuckets
, dsstring
);
465 ListIterator it
= prepareFailList
->getIterator();
467 while (it
.hasElement()) {
468 elem
= (FailStmt
*) it
.nextElement();
469 if (elem
->stmtId
== stmtId
) { found
= true; break; }
471 if (! found
) continue; // for local table
472 if ((elem
->eType
== ErrNotCached
) ||
473 elem
->eType
== ErrNotExists
)
476 // write to conflict resolution file
477 writeToConfResFile(data
, len
, stmtBuckets
, dsstring
);
485 if (rv
!= OK
) { printDebug(DM_CacheServer
, "commit failed"); }
486 else { printDebug(DM_CacheServer
, "commit passed"); }
490 DbRetVal
handleFree(void *data
, void *stmtBuckets
, List
*prepareFailList
)
493 char *ptr
= (char *) data
;
494 int len
= *(int *) ptr
; ptr
+= sizeof(int);
495 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
496 int stmtId
= *(int *)ptr
;
498 recovery
.setStmtBucket(stmtBuckets
);
499 AbsSqlStatement
*stmt
= recovery
.getStmtFromHashTable(stmtId
);
500 FailStmt
*elem
= NULL
;
502 ListIterator failListIter
= prepareFailList
->getIterator();
503 while (failListIter
.hasElement()) {
504 elem
= (FailStmt
*) failListIter
.nextElement();
505 if (elem
->stmtId
== stmtId
) break;
507 failListIter
.reset();
508 prepareFailList
->remove(elem
);
513 printError(rv
, "HandleFree failed with return vlaue %d", rv
);
516 recovery
.removeFromHashTable(stmtId
);
517 printDebug(DM_CacheServer
, "Freed the statement from hashTable");
521 DbRetVal
writeToConfResFile(void *data
, int len
, void *stmtBuckets
, char *dsn
)
524 bool isPrmStmt
=false;
525 char confResFile
[1024];
526 sprintf(confResFile
, "%s", Conf::config
.getConflResoFile());
527 int fd
= open(confResFile
, O_WRONLY
|O_CREAT
| O_APPEND
, 0644);
529 printError(ErrOS
, "Could not create conflict Resolution file");
533 char paramStmtString
[1024];
535 char *ptr
= (char *) data
;
536 int datalen
= *(int *) ptr
; ptr
+= sizeof(int);
537 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
538 strcpy(buffer
, "SET AUTOCOMMIT OFF;\n");
539 int ret
= os::write(fd
, buffer
, strlen(buffer
));
540 if (ret
!= strlen(buffer
)) {
541 printError(ErrOS
, "Write error into conf resolution file");
545 int counter
= 0; // if at all the statement is parameterized
548 while ((ptr
- (char *)data
) < len
) {
549 int stmtId
= *(int *)ptr
;
551 int bucketNo
= stmtId
% STMT_BUCKET_SIZE
;
552 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
553 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
554 StmtNode
*node
= NULL
;
555 ListIterator it
= stmtBucket
->bucketList
.getIterator();
556 while(it
.hasElement()) {
557 node
= (StmtNode
*) it
.nextElement();
558 if(stmtId
== node
->stmtId
) break;
560 printf("DEBUG:node = %x\n", node
);
561 ExecType type
= (ExecType
) (*(int *) ptr
);
563 if (type
== SETPARAM
) {
567 sprintf(paramStmtString
, "%s", node
->stmtstr
);
568 char *it
= node
->stmtstr
;
570 int parampos
= *(int *)ptr
;
572 DataType dataType
= (DataType
) ( *(int *) ptr
);
574 int length
= *(int *) ptr
;
578 char * it
= paramStmtString
;
581 while (*it
!= '\0') {
584 if(pos
!= parampos
) { it
++; continue; }
589 case typeString
: case typeBinary
: case typeDate
:
590 case typeTime
: case typeTimeStamp
:
592 AllDataType::convertToString(it
, value
, dataType
, length
);
593 prntdChars
= AllDataType::printVal(value
, dataType
,length
);
598 AllDataType::convertToString(it
, value
, dataType
, length
);
599 prntdChars
= AllDataType::printVal(value
, dataType
,length
);
603 sprintf(it
, " %s", buffer
);
604 //strcpy(buffer, paramStmtString);
611 sprintf(buffer
, "%s", node
->stmtstr
);
612 buffer
[strlen(buffer
)] = '\n';
613 ret
= os::write(fd
, buffer
, strlen(node
->stmtstr
)+1);
614 if(ret
!= strlen(node
->stmtstr
)+1) {
615 printError(ErrOS
, "Write error into conf resolution file");
619 strcpy(buffer
, paramStmtString
);
623 int strlength
= strlen(buffer
);
624 buffer
[strlen(buffer
)] = '\n';
625 ret
= os::write(fd
, buffer
, strlength
+1);
626 if(ret
!= strlength
+1) {
627 printError(ErrOS
, "Write error into conf resolution file");
633 strcpy(buffer
, "COMMIT;\n\n");
634 ret
= os::write(fd
, buffer
, strlen(buffer
));
635 if(ret
!= strlen(buffer
)) {
636 printError(ErrOS
, "Write error into conf resolution file");