2 ** Copyright 2002 Addison Wesley. All Rights Reserved.
6 #include "ace/CDR_Stream.h"
7 #include "ace/FILE_Addr.h"
8 #include "ace/FILE_Connector.h"
9 #include "ace/FILE_IO.h"
10 #include "ace/Message_Block.h"
11 #include "ace/Module.h"
12 #include "ace/SString.h"
13 #include "ace/Stream.h"
15 #include "ace/Thread_Manager.h"
16 #include "ace/Lock_Adapter_T.h"
17 #include "ace/OS_NS_string.h"
18 #include "ace/OS_NS_time.h"
21 class Logrec_Module
: public ACE_Module
<ACE_SYNCH
>
24 Logrec_Module (const ACE_TCHAR
*name
)
27 &task_
, // Initialize writer-side task.
28 0, // Ignore reader-side task.
30 ACE_Module
<ACE_SYNCH
>::M_DELETE_READER
);
35 #define LOGREC_MODULE(NAME) \
36 typedef Logrec_Module<NAME> NAME##_Module
38 class Logrec_Reader
: public ACE_Task
<ACE_SYNCH
>
41 ACE_TString filename_
; // Name of logfile.
42 ACE_FILE_IO logfile_
; // File containing log records.
45 enum {MB_CLIENT
= ACE_Message_Block::MB_USER
,
46 MB_TYPE
, MB_PID
, MB_TIME
, MB_TEXT
};
48 Logrec_Reader (const ACE_TString
&file
): filename_ (file
) {}
50 //FUZZ: disable check_for_lack_ACE_OS
51 virtual int open (void *) {
52 //FUZZ: enable check_for_lack_ACE_OS
54 ACE_FILE_Addr
name (filename_
.c_str ());
55 ACE_FILE_Connector connector
;
56 if (connector
.connect (logfile_
, name
) == -1)
62 const size_t FileReadSize
= 8 * 1024;
63 ACE_Message_Block
mblk (FileReadSize
);
65 for (;; mblk
.crunch ()) {
66 // Read as much as will fit in the message block.
67 ssize_t bytes_read
= logfile_
.recv (mblk
.wr_ptr (),
71 mblk
.wr_ptr (static_cast<size_t> (bytes_read
));
73 // We have a bunch of data from the log file. The data is
76 // CDR-encoded log record
77 // So, first we scan for the end of the host name, then
78 // initialize another ACE_Message_Block aligned for CDR
79 // demarshaling and copy the remainder of the block into it. We
80 // can't use duplicate() because we need to be sure the data
81 // pointer is aligned properly for CDR demarshaling. If at any
82 // point, there's not enough data left in the message block to
83 // extract what's needed, crunch the block to move all remaining
84 // data to the beginning and read more from the file.
86 size_t name_len
= ACE_OS::strnlen
87 (mblk
.rd_ptr (), mblk
.length ());
88 if (name_len
== mblk
.length ()) break;
90 char *name_p
= mblk
.rd_ptr ();
91 ACE_Message_Block
*rec
, *head
, *temp
;
93 (head
, ACE_Message_Block (name_len
, MB_CLIENT
), 0);
94 head
->copy (name_p
, name_len
);
95 mblk
.rd_ptr (name_len
+ 1); // Skip nul also
97 size_t need
= mblk
.length () + ACE_CDR::MAX_ALIGNMENT
;
98 ACE_NEW_RETURN (rec
, ACE_Message_Block (need
), 0);
99 ACE_CDR::mb_align (rec
);
100 rec
->copy (mblk
.rd_ptr (), mblk
.length ());
102 // Now rec contains the remaining data we've read so far from
103 // the file. Create an ACE_InputCDR to start demarshaling the
104 // log record, header first to find the length, then the data.
105 // Since the ACE_InputCDR constructor increases the reference count
106 // on rec, we release it upon return to prevent leaks.
107 // The cdr 'read' methods return 0 on failure, 1 on success.
108 ACE_InputCDR
cdr (rec
); rec
->release ();
109 ACE_CDR::Boolean byte_order
;
110 if (!cdr
.read_boolean (byte_order
)) {
111 head
->release (); rec
->release (); break;
113 cdr
.reset_byte_order (byte_order
);
115 // Now read the length of the record. From there, we'll know
116 // if rec contains the complete record or not.
117 ACE_CDR::ULong length
;
118 if (!cdr
.read_ulong (length
)) {
119 head
->release (); mblk
.rd_ptr (name_p
); break;
121 if (length
> cdr
.length ()) {
122 head
->release (); mblk
.rd_ptr (name_p
); break;
125 // The complete record is in rec... grab all the fields into
126 // separate, chained message blocks.
127 ACE_NEW_RETURN (temp
,
128 ACE_Message_Block (length
, MB_TEXT
),
132 ACE_Message_Block (2 * sizeof (ACE_CDR::Long
),
137 ACE_Message_Block (sizeof (ACE_CDR::Long
),
142 ACE_Message_Block (sizeof (ACE_CDR::Long
),
149 lp
= reinterpret_cast<ACE_CDR::Long
*> (temp
->wr_ptr ());
151 temp
->wr_ptr (sizeof (ACE_CDR::Long
));
152 temp
= temp
->cont ();
155 lp
= reinterpret_cast<ACE_CDR::Long
*> (temp
->wr_ptr ());
157 temp
->wr_ptr (sizeof (ACE_CDR::Long
));
158 temp
= temp
->cont ();
160 // Extract the timestamp (2 Longs)
161 lp
= reinterpret_cast<ACE_CDR::Long
*> (temp
->wr_ptr ());
162 cdr
>> *lp
; ++lp
; cdr
>> *lp
;
163 temp
->wr_ptr (2 * sizeof (ACE_CDR::Long
));
164 temp
= temp
->cont ();
166 // Demarshal the length of the message text, then demarshal
167 // the text into the block.
168 ACE_CDR::ULong text_len
;
170 cdr
.read_char_array (temp
->wr_ptr (), text_len
);
171 temp
->wr_ptr (text_len
);
173 // Forward the whole lot to the next module.
174 if (put_next (head
) == -1) break;
176 // Move the file-content block's read pointer up past whatever
177 // was just processed. Although the mblk's rd_ptr has not been
178 // moved, cdr's has. Therefore, use its length() to determine
180 mblk
.rd_ptr (mblk
.length () - cdr
.length ());
184 // Now that the file is done, send a block down the stream to tell
185 // the other modules to stop.
186 ACE_Message_Block
*stop
;
188 (stop
, ACE_Message_Block (0, ACE_Message_Block::MB_STOP
),
195 class Logrec_Reader_Module
: public ACE_Module
<ACE_SYNCH
>
198 Logrec_Reader_Module (const ACE_TString
&filename
)
201 this->open (ACE_TEXT ("Logrec Reader"),
202 &task_
, // Initialize writer-side.
203 0, // Ignore reader-side.
205 ACE_Module
<ACE_SYNCH
>::M_DELETE_READER
);
211 class Logrec_Writer
: public ACE_Task
<ACE_SYNCH
>
214 //FUZZ: disable check_for_lack_ACE_OS
215 // Initialization hook method.
216 virtual int open (void *) { return activate (); }
217 //FUZZ: enable check_for_lack_ACE_OS
219 virtual int put (ACE_Message_Block
*mblk
, ACE_Time_Value
*to
)
220 { return putq (mblk
, to
); }
224 for (ACE_Message_Block
*mb
= 0; !stop
&& getq (mb
) != -1; ) {
225 if (mb
->msg_type () == ACE_Message_Block::MB_STOP
)
228 ACE::write_n (ACE_STDOUT
, mb
);
235 LOGREC_MODULE (Logrec_Writer
);
237 class Logrec_Formatter
: public ACE_Task
<ACE_SYNCH
>
240 typedef void (*FORMATTER
[5])(ACE_Message_Block
*);
242 static FORMATTER format_
; // Array of format static methods.
245 virtual int put (ACE_Message_Block
*mblk
, ACE_Time_Value
*) {
246 if (mblk
->msg_type () == Logrec_Reader::MB_CLIENT
)
247 for (ACE_Message_Block
*temp
= mblk
;
249 temp
= temp
->cont ()) {
251 temp
->msg_type () - ACE_Message_Block::MB_USER
;
252 (*format_
[mb_type
])(temp
);
254 return put_next (mblk
);
257 static void format_client (ACE_Message_Block
*) {
261 static void format_type (ACE_Message_Block
*mblk
) {
262 ACE_CDR::Long type
= * (ACE_CDR::Long
*)mblk
->rd_ptr ();
263 mblk
->size (11); // Max size in ASCII of 32-bit word.
265 mblk
->wr_ptr ((size_t) ACE_OS::sprintf (mblk
->wr_ptr (), "%d", type
));
268 static void format_pid (ACE_Message_Block
*mblk
) {
269 ACE_CDR::Long pid
= * (ACE_CDR::Long
*)mblk
->rd_ptr ();
270 mblk
->size (11); // Max size in ASCII of 32-bit word.
272 mblk
->wr_ptr ((size_t) ACE_OS::sprintf (mblk
->wr_ptr (), "%d", pid
));
275 static void format_time (ACE_Message_Block
*mblk
) {
276 ACE_CDR::Long secs
= *(ACE_CDR::Long
*) mblk
->rd_ptr ();
277 mblk
->rd_ptr (sizeof (ACE_CDR::Long
));
278 ACE_CDR::Long usecs
= *(ACE_CDR::Long
*) mblk
->rd_ptr ();
279 ACE_TCHAR timestamp_t
[26];
280 char timestamp
[26]; // Max size of ctime_r() string.
281 time_t time_secs (secs
);
282 ACE_OS::ctime_r (&time_secs
,
284 sizeof timestamp_t
/ sizeof (ACE_TCHAR
));
285 ACE_OS::strcpy (timestamp
, ACE_TEXT_ALWAYS_CHAR (timestamp_t
));
286 mblk
->size (26); // Max size of ctime_r() string.
288 timestamp
[19] = '\0'; // NUL-terminate after the time.
289 timestamp
[24] = '\0'; // NUL-terminate after the date.
290 size_t fmt_len (ACE_OS::sprintf (mblk
->wr_ptr (),
295 mblk
->wr_ptr (fmt_len
);
298 static void format_data (ACE_Message_Block
*) {
303 Logrec_Formatter::FORMATTER
Logrec_Formatter::format_
= {
304 format_client
, format_type
, format_pid
, format_time
, format_data
307 LOGREC_MODULE (Logrec_Formatter
);
309 class Logrec_Separator
: public ACE_Task
<ACE_SYNCH
>
312 ACE_Lock_Adapter
<ACE_SYNCH_MUTEX
> lock_strategy_
;
315 virtual int put (ACE_Message_Block
*mblk
,
317 if (mblk
->msg_type () != ACE_Message_Block::MB_STOP
) {
318 ACE_Message_Block
*separator
;
321 ACE_Message_Block (ACE_OS::strlen ("|") + 1,
322 ACE_Message_Block::MB_DATA
,
323 0, 0, 0, &lock_strategy_
),
325 separator
->copy ("|");
327 ACE_Message_Block
*dup
= 0;
328 for (ACE_Message_Block
*temp
= mblk
; temp
!= 0; ) {
329 dup
= separator
->duplicate ();
330 dup
->cont (temp
->cont ());
334 ACE_Message_Block
*nl
;
335 ACE_NEW_RETURN (nl
, ACE_Message_Block (2), 0);
338 separator
->release ();
340 return put_next (mblk
);
344 LOGREC_MODULE (Logrec_Separator
);
346 int ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
349 ACE_ERROR_RETURN ((LM_ERROR
,
350 "usage: %s logfile\n", argv
[0]),
352 ACE_TString
logfile (argv
[1]);
353 ACE_Stream
<ACE_SYNCH
> stream
;
356 (new Logrec_Writer_Module (ACE_TEXT ("Writer"))) != -1
358 (new Logrec_Separator_Module (ACE_TEXT ("Separator"))) != -1
360 (new Logrec_Formatter_Module (ACE_TEXT ("Formatter"))) != -1
362 (new Logrec_Reader_Module (logfile
)) != -1)
363 return ACE_Thread_Manager::instance ()->wait () == 0 ? 0 : 1;