Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / tests / Message_Block_Test.cpp
blob2dd86736e0ebbb5d4d928f5d3737464213604219
2 //=============================================================================
3 /**
4 * @file Message_Block_Test.cpp
6 * This test program is a torture test that illustrates how
7 * <ACE_Message_Block> reference counting works in multi-threaded
8 * code.
10 * @author Doug Schmidt <d.schmidt@vanderbilt.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
12 //=============================================================================
14 #include "test_config.h"
15 #include "ace/OS_NS_stdio.h"
16 #include "ace/OS_NS_string.h"
17 #include "ace/Task.h"
18 #include "ace/Malloc_T.h"
19 #include "ace/Profile_Timer.h"
20 #include "ace/Free_List.h"
// Number of memory allocation strategies exercised by this test.
static constexpr int ACE_ALLOC_STRATEGY_NO = 2;

// Size of one memory block, expressed as a multiple of ACE_MALLOC_ALIGN.
static constexpr int ACE_ALLOC_SIZE = 5;

// Number of memory blocks preallocated by the cached allocator.
static constexpr size_t ACE_ALLOC_AMOUNT = 48;
31 // For the user-defined data block test
32 static bool user_data_dtor_called = false;
33 class User_Data : public ACE_Data_Block
35 public:
36 User_Data() {}
38 ~User_Data() override
40 ACE_DEBUG((LM_DEBUG, ACE_TEXT ("User_Data dtor\n")));
41 user_data_dtor_called = true;
45 #if defined (ACE_HAS_THREADS)
47 #include "ace/Lock_Adapter_T.h"
48 #include "ace/Synch_Traits.h"
50 // Number of iterations to run the test.
51 static size_t n_iterations = ACE_MAX_ITERATIONS;
53 static ACE_Lock_Adapter<ACE_SYNCH_MUTEX> lock_adapter_;
54 // Serialize access to <ACE_Message_Block> reference count, which will
55 // be decremented from multiple threads.
57 class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
59 public:
60 /// Activate the task.
61 Worker_Task ();
63 /// Iterate <n_iterations> time printing off a message and "waiting"
64 /// for all other threads to complete this iteration.
65 int svc () override;
67 /// Allows the producer to pass messages to the <Message_Block>.
68 int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0) override;
70 private:
71 //FUZZ: disable check_for_lack_ACE_OS
72 /// Close hook.
73 ///FUZZ: enable check_for_lack_ACE_OS
74 int close (u_long) override;
77 int
78 Worker_Task::close (u_long)
80 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) close of worker\n")));
81 return 0;
84 // Simply enqueue the Worker_Task into the end of the queue.
86 int
87 Worker_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
89 return this->msg_queue ()->enqueue_prio (mb, tv);
92 // Iterate <n_iterations> printing off a message and "waiting" for all
93 // other threads to complete this iteration.
95 int
96 Worker_Task::svc ()
98 // The <ACE_Task::svc_run()> method automatically adds us to the
99 // process-wide <ACE_Thread_Manager> when the thread begins.
101 ACE_DEBUG ((LM_DEBUG,
102 ACE_TEXT ("(%t) starting svc() method\n")));
104 // Keep looping, reading a message out of the queue, until we get a
105 // message with a length == 0, which signals us to quit.
107 for (int count = 0; ; count++)
109 ACE_Message_Block *mb = 0;
111 if (-1 == this->msg_queue ()->dequeue_head (mb))
112 ACE_ERROR_BREAK ((LM_ERROR,
113 ACE_TEXT ("(%t) %p\n"),
114 ACE_TEXT ("Worker_Task dequeue_head")));
116 size_t length = mb->length ();
118 // If there's a next() Task then "logically" copy the message by
119 // calling <duplicate> and send it on down the pipeline. Note
120 // that this doesn't actually make a copy of the message
121 // contents (i.e., the Data_Block portion), it just makes a copy
122 // of the header and reference counts the data.
123 if (this->next () != 0)
125 if (-1 == this->put_next (mb->duplicate ()))
126 ACE_ERROR_BREAK ((LM_ERROR,
127 ACE_TEXT ("(%t) %p\n"),
128 ACE_TEXT ("Worker_Task put_next")));
131 // If there's no next() Task to send to, then we'll consume the
132 // message here.
133 else if (length > 0)
135 int current_count = ACE_OS::atoi ((ACE_TCHAR *)(mb->rd_ptr ()));
136 int i;
138 if (count != current_count)
139 ACE_ERROR_BREAK ((LM_ERROR,
140 ACE_TEXT ("(%t) count from block should be %d ")
141 ACE_TEXT ("but is %d\n"),
142 count, current_count));
144 ACE_DEBUG ((LM_DEBUG,
145 ACE_TEXT ("(%t) enqueueing %d duplicates\n"),
146 current_count));
148 ACE_Message_Block *dup;
150 // Enqueue <current_count> duplicates with msg_priority == 1.
151 for (i = current_count; i > 0; i--)
153 ACE_ALLOCATOR_RETURN (dup,
154 mb->duplicate (),
155 -1);
156 // Set the priority to be greater than "normal"
157 // messages. Therefore, all of these messages should go
158 // to the "front" of the queue, i.e., ahead of all the
159 // other messages that are being enqueued by other
160 // threads.
161 dup->msg_priority (ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY + 1);
163 int enqueue_prio_result =
164 this->msg_queue ()->enqueue_prio
165 (dup,
166 // Don't block indefinitely if we flow control...
167 (ACE_Time_Value *) &ACE_Time_Value::zero);
169 if (enqueue_prio_result == -1)
170 ACE_ERROR_BREAK ((LM_ERROR,
171 ACE_TEXT ("(%t) Pass %d %p\n"),
173 ACE_TEXT ("Worker_Task enqueue_prio")));
176 ACE_DEBUG ((LM_DEBUG,
177 ACE_TEXT ("(%t) dequeueing %d duplicates\n"),
178 current_count));
180 // Dequeue the same <current_count> duplicates.
181 for (i = current_count; i > 0; i--)
183 if (-1 == this->msg_queue ()->dequeue_head (dup))
184 ACE_ERROR_BREAK ((LM_ERROR,
185 ACE_TEXT ("(%t) Dup %d, %p\n"),
187 ACE_TEXT ("Worker_Task dequeue dups")));
188 if (count != ACE_OS::atoi ((ACE_TCHAR *)(dup->rd_ptr ())))
189 ACE_ERROR ((LM_ERROR,
190 ACE_TEXT ("(%t) line %l, Dup %d, block's count ")
191 ACE_TEXT ("is %d but should be %d\n"),
193 ACE_OS::atoi ((ACE_TCHAR *)(dup->rd_ptr ())),
194 count));
195 if (0 != ACE_OS::strcmp ((ACE_TCHAR *)mb->rd_ptr (),
196 (ACE_TCHAR *)dup->rd_ptr ()))
197 ACE_ERROR ((LM_ERROR,
198 ACE_TEXT ("(%t) Dup %d text is %s; ")
199 ACE_TEXT ("should be %s\n"),
201 dup->rd_ptr (),
202 mb->rd_ptr ()));
203 if (dup->msg_priority () != ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY + 1)
204 ACE_ERROR ((LM_ERROR,
205 ACE_TEXT ("(%t) Dup %d block priority is %u; ")
206 ACE_TEXT ("should be %u\n"),
208 (unsigned int)dup->msg_priority (),
209 (unsigned int)(ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY + 1)));
210 dup->release ();
213 ACE_DEBUG ((LM_DEBUG,
214 ACE_TEXT ("(%t) in iteration %d, length = %B, prio = %d, text = \"%*s\"\n"),
215 count,
216 length,
217 mb->msg_priority (),
218 (int)(length - 2), // remove the trailing "\n\0"
219 mb->rd_ptr ()));
222 // We're responsible for deallocating this.
223 mb->release ();
225 if (length == 0)
227 //FUZZ: disable check_for_NULL
228 ACE_DEBUG ((LM_DEBUG,
229 ACE_TEXT ("(%t) in iteration %d, queue len = %B, got NULL message, exiting\n"),
230 count, this->msg_queue ()->message_count ()));
231 //FUZZ: enable check_for_NULL
232 break;
236 // Note that the ACE_Task::svc_run () method automatically removes
237 // us from the Thread_Manager when the thread exits.
238 return 0;
241 Worker_Task::Worker_Task ()
243 // Make us an Active Object.
244 if (this->activate (THR_NEW_LWP) == -1)
245 ACE_ERROR ((LM_ERROR,
246 ACE_TEXT ("%p\n"),
247 ACE_TEXT ("activate failed")));
250 static int
251 produce (Worker_Task &worker_task,
252 ACE_Allocator *alloc_strategy)
254 ACE_Message_Block *mb = 0;
255 int status;
257 // Send <n_iteration> messages through the pipeline.
258 for (size_t count = 0; count < n_iterations; count++)
260 ACE_TCHAR buf[BUFSIZ];
261 ACE_OS::snprintf (buf, BUFSIZ, ACE_SIZE_T_FORMAT_SPECIFIER, count);
263 size_t n = (ACE_OS::strlen (buf) + 1) * sizeof (ACE_TCHAR);
265 // Allocate a new message.
266 ACE_NEW_RETURN (mb,
267 ACE_Message_Block (n, // size
268 ACE_Message_Block::MB_DATA, // type
269 0, // cont
270 0, // data
271 alloc_strategy, // allocator
272 &lock_adapter_, // locking strategy
273 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY), // priority
274 -1);
276 // Try once to copy in more than the block will hold; should yield an
277 // error with ENOSPC.
278 if (count == 0)
280 status = mb->copy ((char *) buf, n + 1);
281 if (status != -1)
282 ACE_ERROR ((LM_ERROR,
283 ACE_TEXT (" (%t) Copy %B bytes into %B byte block ")
284 ACE_TEXT ("should fail but didn't\n"),
285 n + 1,
286 n));
287 else if (errno != ENOSPC)
289 ACE_ERROR ((LM_ERROR,
290 ACE_TEXT (" (%t) Copy into too-small block failed ")
291 ACE_TEXT ("but with %p; should be ENOSPC\n"),
292 ACE_TEXT ("wrong error")));
294 else
295 ACE_DEBUG ((LM_INFO,
296 ACE_TEXT (" (%t) Copy too-long test succeeded\n")));
298 // Copy buf into the Message_Block and update the wr_ptr ().
299 status = mb->copy ((char *) buf, n);
300 if (status != 0)
302 ACE_ERROR ((LM_ERROR,
303 ACE_TEXT (" (%t) Copy to block should be good but %p\n"),
304 ACE_TEXT ("failed")));
306 // Pass the message to the Worker_Task.
307 if (worker_task.put (mb,
308 // Don't block indefinitely if we flow control...
309 (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
310 ACE_ERROR ((LM_ERROR,
311 ACE_TEXT (" (%t) %p\n"),
312 ACE_TEXT ("put")));
315 // Send a shutdown message to the waiting threads and exit.
316 ACE_DEBUG ((LM_DEBUG,
317 ACE_TEXT (" (%t) sending shutdown message\n")));
319 ACE_NEW_RETURN (mb,
320 ACE_Message_Block (0,
321 ACE_Message_Block::MB_DATA,
324 alloc_strategy,
325 &lock_adapter_),
326 -1);
328 if (worker_task.put (mb) == -1)
329 ACE_ERROR ((LM_ERROR,
330 ACE_TEXT (" (%t) %p\n"),
331 ACE_TEXT ("put")));
333 ACE_DEBUG ((LM_DEBUG,
334 ACE_TEXT (" (%t) end producer\n")));
335 return 0;
338 typedef ACE_TCHAR MEMORY_CHUNK[ACE_MALLOC_ALIGN * ACE_ALLOC_SIZE];
340 ACE_Cached_Allocator<MEMORY_CHUNK,
341 ACE_SYNCH_MUTEX>
342 mem_allocator (ACE_ALLOC_AMOUNT);
343 struct alloc_struct_type
345 ACE_Allocator *strategy_;
346 const ACE_TCHAR *name_;
347 ACE_Profile_Timer::ACE_Elapsed_Time et_;
350 alloc_struct_type alloc_struct[ACE_ALLOC_STRATEGY_NO] =
352 { 0, ACE_TEXT ("Default"), {0,0,0} },
353 { &mem_allocator, ACE_TEXT ("Cached Memory"), {0,0,0} }
356 #endif /* ACE_HAS_THREADS */
359 run_main (int, ACE_TCHAR *[])
361 ACE_START_TEST (ACE_TEXT ("Message_Block_Test"));
363 // A quick user-defined data block test, then the main event
364 User_Data *user_data_block = 0;
365 ACE_NEW_MALLOC_RETURN (user_data_block,
366 static_cast<User_Data *>(
367 ACE_Allocator::instance()->malloc(sizeof (User_Data))),
368 User_Data (),
369 -1);
371 // Create a new message block referring to the User_Data block and
372 // ensure it is released and freed correctly.
373 ACE_Message_Block *wrapper_mb = 0;
374 ACE_NEW_RETURN (wrapper_mb,
375 ACE_Message_Block (user_data_block),
376 -1);
378 wrapper_mb->release ();
379 wrapper_mb = 0;
380 if (!user_data_dtor_called)
381 ACE_ERROR ((LM_ERROR, ACE_TEXT ("User-defined data block not freed correctly.\n")));
383 #if defined (ACE_HAS_THREADS)
384 int n_threads = ACE_MAX_THREADS;
386 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) threads = %d\n"), n_threads));
388 ACE_Profile_Timer ptime;
390 int i;
392 for (i = 0; i < ACE_ALLOC_STRATEGY_NO; i++)
394 ACE_DEBUG ((LM_DEBUG,
395 ACE_TEXT ("(%t) Start Message_Block_Test using %s allocation strategy\n"),
396 alloc_struct[i].name_));
398 // Create the worker tasks.
399 Worker_Task worker_task[ACE_MAX_THREADS] ;
401 // Link all the tasks together into a simple pipeline.
402 for (size_t j = 1; j < ACE_MAX_THREADS; j++)
403 worker_task[j - 1].next (&worker_task[j]);
405 ptime.start ();
406 // Generate messages and pass them through the pipeline.
407 produce (worker_task[0], alloc_struct[i].strategy_);
409 // Wait for all the threads to reach their exit point.
411 ACE_DEBUG ((LM_DEBUG,
412 ACE_TEXT ("(%t) waiting for worker tasks to finish...\n")));
414 ACE_Thread_Manager::instance ()->wait ();
415 ptime.stop ();
416 ptime.elapsed_time (alloc_struct[i].et_);
418 ACE_DEBUG ((LM_DEBUG,
419 ACE_TEXT ("(%t) destroying worker tasks\n")));
422 for (i = 0; i < ACE_ALLOC_STRATEGY_NO; i++)
423 ACE_DEBUG ((LM_DEBUG,
424 ACE_TEXT ("Elapsed time using %s allocation strategy: %f sec\n"),
425 alloc_struct[i].name_,
426 alloc_struct[i].et_.real_time));
428 ACE_DEBUG ((LM_DEBUG,
429 ACE_TEXT ("(%t) Exiting...\n")));
430 #else
431 ACE_ERROR ((LM_INFO,
432 ACE_TEXT ("threads not supported on this platform\n")));
433 #endif /* ACE_HAS_THREADS */
434 ACE_END_TEST;
435 return 0;