1 /*********************************************************************
2 Copyright 2013 Karl Jones
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice, this
9 list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright notice,
11 this list of conditions and the following disclaimer in the documentation
12 and/or other materials provided with the distribution.
14 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
18 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
20 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
21 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
23 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 For Further Information Please Contact me at
27 http://p.sf.net/kdis/UserGuide
28 *********************************************************************/
32 #include "./Connection.h"
34 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 ) // Windows Headers //
38 #define ERROR_CODE WSAGetLastError()
40 #else // Linux Headers //
45 #define INVALID_SOCKET -1
46 #define SOCKET_ERROR -1
47 #define ERROR_CODE errno
51 #define THROW_ERROR throw KException( getErrorText( ERROR_CODE ), CONNECTION_SOCKET_ERROR )
53 #define RECEIVE_SOCK 1
57 using namespace UTILS
;
58 using namespace NETWORK
;
61 //////////////////////////////////////////////////////////////////////////
63 //////////////////////////////////////////////////////////////////////////
65 void Connection::startup()
67 // Create the sockets if they do not already exist.
68 if( !m_iSocket
[SEND_SOCK
] && !m_iSocket
[RECEIVE_SOCK
] )
71 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
73 static KINT32 iWinSockInit
= WSAStartup( 0x0202, &w
); // Init with winsock version 2.2
74 if( iWinSockInit
!= NO_ERROR
)
80 // Create both sockets
81 for( KINT8 i
= 0; i
< 2; ++i
)
83 m_iSocket
[i
] = socket( AF_INET
, SOCK_DGRAM
, IPPROTO_UDP
); // Create a IPv4 UDP socket.
85 if( m_iSocket
[i
] == INVALID_SOCKET
)
91 // Bind socket for sending and/or receiving data.
96 //////////////////////////////////////////////////////////////////////////
98 void Connection::bindSocket()
100 // Set the receive socket to be reusable. Useful if your server has
101 // been shut down, and then restarted right away.
103 KINT32 iRet
= setsockopt( m_iSocket
[RECEIVE_SOCK
], SOL_SOCKET
, SO_REUSEADDR
, ( const char * )&yes
, sizeof( yes
) );
104 if ( iRet
== SOCKET_ERROR
)
109 // Bind the *sending* socket to the specified local interface
112 socklen_t addrlen
= sizeof(m_InterfaceAddr
);
113 iRet
= bind( m_iSocket
[SEND_SOCK
], ( sockaddr
* )&m_InterfaceAddr
, addrlen
);
114 if ( iRet
== SOCKET_ERROR
)
120 // Construct bind structure for receive socket, using IPADDR_ANY for Linux compatibility
122 memset( &Address
, 0, sizeof( Address
) );
123 Address
.sin_port
= htons( m_uiPort
); // Set listening port
124 Address
.sin_family
= AF_INET
; // IPv4 address family
125 Address
.sin_addr
.s_addr
= INADDR_ANY
; // Any interface
127 // Bind the *receiving* socket to the chosen port (on all local interfaces)
130 iRet
= bind( m_iSocket
[RECEIVE_SOCK
], ( sockaddr
* )&Address
, sizeof( Address
) );
131 if( iRet
== SOCKET_ERROR
)
138 //////////////////////////////////////////////////////////////////////////
140 void Connection::shutdown()
142 for( KINT8 i
= 0; i
< 2; ++i
)
146 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
147 closesocket( m_iSocket
[i
] );
149 close( m_iSocket
[i
] );
157 //////////////////////////////////////////////////////////////////////////
159 const KCHAR8
* Connection::getErrorText( KINT32 ErrorCode
) const
163 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
164 case /* 10004 */ WSAEINTR
:
165 return "WSAEINTR: Invalid interrupt number";
166 case /* 10009 */ WSAEBADF
:
167 return "WSAEBADF: Invalid file number";
168 case /* 10013 */ WSAEACCES
:
170 case /* 10014 */ WSAEFAULT
:
171 return "WSAEFAULT - The buf parameter is not completely contained in a valid part of the user address space.";
172 case /* 10022 */ WSAEINVAL
:
173 return "WSAEINVAL, Invalid argument";
174 case /* 10024 */ WSAEMFILE
:
175 return "WSAEMFILE, Is a File";
176 case /* 10035 */ WSAEWOULDBLOCK
:
177 return "WSAEWOULDBLOCK: Function would block";
178 case /* 10036 */ WSAEINPROGRESS
:
179 return "WSAEINPROGRESS: Winsock blocking function in progress";
180 case /* 10037 */ WSAEALREADY
:
181 return "WSAEALREADY";
182 case /* 10038 */ WSAENOTSOCK
:
183 return "WSAENOTSOCK";
184 case /* 10039 */ WSAEDESTADDRREQ
:
185 return "WSAEDESTADDRREQ";
186 case /* 10040 */ WSAEMSGSIZE
:
187 return "WSAEMSGSIZE";
188 case /* 10041 */ WSAEPROTOTYPE
:
189 return "WSAEPROTOTYPE";
190 case /* 10042 */ WSAENOPROTOOPT
:
191 return "WSAENOPROTOOPT";
192 case /* 10043 */ WSAEPROTONOSUPPORT
:
193 return "WSAEPROTONOSUPPORT: Protocol not supported.";
194 case /* 10044 */ WSAESOCKTNOSUPPORT
:
195 return "WSAESOCKTNOSUPPORT: Socket type not supported.";
196 case /* 10045 */ WSAEOPNOTSUPP
:
197 return "Operation not supported.";
198 case /* 10046 */ WSAEPFNOSUPPORT
:
199 return "WSAEPFNOSUPPORT: Protocol family not supported.";
200 case /* 10047 */ WSAEAFNOSUPPORT
:
201 return "WSAEAFNOSUPPORT: Address family not supported by protocol family.";
202 case /* 10048 */ WSAEADDRINUSE
:
203 return "WSAEADDRINUSE: Address is already in use";
204 case /* 10049 */ WSAEADDRNOTAVAIL
:
205 return "WSAEADDRNOTAVAIL: Address is not available";
206 case /* 10050 */ WSAENETDOWN
:
207 return "WSAENETDOWN: Network is down";
208 case /* 10051 */ WSAENETUNREACH
:
209 return "WSAENETUNREACH: Network is unreachable";
210 case /* 10052 */ WSAENETRESET
:
211 return "WSAENETRESET: Network has been reset";
212 case /* 10053 */ WSAECONNABORTED
:
213 return "WSAECONNABORTED: Connection aborted";
214 case /* 10054 */ WSAECONNRESET
:
215 return "WSAECONNRESET: Connection has been reset";
216 case /* 10056 */ WSAEISCONN
:
217 return "WSAEISCONN: Socket Is Already Connected. ";
218 case /* 10057 */ WSAENOTCONN
:
219 return "WSAENOTCONN: Socket not connected";
220 case /* 10058 */ WSAESHUTDOWN
:
221 return "WSAESHUTDOWN: Socket has been shutdown";
222 case /* 10059 */ WSAETOOMANYREFS
:
223 return "WSAETOOMANYREFS: Too many references";
224 case /* 10060 */ WSAETIMEDOUT
:
225 return "WSAETIMEDOUT: Operation timed out";
226 case /* 10061 */ WSAECONNREFUSED
:
227 return "WSAECONNREFUSED: Connection refused";
228 case /* 10062 */ WSAELOOP
:
230 case /* 10063 */ WSAENAMETOOLONG
:
231 return "WSAENAMETOOLONG: Name too long";
232 case /* 10064 */ WSAEHOSTDOWN
:
233 return "WSAEHOSTDOWN: Host is down";
234 case /* 10065 */ WSAEHOSTUNREACH
:
235 return "WSAEHOSTUNREACH: Host is unreachable";
236 case /* 10066 */ WSAENOTEMPTY
:
237 return "WSAENOTEMPTY";
238 case /* 10067 */ WSAEPROCLIM
:
239 return "WSAEPROCLIM";
240 case /* 10068 */ WSAEUSERS
:
242 case /* 10069 */ WSAEDQUOT
:
244 case /* 10070 */ WSAESTALE
:
246 case /* 10071 */ WSAEREMOTE
:
248 case /* 10091 */ WSASYSNOTREADY
:
249 return "WSASYSNOTREADY: System not ready"; // WSAStartup
250 case /* 10092 */ WSAVERNOTSUPPORTED
:
251 return "WSAVERNOTSUPPORTED: Version not supported"; // WSAStartup
252 case /* 10093 */ WSANOTINITIALISED
:
253 return "WSANOTINITIALISED: Winsock not initialised"; // WSAStartup not called
254 case /* 10101 */ WSAEDISCON
:
255 return "WSAEDISCON: Remote party has disconnected";
256 case /* 11001 */ WSAHOST_NOT_FOUND
:
257 return "Host not found";
258 case /* 11002 */ WSATRY_AGAIN
:
259 return "WSATRY_AGAIN: Try again";
260 case /* 11003 */ WSANO_RECOVERY
:
261 return "WSANO_RECOVERY: No recovery";
262 case /* 11004 */ WSANO_DATA
:
263 return "WSANO_DATA: No data";
265 case /* 00001 */ EPERM
:
266 return "EPERM: Operation not permitted";
267 case /* 00002 */ ENOENT
:
268 return "ENOENT: No such file or directory";
269 case /* 00003 */ ESRCH
:
270 return "ESRCH: No such process";
271 case /* 00004 */ EINTR
:
272 return "EINTR: Interrupted system call";
273 case /* 00005 */ EIO
:
274 return "EIO: I/O error";
275 case /* 00006 */ ENXIO
:
276 return "ENXIO: No such device or address";
277 case /* 00007 */ E2BIG
:
278 return "E2BIG: Argument list too long";
279 case /* 00008 */ ENOEXEC
:
280 return "ENOEXEC: Exec format error";
281 case /* 00009 */ EBADF
:
282 return "EBADF: Bad file number";
283 case /* 00010 */ ECHILD
:
284 return "ECHILD: No child processes";
285 case /* 00011 */ EAGAIN
:
286 return "EAGAIN: Try again";
287 case /* 00012 */ ENOMEM
:
288 return "ENOMEM: Out of memory";
289 case /* 00013 */ EACCES
:
290 return "EACCES: Permission denied";
291 case /* 00014 */ EFAULT
:
292 return "EFAULT: Bad address";
293 case /* 00015 */ ENOTBLK
:
294 return "ENOTBLK: Block device required";
295 case /* 00016 */ EBUSY
:
296 return "EBUSY: Device or resource busy";
297 case /* 00017 */ EEXIST
:
298 return "EEXIST: File exists";
299 case /* 00018 */ EXDEV
:
300 return "EXDEV: Cross-device link";
301 case /* 00019 */ ENODEV
:
302 return "ENODEV: No such device";
303 case /* 00020 */ ENOTDIR
:
304 return "ENOTDIR: Not a directory";
305 case /* 00021 */ EISDIR
:
306 return "EISDIR: Is a directory";
307 case /* 00022 */ EINVAL
:
308 return "Invalid argument";
309 case /* 00023 */ ENFILE
:
310 return "ENFILE: File table overflow";
311 case /* 00024 */ EMFILE
:
312 return "EMFILE: Too many open files";
313 case /* 00025 */ ENOTTY
:
314 return "ENOTTY: Not a typewriter";
315 case /* 00026 */ ETXTBSY
:
316 return "ETXTBSY: Text file busy";
317 case /* 00027 */ EFBIG
:
318 return "EFBIG: File too large";
319 case /* 00028 */ ENOSPC
:
320 return "ENOSPC: No space left on device";
321 case /* 00029 */ ESPIPE
:
322 return "ESPIPE: Illegal seek";
323 case /* 00030 */ EROFS
:
324 return "EROFS: Read-only file system";
325 case /* 00031 */ EMLINK
:
326 return "EMLINK: Too many links";
327 case /* 00032 */ EPIPE
:
328 return "EPIPE: Broken pipe";
329 case /* 00033 */ EDOM
:
330 return "EDOM: Math argument out of domain of func";
331 case /* 00034 */ ERANGE
:
332 return "ERANGE: Math result not representable";
336 return "Unknown Socket Error";
340 //////////////////////////////////////////////////////////////////////////
342 //////////////////////////////////////////////////////////////////////////
344 Connection::Connection( const KString
& SendAddress
, KUINT32 Port
/* = 3000 */, KBOOL SendAddressIsMulticast
/* = false */,
345 KBOOL Blocking
/* = true */, PDU_Factory
* Custom
/* = 0 */, KBOOL SendOnly
/* = false*/,
346 KBOOL ReceiveOnly
/* = false*/, const KString
& InterfaceAddress
/* = "" */) :
348 m_bBlockingSocket( Blocking
),
349 m_bSendOnly( SendOnly
),
350 m_bReceiveOnly( ReceiveOnly
)
352 m_iSocket
[SEND_SOCK
] = 0;
353 m_iSocket
[RECEIVE_SOCK
] = 0;
355 m_blockingTimeout
.tv_sec
= 0;
356 m_blockingTimeout
.tv_usec
= 0;
358 SetInterfaceAddress( InterfaceAddress
); //specify which network interface to use for DIS
362 SetSendAddress( SendAddress
, SendAddressIsMulticast
);
364 SetBlockingModeEnabled( Blocking
);
366 // Should we use the standard PDU_Factory or have we been supplied we a customised one?
373 m_pPduFact
= new PDU_Factory
;
377 //////////////////////////////////////////////////////////////////////////
379 Connection::~Connection()
385 //////////////////////////////////////////////////////////////////////////
387 Connection::Connection( const Connection
& other
)
389 // Disallow copy constructor.
390 throw KException(__FUNCTION__
, INVALID_OPERATION
);
393 //////////////////////////////////////////////////////////////////////////
395 Connection
& Connection::operator=( const Connection
& other
)
399 // Disallow copy assignment.
400 throw KException(__FUNCTION__
, INVALID_OPERATION
);
405 //////////////////////////////////////////////////////////////////////////
407 // Set the local interface to use for sending DIS packets,
408 // otherwise the kernel chooses first interface with path to destination,
409 // which is ambiguous in the case of multicast and some broadcast endpoints.
410 void Connection::SetInterfaceAddress( const KString
& A
)
411 // Connection defaults to send on first interface with route to destination if interface not specified (i.e. A = "")
413 socklen_t addrlen
= sizeof( m_InterfaceAddr
);
415 m_sInterfaceAddress
= A
;
416 memset( &m_InterfaceAddr
, 0, addrlen
);
417 m_InterfaceAddr
.sin_family
= AF_INET
;
418 m_InterfaceAddr
.sin_addr
.s_addr
= ( A
.empty() ? INADDR_ANY
: inet_addr( m_sInterfaceAddress
.c_str() ) );
421 const KString
& Connection::GetInterfaceAddress() const
423 return m_sInterfaceAddress
;
426 //////////////////////////////////////////////////////////////////////////
428 // Note: We must bind() the sending socket to specific interface (m_InterfaceAddr) (if desired)
429 // before calling this or else connect() will automatically bind() the socket to an interface
430 // of the kernel's choosing. Thus, SetInterfaceAddress() and startup() (which in turn calls
431 // bindSocket(), where bind() is called on the sending socket,)) should be called before this.
432 void Connection::SetSendAddress( const KString
& A
, KBOOL Multicast
/*= false */ )
436 // Create the send to address structure
437 memset( &m_SendToAddr
, 0, sizeof( m_SendToAddr
) );
438 m_SendToAddr
.sin_family
= AF_INET
;
439 m_SendToAddr
.sin_addr
.s_addr
= inet_addr( m_sSendAddress
.c_str() );
440 m_SendToAddr
.sin_port
= htons( m_uiPort
);
444 AddMulticastAddress( A
);
446 // TODO: Do we need to disable broadcasting on the socket if we switch to multicast?
450 // Enable broadcasting on the send socket
452 KINT32 iRet
= setsockopt( m_iSocket
[SEND_SOCK
], SOL_SOCKET
, SO_BROADCAST
, ( const char * )&yes
, sizeof( yes
) );
453 if( iRet
== SOCKET_ERROR
)
459 // Attempt to 'connect()' to remote endpoint (prompts kernel to pick a local interface if not
460 // specified). If not specified in m_InterfaceAddr, kernel will choose interface as follows:
461 // If m_SendToAddr is empty, the connect() will fail, and loopback (127.0.0.1) will be chosen.
462 // If m_SendToAddr is already set, the first interface with a route to m_SendToAddr is chosen.
463 ( void )connect( m_iSocket
[SEND_SOCK
], ( sockaddr
* )&m_SendToAddr
, sizeof( m_SendToAddr
) ); //ignore return value - OK to fail
465 // Update the socket's (user-specified or kernel-chosen) interface address info (m_InterfaceAddr)
466 socklen_t addrlen
= sizeof(m_InterfaceAddr
);
467 ( void )getsockname( m_iSocket
[SEND_SOCK
], ( sockaddr
* )&m_InterfaceAddr
, &addrlen
);
469 // Update string representation of interface address (m_sInterfaceAddress) too
470 char buf
[INET_ADDRSTRLEN
];
471 const char* p_str
= inet_ntop( AF_INET
, &m_InterfaceAddr
.sin_addr
, buf
, INET_ADDRSTRLEN
); //returns null-terminated string or NULL
472 if ( p_str
!= 0x0 ) //if inet_ntop() returns an endpoint address, it was successful
474 m_sInterfaceAddress
= p_str
;
478 //////////////////////////////////////////////////////////////////////////
480 const KString
& Connection::GetSendAddress() const
482 return m_sSendAddress
;
485 //////////////////////////////////////////////////////////////////////////
487 void Connection::AddMulticastAddress( const KString
& A
)
489 // Attempt to join the group
491 mc
.imr_multiaddr
.s_addr
= inet_addr( m_sSendAddress
.c_str() );
492 mc
.imr_interface
= m_InterfaceAddr
.sin_addr
;
493 KINT32 iRet
= setsockopt( m_iSocket
[RECEIVE_SOCK
], IPPROTO_IP
, IP_ADD_MEMBERSHIP
, ( KOCTET
* )&mc
, sizeof( mc
) );
494 if( iRet
== SOCKET_ERROR
)
500 //////////////////////////////////////////////////////////////////////////
502 void Connection::RemoveMulticastAddress( const KString
& A
)
504 // Attempt to drop the address.
506 mc
.imr_multiaddr
.s_addr
= inet_addr( m_sSendAddress
.c_str() );
507 mc
.imr_interface
= m_InterfaceAddr
.sin_addr
;
508 KINT32 iRet
= setsockopt( m_iSocket
[RECEIVE_SOCK
], IPPROTO_IP
, IP_DROP_MEMBERSHIP
, ( KOCTET
* )&mc
, sizeof( mc
) );
509 if( iRet
== SOCKET_ERROR
)
515 //////////////////////////////////////////////////////////////////////////
517 void Connection::SetBlockingModeEnabled( KBOOL E
)
519 m_bBlockingSocket
= E
;
521 for( KINT8 i
= 0; i
< 2; ++i
)
525 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
527 // Windows non blocking //
528 unsigned long int uliIoctBlock
= !m_bBlockingSocket
; // 1 enable, 0 disable.
529 iResult
= ioctlsocket( m_iSocket
[i
], FIONBIO
, &uliIoctBlock
); // FIONBIO = blocking mode
533 // Linux non blocking //
534 KINT32 uliIoctBlock
= !m_bBlockingSocket
; // 1 enable, 0 disable.
535 iResult
= fcntl( m_iSocket
[i
], F_SETFL
, O_NONBLOCK
| FASYNC
, &uliIoctBlock
);
539 if( iResult
== SOCKET_ERROR
)
546 //////////////////////////////////////////////////////////////////////////
548 KBOOL
Connection::IsBlockingModeEnabled() const
550 return m_bBlockingSocket
;
553 //////////////////////////////////////////////////////////////////////////
555 void Connection::SetBlockingTimeOut( KINT32 sec
, KINT32 usec
)
557 m_blockingTimeout
.tv_sec
= sec
;
558 m_blockingTimeout
.tv_usec
= usec
;
561 //////////////////////////////////////////////////////////////////////////
563 void Connection::AddSubscriber( ConnectionSubscriber
* S
)
567 m_vpSubscribers
.push_back( S
);
571 //////////////////////////////////////////////////////////////////////////
573 void Connection::RemoveSubscriber( ConnectionSubscriber
* S
)
577 // Perform a linear search for the subscriber.
578 vector
<ConnectionSubscriber
*>::iterator itr
= m_vpSubscribers
.begin();
579 vector
<ConnectionSubscriber
*>::iterator itrEnd
= m_vpSubscribers
.end();
580 while( itr
!= itrEnd
)
584 // Remove the subsriber
585 itr
= m_vpSubscribers
.erase( itr
);
586 itrEnd
= m_vpSubscribers
.end();
588 // Now continue searching, the subscriber may have been added twice...
598 //////////////////////////////////////////////////////////////////////////
600 void Connection::SetPDU_Factory( PDU_Factory
* P
)
604 if( m_pPduFact
)delete m_pPduFact
;
609 //////////////////////////////////////////////////////////////////////////
611 PDU_Factory
* Connection::GetPDU_Factory()
616 //////////////////////////////////////////////////////////////////////////
618 KINT32
Connection::Send( const KOCTET
* Data
, KUINT32 DataSz
)
620 KINT32 iBytesSent
= sendto( m_iSocket
[SEND_SOCK
], Data
, DataSz
, 0, ( sockaddr
* )&m_SendToAddr
, sizeof( m_SendToAddr
) );
622 if( iBytesSent
== SOCKET_ERROR
)
630 //////////////////////////////////////////////////////////////////////////
632 KINT32
Connection::Send( const KDataStream
& stream
)
634 return Send( stream
.GetBufferPtr(), stream
.GetBufferSize() );
637 //////////////////////////////////////////////////////////////////////////
639 KINT32
Connection::SendPDU( Header
* H
)
641 // First lets fire the events, then send the PDU.
642 vector
<ConnectionSubscriber
*>::iterator itr
= m_vpSubscribers
.begin();
643 vector
<ConnectionSubscriber
*>::iterator itrEnd
= m_vpSubscribers
.end();
644 for( ; itr
!= itrEnd
; ++itr
)
646 ( *itr
)->OnPDUTransmit( H
);
652 return Send( stream
.GetBufferPtr(), stream
.GetBufferSize() );
655 //////////////////////////////////////////////////////////////////////////
657 KINT32
Connection::Receive( KOCTET
* Buffer
, KUINT32 BufferSz
, KString
* SenderIp
/*= NULL*/ )
659 // We use fd_set to test the receive socket for readability. This is used in both blocking
660 // and none blocking mode however it is primarily here for none blocking mode as it is more
661 // efficient to use this method than to continuously poll the socket.
664 FD_SET( m_iSocket
[RECEIVE_SOCK
], &fd
);
667 if( !m_bBlockingSocket
)
669 // If we are using none blocking mode we need to set
670 // a time limit to wait for the select function.
678 // Even in blocking mode, it can be useful to return
679 // occasionally after a long period without data.
680 // where (long period == a second or so).
681 // This can make clean exits a joy and allow status messages
682 // from the same thread
683 pTimeout
= m_blockingTimeout
;
686 // Check the socket, do we have data waiting?
687 KINT32 uiErr
= select( m_iSocket
[RECEIVE_SOCK
] + 1, &fd
, 0, 0, &pTimeout
);
689 if( uiErr
== SOCKET_ERROR
)
694 if( uiErr
) // If none zero we have data waiting
696 // Get data from socket
697 sockaddr_in ClientAddr
;
698 socklen_t iSz
= sizeof( ClientAddr
);
699 uiErr
= recvfrom( m_iSocket
[RECEIVE_SOCK
], Buffer
, BufferSz
, 0, ( sockaddr
* )&ClientAddr
, &iSz
);
701 if( uiErr
== SOCKET_ERROR
)
706 // Do we need the sending IP address?
709 *SenderIp
= inet_ntoa( ClientAddr
.sin_addr
);
716 //////////////////////////////////////////////////////////////////////////
718 unique_ptr
<Header
> Connection::GetNextPDU( KString
* SenderIp
/* = 0 */ )
720 // Are we currently dealing with a PDU Bundle, if so then dont read any new data.
721 if( m_stream
.GetBufferSize() == 0 )
723 // Get some new data from the network
724 // Create a buffer to store network data
725 KOCTET Buffer
[MAX_PDU_SIZE
];
726 KINT32 iSz
= Receive( Buffer
, MAX_PDU_SIZE
, &m_sLastIP
);
730 // Fire the first event, this event can also be used to inform us if we should stop
731 vector
<ConnectionSubscriber
*>::iterator itr
= m_vpSubscribers
.begin();
732 vector
<ConnectionSubscriber
*>::iterator itrEnd
= m_vpSubscribers
.end();
733 for( ; itr
!= itrEnd
; ++itr
)
735 if( !( *itr
)->OnDataReceived( Buffer
, iSz
, m_sLastIP
) )
738 return unique_ptr
<Header
>();
742 // Create a new data stream
744 m_stream
.CopyFromBuffer( Buffer
, iSz
);
748 // Now process the stream
749 if( m_stream
.GetBufferSize() > 0 )
751 // Do they want the IP address returned?
754 *SenderIp
= m_sLastIP
;
757 // Get the current write position
758 KUINT16 currentPos
= m_stream
.GetCurrentWritePosition();
762 // Get the next/only PDU from the stream
763 unique_ptr
<Header
> pdu
= m_pPduFact
->Decode( m_stream
);
765 // If the PDU was decoded successfully then fire the next event
768 vector
<ConnectionSubscriber
*>::iterator itr
= m_vpSubscribers
.begin();
769 vector
<ConnectionSubscriber
*>::iterator itrEnd
= m_vpSubscribers
.end();
770 for( ; itr
!= itrEnd
; ++itr
)
772 ( *itr
)->OnPDUReceived( pdu
.get() );
775 // Set the write pos for the next pdu. We do this here as its possible that when the PDU was decoded that some data may
776 // have been left un-decoded so to be extra safe we use the reported pdu size and not the current stream.
777 m_stream
.SetCurrentWritePosition( currentPos
+ pdu
->GetPDULength() );
779 // Now return the decoded pdu
784 // If a PDU could not be decoded in the PDU bundle, then we need to throw
785 // out the whole stream. There is no way to know where the next PDU might start
786 // in the data stream.
790 catch( const exception
& e
)
792 // Something went wrong, the stream is likely corrupted now so wipe it or we will have issues in the next GetNextPDU call.
798 return unique_ptr
<Header
>(); // No data so Null ptr
801 //////////////////////////////////////////////////////////////////////////