6 #include <ext/hash_map>
11 #include <boost/bind.hpp>
12 #include <boost/function.hpp>
13 #include <boost/thread.hpp>
17 #include "simplearray.h"
19 //FIXME: I'm using the union in the code with UInt32, but in 64-bit machines that probably isn't best
// Integer id types for actors and for OS-level threads.
// These are real type aliases rather than preprocessor macros, so the names
// follow normal C++ scoping rules and show up in compiler diagnostics.
typedef uint32_t actorId_t;
typedef uint32_t threadId_t;
// Number of ActorWrapper slots in each actor's receiverCache array.
static const int RECEIVER_CACHE_SIZE = 3;

// Scheduler time-slice quantum. NOTE(review): the unit is not stated in this header - confirm.
static const unsigned int TIMESLICE_QUANTUM = 2000u;
// Scoping wrapper: the enumerators are spelled ThreadType::THREAD, ThreadType::MAILMAN, ...
// and the enum type itself is ThreadType::Type.
class ThreadType {
public:
    enum Type {
        THREAD  = 0,
        MAILMAN = 1,
        KERNEL  = 2
    };
};
// Scoping wrapper for the states an actor can be in; the enum type is ActorState::State.
class ActorState {
public:
    enum State {
        ACTIVE             = 0,
        WAITING_FOR_ACTION = 1,
        WAITING_FOR_DATA   = 2,
        DELETED            = 3,
        MOVED              = 4
    };
};
// Scoping wrapper for the kinds of message that can be sent; the enum type is MessageType::Type.
class MessageType {
public:
    enum Type {
        ACTION_MESSAGE        = 0,
        DATA_MESSAGE          = 1,
        ADD_ACTOR_ID          = 2,
        MOVE_ACTOR_ID         = 3,
        DELETE_ACTOR_ID       = 4,
        DELETE_ACTOR_IDS      = 5,
        CREATE_ISOLATED_ACTOR = 6,
        LOAD_STATUS           = 7,
        PLEASE_MOVE_ACTOR     = 8,
        PLEASE_RECV_ACTOR     = 9,
        PLEASE_RECV_ACTORS    = 10
    };
};
// Forward declaration so that Thread* can be used (for example as an actor's
// parentThread member) before the Thread struct itself is defined.
// The former `typedef` keyword named no alias and was ignored by the compiler.
struct Thread;
// Holds a meta-container for all basic types.
// As a side note: all functions are re-entrant and use actor->heapStack to store their state, so each one needs only a pointer to its actor for all of its activities.
49 void(*Function
)(Actor
*);
// Message container. For messages with a larger number of arguments, create a vector and pass it via VoidPtr in TypeUnion.
57 MessageType::Type messageType
;
59 uint32_t dataTaskTypeId
;
// Mail channel used between threads, and between threads and the kernel.
69 std::vector
<Message
> *msgChannelIncoming
, *msgChannel1
, *msgChannel2
;
70 volatile int currentChannel
;
71 boost::mutex mailLock
;
72 volatile bool isEmpty
;
// Debug helper: writes a one-line description of `message` to std::cout, made up of its
// messageType, recipient, dataTaskTypeId, numArgs and the UInt32 view of arg[0], arg[1]
// and arg[2], and ends the line with std::endl.
75 void DebugMessage(const Message &message) {
76 std::cout << " Message: " << message.messageType << " rec:" << message.recipient << " taskId:";
// arg[0]..arg[2] are always printed, regardless of the value of numArgs.
77 std::cout << message.dataTaskTypeId << " nArgs:" << message.numArgs << " arg0:" << message.arg[0].UInt32 << " " << message.arg[1].UInt32 << " " << message.arg[2].UInt32 << std::endl;
// Appends a copy of `message` to the vector that msgChannelIncoming points at.
// mailLock is held through a scoped_lock for the duration of the call.
// NOTE(review): the end of this function is not visible in this excerpt - confirm nothing follows the push_back.
82 void sendMessage(const Message
&message
) {
83 boost::mutex::scoped_lock
lock(mailLock
);
84 msgChannelIncoming
->push_back(message
);
// Switches which buffer receives new mail, under mailLock, and returns a pointer to a
// vector of messages.
// When currentChannel == 1, msgChannelIncoming is repointed at msgChannel2; the other visible
// assignment repoints it at msgChannel1. After either switch, isEmpty is refreshed from the
// new incoming buffer's empty().
// NOTE(review): the else branch, any update of currentChannel and the return statement are
// not visible in this excerpt - presumably the buffer that was incoming before the switch is
// what gets returned; confirm against the full source.
88 std::vector
<Message
> *recvMessages() {
89 boost::mutex::scoped_lock
lock(mailLock
);
91 if (currentChannel
== 1) {
92 msgChannelIncoming
= msgChannel2
;
94 isEmpty
= msgChannelIncoming
->empty();
98 msgChannelIncoming
= msgChannel1
;
100 isEmpty
= msgChannelIncoming
->empty();
106 //boost::mutex::scoped_lock lock(mailLock);
// Allocates the two message buffers on the heap and makes msgChannel1 the initial
// incoming buffer.
// NOTE(review): presumably this is the body of the MailChannel constructor - its header is
// not visible here. No matching delete is visible either; confirm who frees these vectors.
111 msgChannel1
= new std::vector
<Message
>();
112 msgChannel2
= new std::vector
<Message
>();
114 msgChannelIncoming
= msgChannel1
;
// Element type of Thread::threadPoolThreads.
// NOTE(review): only one member is visible in this excerpt; the struct may have more.
120 struct ThreadPoolThread
{
// Pointer to a boost::thread object.
123 boost::thread
*thread
;
// Element type of an actor's receiverCache array.
126 struct ActorWrapper
{
// Actor id stored in this slot.
128 actorId_t containedId
;
// presumably the Thread::actorPoolRevId value seen when this slot was filled, so stale
// slots can be detected after the actor pool changes - TODO confirm against the users.
129 uint32_t actorPoolRevId
;
// The microthread task, which we call an actor.
137 ActorState::State actorState
;
138 ActorState::State actorStateBeforeMove
;
140 void(*task
)(Actor
*);
141 Thread
*parentThread
;
143 struct ActorWrapper receiverCache
[RECEIVER_CACHE_SIZE
];
146 uint32_t runQueueRevId
;
148 //incoming DATA_MESSAGE messages have a queue and handlers for each enumerated type. In the action, a switch statement allows for continuations
149 //based on the id of the continuation (basically which slot to jump back into when the function is restarted).
150 std::vector
<Message
> dataMessages
;
151 __gnu_cxx::hash_map
<int, int> dataHandlers
;
156 std::vector
<Message
> actionMessages
;
157 std::vector
<TypeUnion
> heapStack
;
158 //SimpleArray actionMessages;
159 //SimpleArray heapStack;
161 Actor() : runQueueRevId(0), isResuming(false) {}
// The actual OS-level thread. It acts as a scheduler for actors (microthreads).
170 __gnu_cxx::hash_map
<actorId_t
, Actor
*> actorIds
;
171 std::vector
<Message
> localMail
;
173 actorId_t nextActorId
;
176 int32_t numberActiveActors
;
177 int32_t previousActiveActors
;
180 ThreadType::Type threadType
;
181 std::vector
<Actor
*> runningActors
;
183 MailChannel
*incomingChannel
;
184 MailChannel
*outgoingChannel
;
187 std::vector
<MailChannel
*> *mailListIncoming
;
188 __gnu_cxx::hash_map
<actorId_t
, threadId_t
> *mailAddresses
;
189 __gnu_cxx::hash_map
<threadId_t
, MailChannel
*> *mailChannelOutgoing
;
192 __gnu_cxx::hash_map
<threadId_t
, int> *scheduleWeights
; //I'm assuming int is big enough
193 std::vector
<ThreadPoolThread
> *threadPoolThreads
;
195 //For local receiver caching, whenever our actor pool members change, we rev this so that local receiver caches can be recreated
196 uint32_t actorPoolRevId
;
198 //For more efficient run queue handling, this prevents actors from rescheduling themselves in loop scenerios
199 uint32_t runQueueRevId
;
201 //For scheduling tasks, we need to know how much time is remaining
202 int timeSliceEndTime
;
// Constructs a scheduler thread.
//   id        - thread id (its use is not visible in this excerpt)
//   nextActor - initial value for nextActorId
// NOTE(review): several lines of this constructor, including its closing brace, are not
// visible in this excerpt; only the statements shown below are documented.
205 Thread(threadId_t id
, actorId_t nextActor
) {
206 threadType
= ThreadType::THREAD
;
209 nextActorId
= nextActor
;
214 runQueueRevId
= 1; //0 denotes "never scheduled", and is the reset value
// Both mail channels are heap-allocated here; no matching delete is visible in this excerpt.
216 incomingChannel
= new MailChannel();
217 outgoingChannel
= new MailChannel();
// A "maintenance" actor with the reserved id 0xFFFFFFFF is created in the ACTIVE state and
// placed on the running list.
219 Actor
*maintenance
= new Actor();
220 maintenance
->actorId
= 0xFFFFFFFF;
221 maintenance
->actorState
= ActorState::ACTIVE
;
222 runningActors
.push_back(maintenance
);
225 void SendMessage(const Message
&message
);
227 void ReceiveMessages();
228 void ScheduleExistingActor(Actor
*actor
);
229 void ScheduleNewActor(Actor
*actor
);
230 void ScheduleNewIsolatedActor(Actor
*actor
);
231 void RemoveRunningActor(Actor
*actor
);
232 void RemoveActor(Actor
*actor
);
233 void RemoveActors(std::vector
<Actor
*> *actors
);
234 void RemoveActor(Actor
*actor
, bool sendDeleteMsg
);
236 void DeleteActor(Actor
*actor
);
238 void MoveHeaviestActor(threadId_t threadId
, uint32_t amount
);
239 Actor
* ActorIfLocal(actorId_t actorId
, Actor
*local
);
240 void TaskRebalance();
242 void SchedulerLoop();
245 //Main startup procedure
246 int VM_Main(int argc
, char *argv
[], void(*task
)(Actor
*), bool passCmdLine
);
248 #endif //AQUARIUM_HPP