2 * Copyright 2012 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
6 * Paweł Dziepak, pdziepak@quarnos.org
10 #include "WorkQueue.h"
12 #include <io_requests.h>
15 #define MAX_BUFFER_SIZE (1024 * 1024)
17 WorkQueue
* gWorkQueue
= NULL
;
20 WorkQueue::WorkQueue()
22 fQueueSemaphore(create_sem(0, NULL
)),
23 fThreadCancel(create_sem(0, NULL
))
25 mutex_init(&fQueueLock
, NULL
);
27 fThread
= spawn_kernel_thread(&WorkQueue::LaunchWorkingThread
,
28 "NFSv4 Work Queue", B_NORMAL_PRIORITY
, this);
34 status_t result
= resume_thread(fThread
);
45 WorkQueue::~WorkQueue()
47 release_sem(fThreadCancel
);
50 wait_for_thread(fThread
, &result
);
52 mutex_destroy(&fQueueLock
);
53 delete_sem(fThreadCancel
);
54 delete_sem(fQueueSemaphore
);
59 WorkQueue::EnqueueJob(JobType type
, void* args
)
61 WorkQueueEntry
* entry
= new(std::nothrow
) WorkQueueEntry
;
66 entry
->fArguments
= args
;
67 if (type
== IORequest
)
68 reinterpret_cast<IORequestArgs
*>(args
)->fInode
->BeginAIOOp();
70 MutexLocker
locker(fQueueLock
);
71 fQueue
.InsertAfter(fQueue
.Tail(), entry
);
74 release_sem(fQueueSemaphore
);
80 WorkQueue::LaunchWorkingThread(void* object
)
82 ASSERT(object
!= NULL
);
84 WorkQueue
* queue
= reinterpret_cast<WorkQueue
*>(object
);
85 return queue
->WorkingThread();
90 WorkQueue::WorkingThread()
93 object_wait_info object
[2];
94 object
[0].object
= fThreadCancel
;
95 object
[0].type
= B_OBJECT_TYPE_SEMAPHORE
;
96 object
[0].events
= B_EVENT_ACQUIRE_SEMAPHORE
;
98 object
[1].object
= fQueueSemaphore
;
99 object
[1].type
= B_OBJECT_TYPE_SEMAPHORE
;
100 object
[1].events
= B_EVENT_ACQUIRE_SEMAPHORE
;
102 status_t result
= wait_for_objects(object
, 2);
105 || (object
[0].events
& B_EVENT_ACQUIRE_SEMAPHORE
) != 0) {
107 } else if ((object
[1].events
& B_EVENT_ACQUIRE_SEMAPHORE
) == 0)
110 acquire_sem(fQueueSemaphore
);
120 WorkQueue::DequeueJob()
122 MutexLocker
locker(fQueueLock
);
123 WorkQueueEntry
* entry
= fQueue
.RemoveHead();
125 ASSERT(entry
!= NULL
);
127 void* args
= entry
->fArguments
;
128 switch (entry
->fType
) {
129 case DelegationRecall
:
130 JobRecall(reinterpret_cast<DelegationRecallArgs
*>(args
));
133 JobIO(reinterpret_cast<IORequestArgs
*>(args
));
142 WorkQueue::JobRecall(DelegationRecallArgs
* args
)
144 ASSERT(args
!= NULL
);
145 args
->fDelegation
->GetInode()->RecallDelegation(args
->fTruncate
);
150 WorkQueue::JobIO(IORequestArgs
* args
)
152 ASSERT(args
!= NULL
);
154 uint64 offset
= io_request_offset(args
->fRequest
);
155 uint64 length
= io_request_length(args
->fRequest
);
157 size_t bufferLength
= min_c(MAX_BUFFER_SIZE
, length
);
158 char* buffer
= reinterpret_cast<char*>(malloc(bufferLength
));
159 if (buffer
== NULL
) {
160 notify_io_request(args
->fRequest
, B_NO_MEMORY
);
161 args
->fInode
->EndAIOOp();
166 if (io_request_is_write(args
->fRequest
)) {
167 if (offset
+ length
> args
->fInode
->MaxFileSize())
168 length
= args
->fInode
->MaxFileSize() - offset
;
173 size_t thisBufferLength
= min_c(bufferLength
, length
- position
);
175 result
= read_from_io_request(args
->fRequest
, buffer
,
178 while (size
< thisBufferLength
&& result
== B_OK
) {
179 size_t bytesWritten
= thisBufferLength
- size
;
180 result
= args
->fInode
->WriteDirect(NULL
,
181 offset
+ position
+ size
, buffer
+ size
, &bytesWritten
);
182 size
+= bytesWritten
;
185 position
+= thisBufferLength
;
186 } while (position
< length
&& result
== B_OK
);
192 size_t thisBufferLength
= min_c(bufferLength
, length
- position
);
195 size_t bytesRead
= thisBufferLength
- size
;
196 result
= args
->fInode
->ReadDirect(NULL
,
197 offset
+ position
+ size
, buffer
+ size
, &bytesRead
, &eof
);
201 result
= write_to_io_request(args
->fRequest
, buffer
+ size
,
207 } while (size
< length
&& result
== B_OK
&& !eof
);
209 position
+= thisBufferLength
;
210 } while (position
< length
&& result
== B_OK
&& !eof
);
215 notify_io_request(args
->fRequest
, result
);
216 args
->fInode
->EndAIOOp();