Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / tests / Proactor_File_Test.cpp
blob39614badfa5aa8331253297f33a9a9ed74562e32
1 // ============================================================================
2 /**
3 * @file Proactor_File_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does asynchronous file IO
7 * operations.
9 * @author Martin Corino <mcorino@remedy.nl>
11 // ============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
16 // This only works on Win32 platforms and on Unix platforms
17 // supporting POSIX aio calls.
19 //////////////////////////////////////////////////////////////////
21 // This sample application integrates asynch file
22 // read/write operations with the Proactor, using an ACE_FILE_Connector,
23 // ACE_FILE_IO, ACE_Asynch_Read_File, ACE_Asynch_Write_File in an ACE_Handler.
24 // The program sets up asynch read and write on a temporary file, and sends
25 // out 16 16-character bursts of data at timed intervals.
27 #include <stdio.h>
28 #include "ace/OS_NS_stdio.h"
29 #include "ace/OS_NS_unistd.h"
30 #include "ace/OS_NS_errno.h"
31 #include "ace/OS_NS_string.h"
32 #include "ace/OS_NS_sys_time.h"
34 #include "ace/FILE_Connector.h"
35 #include "ace/FILE_IO.h"
37 #include "ace/Proactor.h"
38 #include "ace/Asynch_Connector.h"
39 #include "ace/Time_Value.h"
42 // How long are our fake serial I/O frames?
43 #define FILE_FRAME_SIZE 16
45 class FileIOHandler : public ACE_Handler
47 public:
48 FileIOHandler ();
49 ~FileIOHandler () override;
51 int
52 Connect();
54 // This method will be called when an asynchronous read
55 // completes on a file.
56 void
57 handle_read_file(const ACE_Asynch_Read_File::Result &result) override;
59 // This method will be called when an asynchronous write
60 // completes on a file.
61 void
62 handle_write_file(const ACE_Asynch_Write_File::Result &result) override;
64 // Callback hook invoked by the Proactor's Timer_Queue.
65 void
66 handle_time_out(const ACE_Time_Value &tv, const void *arg) override;
68 // Our I/O objects; they're public so others can access them
69 ACE_Asynch_Read_File reader_;
70 ACE_Asynch_Write_File writer_;
71 private:
72 int block_count_;
73 #if defined (ACE_WIN32)
74 bool read_pending_;
75 #endif
76 ACE_FILE_IO peer_;
77 ACE_FILE_Connector connector_;
80 FileIOHandler::FileIOHandler ()
81 : ACE_Handler ()
82 , block_count_ (0)
83 #if defined (ACE_WIN32)
84 , read_pending_ (false)
85 #endif
89 FileIOHandler::~FileIOHandler ()
91 ACE_FILE_Addr tmp_addr;
92 peer_.get_local_addr (tmp_addr);
93 if (tmp_addr.get_path_name ())
95 peer_.remove ();
99 //***************************************************************************
101 // Method: Connect
103 // Description: Establishes connection, primes read process
105 // Inputs: port name, port parameter block
107 // Returns: none
109 //***************************************************************************
111 int FileIOHandler::Connect()
113 int result = 0;
115 // create an empty temporary file for the test
116 if(connector_.connect(peer_,
117 ACE_sap_any_cast (ACE_FILE_Addr &)) != 0)
119 ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
120 ACE_TEXT("FileIOHandler connect failed to create file")));
121 result = -1;
124 // close opened file but leave it where it is
125 if (peer_.close () != 0)
127 ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
128 ACE_TEXT("FileIOHandler connect failed to close file")));
129 peer_.remove ();
130 result = -1;
133 // get file address
134 ACE_FILE_Addr tmp_addr;
135 peer_.get_local_addr (tmp_addr);
137 // reopen new file for asynch IO
138 if(connector_.connect(peer_,
139 tmp_addr,
140 0, //timeout
141 ACE_Addr::sap_any,
142 0, //reuse
143 O_RDWR |FILE_FLAG_OVERLAPPED) != 0)
145 ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
146 ACE_TEXT("FileIOHandler connect failed to open file")));
147 peer_.remove ();
148 result = -1;
150 else // device connected successfully
152 // keep track of our writes for offset calculations (can't use O_APPEND since
153 // this is not supported for the Win32_Asynch implementation) and data verifications
154 this->block_count_ = 0; // start counting
156 // Set our I/O handle to that of the peer_ object handling our connection
157 handle(peer_.get_handle());
159 if (writer_.open(*this) != 0 || reader_.open(*this) != 0)
161 ACE_ERROR(
162 (LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("FileIOHandler reader or writer open failed")));
163 result = -1;
165 else // reader and writer opened successfully
167 // Allocate a new message block and initiate a read operation on it
168 // to prime the asynchronous read pipeline
169 // The message block is sized for the largest message we expect
170 ACE_Message_Block *mb;
171 ACE_NEW_NORETURN(mb, ACE_Message_Block(FILE_FRAME_SIZE));
172 if (reader_.read(*mb, mb->space()) != 0)
174 int errnr = ACE_OS::last_error ();
175 ACE_DEBUG(
176 (LM_INFO, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler begin read failed"), errnr));
177 mb->release();
178 #if defined (ACE_WIN32)
179 // On older Win32 versions (WinXP, Win2003/2008) asynch IO with disk files is not
180 // reliable and may perform sync IO in certain cases like when the read offset denotes
181 // current end of file. Instead of scheduling a write operation the read will immediately
182 // return with an EOF error.
183 // We circumvent that situation here by not reporting an error and scheduling a read operation
184 // later when we are sure data has been written at the offset in question (after the write finishes).
185 if (errnr != ERROR_HANDLE_EOF)
186 #endif
187 result = -1;
189 #if defined (ACE_WIN32)
190 else
192 this->read_pending_ = true;
194 #endif
195 // If read worked, psMsg is now controlled by Proactor framework.
198 return result;
201 //***************************************************************************
203 // Method: handle_read_file
205 // Description: Callback used when a read completes
207 // Inputs: read file result structure containing message block
209 // Returns: none
211 //***************************************************************************
212 void
213 FileIOHandler::handle_read_file(const ACE_Asynch_Read_File::Result &result)
215 ACE_Message_Block &mb = result.message_block();
216 // If the read failed, queue up another one using the same message block
217 if (!result.success() || result.bytes_transferred() == 0)
219 //ACE_DEBUG((LM_INFO, ACE_TEXT("FileIOHandler receive timeout.\n")));
220 reader_.read(mb,
221 mb.space(),
222 result.offset () + result.bytes_transferred ());
224 else
226 // We have a message block with some read data in it. Send it onward
227 ACE_DEBUG((LM_INFO, ACE_TEXT("FileIOHandler received %d bytes of data at offset %d\n"),
228 result.bytes_transferred(), result.offset ()));
230 // TODO: Process this data in some meaningful way
231 if (result.offset () != (unsigned long)*reinterpret_cast<unsigned char*> (mb.rd_ptr ()))
233 ACE_DEBUG((LM_ERROR, ACE_TEXT("FileIOHandler received incorrect data: got [%u] expected [%u]\n"),
234 *reinterpret_cast<unsigned char*> (mb.rd_ptr ()), result.offset ()));
237 // Release the message block when we're done with it
238 mb.release();
240 if ((result.offset () + result.bytes_transferred ()) < 256)
242 // Our processing is done; prime the read process again
243 ACE_Message_Block *new_mb;
244 ACE_NEW_NORETURN(new_mb, ACE_Message_Block(FILE_FRAME_SIZE));
245 if (reader_.read(*new_mb, new_mb->space(),
246 result.offset () + result.bytes_transferred ()) != 0)
248 int errnr = ACE_OS::last_error ();
249 ACE_DEBUG(
250 (LM_INFO, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler continuing read failed"), errnr));
251 new_mb->release();
252 #if defined (ACE_WIN32)
253 this->read_pending_ = false;
255 else
257 this->read_pending_ = true;
258 #endif
261 else
263 // we have it all; stop the proactor
264 ACE_Proactor::instance ()->proactor_end_event_loop ();
269 //***************************************************************************
271 // Method: handle_write_file
273 // Description: Callback used when a write completes
275 // Inputs: write file result structure containing message block
277 // Returns: none
279 //***************************************************************************
280 void
281 FileIOHandler::handle_write_file(const ACE_Asynch_Write_File::Result &result)
283 ACE_DEBUG((LM_INFO, ACE_TEXT("Finished write\n")));
284 // When the write completes, we get the message block. It's been sent,
285 // so we just deallocate it.
286 result.message_block().release();
287 #if defined (ACE_WIN32)
288 // to circumvent problems on older Win32 (see above) we schedule a read here if none
289 // is pending yet.
290 if (!this->read_pending_)
292 ACE_Message_Block *mb;
293 ACE_NEW_NORETURN(mb, ACE_Message_Block(FILE_FRAME_SIZE));
294 if (reader_.read(*mb, mb->space(),
295 (this->block_count_ - 1) * FILE_FRAME_SIZE) != 0)
297 int errnr = ACE_OS::last_error ();
298 ACE_DEBUG(
299 (LM_INFO, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler read after write failed"), errnr));
300 mb->release();
302 else
304 this->read_pending_ = true;
307 #endif
310 //***************************************************************************
312 // Method: handle_time_out
314 // Description: Hook method called when a timer expires
316 // Inputs: time value, completion token passed to timer at scheduling
317 // The token tells us which timer we're handling
319 // Returns: none
321 //***************************************************************************
322 void
323 FileIOHandler::handle_time_out(const ACE_Time_Value & /*tv*/, const void * /*act*/)
325 // do not schedule more than 16 writes
326 if (this->block_count_ < 16)
328 // In our example, we send a bunch of data every time the timer expires
330 // setup the next payload
331 char payload[FILE_FRAME_SIZE];
332 for (int i=0; i<FILE_FRAME_SIZE ;++i)
334 payload[i] = (this->block_count_ * FILE_FRAME_SIZE) + i;
336 ACE_Message_Block *new_mb;
337 ACE_NEW_NORETURN(new_mb, ACE_Message_Block(FILE_FRAME_SIZE));
338 new_mb->copy(payload, FILE_FRAME_SIZE);
340 // queue up a write (append to end of file) operation, give visual feedback on success or failure.
341 if (this->writer_.write(*new_mb, new_mb->length(),
342 this->block_count_ * FILE_FRAME_SIZE) == 0)
344 ACE_DEBUG((LM_INFO, ACE_TEXT("Successfully queued write of %d bytes\n"), new_mb->length ())); // success
345 this->block_count_ ++; // next block
347 else
349 ACE_DEBUG((LM_ERROR, ACE_TEXT("FAILED to queue write operation\n"))); // failure
356 run_main(int /*argc*/, ACE_TCHAR * /*argv*/[])
358 ACE_START_TEST (ACE_TEXT ("Proactor_File_Test"));
360 int rc = 0;
361 FileIOHandler fileIOHandler;
363 // Initialize the serial port handler
364 if (0 != fileIOHandler.Connect())
366 rc = 1;
368 else
370 ACE_DEBUG((LM_INFO, ACE_TEXT(" File I/O Handler connected.\n")));
372 // start the repeating timer for data transmission
374 ACE_Time_Value repeatTime(0, 50000); // 0.05 second time interval
375 ACE_Proactor::instance()->schedule_repeating_timer(fileIOHandler,
376 (void *) (100),
377 repeatTime);
379 // Run the Proactor
380 ACE_Proactor::instance()->proactor_run_event_loop();
383 ACE_END_TEST;
385 return rc;
388 #else
391 run_main (int, ACE_TCHAR *[])
393 ACE_START_TEST (ACE_TEXT ("Proactor_File_Test"));
395 ACE_DEBUG ((LM_INFO,
396 ACE_TEXT ("Asynchronous IO is unsupported.\n")
397 ACE_TEXT ("Proactor_File_Test will not be run.\n")));
399 ACE_END_TEST;
401 return 0;
404 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */