2 //=============================================================================
4 * @file Message_Block_Test.cpp
6 * This test program is a torture test that illustrates how
7 * <ACE_Message_Block> reference counting works in multi-threaded
10 * @author Doug Schmidt <d.schmidt@vanderbilt.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
12 //=============================================================================
14 #include "test_config.h"
15 #include "ace/OS_NS_stdio.h"
16 #include "ace/OS_NS_string.h"
18 #include "ace/Malloc_T.h"
19 #include "ace/Profile_Timer.h"
20 #include "ace/Free_List.h"
22 // Number of memory allocation strategies used in this test.
23 static const int ACE_ALLOC_STRATEGY_NO
= 2;
25 // Size of a memory block (multiple of ACE_MALLOC_ALIGN).
26 static const int ACE_ALLOC_SIZE
= 5;
28 // Number of memory blocks preallocated.
29 static const size_t ACE_ALLOC_AMOUNT
= 48;
31 // For the user-defined data block test
32 static bool user_data_dtor_called
= false;
33 class User_Data
: public ACE_Data_Block
40 ACE_DEBUG((LM_DEBUG
, ACE_TEXT ("User_Data dtor\n")));
41 user_data_dtor_called
= true;
45 #if defined (ACE_HAS_THREADS)
47 #include "ace/Lock_Adapter_T.h"
48 #include "ace/Synch_Traits.h"
50 // Number of iterations to run the test.
51 static size_t n_iterations
= ACE_MAX_ITERATIONS
;
53 static ACE_Lock_Adapter
<ACE_SYNCH_MUTEX
> lock_adapter_
;
54 // Serialize access to <ACE_Message_Block> reference count, which will
55 // be decremented from multiple threads.
57 class Worker_Task
: public ACE_Task
<ACE_MT_SYNCH
>
60 /// Activate the task.
63 /// Iterate <n_iterations> times, printing off a message and "waiting"
64 /// for all other threads to complete this iteration.
67 /// Allows the producer to pass messages to the <Worker_Task>.
68 int put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
= 0) override
;
71 //FUZZ: disable check_for_lack_ACE_OS
73 ///FUZZ: enable check_for_lack_ACE_OS
74 int close (u_long
) override
;
78 Worker_Task::close (u_long
)
80 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) close of worker\n")));
84 // Simply enqueue the message block into this task's message queue.
87 Worker_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
89 return this->msg_queue ()->enqueue_prio (mb
, tv
);
92 // Iterate <n_iterations> times, printing off a message and "waiting" for all
93 // other threads to complete this iteration.
98 // The <ACE_Task::svc_run()> method automatically adds us to the
99 // process-wide <ACE_Thread_Manager> when the thread begins.
101 ACE_DEBUG ((LM_DEBUG
,
102 ACE_TEXT ("(%t) starting svc() method\n")));
104 // Keep looping, reading a message out of the queue, until we get a
105 // message with a length == 0, which signals us to quit.
107 for (int count
= 0; ; count
++)
109 ACE_Message_Block
*mb
= 0;
111 if (-1 == this->msg_queue ()->dequeue_head (mb
))
112 ACE_ERROR_BREAK ((LM_ERROR
,
113 ACE_TEXT ("(%t) %p\n"),
114 ACE_TEXT ("Worker_Task dequeue_head")));
116 size_t length
= mb
->length ();
118 // If there's a next() Task then "logically" copy the message by
119 // calling <duplicate> and send it on down the pipeline. Note
120 // that this doesn't actually make a copy of the message
121 // contents (i.e., the Data_Block portion), it just makes a copy
122 // of the header and reference counts the data.
123 if (this->next () != 0)
125 if (-1 == this->put_next (mb
->duplicate ()))
126 ACE_ERROR_BREAK ((LM_ERROR
,
127 ACE_TEXT ("(%t) %p\n"),
128 ACE_TEXT ("Worker_Task put_next")));
131 // If there's no next() Task to send to, then we'll consume the
135 int current_count
= ACE_OS::atoi ((ACE_TCHAR
*)(mb
->rd_ptr ()));
138 if (count
!= current_count
)
139 ACE_ERROR_BREAK ((LM_ERROR
,
140 ACE_TEXT ("(%t) count from block should be %d ")
141 ACE_TEXT ("but is %d\n"),
142 count
, current_count
));
144 ACE_DEBUG ((LM_DEBUG
,
145 ACE_TEXT ("(%t) enqueueing %d duplicates\n"),
148 ACE_Message_Block
*dup
;
150 // Enqueue <current_count> duplicates with msg_priority == 1.
151 for (i
= current_count
; i
> 0; i
--)
153 ACE_ALLOCATOR_RETURN (dup
,
156 // Set the priority to be greater than "normal"
157 // messages. Therefore, all of these messages should go
158 // to the "front" of the queue, i.e., ahead of all the
159 // other messages that are being enqueued by other
161 dup
->msg_priority (ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
+ 1);
163 int enqueue_prio_result
=
164 this->msg_queue ()->enqueue_prio
166 // Don't block indefinitely if we flow control...
167 (ACE_Time_Value
*) &ACE_Time_Value::zero
);
169 if (enqueue_prio_result
== -1)
170 ACE_ERROR_BREAK ((LM_ERROR
,
171 ACE_TEXT ("(%t) Pass %d %p\n"),
173 ACE_TEXT ("Worker_Task enqueue_prio")));
176 ACE_DEBUG ((LM_DEBUG
,
177 ACE_TEXT ("(%t) dequeueing %d duplicates\n"),
180 // Dequeue the same <current_count> duplicates.
181 for (i
= current_count
; i
> 0; i
--)
183 if (-1 == this->msg_queue ()->dequeue_head (dup
))
184 ACE_ERROR_BREAK ((LM_ERROR
,
185 ACE_TEXT ("(%t) Dup %d, %p\n"),
187 ACE_TEXT ("Worker_Task dequeue dups")));
188 if (count
!= ACE_OS::atoi ((ACE_TCHAR
*)(dup
->rd_ptr ())))
189 ACE_ERROR ((LM_ERROR
,
190 ACE_TEXT ("(%t) line %l, Dup %d, block's count ")
191 ACE_TEXT ("is %d but should be %d\n"),
193 ACE_OS::atoi ((ACE_TCHAR
*)(dup
->rd_ptr ())),
195 if (0 != ACE_OS::strcmp ((ACE_TCHAR
*)mb
->rd_ptr (),
196 (ACE_TCHAR
*)dup
->rd_ptr ()))
197 ACE_ERROR ((LM_ERROR
,
198 ACE_TEXT ("(%t) Dup %d text is %s; ")
199 ACE_TEXT ("should be %s\n"),
203 if (dup
->msg_priority () != ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
+ 1)
204 ACE_ERROR ((LM_ERROR
,
205 ACE_TEXT ("(%t) Dup %d block priority is %u; ")
206 ACE_TEXT ("should be %u\n"),
208 (unsigned int)dup
->msg_priority (),
209 (unsigned int)(ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
+ 1)));
213 ACE_DEBUG ((LM_DEBUG
,
214 ACE_TEXT ("(%t) in iteration %d, length = %B, prio = %d, text = \"%*s\"\n"),
218 (int)(length
- 2), // remove the trailing "\n\0"
222 // We're responsible for deallocating this.
227 //FUZZ: disable check_for_NULL
228 ACE_DEBUG ((LM_DEBUG
,
229 ACE_TEXT ("(%t) in iteration %d, queue len = %B, got NULL message, exiting\n"),
230 count
, this->msg_queue ()->message_count ()));
231 //FUZZ: enable check_for_NULL
236 // Note that the ACE_Task::svc_run () method automatically removes
237 // us from the Thread_Manager when the thread exits.
241 Worker_Task::Worker_Task ()
243 // Make us an Active Object.
244 if (this->activate (THR_NEW_LWP
) == -1)
245 ACE_ERROR ((LM_ERROR
,
247 ACE_TEXT ("activate failed")));
251 produce (Worker_Task
&worker_task
,
252 ACE_Allocator
*alloc_strategy
)
254 ACE_Message_Block
*mb
= 0;
257 // Send <n_iterations> messages through the pipeline.
258 for (size_t count
= 0; count
< n_iterations
; count
++)
260 ACE_TCHAR buf
[BUFSIZ
];
261 ACE_OS::snprintf (buf
, BUFSIZ
, ACE_SIZE_T_FORMAT_SPECIFIER
, count
);
263 size_t n
= (ACE_OS::strlen (buf
) + 1) * sizeof (ACE_TCHAR
);
265 // Allocate a new message.
267 ACE_Message_Block (n
, // size
268 ACE_Message_Block::MB_DATA
, // type
271 alloc_strategy
, // allocator
272 &lock_adapter_
, // locking strategy
273 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
), // priority
276 // Try once to copy in more than the block will hold; should yield an
277 // error with ENOSPC.
280 status
= mb
->copy ((char *) buf
, n
+ 1);
282 ACE_ERROR ((LM_ERROR
,
283 ACE_TEXT (" (%t) Copy %B bytes into %B byte block ")
284 ACE_TEXT ("should fail but didn't\n"),
287 else if (errno
!= ENOSPC
)
289 ACE_ERROR ((LM_ERROR
,
290 ACE_TEXT (" (%t) Copy into too-small block failed ")
291 ACE_TEXT ("but with %p; should be ENOSPC\n"),
292 ACE_TEXT ("wrong error")));
296 ACE_TEXT (" (%t) Copy too-long test succeeded\n")));
298 // Copy buf into the Message_Block and update the wr_ptr ().
299 status
= mb
->copy ((char *) buf
, n
);
302 ACE_ERROR ((LM_ERROR
,
303 ACE_TEXT (" (%t) Copy to block should be good but %p\n"),
304 ACE_TEXT ("failed")));
306 // Pass the message to the Worker_Task.
307 if (worker_task
.put (mb
,
308 // Don't block indefinitely if we flow control...
309 (ACE_Time_Value
*) &ACE_Time_Value::zero
) == -1)
310 ACE_ERROR ((LM_ERROR
,
311 ACE_TEXT (" (%t) %p\n"),
315 // Send a shutdown message to the waiting threads and exit.
316 ACE_DEBUG ((LM_DEBUG
,
317 ACE_TEXT (" (%t) sending shutdown message\n")));
320 ACE_Message_Block (0,
321 ACE_Message_Block::MB_DATA
,
328 if (worker_task
.put (mb
) == -1)
329 ACE_ERROR ((LM_ERROR
,
330 ACE_TEXT (" (%t) %p\n"),
333 ACE_DEBUG ((LM_DEBUG
,
334 ACE_TEXT (" (%t) end producer\n")));
338 typedef ACE_TCHAR MEMORY_CHUNK
[ACE_MALLOC_ALIGN
* ACE_ALLOC_SIZE
];
340 ACE_Cached_Allocator
<MEMORY_CHUNK
,
342 mem_allocator (ACE_ALLOC_AMOUNT
);
343 struct alloc_struct_type
345 ACE_Allocator
*strategy_
;
346 const ACE_TCHAR
*name_
;
347 ACE_Profile_Timer::ACE_Elapsed_Time et_
;
350 alloc_struct_type alloc_struct
[ACE_ALLOC_STRATEGY_NO
] =
352 { 0, ACE_TEXT ("Default"), {0,0,0} },
353 { &mem_allocator
, ACE_TEXT ("Cached Memory"), {0,0,0} }
356 #endif /* ACE_HAS_THREADS */
359 run_main (int, ACE_TCHAR
*[])
361 ACE_START_TEST (ACE_TEXT ("Message_Block_Test"));
363 // A quick user-defined data block test, then the main event
364 User_Data
*user_data_block
= 0;
365 ACE_NEW_MALLOC_RETURN (user_data_block
,
366 static_cast<User_Data
*>(
367 ACE_Allocator::instance()->malloc(sizeof (User_Data
))),
371 // Create a new message block referring to the User_Data block and
372 // ensure it is released and freed correctly.
373 ACE_Message_Block
*wrapper_mb
= 0;
374 ACE_NEW_RETURN (wrapper_mb
,
375 ACE_Message_Block (user_data_block
),
378 wrapper_mb
->release ();
380 if (!user_data_dtor_called
)
381 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("User-defined data block not freed correctly.\n")));
383 #if defined (ACE_HAS_THREADS)
384 int n_threads
= ACE_MAX_THREADS
;
386 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) threads = %d\n"), n_threads
));
388 ACE_Profile_Timer ptime
;
392 for (i
= 0; i
< ACE_ALLOC_STRATEGY_NO
; i
++)
394 ACE_DEBUG ((LM_DEBUG
,
395 ACE_TEXT ("(%t) Start Message_Block_Test using %s allocation strategy\n"),
396 alloc_struct
[i
].name_
));
398 // Create the worker tasks.
399 Worker_Task worker_task
[ACE_MAX_THREADS
] ;
401 // Link all the tasks together into a simple pipeline.
402 for (size_t j
= 1; j
< ACE_MAX_THREADS
; j
++)
403 worker_task
[j
- 1].next (&worker_task
[j
]);
406 // Generate messages and pass them through the pipeline.
407 produce (worker_task
[0], alloc_struct
[i
].strategy_
);
409 // Wait for all the threads to reach their exit point.
411 ACE_DEBUG ((LM_DEBUG
,
412 ACE_TEXT ("(%t) waiting for worker tasks to finish...\n")));
414 ACE_Thread_Manager::instance ()->wait ();
416 ptime
.elapsed_time (alloc_struct
[i
].et_
);
418 ACE_DEBUG ((LM_DEBUG
,
419 ACE_TEXT ("(%t) destroying worker tasks\n")));
422 for (i
= 0; i
< ACE_ALLOC_STRATEGY_NO
; i
++)
423 ACE_DEBUG ((LM_DEBUG
,
424 ACE_TEXT ("Elapsed time using %s allocation strategy: %f sec\n"),
425 alloc_struct
[i
].name_
,
426 alloc_struct
[i
].et_
.real_time
));
428 ACE_DEBUG ((LM_DEBUG
,
429 ACE_TEXT ("(%t) Exiting...\n")));
432 ACE_TEXT ("threads not supported on this platform\n")));
433 #endif /* ACE_HAS_THREADS */