Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / ASX / UPIPE_Event_Server / Consumer_Router.cpp
blob14f92f992820989849fcec8a9089f36a202ac382
1 #include "ace/OS_NS_stdio.h"
2 #include "ace/OS_NS_string.h"
3 #include "ace/Truncate.h"
4 #include "Consumer_Router.h"
5 #include "Options.h"
8 #if defined (ACE_HAS_THREADS)
10 typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
12 int
13 Consumer_Handler::open (void *a)
15 CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
16 this->router_task_ = af->router ();
17 return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
20 Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
21 : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
25 // Create a new handler that will interact with a consumer and point
26 // its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
28 Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
29 : CONSUMER_ROUTER (tm)
33 // Initialize the Router..
35 int
36 Consumer_Router::open (void *)
38 ACE_ASSERT (this->is_reader ());
39 ACE_TCHAR *argv[3];
41 argv[0] = (ACE_TCHAR *) this->name ();
42 argv[1] = (ACE_TCHAR *) options.consumer_file ();
43 argv[2] = 0;
45 if (this->init (1, &argv[1]) == -1)
46 return -1;
48 // Make this an active object.
49 // return this->activate (options.t_flags ());
51 // Until that's done, return 1 to indicate that the object wasn't activated.
52 return 1;
55 int
56 Consumer_Router::close (u_long)
58 ACE_ASSERT (this->is_reader ());
59 this->peer_map_.close ();
60 this->msg_queue ()->deactivate();
61 return 0;
65 // Handle incoming messages in a separate thread..
67 int
68 Consumer_Router::svc ()
70 ACE_Message_Block *mb = 0;
72 ACE_ASSERT (this->is_reader ());
74 if (options.debug ())
75 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting svc in %s\n"),
76 this->name ()));
78 while (this->getq (mb) > 0)
79 if (this->put_next (mb) == -1)
80 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) put_next failed in %s\n"),
81 this->name ()), -1);
83 return 0;
84 // Note the implicit ACE_OS::thr_exit() via destructor.
87 // Send a MESSAGE_BLOCK to the supplier(s)..
89 int
90 Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
92 ACE_ASSERT (this->is_reader ());
94 if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
96 this->control (mb);
97 return this->put_next (mb);
99 else
101 //printf("consumer-Router is routing : send_peers\n");
102 return this->send_peers (mb);
106 // Return information about the Client_Router ACE_Module..
109 Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
111 ACE_TCHAR buf[BUFSIZ];
112 ACE_UPIPE_Addr addr;
113 const ACE_TCHAR *module_name = this->name ();
114 ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_;
116 if (sa.get_local_addr (addr) == -1)
117 return -1;
119 ACE_OS::sprintf (buf,
120 ACE_TEXT ("%") ACE_TEXT_PRIs
121 ACE_TEXT ("\t %") ACE_TEXT_PRIs
122 ACE_TEXT ("/ %") ACE_TEXT_PRIs,
123 module_name, ACE_TEXT ("upipe"),
124 ACE_TEXT ("# consumer router\n"));
126 if (*strp == 0 && (*strp = ACE_OS::strdup (module_name)) == 0)
127 return -1;
128 else
129 ACE_OS::strncpy (*strp, module_name, length);
131 return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (module_name));
134 #endif /* ACE_HAS_THREADS */