btrfs: [] on the end of a struct field is a variable length array.
[haiku.git] / src / add-ons / kernel / file_systems / nfs4 / WorkQueue.cpp
blob524b98346dc602435e35d4c244201828982113e0
1 /*
2 * Copyright 2012 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
5 * Authors:
6 * Paweł Dziepak, pdziepak@quarnos.org
7 */
10 #include "WorkQueue.h"
12 #include <io_requests.h>
15 #define MAX_BUFFER_SIZE (1024 * 1024)
17 WorkQueue* gWorkQueue = NULL;
20 WorkQueue::WorkQueue()
22 fQueueSemaphore(create_sem(0, NULL)),
23 fThreadCancel(create_sem(0, NULL))
25 mutex_init(&fQueueLock, NULL);
27 fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
28 "NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
29 if (fThread < B_OK) {
30 fInitError = fThread;
31 return;
34 status_t result = resume_thread(fThread);
35 if (result != B_OK) {
36 kill_thread(fThread);
37 fInitError = result;
38 return;
41 fInitError = B_OK;
45 WorkQueue::~WorkQueue()
47 release_sem(fThreadCancel);
49 status_t result;
50 wait_for_thread(fThread, &result);
52 mutex_destroy(&fQueueLock);
53 delete_sem(fThreadCancel);
54 delete_sem(fQueueSemaphore);
58 status_t
59 WorkQueue::EnqueueJob(JobType type, void* args)
61 WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
62 if (entry == NULL)
63 return B_NO_MEMORY;
65 entry->fType = type;
66 entry->fArguments = args;
67 if (type == IORequest)
68 reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();
70 MutexLocker locker(fQueueLock);
71 fQueue.InsertAfter(fQueue.Tail(), entry);
72 locker.Unlock();
74 release_sem(fQueueSemaphore);
75 return B_OK;
79 status_t
80 WorkQueue::LaunchWorkingThread(void* object)
82 ASSERT(object != NULL);
84 WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
85 return queue->WorkingThread();
89 status_t
90 WorkQueue::WorkingThread()
92 while (true) {
93 object_wait_info object[2];
94 object[0].object = fThreadCancel;
95 object[0].type = B_OBJECT_TYPE_SEMAPHORE;
96 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
98 object[1].object = fQueueSemaphore;
99 object[1].type = B_OBJECT_TYPE_SEMAPHORE;
100 object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
102 status_t result = wait_for_objects(object, 2);
104 if (result < B_OK
105 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
106 return result;
107 } else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
108 continue;
110 acquire_sem(fQueueSemaphore);
112 DequeueJob();
115 return B_OK;
119 void
120 WorkQueue::DequeueJob()
122 MutexLocker locker(fQueueLock);
123 WorkQueueEntry* entry = fQueue.RemoveHead();
124 locker.Unlock();
125 ASSERT(entry != NULL);
127 void* args = entry->fArguments;
128 switch (entry->fType) {
129 case DelegationRecall:
130 JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
131 break;
132 case IORequest:
133 JobIO(reinterpret_cast<IORequestArgs*>(args));
134 break;
137 delete entry;
141 void
142 WorkQueue::JobRecall(DelegationRecallArgs* args)
144 ASSERT(args != NULL);
145 args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
149 void
150 WorkQueue::JobIO(IORequestArgs* args)
152 ASSERT(args != NULL);
154 uint64 offset = io_request_offset(args->fRequest);
155 uint64 length = io_request_length(args->fRequest);
157 size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
158 char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
159 if (buffer == NULL) {
160 notify_io_request(args->fRequest, B_NO_MEMORY);
161 args->fInode->EndAIOOp();
162 return;
165 status_t result;
166 if (io_request_is_write(args->fRequest)) {
167 if (offset + length > args->fInode->MaxFileSize())
168 length = args->fInode->MaxFileSize() - offset;
170 uint64 position = 0;
171 do {
172 size_t size = 0;
173 size_t thisBufferLength = min_c(bufferLength, length - position);
175 result = read_from_io_request(args->fRequest, buffer,
176 thisBufferLength);
178 while (size < thisBufferLength && result == B_OK) {
179 size_t bytesWritten = thisBufferLength - size;
180 result = args->fInode->WriteDirect(NULL,
181 offset + position + size, buffer + size, &bytesWritten);
182 size += bytesWritten;
185 position += thisBufferLength;
186 } while (position < length && result == B_OK);
187 } else {
188 bool eof = false;
189 uint64 position = 0;
190 do {
191 size_t size = 0;
192 size_t thisBufferLength = min_c(bufferLength, length - position);
194 do {
195 size_t bytesRead = thisBufferLength - size;
196 result = args->fInode->ReadDirect(NULL,
197 offset + position + size, buffer + size, &bytesRead, &eof);
198 if (result != B_OK)
199 break;
201 result = write_to_io_request(args->fRequest, buffer + size,
202 bytesRead);
203 if (result != B_OK)
204 break;
206 size += bytesRead;
207 } while (size < length && result == B_OK && !eof);
209 position += thisBufferLength;
210 } while (position < length && result == B_OK && !eof);
213 free(buffer);
215 notify_io_request(args->fRequest, result);
216 args->fInode->EndAIOOp();