Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / examples / C++NPv2 / display_logfile.cpp
blob49a58b2eb4c33946ebdd9916097f938457b3515f
1 /*
2 ** Copyright 2002 Addison Wesley. All Rights Reserved.
3 */
5 #include "ace/ACE.h"
6 #include "ace/CDR_Stream.h"
7 #include "ace/FILE_Addr.h"
8 #include "ace/FILE_Connector.h"
9 #include "ace/FILE_IO.h"
10 #include "ace/Message_Block.h"
11 #include "ace/Module.h"
12 #include "ace/SString.h"
13 #include "ace/Stream.h"
14 #include "ace/Task.h"
15 #include "ace/Thread_Manager.h"
16 #include "ace/Lock_Adapter_T.h"
17 #include "ace/OS_NS_string.h"
18 #include "ace/OS_NS_time.h"
20 template <class TASK>
21 class Logrec_Module : public ACE_Module<ACE_SYNCH>
23 public:
24 Logrec_Module (const ACE_TCHAR *name)
26 this->open (name,
27 &task_, // Initialize writer-side task.
28 0, // Ignore reader-side task.
30 ACE_Module<ACE_SYNCH>::M_DELETE_READER);
32 private:
33 TASK task_;
35 #define LOGREC_MODULE(NAME) \
36 typedef Logrec_Module<NAME> NAME##_Module
38 class Logrec_Reader : public ACE_Task<ACE_SYNCH>
40 private:
41 ACE_TString filename_; // Name of logfile.
42 ACE_FILE_IO logfile_; // File containing log records.
44 public:
45 enum {MB_CLIENT = ACE_Message_Block::MB_USER,
46 MB_TYPE, MB_PID, MB_TIME, MB_TEXT};
48 Logrec_Reader (const ACE_TString &file): filename_ (file) {}
50 //FUZZ: disable check_for_lack_ACE_OS
51 virtual int open (void *) {
52 //FUZZ: enable check_for_lack_ACE_OS
54 ACE_FILE_Addr name (filename_.c_str ());
55 ACE_FILE_Connector connector;
56 if (connector.connect (logfile_, name) == -1)
57 return -1;
58 return activate ();
61 virtual int svc () {
62 const size_t FileReadSize = 8 * 1024;
63 ACE_Message_Block mblk (FileReadSize);
65 for (;; mblk.crunch ()) {
66 // Read as much as will fit in the message block.
67 ssize_t bytes_read = logfile_.recv (mblk.wr_ptr (),
68 mblk.space ());
69 if (bytes_read <= 0)
70 break;
71 mblk.wr_ptr (static_cast<size_t> (bytes_read));
73 // We have a bunch of data from the log file. The data is
74 // arranged like so:
75 // hostname\0
76 // CDR-encoded log record
77 // So, first we scan for the end of the host name, then
78 // initialize another ACE_Message_Block aligned for CDR
79 // demarshaling and copy the remainder of the block into it. We
80 // can't use duplicate() because we need to be sure the data
81 // pointer is aligned properly for CDR demarshaling. If at any
82 // point, there's not enough data left in the message block to
83 // extract what's needed, crunch the block to move all remaining
84 // data to the beginning and read more from the file.
85 for (;;) {
86 size_t name_len = ACE_OS::strnlen
87 (mblk.rd_ptr (), mblk.length ());
88 if (name_len == mblk.length ()) break;
90 char *name_p = mblk.rd_ptr ();
91 ACE_Message_Block *rec, *head, *temp;
92 ACE_NEW_RETURN
93 (head, ACE_Message_Block (name_len, MB_CLIENT), 0);
94 head->copy (name_p, name_len);
95 mblk.rd_ptr (name_len + 1); // Skip nul also
97 size_t need = mblk.length () + ACE_CDR::MAX_ALIGNMENT;
98 ACE_NEW_RETURN (rec, ACE_Message_Block (need), 0);
99 ACE_CDR::mb_align (rec);
100 rec->copy (mblk.rd_ptr (), mblk.length ());
102 // Now rec contains the remaining data we've read so far from
103 // the file. Create an ACE_InputCDR to start demarshaling the
104 // log record, header first to find the length, then the data.
105 // Since the ACE_InputCDR constructor increases the reference count
106 // on rec, we release it upon return to prevent leaks.
107 // The cdr 'read' methods return 0 on failure, 1 on success.
108 ACE_InputCDR cdr (rec); rec->release ();
109 ACE_CDR::Boolean byte_order;
110 if (!cdr.read_boolean (byte_order)) {
111 head->release (); rec->release (); break;
113 cdr.reset_byte_order (byte_order);
115 // Now read the length of the record. From there, we'll know
116 // if rec contains the complete record or not.
117 ACE_CDR::ULong length;
118 if (!cdr.read_ulong (length)) {
119 head->release (); mblk.rd_ptr (name_p); break;
121 if (length > cdr.length ()) {
122 head->release (); mblk.rd_ptr (name_p); break;
125 // The complete record is in rec... grab all the fields into
126 // separate, chained message blocks.
127 ACE_NEW_RETURN (temp,
128 ACE_Message_Block (length, MB_TEXT),
130 ACE_NEW_RETURN
131 (temp,
132 ACE_Message_Block (2 * sizeof (ACE_CDR::Long),
133 MB_TIME, temp),
135 ACE_NEW_RETURN
136 (temp,
137 ACE_Message_Block (sizeof (ACE_CDR::Long),
138 MB_PID, temp),
140 ACE_NEW_RETURN
141 (temp,
142 ACE_Message_Block (sizeof (ACE_CDR::Long),
143 MB_TYPE, temp),
145 head->cont (temp);
147 // Extract the type
148 ACE_CDR::Long *lp;
149 lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
150 cdr >> *lp;
151 temp->wr_ptr (sizeof (ACE_CDR::Long));
152 temp = temp->cont ();
154 // Extract the pid
155 lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
156 cdr >> *lp;
157 temp->wr_ptr (sizeof (ACE_CDR::Long));
158 temp = temp->cont ();
160 // Extract the timestamp (2 Longs)
161 lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
162 cdr >> *lp; ++lp; cdr >> *lp;
163 temp->wr_ptr (2 * sizeof (ACE_CDR::Long));
164 temp = temp->cont ();
166 // Demarshal the length of the message text, then demarshal
167 // the text into the block.
168 ACE_CDR::ULong text_len;
169 cdr >> text_len;
170 cdr.read_char_array (temp->wr_ptr (), text_len);
171 temp->wr_ptr (text_len);
173 // Forward the whole lot to the next module.
174 if (put_next (head) == -1) break;
176 // Move the file-content block's read pointer up past whatever
177 // was just processed. Although the mblk's rd_ptr has not been
178 // moved, cdr's has. Therefore, use its length() to determine
179 // how much is left.
180 mblk.rd_ptr (mblk.length () - cdr.length ());
184 // Now that the file is done, send a block down the stream to tell
185 // the other modules to stop.
186 ACE_Message_Block *stop;
187 ACE_NEW_RETURN
188 (stop, ACE_Message_Block (0, ACE_Message_Block::MB_STOP),
190 put_next (stop);
191 return 0;
195 class Logrec_Reader_Module : public ACE_Module<ACE_SYNCH>
197 public:
198 Logrec_Reader_Module (const ACE_TString &filename)
199 : task_ (filename)
201 this->open (ACE_TEXT ("Logrec Reader"),
202 &task_, // Initialize writer-side.
203 0, // Ignore reader-side.
205 ACE_Module<ACE_SYNCH>::M_DELETE_READER);
207 private:
208 Logrec_Reader task_;
211 class Logrec_Writer : public ACE_Task<ACE_SYNCH>
213 public:
214 //FUZZ: disable check_for_lack_ACE_OS
215 // Initialization hook method.
216 virtual int open (void *) { return activate (); }
217 //FUZZ: enable check_for_lack_ACE_OS
219 virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
220 { return putq (mblk, to); }
222 virtual int svc () {
223 int stop = 0;
224 for (ACE_Message_Block *mb = 0; !stop && getq (mb) != -1; ) {
225 if (mb->msg_type () == ACE_Message_Block::MB_STOP)
226 stop = 1;
227 else
228 ACE::write_n (ACE_STDOUT, mb);
229 put_next (mb);
231 return 0;
235 LOGREC_MODULE (Logrec_Writer);
237 class Logrec_Formatter : public ACE_Task<ACE_SYNCH>
239 public:
240 typedef void (*FORMATTER[5])(ACE_Message_Block *);
241 private:
242 static FORMATTER format_; // Array of format static methods.
244 public:
245 virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *) {
246 if (mblk->msg_type () == Logrec_Reader::MB_CLIENT)
247 for (ACE_Message_Block *temp = mblk;
248 temp != 0;
249 temp = temp->cont ()) {
250 int mb_type =
251 temp->msg_type () - ACE_Message_Block::MB_USER;
252 (*format_[mb_type])(temp);
254 return put_next (mblk);
257 static void format_client (ACE_Message_Block *) {
258 return;
261 static void format_type (ACE_Message_Block *mblk) {
262 ACE_CDR::Long type = * (ACE_CDR::Long *)mblk->rd_ptr ();
263 mblk->size (11); // Max size in ASCII of 32-bit word.
264 mblk->reset ();
265 mblk->wr_ptr ((size_t) ACE_OS::sprintf (mblk->wr_ptr (), "%d", type));
268 static void format_pid (ACE_Message_Block *mblk) {
269 ACE_CDR::Long pid = * (ACE_CDR::Long *)mblk->rd_ptr ();
270 mblk->size (11); // Max size in ASCII of 32-bit word.
271 mblk->reset ();
272 mblk->wr_ptr ((size_t) ACE_OS::sprintf (mblk->wr_ptr (), "%d", pid));
275 static void format_time (ACE_Message_Block *mblk) {
276 ACE_CDR::Long secs = *(ACE_CDR::Long *) mblk->rd_ptr ();
277 mblk->rd_ptr (sizeof (ACE_CDR::Long));
278 ACE_CDR::Long usecs = *(ACE_CDR::Long *) mblk->rd_ptr ();
279 ACE_TCHAR timestamp_t[26];
280 char timestamp[26]; // Max size of ctime_r() string.
281 time_t time_secs (secs);
282 ACE_OS::ctime_r (&time_secs,
283 timestamp_t,
284 sizeof timestamp_t / sizeof (ACE_TCHAR));
285 ACE_OS::strcpy (timestamp, ACE_TEXT_ALWAYS_CHAR (timestamp_t));
286 mblk->size (26); // Max size of ctime_r() string.
287 mblk->reset ();
288 timestamp[19] = '\0'; // NUL-terminate after the time.
289 timestamp[24] = '\0'; // NUL-terminate after the date.
290 size_t fmt_len (ACE_OS::sprintf (mblk->wr_ptr (),
291 "%s.%03d %s",
292 timestamp + 4,
293 usecs / 1000,
294 timestamp + 20));
295 mblk->wr_ptr (fmt_len);
298 static void format_data (ACE_Message_Block *) {
299 return;
303 Logrec_Formatter::FORMATTER Logrec_Formatter::format_ = {
304 format_client, format_type, format_pid, format_time, format_data
307 LOGREC_MODULE (Logrec_Formatter);
309 class Logrec_Separator : public ACE_Task<ACE_SYNCH>
311 private:
312 ACE_Lock_Adapter<ACE_SYNCH_MUTEX> lock_strategy_;
314 public:
315 virtual int put (ACE_Message_Block *mblk,
316 ACE_Time_Value *) {
317 if (mblk->msg_type () != ACE_Message_Block::MB_STOP) {
318 ACE_Message_Block *separator;
319 ACE_NEW_RETURN
320 (separator,
321 ACE_Message_Block (ACE_OS::strlen ("|") + 1,
322 ACE_Message_Block::MB_DATA,
323 0, 0, 0, &lock_strategy_),
324 -1);
325 separator->copy ("|");
327 ACE_Message_Block *dup = 0;
328 for (ACE_Message_Block *temp = mblk; temp != 0; ) {
329 dup = separator->duplicate ();
330 dup->cont (temp->cont ());
331 temp->cont (dup);
332 temp = dup->cont ();
334 ACE_Message_Block *nl;
335 ACE_NEW_RETURN (nl, ACE_Message_Block (2), 0);
336 nl->copy ("\n");
337 dup->cont (nl);
338 separator->release ();
340 return put_next (mblk);
344 LOGREC_MODULE (Logrec_Separator);
346 int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
348 if (argc != 2)
349 ACE_ERROR_RETURN ((LM_ERROR,
350 "usage: %s logfile\n", argv[0]),
352 ACE_TString logfile (argv[1]);
353 ACE_Stream<ACE_SYNCH> stream;
355 if (stream.push
356 (new Logrec_Writer_Module (ACE_TEXT ("Writer"))) != -1
357 && stream.push
358 (new Logrec_Separator_Module (ACE_TEXT ("Separator"))) != -1
359 && stream.push
360 (new Logrec_Formatter_Module (ACE_TEXT ("Formatter"))) != -1
361 && stream.push
362 (new Logrec_Reader_Module (logfile)) != -1)
363 return ACE_Thread_Manager::instance ()->wait () == 0 ? 0 : 1;
364 return 1;