1 #include "ace/Proactor.h"
2 #include "ace/Filecache.h"
3 #include "ace/OS_NS_unistd.h"
4 #include "ace/OS_NS_sys_socket.h"
6 #include "JAWS/Jaws_IO.h"
7 #include "JAWS/IO_Handler.h"
8 #include "JAWS/IO_Acceptor.h"
9 #include "JAWS/Data_Block.h"
10 #include "JAWS/Policy.h"
11 #include "JAWS/Waiter.h"
12 #include "JAWS/Filecache.h"
14 JAWS_Abstract_IO_Handler::~JAWS_Abstract_IO_Handler ()
18 JAWS_IO_Handler_Factory::~JAWS_IO_Handler_Factory ()
23 JAWS_IO_Handler_Factory::create_io_handler ()
25 JAWS_TRACE ("JAWS_IO_Handler_Factory::create");
27 JAWS_IO_Handler
*handler
;
28 handler
= new JAWS_IO_Handler (this);
34 JAWS_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler
*handler
)
36 JAWS_TRACE ("JAWS_IO_Handler_Factory::destroy");
39 delete handler
->message_block ();
44 JAWS_IO_Handler::JAWS_IO_Handler (JAWS_IO_Handler_Factory
*factory
)
47 handle_ (ACE_INVALID_HANDLE
),
53 JAWS_IO_Handler::~JAWS_IO_Handler ()
60 ACE_OS::closesocket (this->handle_
);
61 this->handle_
= ACE_INVALID_HANDLE
;
65 JAWS_IO_Handler::accept_complete (ACE_HANDLE handle
)
67 // callback into pipeline task, notify that the accept has completed
68 this->handle_
= handle
;
69 this->status_
|= ACCEPT_OK
;
70 this->status_
&= (ACCEPT_OK
+1);
72 JAWS_Dispatch_Policy
*policy
= this->mb_
->policy ();
74 // Do this so that Thread Per Request can spawn a new thread
75 policy
->concurrency ()->activate_hook ();
79 JAWS_IO_Handler::accept_error ()
81 // callback into pipeline task, notify that the accept has failed
82 this->status_
|= ACCEPT_ERROR
;
83 this->status_
&= (ACCEPT_ERROR
+1);
87 JAWS_IO_Handler::read_complete (ACE_Message_Block
*data
)
89 ACE_UNUSED_ARG (data
);
90 // We can call back into the pipeline task at this point
91 // this->pipeline_->read_complete (data);
92 this->status_
|= READ_OK
;
93 this->status_
&= (READ_OK
+1);
97 JAWS_IO_Handler::read_error ()
99 // this->pipeline_->read_error ();
100 this->status_
|= READ_ERROR
;
101 this->status_
&= (READ_ERROR
+1);
105 JAWS_IO_Handler::transmit_file_complete ()
107 JAWS_TRACE ("JAWS_IO_Handler::transmit_file_complete");
108 // this->pipeline_->transmit_file_complete ();
109 this->status_
|= TRANSMIT_OK
;
110 this->status_
&= (TRANSMIT_OK
+1);
114 JAWS_IO_Handler::transmit_file_error (int result
)
116 JAWS_TRACE ("JAWS_IO_Handler::transmit_file_error");
117 ACE_UNUSED_ARG (result
);
118 // this->pipeline_->transmit_file_complete (result);
119 this->status_
|= TRANSMIT_ERROR
;
120 this->status_
&= (TRANSMIT_ERROR
+1);
124 JAWS_IO_Handler::receive_file_complete ()
126 this->status_
|= RECEIVE_OK
;
127 this->status_
&= (RECEIVE_OK
+1);
131 JAWS_IO_Handler::receive_file_error (int result
)
133 ACE_UNUSED_ARG(result
);
134 this->status_
|= RECEIVE_ERROR
;
135 this->status_
&= (RECEIVE_ERROR
+1);
139 JAWS_IO_Handler::write_error ()
141 ACE_DEBUG ((LM_DEBUG
, " (%t) error in writing response\n"));
143 this->status_
|= WRITE_ERROR
;
144 this->status_
&= (WRITE_ERROR
+1);
149 JAWS_IO_Handler::confirmation_message_complete ()
151 this->status_
|= WRITE_OK
;
152 this->status_
&= (WRITE_OK
+1);
156 JAWS_IO_Handler::error_message_complete ()
158 this->status_
|= WRITE_OK
;
159 this->status_
&= (WRITE_OK
+1);
162 JAWS_IO_Handler_Factory
*
163 JAWS_IO_Handler::factory ()
165 return this->factory_
;
169 JAWS_IO_Handler::handle () const
171 return this->handle_
;
175 JAWS_IO_Handler::task (JAWS_Pipeline_Handler
*ph
)
180 JAWS_Pipeline_Handler
*
181 JAWS_IO_Handler::task ()
187 JAWS_IO_Handler::message_block (JAWS_Data_Block
*mb
)
193 JAWS_IO_Handler::message_block ()
199 JAWS_IO_Handler::done ()
201 this->factory ()->destroy_io_handler (this);
205 JAWS_IO_Handler::status ()
207 return this->status_
;
211 JAWS_IO_Handler::idle ()
213 this->status_
&= (IDLE
+1);
217 JAWS_IO_Handler::acquire ()
222 JAWS_IO_Handler::lock ()
227 JAWS_IO_Handler::release ()
231 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
233 JAWS_Asynch_IO_Handler_Factory::~JAWS_Asynch_IO_Handler_Factory ()
238 JAWS_Asynch_IO_Handler_Factory::create_io_handler ()
240 JAWS_TRACE ("JAWS_Asynch_IO_Handler_Factory::create");
242 JAWS_Asynch_IO_Handler
*handler
= 0;
243 handler
= new JAWS_Asynch_IO_Handler (this);
249 JAWS_Asynch_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler
*handler
)
251 JAWS_TRACE ("JAWS_IO_Handler_Factory::destroy");
255 //cerr << "(" << thr_self () << ") locking for destruction: " << handler << endl;
257 delete handler
->message_block ();
258 handler
->message_block (0);
264 JAWS_Asynch_IO_Handler::JAWS_Asynch_IO_Handler (JAWS_Asynch_IO_Handler_Factory
*factory
)
265 : JAWS_IO_Handler (factory
),
271 JAWS_Asynch_IO_Handler::~JAWS_Asynch_IO_Handler ()
273 delete this->handler_
;
278 JAWS_Asynch_IO_Handler::handler ()
280 return this->handler_
;
284 JAWS_Asynch_IO_Handler::acquire ()
286 //cerr << "(" << thr_self () << ") acquire handler: " << this << endl;
287 this->count_
.acquire_read ();
291 JAWS_Asynch_IO_Handler::lock ()
293 //cerr << "(" << thr_self () << ") locking handler: " << this << endl;
294 this->count_
.acquire_write ();
298 JAWS_Asynch_IO_Handler::release ()
300 //cerr << "(" << thr_self () << ") release handler: " << this << endl;
301 this->count_
.release ();
304 JAWS_Asynch_Handler::JAWS_Asynch_Handler ()
307 this->proactor (ACE_Proactor::instance ());
310 JAWS_Asynch_Handler::~JAWS_Asynch_Handler ()
315 JAWS_Asynch_Handler::open (ACE_HANDLE h
,
316 ACE_Message_Block
&mb
)
318 JAWS_TRACE ("JAWS_Asynch_Handler::open");
320 // This currently does nothing, but just in case.
321 ACE_Service_Handler::open (h
, mb
);
323 // ioh_ set from the ACT hopefully
324 //this->dispatch_handler ();
326 #if !defined (ACE_WIN32)
327 // Assume at this point there is no data.
328 mb
.rd_ptr (mb
.wr_ptr ());
331 // AcceptEx reads some initial data from the socket.
332 this->handler ()->message_block ()->copy (mb
.rd_ptr (), mb
.length ());
335 ACE_Asynch_Accept_Result_Impl
*fake_result
336 = ACE_Proactor::instance ()->create_asynch_accept_result
337 (this->proxy (), JAWS_IO_Asynch_Acceptor_Singleton::instance ()->get_handle (),
338 h
, mb
, JAWS_Data_Block::JAWS_DATA_BLOCK_SIZE
,
339 this->ioh_
, ACE_INVALID_HANDLE
, 0);
341 this->handler ()->handler_
= this;
343 fake_result
->complete (0, 1, 0);
347 JAWS_Asynch_Handler::act (const void *act_ref
)
349 JAWS_TRACE ("JAWS_Asynch_Handler::act");
351 // Set the ioh from the act
352 this->ioh_
= (JAWS_Asynch_IO_Handler
*) act_ref
;
357 JAWS_Asynch_Handler::handle () const
359 return this->ioh_
->handle ();
364 JAWS_Asynch_Handler::dispatch_handler ()
367 // A future version of ACE will support this.
368 ACE_Thread_ID tid
= ACE_OS::thr_self ();
370 // Do it this way for now
371 ACE_thread_t thr_name
;
372 thr_name
= ACE_OS::thr_self ();
374 JAWS_Thread_ID
tid (thr_name
);
377 JAWS_IO_Handler
**iohref
= JAWS_Waiter_Singleton::instance ()->find (tid
);
379 *iohref
= this->handler ();
383 JAWS_Asynch_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result
386 JAWS_TRACE ("JAWS_Asynch_Handler::handle_read_stream");
388 this->dispatch_handler ();
390 if (result
.act () != 0)
392 // This callback is for io->receive_file()
393 JAWS_TRACE ("JAWS_Asynch_Handler::handle_read_stream (recv_file)");
396 if (result
.success () && result
.bytes_transferred () != 0)
398 if (result
.message_block ().length ()
399 == result
.message_block ().size ())
400 code
= ACE_Filecache_Handle::ACE_SUCCESS
;
403 ACE_Asynch_Read_Stream ar
;
404 if (ar
.open (*this, this->handler ()->handle ()) == -1
405 || ar
.read (result
.message_block (),
406 result
.message_block ().size ()
407 - result
.message_block ().length (),
408 result
.act ()) == -1)
417 if (code
== ACE_Filecache_Handle::ACE_SUCCESS
)
418 this->handler ()->receive_file_complete ();
420 this->handler ()->receive_file_error (code
);
422 result
.message_block ().release ();
423 delete (ACE_Filecache_Handle
*) result
.act ();
427 // This callback is for this->read()
428 JAWS_TRACE ("JAWS_Asynch_Handler::handle_read_stream (read)");
430 if (result
.success ()
431 && result
.bytes_transferred () != 0)
432 this->handler ()->read_complete (&result
.message_block ());
434 this->handler ()->read_error ();
439 JAWS_Asynch_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result
442 this->dispatch_handler ();
444 result
.message_block ().release ();
446 if (result
.act () == (void *) JAWS_Asynch_IO::CONFIRMATION
)
447 this->handler ()->confirmation_message_complete ();
449 this->handler ()->error_message_complete ();
453 JAWS_Asynch_Handler::handle_transmit_file (const
454 ACE_Asynch_Transmit_File::Result
457 this->dispatch_handler ();
459 if (result
.success ())
460 this->handler ()->transmit_file_complete ();
462 this->handler ()->transmit_file_error (-1);
464 result
.header_and_trailer ()->header ()->release ();
465 result
.header_and_trailer ()->trailer ()->release ();
466 delete result
.header_and_trailer ();
467 delete (JAWS_Cached_FILE
*) result
.act ();
471 JAWS_Asynch_Handler::handle_accept (const ACE_Asynch_Accept::Result
&result
)
473 JAWS_TRACE ("JAWS_Asynch_Handler::handle_accept");
474 this->dispatch_handler ();
476 if (result
.success ())
478 JAWS_TRACE ("JAWS_Asynch_Handler::handle_accept, success");
479 this->handler ()->accept_complete (result
.accept_handle ());
482 this->handler ()->accept_error ();
486 JAWS_Asynch_Handler::handler (JAWS_Asynch_IO_Handler
*ioh
)
491 JAWS_Asynch_IO_Handler
*
492 JAWS_Asynch_Handler::handler ()
497 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */