Improve the code by static code analysis [3/3]: Style
[amule.git] / src / LibSocketAsio.cpp
blob86f95c9a2f82546dfc975c571cc2096813f75995
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2011-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2011-2011 Stu Redman ( admin@amule.org / http://www.amule.org )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #ifdef HAVE_CONFIG_H
27 # include "config.h" // Needed for HAVE_BOOST_SOURCES
28 #endif
30 #ifdef _MSC_VER
31 #define _WIN32_WINNT 0x0501 // Boost complains otherwise
32 #endif
34 // Windows requires that Boost headers are included before wx headers.
35 // This works if precompiled headers are disabled for this file.
37 #define BOOST_ALL_NO_LIB
39 // Suppress warning caused by faulty boost/preprocessor/config/config.hpp in Boost 1.49
40 #if defined __GNUC__ && ! defined __GXX_EXPERIMENTAL_CXX0X__ && __cplusplus < 201103L
41 #define BOOST_PP_VARIADICS 0
42 #endif
45 #include <boost/asio.hpp>
46 #include <boost/bind.hpp>
47 #include <boost/version.hpp>
50 // Do away with building Boost.System, adding lib paths...
51 // Just include the single file and be done.
53 #ifdef HAVE_BOOST_SOURCES
54 # include <boost/../libs/system/src/error_code.cpp>
55 #else
56 # include <boost/system/error_code.hpp>
57 #endif
59 #include "LibSocket.h"
60 #include <wx/thread.h> // wxMutex
61 #include <wx/intl.h> // _()
62 #include <common/Format.h> // Needed for CFormat
63 #include "Logger.h"
64 #include "GuiEvents.h"
65 #include "amuleIPV4Address.h"
66 #include "MuleUDPSocket.h"
67 #include "OtherFunctions.h" // DeleteContents, MuleBoostVersion
68 #include "ScopedPtr.h"
70 using namespace boost::asio;
71 using namespace boost::system; // for error_code
72 static io_service s_io_service;
74 // Number of threads in the Asio thread pool
75 const int CAsioService::m_numberOfThreads = 4;
77 /**
78 * ASIO Client TCP socket implementation
81 class CamuleIPV4Endpoint : public ip::tcp::endpoint {
82 public:
83 CamuleIPV4Endpoint() {}
85 CamuleIPV4Endpoint(const CamuleIPV4Endpoint & impl) : ip::tcp::endpoint(impl) {}
86 CamuleIPV4Endpoint(const ip::tcp::endpoint & ep) { * this = ep; }
87 CamuleIPV4Endpoint(const ip::udp::endpoint & ep) { address(ep.address()); port(ep.port()); }
89 const CamuleIPV4Endpoint& operator = (const ip::tcp::endpoint & ep)
91 * (ip::tcp::endpoint *) this = ep;
92 return *this;
96 class CAsioSocketImpl
98 public:
99 // cppcheck-suppress uninitMemberVar m_readBufferPtr
100 CAsioSocketImpl(CLibSocket * libSocket) :
101 m_libSocket(libSocket),
102 m_strand(s_io_service),
103 m_timer(s_io_service)
105 m_OK = false;
106 m_blocksRead = false;
107 m_blocksWrite = false;
108 m_ErrorCode = 0;
109 m_readBuffer = NULL;
110 m_readBufferSize = 0;
111 m_readPending = false;
112 m_readBufferContent = 0;
113 m_eventPending = false;
114 m_port = 0;
115 m_sendBuffer = NULL;
116 m_connected = false;
117 m_closed = false;
118 m_isDestroying = false;
119 m_proxyState = false;
120 m_notify = true;
121 m_sync = false;
122 m_IP = wxT("?");
123 m_IPint = 0;
124 m_socket = new ip::tcp::socket(s_io_service);
126 // Set socket to non blocking
127 m_socket->non_blocking();
130 ~CAsioSocketImpl()
132 delete[] m_readBuffer;
133 delete[] m_sendBuffer;
136 void Notify(bool notify)
138 m_notify = notify;
141 bool Connect(const amuleIPV4Address& adr, bool wait)
143 if (!m_proxyState) {
144 SetIp(adr);
146 m_port = adr.Service();
147 m_closed = false;
148 m_OK = false;
149 m_sync = !m_notify; // set this once for the whole lifetime of the socket
150 AddDebugLogLineF(logAsio, CFormat(wxT("Connect %s %p")) % m_IP % this);
152 if (wait || m_sync) {
153 error_code ec;
154 m_socket->connect(adr.GetEndpoint(), ec);
155 m_OK = !ec;
156 m_connected = m_OK;
157 return m_OK;
158 } else {
159 m_socket->async_connect(adr.GetEndpoint(),
160 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleConnect, this, placeholders::error)));
161 // m_OK and return are false because we are not connected yet
162 return false;
166 bool IsConnected() const
168 return m_connected;
171 // For wxSocketClient, Ok won't return true unless the client is connected to a server.
172 bool IsOk() const
174 return m_OK;
177 bool IsDestroying() const
179 return m_isDestroying;
182 // Returns the actual error code
183 int LastError() const
185 return m_ErrorCode;
188 // Is reading blocked?
189 bool BlocksRead() const
191 return m_blocksRead;
194 // Is writing blocked?
195 bool BlocksWrite() const
197 return m_blocksWrite;
200 // Problem: wx sends an event when data gets available, so first there is an event, then Read() is called
201 // Asio can read async with callback, so you first read, then you get an event.
202 // Strategy:
203 // - Read some data in background into a buffer
204 // - Callback posts event when something is there
205 // - Read data from buffer
206 // - If data is exhausted, start reading more in background
207 // - If not, post another event (making sure events don't pile up though)
208 uint32 Read(char * buf, uint32 bytesToRead)
210 if (bytesToRead == 0) { // huh?
211 return 0;
214 if (m_sync) {
215 return ReadSync(buf, bytesToRead);
218 if (m_ErrorCode) {
219 AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Error")) % m_IP % bytesToRead);
220 return 0;
223 if (m_readPending // Background read hasn't completed.
224 || m_readBufferContent == 0) { // shouldn't be if it's not pending
226 m_blocksRead = true;
227 AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Block")) % m_IP % bytesToRead);
228 return 0;
231 m_blocksRead = false; // shouldn't be needed
233 // Read from our buffer
234 uint32 readCache = std::min(m_readBufferContent, bytesToRead);
235 memcpy(buf, m_readBufferPtr, readCache);
236 m_readBufferContent -= readCache;
237 m_readBufferPtr += readCache;
239 AddDebugLogLineF(logAsio, CFormat(wxT("Read2 %s %d - %d")) % m_IP % bytesToRead % readCache);
240 if (m_readBufferContent) {
241 // Data left, post another event
242 PostReadEvent(1);
243 } else {
244 // Nothing left, read more
245 StartBackgroundRead();
247 return readCache;
251 // Make a copy of the data and send it in background
252 // - unless a background send is already going on
253 uint32 Write(const void * buf, uint32 nbytes)
255 if (m_sync) {
256 return WriteSync(buf, nbytes);
259 if (m_sendBuffer) {
260 m_blocksWrite = true;
261 AddDebugLogLineF(logAsio, CFormat(wxT("Write blocks %d %p %s")) % nbytes % m_sendBuffer % m_IP);
262 return 0;
264 AddDebugLogLineF(logAsio, CFormat(wxT("Write %d %s")) % nbytes % m_IP);
265 m_sendBuffer = new char[nbytes];
266 memcpy(m_sendBuffer, buf, nbytes);
267 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchWrite, this, nbytes));
268 m_ErrorCode = 0;
269 return nbytes;
273 void Close()
275 if (!m_closed) {
276 m_closed = true;
277 m_connected = false;
278 if (m_sync || s_io_service.stopped()) {
279 DispatchClose();
280 } else {
281 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchClose, this));
287 void Destroy()
289 if (m_isDestroying) {
290 AddDebugLogLineC(logAsio, CFormat(wxT("Destroy() already dying socket %p %p %s")) % m_libSocket % this % m_IP);
291 return;
293 m_isDestroying = true;
294 AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p %s")) % m_libSocket % this % m_IP);
295 Close();
296 if (m_sync || s_io_service.stopped()) {
297 HandleDestroy();
298 } else {
299 // Close prevents creation of any more callbacks, but does not clear any callbacks already
300 // sitting in Asio's event queue (I have seen such a crash).
301 // So create a delay timer so they can be called until core is notified.
302 m_timer.expires_from_now(boost::posix_time::seconds(1));
303 m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleDestroy, this)));
308 wxString GetPeer()
310 return m_IP;
313 uint32 GetPeerInt()
315 return m_IPint;
319 // Bind socket to local endpoint if user wants to choose the local address
321 void SetLocal(const amuleIPV4Address& local)
323 error_code ec;
324 if (!m_socket->is_open()) {
325 // Socket is usually still closed when this is called
326 m_socket->open(boost::asio::ip::tcp::v4(), ec);
327 if (ec) {
328 AddDebugLogLineC(logAsio, CFormat(wxT("Can't open socket : %s")) % ec.message());
332 // We are using random (OS-defined) local ports.
333 // To set a constant output port, first call
334 // m_socket->set_option(socket_base::reuse_address(true));
335 // and then set the endpoint's port to it.
337 CamuleIPV4Endpoint endpoint(local.GetEndpoint());
338 endpoint.port(0);
339 m_socket->bind(endpoint, ec);
340 if (ec) {
341 AddDebugLogLineC(logAsio, CFormat(wxT("Can't bind socket to local endpoint %s : %s"))
342 % local.IPAddress() % ec.message());
343 } else {
344 AddDebugLogLineF(logAsio, CFormat(wxT("Bound socket to local endpoint %s")) % local.IPAddress());
349 void EventProcessed()
351 m_eventPending = false;
354 void SetWrapSocket(CLibSocket * socket)
356 m_libSocket = socket;
357 // Also do some setting up
358 m_OK = true;
359 m_connected = true;
360 // Start reading
361 StartBackgroundRead();
364 bool UpdateIP()
366 error_code ec;
367 amuleIPV4Address addr = CamuleIPV4Endpoint(m_socket->remote_endpoint(ec));
368 if (SetError(ec)) {
369 AddDebugLogLineN(logAsio, CFormat(wxT("UpdateIP failed %p %s")) % this % ec.message());
370 return false;
372 SetIp(addr);
373 m_port = addr.Service();
374 AddDebugLogLineF(logAsio, CFormat(wxT("UpdateIP %s %d %p")) % m_IP % m_port % this);
375 return true;
378 const wxChar * GetIP() const { return m_IP; }
379 uint16 GetPort() const { return m_port; }
381 ip::tcp::socket & GetAsioSocket()
383 return * m_socket;
386 bool GetProxyState() const { return m_proxyState; }
388 void SetProxyState(bool state, const amuleIPV4Address * adr)
390 m_proxyState = state;
391 if (state) {
392 // Start. Get the true IP for logging.
393 wxASSERT(adr);
394 SetIp(*adr);
395 AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to proxy %s")) % m_IP);
396 } else {
397 // Transition from proxy to normal mode
398 AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to normal %s")) % m_IP);
399 m_ErrorCode = 0;
403 private:
405 // Dispatch handlers
406 // Access to m_socket is all bundled in the thread running s_io_service to avoid
407 // concurrent access to the socket from several threads.
408 // So once things are running (after connect), all access goes through one of these handlers.
410 void DispatchClose()
412 error_code ec;
413 m_socket->close(ec);
414 if (ec) {
415 AddDebugLogLineC(logAsio, CFormat(wxT("Close error %s %s")) % m_IP % ec.message());
416 } else {
417 AddDebugLogLineF(logAsio, CFormat(wxT("Closed %s")) % m_IP);
421 void DispatchBackgroundRead()
423 AddDebugLogLineF(logAsio, CFormat(wxT("DispatchBackgroundRead %s")) % m_IP);
424 m_socket->async_read_some(null_buffers(),
425 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleRead, this, placeholders::error)));
428 void DispatchWrite(uint32 nbytes)
430 async_write(*m_socket, buffer(m_sendBuffer, nbytes),
431 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleSend, this, placeholders::error, placeholders::bytes_transferred)));
435 // Completion handlers for async requests
438 void HandleConnect(const error_code& err)
440 m_OK = !err;
441 AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect %d %s")) % m_OK % m_IP);
442 if (m_isDestroying) {
443 AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect: socket pending for deletion %s")) % m_IP);
444 } else {
445 CoreNotify_LibSocketConnect(m_libSocket, err.value());
446 if (m_OK) {
447 // After connect also send a OUTPUT event to show data is available
448 CoreNotify_LibSocketSend(m_libSocket, 0);
449 // Start reading
450 StartBackgroundRead();
451 m_connected = true;
456 void HandleSend(const error_code& err, size_t bytes_transferred)
458 delete[] m_sendBuffer;
459 m_sendBuffer = NULL;
461 if (m_isDestroying) {
462 AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend: socket pending for deletion %s")) % m_IP);
463 } else {
464 if (SetError(err)) {
465 AddDebugLogLineN(logAsio, CFormat(wxT("HandleSend Error %d %s")) % bytes_transferred % m_IP);
466 PostLostEvent();
467 } else {
468 AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend %d %s")) % bytes_transferred % m_IP);
469 m_blocksWrite = false;
470 CoreNotify_LibSocketSend(m_libSocket, m_ErrorCode);
475 void HandleRead(const error_code & ec)
477 if (m_isDestroying) {
478 AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead: socket pending for deletion %s")) % m_IP);
481 if (SetError(ec)) {
482 // This is what we get in Windows when a connection gets closed from remote.
483 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError %s %s")) % m_IP % ec.message());
484 PostLostEvent();
485 return;
488 error_code ec2;
489 uint32 avail = m_socket->available(ec2);
490 if (SetError(ec2)) {
491 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError available %d %s %s")) % avail % m_IP % ec2.message());
492 PostLostEvent();
493 return;
495 if (avail == 0) {
496 // This is what we get in Linux when a connection gets closed from remote.
497 AddDebugLogLineF(logAsio, CFormat(wxT("HandleReadError nothing available %s")) % m_IP);
498 SetError();
499 PostLostEvent();
500 return;
502 AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead %d %s")) % avail % m_IP);
504 // adjust (or create) our read buffer
505 if (m_readBufferSize < avail) {
506 delete[] m_readBuffer;
507 m_readBuffer = new char[avail];
508 m_readBufferSize = avail;
510 m_readBufferPtr = m_readBuffer;
512 // read available data
513 m_readBufferContent = m_socket->read_some(buffer(m_readBuffer, avail), ec2);
514 if (SetError(ec2) || m_readBufferContent == 0) {
515 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError read %d %s %s")) % m_readBufferContent % m_IP % ec2.message());
516 PostLostEvent();
517 return;
520 m_readPending = false;
521 m_blocksRead = false;
522 PostReadEvent(2);
525 void HandleDestroy()
527 AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p %s")) % m_libSocket % this % m_IP);
528 CoreNotify_LibSocketDestroy(m_libSocket);
533 // Other functions
536 void StartBackgroundRead()
538 m_readPending = true;
539 m_readBufferContent = 0;
540 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchBackgroundRead, this));
543 void PostReadEvent(int from)
545 if (!m_eventPending) {
546 AddDebugLogLineF(logAsio, CFormat(wxT("Post read event %d %s")) % from % m_IP);
547 m_eventPending = true;
548 CoreNotify_LibSocketReceive(m_libSocket, m_ErrorCode);
552 void PostLostEvent()
554 if (!m_isDestroying && !m_closed) {
555 CoreNotify_LibSocketLost(m_libSocket);
559 void SetError()
561 m_ErrorCode = 2;
564 bool SetError(const error_code & err)
566 if (err) {
567 SetError();
568 } else {
569 m_ErrorCode = 0;
571 return m_ErrorCode != 0;
575 // Synchronous sockets (amulecmd)
577 uint32 ReadSync(char * buf, uint32 bytesToRead)
579 error_code ec;
580 uint32 received = read(*m_socket, buffer(buf, bytesToRead), ec);
581 SetError(ec);
582 return received;
585 uint32 WriteSync(const void * buf, uint32 nbytes)
587 error_code ec;
588 uint32 sent = write(*m_socket, buffer(buf, nbytes), ec);
589 SetError(ec);
590 return sent;
594 // Access to even const & wxString is apparently not thread-safe.
595 // Locks are set/removed in wx and reference counts can go astray.
596 // So store our IP string in a wxString which is used nowhere.
597 // Store a pointer to its string buffer as well and use THAT everywhere.
599 void SetIp(const amuleIPV4Address& adr)
601 m_IPstring = adr.IPAddress();
602 m_IP = m_IPstring.c_str();
603 m_IPint = StringIPtoUint32(m_IPstring);
606 CLibSocket * m_libSocket;
607 ip::tcp::socket * m_socket;
608 // remote IP
609 wxString m_IPstring; // as String (use nowhere because of threading!)
610 const wxChar * m_IP; // as char* (use in debug logs)
611 uint32 m_IPint; // as int
612 uint16 m_port; // remote port
613 bool m_OK;
614 int m_ErrorCode;
615 bool m_blocksRead;
616 bool m_blocksWrite;
617 char * m_readBuffer;
618 uint32 m_readBufferSize;
619 char * m_readBufferPtr;
620 bool m_readPending;
621 uint32 m_readBufferContent;
622 bool m_eventPending;
623 char * m_sendBuffer;
624 io_service::strand m_strand; // handle synchronisation in io_service thread pool
625 deadline_timer m_timer;
626 bool m_connected;
627 bool m_closed;
628 bool m_isDestroying; // true if Destroy() was called
629 bool m_proxyState;
630 bool m_notify; // set by Notify()
631 bool m_sync; // copied from !m_notify on Connect()
636 * Library socket wrapper
639 CLibSocket::CLibSocket(int /* flags */)
641 m_aSocket = new CAsioSocketImpl(this);
645 CLibSocket::~CLibSocket()
647 AddDebugLogLineF(logAsio, CFormat(wxT("~CLibSocket() %p %p %s")) % this % m_aSocket % m_aSocket->GetIP());
648 delete m_aSocket;
652 bool CLibSocket::Connect(const amuleIPV4Address& adr, bool wait)
654 return m_aSocket->Connect(adr, wait);
658 bool CLibSocket::IsConnected() const
660 return m_aSocket->IsConnected();
664 bool CLibSocket::IsOk() const
666 return m_aSocket->IsOk();
670 wxString CLibSocket::GetPeer()
672 return m_aSocket->GetPeer();
676 uint32 CLibSocket::GetPeerInt()
678 return m_aSocket->GetPeerInt();
682 void CLibSocket::Destroy()
684 m_aSocket->Destroy();
688 bool CLibSocket::IsDestroying() const
690 return m_aSocket->IsDestroying();
694 void CLibSocket::Notify(bool notify)
696 m_aSocket->Notify(notify);
700 uint32 CLibSocket::Read(void * buffer, uint32 nbytes)
702 return m_aSocket->Read((char *) buffer, nbytes);
706 uint32 CLibSocket::Write(const void * buffer, uint32 nbytes)
708 return m_aSocket->Write(buffer, nbytes);
712 void CLibSocket::Close()
714 m_aSocket->Close();
718 int CLibSocket::LastError() const
720 return m_aSocket->LastError();
724 void CLibSocket::SetLocal(const amuleIPV4Address& local)
726 m_aSocket->SetLocal(local);
731 // new Stuff
733 bool CLibSocket::BlocksRead() const
735 return m_aSocket->BlocksRead();
739 bool CLibSocket::BlocksWrite() const
741 return m_aSocket->BlocksWrite();
745 void CLibSocket::EventProcessed()
747 m_aSocket->EventProcessed();
751 void CLibSocket::LinkSocketImpl(class CAsioSocketImpl * socket)
753 delete m_aSocket;
754 m_aSocket = socket;
755 m_aSocket->SetWrapSocket(this);
759 const wxChar * CLibSocket::GetIP() const
761 return m_aSocket->GetIP();
765 bool CLibSocket::GetProxyState() const
767 return m_aSocket->GetProxyState();
771 void CLibSocket::SetProxyState(bool state, const amuleIPV4Address * adr)
773 m_aSocket->SetProxyState(state, adr);
778 * ASIO TCP socket server
781 class CAsioSocketServerImpl : public ip::tcp::acceptor
783 public:
784 CAsioSocketServerImpl(const amuleIPV4Address & adr, CLibSocketServer * libSocketServer)
785 : ip::tcp::acceptor(s_io_service),
786 m_libSocketServer(libSocketServer),
787 m_currentSocket(NULL),
788 m_strand(s_io_service)
790 m_ok = false;
791 m_socketAvailable = false;
793 try {
794 open(adr.GetEndpoint().protocol());
795 set_option(ip::tcp::acceptor::reuse_address(true));
796 bind(adr.GetEndpoint());
797 listen();
798 StartAccept();
799 m_ok = true;
800 AddDebugLogLineN(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d")) % adr.IPAddress() % adr.Service());
801 } catch (const system_error& err) {
802 AddDebugLogLineC(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d failed - %s")) % adr.IPAddress() % adr.Service() % err.code().message());
806 ~CAsioSocketServerImpl()
810 // For wxSocketServer, Ok will return true if the server could bind to the specified address and is already listening for new connections.
811 bool IsOk() const { return m_ok; }
813 void Close() { close(); }
815 bool AcceptWith(CLibSocket & socket)
817 if (!m_socketAvailable) {
818 AddDebugLogLineF(logAsio, wxT("AcceptWith: nothing there"));
819 return false;
822 // return the socket we received
823 socket.LinkSocketImpl(m_currentSocket.release());
825 // check if we have another socket ready for reception
826 m_currentSocket.reset(new CAsioSocketImpl(NULL));
827 error_code ec;
828 // async_accept does not work if server is non-blocking
829 // temporarily switch it to non-blocking
830 non_blocking(true);
831 // we are set to non-blocking, so this returns right away
832 accept(m_currentSocket->GetAsioSocket(), ec);
833 // back to blocking
834 non_blocking(false);
835 if (ec || !m_currentSocket->UpdateIP()) {
836 // nothing there
837 m_socketAvailable = false;
838 // start getting another one
839 StartAccept();
840 AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, getting another socket in background"));
841 } else {
842 // we got another socket right away
843 m_socketAvailable = true; // it is already true, but this improves readability
844 AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, another socket is available"));
845 // aMule actually doesn't need a notification as it polls the listen socket.
846 // amuleweb does need it though
847 CoreNotify_ServerTCPAccept(m_libSocketServer);
850 return true;
853 bool SocketAvailable() const { return m_socketAvailable; }
855 private:
857 void StartAccept()
859 m_currentSocket.reset(new CAsioSocketImpl(NULL));
860 async_accept(m_currentSocket->GetAsioSocket(),
861 m_strand.wrap(boost::bind(& CAsioSocketServerImpl::HandleAccept, this, placeholders::error)));
864 void HandleAccept(const error_code& error)
866 if (error) {
867 AddDebugLogLineC(logAsio, CFormat(wxT("Error in HandleAccept: %s")) % error.message());
868 } else {
869 if (m_currentSocket->UpdateIP()) {
870 AddDebugLogLineN(logAsio, CFormat(wxT("HandleAccept received a connection from %s:%d"))
871 % m_currentSocket->GetIP() % m_currentSocket->GetPort());
872 m_socketAvailable = true;
873 CoreNotify_ServerTCPAccept(m_libSocketServer);
874 return;
875 } else {
876 AddDebugLogLineN(logAsio, wxT("Error in HandleAccept: invalid socket"));
879 // We were not successful. Try again.
880 // Post the request to the event queue to make sure it doesn't get called immediately.
881 m_strand.post(boost::bind(& CAsioSocketServerImpl::StartAccept, this));
884 // The wrapper object
885 CLibSocketServer * m_libSocketServer;
886 // Startup ok
887 bool m_ok;
888 // The last socket that connected to us
889 CScopedPtr<CAsioSocketImpl> m_currentSocket;
890 // Is there a socket available?
891 bool m_socketAvailable;
892 io_service::strand m_strand; // handle synchronisation in io_service thread pool
896 CLibSocketServer::CLibSocketServer(const amuleIPV4Address& adr, int /* flags */)
898 m_aServer = new CAsioSocketServerImpl(adr, this);
902 CLibSocketServer::~CLibSocketServer()
904 delete m_aServer;
908 // Accepts an incoming connection request, and creates a new CLibSocket object which represents the server-side of the connection.
909 // Only used in CamuleApp::ListenSocketHandler() and we don't get there.
910 CLibSocket * CLibSocketServer::Accept(bool /* wait */)
912 wxFAIL;
913 return NULL;
917 // Accept an incoming connection using the specified socket object.
918 bool CLibSocketServer::AcceptWith(CLibSocket & socket, bool wait)
920 wxASSERT(!wait);
921 return m_aServer->AcceptWith(socket);
925 bool CLibSocketServer::IsOk() const
927 return m_aServer->IsOk();
931 void CLibSocketServer::Close()
933 m_aServer->Close();
937 bool CLibSocketServer::SocketAvailable()
939 return m_aServer->SocketAvailable();
944 * ASIO UDP socket implementation
947 class CAsioUDPSocketImpl
949 private:
950 // UDP data block
951 class CUDPData {
952 public:
953 char * buffer;
954 uint32 size;
955 amuleIPV4Address ipadr;
957 CUDPData(const void * src, uint32 _size, amuleIPV4Address adr) :
958 size(_size), ipadr(adr)
960 buffer = new char[size];
961 memcpy(buffer, src, size);
964 ~CUDPData()
966 delete[] buffer;
970 public:
971 CAsioUDPSocketImpl(const amuleIPV4Address &address, int /* flags */, CLibUDPSocket * libSocket) :
972 m_libSocket(libSocket),
973 m_strand(s_io_service),
974 m_timer(s_io_service),
975 m_address(address)
977 m_muleSocket = NULL;
978 m_socket = NULL;
979 m_readBuffer = new char[CMuleUDPSocket::UDP_BUFFER_SIZE];
980 m_OK = true;
981 CreateSocket();
984 ~CAsioUDPSocketImpl()
986 AddDebugLogLineF(logAsio, wxT("UDP ~CAsioUDPSocketImpl"));
987 delete m_socket;
988 delete[] m_readBuffer;
989 DeleteContents(m_receiveBuffers);
992 void SetClientData(CMuleUDPSocket * muleSocket)
994 AddDebugLogLineF(logAsio, wxT("UDP SetClientData"));
995 m_muleSocket = muleSocket;
998 uint32 RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
1000 CUDPData * recdata;
1002 wxMutexLocker lock(m_receiveBuffersLock);
1003 if (m_receiveBuffers.empty()) {
1004 AddDebugLogLineN(logAsio, wxT("UDP RecvFromError no data"));
1005 return 0;
1007 recdata = * m_receiveBuffers.begin();
1008 m_receiveBuffers.pop_front();
1010 uint32 read = recdata->size;
1011 if (read > nBytes) {
1012 // should not happen
1013 AddDebugLogLineN(logAsio, CFormat(wxT("UDP RecvFromError too much data %d")) % read);
1014 read = nBytes;
1016 memcpy(buf, recdata->buffer, read);
1017 addr = recdata->ipadr;
1018 delete recdata;
1019 return read;
1022 uint32 SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1024 // Collect data, make a copy of the buffer's content
1025 CUDPData * recdata = new CUDPData(buf, nBytes, addr);
1026 AddDebugLogLineF(logAsio, CFormat(wxT("UDP SendTo %d to %s")) % nBytes % addr.IPAddress());
1027 m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchSendTo, this, recdata));
1028 return nBytes;
1031 bool IsOk() const
1033 return m_OK;
1036 void Close()
1038 if (s_io_service.stopped()) {
1039 DispatchClose();
1040 } else {
1041 m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchClose, this));
1045 void Destroy()
1047 AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p")) % m_libSocket % this);
1048 Close();
1049 if (s_io_service.stopped()) {
1050 HandleDestroy();
1051 } else {
1052 // Close prevents creation of any more callbacks, but does not clear any callbacks already
1053 // sitting in Asio's event queue (I have seen such a crash).
1054 // So create a delay timer so they can be called until core is notified.
1055 m_timer.expires_from_now(boost::posix_time::seconds(1));
1056 m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleDestroy, this)));
1061 private:
1063 // Dispatch handlers
1064 // Access to m_socket is all bundled in the thread running s_io_service to avoid
1065 // concurrent access to the socket from several threads.
1066 // So once things are running (after connect), all access goes through one of these handlers.
1068 void DispatchClose()
1070 error_code ec;
1071 m_socket->close(ec);
1072 if (ec) {
1073 AddDebugLogLineC(logAsio, CFormat(wxT("UDP Close error %s")) % ec.message());
1074 } else {
1075 AddDebugLogLineF(logAsio, wxT("UDP Closed"));
1079 void DispatchSendTo(CUDPData * recdata)
1081 ip::udp::endpoint endpoint(recdata->ipadr.GetEndpoint().address(), recdata->ipadr.Service());
1083 AddDebugLogLineF(logAsio, CFormat(wxT("UDP DispatchSendTo %d to %s:%d")) % recdata->size
1084 % endpoint.address().to_string() % endpoint.port());
1085 m_socket->async_send_to(buffer(recdata->buffer, recdata->size), endpoint,
1086 m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleSendTo, this, placeholders::error, placeholders::bytes_transferred, recdata)));
1090 // Completion handlers for async requests
1093 void HandleRead(const error_code & ec, size_t received)
1095 if (ec) {
1096 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleReadError %s")) % ec.message());
1097 } else if (received == 0) {
1098 AddDebugLogLineF(logAsio, wxT("UDP HandleReadError nothing available"));
1099 } else if (m_muleSocket == NULL) {
1100 AddDebugLogLineN(logAsio, wxT("UDP HandleReadError no handler"));
1101 } else {
1103 amuleIPV4Address ipadr = amuleIPV4Address(CamuleIPV4Endpoint(m_receiveEndpoint));
1104 AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleRead %d %s:%d")) % received % ipadr.IPAddress() % ipadr.Service());
1106 // create our read buffer
1107 CUDPData * recdata = new CUDPData(m_readBuffer, received, ipadr);
1109 wxMutexLocker lock(m_receiveBuffersLock);
1110 m_receiveBuffers.push_back(recdata);
1112 CoreNotify_UDPSocketReceive(m_muleSocket);
1114 StartBackgroundRead();
1117 void HandleSendTo(const error_code & ec, size_t sent, CUDPData * recdata)
1119 if (ec) {
1120 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError %s")) % ec.message());
1121 } else if (sent != recdata->size) {
1122 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError tosend: %d sent %d")) % recdata->size % sent);
1124 if (m_muleSocket == NULL) {
1125 AddDebugLogLineN(logAsio, wxT("UDP HandleSendToError no handler"));
1126 } else {
1127 AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleSendTo %d to %s")) % sent % recdata->ipadr.IPAddress());
1128 CoreNotify_UDPSocketSend(m_muleSocket);
1130 delete recdata;
1133 void HandleDestroy()
1135 AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p")) % m_libSocket % this);
1136 delete m_libSocket;
1140 // Other functions
1143 void CreateSocket()
1145 try {
1146 delete m_socket;
1147 ip::udp::endpoint endpoint(m_address.GetEndpoint().address(), m_address.Service());
1148 m_socket = new ip::udp::socket(s_io_service, endpoint);
1149 AddDebugLogLineN(logAsio, CFormat(wxT("Created UDP socket %s %d")) % m_address.IPAddress() % m_address.Service());
1150 StartBackgroundRead();
1151 } catch (const system_error& err) {
1152 AddLogLineC(CFormat(wxT("Error creating UDP socket %s %d : %s")) % m_address.IPAddress() % m_address.Service() % err.code().message());
1153 m_socket = NULL;
1154 m_OK = false;
1158 void StartBackgroundRead()
1160 m_socket->async_receive_from(buffer(m_readBuffer, CMuleUDPSocket::UDP_BUFFER_SIZE), m_receiveEndpoint,
1161 m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleRead, this, placeholders::error, placeholders::bytes_transferred)));
1164 CLibUDPSocket * m_libSocket;
1165 ip::udp::socket * m_socket;
1166 CMuleUDPSocket * m_muleSocket;
1167 bool m_OK;
1168 io_service::strand m_strand; // handle synchronisation in io_service thread pool
1169 deadline_timer m_timer;
1170 amuleIPV4Address m_address;
1172 // One fix receive buffer
1173 char * m_readBuffer;
1174 // and a list of dynamic buffers. UDP data may be coming in faster
1175 // than the main loop can handle it.
1176 std::list<CUDPData *> m_receiveBuffers;
1177 wxMutex m_receiveBuffersLock;
1179 // Address of last reception
1180 ip::udp::endpoint m_receiveEndpoint;
1185 * Library UDP socket wrapper
1188 CLibUDPSocket::CLibUDPSocket(amuleIPV4Address &address, int flags)
1190 m_aSocket = new CAsioUDPSocketImpl(address, flags, this);
1194 CLibUDPSocket::~CLibUDPSocket()
1196 AddDebugLogLineF(logAsio, CFormat(wxT("~CLibUDPSocket() %p %p")) % this % m_aSocket);
1197 delete m_aSocket;
1201 bool CLibUDPSocket::IsOk() const
1203 return m_aSocket->IsOk();
1207 uint32 CLibUDPSocket::RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
1209 return m_aSocket->RecvFrom(addr, buf, nBytes);
1213 uint32 CLibUDPSocket::SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1215 return m_aSocket->SendTo(addr, buf, nBytes);
1219 void CLibUDPSocket::SetClientData(CMuleUDPSocket * muleSocket)
1221 m_aSocket->SetClientData(muleSocket);
1225 int CLibUDPSocket::LastError() const
1227 return !IsOk();
1231 void CLibUDPSocket::Close()
1233 m_aSocket->Close();
1237 void CLibUDPSocket::Destroy()
1239 m_aSocket->Destroy();
1244 * CAsioService - ASIO event loop thread
1247 class CAsioServiceThread : public wxThread {
1248 public:
1249 CAsioServiceThread() : wxThread(wxTHREAD_JOINABLE)
1251 static int count = 0;
1252 m_threadNumber = ++count;
1253 Create();
1254 Run();
1257 void * Entry()
1259 AddLogLineNS(CFormat(_("Asio thread %d started")) % m_threadNumber);
1260 io_service::work worker(s_io_service); // keep io_service running
1261 s_io_service.run();
1262 AddDebugLogLineN(logAsio, CFormat(wxT("Asio thread %d stopped")) % m_threadNumber);
1264 return NULL;
1267 private:
1268 int m_threadNumber;
1272 * The constructor starts the thread.
1274 CAsioService::CAsioService()
1276 m_threads = new CAsioServiceThread[m_numberOfThreads];
1280 CAsioService::~CAsioService()
1285 void CAsioService::Stop()
1287 if (!m_threads) {
1288 return;
1290 s_io_service.stop();
1291 // Wait for threads to exit
1292 for (int i = 0; i < m_numberOfThreads; i++) {
1293 CAsioServiceThread * t = m_threads + i;
1294 t->Wait();
1296 delete[] m_threads;
1297 m_threads = 0;
1305 * amuleIPV4Address
1308 amuleIPV4Address::amuleIPV4Address()
1310 m_endpoint = new CamuleIPV4Endpoint();
1313 amuleIPV4Address::amuleIPV4Address(const amuleIPV4Address &a)
1315 *this = a;
1318 amuleIPV4Address::amuleIPV4Address(const CamuleIPV4Endpoint &ep)
1320 *this = ep;
1323 amuleIPV4Address::~amuleIPV4Address()
1325 delete m_endpoint;
1328 amuleIPV4Address& amuleIPV4Address::operator=(const amuleIPV4Address &a)
1330 m_endpoint = new CamuleIPV4Endpoint(* a.m_endpoint);
1331 return *this;
1334 amuleIPV4Address& amuleIPV4Address::operator=(const CamuleIPV4Endpoint &ep)
1336 m_endpoint = new CamuleIPV4Endpoint(ep);
1337 return *this;
1340 bool amuleIPV4Address::Hostname(const wxString& name)
1342 if (name.IsEmpty()) {
1343 return false;
1345 // This is usually just an IP.
1346 std::string sname(unicode2char(name));
1347 error_code ec;
1348 ip::address_v4 adr = ip::address_v4::from_string(sname, ec);
1349 if (!ec) {
1350 m_endpoint->address(adr);
1351 return true;
1353 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") failed, not an IP address %s")) % name % ec.message());
1355 // Try to resolve (sync). Normally not required. Unless you type in your hostname as "local IP address" or something.
1356 error_code ec2;
1357 ip::tcp::resolver res(s_io_service);
1358 // We only want to get IPV4 addresses.
1359 ip::tcp::resolver::query query(ip::tcp::v4(), sname, "");
1360 ip::tcp::resolver::iterator endpoint_iterator = res.resolve(query, ec2);
1361 if (ec2) {
1362 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: %s")) % name % ec2.message());
1363 return false;
1365 if (endpoint_iterator == ip::tcp::resolver::iterator()) {
1366 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: no address found")) % name);
1367 return false;
1369 m_endpoint->address(endpoint_iterator->endpoint().address());
1370 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolved to %s")) % name % IPAddress());
1371 return true;
1374 bool amuleIPV4Address::Service(uint16 service)
1376 if (service == 0) {
1377 return false;
1379 m_endpoint->port(service);
1380 return true;
1383 uint16 amuleIPV4Address::Service() const
1385 return m_endpoint->port();
1388 bool amuleIPV4Address::IsLocalHost() const
1390 return m_endpoint->address().is_loopback();
1393 wxString amuleIPV4Address::IPAddress() const
1395 return CFormat(wxT("%s")) % m_endpoint->address().to_string();
1398 // "Set address to any of the addresses of the current machine."
1399 // This just sets the address to 0.0.0.0 .
1400 // wx does the same.
1401 bool amuleIPV4Address::AnyAddress()
1403 m_endpoint->address(ip::address_v4::any());
1404 AddDebugLogLineN(logAsio, CFormat(wxT("AnyAddress: set to %s")) % IPAddress());
1405 return true;
1408 const CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint() const
1410 return * m_endpoint;
1413 CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint()
1415 return * m_endpoint;
1420 // Notification stuff
1422 namespace MuleNotify
1425 void LibSocketConnect(CLibSocket * socket, int error)
1427 if (socket->IsDestroying()) {
1428 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Destroying %s %d")) % socket->GetIP() % error);
1429 } else if (socket->GetProxyState()) {
1430 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Proxy %s %d")) % socket->GetIP() % error);
1431 socket->OnProxyEvent(wxSOCKET_CONNECTION);
1432 } else {
1433 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect %s %d")) %socket->GetIP() % error);
1434 socket->OnConnect(error);
1438 void LibSocketSend(CLibSocket * socket, int error)
1440 if (socket->IsDestroying()) {
1441 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Destroying %s %d")) % socket->GetIP() % error);
1442 } else if (socket->GetProxyState()) {
1443 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Proxy %s %d")) % socket->GetIP() % error);
1444 socket->OnProxyEvent(wxSOCKET_OUTPUT);
1445 } else {
1446 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend %s %d")) % socket->GetIP() % error);
1447 socket->OnSend(error);
1451 void LibSocketReceive(CLibSocket * socket, int error)
1453 socket->EventProcessed();
1454 if (socket->IsDestroying()) {
1455 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Destroying %s %d")) % socket->GetIP() % error);
1456 } else if (socket->GetProxyState()) {
1457 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Proxy %s %d")) % socket->GetIP() % error);
1458 socket->OnProxyEvent(wxSOCKET_INPUT);
1459 } else {
1460 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive %s %d")) % socket->GetIP() % error);
1461 socket->OnReceive(error);
1465 void LibSocketLost(CLibSocket * socket)
1467 if (socket->IsDestroying()) {
1468 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Destroying %s")) % socket->GetIP());
1469 } else if (socket->GetProxyState()) {
1470 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Proxy %s")) % socket->GetIP());
1471 socket->OnProxyEvent(wxSOCKET_LOST);
1472 } else {
1473 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost %s")) % socket->GetIP());
1474 socket->OnLost();
1478 void LibSocketDestroy(CLibSocket * socket)
1480 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocket_Destroy %s")) % socket->GetIP());
1481 delete socket;
1484 void ProxySocketEvent(CLibSocket * socket, int evt)
1486 AddDebugLogLineF(logAsio, CFormat(wxT("ProxySocketEvent %s %d")) % socket->GetIP() % evt);
1487 socket->OnProxyEvent(evt);
1490 void ServerTCPAccept(CLibSocketServer * socketServer)
1492 AddDebugLogLineF(logAsio, wxT("ServerTCP_Accept"));
1493 socketServer->OnAccept();
1496 } // namespace MuleNotify
1499 // Initialize MuleBoostVersion
1501 wxString MuleBoostVersion = CFormat(wxT("%d.%d")) % (BOOST_VERSION / 100000) % (BOOST_VERSION / 100 % 1000);