1 // ============================================================================
3 * @file Proactor_File_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does asynchronous file IO
9 * @author Martin Corino <mcorino@remedy.nl>
11 // ============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
16 // This only works on Win32 platforms and on Unix platforms
17 // supporting POSIX aio calls.
19 //////////////////////////////////////////////////////////////////
21 // This sample application integrates asynch file
22 // read/write operations with the Proactor, using an ACE_FILE_Connector,
23 // ACE_FILE_IO, ACE_Asynch_Read_File, ACE_Asynch_Write_File in an ACE_Handler.
24 // The program sets up asynch read and write on a temporary file, and sends
25 // out 16 16-character bursts of data at timed intervals.
28 #include "ace/OS_NS_stdio.h"
29 #include "ace/OS_NS_unistd.h"
30 #include "ace/OS_NS_errno.h"
31 #include "ace/OS_NS_string.h"
32 #include "ace/OS_NS_sys_time.h"
34 #include "ace/FILE_Connector.h"
35 #include "ace/FILE_IO.h"
37 #include "ace/Proactor.h"
38 #include "ace/Asynch_Connector.h"
39 #include "ace/Time_Value.h"
42 // Size, in bytes, of each file I/O frame written and read by this test.
43 #define FILE_FRAME_SIZE 16
// ACE_Handler subclass used by this test. It holds the asynchronous file
// reader and writer and overrides the read-completion, write-completion and
// timer callbacks declared below.
// NOTE(review): access specifiers, the constructor declaration and some
// members (e.g. peer_, block_count_, read_pending_, which the method
// definitions use) are not visible in this excerpt.
45 class FileIOHandler
: public ACE_Handler
// Destructor; declared as an override of the base class destructor.
49 ~FileIOHandler () override
;
54 // This method will be called when an asynchronous read
55 // completes on a file.
57 handle_read_file(const ACE_Asynch_Read_File::Result
&result
) override
;
59 // This method will be called when an asynchronous write
60 // completes on a file.
62 handle_write_file(const ACE_Asynch_Write_File::Result
&result
) override
;
64 // Callback hook invoked by the Proactor's Timer_Queue.
66 handle_time_out(const ACE_Time_Value
&tv
, const void *arg
) override
;
68 // Our I/O objects; they're public so others can access them
69 ACE_Asynch_Read_File reader_
;
70 ACE_Asynch_Write_File writer_
;
// Win32-only section; the constructor initializes read_pending_ under the
// same ACE_WIN32 guard.
73 #if defined (ACE_WIN32)
// Connector that Connect() uses to create and then reopen the test file.
77 ACE_FILE_Connector connector_
;
// Default constructor. On ACE_WIN32 builds the initializer list also sets
// read_pending_ to false, i.e. no asynchronous read is outstanding yet.
// NOTE(review): the remaining initializers and the body are not visible here.
80 FileIOHandler::FileIOHandler ()
83 #if defined (ACE_WIN32)
84 , read_pending_ (false)
// Destructor. Queries peer_ for its local file address and tests whether that
// address has a path name.
// NOTE(review): the action taken when a path name exists is not visible in
// this excerpt -- presumably the temporary file is removed; confirm.
89 FileIOHandler::~FileIOHandler ()
91 ACE_FILE_Addr tmp_addr
;
92 peer_
.get_local_addr (tmp_addr
);
93 if (tmp_addr
.get_path_name ())
99 //***************************************************************************
103 // Description: Establishes connection, primes read process
105 // Inputs: none
109 //***************************************************************************
// Prepares the handler for asynchronous file I/O:
//   1. creates an empty temporary file through connector_,
//   2. closes it again while leaving the file on disk,
//   3. reopens it with O_RDWR|FILE_FLAG_OVERLAPPED,
//   4. resets block_count_, adopts the file handle, and opens the
//      asynchronous writer and reader on this handler,
//   5. queues the first asynchronous read to prime the read pipeline.
// Each failing step is logged.
// Returns an int status; run_main() treats a non-zero value as failure.
// NOTE(review): the return statements themselves are not visible here.
111 int FileIOHandler::Connect()
115 // create an empty temporary file for the test
116 if(connector_
.connect(peer_
,
117 ACE_sap_any_cast (ACE_FILE_Addr
&)) != 0)
119 ACE_ERROR((LM_ERROR
, ACE_TEXT("%p\n"),
120 ACE_TEXT("FileIOHandler connect failed to create file")));
124 // close opened file but leave it where it is
125 if (peer_
.close () != 0)
127 ACE_ERROR((LM_ERROR
, ACE_TEXT("%p\n"),
128 ACE_TEXT("FileIOHandler connect failed to close file")));
// Fetch the address of the file that was just created.
// NOTE(review): presumably tmp_addr is passed to the reopen below; the
// argument lines of that call are not visible here.
134 ACE_FILE_Addr tmp_addr
;
135 peer_
.get_local_addr (tmp_addr
);
137 // reopen new file for asynch IO
138 if(connector_
.connect(peer_
,
143 O_RDWR
|FILE_FLAG_OVERLAPPED
) != 0)
145 ACE_ERROR((LM_ERROR
, ACE_TEXT("%p\n"),
146 ACE_TEXT("FileIOHandler connect failed to open file")));
150 else // file reopened successfully
152 // keep track of our writes for offset calculations (can't use O_APPEND since
153 // this is not supported for the Win32_Asynch implementation) and data verifications
154 this->block_count_
= 0; // start counting
156 // Set our I/O handle to that of the peer_ object handling our connection
157 handle(peer_
.get_handle());
// Both asynchronous operations are opened with this object as the handler
// that receives their completions.
159 if (writer_
.open(*this) != 0 || reader_
.open(*this) != 0)
162 (LM_ERROR
, ACE_TEXT("%p\n"), ACE_TEXT("FileIOHandler reader or writer open failed")));
165 else // reader and writer opened successfully
167 // Allocate a new message block and initiate a read operation on it
168 // to prime the asynchronous read pipeline
169 // The message block is sized for the largest message we expect
170 ACE_Message_Block
*mb
;
// NOTE(review): mb is dereferenced immediately below and no null check is
// visible after ACE_NEW_NORETURN -- confirm allocation failure is handled.
171 ACE_NEW_NORETURN(mb
, ACE_Message_Block(FILE_FRAME_SIZE
));
// A non-zero result from read() means the read could not be queued.
172 if (reader_
.read(*mb
, mb
->space()) != 0)
174 int errnr
= ACE_OS::last_error ();
176 (LM_INFO
, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler begin read failed"), errnr
));
178 #if defined (ACE_WIN32)
179 // On older Win32 versions (WinXP, Win2003/2008) asynch IO with disk files is not
180 // reliable and may perform sync IO in certain cases like when the read offset denotes
181 // current end of file. Instead of scheduling a write operation the read will immediately
182 // return with an EOF error.
183 // We circumvent that situation here by not reporting an error and scheduling a read operation
184 // later when we are sure data has been written at the offset in question (after the write finishes).
185 if (errnr
!= ERROR_HANDLE_EOF
)
// Win32 only: remember that an asynchronous read is now outstanding.
189 #if defined (ACE_WIN32)
192 this->read_pending_
= true;
195 // If the read was queued, mb is now controlled by the Proactor framework.
201 //***************************************************************************
203 // Method: handle_read_file
205 // Description: Callback used when a read completes
207 // Inputs: read file result structure containing message block
211 //***************************************************************************
// Completion callback for asynchronous file reads.
//   result - completion record; supplies the message block, success flag,
//            byte count and file offset of the finished read.
// A failed or empty read takes the first branch. Otherwise the data is logged
// and verified, and either the next read is queued or -- once offset plus
// bytes read reaches 256 -- the proactor event loop is ended.
213 FileIOHandler::handle_read_file(const ACE_Asynch_Read_File::Result
&result
)
215 ACE_Message_Block
&mb
= result
.message_block();
216 // If the read failed, queue up another one using the same message block
217 if (!result
.success() || result
.bytes_transferred() == 0)
219 //ACE_DEBUG((LM_INFO, ACE_TEXT("FileIOHandler receive timeout.\n")));
222 result
.offset () + result
.bytes_transferred ());
226 // We have a message block with some read data in it. Send it onward
227 ACE_DEBUG((LM_INFO
, ACE_TEXT("FileIOHandler received %d bytes of data at offset %d\n"),
228 result
.bytes_transferred(), result
.offset ()));
230 // TODO: Process this data in some meaningful way
// Verification: handle_time_out() fills each frame so that its first byte
// equals the frame's file offset, so the first byte read must match the
// offset this read was issued at.
231 if (result
.offset () != (unsigned long)*reinterpret_cast<unsigned char*> (mb
.rd_ptr ()))
233 ACE_DEBUG((LM_ERROR
, ACE_TEXT("FileIOHandler received incorrect data: got [%u] expected [%u]\n"),
234 *reinterpret_cast<unsigned char*> (mb
.rd_ptr ()), result
.offset ()));
237 // Release the message block when we're done with it
// 256 is the total amount written by the test: 16 frames (the limit in
// handle_time_out) of FILE_FRAME_SIZE (16) bytes each.
240 if ((result
.offset () + result
.bytes_transferred ()) < 256)
242 // Our processing is done; prime the read process again
243 ACE_Message_Block
*new_mb
;
// NOTE(review): new_mb is used right away; no null check is visible after
// ACE_NEW_NORETURN -- confirm.
244 ACE_NEW_NORETURN(new_mb
, ACE_Message_Block(FILE_FRAME_SIZE
));
// The next read starts where this one ended.
245 if (reader_
.read(*new_mb
, new_mb
->space(),
246 result
.offset () + result
.bytes_transferred ()) != 0)
248 int errnr
= ACE_OS::last_error ();
250 (LM_INFO
, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler continuing read failed"), errnr
));
// Win32 only: track whether a read is outstanding so that
// handle_write_file() can issue one when none is pending.
252 #if defined (ACE_WIN32)
253 this->read_pending_
= false;
257 this->read_pending_
= true;
263 // we have it all; stop the proactor
264 ACE_Proactor::instance ()->proactor_end_event_loop ();
269 //***************************************************************************
271 // Method: handle_write_file
273 // Description: Callback used when a write completes
275 // Inputs: write file result structure containing message block
279 //***************************************************************************
// Completion callback for asynchronous file writes.
//   result - completion record; supplies the message block that was written.
// Logs the completion and releases the written message block. On ACE_WIN32
// builds it additionally queues a read when none is currently pending.
281 FileIOHandler::handle_write_file(const ACE_Asynch_Write_File::Result
&result
)
283 ACE_DEBUG((LM_INFO
, ACE_TEXT("Finished write\n")));
284 // When the write completes, we get the message block. It's been sent,
285 // so we just deallocate it.
286 result
.message_block().release();
287 #if defined (ACE_WIN32)
288 // to circumvent problems on older Win32 (see above) we schedule a read here if none
290 if (!this->read_pending_
)
292 ACE_Message_Block
*mb
;
// NOTE(review): no null check on mb is visible after ACE_NEW_NORETURN -- confirm.
293 ACE_NEW_NORETURN(mb
, ACE_Message_Block(FILE_FRAME_SIZE
));
// block_count_ is incremented after each successfully queued write, so
// (block_count_ - 1) * FILE_FRAME_SIZE is the offset of the most recently
// queued frame.
294 if (reader_
.read(*mb
, mb
->space(),
295 (this->block_count_
- 1) * FILE_FRAME_SIZE
) != 0)
297 int errnr
= ACE_OS::last_error ();
299 (LM_INFO
, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler read after write failed"), errnr
));
// A read is now outstanding.
304 this->read_pending_
= true;
310 //***************************************************************************
312 // Method: handle_time_out
314 // Description: Hook method called when a timer expires
316 // Inputs: time value, completion token passed to timer at scheduling
317 // The token tells us which timer we're handling
321 //***************************************************************************
// Timer callback; both parameters are unused.
// While fewer than 16 frames have been queued, builds the next
// FILE_FRAME_SIZE-byte payload, copies it into a new message block and queues
// an asynchronous write at offset block_count_ * FILE_FRAME_SIZE. On success
// block_count_ is advanced; on failure an error is logged.
323 FileIOHandler::handle_time_out(const ACE_Time_Value
& /*tv*/, const void * /*act*/)
325 // do not schedule more than 16 writes
326 if (this->block_count_
< 16)
328 // In our example, we send a bunch of data every time the timer expires
330 // setup the next payload
331 char payload
[FILE_FRAME_SIZE
];
// Byte i of the frame holds its own file offset (frame offset + i); the read
// callback relies on this to verify the data.
332 for (int i
=0; i
<FILE_FRAME_SIZE
;++i
)
334 payload
[i
] = (this->block_count_
* FILE_FRAME_SIZE
) + i
;
336 ACE_Message_Block
*new_mb
;
// NOTE(review): new_mb is dereferenced next; no null check is visible after
// ACE_NEW_NORETURN -- confirm.
337 ACE_NEW_NORETURN(new_mb
, ACE_Message_Block(FILE_FRAME_SIZE
));
338 new_mb
->copy(payload
, FILE_FRAME_SIZE
);
340 // queue up a write (append to end of file) operation, give visual feedback on success or failure.
341 if (this->writer_
.write(*new_mb
, new_mb
->length(),
342 this->block_count_
* FILE_FRAME_SIZE
) == 0)
344 ACE_DEBUG((LM_INFO
, ACE_TEXT("Successfully queued write of %d bytes\n"), new_mb
->length ())); // success
345 this->block_count_
++; // next block
349 ACE_DEBUG((LM_ERROR
, ACE_TEXT("FAILED to queue write operation\n"))); // failure
// Test entry point for builds with asynchronous I/O support; the command-line
// arguments are unused. Connects the file I/O handler, schedules a repeating
// timer on the singleton proactor to drive the writes, and runs the proactor
// event loop.
// NOTE(review): the failure branch after Connect(), the remaining timer
// arguments and the test epilogue are not visible in this excerpt.
356 run_main(int /*argc*/, ACE_TCHAR
* /*argv*/[])
358 ACE_START_TEST (ACE_TEXT ("Proactor_File_Test"));
361 FileIOHandler fileIOHandler
;
363 // Initialize the file I/O handler
364 if (0 != fileIOHandler
.Connect())
370 ACE_DEBUG((LM_INFO
, ACE_TEXT(" File I/O Handler connected.\n")));
372 // start the repeating timer for data transmission
374 ACE_Time_Value
repeatTime(0, 50000); // 0.05 second time interval
375 ACE_Proactor::instance()->schedule_repeating_timer(fileIOHandler
,
// Run the proactor event loop; handle_read_file() ends it once all data has
// been read back.
380 ACE_Proactor::instance()->proactor_run_event_loop();
// Alternative test entry point; its messages state that asynchronous IO is
// unsupported and that the test will not be run.
// NOTE(review): the preprocessor branch selecting this definition and the
// logging macro name are not visible in this excerpt.
391 run_main (int, ACE_TCHAR
*[])
393 ACE_START_TEST (ACE_TEXT ("Proactor_File_Test"));
396 ACE_TEXT ("Asynchronous IO is unsupported.\n")
397 ACE_TEXT ("Proactor_File_Test will not be run.\n")));
404 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */