Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / APG / ThreadPools / Task_ThreadPool.cpp
blob0b87733cde7248e882f39034ced9c557200b566a
1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_time.h"
6 #include "ace/Task.h"
7 #include "ace/Synch.h"
8 #include "ace/SString.h"
10 // Listing 2 code/ch16
11 class Workers : public ACE_Task<ACE_MT_SYNCH>
13 public:
14 Workers ()
15 { }
17 virtual int svc ()
19 while (1)
21 ACE_Message_Block *mb = 0;
22 if (this->getq (mb) == -1)
24 ACE_DEBUG ((LM_INFO,
25 ACE_TEXT ("(%t) Shutting down\n")));
26 break;
29 // Process the message.
30 process_message (mb);
33 return 0;
35 // Listing 2
37 private:
38 void process_message (ACE_Message_Block *mb)
40 ACE_TRACE ("Workers::process_message");
41 int msgId;
42 ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
43 mb->release ();
45 ACE_DEBUG ((LM_DEBUG,
46 ACE_TEXT ("(%t) Started processing message %d\n"),
47 msgId));
48 ACE_OS::sleep (3);
49 ACE_DEBUG ((LM_DEBUG,
50 ACE_TEXT ("(%t) Finished processing message %d\n"),
51 msgId));
55 // Listing 1 code/ch16
56 class Manager : public ACE_Task<ACE_MT_SYNCH>
58 public:
59 enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
61 Manager () : shutdown_(0)
63 ACE_TRACE ("Manager::Manager");
66 int svc ()
68 ACE_TRACE ("Manager::svc");
70 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
72 // Create pool.
73 Workers pool;
74 pool.activate (THR_NEW_LWP | THR_JOINABLE, POOL_SIZE);
76 while (!done ())
78 ACE_Message_Block *mb = 0;
79 ACE_Time_Value tv ((long)MAX_TIMEOUT);
80 tv += ACE_OS::time (0);
82 // Get a message request.
83 if (this->getq (mb, &tv) < 0)
85 pool.msg_queue ()->deactivate ();
86 pool.wait ();
87 break;
90 // Ask the worker pool to do the job.
91 pool.putq (mb);
94 return 0;
97 private:
98 int done ();
100 int shutdown_;
102 // Listing 1
104 int Manager::done ()
106 return (shutdown_ == 1);
110 int ACE_TMAIN (int, ACE_TCHAR *[])
112 Manager tp;
113 tp.activate ();
115 // Wait for a moment every time you send a message.
116 ACE_Time_Value tv;
117 tv.msec (100);
119 ACE_Message_Block *mb = 0;
120 for (int i = 0; i < 30; i++)
122 ACE_NEW_RETURN
123 (mb, ACE_Message_Block(sizeof(int)), -1);
125 ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
127 ACE_OS::sleep (tv);
129 // Add a new work item.
130 tp.putq (mb);
133 ACE_Thread_Manager::instance ()->wait ();
134 return 0;
137 #else
138 #include "ace/OS_main.h"
139 #include "ace/OS_NS_stdio.h"
141 int ACE_TMAIN (int, ACE_TCHAR *[])
143 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
144 return 0;
147 #endif /* ACE_HAS_THREADS */