supernova: include missing header
[supercollider.git] / server / supernova / sc / sc_osc_handler.hpp
blob16fa74c187932a9eb61ac8a8b83eb3a0b1e6ab40
1 // osc handler for supercollider-style communication
2 // Copyright (C) 2009, 2010 Tim Blechmann
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef SERVER_SC_OSC_HANDLER_HPP
20 #define SERVER_SC_OSC_HANDLER_HPP
22 #include <vector>
23 #include <algorithm>
25 #include <boost/enable_shared_from_this.hpp>
26 #include <boost/date_time/microsec_time_clock.hpp>
27 #include <boost/intrusive/treap_set.hpp>
29 #include "osc/OscReceivedElements.h"
31 #include "../server/dynamic_endpoint.hpp"
32 #include "../server/memory_pool.hpp"
33 #include "../server/server_args.hpp"
34 #include "../server/server_scheduler.hpp"
35 #include "../utilities/osc_server.hpp"
36 #include "../utilities/sized_array.hpp"
37 #include "../utilities/static_pool.hpp"
38 #include "../utilities/time_tag.hpp"
40 namespace nova
42 typedef bool (*AsyncStageFn)(World *inWorld, void* cmdData);
43 typedef void (*AsyncFreeFn)(World *inWorld, void* cmdData);
45 namespace detail
47 using namespace boost::asio;
48 using namespace boost::asio::ip;
51 /**
52 * observer to receive osc notifications
54 * \todo shall we use a separate thread for observer notifications?
55 * */
56 class sc_notify_observers
58 typedef std::vector<nova_endpoint> observer_vector;
60 public:
61 sc_notify_observers(boost::asio::io_service & io_service):
62 udp_socket(io_service), tcp_socket(io_service)
65 void add_observer(nova_endpoint const & ep)
67 observers.push_back(ep);
70 void remove_observer(nova_endpoint const & ep)
72 observer_vector::iterator it = std::find(observers.begin(),
73 observers.end(), ep);
74 assert (it != observers.end());
76 observers.erase(it);
80 /* @{ */
81 /** notifications, should be called from the real-time thread */
82 void notification_node_started(const server_node * node)
84 notify("/n_go", node);
87 void notification_node_ended(const server_node * node)
89 notify("/n_end", node);
92 void notification_node_turned_off(const server_node * node)
94 notify("/n_off", node);
97 void notification_node_turned_on(const server_node * node)
99 notify("/n_on", node);
102 void notification_node_moved(const server_node * node)
104 notify("/n_move", node);
107 void send_trigger(int32_t node_id, int32_t trigger_id, float value);
109 void send_node_reply(int32_t node_id, int reply_id, const char* command_name, int argument_count, const float* values);
110 /* @} */
112 /** send notifications, should not be called from the real-time thread */
113 void send_notification(const char * data, size_t length);
115 /* @{ */
116 /** sending functions */
117 void send(const char * data, size_t size, nova_endpoint const & endpoint)
119 nova_protocol prot = endpoint.protocol();
120 if (prot.family() == AF_INET && prot.type() == SOCK_DGRAM)
122 udp::endpoint ep(endpoint.address(), endpoint.port());
123 send_udp(data, size, ep);
125 else if (prot.family() == AF_INET && prot.type() == SOCK_STREAM)
127 tcp::endpoint ep(endpoint.address(), endpoint.port());
128 send_tcp(data, size, ep);
132 void send_udp(const char * data, unsigned int size, udp::endpoint const & receiver)
134 boost::mutex::scoped_lock lock(udp_mutex);
135 sc_notify_observers::udp_socket.send_to(boost::asio::buffer(data, size), receiver);
138 void send_tcp(const char * data, unsigned int size, tcp::endpoint const & receiver)
140 boost::mutex::scoped_lock lock(tcp_mutex);
141 tcp_socket.connect(receiver);
142 boost::asio::write(tcp_socket, boost::asio::buffer(data, size));
144 /* @} */
146 private:
147 void notify(const char * address_pattern, const server_node * node);
148 void send_notification(const char * data, size_t length, nova_endpoint const & endpoint);
150 observer_vector observers;
152 protected:
153 udp::socket udp_socket;
154 tcp::socket tcp_socket;
155 boost::mutex udp_mutex, tcp_mutex;
158 class sc_scheduled_bundles
160 public:
161 struct bundle_node:
162 public boost::intrusive::bs_set_base_hook<>
164 bundle_node(time_tag const & timeout, const char * data, nova_endpoint const & endpoint):
165 timeout_(timeout), data_(data), endpoint_(endpoint)
168 void run(void);
170 const time_tag timeout_;
171 const char * const data_;
172 const nova_endpoint endpoint_;
174 friend bool operator< (const bundle_node & lhs, const bundle_node & rhs)
176 return priority_order(lhs, rhs);
179 friend bool priority_order (const bundle_node & lhs, const bundle_node & rhs)
181 return lhs.timeout_ < rhs.timeout_; // lower value, higher priority
185 typedef boost::intrusive::treap_multiset<bundle_node> bundle_queue_t;
187 void insert_bundle(time_tag const & timeout, const char * data, size_t length,
188 nova_endpoint const & endpoint);
190 void execute_bundles(time_tag const & last, time_tag const & now);
192 void clear_bundles(void)
194 bundle_q.clear_and_dispose(dispose_bundle);
197 static void dispose_bundle(bundle_node * node)
199 node->~bundle_node();
200 rt_pool.free(node);
203 private:
204 bundle_queue_t bundle_q;
207 class sc_osc_handler:
208 private detail::network_thread,
209 public sc_notify_observers
211 /* @{ */
212 /** constructor helpers */
213 void open_tcp_acceptor(tcp const & protocol, unsigned int port);
214 void open_udp_socket(udp const & protocol, unsigned int port);
215 bool open_socket(int family, int type, int protocol, unsigned int port);
216 /* @} */
218 public:
219 sc_osc_handler(server_arguments const & args):
220 sc_notify_observers(detail::network_thread::io_service_),
221 dump_osc_packets(0), error_posting(1), quit_received(false),
222 tcp_acceptor_(detail::network_thread::io_service_),
223 tcp_password_(args.server_password.c_str())
225 if (args.tcp_port && !open_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, args.tcp_port))
226 throw std::runtime_error("cannot open socket");
227 if (args.udp_port && !open_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, args.udp_port))
228 throw std::runtime_error("cannot open socket");
231 void start_receive_thread(void)
233 detail::network_thread::start_receive();
236 ~sc_osc_handler(void)
239 typedef osc::ReceivedPacket osc_received_packet;
240 typedef osc::ReceivedBundle received_bundle;
241 typedef osc::ReceivedMessage received_message;
243 struct received_packet:
244 public audio_sync_callback
246 received_packet(const char * dat, size_t length, nova_endpoint const & endpoint):
247 data(dat), length(length), endpoint_(endpoint)
250 void * operator new(std::size_t size, void* ptr)
252 return ::operator new(size, ptr);
255 static received_packet * alloc_packet(const char * data, size_t length,
256 nova_endpoint const & remote_endpoint);
258 void run(void);
260 const char * const data;
261 const size_t length;
262 const nova_endpoint endpoint_;
265 private:
266 /* @{ */
267 /** udp socket handling */
268 void start_receive_udp(void)
270 sc_notify_observers::udp_socket.async_receive_from(
271 buffer(recv_buffer_), udp_remote_endpoint_,
272 boost::bind(&sc_osc_handler::handle_receive_udp, this,
273 placeholders::error, placeholders::bytes_transferred));
276 void handle_receive_udp(const boost::system::error_code& error,
277 std::size_t bytes_transferred);
278 /* @} */
280 /* @{ */
281 /** tcp connection handling */
282 class tcp_connection:
283 public boost::enable_shared_from_this<tcp_connection>
285 public:
286 typedef boost::shared_ptr<tcp_connection> pointer;
288 static pointer create(boost::asio::io_service& io_service)
290 return pointer(new tcp_connection(io_service));
293 tcp::socket& socket()
295 return socket_;
298 void start(sc_osc_handler * self);
300 private:
301 tcp_connection(boost::asio::io_service& io_service)
302 : socket_(io_service)
305 tcp::socket socket_;
308 void start_accept(void)
310 tcp_connection::pointer new_connection = tcp_connection::create(tcp_acceptor_.get_io_service());
312 tcp_acceptor_.async_accept(new_connection->socket(),
313 boost::bind(&sc_osc_handler::handle_accept, this, new_connection,
314 boost::asio::placeholders::error));
317 void handle_accept(tcp_connection::pointer new_connection,
318 const boost::system::error_code& error)
320 if (!error)
322 new_connection->start(this);
323 start_accept();
326 /* @} */
328 public:
329 void dumpOSC(int i)
331 dump_osc_packets = i;
334 private:
335 int dump_osc_packets;
337 /* @{ */
338 public:
339 /** \todo how to handle temporary message error suppression? */
341 void set_error_posting(int val)
343 error_posting = val;
346 private:
347 int error_posting;
348 /* @} */
350 /* @{ */
351 /** packet handling */
352 public:
353 void handle_packet_async(const char* data, size_t length, nova::nova_endpoint const & endpoint);
354 void handle_packet(const char* data, size_t length, nova::nova_endpoint const & endpoint);
355 time_tag handle_bundle_nrt(const char * data_, std::size_t length);
357 private:
358 template <bool realtime>
359 void handle_bundle(received_bundle const & bundle, nova_endpoint const & endpoint);
360 template <bool realtime>
361 void handle_message(received_message const & message, size_t msg_size, nova_endpoint const & endpoint);
362 template <bool realtime>
363 void handle_message_int_address(received_message const & message, size_t msg_size, nova_endpoint const & endpoint);
364 template <bool realtime>
365 void handle_message_sym_address(received_message const & message, size_t msg_size, nova_endpoint const & endpoint);
367 friend class sc_scheduled_bundles::bundle_node;
368 /* @} */
370 /* @{ */
371 /** bundle scheduling */
372 public:
373 void clear_scheduled_bundles(void)
375 scheduled_bundles.clear_bundles();
378 void execute_scheduled_bundles(void)
380 scheduled_bundles.execute_bundles(last, now);
383 void increment_logical_time(time_tag const & diff)
385 last = now;
386 now += diff;
389 void update_time_from_system(void)
391 now = time_tag::from_ptime(boost::date_time::microsec_clock<boost::posix_time::ptime>::universal_time());
392 last = now - time_per_tick;
395 time_tag const & current_time(void) const
397 return now;
400 sc_scheduled_bundles scheduled_bundles;
401 time_tag now, last;
402 time_tag time_per_tick;
403 /* @} */
406 void do_asynchronous_command(World * world, void* replyAddr, const char* cmdName, void *cmdData,
407 AsyncStageFn stage2, AsyncStageFn stage3, AsyncStageFn stage4, AsyncFreeFn cleanup,
408 int completionMsgSize, void* completionMsgData);
410 bool quit_received;
412 private:
413 /* @{ */
414 /* udp::socket udp_socket_;*/
415 udp::endpoint udp_remote_endpoint_;
417 tcp::acceptor tcp_acceptor_;
418 const char * tcp_password_; /* we are not owning this! */
420 boost::array<char, 1<<15 > recv_buffer_;
422 std::vector<char> overflow_vector;
423 /* @} */
426 } /* namespace detail */
428 using detail::sc_osc_handler;
430 } /* namespace nova */
433 #endif /* SERVER_SC_OSC_HANDLER_HPP */