bumping version to 3.5-rc1
[supercollider.git] / server / supernova / sc / sc_osc_handler.hpp
blob9b1d1d375701755604177cab85d7c6987f77e67f
1 // osc handler for supercollider-style communication
2 // Copyright (C) 2009, 2010 Tim Blechmann
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef SERVER_SC_OSC_HANDLER_HPP
20 #define SERVER_SC_OSC_HANDLER_HPP
22 #include <vector>
23 #include <algorithm>
25 #include <boost/date_time/microsec_time_clock.hpp>
26 #include <boost/intrusive/treap_set.hpp>
28 #include "osc/OscReceivedElements.h"
30 #include "../server/dynamic_endpoint.hpp"
31 #include "../server/memory_pool.hpp"
32 #include "../server/server_args.hpp"
33 #include "../server/server_scheduler.hpp"
34 #include "../utilities/osc_server.hpp"
35 #include "../utilities/sized_array.hpp"
36 #include "../utilities/static_pool.hpp"
37 #include "../utilities/time_tag.hpp"
39 namespace nova
41 typedef bool (*AsyncStageFn)(World *inWorld, void* cmdData);
42 typedef void (*AsyncFreeFn)(World *inWorld, void* cmdData);
44 namespace detail
46 using namespace boost::asio;
47 using namespace boost::asio::ip;
50 /**
51 * observer to receive osc notifications
53 * \todo shall we use a separate thread for observer notifications?
54 * */
55 class sc_notify_observers
57 typedef std::vector<nova_endpoint> observer_vector;
59 public:
60 sc_notify_observers(boost::asio::io_service & io_service):
61 udp_socket(io_service), tcp_socket(io_service)
64 void add_observer(nova_endpoint const & ep)
66 observers.push_back(ep);
69 void remove_observer(nova_endpoint const & ep)
71 observer_vector::iterator it = std::find(observers.begin(),
72 observers.end(), ep);
73 assert (it != observers.end());
75 observers.erase(it);
79 /* @{ */
80 /** notifications, should be called from the real-time thread */
81 void notification_node_started(const server_node * node)
83 notify("/n_go", node);
86 void notification_node_ended(const server_node * node)
88 notify("/n_end", node);
91 void notification_node_turned_off(const server_node * node)
93 notify("/n_off", node);
96 void notification_node_turned_on(const server_node * node)
98 notify("/n_on", node);
101 void notification_node_moved(const server_node * node)
103 notify("/n_move", node);
106 void send_trigger(int32_t node_id, int32_t trigger_id, float value);
108 void send_node_reply(int32_t node_id, int reply_id, const char* command_name, int argument_count, const float* values);
109 /* @} */
111 /** send notifications, should not be called from the real-time thread */
112 void send_notification(const char * data, size_t length);
114 /* @{ */
115 /** sending functions */
116 void send(const char * data, size_t size, nova_endpoint const & endpoint)
118 nova_protocol prot = endpoint.protocol();
119 if (prot.family() == AF_INET && prot.type() == SOCK_DGRAM)
121 udp::endpoint ep(endpoint.address(), endpoint.port());
122 send_udp(data, size, ep);
124 else if (prot.family() == AF_INET && prot.type() == SOCK_STREAM)
126 tcp::endpoint ep(endpoint.address(), endpoint.port());
127 send_tcp(data, size, ep);
131 void send_udp(const char * data, unsigned int size, udp::endpoint const & receiver)
133 boost::mutex::scoped_lock lock(udp_mutex);
134 sc_notify_observers::udp_socket.send_to(boost::asio::buffer(data, size), receiver);
137 void send_tcp(const char * data, unsigned int size, tcp::endpoint const & receiver)
139 boost::mutex::scoped_lock lock(tcp_mutex);
140 tcp_socket.connect(receiver);
141 boost::asio::write(tcp_socket, boost::asio::buffer(data, size));
143 /* @} */
145 private:
146 void notify(const char * address_pattern, const server_node * node);
147 void send_notification(const char * data, size_t length, nova_endpoint const & endpoint);
149 observer_vector observers;
151 protected:
152 udp::socket udp_socket;
153 tcp::socket tcp_socket;
154 boost::mutex udp_mutex, tcp_mutex;
157 class sc_scheduled_bundles
159 public:
160 struct bundle_node:
161 public boost::intrusive::bs_set_base_hook<>
163 bundle_node(time_tag const & timeout, const char * data, nova_endpoint const & endpoint):
164 timeout_(timeout), data_(data), endpoint_(endpoint)
167 void run(void);
169 const time_tag timeout_;
170 const char * const data_;
171 const nova_endpoint endpoint_;
173 friend bool operator< (const bundle_node & lhs, const bundle_node & rhs)
175 return priority_order(lhs, rhs);
178 friend bool priority_order (const bundle_node & lhs, const bundle_node & rhs)
180 return lhs.timeout_ < rhs.timeout_; // lower value, higher priority
184 typedef boost::intrusive::treap_multiset<bundle_node> bundle_queue_t;
186 void insert_bundle(time_tag const & timeout, const char * data, size_t length,
187 nova_endpoint const & endpoint);
189 void execute_bundles(time_tag const & last, time_tag const & now);
191 void clear_bundles(void)
193 bundle_q.clear_and_dispose(dispose_bundle);
196 static void dispose_bundle(bundle_node * node)
198 node->~bundle_node();
199 rt_pool.free(node);
202 private:
203 bundle_queue_t bundle_q;
206 class sc_osc_handler:
207 private detail::network_thread,
208 public sc_notify_observers
210 /* @{ */
211 /** constructor helpers */
212 void open_tcp_acceptor(tcp const & protocol, unsigned int port);
213 void open_udp_socket(udp const & protocol, unsigned int port);
214 bool open_socket(int family, int type, int protocol, unsigned int port);
215 /* @} */
217 public:
218 sc_osc_handler(server_arguments const & args):
219 sc_notify_observers(detail::network_thread::io_service_),
220 dump_osc_packets(0), error_posting(1), quit_received(false),
221 tcp_acceptor_(detail::network_thread::io_service_),
222 tcp_password_(args.server_password.c_str())
224 if (args.tcp_port && !open_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, args.tcp_port))
225 throw std::runtime_error("cannot open socket");
226 if (args.udp_port && !open_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, args.udp_port))
227 throw std::runtime_error("cannot open socket");
230 void start_receive_thread(void)
232 detail::network_thread::start_receive();
235 ~sc_osc_handler(void)
238 typedef osc::ReceivedPacket osc_received_packet;
239 typedef osc::ReceivedBundle received_bundle;
240 typedef osc::ReceivedMessage received_message;
242 struct received_packet:
243 public audio_sync_callback
245 received_packet(const char * dat, size_t length, nova_endpoint const & endpoint):
246 data(dat), length(length), endpoint_(endpoint)
249 void * operator new(std::size_t size, void* ptr)
251 return ::operator new(size, ptr);
254 static received_packet * alloc_packet(const char * data, size_t length,
255 nova_endpoint const & remote_endpoint);
257 void run(void);
259 const char * const data;
260 const size_t length;
261 const nova_endpoint endpoint_;
264 private:
265 /* @{ */
266 /** udp socket handling */
267 void start_receive_udp(void)
269 sc_notify_observers::udp_socket.async_receive_from(
270 buffer(recv_buffer_), udp_remote_endpoint_,
271 boost::bind(&sc_osc_handler::handle_receive_udp, this,
272 placeholders::error, placeholders::bytes_transferred));
275 void handle_receive_udp(const boost::system::error_code& error,
276 std::size_t bytes_transferred);
277 /* @} */
279 /* @{ */
280 /** tcp connection handling */
281 class tcp_connection:
282 public boost::enable_shared_from_this<tcp_connection>
284 public:
285 typedef boost::shared_ptr<tcp_connection> pointer;
287 static pointer create(boost::asio::io_service& io_service)
289 return pointer(new tcp_connection(io_service));
292 tcp::socket& socket()
294 return socket_;
297 void start(sc_osc_handler * self);
299 private:
300 tcp_connection(boost::asio::io_service& io_service)
301 : socket_(io_service)
304 tcp::socket socket_;
307 void start_accept(void)
309 tcp_connection::pointer new_connection = tcp_connection::create(tcp_acceptor_.get_io_service());
311 tcp_acceptor_.async_accept(new_connection->socket(),
312 boost::bind(&sc_osc_handler::handle_accept, this, new_connection,
313 boost::asio::placeholders::error));
316 void handle_accept(tcp_connection::pointer new_connection,
317 const boost::system::error_code& error)
319 if (!error)
321 new_connection->start(this);
322 start_accept();
325 /* @} */
327 public:
328 void dumpOSC(int i)
330 dump_osc_packets = i;
333 private:
334 int dump_osc_packets;
336 /* @{ */
337 public:
338 /** \todo how to handle temporary message error suppression? */
340 void set_error_posting(int val)
342 error_posting = val;
345 private:
346 int error_posting;
347 /* @} */
349 /* @{ */
350 /** packet handling */
351 public:
352 void handle_packet_async(const char* data, size_t length, nova::nova_endpoint const & endpoint);
353 void handle_packet(const char* data, size_t length, nova::nova_endpoint const & endpoint);
354 time_tag handle_bundle_nrt(const char * data_, std::size_t length);
356 private:
357 template <bool realtime>
358 void handle_bundle(received_bundle const & bundle, nova_endpoint const & endpoint);
359 template <bool realtime>
360 void handle_message(received_message const & message, size_t msg_size, nova_endpoint const & endpoint);
361 template <bool realtime>
362 void handle_message_int_address(received_message const & message, size_t msg_size, nova_endpoint const & endpoint);
363 template <bool realtime>
364 void handle_message_sym_address(received_message const & message, size_t msg_size, nova_endpoint const & endpoint);
366 friend class sc_scheduled_bundles::bundle_node;
367 /* @} */
369 /* @{ */
370 /** bundle scheduling */
371 public:
372 void clear_scheduled_bundles(void)
374 scheduled_bundles.clear_bundles();
377 void execute_scheduled_bundles(void)
379 scheduled_bundles.execute_bundles(last, now);
382 void increment_logical_time(time_tag const & diff)
384 last = now;
385 now += diff;
388 void update_time_from_system(void)
390 now = time_tag::from_ptime(boost::date_time::microsec_clock<boost::posix_time::ptime>::universal_time());
391 last = now - time_per_tick;
394 time_tag const & current_time(void) const
396 return now;
399 sc_scheduled_bundles scheduled_bundles;
400 time_tag now, last;
401 time_tag time_per_tick;
402 /* @} */
405 void do_asynchronous_command(World * world, void* replyAddr, const char* cmdName, void *cmdData,
406 AsyncStageFn stage2, AsyncStageFn stage3, AsyncStageFn stage4, AsyncFreeFn cleanup,
407 int completionMsgSize, void* completionMsgData);
409 bool quit_received;
411 private:
412 /* @{ */
413 /* udp::socket udp_socket_;*/
414 udp::endpoint udp_remote_endpoint_;
416 tcp::acceptor tcp_acceptor_;
417 const char * tcp_password_; /* we are not owning this! */
419 boost::array<char, 1<<15 > recv_buffer_;
421 std::vector<char> overflow_vector;
422 /* @} */
425 } /* namespace detail */
427 using detail::sc_osc_handler;
429 } /* namespace nova */
432 #endif /* SERVER_SC_OSC_HANDLER_HPP */