Update Connection.cpp
[KDIS.git] / KDIS / KDIS / Network / Connection.cpp
blob49e9c8fa2d23a1b6ba3aae2458ad1dcce37f95e7
1 /*********************************************************************
2 Copyright 2013 Karl Jones
3 All rights reserved.
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice, this
9 list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright notice,
11 this list of conditions and the following disclaimer in the documentation
12 and/or other materials provided with the distribution.
14 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
18 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
20 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
21 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
23 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 For Further Information Please Contact me at
26 Karljj1@yahoo.com
27 http://p.sf.net/kdis/UserGuide
28 *********************************************************************/
30 #include <ostream>
31 #include <iostream>
32 #include "./Connection.h"
34 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 ) // Windows Headers //
36 #include <ws2tcpip.h>
38 #define ERROR_CODE WSAGetLastError()
40 #else // Linux Headers //
42 #include <sys/time.h>
43 #include <unistd.h>
45 #define INVALID_SOCKET -1
46 #define SOCKET_ERROR -1
47 #define ERROR_CODE errno
49 #endif
51 #define THROW_ERROR throw KException( getErrorText( ERROR_CODE ), CONNECTION_SOCKET_ERROR )
52 #define SEND_SOCK 0
53 #define RECEIVE_SOCK 1
55 using namespace KDIS;
56 using namespace PDU;
57 using namespace UTILS;
58 using namespace NETWORK;
59 using namespace std;
61 //////////////////////////////////////////////////////////////////////////
62 // protected:
63 //////////////////////////////////////////////////////////////////////////
65 void Connection::startup()
67 // Create the sockets if they do not already exist.
68 if( !m_iSocket[SEND_SOCK] && !m_iSocket[RECEIVE_SOCK] )
70 // Windows only //
71 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
72 WSADATA w;
73 static KINT32 iWinSockInit = WSAStartup( 0x0202, &w ); // Init with winsock version 2.2
74 if( iWinSockInit != NO_ERROR )
76 THROW_ERROR;
78 #endif
80 // Create both sockets
81 for( KINT8 i = 0; i < 2; ++i )
83 m_iSocket[i] = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); // Create a IPv4 UDP socket.
85 if( m_iSocket[i] == INVALID_SOCKET )
87 THROW_ERROR;
91 // Bind socket for sending and/or receiving data.
92 bindSocket();
96 //////////////////////////////////////////////////////////////////////////
98 void Connection::bindSocket()
100 // Set the receive socket to be reusable. Useful if your server has
101 // been shut down, and then restarted right away.
102 KINT32 yes = 1;
103 KINT32 iRet = setsockopt( m_iSocket[RECEIVE_SOCK], SOL_SOCKET, SO_REUSEADDR, ( const char * )&yes, sizeof( yes ) );
104 if ( iRet == SOCKET_ERROR )
106 THROW_ERROR;
109 // Bind the *sending* socket to the specified local interface
110 if (!m_bReceiveOnly)
112 socklen_t addrlen = sizeof(m_InterfaceAddr);
113 iRet = bind( m_iSocket[SEND_SOCK], ( sockaddr * )&m_InterfaceAddr, addrlen );
114 if ( iRet == SOCKET_ERROR )
116 THROW_ERROR;
120 // Construct bind structure for receive socket, using IPADDR_ANY for Linux compatibility
121 sockaddr_in Address;
122 memset( &Address, 0, sizeof( Address ) );
123 Address.sin_port = htons( m_uiPort ); // Set listening port
124 Address.sin_family = AF_INET; // IPv4 address family
125 Address.sin_addr.s_addr = INADDR_ANY; // Any interface
127 // Bind the *receiving* socket to the chosen port (on all local interfaces)
128 if( !m_bSendOnly )
130 iRet = bind( m_iSocket[RECEIVE_SOCK], ( sockaddr * )&Address, sizeof( Address ) );
131 if( iRet == SOCKET_ERROR )
133 THROW_ERROR;
138 //////////////////////////////////////////////////////////////////////////
140 void Connection::shutdown()
142 for( KINT8 i = 0; i < 2; ++i )
144 if( m_iSocket[i] )
146 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
147 closesocket( m_iSocket[i] );
148 #else
149 close( m_iSocket[i] );
150 #endif
152 m_iSocket[i] = 0;
157 //////////////////////////////////////////////////////////////////////////
159 const KCHAR8 * Connection::getErrorText( KINT32 ErrorCode ) const
161 switch ( ErrorCode )
163 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
164 case /* 10004 */ WSAEINTR:
165 return "WSAEINTR: Invalid interrupt number";
166 case /* 10009 */ WSAEBADF:
167 return "WSAEBADF: Invalid file number";
168 case /* 10013 */ WSAEACCES:
169 return "WSAEACCES";
170 case /* 10014 */ WSAEFAULT:
171 return "WSAEFAULT - The buf parameter is not completely contained in a valid part of the user address space.";
172 case /* 10022 */ WSAEINVAL:
173 return "WSAEINVAL, Invalid argument";
174 case /* 10024 */ WSAEMFILE:
175 return "WSAEMFILE, Is a File";
176 case /* 10035 */ WSAEWOULDBLOCK:
177 return "WSAEWOULDBLOCK: Function would block";
178 case /* 10036 */ WSAEINPROGRESS:
179 return "WSAEINPROGRESS: Winsock blocking function in progress";
180 case /* 10037 */ WSAEALREADY:
181 return "WSAEALREADY";
182 case /* 10038 */ WSAENOTSOCK:
183 return "WSAENOTSOCK";
184 case /* 10039 */ WSAEDESTADDRREQ:
185 return "WSAEDESTADDRREQ";
186 case /* 10040 */ WSAEMSGSIZE:
187 return "WSAEMSGSIZE";
188 case /* 10041 */ WSAEPROTOTYPE:
189 return "WSAEPROTOTYPE";
190 case /* 10042 */ WSAENOPROTOOPT:
191 return "WSAENOPROTOOPT";
192 case /* 10043 */ WSAEPROTONOSUPPORT:
193 return "WSAEPROTONOSUPPORT: Protocol not supported.";
194 case /* 10044 */ WSAESOCKTNOSUPPORT:
195 return "WSAESOCKTNOSUPPORT: Socket type not supported.";
196 case /* 10045 */ WSAEOPNOTSUPP:
197 return "Operation not supported.";
198 case /* 10046 */ WSAEPFNOSUPPORT:
199 return "WSAEPFNOSUPPORT: Protocol family not supported.";
200 case /* 10047 */ WSAEAFNOSUPPORT:
201 return "WSAEAFNOSUPPORT: Address family not supported by protocol family.";
202 case /* 10048 */ WSAEADDRINUSE:
203 return "WSAEADDRINUSE: Address is already in use";
204 case /* 10049 */ WSAEADDRNOTAVAIL:
205 return "WSAEADDRNOTAVAIL: Address is not available";
206 case /* 10050 */ WSAENETDOWN:
207 return "WSAENETDOWN: Network is down";
208 case /* 10051 */ WSAENETUNREACH:
209 return "WSAENETUNREACH: Network is unreachable";
210 case /* 10052 */ WSAENETRESET:
211 return "WSAENETRESET: Network has been reset";
212 case /* 10053 */ WSAECONNABORTED:
213 return "WSAECONNABORTED: Connection aborted";
214 case /* 10054 */ WSAECONNRESET:
215 return "WSAECONNRESET: Connection has been reset";
216 case /* 10056 */ WSAEISCONN:
217 return "WSAEISCONN: Socket Is Already Connected. ";
218 case /* 10057 */ WSAENOTCONN:
219 return "WSAENOTCONN: Socket not connected";
220 case /* 10058 */ WSAESHUTDOWN:
221 return "WSAESHUTDOWN: Socket has been shutdown";
222 case /* 10059 */ WSAETOOMANYREFS:
223 return "WSAETOOMANYREFS: Too many references";
224 case /* 10060 */ WSAETIMEDOUT:
225 return "WSAETIMEDOUT: Operation timed out";
226 case /* 10061 */ WSAECONNREFUSED:
227 return "WSAECONNREFUSED: Connection refused";
228 case /* 10062 */ WSAELOOP:
229 return "WSAELOOP";
230 case /* 10063 */ WSAENAMETOOLONG:
231 return "WSAENAMETOOLONG: Name too long";
232 case /* 10064 */ WSAEHOSTDOWN:
233 return "WSAEHOSTDOWN: Host is down";
234 case /* 10065 */ WSAEHOSTUNREACH:
235 return "WSAEHOSTUNREACH: Host is unreachable";
236 case /* 10066 */ WSAENOTEMPTY:
237 return "WSAENOTEMPTY";
238 case /* 10067 */ WSAEPROCLIM:
239 return "WSAEPROCLIM";
240 case /* 10068 */ WSAEUSERS:
241 return "WSAEUSERS";
242 case /* 10069 */ WSAEDQUOT:
243 return "WSAEUSERS";
244 case /* 10070 */ WSAESTALE:
245 return "WSAESTALE";
246 case /* 10071 */ WSAEREMOTE:
247 return "WSAEREMOTE";
248 case /* 10091 */ WSASYSNOTREADY:
249 return "WSASYSNOTREADY: System not ready"; // WSAStartup
250 case /* 10092 */ WSAVERNOTSUPPORTED:
251 return "WSAVERNOTSUPPORTED: Version not supported"; // WSAStartup
252 case /* 10093 */ WSANOTINITIALISED:
253 return "WSANOTINITIALISED: Winsock not initialised"; // WSAStartup not called
254 case /* 10101 */ WSAEDISCON:
255 return "WSAEDISCON: Remote party has disconnected";
256 case /* 11001 */ WSAHOST_NOT_FOUND:
257 return "Host not found";
258 case /* 11002 */ WSATRY_AGAIN:
259 return "WSATRY_AGAIN: Try again";
260 case /* 11003 */ WSANO_RECOVERY:
261 return "WSANO_RECOVERY: No recovery";
262 case /* 11004 */ WSANO_DATA:
263 return "WSANO_DATA: No data";
264 #else
265 case /* 00001 */ EPERM:
266 return "EPERM: Operation not permitted";
267 case /* 00002 */ ENOENT:
268 return "ENOENT: No such file or directory";
269 case /* 00003 */ ESRCH:
270 return "ESRCH: No such process";
271 case /* 00004 */ EINTR:
272 return "EINTR: Interrupted system call";
273 case /* 00005 */ EIO:
274 return "EIO: I/O error";
275 case /* 00006 */ ENXIO:
276 return "ENXIO: No such device or address";
277 case /* 00007 */ E2BIG:
278 return "E2BIG: Argument list too long";
279 case /* 00008 */ ENOEXEC:
280 return "ENOEXEC: Exec format error";
281 case /* 00009 */ EBADF:
282 return "EBADF: Bad file number";
283 case /* 00010 */ ECHILD:
284 return "ECHILD: No child processes";
285 case /* 00011 */ EAGAIN:
286 return "EAGAIN: Try again";
287 case /* 00012 */ ENOMEM:
288 return "ENOMEM: Out of memory";
289 case /* 00013 */ EACCES:
290 return "EACCES: Permission denied";
291 case /* 00014 */ EFAULT:
292 return "EFAULT: Bad address";
293 case /* 00015 */ ENOTBLK:
294 return "ENOTBLK: Block device required";
295 case /* 00016 */ EBUSY:
296 return "EBUSY: Device or resource busy";
297 case /* 00017 */ EEXIST:
298 return "EEXIST: File exists";
299 case /* 00018 */ EXDEV:
300 return "EXDEV: Cross-device link";
301 case /* 00019 */ ENODEV:
302 return "ENODEV: No such device";
303 case /* 00020 */ ENOTDIR:
304 return "ENOTDIR: Not a directory";
305 case /* 00021 */ EISDIR:
306 return "EISDIR: Is a directory";
307 case /* 00022 */ EINVAL:
308 return "Invalid argument";
309 case /* 00023 */ ENFILE:
310 return "ENFILE: File table overflow";
311 case /* 00024 */ EMFILE:
312 return "EMFILE: Too many open files";
313 case /* 00025 */ ENOTTY:
314 return "ENOTTY: Not a typewriter";
315 case /* 00026 */ ETXTBSY:
316 return "ETXTBSY: Text file busy";
317 case /* 00027 */ EFBIG:
318 return "EFBIG: File too large";
319 case /* 00028 */ ENOSPC:
320 return "ENOSPC: No space left on device";
321 case /* 00029 */ ESPIPE:
322 return "ESPIPE: Illegal seek";
323 case /* 00030 */ EROFS:
324 return "EROFS: Read-only file system";
325 case /* 00031 */ EMLINK:
326 return "EMLINK: Too many links";
327 case /* 00032 */ EPIPE:
328 return "EPIPE: Broken pipe";
329 case /* 00033 */ EDOM:
330 return "EDOM: Math argument out of domain of func";
331 case /* 00034 */ ERANGE:
332 return "ERANGE: Math result not representable";
333 #endif
335 default:
336 return "Unknown Socket Error";
340 //////////////////////////////////////////////////////////////////////////
341 // public:
342 //////////////////////////////////////////////////////////////////////////
344 Connection::Connection( const KString & SendAddress, KUINT32 Port /* = 3000 */, KBOOL SendAddressIsMulticast /* = false */,
345 KBOOL Blocking /* = true */, PDU_Factory * Custom /* = 0 */, KBOOL SendOnly /* = false*/,
346 KBOOL ReceiveOnly /* = false*/, const KString & InterfaceAddress /* = "" */) :
347 m_uiPort( Port ),
348 m_bBlockingSocket( Blocking ),
349 m_bSendOnly( SendOnly ),
350 m_bReceiveOnly( ReceiveOnly )
352 m_iSocket[SEND_SOCK] = 0;
353 m_iSocket[RECEIVE_SOCK] = 0;
355 m_blockingTimeout.tv_sec = 0;
356 m_blockingTimeout.tv_usec = 0;
358 SetInterfaceAddress( InterfaceAddress ); //specify which network interface to use for DIS
360 startup();
362 SetSendAddress( SendAddress, SendAddressIsMulticast );
364 SetBlockingModeEnabled( Blocking );
366 // Should we use the standard PDU_Factory or have we been supplied we a customised one?
367 if( Custom )
369 m_pPduFact = Custom;
371 else
373 m_pPduFact = new PDU_Factory;
377 //////////////////////////////////////////////////////////////////////////
379 Connection::~Connection()
381 shutdown();
382 delete m_pPduFact;
385 //////////////////////////////////////////////////////////////////////////
387 Connection::Connection( const Connection& other )
389 // Disallow copy constructor.
390 throw KException(__FUNCTION__, INVALID_OPERATION);
393 //////////////////////////////////////////////////////////////////////////
395 Connection& Connection::operator=( const Connection& other )
397 if( this != &other)
399 // Disallow copy assignment.
400 throw KException(__FUNCTION__, INVALID_OPERATION);
402 return *this;
405 //////////////////////////////////////////////////////////////////////////
407 // Set the local interface to use for sending DIS packets,
408 // otherwise the kernel chooses first interface with path to destination,
409 // which is ambiguous in the case of multicast and some broadcast endpoints.
410 void Connection::SetInterfaceAddress( const KString & A )
411 // Connection defaults to send on first interface with route to destination if interface not specified (i.e. A = "")
413 socklen_t addrlen = sizeof( m_InterfaceAddr );
415 m_sInterfaceAddress = A;
416 memset( &m_InterfaceAddr, 0, addrlen );
417 m_InterfaceAddr.sin_family = AF_INET;
418 m_InterfaceAddr.sin_addr.s_addr = ( A.empty() ? INADDR_ANY : inet_addr( m_sInterfaceAddress.c_str() ) );
421 const KString & Connection::GetInterfaceAddress() const
423 return m_sInterfaceAddress;
426 //////////////////////////////////////////////////////////////////////////
428 // Note: We must bind() the sending socket to specific interface (m_InterfaceAddr) (if desired)
429 // before calling this or else connect() will automatically bind() the socket to an interface
430 // of the kernel's choosing. Thus, SetInterfaceAddress() and startup() (which in turn calls
431 // bindSocket(), where bind() is called on the sending socket,)) should be called before this.
432 void Connection::SetSendAddress( const KString & A, KBOOL Multicast /*= false */ )
434 m_sSendAddress = A;
436 // Create the send to address structure
437 memset( &m_SendToAddr, 0, sizeof( m_SendToAddr ) );
438 m_SendToAddr.sin_family = AF_INET;
439 m_SendToAddr.sin_addr.s_addr = inet_addr( m_sSendAddress.c_str() );
440 m_SendToAddr.sin_port = htons( m_uiPort );
442 if( Multicast )
444 AddMulticastAddress( A );
446 // TODO: Do we need to disable broadcasting on the socket if we switch to multicast?
448 else
450 // Enable broadcasting on the send socket
451 KINT32 yes = 1;
452 KINT32 iRet = setsockopt( m_iSocket[SEND_SOCK], SOL_SOCKET, SO_BROADCAST, ( const char * )&yes, sizeof( yes ) );
453 if( iRet == SOCKET_ERROR )
455 THROW_ERROR;
459 // Attempt to 'connect()' to remote endpoint (prompts kernel to pick a local interface if not
460 // specified). If not specified in m_InterfaceAddr, kernel will choose interface as follows:
461 // If m_SendToAddr is empty, the connect() will fail, and loopback (127.0.0.1) will be chosen.
462 // If m_SendToAddr is already set, the first interface with a route to m_SendToAddr is chosen.
463 ( void )connect( m_iSocket[SEND_SOCK], ( sockaddr * )&m_SendToAddr, sizeof( m_SendToAddr ) ); //ignore return value - OK to fail
465 // Update the socket's (user-specified or kernel-chosen) interface address info (m_InterfaceAddr)
466 socklen_t addrlen = sizeof(m_InterfaceAddr);
467 ( void )getsockname( m_iSocket[SEND_SOCK], ( sockaddr * )&m_InterfaceAddr, &addrlen );
469 // Update string representation of interface address (m_sInterfaceAddress) too
470 char buf[INET_ADDRSTRLEN];
471 const char* p_str = inet_ntop( AF_INET, &m_InterfaceAddr.sin_addr, buf, INET_ADDRSTRLEN ); //returns null-terminated string or NULL
472 if ( p_str != 0x0 ) //if inet_ntop() returns an endpoint address, it was successful
474 m_sInterfaceAddress = p_str;
478 //////////////////////////////////////////////////////////////////////////
480 const KString & Connection::GetSendAddress() const
482 return m_sSendAddress;
485 //////////////////////////////////////////////////////////////////////////
487 void Connection::AddMulticastAddress( const KString & A )
489 // Attempt to join the group
490 ip_mreq mc;
491 mc.imr_multiaddr.s_addr = inet_addr( m_sSendAddress.c_str() );
492 mc.imr_interface = m_InterfaceAddr.sin_addr;
493 KINT32 iRet = setsockopt( m_iSocket[RECEIVE_SOCK], IPPROTO_IP, IP_ADD_MEMBERSHIP, ( KOCTET* )&mc, sizeof( mc ) );
494 if( iRet == SOCKET_ERROR )
496 THROW_ERROR;
500 //////////////////////////////////////////////////////////////////////////
502 void Connection::RemoveMulticastAddress( const KString & A )
504 // Attempt to drop the address.
505 ip_mreq mc;
506 mc.imr_multiaddr.s_addr = inet_addr( m_sSendAddress.c_str() );
507 mc.imr_interface = m_InterfaceAddr.sin_addr;
508 KINT32 iRet = setsockopt( m_iSocket[RECEIVE_SOCK], IPPROTO_IP, IP_DROP_MEMBERSHIP, ( KOCTET* )&mc, sizeof( mc ) );
509 if( iRet == SOCKET_ERROR )
511 THROW_ERROR;
515 //////////////////////////////////////////////////////////////////////////
517 void Connection::SetBlockingModeEnabled( KBOOL E )
519 m_bBlockingSocket = E;
521 for( KINT8 i = 0; i < 2; ++i )
523 KINT32 iResult;
525 #if defined( WIN32 ) | defined( _WIN32 ) | defined( WIN64 ) | defined( _WIN64 )
527 // Windows non blocking //
528 unsigned long int uliIoctBlock = !m_bBlockingSocket; // 1 enable, 0 disable.
529 iResult = ioctlsocket( m_iSocket[i], FIONBIO, &uliIoctBlock ); // FIONBIO = blocking mode
531 #else
533 // Linux non blocking //
534 KINT32 uliIoctBlock = !m_bBlockingSocket; // 1 enable, 0 disable.
535 iResult = fcntl( m_iSocket[i], F_SETFL, O_NONBLOCK | FASYNC, &uliIoctBlock );
537 #endif
539 if( iResult == SOCKET_ERROR )
541 THROW_ERROR;
546 //////////////////////////////////////////////////////////////////////////
548 KBOOL Connection::IsBlockingModeEnabled() const
550 return m_bBlockingSocket;
553 //////////////////////////////////////////////////////////////////////////
555 void Connection::SetBlockingTimeOut( KINT32 sec, KINT32 usec )
557 m_blockingTimeout.tv_sec = sec;
558 m_blockingTimeout.tv_usec = usec;
561 //////////////////////////////////////////////////////////////////////////
563 void Connection::AddSubscriber( ConnectionSubscriber * S )
565 if( S )
567 m_vpSubscribers.push_back( S );
571 //////////////////////////////////////////////////////////////////////////
573 void Connection::RemoveSubscriber( ConnectionSubscriber * S )
575 if( S )
577 // Perform a linear search for the subscriber.
578 vector<ConnectionSubscriber*>::iterator itr = m_vpSubscribers.begin();
579 vector<ConnectionSubscriber*>::iterator itrEnd = m_vpSubscribers.end();
580 while( itr != itrEnd )
582 if( *itr == S )
584 // Remove the subsriber
585 itr = m_vpSubscribers.erase( itr );
586 itrEnd = m_vpSubscribers.end();
588 // Now continue searching, the subscriber may have been added twice...
590 else
592 ++itr;
598 //////////////////////////////////////////////////////////////////////////
600 void Connection::SetPDU_Factory( PDU_Factory * P )
602 if( P )
604 if( m_pPduFact )delete m_pPduFact;
605 m_pPduFact = P;
609 //////////////////////////////////////////////////////////////////////////
611 PDU_Factory * Connection::GetPDU_Factory()
613 return m_pPduFact;
616 //////////////////////////////////////////////////////////////////////////
618 KINT32 Connection::Send( const KOCTET * Data, KUINT32 DataSz )
620 KINT32 iBytesSent = sendto( m_iSocket[SEND_SOCK], Data, DataSz, 0, ( sockaddr * )&m_SendToAddr, sizeof( m_SendToAddr ) );
622 if( iBytesSent == SOCKET_ERROR )
624 THROW_ERROR;
627 return iBytesSent;
630 //////////////////////////////////////////////////////////////////////////
632 KINT32 Connection::Send( const KDataStream & stream )
634 return Send( stream.GetBufferPtr(), stream.GetBufferSize() );
637 //////////////////////////////////////////////////////////////////////////
639 KINT32 Connection::SendPDU( Header * H )
641 // First lets fire the events, then send the PDU.
642 vector<ConnectionSubscriber*>::iterator itr = m_vpSubscribers.begin();
643 vector<ConnectionSubscriber*>::iterator itrEnd = m_vpSubscribers.end();
644 for( ; itr != itrEnd; ++itr )
646 ( *itr )->OnPDUTransmit( H );
649 // Now send the PDU
650 KDataStream stream;
651 H->Encode( stream );
652 return Send( stream.GetBufferPtr(), stream.GetBufferSize() );
655 //////////////////////////////////////////////////////////////////////////
657 KINT32 Connection::Receive( KOCTET * Buffer, KUINT32 BufferSz, KString * SenderIp /*= NULL*/ )
659 // We use fd_set to test the receive socket for readability. This is used in both blocking
660 // and none blocking mode however it is primarily here for none blocking mode as it is more
661 // efficient to use this method than to continuously poll the socket.
662 fd_set fd;
663 FD_ZERO( &fd );
664 FD_SET( m_iSocket[RECEIVE_SOCK], &fd );
665 timeval pTimeout;
667 if( !m_bBlockingSocket )
669 // If we are using none blocking mode we need to set
670 // a time limit to wait for the select function.
671 timeval tval;
672 tval.tv_sec = 0;
673 tval.tv_usec = 1;
674 pTimeout = tval;
676 else
678 // Even in blocking mode, it can be useful to return
679 // occasionally after a long period without data.
680 // where (long period == a second or so).
681 // This can make clean exits a joy and allow status messages
682 // from the same thread
683 pTimeout = m_blockingTimeout;
686 // Check the socket, do we have data waiting?
687 KINT32 uiErr = select( m_iSocket[RECEIVE_SOCK] + 1, &fd, 0, 0, &pTimeout );
689 if( uiErr == SOCKET_ERROR )
691 THROW_ERROR;
694 if( uiErr ) // If none zero we have data waiting
696 // Get data from socket
697 sockaddr_in ClientAddr;
698 socklen_t iSz = sizeof( ClientAddr );
699 uiErr = recvfrom( m_iSocket[RECEIVE_SOCK], Buffer, BufferSz, 0, ( sockaddr * )&ClientAddr, &iSz );
701 if( uiErr == SOCKET_ERROR )
703 THROW_ERROR;
706 // Do we need the sending IP address?
707 if( SenderIp )
709 *SenderIp = inet_ntoa( ClientAddr.sin_addr );
713 return uiErr;
716 //////////////////////////////////////////////////////////////////////////
718 unique_ptr<Header> Connection::GetNextPDU( KString * SenderIp /* = 0 */ )
720 // Are we currently dealing with a PDU Bundle, if so then dont read any new data.
721 if( m_stream.GetBufferSize() == 0 )
723 // Get some new data from the network
724 // Create a buffer to store network data
725 KOCTET Buffer[MAX_PDU_SIZE];
726 KINT32 iSz = Receive( Buffer, MAX_PDU_SIZE, &m_sLastIP );
728 if( iSz )
730 // Fire the first event, this event can also be used to inform us if we should stop
731 vector<ConnectionSubscriber*>::iterator itr = m_vpSubscribers.begin();
732 vector<ConnectionSubscriber*>::iterator itrEnd = m_vpSubscribers.end();
733 for( ; itr != itrEnd; ++itr )
735 if( !( *itr )->OnDataReceived( Buffer, iSz, m_sLastIP ) )
737 // We should quit
738 return unique_ptr<Header>();
742 // Create a new data stream
743 m_stream.Clear();
744 m_stream.CopyFromBuffer( Buffer, iSz );
748 // Now process the stream
749 if( m_stream.GetBufferSize() > 0 )
751 // Do they want the IP address returned?
752 if( SenderIp )
754 *SenderIp = m_sLastIP;
757 // Get the current write position
758 KUINT16 currentPos = m_stream.GetCurrentWritePosition();
762 // Get the next/only PDU from the stream
763 unique_ptr<Header> pdu = m_pPduFact->Decode( m_stream );
765 // If the PDU was decoded successfully then fire the next event
766 if( pdu.get() )
768 vector<ConnectionSubscriber*>::iterator itr = m_vpSubscribers.begin();
769 vector<ConnectionSubscriber*>::iterator itrEnd = m_vpSubscribers.end();
770 for( ; itr != itrEnd; ++itr )
772 ( *itr )->OnPDUReceived( pdu.get() );
775 // Set the write pos for the next pdu. We do this here as its possible that when the PDU was decoded that some data may
776 // have been left un-decoded so to be extra safe we use the reported pdu size and not the current stream.
777 m_stream.SetCurrentWritePosition( currentPos + pdu->GetPDULength() );
779 // Now return the decoded pdu
780 return pdu;
782 else
784 // If a PDU could not be decoded in the PDU bundle, then we need to throw
785 // out the whole stream. There is no way to know where the next PDU might start
786 // in the data stream.
787 m_stream.Clear();
790 catch( const exception & e )
792 // Something went wrong, the stream is likely corrupted now so wipe it or we will have issues in the next GetNextPDU call.
793 m_stream.Clear();
794 throw;
798 return unique_ptr<Header>(); // No data so Null ptr
801 //////////////////////////////////////////////////////////////////////////