2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
19 #include <netinet/in.h>
21 #include <sys/resource.h>
25 #include <sys/types.h>
39 #include <afs/cellconfig.h>
42 #include "budb_errs.h"
44 #include "budb_internal.h"
45 #include "error_macros.h"
47 #include "afs/audit.h"
/*
 * Forward declarations for routines that are defined further down in this
 * file: DumpDB and RestoreDbHeader both return an afs_int32 code, and
 * dumpWatcher has the void *(void *) shape used for thread entry points.
 */
49 afs_int32
DumpDB(struct rx_call
*, int, afs_int32
, charListT
*, afs_int32
*);
50 afs_int32
RestoreDbHeader(struct rx_call
*, struct DbHeader
*);
51 void *dumpWatcher(void *);
53 /* dump ubik database - interface routines */
56 * no checking for now.
/*
 * badEntry
 *	Takes a single afs_uint32 argument, dbAddr (presumably an address
 *	within the database - confirm against callers).
 *	NOTE(review): the return type and the body are not visible in this
 *	excerpt, so nothing more can be stated about its behaviour here.
 */
60 badEntry(afs_uint32 dbAddr
)
67 * decode the arguments passed via LWP and dump the database.
/*
 * setupDbDump
 *	Entry point of the dumper thread/LWP created by DumpDB.
 * entry:
 *	param - the write descriptor of the dump pipe, packed into a pointer
 *		and recovered here through an intptr_t cast.
 * exit:
 *	returns the last value of code, cast to a pointer.
 * notes:
 *	Calls InitRPC() with LOCKREAD to set up dumpSyncPtr->ut, hands that
 *	transaction and the descriptor to writeDatabase(), closes the
 *	descriptor and finally calls ubik_EndTrans() on the transaction.
 *	NOTE(review): the conditionals that guard the LogError() calls and any
 *	early-exit paths are not visible in this excerpt.
 */
71 setupDbDump(void *param
)
/* unpack the integer descriptor that the creator passed as the argument */
73 int writeFid
= (intptr_t)param
;
/* set up the transaction (stored in the shared dump state) with LOCKREAD */
76 code
= InitRPC(&dumpSyncPtr
->ut
, LOCKREAD
, 1);
/* write the database to the pipe descriptor */
80 code
= writeDatabase(dumpSyncPtr
->ut
, writeFid
);
82 LogError(code
, "writeDatabase failed\n");
/* done writing: close our end of the pipe */
84 code
= close(writeFid
);
86 LogError(code
, "pipe writer close failed\n");
88 LogDebug(5, "writeDatabase complete\n");
/* finish the transaction that InitRPC() set up above */
92 ubik_EndTrans(dumpSyncPtr
->ut
);
/* the status code travels back to the thread package as a pointer value */
93 return (void *)(intptr_t)(code
);
/*
 * SBUDB_DumpDB
 *	Wrapper around DumpDB: forwards every argument unchanged, then passes
 *	the resulting code to osi_auditU() under the BUDB_DmpDBEvent event.
 * entry:
 *	call        - rx call, forwarded to DumpDB and to osi_auditU
 *	firstcall   - forwarded to DumpDB
 *	maxLength   - forwarded to DumpDB
 *	charListPtr - forwarded to DumpDB
 *	done        - forwarded to DumpDB
 *	NOTE(review): the return statement is not visible in this excerpt;
 *	presumably the code from DumpDB is returned - confirm.
 */
98 SBUDB_DumpDB(struct rx_call
*call
, int firstcall
, afs_int32 maxLength
,
99 charListT
*charListPtr
, afs_int32
*done
)
/* do the real work */
103 code
= DumpDB(call
, firstcall
, maxLength
, charListPtr
, done
);
/* record the outcome of the request in the audit log */
104 osi_auditU(call
, BUDB_DmpDBEvent
, code
, AUD_END
);
/*
 * DumpDB
 *	Serve one request of a database dump.  The dump state is shared
 *	through dumpSyncPtr and guarded by dumpSyncPtr->ds_lock.
 * entry:
 *	call        - rx call; checked with callPermitted()
 *	firstcall   - 1 - init. 0 - no init
 *	maxLength   - number of bytes requested; 0 only resets the
 *		      timeToLive timer (this parameter's declaration line is
 *		      not visible in this excerpt, it is used in the body)
 *	charListPtr - receives the buffer (charListT_val) and the byte count
 *		      (charListT_len)
 *	done        - set in the maxLength == 0 path to 1 when statusFlags is
 *		      0, else 0 (declaration line likewise not visible)
 * notes:
 *	When statusFlags is 0 a new dump is set up: the shared state is
 *	cleared, a pipe is created, a dumper (setupDbDump) and a watchdog
 *	(dumpWatcher) are started and timeToLive is armed.  Afterwards the
 *	routine waits until ds_bytes is non-zero or the writer status is
 *	DS_DONE or DS_DONE_ERROR, reads from pipeFid[0] into a freshly
 *	malloc'ed buffer and wakes the writer if it is in DS_WAITING.
 *	NOTE(review): many lines of the original (braces, preprocessor else
 *	and endif lines, several conditionals and the exit path) are missing
 *	from this excerpt; the inline notes below only describe what is shown.
 */
109 DumpDB(struct rx_call
*call
,
110 int firstcall
, /* 1 - init. 0 - no init */
112 charListT
*charListPtr
,
115 #ifdef AFS_PTHREAD_ENV
116 pthread_t dumperPid
, watcherPid
;
117 pthread_attr_t dumperPid_tattr
;
118 pthread_attr_t watcherPid_tattr
;
120 PROCESS dumperPid
, watcherPid
;
/* refuse the request when callPermitted() reports 0 for this call */
125 if (callPermitted(call
) == 0)
126 ERROR(BUDB_NOTPERMITTED
);
/* everything below works on the shared dump state, so take its lock */
128 ObtainWriteLock(&dumpSyncPtr
->ds_lock
);
130 /* If asking for zero bytes, then this is a call to reset the timeToLive
131 * timer. Reset it if there is a dump in progress.
133 if (maxLength
== 0) {
134 charListPtr
->charListT_val
= NULL
;
135 charListPtr
->charListT_len
= 0;
137 *done
= ((dumpSyncPtr
->statusFlags
== 0) ? 1 : 0);
139 /* reset the clock on dump timeout */
140 dumpSyncPtr
->timeToLive
= time(0) + DUMP_TTL_INC
;
/* statusFlags == 0: no dump is running yet (it is set to 1 further down) */
144 if (dumpSyncPtr
->statusFlags
== 0) {
/* NOTE(review): the condition guarding this ERROR is not visible here */
146 ERROR(BUDB_DUMPFAILED
);
148 LogDebug(5, "Setup dump\n");
150 /* no dump in progress - setup and retake lock */
/* NOTE(review): this memset clears the whole structure, which includes the
 * ds_lock obtained above; the ObtainWriteLock that would retake it is
 * commented out just below.  Confirm the lock state is still valid. */
151 memset(dumpSyncPtr
, 0, sizeof(*dumpSyncPtr
));
152 /* ObtainWriteLock(&dumpSyncPtr->ds_lock); */
154 /* mark dump in progress */
155 dumpSyncPtr
->statusFlags
= 1;
/* the dumper is handed pipeFid[1] (LWP build); this routine reads pipeFid[0] */
157 code
= pipe(dumpSyncPtr
->pipeFid
);
161 #ifdef AFS_PTHREAD_ENV
162 /* Initialize the condition variables and the mutexes we use
163 * to signal and synchronize the reader and writer threads.
165 CV_INIT(&dumpSyncPtr
->ds_readerStatus_cond
, "reader cond", CV_DEFAULT
, 0);
166 CV_INIT(&dumpSyncPtr
->ds_writerStatus_cond
, "writer cond", CV_DEFAULT
, 0);
167 MUTEX_INIT(&dumpSyncPtr
->ds_readerStatus_mutex
, "reader", MUTEX_DEFAULT
, 0);
168 MUTEX_INIT(&dumpSyncPtr
->ds_writerStatus_mutex
, "writer", MUTEX_DEFAULT
, 0);
170 /* Initialize the thread attributes and launch the thread */
172 osi_Assert(pthread_attr_init(&dumperPid_tattr
) == 0);
173 osi_Assert(pthread_attr_setdetachstate(&dumperPid_tattr
, PTHREAD_CREATE_DETACHED
) == 0);
/* NOTE(review): the pthread build passes NULL as the thread argument, while
 * the LWP build below passes pipeFid[1].  setupDbDump casts its argument to
 * the descriptor it writes to, so under pthreads it would see 0 instead of
 * the pipe's write end - confirm and pass the descriptor here as well. */
174 osi_Assert(pthread_create(&dumperPid
, &dumperPid_tattr
, (void *)setupDbDump
, NULL
) == 0);
/* LWP build: the write end of the pipe travels as the process argument */
178 LWP_CreateProcess(setupDbDump
, 16384, 1,
179 (void *)(intptr_t)dumpSyncPtr
->pipeFid
[1],
180 "Database Dumper", &dumperPid
);
/* keep the dumper's id in the shared state (dumpWatcher uses it to stop the
 * dumper) and arm the timeout */
185 dumpSyncPtr
->dumperPid
= dumperPid
;
186 dumpSyncPtr
->timeToLive
= time(0) + DUMP_TTL_INC
;
188 #ifdef AFS_PTHREAD_ENV
189 /* Initialize the thread attributes and launch the thread */
191 osi_Assert(pthread_attr_init(&watcherPid_tattr
) == 0);
192 osi_Assert(pthread_attr_setdetachstate(&watcherPid_tattr
, PTHREAD_CREATE_DETACHED
) == 0);
193 osi_Assert(pthread_create(&watcherPid
, &watcherPid_tattr
, (void *)dumpWatcher
, NULL
) == 0);
195 /* now create the watcher thread */
197 LWP_CreateProcess(dumpWatcher
, 16384, 1, 0,
198 "Database Dump Watchdog", &watcherPid
);
/* NOTE(review): the statement controlled by this else-if is not visible */
200 } else if (firstcall
)
203 /* now read the database and feed it to the rpc connection */
/* loop while ds_bytes is 0 (presumably: nothing buffered in the pipe yet) */
206 while (dumpSyncPtr
->ds_bytes
== 0) {
207 /* if no more data */
208 if ((dumpSyncPtr
->ds_writerStatus
== DS_DONE
)
209 || (dumpSyncPtr
->ds_writerStatus
== DS_DONE_ERROR
)) {
/* the writer is parked in DS_WAITING: clear its status and wake it up */
213 if (dumpSyncPtr
->ds_writerStatus
== DS_WAITING
) {
214 LogDebug(6, "wakup writer\n");
215 dumpSyncPtr
->ds_writerStatus
= 0;
216 #ifdef AFS_PTHREAD_ENV
217 CV_BROADCAST(&dumpSyncPtr
->ds_writerStatus_cond
);
219 code
= LWP_SignalProcess(&dumpSyncPtr
->ds_writerStatus
);
221 LogError(code
, "BUDB_DumpDB: signal delivery failed\n");
224 LogDebug(6, "wait for writer\n");
/* flag ourselves as waiting, then drop ds_lock for the duration of the wait */
225 dumpSyncPtr
->ds_readerStatus
= DS_WAITING
;
226 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
227 #ifdef AFS_PTHREAD_ENV
/* NOTE(review): ds_lock is released before ds_readerStatus_mutex is entered
 * and no predicate is re-checked around CV_WAIT in the lines shown; confirm
 * that a wakeup sent in between cannot be lost. */
228 MUTEX_ENTER(&dumpSyncPtr
->ds_readerStatus_mutex
);
229 CV_WAIT(&dumpSyncPtr
->ds_readerStatus_cond
, &dumpSyncPtr
->ds_readerStatus_mutex
);
230 MUTEX_EXIT(&dumpSyncPtr
->ds_readerStatus_mutex
);
232 LWP_WaitProcess(&dumpSyncPtr
->ds_readerStatus
);
/* take ds_lock again before looking at the shared state */
234 ObtainWriteLock(&dumpSyncPtr
->ds_lock
);
/* NOTE(review): no check of the malloc result is visible before the buffer is
 * handed to read(), and the line that stores read()'s result in readSize is
 * not part of this excerpt - confirm both in the full source. */
237 charListPtr
->charListT_val
= (char *)malloc(maxLength
);
239 read(dumpSyncPtr
->pipeFid
[0], charListPtr
->charListT_val
, maxLength
);
241 /* reset the clock on dump timeout */
242 dumpSyncPtr
->timeToLive
= time(0) + DUMP_TTL_INC
;
244 LogDebug(4, "read of len %d returned %d\n", maxLength
, readSize
);
/* report the number of bytes obtained to the caller */
246 charListPtr
->charListT_len
= readSize
;
248 if (readSize
== 0) { /* last chunk */
/* end of data: close the read end and mark that no dump is in progress */
250 close(dumpSyncPtr
->pipeFid
[0]);
251 dumpSyncPtr
->statusFlags
= 0;
/* subtract what was just read from the shared byte counter */
255 dumpSyncPtr
->ds_bytes
-= readSize
;
/* wake the writer if it is parked in DS_WAITING */
256 if (dumpSyncPtr
->ds_writerStatus
== DS_WAITING
) {
257 dumpSyncPtr
->ds_writerStatus
= 0;
258 #ifdef AFS_PTHREAD_ENV
259 CV_BROADCAST(&dumpSyncPtr
->ds_writerStatus_cond
);
261 code
= LWP_SignalProcess(&dumpSyncPtr
->ds_writerStatus
);
263 LogError(code
, "BUDB_DumpDB: signal delivery failed\n");
/* NOTE(review): the statement executed when the writer ended in
 * DS_DONE_ERROR while code is still 0 is not visible in this excerpt */
268 if (!code
&& (dumpSyncPtr
->ds_writerStatus
== DS_DONE_ERROR
))
270 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
/*
 * SBUDB_RestoreDbHeader
 *	Wrapper around RestoreDbHeader: forwards both arguments unchanged,
 *	then passes the resulting code to osi_auditU() under the
 *	BUDB_RstDBHEvent event.
 * entry:
 *	call   - rx call, forwarded to RestoreDbHeader and to osi_auditU
 *	header - database header supplied by the caller
 *	NOTE(review): the return statement is not visible in this excerpt;
 *	presumably the code from RestoreDbHeader is returned - confirm.
 */
275 SBUDB_RestoreDbHeader(struct rx_call
*call
, struct DbHeader
*header
)
/* do the real work */
279 code
= RestoreDbHeader(call
, header
);
/* record the outcome of the request in the audit log */
280 osi_auditU(call
, BUDB_RstDBHEvent
, code
, AUD_END
);
/*
 * RestoreDbHeader
 *	Merge selected fields of a caller-supplied header into the in-memory
 *	database header (db.h) and write that header back with dbwrite().
 * entry:
 *	call   - rx call; checked with callPermitted()
 *	header - header to merge; its dbversion must equal ntohl(db.h.version)
 * notes:
 *	Raises BUDB_NOTPERMITTED or BUDB_VERSIONMISMATCH through ERROR().
 *	Only lastDumpId, lastTapeId and lastInstanceId are touched, and each
 *	one only when the comparison below says the supplied value is larger.
 *	NOTE(review): error checks after InitRPC()/dbwrite() and the exit path
 *	that ends the transaction are not visible in this excerpt.
 */
285 RestoreDbHeader(struct rx_call
*call
, struct DbHeader
*header
)
287 struct ubik_trans
*ut
= 0;
/* the in-memory database state is defined elsewhere */
290 extern struct memoryDB db
;
/* refuse the request when callPermitted() reports 0 for this call */
292 if (callPermitted(call
) == 0)
293 ERROR(BUDB_NOTPERMITTED
);
/* set up the transaction with LOCKWRITE; the header is written below */
295 code
= InitRPC(&ut
, LOCKWRITE
, 1);
/* db.h.version is converted with ntohl() before it is compared with the
 * caller's value */
299 if (header
->dbversion
!= ntohl(db
.h
.version
))
300 ERROR(BUDB_VERSIONMISMATCH
);
302 /* merge rather than replace the header information */
/* NOTE(review): the three tests below apply < to db.h fields and to htonl()
 * results, i.e. to values in network byte order (db.h.version needed ntohl()
 * above).  On a little-endian host that is not a numeric comparison - confirm
 * whether ntohl(db.h.x) < header->x was intended. */
303 if (db
.h
.lastDumpId
< htonl(header
->lastDumpId
))
304 db
.h
.lastDumpId
= htonl(header
->lastDumpId
);
306 if (db
.h
.lastTapeId
< htonl(header
->lastTapeId
))
307 db
.h
.lastTapeId
= htonl(header
->lastTapeId
);
309 if (db
.h
.lastInstanceId
< htonl(header
->lastInstanceId
))
310 db
.h
.lastInstanceId
= htonl(header
->lastInstanceId
);
/* write the merged header back at offset 0 of the database */
312 code
= dbwrite(ut
, 0, (char *)&db
.h
, sizeof(db
.h
));
323 * monitors the state of a database dump. If the dump calls do not
324 * reset the time to live value, the dump times out. In that case,
325 * we kill the database traversal thread and clean up all the other
326 * state. Most importantly, the database is unlocked so that other
327 * transactions can proceed.
/*
 * dumpWatcher
 *	Watchdog started by DumpDB next to the dumper.  Under ds_lock it
 *	checks whether the dump has finished (statusFlags == 0) and whether
 *	the current time has passed dumpSyncPtr->timeToLive.  On timeout it
 *	logs the event, closes both pipe descriptors, stops the dumper
 *	(pthread_cancel or LWP_DestroyProcess), aborts the dump transaction if
 *	one is recorded in dumpSyncPtr->ut, and clears the shared dump state.
 * entry:
 *	unused - not referenced in the lines shown
 *	NOTE(review): the enclosing loop, the sleep between checks and the
 *	return statements are not visible in this excerpt.
 */
331 dumpWatcher(void *unused
)
337 /* printf("dumpWatcher\n"); */
/* the shared dump state is only inspected while holding its lock */
338 ObtainWriteLock(&dumpSyncPtr
->ds_lock
);
340 if (dumpSyncPtr
->statusFlags
== 0) {
341 /* dump has finished */
345 /* check time to live */
346 if (time(0) > dumpSyncPtr
->timeToLive
) { /*i */
347 /* dump has exceeded the allocated time - terminate it */
348 LogError(0, "Database dump timeout exceeded: %s",
349 ctime(&dumpSyncPtr
->timeToLive
));
350 LogError(0, "Terminating database dump\n");
/* close both ends of the dump pipe */
352 close(dumpSyncPtr
->pipeFid
[0]);
353 close(dumpSyncPtr
->pipeFid
[1]);
354 #ifdef AFS_PTHREAD_ENV
/* stop the dumper whose id DumpDB stored in the shared state */
355 osi_Assert(pthread_cancel(dumpSyncPtr
->dumperPid
) == 0);
357 code
= LWP_DestroyProcess(dumpSyncPtr
->dumperPid
);
359 LogError(code
, "dumpWatcher: failed to kill dump thread\n");
/* a transaction is still recorded for the dump: abort it */
362 if (dumpSyncPtr
->ut
) {
363 code
= ubik_AbortTrans(dumpSyncPtr
->ut
);
365 LogError(code
, "Aborting dump transaction\n");
/* NOTE(review): this memset clears the whole structure, including ds_lock,
 * and ReleaseWriteLock is then called on that same lock - confirm that
 * releasing a zeroed lock is intended and safe. */
368 memset(dumpSyncPtr
, 0, sizeof(*dumpSyncPtr
));
372 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
373 #ifdef AFS_PTHREAD_ENV
/* path on which nothing had to be torn down: just drop the lock */
381 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
382 /* printf("dumpWatcher exit\n"); */