Reduced variable scope
[amule.git] / src / LibSocketAsio.cpp
blob69cbfb59a6e4e8221b9547f7b16ef5a40a700fa8
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2011-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2011-2011 Stu Redman ( admin@amule.org / http://www.amule.org )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #ifdef HAVE_CONFIG_H
27 # include "config.h" // Needed for HAVE_BOOST_SOURCES
28 #endif
30 #ifdef _MSC_VER
31 #define _WIN32_WINNT 0x0501 // Boost complains otherwise
32 #endif
34 // Windows requires that Boost headers are included before wx headers.
35 // This works if precompiled headers are disabled for this file.
37 #define BOOST_ALL_NO_LIB
39 // Suppress warning caused by faulty boost/preprocessor/config/config.hpp in Boost 1.49
40 #if defined __GNUC__ && ! defined __GXX_EXPERIMENTAL_CXX0X__ && __cplusplus < 201103L
41 #define BOOST_PP_VARIADICS 0
42 #endif
44 #include <algorithm> // Needed for std::min - Boost up to 1.54 fails to compile with MSVC 2013 otherwise
46 #include <boost/asio.hpp>
47 #include <boost/bind.hpp>
48 #include <boost/version.hpp>
51 // Do away with building Boost.System, adding lib paths...
52 // Just include the single file and be done.
54 #ifdef HAVE_BOOST_SOURCES
55 # include <boost/../libs/system/src/error_code.cpp>
56 #else
57 # include <boost/system/error_code.hpp>
58 #endif
60 #include "LibSocket.h"
61 #include <wx/thread.h> // wxMutex
62 #include <wx/intl.h> // _()
63 #include <common/Format.h> // Needed for CFormat
64 #include "Logger.h"
65 #include "GuiEvents.h"
66 #include "amuleIPV4Address.h"
67 #include "MuleUDPSocket.h"
68 #include "OtherFunctions.h" // DeleteContents, MuleBoostVersion
69 #include "ScopedPtr.h"
71 using namespace boost::asio;
72 using namespace boost::system; // for error_code
73 static io_service s_io_service;
75 // Number of threads in the Asio thread pool
76 const int CAsioService::m_numberOfThreads = 4;
78 /**
79 * ASIO Client TCP socket implementation
82 class CamuleIPV4Endpoint : public ip::tcp::endpoint {
83 public:
84 CamuleIPV4Endpoint() {}
86 CamuleIPV4Endpoint(const CamuleIPV4Endpoint & impl) : ip::tcp::endpoint(impl) {}
87 CamuleIPV4Endpoint(const ip::tcp::endpoint & ep) { * this = ep; }
88 CamuleIPV4Endpoint(const ip::udp::endpoint & ep) { address(ep.address()); port(ep.port()); }
90 const CamuleIPV4Endpoint& operator = (const ip::tcp::endpoint & ep)
92 * (ip::tcp::endpoint *) this = ep;
93 return *this;
97 class CAsioSocketImpl
99 public:
100 // cppcheck-suppress uninitMemberVar m_readBufferPtr
101 CAsioSocketImpl(CLibSocket * libSocket) :
102 m_libSocket(libSocket),
103 m_strand(s_io_service),
104 m_timer(s_io_service)
106 m_OK = false;
107 m_blocksRead = false;
108 m_blocksWrite = false;
109 m_ErrorCode = 0;
110 m_readBuffer = NULL;
111 m_readBufferSize = 0;
112 m_readPending = false;
113 m_readBufferContent = 0;
114 m_eventPending = false;
115 m_port = 0;
116 m_sendBuffer = NULL;
117 m_connected = false;
118 m_closed = false;
119 m_isDestroying = false;
120 m_proxyState = false;
121 m_notify = true;
122 m_sync = false;
123 m_IP = wxT("?");
124 m_IPint = 0;
125 m_socket = new ip::tcp::socket(s_io_service);
127 // Set socket to non blocking
128 m_socket->non_blocking();
131 ~CAsioSocketImpl()
133 delete[] m_readBuffer;
134 delete[] m_sendBuffer;
137 void Notify(bool notify)
139 m_notify = notify;
142 bool Connect(const amuleIPV4Address& adr, bool wait)
144 if (!m_proxyState) {
145 SetIp(adr);
147 m_port = adr.Service();
148 m_closed = false;
149 m_OK = false;
150 m_sync = !m_notify; // set this once for the whole lifetime of the socket
151 AddDebugLogLineF(logAsio, CFormat(wxT("Connect %s %p")) % m_IP % this);
153 if (wait || m_sync) {
154 error_code ec;
155 m_socket->connect(adr.GetEndpoint(), ec);
156 m_OK = !ec;
157 m_connected = m_OK;
158 return m_OK;
159 } else {
160 m_socket->async_connect(adr.GetEndpoint(),
161 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleConnect, this, placeholders::error)));
162 // m_OK and return are false because we are not connected yet
163 return false;
167 bool IsConnected() const
169 return m_connected;
172 // For wxSocketClient, Ok won't return true unless the client is connected to a server.
173 bool IsOk() const
175 return m_OK;
178 bool IsDestroying() const
180 return m_isDestroying;
183 // Returns the actual error code
184 int LastError() const
186 return m_ErrorCode;
189 // Is reading blocked?
190 bool BlocksRead() const
192 return m_blocksRead;
195 // Is writing blocked?
196 bool BlocksWrite() const
198 return m_blocksWrite;
201 // Problem: wx sends an event when data gets available, so first there is an event, then Read() is called
202 // Asio can read async with callback, so you first read, then you get an event.
203 // Strategy:
204 // - Read some data in background into a buffer
205 // - Callback posts event when something is there
206 // - Read data from buffer
207 // - If data is exhausted, start reading more in background
208 // - If not, post another event (making sure events don't pile up though)
209 uint32 Read(char * buf, uint32 bytesToRead)
211 if (bytesToRead == 0) { // huh?
212 return 0;
215 if (m_sync) {
216 return ReadSync(buf, bytesToRead);
219 if (m_ErrorCode) {
220 AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Error")) % m_IP % bytesToRead);
221 return 0;
224 if (m_readPending // Background read hasn't completed.
225 || m_readBufferContent == 0) { // shouldn't be if it's not pending
227 m_blocksRead = true;
228 AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Block")) % m_IP % bytesToRead);
229 return 0;
232 m_blocksRead = false; // shouldn't be needed
234 // Read from our buffer
235 uint32 readCache = std::min(m_readBufferContent, bytesToRead);
236 memcpy(buf, m_readBufferPtr, readCache);
237 m_readBufferContent -= readCache;
238 m_readBufferPtr += readCache;
240 AddDebugLogLineF(logAsio, CFormat(wxT("Read2 %s %d - %d")) % m_IP % bytesToRead % readCache);
241 if (m_readBufferContent) {
242 // Data left, post another event
243 PostReadEvent(1);
244 } else {
245 // Nothing left, read more
246 StartBackgroundRead();
248 return readCache;
252 // Make a copy of the data and send it in background
253 // - unless a background send is already going on
254 uint32 Write(const void * buf, uint32 nbytes)
256 if (m_sync) {
257 return WriteSync(buf, nbytes);
260 if (m_sendBuffer) {
261 m_blocksWrite = true;
262 AddDebugLogLineF(logAsio, CFormat(wxT("Write blocks %d %p %s")) % nbytes % m_sendBuffer % m_IP);
263 return 0;
265 AddDebugLogLineF(logAsio, CFormat(wxT("Write %d %s")) % nbytes % m_IP);
266 m_sendBuffer = new char[nbytes];
267 memcpy(m_sendBuffer, buf, nbytes);
268 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchWrite, this, nbytes));
269 m_ErrorCode = 0;
270 return nbytes;
274 void Close()
276 if (!m_closed) {
277 m_closed = true;
278 m_connected = false;
279 if (m_sync || s_io_service.stopped()) {
280 DispatchClose();
281 } else {
282 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchClose, this));
288 void Destroy()
290 if (m_isDestroying) {
291 AddDebugLogLineC(logAsio, CFormat(wxT("Destroy() already dying socket %p %p %s")) % m_libSocket % this % m_IP);
292 return;
294 m_isDestroying = true;
295 AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p %s")) % m_libSocket % this % m_IP);
296 Close();
297 if (m_sync || s_io_service.stopped()) {
298 HandleDestroy();
299 } else {
300 // Close prevents creation of any more callbacks, but does not clear any callbacks already
301 // sitting in Asio's event queue (I have seen such a crash).
302 // So create a delay timer so they can be called until core is notified.
303 m_timer.expires_from_now(boost::posix_time::seconds(1));
304 m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleDestroy, this)));
309 wxString GetPeer()
311 return m_IP;
314 uint32 GetPeerInt()
316 return m_IPint;
320 // Bind socket to local endpoint if user wants to choose the local address
322 void SetLocal(const amuleIPV4Address& local)
324 error_code ec;
325 if (!m_socket->is_open()) {
326 // Socket is usually still closed when this is called
327 m_socket->open(boost::asio::ip::tcp::v4(), ec);
328 if (ec) {
329 AddDebugLogLineC(logAsio, CFormat(wxT("Can't open socket : %s")) % ec.message());
333 // We are using random (OS-defined) local ports.
334 // To set a constant output port, first call
335 // m_socket->set_option(socket_base::reuse_address(true));
336 // and then set the endpoint's port to it.
338 CamuleIPV4Endpoint endpoint(local.GetEndpoint());
339 endpoint.port(0);
340 m_socket->bind(endpoint, ec);
341 if (ec) {
342 AddDebugLogLineC(logAsio, CFormat(wxT("Can't bind socket to local endpoint %s : %s"))
343 % local.IPAddress() % ec.message());
344 } else {
345 AddDebugLogLineF(logAsio, CFormat(wxT("Bound socket to local endpoint %s")) % local.IPAddress());
350 void EventProcessed()
352 m_eventPending = false;
355 void SetWrapSocket(CLibSocket * socket)
357 m_libSocket = socket;
358 // Also do some setting up
359 m_OK = true;
360 m_connected = true;
361 // Start reading
362 StartBackgroundRead();
365 bool UpdateIP()
367 error_code ec;
368 amuleIPV4Address addr = CamuleIPV4Endpoint(m_socket->remote_endpoint(ec));
369 if (SetError(ec)) {
370 AddDebugLogLineN(logAsio, CFormat(wxT("UpdateIP failed %p %s")) % this % ec.message());
371 return false;
373 SetIp(addr);
374 m_port = addr.Service();
375 AddDebugLogLineF(logAsio, CFormat(wxT("UpdateIP %s %d %p")) % m_IP % m_port % this);
376 return true;
379 const wxChar * GetIP() const { return m_IP; }
380 uint16 GetPort() const { return m_port; }
382 ip::tcp::socket & GetAsioSocket()
384 return * m_socket;
387 bool GetProxyState() const { return m_proxyState; }
389 void SetProxyState(bool state, const amuleIPV4Address * adr)
391 m_proxyState = state;
392 if (state) {
393 // Start. Get the true IP for logging.
394 wxASSERT(adr);
395 SetIp(*adr);
396 AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to proxy %s")) % m_IP);
397 } else {
398 // Transition from proxy to normal mode
399 AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to normal %s")) % m_IP);
400 m_ErrorCode = 0;
404 private:
406 // Dispatch handlers
407 // Access to m_socket is all bundled in the thread running s_io_service to avoid
408 // concurrent access to the socket from several threads.
409 // So once things are running (after connect), all access goes through one of these handlers.
411 void DispatchClose()
413 error_code ec;
414 m_socket->close(ec);
415 if (ec) {
416 AddDebugLogLineC(logAsio, CFormat(wxT("Close error %s %s")) % m_IP % ec.message());
417 } else {
418 AddDebugLogLineF(logAsio, CFormat(wxT("Closed %s")) % m_IP);
422 void DispatchBackgroundRead()
424 AddDebugLogLineF(logAsio, CFormat(wxT("DispatchBackgroundRead %s")) % m_IP);
425 m_socket->async_read_some(null_buffers(),
426 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleRead, this, placeholders::error)));
429 void DispatchWrite(uint32 nbytes)
431 async_write(*m_socket, buffer(m_sendBuffer, nbytes),
432 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleSend, this, placeholders::error, placeholders::bytes_transferred)));
436 // Completion handlers for async requests
439 void HandleConnect(const error_code& err)
441 m_OK = !err;
442 AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect %d %s")) % m_OK % m_IP);
443 if (m_isDestroying) {
444 AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect: socket pending for deletion %s")) % m_IP);
445 } else {
446 CoreNotify_LibSocketConnect(m_libSocket, err.value());
447 if (m_OK) {
448 // After connect also send a OUTPUT event to show data is available
449 CoreNotify_LibSocketSend(m_libSocket, 0);
450 // Start reading
451 StartBackgroundRead();
452 m_connected = true;
457 void HandleSend(const error_code& err, size_t bytes_transferred)
459 delete[] m_sendBuffer;
460 m_sendBuffer = NULL;
462 if (m_isDestroying) {
463 AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend: socket pending for deletion %s")) % m_IP);
464 } else {
465 if (SetError(err)) {
466 AddDebugLogLineN(logAsio, CFormat(wxT("HandleSend Error %d %s")) % bytes_transferred % m_IP);
467 PostLostEvent();
468 } else {
469 AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend %d %s")) % bytes_transferred % m_IP);
470 m_blocksWrite = false;
471 CoreNotify_LibSocketSend(m_libSocket, m_ErrorCode);
476 void HandleRead(const error_code & ec)
478 if (m_isDestroying) {
479 AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead: socket pending for deletion %s")) % m_IP);
482 if (SetError(ec)) {
483 // This is what we get in Windows when a connection gets closed from remote.
484 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError %s %s")) % m_IP % ec.message());
485 PostLostEvent();
486 return;
489 error_code ec2;
490 uint32 avail = m_socket->available(ec2);
491 if (SetError(ec2)) {
492 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError available %d %s %s")) % avail % m_IP % ec2.message());
493 PostLostEvent();
494 return;
496 if (avail == 0) {
497 // This is what we get in Linux when a connection gets closed from remote.
498 AddDebugLogLineF(logAsio, CFormat(wxT("HandleReadError nothing available %s")) % m_IP);
499 SetError();
500 PostLostEvent();
501 return;
503 AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead %d %s")) % avail % m_IP);
505 // adjust (or create) our read buffer
506 if (m_readBufferSize < avail) {
507 delete[] m_readBuffer;
508 m_readBuffer = new char[avail];
509 m_readBufferSize = avail;
511 m_readBufferPtr = m_readBuffer;
513 // read available data
514 m_readBufferContent = m_socket->read_some(buffer(m_readBuffer, avail), ec2);
515 if (SetError(ec2) || m_readBufferContent == 0) {
516 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError read %d %s %s")) % m_readBufferContent % m_IP % ec2.message());
517 PostLostEvent();
518 return;
521 m_readPending = false;
522 m_blocksRead = false;
523 PostReadEvent(2);
526 void HandleDestroy()
528 AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p %s")) % m_libSocket % this % m_IP);
529 CoreNotify_LibSocketDestroy(m_libSocket);
534 // Other functions
537 void StartBackgroundRead()
539 m_readPending = true;
540 m_readBufferContent = 0;
541 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchBackgroundRead, this));
544 void PostReadEvent(int from)
546 if (!m_eventPending) {
547 AddDebugLogLineF(logAsio, CFormat(wxT("Post read event %d %s")) % from % m_IP);
548 m_eventPending = true;
549 CoreNotify_LibSocketReceive(m_libSocket, m_ErrorCode);
553 void PostLostEvent()
555 if (!m_isDestroying && !m_closed) {
556 CoreNotify_LibSocketLost(m_libSocket);
560 void SetError()
562 m_ErrorCode = 2;
565 bool SetError(const error_code & err)
567 if (err) {
568 SetError();
569 } else {
570 m_ErrorCode = 0;
572 return m_ErrorCode != 0;
576 // Synchronous sockets (amulecmd)
578 uint32 ReadSync(char * buf, uint32 bytesToRead)
580 error_code ec;
581 uint32 received = read(*m_socket, buffer(buf, bytesToRead), ec);
582 SetError(ec);
583 return received;
586 uint32 WriteSync(const void * buf, uint32 nbytes)
588 error_code ec;
589 uint32 sent = write(*m_socket, buffer(buf, nbytes), ec);
590 SetError(ec);
591 return sent;
595 // Access to even const & wxString is apparently not thread-safe.
596 // Locks are set/removed in wx and reference counts can go astray.
597 // So store our IP string in a wxString which is used nowhere.
598 // Store a pointer to its string buffer as well and use THAT everywhere.
600 void SetIp(const amuleIPV4Address& adr)
602 m_IPstring = adr.IPAddress();
603 m_IP = m_IPstring.c_str();
604 m_IPint = StringIPtoUint32(m_IPstring);
607 CLibSocket * m_libSocket;
608 ip::tcp::socket * m_socket;
609 // remote IP
610 wxString m_IPstring; // as String (use nowhere because of threading!)
611 const wxChar * m_IP; // as char* (use in debug logs)
612 uint32 m_IPint; // as int
613 uint16 m_port; // remote port
614 bool m_OK;
615 int m_ErrorCode;
616 bool m_blocksRead;
617 bool m_blocksWrite;
618 char * m_readBuffer;
619 uint32 m_readBufferSize;
620 char * m_readBufferPtr;
621 bool m_readPending;
622 uint32 m_readBufferContent;
623 bool m_eventPending;
624 char * m_sendBuffer;
625 io_service::strand m_strand; // handle synchronisation in io_service thread pool
626 deadline_timer m_timer;
627 bool m_connected;
628 bool m_closed;
629 bool m_isDestroying; // true if Destroy() was called
630 bool m_proxyState;
631 bool m_notify; // set by Notify()
632 bool m_sync; // copied from !m_notify on Connect()
637 * Library socket wrapper
640 CLibSocket::CLibSocket(int /* flags */)
642 m_aSocket = new CAsioSocketImpl(this);
646 CLibSocket::~CLibSocket()
648 AddDebugLogLineF(logAsio, CFormat(wxT("~CLibSocket() %p %p %s")) % this % m_aSocket % m_aSocket->GetIP());
649 delete m_aSocket;
653 bool CLibSocket::Connect(const amuleIPV4Address& adr, bool wait)
655 return m_aSocket->Connect(adr, wait);
659 bool CLibSocket::IsConnected() const
661 return m_aSocket->IsConnected();
665 bool CLibSocket::IsOk() const
667 return m_aSocket->IsOk();
671 wxString CLibSocket::GetPeer()
673 return m_aSocket->GetPeer();
677 uint32 CLibSocket::GetPeerInt()
679 return m_aSocket->GetPeerInt();
683 void CLibSocket::Destroy()
685 m_aSocket->Destroy();
689 bool CLibSocket::IsDestroying() const
691 return m_aSocket->IsDestroying();
695 void CLibSocket::Notify(bool notify)
697 m_aSocket->Notify(notify);
701 uint32 CLibSocket::Read(void * buffer, uint32 nbytes)
703 return m_aSocket->Read((char *) buffer, nbytes);
707 uint32 CLibSocket::Write(const void * buffer, uint32 nbytes)
709 return m_aSocket->Write(buffer, nbytes);
713 void CLibSocket::Close()
715 m_aSocket->Close();
719 int CLibSocket::LastError() const
721 return m_aSocket->LastError();
725 void CLibSocket::SetLocal(const amuleIPV4Address& local)
727 m_aSocket->SetLocal(local);
732 // new Stuff
734 bool CLibSocket::BlocksRead() const
736 return m_aSocket->BlocksRead();
740 bool CLibSocket::BlocksWrite() const
742 return m_aSocket->BlocksWrite();
746 void CLibSocket::EventProcessed()
748 m_aSocket->EventProcessed();
752 void CLibSocket::LinkSocketImpl(class CAsioSocketImpl * socket)
754 delete m_aSocket;
755 m_aSocket = socket;
756 m_aSocket->SetWrapSocket(this);
760 const wxChar * CLibSocket::GetIP() const
762 return m_aSocket->GetIP();
766 bool CLibSocket::GetProxyState() const
768 return m_aSocket->GetProxyState();
772 void CLibSocket::SetProxyState(bool state, const amuleIPV4Address * adr)
774 m_aSocket->SetProxyState(state, adr);
779 * ASIO TCP socket server
782 class CAsioSocketServerImpl : public ip::tcp::acceptor
784 public:
785 CAsioSocketServerImpl(const amuleIPV4Address & adr, CLibSocketServer * libSocketServer)
786 : ip::tcp::acceptor(s_io_service),
787 m_libSocketServer(libSocketServer),
788 m_currentSocket(NULL),
789 m_strand(s_io_service)
791 m_ok = false;
792 m_socketAvailable = false;
794 try {
795 open(adr.GetEndpoint().protocol());
796 set_option(ip::tcp::acceptor::reuse_address(true));
797 bind(adr.GetEndpoint());
798 listen();
799 StartAccept();
800 m_ok = true;
801 AddDebugLogLineN(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d")) % adr.IPAddress() % adr.Service());
802 } catch (const system_error& err) {
803 AddDebugLogLineC(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d failed - %s")) % adr.IPAddress() % adr.Service() % err.code().message());
807 ~CAsioSocketServerImpl()
811 // For wxSocketServer, Ok will return true if the server could bind to the specified address and is already listening for new connections.
812 bool IsOk() const { return m_ok; }
814 void Close() { close(); }
816 bool AcceptWith(CLibSocket & socket)
818 if (!m_socketAvailable) {
819 AddDebugLogLineF(logAsio, wxT("AcceptWith: nothing there"));
820 return false;
823 // return the socket we received
824 socket.LinkSocketImpl(m_currentSocket.release());
826 // check if we have another socket ready for reception
827 m_currentSocket.reset(new CAsioSocketImpl(NULL));
828 error_code ec;
829 // async_accept does not work if server is non-blocking
830 // temporarily switch it to non-blocking
831 non_blocking(true);
832 // we are set to non-blocking, so this returns right away
833 accept(m_currentSocket->GetAsioSocket(), ec);
834 // back to blocking
835 non_blocking(false);
836 if (ec || !m_currentSocket->UpdateIP()) {
837 // nothing there
838 m_socketAvailable = false;
839 // start getting another one
840 StartAccept();
841 AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, getting another socket in background"));
842 } else {
843 // we got another socket right away
844 m_socketAvailable = true; // it is already true, but this improves readability
845 AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, another socket is available"));
846 // aMule actually doesn't need a notification as it polls the listen socket.
847 // amuleweb does need it though
848 CoreNotify_ServerTCPAccept(m_libSocketServer);
851 return true;
854 bool SocketAvailable() const { return m_socketAvailable; }
856 private:
858 void StartAccept()
860 m_currentSocket.reset(new CAsioSocketImpl(NULL));
861 async_accept(m_currentSocket->GetAsioSocket(),
862 m_strand.wrap(boost::bind(& CAsioSocketServerImpl::HandleAccept, this, placeholders::error)));
865 void HandleAccept(const error_code& error)
867 if (error) {
868 AddDebugLogLineC(logAsio, CFormat(wxT("Error in HandleAccept: %s")) % error.message());
869 } else {
870 if (m_currentSocket->UpdateIP()) {
871 AddDebugLogLineN(logAsio, CFormat(wxT("HandleAccept received a connection from %s:%d"))
872 % m_currentSocket->GetIP() % m_currentSocket->GetPort());
873 m_socketAvailable = true;
874 CoreNotify_ServerTCPAccept(m_libSocketServer);
875 return;
876 } else {
877 AddDebugLogLineN(logAsio, wxT("Error in HandleAccept: invalid socket"));
880 // We were not successful. Try again.
881 // Post the request to the event queue to make sure it doesn't get called immediately.
882 m_strand.post(boost::bind(& CAsioSocketServerImpl::StartAccept, this));
885 // The wrapper object
886 CLibSocketServer * m_libSocketServer;
887 // Startup ok
888 bool m_ok;
889 // The last socket that connected to us
890 CScopedPtr<CAsioSocketImpl> m_currentSocket;
891 // Is there a socket available?
892 bool m_socketAvailable;
893 io_service::strand m_strand; // handle synchronisation in io_service thread pool
897 CLibSocketServer::CLibSocketServer(const amuleIPV4Address& adr, int /* flags */)
899 m_aServer = new CAsioSocketServerImpl(adr, this);
903 CLibSocketServer::~CLibSocketServer()
905 delete m_aServer;
909 // Accepts an incoming connection request, and creates a new CLibSocket object which represents the server-side of the connection.
910 // Only used in CamuleApp::ListenSocketHandler() and we don't get there.
911 CLibSocket * CLibSocketServer::Accept(bool /* wait */)
913 wxFAIL;
914 return NULL;
918 // Accept an incoming connection using the specified socket object.
919 bool CLibSocketServer::AcceptWith(CLibSocket & socket, bool wait)
921 wxASSERT(!wait);
922 return m_aServer->AcceptWith(socket);
926 bool CLibSocketServer::IsOk() const
928 return m_aServer->IsOk();
932 void CLibSocketServer::Close()
934 m_aServer->Close();
938 bool CLibSocketServer::SocketAvailable()
940 return m_aServer->SocketAvailable();
945 * ASIO UDP socket implementation
948 class CAsioUDPSocketImpl
950 private:
951 // UDP data block
952 class CUDPData {
953 public:
954 char * buffer;
955 uint32 size;
956 amuleIPV4Address ipadr;
958 CUDPData(const void * src, uint32 _size, amuleIPV4Address adr) :
959 size(_size), ipadr(adr)
961 buffer = new char[size];
962 memcpy(buffer, src, size);
965 ~CUDPData()
967 delete[] buffer;
971 public:
972 CAsioUDPSocketImpl(const amuleIPV4Address &address, int /* flags */, CLibUDPSocket * libSocket) :
973 m_libSocket(libSocket),
974 m_strand(s_io_service),
975 m_timer(s_io_service),
976 m_address(address)
978 m_muleSocket = NULL;
979 m_socket = NULL;
980 m_readBuffer = new char[CMuleUDPSocket::UDP_BUFFER_SIZE];
981 m_OK = true;
982 CreateSocket();
985 ~CAsioUDPSocketImpl()
987 AddDebugLogLineF(logAsio, wxT("UDP ~CAsioUDPSocketImpl"));
988 delete m_socket;
989 delete[] m_readBuffer;
990 DeleteContents(m_receiveBuffers);
993 void SetClientData(CMuleUDPSocket * muleSocket)
995 AddDebugLogLineF(logAsio, wxT("UDP SetClientData"));
996 m_muleSocket = muleSocket;
999 uint32 RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
1001 CUDPData * recdata;
1003 wxMutexLocker lock(m_receiveBuffersLock);
1004 if (m_receiveBuffers.empty()) {
1005 AddDebugLogLineN(logAsio, wxT("UDP RecvFromError no data"));
1006 return 0;
1008 recdata = * m_receiveBuffers.begin();
1009 m_receiveBuffers.pop_front();
1011 uint32 read = recdata->size;
1012 if (read > nBytes) {
1013 // should not happen
1014 AddDebugLogLineN(logAsio, CFormat(wxT("UDP RecvFromError too much data %d")) % read);
1015 read = nBytes;
1017 memcpy(buf, recdata->buffer, read);
1018 addr = recdata->ipadr;
1019 delete recdata;
1020 return read;
1023 uint32 SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1025 // Collect data, make a copy of the buffer's content
1026 CUDPData * recdata = new CUDPData(buf, nBytes, addr);
1027 AddDebugLogLineF(logAsio, CFormat(wxT("UDP SendTo %d to %s")) % nBytes % addr.IPAddress());
1028 m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchSendTo, this, recdata));
1029 return nBytes;
1032 bool IsOk() const
1034 return m_OK;
1037 void Close()
1039 if (s_io_service.stopped()) {
1040 DispatchClose();
1041 } else {
1042 m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchClose, this));
1046 void Destroy()
1048 AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p")) % m_libSocket % this);
1049 Close();
1050 if (s_io_service.stopped()) {
1051 HandleDestroy();
1052 } else {
1053 // Close prevents creation of any more callbacks, but does not clear any callbacks already
1054 // sitting in Asio's event queue (I have seen such a crash).
1055 // So create a delay timer so they can be called until core is notified.
1056 m_timer.expires_from_now(boost::posix_time::seconds(1));
1057 m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleDestroy, this)));
1062 private:
1064 // Dispatch handlers
1065 // Access to m_socket is all bundled in the thread running s_io_service to avoid
1066 // concurrent access to the socket from several threads.
1067 // So once things are running (after connect), all access goes through one of these handlers.
1069 void DispatchClose()
1071 error_code ec;
1072 m_socket->close(ec);
1073 if (ec) {
1074 AddDebugLogLineC(logAsio, CFormat(wxT("UDP Close error %s")) % ec.message());
1075 } else {
1076 AddDebugLogLineF(logAsio, wxT("UDP Closed"));
1080 void DispatchSendTo(CUDPData * recdata)
1082 ip::udp::endpoint endpoint(recdata->ipadr.GetEndpoint().address(), recdata->ipadr.Service());
1084 AddDebugLogLineF(logAsio, CFormat(wxT("UDP DispatchSendTo %d to %s:%d")) % recdata->size
1085 % endpoint.address().to_string() % endpoint.port());
1086 m_socket->async_send_to(buffer(recdata->buffer, recdata->size), endpoint,
1087 m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleSendTo, this, placeholders::error, placeholders::bytes_transferred, recdata)));
1091 // Completion handlers for async requests
1094 void HandleRead(const error_code & ec, size_t received)
1096 if (ec) {
1097 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleReadError %s")) % ec.message());
1098 } else if (received == 0) {
1099 AddDebugLogLineF(logAsio, wxT("UDP HandleReadError nothing available"));
1100 } else if (m_muleSocket == NULL) {
1101 AddDebugLogLineN(logAsio, wxT("UDP HandleReadError no handler"));
1102 } else {
1104 amuleIPV4Address ipadr = amuleIPV4Address(CamuleIPV4Endpoint(m_receiveEndpoint));
1105 AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleRead %d %s:%d")) % received % ipadr.IPAddress() % ipadr.Service());
1107 // create our read buffer
1108 CUDPData * recdata = new CUDPData(m_readBuffer, received, ipadr);
1110 wxMutexLocker lock(m_receiveBuffersLock);
1111 m_receiveBuffers.push_back(recdata);
1113 CoreNotify_UDPSocketReceive(m_muleSocket);
1115 StartBackgroundRead();
1118 void HandleSendTo(const error_code & ec, size_t sent, CUDPData * recdata)
1120 if (ec) {
1121 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError %s")) % ec.message());
1122 } else if (sent != recdata->size) {
1123 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError tosend: %d sent %d")) % recdata->size % sent);
1125 if (m_muleSocket == NULL) {
1126 AddDebugLogLineN(logAsio, wxT("UDP HandleSendToError no handler"));
1127 } else {
1128 AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleSendTo %d to %s")) % sent % recdata->ipadr.IPAddress());
1129 CoreNotify_UDPSocketSend(m_muleSocket);
1131 delete recdata;
1134 void HandleDestroy()
1136 AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p")) % m_libSocket % this);
1137 delete m_libSocket;
1141 // Other functions
1144 void CreateSocket()
1146 try {
1147 delete m_socket;
1148 ip::udp::endpoint endpoint(m_address.GetEndpoint().address(), m_address.Service());
1149 m_socket = new ip::udp::socket(s_io_service, endpoint);
1150 AddDebugLogLineN(logAsio, CFormat(wxT("Created UDP socket %s %d")) % m_address.IPAddress() % m_address.Service());
1151 StartBackgroundRead();
1152 } catch (const system_error& err) {
1153 AddLogLineC(CFormat(wxT("Error creating UDP socket %s %d : %s")) % m_address.IPAddress() % m_address.Service() % err.code().message());
1154 m_socket = NULL;
1155 m_OK = false;
1159 void StartBackgroundRead()
1161 m_socket->async_receive_from(buffer(m_readBuffer, CMuleUDPSocket::UDP_BUFFER_SIZE), m_receiveEndpoint,
1162 m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleRead, this, placeholders::error, placeholders::bytes_transferred)));
1165 CLibUDPSocket * m_libSocket;
1166 ip::udp::socket * m_socket;
1167 CMuleUDPSocket * m_muleSocket;
1168 bool m_OK;
1169 io_service::strand m_strand; // handle synchronisation in io_service thread pool
1170 deadline_timer m_timer;
1171 amuleIPV4Address m_address;
1173 // One fix receive buffer
1174 char * m_readBuffer;
1175 // and a list of dynamic buffers. UDP data may be coming in faster
1176 // than the main loop can handle it.
1177 std::list<CUDPData *> m_receiveBuffers;
1178 wxMutex m_receiveBuffersLock;
1180 // Address of last reception
1181 ip::udp::endpoint m_receiveEndpoint;
1186 * Library UDP socket wrapper
1189 CLibUDPSocket::CLibUDPSocket(amuleIPV4Address &address, int flags)
1191 m_aSocket = new CAsioUDPSocketImpl(address, flags, this);
1195 CLibUDPSocket::~CLibUDPSocket()
1197 AddDebugLogLineF(logAsio, CFormat(wxT("~CLibUDPSocket() %p %p")) % this % m_aSocket);
1198 delete m_aSocket;
1202 bool CLibUDPSocket::IsOk() const
1204 return m_aSocket->IsOk();
1208 uint32 CLibUDPSocket::RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
1210 return m_aSocket->RecvFrom(addr, buf, nBytes);
1214 uint32 CLibUDPSocket::SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1216 return m_aSocket->SendTo(addr, buf, nBytes);
1220 void CLibUDPSocket::SetClientData(CMuleUDPSocket * muleSocket)
1222 m_aSocket->SetClientData(muleSocket);
1226 int CLibUDPSocket::LastError() const
1228 return !IsOk();
1232 void CLibUDPSocket::Close()
1234 m_aSocket->Close();
1238 void CLibUDPSocket::Destroy()
1240 m_aSocket->Destroy();
1245 * CAsioService - ASIO event loop thread
1248 class CAsioServiceThread : public wxThread {
1249 public:
1250 CAsioServiceThread() : wxThread(wxTHREAD_JOINABLE)
1252 static int count = 0;
1253 m_threadNumber = ++count;
1254 Create();
1255 Run();
1258 void * Entry()
1260 AddLogLineNS(CFormat(_("Asio thread %d started")) % m_threadNumber);
1261 io_service::work worker(s_io_service); // keep io_service running
1262 s_io_service.run();
1263 AddDebugLogLineN(logAsio, CFormat(wxT("Asio thread %d stopped")) % m_threadNumber);
1265 return NULL;
1268 private:
1269 int m_threadNumber;
1273 * The constructor starts the thread.
1275 CAsioService::CAsioService()
1277 m_threads = new CAsioServiceThread[m_numberOfThreads];
1281 CAsioService::~CAsioService()
1286 void CAsioService::Stop()
1288 if (!m_threads) {
1289 return;
1291 s_io_service.stop();
1292 // Wait for threads to exit
1293 for (int i = 0; i < m_numberOfThreads; i++) {
1294 CAsioServiceThread * t = m_threads + i;
1295 t->Wait();
1297 delete[] m_threads;
1298 m_threads = 0;
1306 * amuleIPV4Address
1309 amuleIPV4Address::amuleIPV4Address()
1311 m_endpoint = new CamuleIPV4Endpoint();
1314 amuleIPV4Address::amuleIPV4Address(const amuleIPV4Address &a)
1316 *this = a;
1319 amuleIPV4Address::amuleIPV4Address(const CamuleIPV4Endpoint &ep)
1321 *this = ep;
1324 amuleIPV4Address::~amuleIPV4Address()
1326 delete m_endpoint;
1329 amuleIPV4Address& amuleIPV4Address::operator=(const amuleIPV4Address &a)
1331 m_endpoint = new CamuleIPV4Endpoint(* a.m_endpoint);
1332 return *this;
1335 amuleIPV4Address& amuleIPV4Address::operator=(const CamuleIPV4Endpoint &ep)
1337 m_endpoint = new CamuleIPV4Endpoint(ep);
1338 return *this;
1341 bool amuleIPV4Address::Hostname(const wxString& name)
1343 if (name.IsEmpty()) {
1344 return false;
1346 // This is usually just an IP.
1347 std::string sname(unicode2char(name));
1348 error_code ec;
1349 ip::address_v4 adr = ip::address_v4::from_string(sname, ec);
1350 if (!ec) {
1351 m_endpoint->address(adr);
1352 return true;
1354 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") failed, not an IP address %s")) % name % ec.message());
1356 // Try to resolve (sync). Normally not required. Unless you type in your hostname as "local IP address" or something.
1357 error_code ec2;
1358 ip::tcp::resolver res(s_io_service);
1359 // We only want to get IPV4 addresses.
1360 ip::tcp::resolver::query query(ip::tcp::v4(), sname, "");
1361 ip::tcp::resolver::iterator endpoint_iterator = res.resolve(query, ec2);
1362 if (ec2) {
1363 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: %s")) % name % ec2.message());
1364 return false;
1366 if (endpoint_iterator == ip::tcp::resolver::iterator()) {
1367 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: no address found")) % name);
1368 return false;
1370 m_endpoint->address(endpoint_iterator->endpoint().address());
1371 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolved to %s")) % name % IPAddress());
1372 return true;
1375 bool amuleIPV4Address::Service(uint16 service)
1377 if (service == 0) {
1378 return false;
1380 m_endpoint->port(service);
1381 return true;
1384 uint16 amuleIPV4Address::Service() const
1386 return m_endpoint->port();
1389 bool amuleIPV4Address::IsLocalHost() const
1391 return m_endpoint->address().is_loopback();
1394 wxString amuleIPV4Address::IPAddress() const
1396 return CFormat(wxT("%s")) % m_endpoint->address().to_string();
1399 // "Set address to any of the addresses of the current machine."
1400 // This just sets the address to 0.0.0.0 .
1401 // wx does the same.
1402 bool amuleIPV4Address::AnyAddress()
1404 m_endpoint->address(ip::address_v4::any());
1405 AddDebugLogLineN(logAsio, CFormat(wxT("AnyAddress: set to %s")) % IPAddress());
1406 return true;
1409 const CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint() const
1411 return * m_endpoint;
1414 CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint()
1416 return * m_endpoint;
1421 // Notification stuff
1423 namespace MuleNotify
1426 void LibSocketConnect(CLibSocket * socket, int error)
1428 if (socket->IsDestroying()) {
1429 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Destroying %s %d")) % socket->GetIP() % error);
1430 } else if (socket->GetProxyState()) {
1431 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Proxy %s %d")) % socket->GetIP() % error);
1432 socket->OnProxyEvent(wxSOCKET_CONNECTION);
1433 } else {
1434 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect %s %d")) %socket->GetIP() % error);
1435 socket->OnConnect(error);
1439 void LibSocketSend(CLibSocket * socket, int error)
1441 if (socket->IsDestroying()) {
1442 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Destroying %s %d")) % socket->GetIP() % error);
1443 } else if (socket->GetProxyState()) {
1444 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Proxy %s %d")) % socket->GetIP() % error);
1445 socket->OnProxyEvent(wxSOCKET_OUTPUT);
1446 } else {
1447 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend %s %d")) % socket->GetIP() % error);
1448 socket->OnSend(error);
1452 void LibSocketReceive(CLibSocket * socket, int error)
1454 socket->EventProcessed();
1455 if (socket->IsDestroying()) {
1456 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Destroying %s %d")) % socket->GetIP() % error);
1457 } else if (socket->GetProxyState()) {
1458 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Proxy %s %d")) % socket->GetIP() % error);
1459 socket->OnProxyEvent(wxSOCKET_INPUT);
1460 } else {
1461 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive %s %d")) % socket->GetIP() % error);
1462 socket->OnReceive(error);
1466 void LibSocketLost(CLibSocket * socket)
1468 if (socket->IsDestroying()) {
1469 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Destroying %s")) % socket->GetIP());
1470 } else if (socket->GetProxyState()) {
1471 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Proxy %s")) % socket->GetIP());
1472 socket->OnProxyEvent(wxSOCKET_LOST);
1473 } else {
1474 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost %s")) % socket->GetIP());
1475 socket->OnLost();
1479 void LibSocketDestroy(CLibSocket * socket)
1481 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocket_Destroy %s")) % socket->GetIP());
1482 delete socket;
1485 void ProxySocketEvent(CLibSocket * socket, int evt)
1487 AddDebugLogLineF(logAsio, CFormat(wxT("ProxySocketEvent %s %d")) % socket->GetIP() % evt);
1488 socket->OnProxyEvent(evt);
1491 void ServerTCPAccept(CLibSocketServer * socketServer)
1493 AddDebugLogLineF(logAsio, wxT("ServerTCP_Accept"));
1494 socketServer->OnAccept();
1497 } // namespace MuleNotify
1500 // Initialize MuleBoostVersion
1502 wxString MuleBoostVersion = CFormat(wxT("%d.%d")) % (BOOST_VERSION / 100000) % (BOOST_VERSION / 100 % 1000);