1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
18 #include<SessionImpl.h>
22 #include<Transaction.h>
24 #include<CacheTableLoader.h>
// Build/version identifier; main() prints it for the -v option.
// NOTE(review): a string literal is bound to a non-const char* here, which
// ISO C++11 and later reject; `const char*` would be the usual fix, provided
// no other translation unit declares this symbol as `char*` -- confirm.
25 char* version
= "csql-linux-i686-3.0GA";
// Initialised to false; no reads or writes of it are visible in this listing.
31 bool recoverFlag
=false;
// Overwritten in main() with Conf::config.useMonitorServers().
32 bool monitorServer
= false;
// Server-wide session object; main() allocates it with new SessionImpl().
33 SessionImpl
*session
= NULL
;
// Handler registered in main() for SIGINT and SIGTERM through os::signal.
// Prints the received signal number followed by "Stopping the server".
// @param sig  number of the signal that was delivered
// NOTE(review): printf is not on the POSIX async-signal-safe list, so calling
// it from a signal handler is formally unsafe -- confirm whether that matters.
34 static void sigTermHandler(int sig
)
36 printf("Received signal %d\nStopping the server\n", sig
);
// Handler registered in main() for SIGCHLD.
// @param sig  number of the signal that was delivered (not used in the
//             visible statements)
40 static void sigChildHandler(int sig
)
// Re-register this handler for SIGCHLD on every invocation.
42 os::signal(SIGCHLD
, sigChildHandler
);
// Collect the exit status of any child (-1) without blocking (WNOHANG).
// `stat` is declared outside the visible lines. A single call reaps at most
// one child per invocation.
44 waitpid(-1, &stat
, WNOHANG
);
45 //TODO::move waitpid to os wrapper
// Probes process `pid` by sending it signal 0 through os::kill (presumably a
// thin wrapper over kill(2), where signal 0 performs only the existence and
// permission checks -- confirm against the os wrapper).
// Callers in this file treat a true result as "the process is dead".
// @param pid  id of the process to probe
// @return     bool; the return statements lie outside the visible lines.
48 bool checkDead(pid_t pid
)
50 int ret
= os::kill(pid
, 0);
// The format string contains a %d placeholder, so pid has to be passed
// explicitly; without it the formatter would read a variadic argument that
// was never supplied.
53 printError(ErrWarning
, "No permission to check process %d is alive.", pid);
// Reclaims what a dead process/thread slot still holds: every mutex recorded
// in info->has_ is handed to sysdb->recoverMutex, and a transaction that is
// still in state TransRunning is rolled back and marked TransNotUsed.
// @param sysdb  system database used for mutex recovery and handed to the
//               LockManager
// @param info   thread-table entry of the dead process
// @return       DbRetVal; the return statement lies outside the visible lines.
60 DbRetVal
releaseAllResources(Database
*sysdb
, ThreadInfo
*info
)
62 printf("Releasing all the resources for process %d %lu\n", info
->pid_
, info
->thrid_
);
63 //recover for all the mutexes in has_
64 for (int i
=0; i
< MAX_MUTEX_PER_THREAD
; i
++)
66 if (info
->has_
[i
] != NULL
)
// The mutex address is printed with %p and a void* cast: handing a pointer
// to %x is a specifier/argument mismatch and truncates the value on LP64.
68 printf("Dead Procs: %d %lu holding mutex %p %s \n", info
->pid_
, info
->thrid_
, (void *) info
->has_
[i
], info
->has_
[i
]->name
);
69 logFine(Conf::logger
, "Dead Procs: %d %lu holding mutex %p %s \n", info
->pid_
, info
->thrid_
, (void *) info
->has_
[i
], info
->has_
[i
]->name
);
70 //TODO::recovery of mutexes
71 sysdb
->recoverMutex(info
->has_
[i
]);
// Temporary managers used only for the rollback below.
// NOTE(review): the matching deletes are not among the visible lines --
// confirm that both objects are released before the function returns.
76 TransactionManager
*tm
= new TransactionManager();
77 LockManager
*lm
= new LockManager(sysdb
);
78 if (info
->thrTrans_
.trans_
!= NULL
&& info
->thrTrans_
.trans_
->status_
== TransRunning
)
// Same %p / void* treatment for the transaction pointer.
80 printf("Rollback Transaction %p\n", (void *) info
->thrTrans_
.trans_
);
81 tm
->rollback(lm
, info
->thrTrans_
.trans_
);
82 info
->thrTrans_
.trans_
->status_
= TransNotUsed
;
// Walks the process table of `sysdb` and calls releaseAllResources for every
// registered entry (pid_ != 0) whose process checkDead reports as dead.
// The scan runs between getProcessTableMutex and releaseProcessTableMutex.
// @param sysdb  system database that owns the process table
// @return       DbRetVal; the return statements lie outside the visible lines.
90 DbRetVal
cleanupDeadProcs(Database
*sysdb
)
92 DbRetVal rv
= sysdb
->getProcessTableMutex(false);
// Error report for a failed mutex acquisition; the guarding condition is not
// among the visible lines.
95 printError(rv
,"Unable to get process table mutex");
// NOTE(review): thrid and freeSlot are not used by any visible statement.
100 pthread_t thrid
= os::getthrid();
// First entry of the thread-info table. The statement that advances tInfo
// per iteration is not among the visible lines -- presumably tInfo++.
103 ThreadInfo
* tInfo
= sysdb
->getThreadInfo(0);
105 ThreadInfo
* freeSlot
= NULL
;
// `i` is declared outside the visible lines; the bound is the configured
// maximum number of processes.
106 for (; i
< Conf::config
.getMaxProcs(); i
++)
108 //check whether it is alive
109 if (tInfo
->pid_
!=0 && checkDead(tInfo
->pid_
)) releaseAllResources(sysdb
, tInfo
);
112 sysdb
->releaseProcessTableMutex(false);
// Logs and prints every process that is still registered (pid_ != 0) in the
// process table of `sysdb`, holding the process table mutex for the scan.
// @param sysdb  system database that owns the process table
// @return       ErrSysInternal when `count` is non-zero, OK otherwise.
//               `count` is declared and updated outside the visible lines;
//               presumably it counts registered processes other than the
//               async, cache and SQL helper servers -- confirm.
117 DbRetVal
logActiveProcs(Database
*sysdb
)
119 DbRetVal rv
= sysdb
->getProcessTableMutex(false);
// Error report for a failed mutex acquisition; the guarding condition is not
// among the visible lines.
122 printError(rv
,"Unable to get process table mutex");
125 ThreadInfo
* tInfo
= sysdb
->getThreadInfo(0);
// NOTE(review): freeSlot is not used by any visible statement.
127 ThreadInfo
* freeSlot
= NULL
;
// `i` is declared outside the visible lines.
128 for (; i
< Conf::config
.getMaxProcs(); i
++)
130 if (tInfo
->pid_
!=0 ) {
131 logFine(Conf::logger
, "Registered Procs: %d %lu\n", tInfo
->pid_
, tInfo
->thrid_
);
132 printf("Client process with pid %d is still registered\n", tInfo
->pid_
);
// True only for processes that are none of the helper servers started by
// this file; the statement controlled by this condition is not visible.
133 if( tInfo
->pid_
!= asyncpid
&& tInfo
->pid_
!= cachepid
&&
134 tInfo
->pid_
!= sqlserverpid
)
139 sysdb
->releaseProcessTableMutex(false);
140 if (count
) return ErrSysInternal
; else return OK
;
// Launches $CSQL_INSTALL_ROOT/bin/csqlcacheserver through os::createProcess
// and stores the resulting pid in the file-scope variable cachepid.
142 void startCacheServer()
// execName is declared outside the visible lines.
// NOTE(review): the os::getenv result is passed to %s unchecked; if the
// wrapper can return NULL when CSQL_INSTALL_ROOT is unset, this is undefined
// behaviour -- confirm. sprintf also does no bounds checking on execName.
145 sprintf(execName
, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT"));
147 //printf("filename is %s\n", execName);
148 cachepid
= os::createProcess(execName
, "csqlcacheserver");
// Whether this message is guarded by a success check is not visible here.
150 printf("Cache Receiver Started\t [PID=%d]\n",cachepid
);
// Launches $CSQL_INSTALL_ROOT/bin/csqlsqlserver (the network SQL server)
// through os::createProcess and stores the resulting pid in sqlserverpid.
154 void startServiceClient()
// execName is declared outside the visible lines.
// NOTE(review): the os::getenv result is passed to %s unchecked; if the
// wrapper can return NULL when CSQL_INSTALL_ROOT is unset, this is undefined
// behaviour -- confirm. sprintf also does no bounds checking on execName.
157 sprintf(execName
, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT"));
158 //printf("filename is %s\n", execName);
160 sqlserverpid
= os::createProcess(execName
, "csqlsqlserver");
// A result of -1 is treated as failure; the start-up message with the pid and
// the configured port is printed only on success.
161 if (sqlserverpid
!= -1)
162 printf("Network Server Started\t [PID=%d] [PORT=%d]\n", sqlserverpid
,Conf::config
.getPort());
// Launches $CSQL_INSTALL_ROOT/bin/csqlasyncserver through os::createProcess
// and stores the resulting pid in the file-scope variable asyncpid.
167 void startAsyncServer()
// execName is declared outside the visible lines.
// NOTE(review): the os::getenv result is passed to %s unchecked; if the
// wrapper can return NULL when CSQL_INSTALL_ROOT is unset, this is undefined
// behaviour -- confirm. sprintf also does no bounds checking on execName.
170 sprintf(execName
, "%s/bin/csqlasyncserver", os::getenv("CSQL_INSTALL_ROOT"));
171 //printf("filename is %s\n", execName);
173 asyncpid
= os::createProcess(execName
, "csqlasyncserver");
// Whether this message is guarded by a success check is not visible here.
175 printf("Async Cache Server Started [PID=%d]\n", asyncpid
);
// Launches $CSQL_INSTALL_ROOT/bin/csqlcheckpointserver through
// os::createProcess and stores the resulting pid in chkptpid.
178 void startCheckpointServer()
// execName is declared outside the visible lines.
// NOTE(review): the os::getenv result is passed to %s unchecked; if the
// wrapper can return NULL when CSQL_INSTALL_ROOT is unset, this is undefined
// behaviour -- confirm. sprintf also does no bounds checking on execName.
181 sprintf(execName
, "%s/bin/csqlcheckpointserver", os::getenv("CSQL_INSTALL_ROOT"));
183 chkptpid
= os::createProcess(execName
, "csqlcheckpointserver");
// A result of -1 is treated as failure; on success the start is both printed
// and written to the log.
184 if (chkptpid
!= -1) {
185 printf("Checkpoint Server Started [PID=%d]\n", chkptpid
);
186 logFine(Conf::logger
, "Checkpoint Server Started pid:%d", chkptpid
);
// Start-up recovery for a durable database. In order, the visible steps are:
//   1. if <dbfile>/db.chkpt.schema1 can be opened, copy it over
//      db.chkpt.schema;
//   2. if <dbfile>/db.chkpt.map1 can be opened, copy it over db.chkpt.map;
//   3. when mmap is not in use and <dbfile>/db.chkpt.data1 can be opened,
//      copy it with `cp -f` (the destination argument is not visible);
//   4. if the checkpoint data file can be opened, run the external `recover`
//      command;
//   5. if <dbfile>/csql.db.cur can be opened, run `redo -a`;
//   6. take a checkpoint through the session's DatabaseManager.
// <dbfile> stands for Conf::config.getDbFile().
// @return int; the return statements lie outside the visible lines.
// NOTE(review): the paths are formatted with unchecked sprintf into
// MAX_FILE_LEN buffers and interpolated unquoted into shell commands.
// NOTE(review): no fclose for the FILE handles opened below is visible --
// confirm that they are closed in the missing lines.
191 int recoverAndCheckPoint()
193 char dbRedoFileName
[MAX_FILE_LEN
];
194 char dbChkptSchema
[MAX_FILE_LEN
];
195 char dbChkptMap
[MAX_FILE_LEN
];
196 char dbChkptData
[MAX_FILE_LEN
];
197 char dbBackupFile
[MAX_FILE_LEN
];
200 //check for check point file if present recover
201 sprintf(dbChkptSchema
, "%s/db.chkpt.schema1", Conf::config
.getDbFile());
// fopen is used purely as an existence/readability probe.
202 if (FILE *file
= fopen(dbChkptSchema
, "r")) {
// cmd is declared outside the visible lines.
204 sprintf(cmd
, "cp -f %s %s/db.chkpt.schema", dbChkptSchema
,
205 Conf::config
.getDbFile());
206 int ret
= system(cmd
);
// Error report; the condition on ret that guards it is not visible.
208 printError(ErrOS
, "backup schema file: Recovery failed.");
213 sprintf(dbChkptMap
, "%s/db.chkpt.map1", Conf::config
.getDbFile());
214 if (FILE *file
= fopen(dbChkptMap
, "r")) {
216 sprintf(cmd
, "cp -f %s %s/db.chkpt.map", dbChkptMap
,
217 Conf::config
.getDbFile());
218 int ret
= system(cmd
);
220 printError(ErrOS
, "backup map file: Recovery failed.");
// The data file name carries a numeric suffix; the argument for %d is on a
// line that is not visible (presumably chkptID -- confirm).
225 int chkptID
= Database::getCheckpointID();
226 sprintf(dbChkptData
, "%s/db.chkpt.data%d", Conf::config
.getDbFile(),
228 sprintf(dbBackupFile
, "%s/db.chkpt.data1", Conf::config
.getDbFile());
// fl is declared outside the visible lines.
231 if (!Conf::config
.useMmap() && (fl
= fopen(dbBackupFile
, "r"))) {
// The second %s argument (copy destination) is on a line that is not visible.
233 sprintf(cmd
, "cp -f %s/db.chkpt.data1 %s", Conf::config
.getDbFile(),
235 int ret
= system(cmd
);
237 printError(ErrOS
, "backup data file. Recovery failed.");
241 if (FILE *file
= fopen(dbChkptData
, "r")) {
// `recover` is resolved by the shell that system() spawns, i.e. through PATH.
243 int ret
= system("recover");
245 printError(ErrSysInternal
, "recover: Recovery failed\n");
250 //check for redo log file if present apply redo logs
251 sprintf(dbRedoFileName
, "%s/csql.db.cur", Conf::config
.getDbFile());
252 if (FILE *file
= fopen(dbRedoFileName
, "r")) {
254 int ret
= system("redo -a");
256 printError(ErrSysInternal
, "redo: Recovery failed.\n");
// Uses the file-scope `session`, which main() creates before this call.
259 DatabaseManager
*dbMgr
= session
->getDatabaseManager();
260 DbRetVal rv
= dbMgr
->checkPoint();
// Error report; the condition on rv that guards it is not visible.
262 printError(ErrSysInternal
,"checkpoint: Recovery failed.");
// Reloads cached tables by running the external command `cachetable -R`
// through system(), announcing start and completion on stdout and in the log.
// @return ErrSysInternal on the failure path shown below; the success return
//         lies outside the visible lines.
269 DbRetVal
recoverCachedTables()
271 printf("Database server recovering cached tables...\n");
272 logFine(Conf::logger
, "Recovering Cached tables");
// `cachetable` is resolved by the shell that system() spawns, i.e. through
// PATH.
273 int ret
= system("cachetable -R");
// Failure path; the condition on ret that guards it is not visible. The
// value logged is the raw system() result.
275 printError(ErrSysInternal
, "cachetable: Recovery failed %d\n", ret
);
276 return ErrSysInternal
;
278 printf("Cached Tables recovered\n");
279 logFine(Conf::logger
, "Cache Tables Recovery Complete");
// Command-line help text. The enclosing function header is not among the
// visible lines; presumably this is the body of printUsage(), which main()
// calls for the '?' option -- confirm.
// NOTE(review): main() also accepts an 'i' option that is not described here.
284 printf("Usage: csqlserver [-c] [-v]\n");
285 printf(" v -> print the version.\n");
286 printf(" c -> recover all cached tables from the target database.\n");
287 printf("Description: Start the csql server and initialize the database.\n");
// Entry point of the csql server. The visible statements do the following:
//   - parse the options "cvi?" ('?' help, 'v' version, 'c' recover cached
//     tables, 'i' sets freshStart);
//   - create the session, read the configuration and start the logger;
//   - initialise the system database, or attach to an existing instance;
//   - run durable recovery or cached-table recovery where configured;
//   - start the helper servers, then monitor registered processes and
//     helpers;
//   - on the way out, send SIGTERM to the helpers, sync and unmap the
//     database, and destroy the system database.
// Several declarations (c, opt, cmd, isInit, srvStop, the helper pids), the
// loop headers and most conditions are not among the visible lines.
// @param argc  argument count, forwarded to getopt
// @param argv  argument vector, forwarded to getopt
// @return 0 after printing help or version, 2 when the database directory
//         cannot be wiped; other return paths are not visible.
291 int main(int argc
, char **argv
)
294 bool freshStart
= false;
295 while ((c
= getopt(argc
, argv
, "cvi?")) != EOF
)
299 case '?' : { opt
= 10; break; } //print help
300 case 'c' : { opt
= 1; break; } //recover all the tables from cache
301 case 'v' : { opt
= 2; break; } //print version
302 case 'i' : { freshStart
= true; break; }
// Help and version requests exit before any server state is created.
307 if (opt
== 10) { printUsage(); return 0; }
308 else if (opt
==2) { printf("%s\n",version
); return 0; }
310 session
= new SessionImpl();
311 DbRetVal rv
= session
->readConfigFile();
// Failure message; the condition on rv that guards it is not visible.
314 printf("Unable to read the configuration file \n");
// Wipes everything under the database directory via the shell. The guarding
// condition is not visible -- presumably freshStart; confirm.
// NOTE(review): the path is interpolated unquoted into `rm -rf`.
321 sprintf(cmd
, "rm -rf %s/*", Conf::config
.getDbFile());
322 int ret
= system(cmd
);
323 if (ret
!= 0) { delete session
; return 2; }
324 if (Conf::config
.useDurability()) {
// Opening with "w+" truncates (or creates) the table configuration file.
// What is done with fp afterwards is not visible.
325 FILE *fp
= fopen(Conf::config
.getTableConfigFile(), "w+");
// Signal handlers: termination requests and child-exit notifications.
330 os::signal(SIGINT
, sigTermHandler
);
331 os::signal(SIGTERM
, sigTermHandler
);
332 os::signal(SIGCHLD
, sigChildHandler
);
333 rv
= Conf::logger
.startLogger(Conf::config
.getLogFile(), true);
// Failure message; the condition on rv that guards it is not visible.
336 printf("Unable to start the Conf::logger\n");
341 logFine(Conf::logger
, "Server Started");
342 int ret
= session
->initSystemDatabase();
// Fallback path: attach to a database instance that already exists. The
// condition on ret that selects this path is not visible.
344 printf("Attaching to exising database\n");
345 logFine(Conf::logger
, "Attaching to existing database instance");
// NOTE(review): the session is re-created here; whether the previous object
// is deleted first is not visible -- confirm.
348 session
= new SessionImpl();
349 ret
= session
->open(DBAUSER
, DBAPASS
);
351 printError(ErrSysInternal
,
352 "Unable to attach to existing database\n");
353 Conf::logger
.stopLogger();
354 session
->destroySystemDatabase();
// timeout holds the configured interval and tval the working copy handed to
// os::select below; the initialisation of timeout is not visible.
360 struct timeval timeout
, tval
;
363 Database
* sysdb
= session
->getSystemDatabase();
// isInit is declared outside the visible lines; presumably it is true when
// this process initialised the database rather than attaching -- confirm.
367 if (isInit
) UID
.create();
369 if(isInit
&& Conf::config
.useDurability()) {
370 ret
= recoverAndCheckPoint();
// Shutdown on recovery failure; the condition on ret is not visible.
372 Conf::logger
.stopLogger();
373 session
->destroySystemDatabase();
// Which helper servers are required, derived from the configuration.
380 bool isAsyncReq
= Conf::config
.useCache() &&
381 Conf::config
.getCacheMode() == ASYNC_MODE
;
382 bool isCacheReq
= Conf::config
.useCache() && Conf::config
.useTwoWayCache()
383 && Conf::config
.getCacheMode() != OFFLINE_MODE
;
384 bool isSQLReq
= Conf::config
.useCsqlSqlServer();
385 bool isChkptReq
= Conf::config
.useDurability();
// Without durability, cached tables are reloaded from the target database.
387 if (isInit
&& !Conf::config
.useDurability() && Conf::config
.useCache()) {
388 rv
= recoverCachedTables();
// Shutdown on recovery failure; the condition on rv is not visible.
390 Conf::logger
.stopLogger();
391 session
->destroySystemDatabase();
397 //TODO:: kill all the child servers and restart if !isInit
399 if(isSQLReq
) startServiceClient();
// Remove a message queue with the configured key if one exists.
401 int msgid
= os::msgget(Conf::config
.getMsgKey(), 0666);
402 if (msgid
!= -1) os::msgctl(msgid
, IPC_RMID
, NULL
);
405 if (isCacheReq
) startCacheServer();
406 if(isChkptReq
) startCheckpointServer();
408 printf("Database Server Started...\n");
409 logFine(Conf::logger
, "Database Server Started");
410 monitorServer
= Conf::config
.useMonitorServers();
// Copy the interval into tval before each os::select call; select with no
// descriptor sets serves as a sleep. The enclosing loop header is not
// visible.
415 tval
.tv_sec
= timeout
.tv_sec
;
416 tval
.tv_usec
= timeout
.tv_usec
;
417 os::select(0, 0, 0, 0, &tval
);
419 //send signal to all the registered process..check they are alive
420 cleanupDeadProcs(sysdb
);
// Helper supervision: each required helper with a non-zero pid is probed with
// checkDead and its death is logged. The restart calls for the cache and
// async helpers are not among the visible lines.
423 if (isCacheReq
&& cachepid
!=0 && checkDead(cachepid
)) {
424 logFine(Conf::logger
, "Cache Receiver Died pid:%d", cachepid
);
427 if (isAsyncReq
&& asyncpid
!=0 && checkDead(asyncpid
)) {
428 logFine(Conf::logger
, "Async Server Died pid:%d", asyncpid
);
// The message queue is removed again after the async server died.
429 int msgid
= os::msgget(Conf::config
.getMsgKey(), 0666);
430 if (msgid
!= -1) os::msgctl(msgid
, IPC_RMID
, NULL
);
433 if (isSQLReq
&& sqlserverpid
!=0 && checkDead(sqlserverpid
)) {
434 logFine(Conf::logger
, "Network Server Died pid:%d", sqlserverpid
);
435 startServiceClient();
437 if (isChkptReq
&& chkptpid
!=0 && checkDead(chkptpid
)) {
438 logFine(Conf::logger
, "Checkpoint Server Died pid:%d", chkptpid
);
439 startCheckpointServer();
// If logActiveProcs reports anything other than OK, srvStop is reset to 0;
// srvStop is declared outside the visible lines.
443 if (logActiveProcs(sysdb
) != OK
) {srvStop
= 0;
444 monitorServer
= Conf::config
.useMonitorServers();
// Shutdown: ask every helper with a non-zero pid to terminate.
447 if (cachepid
) os::kill(cachepid
, SIGTERM
);
448 if(asyncpid
) os::kill(asyncpid
, SIGTERM
);
449 if (sqlserverpid
) os::kill(sqlserverpid
, SIGTERM
);
450 if (chkptpid
) os::kill(chkptpid
, SIGTERM
);
// With durability on an mmap-backed database, synchronously flush and then
// unmap getMaxDbSize() bytes starting getMaxSysDbSize() bytes past the
// metadata pointer.
452 if (Conf::config
.useDurability() && Conf::config
.useMmap()) {
454 char *startAddr
= (char *) sysdb
->getMetaDataPtr();
455 msync(startAddr
+ Conf::config
.getMaxSysDbSize(),
456 Conf::config
.getMaxDbSize(), MS_SYNC
);
457 munmap(startAddr
+ Conf::config
.getMaxSysDbSize(),
458 Conf::config
.getMaxDbSize());
460 logFine(Conf::logger
, "Server Exiting");
461 printf("Server Exiting\n");
462 logFine(Conf::logger
, "Server Ended");
464 session
->destroySystemDatabase();
465 Conf::logger
.stopLogger();