2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
18 #ifdef AFS_PTHREAD_ENV
19 # include <opr/lock.h>
26 #include <afs/cellconfig.h>
27 #include <afs/audit.h>
28 #include <afs/afsutil.h>
31 #include "budb_errs.h"
33 #include "budb_internal.h"
34 #include "error_macros.h"
37 afs_int32
DumpDB(struct rx_call
*, int, afs_int32
, charListT
*, afs_int32
*);
38 afs_int32
RestoreDbHeader(struct rx_call
*, struct DbHeader
*);
39 void *dumpWatcher(void *);
41 /* dump ubik database - interface routines */
44 * no checking for now.
48 badEntry(afs_uint32 dbAddr
)
55 * decode the arguments passed via LWP and dump the database.
59 setupDbDump(void *param
)
61 int writeFid
= (intptr_t)param
;
64 afs_pthread_setname_self("Database Dumper");
65 code
= InitRPC(&dumpSyncPtr
->ut
, LOCKREAD
, 1);
69 code
= writeDatabase(dumpSyncPtr
->ut
, writeFid
);
71 LogError(code
, "writeDatabase failed\n");
73 code
= close(writeFid
);
75 LogError(code
, "pipe writer close failed\n");
77 LogDebug(5, "writeDatabase complete\n");
81 ubik_EndTrans(dumpSyncPtr
->ut
);
82 return (void *)(intptr_t)(code
);
87 SBUDB_DumpDB(struct rx_call
*call
, int firstcall
, afs_int32 maxLength
,
88 charListT
*charListPtr
, afs_int32
*done
)
92 code
= DumpDB(call
, firstcall
, maxLength
, charListPtr
, done
);
93 osi_auditU(call
, BUDB_DmpDBEvent
, code
, AUD_END
);
98 DumpDB(struct rx_call
*call
,
99 int firstcall
, /* 1 - init. 0 - no init */
101 charListT
*charListPtr
,
104 #ifdef AFS_PTHREAD_ENV
105 pthread_t dumperPid
, watcherPid
;
106 pthread_attr_t dumperPid_tattr
;
107 pthread_attr_t watcherPid_tattr
;
109 PROCESS dumperPid
, watcherPid
;
114 if (callPermitted(call
) == 0)
115 ERROR(BUDB_NOTPERMITTED
);
117 ObtainWriteLock(&dumpSyncPtr
->ds_lock
);
119 /* If asking for zero bytes, then this is a call to reset the timeToLive
120 * timer. Reset it if there is a dump in progress.
122 if (maxLength
== 0) {
123 charListPtr
->charListT_val
= NULL
;
124 charListPtr
->charListT_len
= 0;
126 *done
= ((dumpSyncPtr
->statusFlags
== 0) ? 1 : 0);
128 /* reset the clock on dump timeout */
129 dumpSyncPtr
->timeToLive
= time(0) + DUMP_TTL_INC
;
133 if (dumpSyncPtr
->statusFlags
== 0) {
135 ERROR(BUDB_DUMPFAILED
);
137 LogDebug(5, "Setup dump\n");
139 /* no dump in progress - setup and retake lock */
140 memset(dumpSyncPtr
, 0, sizeof(*dumpSyncPtr
));
141 /* ObtainWriteLock(&dumpSyncPtr->ds_lock); */
143 /* mark dump in progress */
144 dumpSyncPtr
->statusFlags
= 1;
146 code
= pipe(dumpSyncPtr
->pipeFid
);
150 #ifdef AFS_PTHREAD_ENV
151 /* Initialize the condition variables and the mutexes we use
152 * to signal and synchronize the reader and writer threads.
154 opr_cv_init(&dumpSyncPtr
->ds_readerStatus_cond
);
155 opr_cv_init(&dumpSyncPtr
->ds_writerStatus_cond
);
156 opr_mutex_init(&dumpSyncPtr
->ds_readerStatus_mutex
);
157 opr_mutex_init(&dumpSyncPtr
->ds_writerStatus_mutex
);
159 /* Initialize the thread attributes and launch the thread */
161 opr_Verify(pthread_attr_init(&dumperPid_tattr
) == 0);
162 opr_Verify(pthread_attr_setdetachstate(&dumperPid_tattr
,
163 PTHREAD_CREATE_DETACHED
) == 0);
164 opr_Verify(pthread_create(&dumperPid
,
166 (void *)setupDbDump
, NULL
) == 0);
170 LWP_CreateProcess(setupDbDump
, 16384, 1,
171 (void *)(intptr_t)dumpSyncPtr
->pipeFid
[1],
172 "Database Dumper", &dumperPid
);
177 dumpSyncPtr
->dumperPid
= dumperPid
;
178 dumpSyncPtr
->timeToLive
= time(0) + DUMP_TTL_INC
;
180 #ifdef AFS_PTHREAD_ENV
181 /* Initialize the thread attributes and launch the thread */
183 opr_Verify(pthread_attr_init(&watcherPid_tattr
) == 0);
184 opr_Verify(pthread_attr_setdetachstate(&watcherPid_tattr
,
185 PTHREAD_CREATE_DETACHED
) == 0);
186 opr_Verify(pthread_create(&watcherPid
,
188 (void *)dumpWatcher
, NULL
) == 0);
190 /* now create the watcher thread */
192 LWP_CreateProcess(dumpWatcher
, 16384, 1, 0,
193 "Database Dump Watchdog", &watcherPid
);
195 } else if (firstcall
)
198 /* now read the database and feed it to the rpc connection */
201 while (dumpSyncPtr
->ds_bytes
== 0) {
202 /* if no more data */
203 if ((dumpSyncPtr
->ds_writerStatus
== DS_DONE
)
204 || (dumpSyncPtr
->ds_writerStatus
== DS_DONE_ERROR
)) {
208 if (dumpSyncPtr
->ds_writerStatus
== DS_WAITING
) {
209 LogDebug(6, "wakup writer\n");
210 dumpSyncPtr
->ds_writerStatus
= 0;
211 #ifdef AFS_PTHREAD_ENV
212 opr_cv_broadcast(&dumpSyncPtr
->ds_writerStatus_cond
);
214 code
= LWP_SignalProcess(&dumpSyncPtr
->ds_writerStatus
);
216 LogError(code
, "BUDB_DumpDB: signal delivery failed\n");
219 LogDebug(6, "wait for writer\n");
220 dumpSyncPtr
->ds_readerStatus
= DS_WAITING
;
221 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
222 #ifdef AFS_PTHREAD_ENV
223 opr_mutex_enter(&dumpSyncPtr
->ds_readerStatus_mutex
);
224 opr_cv_wait(&dumpSyncPtr
->ds_readerStatus_cond
, &dumpSyncPtr
->ds_readerStatus_mutex
);
225 opr_mutex_exit(&dumpSyncPtr
->ds_readerStatus_mutex
);
227 LWP_WaitProcess(&dumpSyncPtr
->ds_readerStatus
);
229 ObtainWriteLock(&dumpSyncPtr
->ds_lock
);
232 charListPtr
->charListT_val
= malloc(maxLength
);
234 read(dumpSyncPtr
->pipeFid
[0], charListPtr
->charListT_val
, maxLength
);
236 /* reset the clock on dump timeout */
237 dumpSyncPtr
->timeToLive
= time(0) + DUMP_TTL_INC
;
239 LogDebug(4, "read of len %d returned %d\n", maxLength
, readSize
);
241 charListPtr
->charListT_len
= readSize
;
243 if (readSize
== 0) { /* last chunk */
245 close(dumpSyncPtr
->pipeFid
[0]);
246 dumpSyncPtr
->statusFlags
= 0;
250 dumpSyncPtr
->ds_bytes
-= readSize
;
251 if (dumpSyncPtr
->ds_writerStatus
== DS_WAITING
) {
252 dumpSyncPtr
->ds_writerStatus
= 0;
253 #ifdef AFS_PTHREAD_ENV
254 opr_cv_broadcast(&dumpSyncPtr
->ds_writerStatus_cond
);
256 code
= LWP_SignalProcess(&dumpSyncPtr
->ds_writerStatus
);
258 LogError(code
, "BUDB_DumpDB: signal delivery failed\n");
263 if (!code
&& (dumpSyncPtr
->ds_writerStatus
== DS_DONE_ERROR
))
265 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
270 SBUDB_RestoreDbHeader(struct rx_call
*call
, struct DbHeader
*header
)
274 code
= RestoreDbHeader(call
, header
);
275 osi_auditU(call
, BUDB_RstDBHEvent
, code
, AUD_END
);
280 RestoreDbHeader(struct rx_call
*call
, struct DbHeader
*header
)
282 struct ubik_trans
*ut
= 0;
285 extern struct memoryDB db
;
287 if (callPermitted(call
) == 0)
288 ERROR(BUDB_NOTPERMITTED
);
290 code
= InitRPC(&ut
, LOCKWRITE
, 1);
294 if (header
->dbversion
!= ntohl(db
.h
.version
))
295 ERROR(BUDB_VERSIONMISMATCH
);
297 /* merge rather than replace the header information */
298 if (db
.h
.lastDumpId
< htonl(header
->lastDumpId
))
299 db
.h
.lastDumpId
= htonl(header
->lastDumpId
);
301 if (db
.h
.lastTapeId
< htonl(header
->lastTapeId
))
302 db
.h
.lastTapeId
= htonl(header
->lastTapeId
);
304 if (db
.h
.lastInstanceId
< htonl(header
->lastInstanceId
))
305 db
.h
.lastInstanceId
= htonl(header
->lastInstanceId
);
307 code
= dbwrite(ut
, 0, (char *)&db
.h
, sizeof(db
.h
));
318 * monitors the state of a database dump. If the dump calls do not
319 * reset the time to live value, the dump times out. In that case,
320 * we kill the database traversal thread and clean up all the other
321 * state. Most importantly, the database is unlocked so that other
322 * transactions can proceed.
326 dumpWatcher(void *unused
)
330 afs_pthread_setname_self("Database Dump Watchdog");
333 /* printf("dumpWatcher\n"); */
334 ObtainWriteLock(&dumpSyncPtr
->ds_lock
);
336 if (dumpSyncPtr
->statusFlags
== 0) {
337 /* dump has finished */
341 /* check time to live */
342 if (time(0) > dumpSyncPtr
->timeToLive
) { /*i */
343 /* dump has exceeded the allocated time - terminate it */
344 LogError(0, "Database dump timeout exceeded: %s",
345 ctime(&dumpSyncPtr
->timeToLive
));
346 LogError(0, "Terminating database dump\n");
348 close(dumpSyncPtr
->pipeFid
[0]);
349 close(dumpSyncPtr
->pipeFid
[1]);
350 #ifdef AFS_PTHREAD_ENV
351 opr_Verify(pthread_cancel(dumpSyncPtr
->dumperPid
) == 0);
353 code
= LWP_DestroyProcess(dumpSyncPtr
->dumperPid
);
355 LogError(code
, "dumpWatcher: failed to kill dump thread\n");
358 if (dumpSyncPtr
->ut
) {
359 code
= ubik_AbortTrans(dumpSyncPtr
->ut
);
361 LogError(code
, "Aborting dump transaction\n");
364 memset(dumpSyncPtr
, 0, sizeof(*dumpSyncPtr
));
368 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
369 #ifdef AFS_PTHREAD_ENV
377 ReleaseWriteLock(&dumpSyncPtr
->ds_lock
);
378 /* printf("dumpWatcher exit\n"); */