1 // osc handler for supercollider-style communication
2 // Copyright (C) 2009, 2010 Tim Blechmann
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef SERVER_SC_OSC_HANDLER_HPP
20 #define SERVER_SC_OSC_HANDLER_HPP
26 #include <boost/enable_shared_from_this.hpp>
27 #include <boost/date_time/microsec_time_clock.hpp>
28 #include <boost/intrusive/treap_set.hpp>
30 #include "osc/OscReceivedElements.h"
32 #include "../server/dynamic_endpoint.hpp"
33 #include "../server/memory_pool.hpp"
34 #include "../server/server_args.hpp"
35 #include "../server/server_scheduler.hpp"
36 #include "../utilities/osc_server.hpp"
37 #include "../utilities/sized_array.hpp"
38 #include "../utilities/static_pool.hpp"
39 #include "../utilities/time_tag.hpp"
43 typedef bool (*AsyncStageFn
)(World
*inWorld
, void* cmdData
);
44 typedef void (*AsyncFreeFn
)(World
*inWorld
, void* cmdData
);
48 using namespace boost::asio
;
49 using namespace boost::asio::ip
;
53 * observer to receive osc notifications
55 * \todo shall we use a separate thread for observer notifications?
57 class sc_notify_observers
59 typedef std::vector
<nova_endpoint
> observer_vector
;
62 sc_notify_observers(boost::asio::io_service
& io_service
):
63 udp_socket(io_service
), tcp_socket(io_service
)
66 void add_observer(nova_endpoint
const & ep
)
68 observers
.push_back(ep
);
71 void remove_observer(nova_endpoint
const & ep
)
73 observer_vector::iterator it
= std::find(observers
.begin(),
75 assert (it
!= observers
.end());
82 /** notifications, should be called from the real-time thread */
83 void notification_node_started(const server_node
* node
)
85 notify("/n_go", node
);
88 void notification_node_ended(const server_node
* node
)
90 notify("/n_end", node
);
93 void notification_node_turned_off(const server_node
* node
)
95 notify("/n_off", node
);
98 void notification_node_turned_on(const server_node
* node
)
100 notify("/n_on", node
);
103 void notification_node_moved(const server_node
* node
)
105 notify("/n_move", node
);
108 void send_trigger(int32_t node_id
, int32_t trigger_id
, float value
);
110 void send_node_reply(int32_t node_id
, int reply_id
, const char* command_name
, int argument_count
, const float* values
);
113 /** send notifications, should not be called from the real-time thread */
114 void send_notification(const char * data
, size_t length
);
117 /** sending functions */
118 void send(const char * data
, size_t size
, nova_endpoint
const & endpoint
)
120 if (!endpoint
.is_valid())
122 nova_protocol prot
= endpoint
.protocol();
123 if (prot
.family() == AF_INET
&& prot
.type() == SOCK_DGRAM
)
125 udp::endpoint
ep(endpoint
.address(), endpoint
.port());
126 send_udp(data
, size
, ep
);
128 else if (prot
.family() == AF_INET
&& prot
.type() == SOCK_STREAM
)
130 tcp::endpoint
ep(endpoint
.address(), endpoint
.port());
131 send_tcp(data
, size
, ep
);
135 void send_udp(const char * data
, unsigned int size
, udp::endpoint
const & receiver
)
137 std::lock_guard
<std::mutex
> lock(udp_mutex
);
138 sc_notify_observers::udp_socket
.send_to(boost::asio::buffer(data
, size
), receiver
);
141 void send_tcp(const char * data
, unsigned int size
, tcp::endpoint
const & receiver
)
143 std::lock_guard
<std::mutex
> lock(tcp_mutex
);
144 tcp_socket
.connect(receiver
);
145 boost::asio::write(tcp_socket
, boost::asio::buffer(data
, size
));
150 void notify(const char * address_pattern
, const server_node
* node
);
151 void send_notification(const char * data
, size_t length
, nova_endpoint
const & endpoint
);
153 observer_vector observers
;
156 udp::socket udp_socket
;
157 tcp::socket tcp_socket
;
158 std::mutex udp_mutex
, tcp_mutex
;
161 class sc_scheduled_bundles
165 public boost::intrusive::bs_set_base_hook
<>
167 bundle_node(time_tag
const & timeout
, const char * data
, nova_endpoint
const & endpoint
):
168 timeout_(timeout
), data_(data
), endpoint_(endpoint
)
173 const time_tag timeout_
;
174 const char * const data_
;
175 const nova_endpoint endpoint_
;
177 friend bool operator< (const bundle_node
& lhs
, const bundle_node
& rhs
)
179 return priority_order(lhs
, rhs
);
182 friend bool priority_order (const bundle_node
& lhs
, const bundle_node
& rhs
)
184 return lhs
.timeout_
< rhs
.timeout_
; // lower value, higher priority
188 typedef boost::intrusive::treap_multiset
<bundle_node
> bundle_queue_t
;
190 void insert_bundle(time_tag
const & timeout
, const char * data
, size_t length
,
191 nova_endpoint
const & endpoint
);
193 void execute_bundles(time_tag
const & last
, time_tag
const & now
);
195 void clear_bundles(void)
197 bundle_q
.clear_and_dispose(dispose_bundle
);
200 static void dispose_bundle(bundle_node
* node
)
202 node
->~bundle_node();
207 bundle_queue_t bundle_q
;
210 class sc_osc_handler
:
211 private detail::network_thread
,
212 public sc_notify_observers
215 /** constructor helpers */
216 void open_tcp_acceptor(tcp
const & protocol
, unsigned int port
);
217 void open_udp_socket(udp
const & protocol
, unsigned int port
);
218 bool open_socket(int family
, int type
, int protocol
, unsigned int port
);
222 sc_osc_handler(server_arguments
const & args
):
223 sc_notify_observers(detail::network_thread::io_service_
),
224 dump_osc_packets(0), error_posting(1), quit_received(false),
225 tcp_acceptor_(detail::network_thread::io_service_
),
226 tcp_password_(args
.server_password
.c_str())
228 if (args
.tcp_port
&& !open_socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
, args
.tcp_port
))
229 throw std::runtime_error("cannot open socket");
230 if (args
.udp_port
&& !open_socket(AF_INET
, SOCK_DGRAM
, IPPROTO_UDP
, args
.udp_port
))
231 throw std::runtime_error("cannot open socket");
234 void start_receive_thread(void)
236 detail::network_thread::start_receive();
239 ~sc_osc_handler(void)
242 typedef osc::ReceivedPacket osc_received_packet
;
243 typedef osc::ReceivedBundle received_bundle
;
244 typedef osc::ReceivedMessage received_message
;
246 struct received_packet
:
247 public audio_sync_callback
249 received_packet(const char * dat
, size_t length
, nova_endpoint
const & endpoint
):
250 data(dat
), length(length
), endpoint_(endpoint
)
253 void * operator new(std::size_t size
, void* ptr
)
255 return ::operator new(size
, ptr
);
258 static received_packet
* alloc_packet(const char * data
, size_t length
,
259 nova_endpoint
const & remote_endpoint
);
263 const char * const data
;
265 const nova_endpoint endpoint_
;
270 /** udp socket handling */
271 void start_receive_udp(void)
273 sc_notify_observers::udp_socket
.async_receive_from(
274 buffer(recv_buffer_
), udp_remote_endpoint_
,
275 boost::bind(&sc_osc_handler::handle_receive_udp
, this,
276 placeholders::error
, placeholders::bytes_transferred
));
279 void handle_receive_udp(const boost::system::error_code
& error
,
280 std::size_t bytes_transferred
);
284 /** tcp connection handling */
285 class tcp_connection
:
286 public boost::enable_shared_from_this
<tcp_connection
>
289 typedef std::shared_ptr
<tcp_connection
> pointer
;
291 static pointer
create(boost::asio::io_service
& io_service
)
293 return pointer(new tcp_connection(io_service
));
296 tcp::socket
& socket()
301 void start(sc_osc_handler
* self
);
304 tcp_connection(boost::asio::io_service
& io_service
)
305 : socket_(io_service
)
311 void start_accept(void)
313 tcp_connection::pointer new_connection
= tcp_connection::create(tcp_acceptor_
.get_io_service());
315 tcp_acceptor_
.async_accept(new_connection
->socket(),
316 boost::bind(&sc_osc_handler::handle_accept
, this, new_connection
,
317 boost::asio::placeholders::error
));
320 void handle_accept(tcp_connection::pointer new_connection
,
321 const boost::system::error_code
& error
)
325 new_connection
->start(this);
334 dump_osc_packets
= i
;
338 int dump_osc_packets
;
342 /** \todo how to handle temporary message error suppression? */
344 void set_error_posting(int val
)
354 /** packet handling */
356 void handle_packet_async(const char* data
, size_t length
, nova::nova_endpoint
const & endpoint
);
357 void handle_packet(const char* data
, size_t length
, nova::nova_endpoint
const & endpoint
);
358 time_tag
handle_bundle_nrt(const char * data_
, std::size_t length
);
361 template <bool realtime
>
362 void handle_bundle(received_bundle
const & bundle
, nova_endpoint
const & endpoint
);
363 template <bool realtime
>
364 void handle_message(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
365 template <bool realtime
>
366 void handle_message_int_address(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
367 template <bool realtime
>
368 void handle_message_sym_address(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
370 friend class sc_scheduled_bundles::bundle_node
;
374 /** bundle scheduling */
376 void clear_scheduled_bundles(void)
378 scheduled_bundles
.clear_bundles();
381 void execute_scheduled_bundles(void)
383 scheduled_bundles
.execute_bundles(last
, now
);
386 void increment_logical_time(time_tag
const & diff
)
392 void update_time_from_system(void)
394 now
= time_tag::from_ptime(boost::date_time::microsec_clock
<boost::posix_time::ptime
>::universal_time());
395 last
= now
- time_per_tick
;
398 time_tag
const & current_time(void) const
403 sc_scheduled_bundles scheduled_bundles
;
405 time_tag time_per_tick
;
409 void do_asynchronous_command(World
* world
, void* replyAddr
, const char* cmdName
, void *cmdData
,
410 AsyncStageFn stage2
, AsyncStageFn stage3
, AsyncStageFn stage4
, AsyncFreeFn cleanup
,
411 int completionMsgSize
, void* completionMsgData
);
417 /* udp::socket udp_socket_;*/
418 udp::endpoint udp_remote_endpoint_
;
420 tcp::acceptor tcp_acceptor_
;
421 const char * tcp_password_
; /* we are not owning this! */
423 boost::array
<char, 1<<15 > recv_buffer_
;
425 std::vector
<char> overflow_vector
;
429 } /* namespace detail */
431 using detail::sc_osc_handler
;
433 } /* namespace nova */
436 #endif /* SERVER_SC_OSC_HANDLER_HPP */