1 // osc handler for supercollider-style communication
2 // Copyright (C) 2009, 2010 Tim Blechmann
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef SERVER_SC_OSC_HANDLER_HPP
20 #define SERVER_SC_OSC_HANDLER_HPP
25 #include <boost/enable_shared_from_this.hpp>
26 #include <boost/date_time/microsec_time_clock.hpp>
27 #include <boost/intrusive/treap_set.hpp>
29 #include "osc/OscReceivedElements.h"
31 #include "../server/dynamic_endpoint.hpp"
32 #include "../server/memory_pool.hpp"
33 #include "../server/server_args.hpp"
34 #include "../server/server_scheduler.hpp"
35 #include "../utilities/osc_server.hpp"
36 #include "../utilities/sized_array.hpp"
37 #include "../utilities/static_pool.hpp"
38 #include "../utilities/time_tag.hpp"
42 typedef bool (*AsyncStageFn
)(World
*inWorld
, void* cmdData
);
43 typedef void (*AsyncFreeFn
)(World
*inWorld
, void* cmdData
);
47 using namespace boost::asio
;
48 using namespace boost::asio::ip
;
52 * observer to receive osc notifications
54 * \todo shall we use a separate thread for observer notifications?
56 class sc_notify_observers
58 typedef std::vector
<nova_endpoint
> observer_vector
;
61 sc_notify_observers(boost::asio::io_service
& io_service
):
62 udp_socket(io_service
), tcp_socket(io_service
)
65 void add_observer(nova_endpoint
const & ep
)
67 observers
.push_back(ep
);
70 void remove_observer(nova_endpoint
const & ep
)
72 observer_vector::iterator it
= std::find(observers
.begin(),
74 assert (it
!= observers
.end());
81 /** notifications, should be called from the real-time thread */
82 void notification_node_started(const server_node
* node
)
84 notify("/n_go", node
);
87 void notification_node_ended(const server_node
* node
)
89 notify("/n_end", node
);
92 void notification_node_turned_off(const server_node
* node
)
94 notify("/n_off", node
);
97 void notification_node_turned_on(const server_node
* node
)
99 notify("/n_on", node
);
102 void notification_node_moved(const server_node
* node
)
104 notify("/n_move", node
);
107 void send_trigger(int32_t node_id
, int32_t trigger_id
, float value
);
109 void send_node_reply(int32_t node_id
, int reply_id
, const char* command_name
, int argument_count
, const float* values
);
112 /** send notifications, should not be called from the real-time thread */
113 void send_notification(const char * data
, size_t length
);
116 /** sending functions */
117 void send(const char * data
, size_t size
, nova_endpoint
const & endpoint
)
119 nova_protocol prot
= endpoint
.protocol();
120 if (prot
.family() == AF_INET
&& prot
.type() == SOCK_DGRAM
)
122 udp::endpoint
ep(endpoint
.address(), endpoint
.port());
123 send_udp(data
, size
, ep
);
125 else if (prot
.family() == AF_INET
&& prot
.type() == SOCK_STREAM
)
127 tcp::endpoint
ep(endpoint
.address(), endpoint
.port());
128 send_tcp(data
, size
, ep
);
132 void send_udp(const char * data
, unsigned int size
, udp::endpoint
const & receiver
)
134 boost::mutex::scoped_lock
lock(udp_mutex
);
135 sc_notify_observers::udp_socket
.send_to(boost::asio::buffer(data
, size
), receiver
);
138 void send_tcp(const char * data
, unsigned int size
, tcp::endpoint
const & receiver
)
140 boost::mutex::scoped_lock
lock(tcp_mutex
);
141 tcp_socket
.connect(receiver
);
142 boost::asio::write(tcp_socket
, boost::asio::buffer(data
, size
));
147 void notify(const char * address_pattern
, const server_node
* node
);
148 void send_notification(const char * data
, size_t length
, nova_endpoint
const & endpoint
);
150 observer_vector observers
;
153 udp::socket udp_socket
;
154 tcp::socket tcp_socket
;
155 boost::mutex udp_mutex
, tcp_mutex
;
158 class sc_scheduled_bundles
162 public boost::intrusive::bs_set_base_hook
<>
164 bundle_node(time_tag
const & timeout
, const char * data
, nova_endpoint
const & endpoint
):
165 timeout_(timeout
), data_(data
), endpoint_(endpoint
)
170 const time_tag timeout_
;
171 const char * const data_
;
172 const nova_endpoint endpoint_
;
174 friend bool operator< (const bundle_node
& lhs
, const bundle_node
& rhs
)
176 return priority_order(lhs
, rhs
);
179 friend bool priority_order (const bundle_node
& lhs
, const bundle_node
& rhs
)
181 return lhs
.timeout_
< rhs
.timeout_
; // lower value, higher priority
185 typedef boost::intrusive::treap_multiset
<bundle_node
> bundle_queue_t
;
187 void insert_bundle(time_tag
const & timeout
, const char * data
, size_t length
,
188 nova_endpoint
const & endpoint
);
190 void execute_bundles(time_tag
const & last
, time_tag
const & now
);
192 void clear_bundles(void)
194 bundle_q
.clear_and_dispose(dispose_bundle
);
197 static void dispose_bundle(bundle_node
* node
)
199 node
->~bundle_node();
204 bundle_queue_t bundle_q
;
207 class sc_osc_handler
:
208 private detail::network_thread
,
209 public sc_notify_observers
212 /** constructor helpers */
213 void open_tcp_acceptor(tcp
const & protocol
, unsigned int port
);
214 void open_udp_socket(udp
const & protocol
, unsigned int port
);
215 bool open_socket(int family
, int type
, int protocol
, unsigned int port
);
219 sc_osc_handler(server_arguments
const & args
):
220 sc_notify_observers(detail::network_thread::io_service_
),
221 dump_osc_packets(0), error_posting(1), quit_received(false),
222 tcp_acceptor_(detail::network_thread::io_service_
),
223 tcp_password_(args
.server_password
.c_str())
225 if (args
.tcp_port
&& !open_socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
, args
.tcp_port
))
226 throw std::runtime_error("cannot open socket");
227 if (args
.udp_port
&& !open_socket(AF_INET
, SOCK_DGRAM
, IPPROTO_UDP
, args
.udp_port
))
228 throw std::runtime_error("cannot open socket");
231 void start_receive_thread(void)
233 detail::network_thread::start_receive();
236 ~sc_osc_handler(void)
239 typedef osc::ReceivedPacket osc_received_packet
;
240 typedef osc::ReceivedBundle received_bundle
;
241 typedef osc::ReceivedMessage received_message
;
243 struct received_packet
:
244 public audio_sync_callback
246 received_packet(const char * dat
, size_t length
, nova_endpoint
const & endpoint
):
247 data(dat
), length(length
), endpoint_(endpoint
)
250 void * operator new(std::size_t size
, void* ptr
)
252 return ::operator new(size
, ptr
);
255 static received_packet
* alloc_packet(const char * data
, size_t length
,
256 nova_endpoint
const & remote_endpoint
);
260 const char * const data
;
262 const nova_endpoint endpoint_
;
267 /** udp socket handling */
268 void start_receive_udp(void)
270 sc_notify_observers::udp_socket
.async_receive_from(
271 buffer(recv_buffer_
), udp_remote_endpoint_
,
272 boost::bind(&sc_osc_handler::handle_receive_udp
, this,
273 placeholders::error
, placeholders::bytes_transferred
));
276 void handle_receive_udp(const boost::system::error_code
& error
,
277 std::size_t bytes_transferred
);
281 /** tcp connection handling */
282 class tcp_connection
:
283 public boost::enable_shared_from_this
<tcp_connection
>
286 typedef boost::shared_ptr
<tcp_connection
> pointer
;
288 static pointer
create(boost::asio::io_service
& io_service
)
290 return pointer(new tcp_connection(io_service
));
293 tcp::socket
& socket()
298 void start(sc_osc_handler
* self
);
301 tcp_connection(boost::asio::io_service
& io_service
)
302 : socket_(io_service
)
308 void start_accept(void)
310 tcp_connection::pointer new_connection
= tcp_connection::create(tcp_acceptor_
.get_io_service());
312 tcp_acceptor_
.async_accept(new_connection
->socket(),
313 boost::bind(&sc_osc_handler::handle_accept
, this, new_connection
,
314 boost::asio::placeholders::error
));
317 void handle_accept(tcp_connection::pointer new_connection
,
318 const boost::system::error_code
& error
)
322 new_connection
->start(this);
331 dump_osc_packets
= i
;
335 int dump_osc_packets
;
339 /** \todo how to handle temporary message error suppression? */
341 void set_error_posting(int val
)
351 /** packet handling */
353 void handle_packet_async(const char* data
, size_t length
, nova::nova_endpoint
const & endpoint
);
354 void handle_packet(const char* data
, size_t length
, nova::nova_endpoint
const & endpoint
);
355 time_tag
handle_bundle_nrt(const char * data_
, std::size_t length
);
358 template <bool realtime
>
359 void handle_bundle(received_bundle
const & bundle
, nova_endpoint
const & endpoint
);
360 template <bool realtime
>
361 void handle_message(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
362 template <bool realtime
>
363 void handle_message_int_address(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
364 template <bool realtime
>
365 void handle_message_sym_address(received_message
const & message
, size_t msg_size
, nova_endpoint
const & endpoint
);
367 friend class sc_scheduled_bundles::bundle_node
;
371 /** bundle scheduling */
373 void clear_scheduled_bundles(void)
375 scheduled_bundles
.clear_bundles();
378 void execute_scheduled_bundles(void)
380 scheduled_bundles
.execute_bundles(last
, now
);
383 void increment_logical_time(time_tag
const & diff
)
389 void update_time_from_system(void)
391 now
= time_tag::from_ptime(boost::date_time::microsec_clock
<boost::posix_time::ptime
>::universal_time());
392 last
= now
- time_per_tick
;
395 time_tag
const & current_time(void) const
400 sc_scheduled_bundles scheduled_bundles
;
402 time_tag time_per_tick
;
406 void do_asynchronous_command(World
* world
, void* replyAddr
, const char* cmdName
, void *cmdData
,
407 AsyncStageFn stage2
, AsyncStageFn stage3
, AsyncStageFn stage4
, AsyncFreeFn cleanup
,
408 int completionMsgSize
, void* completionMsgData
);
414 /* udp::socket udp_socket_;*/
415 udp::endpoint udp_remote_endpoint_
;
417 tcp::acceptor tcp_acceptor_
;
418 const char * tcp_password_
; /* we are not owning this! */
420 boost::array
<char, 1<<15 > recv_buffer_
;
422 std::vector
<char> overflow_vector
;
426 } /* namespace detail */
428 using detail::sc_osc_handler
;
430 } /* namespace nova */
433 #endif /* SERVER_SC_OSC_HANDLER_HPP */