2 // This file is part of the aMule Project.
4 // Copyright (c) 2004-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2004-2008 Angel Vidal ( kry@amule.org )
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
34 #include "ECPacket.h" // Needed for CECPacket
// Compression level passed to deflateInit() in CECSocket::WritePacket().
36 #define EC_COMPRESSION_LEVEL Z_BEST_COMPRESSION
// Packet length above which WritePacket() turns on EC_FLAG_ZLIB.
37 #define EC_MAX_UNCOMPRESSED 1024
// Makes every __attribute__((...)) below expand to nothing.
// NOTE(review): presumably wrapped in a compiler check on lines not shown - confirm.
40 #define __attribute__(x)
43 // If your compiler gives errors on these lines, just remove them.
// NOTE(review): the first two prototypes use wchar_t, but the definitions
// further down use uint32_t. In C++ those are distinct overloads, so the
// visibility attribute may not reach the functions actually defined - confirm.
44 int utf8_mbtowc(wchar_t *p
, const unsigned char *s
, int n
) __attribute__((__visibility__("internal")));
45 int utf8_wctomb(unsigned char *s
, wchar_t wc
, int maxlen
) __attribute__((__visibility__("internal")));
46 int utf8_mb_remain(char c
) __attribute__((__pure__
));
48 /*----------=> Import from the Linux kernel <=----------*/
54 * Sample implementation from Unicode home page.
55 * http://www.stonehand.com/unicode/standard/fss-utf.html
// Lookup table used by the UTF-8 helpers below: one row per sequence length,
// closed by an all-zero row (the loops over it stop when cmask is 0).
// NOTE(review): only the cmask and cval members are referenced in the visible
// code; the remaining columns look like shift, largest and smallest code value
// for each length - confirm against the struct definition, which is not shown.
65 static struct utf8_table utf8_table
[] =
67 {0x80, 0x00, 0*6, 0x7F, 0, /* 1 byte sequence */},
68 {0xE0, 0xC0, 1*6, 0x7FF, 0x80, /* 2 byte sequence */},
69 {0xF0, 0xE0, 2*6, 0xFFFF, 0x800, /* 3 byte sequence */},
70 {0xF8, 0xF0, 3*6, 0x1FFFFF, 0x10000, /* 4 byte sequence */},
71 {0xFC, 0xF8, 4*6, 0x3FFFFFF, 0x200000, /* 5 byte sequence */},
72 {0xFE, 0xFC, 5*6, 0x7FFFFFFF, 0x4000000, /* 6 byte sequence */},
73 {0, 0, 0, 0, 0, /* end of table */}
// Decodes one UTF-8 sequence starting at s into *p.
// NOTE(review): ReadNumber() treats a return value of -1 as an invalid
// sequence; n is presumably the number of bytes available at s. Large parts
// of the body are not shown in this listing - confirm before relying on this.
76 int utf8_mbtowc(uint32_t *p
, const unsigned char *s
, int n
)
// Walk the table rows until the all-zero terminator row.
85 for (t
= utf8_table
; t
->cmask
; t
++) {
// The row whose mask/value pair matches c0 identifies the sequence length.
87 if ((c0
& t
->cmask
) == t
->cval
) {
// XOR with 0x80 turns a well-formed continuation byte (10xxxxxx) into its
// six payload bits; presumably validated on a line not shown.
97 c
= (*s
^ 0x80) & 0xFF;
// Encodes the code value wc as a UTF-8 sequence written to s.
// NOTE(review): WriteNumber() uses the return value as the encoded length and
// treats -1 as failure; the return statements are not shown here - confirm.
105 int utf8_wctomb(unsigned char *s
, uint32_t wc
, int maxlen
)
109 struct utf8_table
*t
;
// Try each sequence length in turn; maxlen is decremented with every row, so
// the search stops once the row's length would exceed the caller's limit.
113 for (t
= utf8_table
; t
->cmask
&& maxlen
; t
++, maxlen
--) {
// Lead byte: the row's marker bits combined with the top bits of the value.
117 *s
= t
->cval
| (l
>> c
);
// Continuation byte: 10xxxxxx carrying the next six bits of the value.
121 *s
= 0x80 | ((l
>> c
) & 0x3F);
128 /*----------=> End of Import <=----------*/
// Classifies the lead byte c against the first five rows of utf8_table.
// NOTE(review): ReadNumber() uses the result as the number of further bytes to
// read, so this presumably returns i; the return statement is not shown.
130 int utf8_mb_remain(char c
)
// Stop at the first row whose mask/value pair matches c. A byte that matches
// none of the five rows leaves i at 5 when the loop ends.
133 for (i
= 0; i
< 5; ++i
) {
134 if ((c
& utf8_table
[i
].cmask
) == utf8_table
[i
].cval
) break;
// Appends up to len bytes from data at the write pointer and advances it.
// The copy is clipped to the free space left in the buffer; the assertion
// flags any call that would have been clipped.
140 void CQueuedData::Write(const void *data
, size_t len
)
142 const size_t canWrite
= std::min(GetRemLength(), len
);
143 wxASSERT(len
== canWrite
);
145 memcpy(m_wr_ptr
, data
, canWrite
);
146 m_wr_ptr
+= canWrite
;
// Overwrites len bytes at the given byte offset from the start of the buffer.
// The visible lines do not touch the write pointer.
150 void CQueuedData::WriteAt(const void *data
, size_t len
, size_t offset
)
// The requested range is expected to fit inside the buffer.
152 wxASSERT(len
+ offset
<= m_data
.size());
// Handling when the assertion does not stop execution: an offset beyond the
// end takes the first branch (its body is not shown in this listing) ...
153 if (offset
> m_data
.size()) {
// ... and a range running past the end is shortened to what fits.
155 } else if (offset
+ len
> m_data
.size()) {
156 len
= m_data
.size() - offset
;
159 memcpy(&m_data
[0] + offset
, data
, len
);
// Copies up to len unread bytes from the read pointer into data. The copy is
// clipped to the unread amount; the assertion flags a clipped call.
// NOTE(review): the read pointer is presumably advanced on a line not shown.
163 void CQueuedData::Read(void *data
, size_t len
)
165 const size_t canRead
= std::min(GetUnreadDataLength(), len
);
166 wxASSERT(len
== canRead
);
168 memcpy(data
, m_rd_ptr
, canRead
);
// Sends the unread part of the buffer to sock and moves the read pointer
// forward by the number of bytes the socket reports as written.
173 void CQueuedData::WriteToSocket(CECSocket
*sock
)
// Bail out (with a debug message) when there is nothing left to send.
175 wxCHECK_RET(m_rd_ptr
< m_wr_ptr
,
176 wxT("Reading past written data in WriteToSocket"));
178 sock
->SocketWrite(m_rd_ptr
, GetUnreadDataLength());
// A partial write leaves the remainder unread for a later call.
179 m_rd_ptr
+= sock
->GetLastCount();
// Issues a single socket read of up to len bytes at the write pointer and
// advances the write pointer by the number of bytes actually received.
// The request is clipped to the free space; the assertion flags clipping.
183 void CQueuedData::ReadFromSocket(CECSocket
*sock
, size_t len
)
185 const size_t canWrite
= std::min(GetRemLength(), len
);
186 wxASSERT(len
== canWrite
);
188 sock
->SocketRead(m_wr_ptr
, canWrite
);
189 m_wr_ptr
+= sock
->GetLastCount();
// Reads len bytes from sock into the buffer and returns how many bytes were
// actually stored (len minus what is still outstanding).
// NOTE(review): the loop construct itself is not shown in this listing; the
// visible statements look like the body of a read-until-done loop - confirm.
193 size_t CQueuedData::ReadFromSocketAll(CECSocket
*sock
, size_t len
)
// Outstanding byte count, clipped to the free space in the buffer.
195 size_t read_rem
= std::min(GetRemLength(), len
);
196 wxASSERT(read_rem
== len
);
198 // We get here when socket is truly blocking
200 // Give socket a 10 sec chance to recv more data.
201 if ( !sock
->WaitSocketRead(10, 0) ) {
// Guard against writing past the end of the underlying storage.
205 wxASSERT(m_wr_ptr
+ read_rem
<= &m_data
[0] + m_data
.size());
206 sock
->SocketRead(m_wr_ptr
, read_rem
);
207 m_wr_ptr
+= sock
->GetLastCount();
208 read_rem
-= sock
->GetLastCount();
// A would-block condition is not treated as a real error here.
210 if (sock
->SocketError() && !sock
->WouldBlock()) {
215 return len
- read_rem
;
// Returns the total size of the underlying storage, not the amount written.
219 size_t CQueuedData::GetLength() const
221 return m_data
.size();
// Computes the number of bytes written so far as the distance of the write
// pointer from the start of the storage.
// NOTE(review): the final return is not shown; presumably it returns len.
225 size_t CQueuedData::GetDataLength() const
227 const size_t len
= m_wr_ptr
- &m_data
[0];
// If the write pointer is past the end, report the full size instead.
228 wxCHECK_MSG(len
<= m_data
.size(), m_data
.size(),
229 wxT("Write-pointer past end of buffer"));
// Returns the free space left: storage size minus GetDataLength().
235 size_t CQueuedData::GetRemLength() const
237 return m_data
.size() - GetDataLength();
// Returns the number of bytes between the read and the write pointer.
241 size_t CQueuedData::GetUnreadDataLength() const
// A read pointer ahead of the write pointer is reported as 0 unread bytes.
243 wxCHECK_MSG(m_wr_ptr
>= m_rd_ptr
, 0,
244 wxT("Read position past write position."));
246 return m_wr_ptr
- m_rd_ptr
;
252 // CECSocket API - User interface functions
// Constructor: records the event mode and sets up the I/O buffers.
// NOTE(review): several initializers and the body are not shown in this listing.
255 CECSocket::CECSocket(bool use_events
)
257 m_use_events(use_events
),
// Staging buffers handed to zlib as next_in / next_out elsewhere in this file.
259 m_in_ptr(EC_SOCKET_BUFFER_SIZE
),
260 m_out_ptr(EC_SOCKET_BUFFER_SIZE
),
// Current receive and transmit buffers, each starting at the default size.
261 m_curr_rx_data(new CQueuedData(EC_SOCKET_BUFFER_SIZE
)),
262 m_curr_tx_data(new CQueuedData(EC_SOCKET_BUFFER_SIZE
)),
// Capability mask of this end: base bit 0x20 plus zlib, UTF-8 numbers and
// the "accepts" flag.
265 m_my_flags(0x20 | EC_FLAG_ZLIB
| EC_FLAG_UTF8_NUMBERS
| EC_FLAG_ACCEPTS
),
266 // setup initial state: 4 flags + 4 length
// True when the most recently received flags word has EC_FLAG_NOTIFY set.
273 bool CECSocket::HaveNotificationSupport()
275 return (m_rx_flags
& EC_FLAG_NOTIFY
) != 0;
// Destructor: drains the queue of pending output buffers.
// NOTE(review): the queue stores raw CQueuedData pointers; each one is
// presumably deleted on a line not shown in this listing - confirm.
278 CECSocket::~CECSocket()
280 while (!m_output_queue
.empty()) {
281 CQueuedData
*data
= m_output_queue
.front();
282 m_output_queue
.pop_front();
// Connects to ip:port and returns true when the connection attempt reported
// success and the socket shows no error.
287 bool CECSocket::ConnectSocket(uint32_t ip
, uint16_t port
)
// With wxWidgets 2.8.8 or newer the third argument is the negated event flag.
290 #if wxCHECK_VERSION(2, 8, 8)
291 res
= InternalConnect(ip
, port
, !m_use_events
);
// Other branch (its preprocessor line is not shown): pass false, and when
// events are not in use wait up to 10 seconds for the connection.
293 res
= InternalConnect(ip
, port
, false);
294 if ( !m_use_events
) {
295 res
= WaitSocketConnect(10, 0) && InternalIsConnected();
303 return !SocketError() && res
;
// Sends a packet to the peer.
// NOTE(review): only the debug-print call is visible in this listing; the
// statements that serialize and transmit the packet are not shown.
306 void CECSocket::SendPacket(const CECPacket
*packet
)
308 packet
->DebugPrint(false);
// Request/reply exchange: reads the 8-byte header (flags + length), then the
// payload, and parses the reply with ReadPacket().
// NOTE(review): the send step and the return statements are not shown here.
313 const CECPacket
*CECSocket::SendRecvPacket(const CECPacket
*packet
)
// Header: two 32-bit words.
316 m_curr_rx_data
->ReadFromSocketAll(this, 2 * sizeof(uint32_t));
317 if (SocketError() && !WouldBlock()) {
// Both header words arrive in network byte order.
322 m_curr_rx_data
->Read(&m_rx_flags
, sizeof(m_rx_flags
));
323 m_rx_flags
= ENDIAN_NTOHL(m_rx_flags
);
324 m_curr_rx_data
->Read(&m_curr_packet_len
, sizeof(m_curr_packet_len
));
325 m_curr_packet_len
= ENDIAN_NTOHL(m_curr_packet_len
);
// Replace the receive buffer when it is too small for this packet.
// NOTE(review): unlike OnInput(), no upper bound on the peer-supplied length
// is visible here, so the peer controls the allocation size - confirm.
327 if ( m_curr_rx_data
->GetLength() < (m_curr_packet_len
+2*sizeof(uint32_t)) ) {
328 m_curr_rx_data
.reset(new CQueuedData(m_curr_packet_len
));
330 m_curr_rx_data
->ReadFromSocketAll(this, m_curr_packet_len
);
331 if (SocketError() && !WouldBlock()) {
335 const CECPacket
*reply
= ReadPacket();
// Reset the buffer for the next exchange.
336 m_curr_rx_data
->Rewind();
// Maps the code from InternalGetLastError() to a human-readable message.
// Codes without a dedicated message yield "Error code <n> unknown.".
// NOTE(review): the switch head and two case labels are not shown in this listing.
340 std::string
CECSocket::GetLastErrorMsg()
342 int code
= InternalGetLastError();
344 case EC_ERROR_NOERROR
:
345 return "No error happened";
347 return "Invalid operation";
349 return "Input/Output error";
350 case EC_ERROR_INVADDR
:
351 return "Invalid address passed to wxSocket";
352 case EC_ERROR_INVSOCK
:
353 return "Invalid socket (uninitialized)";
354 case EC_ERROR_NOHOST
:
355 return "No corresponding host";
356 case EC_ERROR_INVPORT
:
357 return "Invalid port";
358 case EC_ERROR_WOULDBLOCK
:
359 return "The socket is non-blocking and the operation would block";
360 case EC_ERROR_TIMEDOUT
:
361 return "The timeout for this operation expired";
362 case EC_ERROR_MEMERR
:
363 return "Memory exhausted";
// Fallback for any other code.
365 ostringstream error_string
;
366 error_string
<< "Error code " << code
<< " unknown.";
367 return error_string
.str();
// Error hook: prints the text of the last socket error to standard output.
// NOTE(review): any surrounding conditional compilation is not shown here.
370 void CECSocket::OnError()
373 cout
<< GetLastErrorMsg() << endl
;
377 void CECSocket::OnLost()
384 void CECSocket::OnConnect()
// Input hook: pulls the bytes still needed for the current header or packet
// from the socket; once complete, parses the packet, hands it to
// OnPacketReceived() and sends the reply.
// NOTE(review): many control-flow lines are missing from this listing, so the
// exact ordering between header and payload handling cannot be confirmed here.
388 void CECSocket::OnInput()
392 if (m_curr_rx_data
.get()) {
393 m_curr_rx_data
->ReadFromSocket(this, m_bytes_needed
);
397 if (SocketError() && !WouldBlock()) {
399 // socket already disconnected at this point
400 m_curr_rx_data
.reset(0);
// Keep reading while bytes are still needed and the last read made progress.
403 bytes_rx
= GetLastCount();
404 m_bytes_needed
-= bytes_rx
;
405 } while (m_bytes_needed
&& bytes_rx
);
407 if (!m_bytes_needed
) {
// First header word: the flags, sent in network byte order.
410 m_curr_rx_data
->Read(&m_rx_flags
, sizeof(m_rx_flags
));
411 m_rx_flags
= ENDIAN_NTOHL(m_rx_flags
);
412 if (m_rx_flags
& EC_FLAG_ACCEPTS
) {
413 // Client sends its capabilities, update the internal mask.
414 m_curr_rx_data
->Read(&m_my_flags
, sizeof(m_my_flags
));
415 m_my_flags
= ENDIAN_NTOHL(m_my_flags
);
416 //printf("Reading accepts mask: %x\n", m_my_flags);
417 wxASSERT(m_my_flags
& 0x20);
418 // There has to be 4 more bytes. THERE HAS TO BE, DAMN IT.
419 m_curr_rx_data
->ReadFromSocketAll(this, sizeof(m_curr_packet_len
));
// Payload length, also in network byte order.
421 m_curr_rx_data
->Read(&m_curr_packet_len
, sizeof(m_curr_packet_len
));
422 m_curr_packet_len
= ENDIAN_NTOHL(m_curr_packet_len
);
423 m_bytes_needed
= m_curr_packet_len
;
424 // a packet bigger than 16 MB looks more like a broken request
425 if (m_bytes_needed
> 16*1024*1024) {
// Buffer must hold the payload plus the header: 8 bytes, or 12 when the
// accepts mask was sent as well.
429 size_t needed_size
= m_bytes_needed
+ ((m_rx_flags
& EC_FLAG_ACCEPTS
) ? 12 : 8);
430 if (!m_curr_rx_data
.get() ||
431 m_curr_rx_data
->GetLength() < needed_size
) {
432 m_curr_rx_data
.reset(new CQueuedData(needed_size
));
434 //#warning Kry TODO: Read packet?
436 //m_curr_rx_data->DumpMem();
// NOTE(review): std::auto_ptr was deprecated in C++11 and removed in C++17;
// std::unique_ptr is the replacement if the project's toolchain allows it.
437 std::auto_ptr
<const CECPacket
> packet(ReadPacket());
438 m_curr_rx_data
->Rewind();
440 std::auto_ptr
<const CECPacket
> reply(OnPacketReceived(packet
.get()));
442 SendPacket(reply
.get());
// Output hook: writes queued buffers to the socket front to back, then
// signals that everything has been sent.
// NOTE(review): the branches that handle partial writes and errors are mostly
// missing from this listing.
451 void CECSocket::OnOutput()
453 while (!m_output_queue
.empty()) {
454 CQueuedData
* data
= m_output_queue
.front();
455 data
->WriteToSocket(this);
// A buffer leaves the queue only once all of its data has been written.
456 if (!data
->GetUnreadDataLength()) {
457 m_output_queue
.pop_front();
462 if ( m_use_events
) {
// Without events, wait up to 10 seconds for the socket to become writable.
465 if ( !WaitSocketWrite(10, 0) ) {
481 // All outstanding data sent to socket
483 WriteDoneAndQueueEmpty();
// True while at least one buffer is still waiting in the output queue.
486 bool CECSocket::DataPending()
488 return !m_output_queue
.empty();
// Copies required_len bytes out of the already-received data into buffer.
// NOTE(review): ReadBuffer() compares the result with the requested length, so
// this presumably returns the byte count copied; the returns are not shown.
495 size_t CECSocket::ReadBufferFromSocket(void *buffer
, size_t required_len
)
497 wxASSERT(required_len
);
499 if (m_curr_rx_data
->GetUnreadDataLength() < required_len
) {
500 // need more data than we have. Looks like nothing will help here
503 m_curr_rx_data
->Read(buffer
, required_len
);
// Appends len bytes to the current transmit buffer. When the data does not
// fit, the buffer is filled up, moved to the output queue and replaced by a
// fresh one before the rest is written.
// NOTE(review): the statements that advance wr_ptr / reduce len, and any loop
// around the overflow branch, are not shown in this listing.
507 void CECSocket::WriteBufferToSocket(const void *buffer
, size_t len
)
// The C-style cast drops const; the visible code only reads through wr_ptr.
509 unsigned char *wr_ptr
= (unsigned char *)buffer
;
511 size_t curr_free
= m_curr_tx_data
->GetRemLength();
512 if ( len
> curr_free
) {
514 m_curr_tx_data
->Write(wr_ptr
, curr_free
);
// Ownership of the full buffer passes to the queue as a raw pointer.
517 m_output_queue
.push_back(m_curr_tx_data
.release());
518 m_curr_tx_data
.reset(new CQueuedData(EC_SOCKET_BUFFER_SIZE
));
520 m_curr_tx_data
->Write(wr_ptr
, len
);
528 // ZLib "error handler"
// Prints a zlib return code by name, the stream's message and the stream's
// buffer state to standard output.
// NOTE(review): p starts as NULL and no default label is visible, so a code
// outside the listed cases would pass NULL to "%s"; strm->msg can also be
// NULL according to the zlib manual. Both are undefined behaviour for printf -
// confirm against the lines missing from this listing.
531 void ShowZError(int zerror
, z_streamp strm
)
533 const char *p
= NULL
;
536 case Z_STREAM_END
: p
= "Z_STREAM_END"; break;
537 case Z_NEED_DICT
: p
= "Z_NEED_DICT"; break;
538 case Z_ERRNO
: p
= "Z_ERRNO"; break;
539 case Z_STREAM_ERROR
: p
= "Z_STREAM_ERROR"; break;
540 case Z_DATA_ERROR
: p
= "Z_DATA_ERROR"; break;
541 case Z_MEM_ERROR
: p
= "Z_MEM_ERROR"; break;
542 case Z_BUF_ERROR
: p
= "Z_BUF_ERROR"; break;
543 case Z_VERSION_ERROR
: p
= "Z_VERSION_ERROR"; break;
545 printf("ZLib operation returned %s\n", p
);
546 printf("ZLib error message: %s\n", strm
->msg
);
547 printf("zstream state:\n\tnext_in=%p\n\tavail_in=%u\n\ttotal_in=%lu\n\tnext_out=%p\n\tavail_out=%u\n\ttotal_out=%lu\n",
548 strm
->next_in
, strm
->avail_in
, strm
->total_in
, strm
->next_out
, strm
->avail_out
, strm
->total_out
);
// Reads a 1-, 2- or 4-byte number into buffer. With EC_FLAG_UTF8_NUMBERS set
// in the received flags the value arrives as one UTF-8 encoded code value;
// otherwise it arrives as raw bytes in network byte order.
// Returns false when the underlying read or the UTF-8 decoding fails.
552 bool CECSocket::ReadNumber(void *buffer
, size_t len
)
554 if (m_rx_flags
& EC_FLAG_UTF8_NUMBERS
) {
// The lead byte tells how many more bytes belong to the sequence.
557 if (!ReadBuffer(mb
, 1)) return false;
558 int remains
= utf8_mb_remain(mb
[0]);
559 if (remains
) if (!ReadBuffer(&(mb
[1]), remains
)) return false;
560 if (utf8_mbtowc(&wc
, mb
, 6) == -1) return false; // Invalid UTF-8 code sequence
// Store the decoded value with the width the caller asked for.
562 case 1: PokeUInt8( buffer
, wc
); break;
563 case 2: RawPokeUInt16( buffer
, wc
); break;
564 case 4: RawPokeUInt32( buffer
, wc
); break;
// Raw path: read the bytes, then convert 16/32-bit values to host order in place.
567 if ( !ReadBuffer(buffer
, len
) ) {
572 RawPokeUInt16( buffer
, ENDIAN_NTOHS( RawPeekUInt16( buffer
) ) );
575 RawPokeUInt32( buffer
, ENDIAN_NTOHL( RawPeekUInt32( buffer
) ) );
// Writes a 1-, 2- or 4-byte number taken from buffer. With
// EC_FLAG_UTF8_NUMBERS set in the transmit flags the value is sent as one
// UTF-8 sequence; otherwise as raw bytes after a byte-order conversion.
// Returns false for an unsupported len (UTF-8 path) or a failed encoding.
582 bool CECSocket::WriteNumber(const void *buffer
, size_t len
)
584 if (m_tx_flags
& EC_FLAG_UTF8_NUMBERS
) {
589 case 1: wc
= PeekUInt8( buffer
); break;
590 case 2: wc
= RawPeekUInt16( buffer
); break;
591 case 4: wc
= RawPeekUInt32( buffer
); break;
592 default: return false;
594 if ((mb_len
= utf8_wctomb(mb
, wc
, 6)) == -1) return false; // Something is terribly wrong...
595 return WriteBuffer(mb
, mb_len
);
// NOTE(review): the send path uses the NTOHS/NTOHL macros, while
// WritePacket() uses ENDIAN_HTONL for the same direction. Presumably both
// perform the same swap, but the naming is inconsistent - confirm.
600 case 1: PokeUInt8( tmp
, PeekUInt8( buffer
) ); break;
601 case 2: RawPokeUInt16( tmp
, ENDIAN_NTOHS( RawPeekUInt16( buffer
) ) ); break;
602 case 4: RawPokeUInt32( tmp
, ENDIAN_NTOHL( RawPeekUInt32( buffer
) ) ); break;
604 return WriteBuffer(tmp
, len
);
// Reads len bytes of packet payload into buffer, inflating through zlib when
// the received flags have EC_FLAG_ZLIB set and reading the receive buffer
// directly otherwise.
608 bool CECSocket::ReadBuffer(void *buffer
, size_t len
)
610 if (m_rx_flags
& EC_FLAG_ZLIB
) {
611 if ( !m_z
.avail_in
) {
612 // no reason for this situation: all packet should be
// Inflate straight into the caller's buffer.
616 m_z
.avail_out
= (uInt
)len
;
617 m_z
.next_out
= (Bytef
*)buffer
;
618 int zerror
= inflate(&m_z
, Z_SYNC_FLUSH
);
// Z_OK and Z_STREAM_END are the only non-error results here.
// NOTE(review): no check that avail_out reached 0 (i.e. that all len bytes
// were produced) is visible in this listing - confirm.
619 if ((zerror
!= Z_OK
) && (zerror
!= Z_STREAM_END
)) {
620 ShowZError(zerror
, &m_z
);
625 // using uncompressed buffered i/o
626 return ReadBufferFromSocket(buffer
, len
) == len
;
// Writes len bytes of packet payload. With EC_FLAG_ZLIB set in the transmit
// flags the bytes are staged in the zlib input buffer, which is deflated and
// forwarded whenever it fills up; otherwise they go straight to
// WriteBufferToSocket().
// NOTE(review): loop heads, pointer/length bookkeeping and the return
// statements are not shown in this listing.
630 bool CECSocket::WriteBuffer(const void *buffer
, size_t len
)
632 if (m_tx_flags
& EC_FLAG_ZLIB
) {
634 unsigned char *rd_ptr
= (unsigned char *)buffer
;
// Free space left in the staging buffer.
636 unsigned int remain_in
= EC_SOCKET_BUFFER_SIZE
- m_z
.avail_in
;
// Everything fits: just append to the staged input.
637 if ( remain_in
>= len
) {
638 memcpy(m_z
.next_in
+m_z
.avail_in
, rd_ptr
, len
);
639 m_z
.avail_in
+= (uInt
)len
;
// Otherwise fill the staging buffer completely ...
642 memcpy(m_z
.next_in
+m_z
.avail_in
, rd_ptr
, remain_in
);
643 m_z
.avail_in
+= remain_in
;
646 // buffer is full, calling zlib
// ... and deflate it, forwarding each chunk of compressed output. The loop
// repeats while deflate fills the whole output buffer.
648 m_z
.next_out
= &m_out_ptr
[0];
649 m_z
.avail_out
= EC_SOCKET_BUFFER_SIZE
;
650 int zerror
= deflate(&m_z
, Z_NO_FLUSH
);
651 if ( zerror
!= Z_OK
) {
652 ShowZError(zerror
, &m_z
);
655 WriteBufferToSocket(&m_out_ptr
[0],
656 EC_SOCKET_BUFFER_SIZE
- m_z
.avail_out
);
657 } while ( m_z
.avail_out
== 0 );
658 // all input should be used by now
659 wxASSERT(m_z
.avail_in
== 0);
// Start staging again from the beginning of the input buffer.
660 m_z
.next_in
= &m_in_ptr
[0];
665 // using uncompressed buffered i/o
666 WriteBufferToSocket(buffer
, len
);
// Finishes the current packet's output: with EC_FLAG_ZLIB set, deflates the
// remaining staged input with Z_FINISH and forwards the output; then moves a
// non-empty transmit buffer to the output queue and starts a fresh one.
// NOTE(review): the return statements are not shown in this listing.
671 bool CECSocket::FlushBuffers()
673 if (m_tx_flags
& EC_FLAG_ZLIB
) {
675 m_z
.next_out
= &m_out_ptr
[0];
676 m_z
.avail_out
= EC_SOCKET_BUFFER_SIZE
;
677 int zerror
= deflate(&m_z
, Z_FINISH
);
// Only Z_STREAM_ERROR is reported here; the loop repeats while deflate fills
// the whole output buffer.
678 if ( zerror
== Z_STREAM_ERROR
) {
679 ShowZError(zerror
, &m_z
);
682 WriteBufferToSocket(&m_out_ptr
[0],
683 EC_SOCKET_BUFFER_SIZE
- m_z
.avail_out
);
684 } while ( m_z
.avail_out
== 0 );
686 if ( m_curr_tx_data
->GetDataLength() ) {
687 m_output_queue
.push_back(m_curr_tx_data
.release());
688 m_curr_tx_data
.reset(new CQueuedData(EC_SOCKET_BUFFER_SIZE
));
// Serializes packet into the transmit buffers: writes the flags word and a
// zero placeholder for the length, lets the packet write itself, then sums
// the buffered bytes and patches the real length in at byte offset 4.
// NOTE(review): the header adjustment ("4 flags and 4 length are not counted")
// and several other lines are not shown in this listing.
697 void CECSocket::WritePacket(const CECPacket
*packet
)
699 if (SocketError() && !WouldBlock()) {
704 uint32_t flags
= 0x20;
// Compress only packets longer than EC_MAX_UNCOMPRESSED.
706 if ( packet
->GetPacketLength() > EC_MAX_UNCOMPRESSED
) {
707 flags
|= EC_FLAG_ZLIB
;
709 flags
|= EC_FLAG_UTF8_NUMBERS
;
715 if (flags
& EC_FLAG_ZLIB
) {
720 m_z
.next_in
= &m_in_ptr
[0];
721 int zerror
= deflateInit(&m_z
, EC_COMPRESSION_LEVEL
);
722 if (zerror
!= Z_OK
) {
723 // don't use zlib if init failed
724 flags
&= ~EC_FLAG_ZLIB
;
725 ShowZError(zerror
, &m_z
);
// The flags word goes out in network byte order.
729 uint32_t tmp_flags
= ENDIAN_HTONL(flags
/* | EC_FLAG_ACCEPTS*/);
730 WriteBufferToSocket(&tmp_flags
, sizeof(uint32
));
732 /* uint32_t tmp_accepts_flags = ENDIAN_HTONL(m_my_flags);
733 WriteBufferToSocket(&tmp_accepts_flags, sizeof(uint32));*/
735 // preallocate 4 bytes in buffer for packet length
736 uint32_t packet_len
= 0;
737 WriteBufferToSocket(&packet_len
, sizeof(uint32
));
739 packet
->WritePacket(*this);
743 // now calculate actual size of data
744 wxASSERT(m_curr_tx_data
->GetDataLength() < 0xFFFFFFFF);
745 packet_len
= (uint32_t)m_curr_tx_data
->GetDataLength();
// NOTE(review): the assertion inside the loop adds
// m_curr_tx_data->GetDataLength(), but the value added on the next line is
// (*i)->GetDataLength(); the overflow check looks at the wrong operand.
746 for(std::deque
<CQueuedData
*>::iterator i
= m_output_queue
.begin(); i
!= m_output_queue
.end(); i
++) {
747 wxASSERT(( packet_len
+ m_curr_tx_data
->GetDataLength()) < 0xFFFFFFFF);
748 packet_len
+= (uint32_t)(*i
)->GetDataLength();
750 // 4 flags and 4 length are not counted
752 // now write actual length @ offset 4
753 packet_len
= ENDIAN_HTONL(packet_len
);
// NOTE(review): front() on an empty std::deque is undefined behaviour, and
// the null test below does not detect an empty queue. Unless a line not shown
// guarantees a queued buffer, check empty() before calling front() - confirm.
755 CQueuedData
*first_buff
= m_output_queue
.front();
756 if ( !first_buff
) first_buff
= m_curr_tx_data
.get();
757 first_buff
->WriteAt(&packet_len
, sizeof(uint32_t), sizeof(uint32_t));
759 if (flags
& EC_FLAG_ZLIB
) {
760 int zerror
= deflateEnd(&m_z
);
761 if ( zerror
!= Z_OK
) {
762 ShowZError(zerror
, &m_z
);
// Builds a CECPacket from the data in the current receive buffer, inflating
// it first when the received flags have EC_FLAG_ZLIB set.
// NOTE(review): the error-path cleanup and the return statements are not
// shown in this listing; the caller appears to take ownership of the result.
769 const CECPacket
*CECSocket::ReadPacket()
771 CECPacket
*packet
= 0;
773 uint32_t flags
= m_rx_flags
;
// Valid flags have bit 0x20 set, bit 0x40 clear and no unknown bits.
775 if ( ((flags
& 0x60) != 0x20) || (flags
& EC_FLAG_UNKNOWN_MASK
) ) {
776 // Protocol error - other end might use an older protocol
777 cout
<< "ReadPacket: packet have invalid flags " << flags
<< endl
;
782 if (flags
& EC_FLAG_ZLIB
) {
790 int zerror
= inflateInit(&m_z
);
791 if (zerror
!= Z_OK
) {
792 ShowZError(zerror
, &m_z
);
793 cout
<< "ReadPacket: failed zlib init" << endl
;
// Hand the receive buffer to the zlib stream before parsing.
799 m_curr_rx_data
->ToZlib(m_z
);
800 packet
= new CECPacket(*this);
801 packet
->ReadFromSocket(*this);
803 if (packet
->HasError()) {
804 cout
<< "ReadPacket: error in packet read" << endl
;
810 if (flags
& EC_FLAG_ZLIB
) {
811 int zerror
= inflateEnd(&m_z
);
812 if ( zerror
!= Z_OK
) {
813 ShowZError(zerror
, &m_z
);
814 cout
<< "ReadPacket: failed zlib free" << endl
;
822 const CECPacket
*CECSocket::OnPacketReceived(const CECPacket
*)
826 // File_checked_for_headers