1 // osc handler for supercollider-style communication
2 // Copyright (C) 2009, 2010 Tim Blechmann
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef SERVER_SC_OSC_HANDLER_HPP
20 #define SERVER_SC_OSC_HANDLER_HPP
25 #include <boost/date_time/microsec_time_clock.hpp>
26 #include <boost/intrusive/treap_set.hpp>
28 #include "osc/OscReceivedElements.h"
30 #include "../server/dynamic_endpoint.hpp"
31 #include "../server/memory_pool.hpp"
32 #include "../server/server_args.hpp"
33 #include "../server/server_scheduler.hpp"
34 #include "../utilities/osc_server.hpp"
35 #include "../utilities/sized_array.hpp"
36 #include "../utilities/static_pool.hpp"
37 #include "../utilities/time_tag.hpp"
/**
 * Function-pointer type for one stage of an asynchronous command.
 * It receives the World pointer and the opaque command data and returns a bool.
 * NOTE(review): the meaning of the returned bool is not visible in this header - confirm at the call site.
 */
41 typedef bool (*AsyncStageFn
)(World
*inWorld
, void* cmdData
);
/**
 * Function-pointer type taking the same arguments as AsyncStageFn but returning nothing.
 * It is the type of the `cleanup` argument of do_asynchronous_command.
 */
42 typedef void (*AsyncFreeFn
)(World
*inWorld
, void* cmdData
);
// NOTE(review): these using-directives live in a header, so every translation
// unit including it gets the boost::asio and boost::asio::ip names pulled into
// the enclosing scope (the enclosing namespace braces are not visible in this view).
// The declarations below depend on them: udp::, tcp::, buffer(), placeholders::.
46 using namespace boost::asio
;
47 using namespace boost::asio::ip
;
/**
 * observer to receive osc notifications
 *
 * \todo shall we use a separate thread for observer notifications?
 */
// Holds a list of observer endpoints together with one udp and one tcp socket
// (each guarded by its own mutex) that are used to send packets to endpoints.
55 class sc_notify_observers
// container type of the observer list
57 typedef std::vector
<nova_endpoint
> observer_vector
;
// Constructs the udp and the tcp socket on the given io_service.
// The observer list is default-constructed, i.e. starts out empty.
60 sc_notify_observers(boost::asio::io_service
& io_service
):
61 udp_socket(io_service
), tcp_socket(io_service
)
// Appends ep to the observer list. No duplicate check is made, so adding the
// same endpoint twice stores it twice.
// NOTE(review): no lock is taken around the vector here - confirm which threads call this.
64 void add_observer(nova_endpoint
const & ep
)
66 observers
.push_back(ep
);
// Searches the observer list with std::find and asserts that the iterator is
// not end().
// NOTE(review): the remaining std::find arguments and whatever follows the
// assert are not visible in this view; presumably the element is erased - confirm.
// The assert is compiled out when NDEBUG is defined, so in such builds an
// unknown endpoint is not caught by this check.
69 void remove_observer(nova_endpoint
const & ep
)
71 observer_vector::iterator it
= std::find(observers
.begin(),
73 assert (it
!= observers
.end());
// The five functions below are thin forwarders: each passes a fixed OSC
// address pattern plus the node pointer to notify().
80 /** notifications, should be called from the real-time thread */
// forwards with address pattern "/n_go"
81 void notification_node_started(const server_node
* node
)
83 notify("/n_go", node
);
// forwards with address pattern "/n_end"
86 void notification_node_ended(const server_node
* node
)
88 notify("/n_end", node
);
// forwards with address pattern "/n_off"
91 void notification_node_turned_off(const server_node
* node
)
93 notify("/n_off", node
);
// forwards with address pattern "/n_on"
96 void notification_node_turned_on(const server_node
* node
)
98 notify("/n_on", node
);
// forwards with address pattern "/n_move"
101 void notification_node_moved(const server_node
* node
)
103 notify("/n_move", node
);
// Declaration only; the definition is not part of this header view.
// Takes a node id, a trigger id and a float value.
106 void send_trigger(int32_t node_id
, int32_t trigger_id
, float value
);
// Declaration only. `values` points to float data and `argument_count` is
// presumably the number of floats it holds - confirm against the definition.
108 void send_node_reply(int32_t node_id
, int reply_id
, const char* command_name
, int argument_count
, const float* values
);
111 /** send notifications, should not be called from the real-time thread */
// Declaration only. Overload without an endpoint argument; a second overload
// taking an explicit endpoint is declared further down in this class.
112 void send_notification(const char * data
, size_t length
);
115 /** sending functions */
// Sends `size` bytes starting at `data` to `endpoint`, choosing the transport
// from the protocol of the endpoint:
//   AF_INET + SOCK_DGRAM  -> send_udp
//   AF_INET + SOCK_STREAM -> send_tcp
// There is no final else branch: for any other family/type combination the
// function returns without sending and without reporting anything.
// `size` is a size_t here but send_udp/send_tcp take unsigned int, so the value
// is converted implicitly on the way down.
116 void send(const char * data
, size_t size
, nova_endpoint
const & endpoint
)
118 nova_protocol prot
= endpoint
.protocol();
119 if (prot
.family() == AF_INET
&& prot
.type() == SOCK_DGRAM
)
121 udp::endpoint
ep(endpoint
.address(), endpoint
.port());
122 send_udp(data
, size
, ep
);
124 else if (prot
.family() == AF_INET
&& prot
.type() == SOCK_STREAM
)
126 tcp::endpoint
ep(endpoint
.address(), endpoint
.port());
127 send_tcp(data
, size
, ep
);
// Sends `size` bytes from `data` as one datagram to `receiver` through the
// shared udp socket. udp_mutex is held for the whole call, so concurrent
// callers are serialised. send_to is the synchronous overload without an
// error_code argument, i.e. failures surface as exceptions.
131 void send_udp(const char * data
, unsigned int size
, udp::endpoint
const & receiver
)
133 boost::mutex::scoped_lock
lock(udp_mutex
);
134 sc_notify_observers::udp_socket
.send_to(boost::asio::buffer(data
, size
), receiver
);
137 void send_tcp(const char * data
, unsigned int size
, tcp::endpoint
const & receiver
)
139 boost::mutex::scoped_lock
lock(tcp_mutex
);
140 tcp_socket
.connect(receiver
);
141 boost::asio::write(tcp_socket
, boost::asio::buffer(data
, size
));
// Declaration only; called by the notification_node_* forwarders with an OSC
// address pattern and the affected node.
146 void notify(const char * address_pattern
, const server_node
* node
);
// Declaration only; overload of send_notification with an explicit endpoint.
147 void send_notification(const char * data
, size_t length
, nova_endpoint
const & endpoint
);
// endpoints registered through add_observer
149 observer_vector observers
;
// sockets used by send_udp / send_tcp; udp_socket is also used by the derived
// sc_osc_handler for receiving (see start_receive_udp)
152 udp::socket udp_socket
;
153 tcp::socket tcp_socket
;
// one mutex per socket, locked in send_udp and send_tcp respectively
154 boost::mutex udp_mutex
, tcp_mutex
;
// Container class for bundles that are scheduled for a later time tag.
// NOTE(review): the header line of the nested bundle_node type and a few lines
// after its constructor are not visible in this view; the comments below only
// cover what is shown.
157 class sc_scheduled_bundles
// base hook that lets bundle_node objects be linked into a boost::intrusive
// tree-based container without a separate node allocation
161 public boost::intrusive::bs_set_base_hook
<>
// Stores the timeout, the data pointer and a copy of the endpoint.
// Only the pointer is stored; the bytes it refers to are not copied here.
163 bundle_node(time_tag
const & timeout
, const char * data
, nova_endpoint
const & endpoint
):
164 timeout_(timeout
), data_(data
), endpoint_(endpoint
)
// all three members are const, i.e. fixed at construction
169 const time_tag timeout_
;
170 const char * const data_
;
171 const nova_endpoint endpoint_
;
// Ordering of bundle nodes: operator< simply delegates to priority_order, so
// the key order and the priority order are the same relation.
173 friend bool operator< (const bundle_node
& lhs
, const bundle_node
& rhs
)
175 return priority_order(lhs
, rhs
);
// True when lhs has the smaller timeout. Nodes with equal timeouts compare
// as neither less nor greater.
178 friend bool priority_order (const bundle_node
& lhs
, const bundle_node
& rhs
)
180 return lhs
.timeout_
< rhs
.timeout_
; // lower value, higher priority
// multiset, so several bundles with the same timeout can be queued
184 typedef boost::intrusive::treap_multiset
<bundle_node
> bundle_queue_t
;
// Declaration only; takes the time tag, the bundle bytes with their length and
// the endpoint the bundle is associated with.
186 void insert_bundle(time_tag
const & timeout
, const char * data
, size_t length
,
187 nova_endpoint
const & endpoint
);
// Declaration only; takes two time tags named last and now.
// NOTE(review): presumably runs the bundles due in that interval - confirm against the definition.
189 void execute_bundles(time_tag
const & last
, time_tag
const & now
);
// Empties the queue, handing every removed node to dispose_bundle.
191 void clear_bundles(void)
193 bundle_q
.clear_and_dispose(dispose_bundle
);
// Disposer for queue nodes: runs the destructor explicitly.
// NOTE(review): the lines after the destructor call are not visible in this
// view, so whether and how the storage is released cannot be told from here.
196 static void dispose_bundle(bundle_node
* node
)
198 node
->~bundle_node();
// the queue of scheduled bundles, ordered by timeout
203 bundle_queue_t bundle_q
;
// OSC handler class. Inherits privately from detail::network_thread and
// publicly from sc_notify_observers. network_thread is listed first, so it is
// constructed before sc_notify_observers - the constructor relies on that
// when it passes network_thread::io_service_ to the second base.
206 class sc_osc_handler
:
207 private detail::network_thread
,
208 public sc_notify_observers
211 /** constructor helpers */
// Declarations only; definitions are not part of this header view.
212 void open_tcp_acceptor(tcp
const & protocol
, unsigned int port
);
213 void open_udp_socket(udp
const & protocol
, unsigned int port
);
// Returns a bool that the constructor treats as success (true) / failure (false).
214 bool open_socket(int family
, int type
, int protocol
, unsigned int port
);
// Builds the handler from the server arguments.
//  - the observer base and the tcp acceptor share network_thread::io_service_
//  - dump_osc_packets starts at 0, error_posting at 1, quit_received at false
//  - a tcp socket is opened only if args.tcp_port is non-zero, likewise for udp;
//    a port of 0 therefore means that this transport stays unused
// Throws std::runtime_error("cannot open socket") if open_socket reports failure.
// NOTE(review): tcp_password_ keeps the pointer returned by
// args.server_password.c_str() without copying, so it is only valid while that
// string is alive and unmodified - confirm the lifetime of `args` at the call site.
218 sc_osc_handler(server_arguments
const & args
):
219 sc_notify_observers(detail::network_thread::io_service_
),
220 dump_osc_packets(0), error_posting(1), quit_received(false),
221 tcp_acceptor_(detail::network_thread::io_service_
),
222 tcp_password_(args
.server_password
.c_str())
224 if (args
.tcp_port
&& !open_socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
, args
.tcp_port
))
225 throw std::runtime_error("cannot open socket");
226 if (args
.udp_port
&& !open_socket(AF_INET
, SOCK_DGRAM
, IPPROTO_UDP
, args
.udp_port
))
227 throw std::runtime_error("cannot open socket");
// Forwards to network_thread::start_receive() of the private base class.
230 void start_receive_thread(void)
232 detail::network_thread::start_receive();
// Destructor; its body is not visible in this view.
235 ~sc_osc_handler(void)
// short local names for the oscpack received-element types
238 typedef osc::ReceivedPacket osc_received_packet
;
239 typedef osc::ReceivedBundle received_bundle
;
240 typedef osc::ReceivedMessage received_message
;
// A received packet wrapped as an audio_sync_callback.
// NOTE(review): presumably so it can be handed to the audio thread - confirm.
242 struct received_packet
:
243 public audio_sync_callback
// Stores the data pointer, the length and a copy of the endpoint.
// Note that the `length` parameter has the same name as the member it
// initialises; inside the initialiser the argument is the parameter.
245 received_packet(const char * dat
, size_t length
, nova_endpoint
const & endpoint
):
246 data(dat
), length(length
), endpoint_(endpoint
)
// Class-level placement new that forwards to the global placement form, so
// objects can be constructed into storage obtained elsewhere.
249 void * operator new(std::size_t size
, void* ptr
)
251 return ::operator new(size
, ptr
);
// Declaration only; factory returning a pointer to a new received_packet.
// NOTE(review): where the storage comes from and whether the data is copied
// is decided in the definition, which is not visible here.
254 static received_packet
* alloc_packet(const char * data
, size_t length
,
255 nova_endpoint
const & remote_endpoint
);
259 const char * const data
;
261 const nova_endpoint endpoint_
;
266 /** udp socket handling */
// Starts one asynchronous receive on the udp socket of the observer base.
// Incoming bytes land in recv_buffer_, the sender address in
// udp_remote_endpoint_, and handle_receive_udp is invoked on completion.
// The completion handler binds the raw `this` pointer, so the handler object
// has to outlive any pending receive.
267 void start_receive_udp(void)
269 sc_notify_observers::udp_socket
.async_receive_from(
270 buffer(recv_buffer_
), udp_remote_endpoint_
,
271 boost::bind(&sc_osc_handler::handle_receive_udp
, this,
272 placeholders::error
, placeholders::bytes_transferred
));
// Declaration only; completion handler for start_receive_udp.
275 void handle_receive_udp(const boost::system::error_code
& error
,
276 std::size_t bytes_transferred
);
280 /** tcp connection handling */
// One accepted tcp connection. Derives from enable_shared_from_this and is
// only handed out through the shared `pointer` type returned by create().
281 class tcp_connection
:
282 public boost::enable_shared_from_this
<tcp_connection
>
285 typedef boost::shared_ptr
<tcp_connection
> pointer
;
// Factory: allocates a connection on the heap and wraps it in a shared_ptr.
287 static pointer
create(boost::asio::io_service
& io_service
)
289 return pointer(new tcp_connection(io_service
));
// Accessor returning a reference to a tcp::socket; the body is not visible in
// this view.
292 tcp::socket
& socket()
// Declaration only; receives the owning handler as a raw pointer.
297 void start(sc_osc_handler
* self
);
// Constructor initialising socket_ on the given io_service.
// NOTE(review): the access specifier is not visible here; since create() is
// the provided way to build instances it is presumably private - confirm.
300 tcp_connection(boost::asio::io_service
& io_service
)
301 : socket_(io_service
)
// Creates a fresh tcp_connection on the io_service of the acceptor and starts
// an asynchronous accept into its socket. The shared pointer is bound into the
// completion handler, which keeps the connection alive until handle_accept runs.
307 void start_accept(void)
309 tcp_connection::pointer new_connection
= tcp_connection::create(tcp_acceptor_
.get_io_service());
311 tcp_acceptor_
.async_accept(new_connection
->socket(),
312 boost::bind(&sc_osc_handler::handle_accept
, this, new_connection
,
313 boost::asio::placeholders::error
));
// Completion handler of async_accept: calls start(this) on the connection.
// NOTE(review): the lines around this call are not visible in this view, so
// how `error` is checked and whether accepting is re-armed cannot be told from here.
316 void handle_accept(tcp_connection::pointer new_connection
,
317 const boost::system::error_code
& error
)
321 new_connection
->start(this);
// NOTE(review): the header of the function containing the following assignment
// is not visible in this view; the statement stores `i` in dump_osc_packets.
330 dump_osc_packets
= i
;
// initialised to 0 by the constructor
334 int dump_osc_packets
;
338 /** \todo how to handle temporary message error suppression? */
// Takes an int; the body is not visible in this view. Presumably it stores the
// value in error_posting (initialised to 1 by the constructor) - confirm.
340 void set_error_posting(int val
)
350 /** packet handling */
// All functions in this group are declarations only; their definitions are not
// part of this header view.
// Two entry points taking raw packet bytes, their length and the sender endpoint.
352 void handle_packet_async(const char* data
, size_t length
, nova::nova_endpoint
const & endpoint
);
353 void handle_packet(const char* data
, size_t length
, nova::nova_endpoint
const & endpoint
);
// Takes raw bundle bytes without an endpoint and returns a time_tag.
// NOTE(review): "nrt" presumably stands for non-realtime - confirm.
354 time_tag
handle_bundle_nrt(const char * data_
, std::size_t length
);
// The handlers below are templates on a compile-time bool named `realtime`.
357 template <bool realtime
>
358 void handle_bundle(received_bundle
const & bundle
, nova_endpoint
const & endpoint
);
359 template <bool realtime
>
360 void handle_message(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
// NOTE(review): presumably for messages whose address is given as an integer - confirm.
361 template <bool realtime
>
362 void handle_message_int_address(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
// NOTE(review): presumably for messages whose address is given as a string - confirm.
363 template <bool realtime
>
364 void handle_message_sym_address(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
// grants bundle_node access to the non-public members of this class
366 friend class sc_scheduled_bundles::bundle_node
;
370 /** bundle scheduling */
// Discards every scheduled bundle (forwards to sc_scheduled_bundles::clear_bundles).
372 void clear_scheduled_bundles(void)
374 scheduled_bundles
.clear_bundles();
// Forwards the two time tags `last` and `now` to the bundle queue.
377 void execute_scheduled_bundles(void)
379 scheduled_bundles
.execute_bundles(last
, now
);
// Takes a time difference; the body is not visible in this view.
382 void increment_logical_time(time_tag
const & diff
)
// Sets `now` from the system clock (UTC, microsecond clock) and `last` to one
// tick before it, i.e. now - time_per_tick.
388 void update_time_from_system(void)
390 now
= time_tag::from_ptime(boost::date_time::microsec_clock
<boost::posix_time::ptime
>::universal_time());
391 last
= now
- time_per_tick
;
// Const accessor returning a time_tag by const reference; the body is not
// visible in this view.
394 time_tag
const & current_time(void) const
// queue of bundles waiting for their time tag
399 sc_scheduled_bundles scheduled_bundles
;
// duration of one tick; subtracted from `now` in update_time_from_system
// NOTE(review): where this value is set is not visible in this view.
401 time_tag time_per_tick
;
// Declaration only; the definition is not part of this header view.
// Takes the world, an opaque reply address, the command name, opaque command
// data, three stage callbacks (stage2..stage4) of type AsyncStageFn, a cleanup
// callback of type AsyncFreeFn, and a completion message given as size + pointer.
// NOTE(review): which thread runs which stage and who owns cmdData and
// completionMsgData is decided in the definition - confirm there.
405 void do_asynchronous_command(World
* world
, void* replyAddr
, const char* cmdName
, void *cmdData
,
406 AsyncStageFn stage2
, AsyncStageFn stage3
, AsyncStageFn stage4
, AsyncFreeFn cleanup
,
407 int completionMsgSize
, void* completionMsgData
);
// The handler has no udp socket member of its own (see the commented-out line);
// it uses udp_socket of the sc_notify_observers base instead.
413 /* udp::socket udp_socket_;*/
// filled in by async_receive_from with the sender of the last datagram
414 udp::endpoint udp_remote_endpoint_
;
// constructed on network_thread::io_service_ by the constructor
416 tcp::acceptor tcp_acceptor_
;
// points into args.server_password as passed to the constructor
417 const char * tcp_password_
; /* we are not owning this! */
// receive buffer for udp datagrams: 1<<15 = 32768 bytes
419 boost::array
<char, 1<<15 > recv_buffer_
;
// NOTE(review): not used anywhere in this header view; presumably holds data
// that does not fit into recv_buffer_ - confirm against the implementation.
421 std::vector
<char> overflow_vector
;
425 } /* namespace detail */
427 using detail::sc_osc_handler
;
429 } /* namespace nova */
432 #endif /* SERVER_SC_OSC_HANDLER_HPP */