1 /*******************************************************************************
2 * File Name : LudpSender.cpp
5 * Created Time : 2009-10-13 8:58:38
7 ******************************************************************************/
10 /*******************************************************************************
11 * Desc : Includes Files
12 ******************************************************************************/
13 #include "LudpSender.h"
16 /*******************************************************************************
17 * Desc : Macro Definitions
18 ******************************************************************************/
21 /*******************************************************************************
22 * Desc : Type Definitions
23 ******************************************************************************/
26 /*******************************************************************************
27 * Desc : Global Variables
28 ******************************************************************************/
31 /*******************************************************************************
32 * Desc : File Variables
33 ******************************************************************************/
/******************************************************************************
 * Func : RoundAdd
 * Desc : Add two values modulo unRound (ring-buffer index arithmetic)
 * Args : unRound   Modulus, i.e. the size of the ring; must not be 0
 *        unVal1    First addend
 *        unVal2    Second addend
 * Outs : (unVal1 + unVal2) mod unRound, always in the range [0, unRound)
 ******************************************************************************/
static inline UINT32
RoundAdd (UINT32 unRound, UINT32 unVal1, UINT32 unVal2)
{
	// Add in a wider type: a 32-bit sum can wrap around before the modulo is
	// taken, which would yield a wrong position for large operands.
	const unsigned long long ullSum =
		static_cast<unsigned long long> (unVal1) + unVal2;

	return static_cast<UINT32> (ullSum % unRound);
}
/******************************************************************************
 * Func : RoundSub
 * Desc : Subtract unVal2 from unVal1 on a ring of size unRound
 * Args : unRound   Modulus, i.e. the size of the ring
 *        unVal1    Minuend (normally a position in [0, unRound))
 *        unVal2    Subtrahend (number of steps to go back)
 * Outs : unVal1 - unVal2 if no borrow is needed, otherwise the difference
 *        wrapped into [0, unRound)
 ******************************************************************************/
static inline UINT32
RoundSub (UINT32 unRound, UINT32 unVal1, UINT32 unVal2)
{
	// No borrow needed: plain difference. A zero modulus is handled here as
	// well, so the modulo below never divides by zero.
	if (unRound == 0 || unVal1 >= unVal2) {
		return unVal1 - unVal2;
	}

	// Borrow from the ring. The deficit is reduced modulo unRound first so
	// that stepping back by more than one full round still yields a valid
	// position instead of underflowing to a huge value.
	const UINT32 unDeficit = (unVal2 - unVal1) % unRound;

	return (unDeficit == 0) ? 0 : (unRound - unDeficit);
}
79 /******************************************************************************
80 * Desc : Constructor / Destructor of LudpSender
81 ******************************************************************************/
89 /******************************************************************************
 * Func : LudpSender::LudpSender
91 * Desc : Constructor of LudpSender
 *        Initialises the send-request condition (m_zhcSendRequestCond).
 *        NOTE(review): lines are missing from this listing (the braces and
 *        the statements that belong under the comments below) - confirm
 *        against the complete file.
94 ******************************************************************************/
95 LudpSender::LudpSender ()
98 // Initialise mutex and condition
100 m_zhcSendRequestCond
.Init ();
102 // Mark this instance as not created
109 /******************************************************************************
110 * Func : LudpSender::~LudpSender
111 * Desc : Destructor of LudpSender
 *        NOTE(review): the destructor body is not visible in this listing -
 *        confirm what it releases.
114 ******************************************************************************/
115 LudpSender::~LudpSender ()
124 /******************************************************************************
126 ******************************************************************************/
132 /******************************************************************************
133 * Func : LudpSender::Create
134 * Desc : Create instance of LudpSender
 *        Stores the shared socket, registers OnReceived on it, allocates the
 *        resend ring (unPkgNum zero-filled SenderPackage entries) and starts
 *        the send thread.
135 * Args : zhuSock Socket used by this LudpSender (shared)
136 * ucBaseType Base type of data processed by this sender,
137 * used by OnReceived to filter packages
138 * unTimestamp Initial timestamp
139 * unPkgNum Number of packages to be stored for resend (must be > 0)
140 * usMaxSliceDataLen Max length of slice data (must be > 0)
141 * Outs : On success, return 0; otherwise return an error code
 *        (LUDP_SENDER_ERR_ALREADY_CREATED or LUDP_SENDER_ERR_CANT_CREATE
 *        are the error returns visible here)
 *        NOTE(review): part of the parameter list and several statements
 *        (lock call, condition checks, jumps to the failure path) are missing
 *        from this listing - confirm against the complete file.
142 ******************************************************************************/
143 int LudpSender::Create (
144 HLH_UDPSock
&zhuSock
,
148 UINT16 usMaxSliceDataLen
// Both sizes are used as divisors or allocation counts later on.
155 ASSERT (unPkgNum
> 0);
156 ASSERT (usMaxSliceDataLen
> 0);
158 // Lock this instance
161 // Check if already created
// Already-created path: release the mutex, log, and report the error.
163 // Unlock this instance
164 m_zhmMutex
.Unlock ();
165 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("already created") );
166 return LUDP_SENDER_ERR_ALREADY_CREATED
;
169 // Set socket pointer (shared with others)
170 m_pzhuSock
= &zhuSock
;
// Register OnReceived on the socket with this instance as its context.
// NOTE(review): no removal of this handler is visible on the failure path
// below - confirm whether it is unregistered when creation fails.
171 zhuSock
.AddRecvHandler (OnReceived
, this);
173 m_ucBaseType
= ucBaseType
;
175 // Initialise max slice data length
176 m_usMaxSliceDataLen
= usMaxSliceDataLen
;
178 // Initial timestamp of the next package to be sent
179 m_zhrTimestampCur
= unTimestamp
;
181 // Initialise send queue
182 m_slSendQueue
.clear ();
184 // Initialise package list
185 m_unPkgNum
= unPkgNum
;
// Allocate the resend ring: unPkgNum SenderPackage entries.
// NOTE(review): the size product is not checked for overflow.
188 m_pzsPkgList
= (SenderPackage
*) malloc ( sizeof (SenderPackage
) * unPkgNum
);
189 if (m_pzsPkgList
== NULL
) {
190 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("memory allocate send pkg list failed") );
// Zero-fill so that every entry starts with no slices attached.
193 memset ( m_pzsPkgList
, 0, sizeof(SenderPackage
) * unPkgNum
);
// Start the send thread; SendThreadFunc receives this instance as argument.
196 nRetVal
= m_zhtSendThread
.Start (SendThreadFunc
, this);
198 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("start send thread failed") );
202 // Mark this instance as created
205 // Unlock this instance
206 m_zhmMutex
.Unlock ();
// Failure path: the package list is checked for release, the mutex is
// released and LUDP_SENDER_ERR_CANT_CREATE is returned.
// NOTE(review): the release statement itself is not visible in this listing.
212 if (m_pzsPkgList
!= NULL
) {
217 // Unlock this instance
218 m_zhmMutex
.Unlock ();
220 return LUDP_SENDER_ERR_CANT_CREATE
;
225 /******************************************************************************
226 * Func : LudpSender::IsCreated
227 * Desc : Whether this instance has been created
 *        Copies m_bCreated into a local before the mutex is released.
229 * Outs : If created return true, otherwise return false.
 *        NOTE(review): the lock call, the local declaration and the return
 *        statement are missing from this listing.
230 ******************************************************************************/
231 bool LudpSender::IsCreated ()
// Snapshot the flag, then release the mutex.
236 bCreated
= m_bCreated
;
237 m_zhmMutex
.Unlock ();
242 /******************************************************************************
243 * Func : LudpSender::Destroy
244 * Desc : Stop the Ludp sender: stop the send thread, release the stored
 *        packages and drain the send queue
246 * Outs : If success return 0, otherwise return error code
 *        (LUDP_SENDER_ERR_NOT_CREATED is the error return visible here)
 *        NOTE(review): several statements (lock call, created check, the
 *        frees of the pointer list, package list and queued slices, the final
 *        return) are missing from this listing - confirm against the full file.
247 ******************************************************************************/
248 int LudpSender::Destroy ()
253 SenderPackage
*pzspPkg
;
257 ////////////////////////////////////////////////////////////////////////////////
258 // Stop send thread if started
// In this listing the thread is stopped before the lock comment below.
259 m_zhtSendThread
.Stop ();
262 // Lock this instance
// Not-created path: release the mutex and report the error.
266 // Unlock this instance
267 m_zhmMutex
.Unlock ();
268 return LUDP_SENDER_ERR_NOT_CREATED
;
271 ////////////////////////////////////////////////////////////////////////////////
272 // Release package list
273 if (m_pzsPkgList
!= NULL
) {
// Walk every ring entry and free the slices it still holds.
276 for (unPkgIt
= 0; unPkgIt
< m_unPkgNum
; unPkgIt
++) {
277 pzspPkg
= & m_pzsPkgList
[unPkgIt
];
278 ppzsSlices
= pzspPkg
->ppzsSlices
;
279 if (ppzsSlices
!= NULL
) {
// Free each slice recorded for this package.
281 for (usSliceIt
= 0; usSliceIt
< pzspPkg
->usSliceNum
; usSliceIt
++) {
282 free (ppzsSlices
[usSliceIt
]);
284 // Release the slice pointer list
294 ////////////////////////////////////////////////////////////////////////////////
295 // Release send queue
// Pop entries from the front until the queue is empty.
297 while ( !m_slSendQueue
.empty() ) {
298 pzsSlice
= m_slSendQueue
.front ();
300 m_slSendQueue
.pop_front ();
303 ////////////////////////////////////////////////////////////////////////////////
304 // Mark this instance as not created
308 // Unlock this instance
309 m_zhmMutex
.Unlock ();
315 /******************************************************************************
 * Func : LudpSender::SendPackage
317 * Desc : Divide a package into slices and put them into the transmit queue
 *        Each slice is stored in the resend ring at m_unCurPos and a clone of
 *        it is appended to m_slSendQueue; afterwards the current timestamp
 *        and the ring position are advanced and the send thread is signalled.
318 * Args : pucBuffer Buffer of the package to be sent
319 * unLen Length of the package
320 * Outs : If the operation succeeded, return 0, otherwise return error code
 *        (LUDP_SENDER_ERR_NOT_CREATED or LUDP_SENDER_ERR_FAILED are the error
 *        returns visible here)
 *        NOTE(review): local declarations, the lock call, the jumps to the
 *        failure path and several frees are missing from this listing -
 *        confirm against the complete file.
321 ******************************************************************************/
322 int LudpSender::SendPackage (UINT8
*pucBuffer
, UINT32 unLen
)
336 // Lock this instance
// Not-created path: release the mutex, log, and report the error.
340 // Unlock this instance
341 m_zhmMutex
.Unlock ();
342 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("not created") );
343 return LUDP_SENDER_ERR_NOT_CREATED
;
346 ////////////////////////////////////////////////////////////////////////////////
348 // If current position isn't empty, free resource used by that package
349 unSliceNum
= m_pzsPkgList
[m_unCurPos
].usSliceNum
;
350 ppzsSlices
= m_pzsPkgList
[m_unCurPos
].ppzsSlices
;
351 if ( unSliceNum
!= 0 && ppzsSlices
!= NULL
) {
// Free every slice of the package that previously occupied this ring slot.
353 for (unSliceIt
= 0; unSliceIt
< unSliceNum
; unSliceIt
++) {
354 free ( ppzsSlices
[unSliceIt
] );
// Mark the slot as empty.
359 m_pzsPkgList
[m_unCurPos
].usSliceNum
= 0;
360 m_pzsPkgList
[m_unCurPos
].ppzsSlices
= NULL
;
364 ////////////////////////////////////////////////////////////////////////////////
366 // Calculate the number of slices
// Ceiling division of unLen by the maximum slice data length.
// NOTE(review): unLen == 0 gives zero slices, and the addition can wrap for
// unLen close to the 32-bit maximum - confirm callers never pass such values.
367 unSliceNum
= (unLen
+ m_usMaxSliceDataLen
- 1) / m_usMaxSliceDataLen
;
370 // Allocate space to store the slice pointers
371 ppzsSlices
= (Slice
**) malloc ( sizeof (Slice
*) * unSliceNum
);
372 if (ppzsSlices
== NULL
) {
373 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("allocate slices failed") );
// Zero the pointer list so that entries not yet allocated are NULL.
377 memset ( ppzsSlices
, 0, sizeof (Slice
*) * unSliceNum
);
// Record the new package in the ring slot before the slices are filled in.
379 m_pzsPkgList
[m_unCurPos
].zhrTimestamp
= m_zhrTimestampCur
;
// NOTE(review): the us prefix suggests a 16-bit field - confirm that
// unSliceNum cannot exceed its range.
380 m_pzsPkgList
[m_unCurPos
].usSliceNum
= unSliceNum
;
381 m_pzsPkgList
[m_unCurPos
].ppzsSlices
= ppzsSlices
;
383 // Process all the slices
386 for (unSliceIt
= 0; unSliceIt
< unSliceNum
; unSliceIt
++) {
388 // Calculate length of slice data
// The smaller of the remaining length and the maximum slice data length.
389 unSliceLen
= unLen
< m_usMaxSliceDataLen
? unLen
: m_usMaxSliceDataLen
;
391 ////////////////////////////////////////////////////////////////////////////////
392 // Put slice into resend list
394 // Allocate one Slice
// NOTE(review): the constant 4 presumably is the declared size of the
// aucData placeholder inside Slice - confirm against the Slice definition.
395 pzsSlice
= (Slice
*) malloc (sizeof (Slice
) - 4 + unSliceLen
);
396 if (pzsSlice
== NULL
) {
// Fill in the slice header fields.
401 pzsSlice
->usType
= LUDP_SLICE_TYPE (m_ucBaseType
, LUDP_SLICE_SUB_TYPE_DATA
);
402 pzsSlice
->usDataLen
= unSliceLen
;
403 pzsSlice
->usSliceNum
= unSliceNum
;
404 pzsSlice
->usSliceSN
= unSliceIt
;
405 pzsSlice
->unTimestamp
= m_zhrTimestampCur
;
407 // Copy data to buffer
// NOTE(review): the initialisation of pucData is not visible in this
// listing; presumably it starts at pucBuffer.
408 memcpy (pzsSlice
->aucData
, pucData
, unSliceLen
);
// The ring keeps this original slice for later resend requests.
411 ppzsSlices
[unSliceIt
] = pzsSlice
;
414 ////////////////////////////////////////////////////////////////////////////////
415 // Put slice into send queue
417 // Allocate one Slice
// The send queue gets its own copy of the slice.
418 pzsSlice
= CloneSlice (pzsSlice
);
419 if (pzsSlice
== NULL
) {
420 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("clone slice failed") );
424 // Put slice into queue
425 m_slSendQueue
.push_back (pzsSlice
);
429 ////////////////////////////////////////////////////////////////////////////////
430 // Update pointers / counters
432 pucData
+= unSliceLen
;
437 // Increase timestamp (auto round up)
438 m_zhrTimestampCur
++;
440 // Move ring buffer pointer
441 m_unCurPos
= RoundAdd (m_unPkgNum
, m_unCurPos
, 1);
444 // Notify send thread if needed
446 //HLH_DEBUG ( HLH_DEBUG_LUDP, ("signal send thread to send") );
447 m_zhcSendRequestCond
.Signal ();
450 // Unlock this instance
451 m_zhmMutex
.Unlock ();
458 ////////////////////////////////////////////////////////////////////////////////
459 // If we can't allocate memory successfully, just abort this package sending
460 ////////////////////////////////////////////////////////////////////////////////
462 unSliceNum
= m_pzsPkgList
[m_unCurPos
].usSliceNum
;
463 ppzsSlices
= m_pzsPkgList
[m_unCurPos
].ppzsSlices
;
465 if (ppzsSlices
!= NULL
) {
466 // Remove slices of the current timestamp from the list
// Entries that were never allocated are still NULL from the memset above.
467 for (unSliceIt
= 0; unSliceIt
< unSliceNum
; unSliceIt
++) {
468 free ( ppzsSlices
[unSliceIt
] );
// Mark the ring slot as empty again.
474 m_pzsPkgList
[m_unCurPos
].usSliceNum
= 0;
475 m_pzsPkgList
[m_unCurPos
].ppzsSlices
= NULL
;
477 // Remove slices of the current timestamp from the queue
// NOTE(review): back () is called without an empty () check; if the queue is
// a standard container, calling back () on an empty one is undefined
// behaviour - confirm that the queue cannot be empty here.
478 while ( ( pzsSlice
= m_slSendQueue
.back () ) != NULL
479 && pzsSlice
->unTimestamp
== (UINT32
) m_zhrTimestampCur
) {
481 m_slSendQueue
.pop_back ();
484 // Unlock this instance
485 m_zhmMutex
.Unlock ();
487 return LUDP_SENDER_ERR_FAILED
;
490 /******************************************************************************
491 * Func : LudpSender::__SendThreadFunc
492 * Desc : Send Thread Function:
493 * Send the slices in m_slSendQueue
 *        Waits on m_zhcSendRequestCond while the queue is empty, polls the
 *        socket for writability, then sends the slice at the front of the
 *        queue and removes it.
494 * Args : zhtThread send thread
495 * Outs : None (the function returns void)
 *        NOTE(review): the loop construct, local declarations, lock calls,
 *        result checks and the preprocessor condition that matches the else
 *        directive below are missing from this listing - confirm against the
 *        complete file.
496 ******************************************************************************/
497 void LudpSender::__SendThreadFunc (HLH_Thread
&zhtThread
)
506 // Notify zhtThread that SendThreadFunc () started
507 zhtThread
.ThreadStarted ();
// Nothing to do if the instance has not been created.
509 if ( !IsCreated() ) {
510 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("not created") );
516 ////////////////////////////////////////////////////////////////////////////////
517 // Lock this instance
520 // Check if send request
522 // If no send request, wait for a while
523 if ( m_slSendQueue
.empty () ) {
// Timed wait on the condition; m_zhmMutex is handed to the wait call.
524 m_zhcSendRequestCond
.TimeWait ( m_zhmMutex
,
525 HLH_Time (LUDP_SENDER_TIMEOUT_SEND_COND_WAIT_SECONDS
, 0) );
527 // If there is still nothing to send,
528 if ( m_slSendQueue
.empty () ) {
529 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("timeout and send queue still empty") );
530 // Unlock this instance
531 m_zhmMutex
.Unlock ();
532 // Check for stop condition
537 // Unlock this instance
538 m_zhmMutex
.Unlock ();
541 ////////////////////////////////////////////////////////////////////////////////
542 // There are some slices to send
544 // Whether socket is ready for send until timeout
545 unPollType
= HLH_SOCK_POLL_WRITE
;
546 zhtTime
= HLH_Time (LUDP_SENDER_TIMEOUT_SOCK_WRITE_WAIT_SECONDS
, 0);
// The unlock above comes before this call, so the poll is presumably made
// without the mutex held - confirm against the complete file.
547 nRetVal
= m_pzhuSock
->PollWait ( unPollType
, zhtTime
);
549 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("poll failed") );
550 // Socket is not ready for send yet
555 ////////////////////////////////////////////////////////////////////////////////
556 // Send the slice now
558 // Lock this instance
561 // Maybe send queue changed
// The queue is checked again because the poll above ran after an unlock.
562 if ( m_slSendQueue
.empty () ) {
563 // Unlock this instance
564 m_zhmMutex
.Unlock ();
570 pzsSlice
= m_slSendQueue
.front ();
571 ASSERT (pzsSlice
!= NULL
);
// Normal build: send the slice at the front of the queue.
574 m_pzhuSock
->Send ( pzsSlice
, HLH_SLICE_LENGTH (pzsSlice
) );
575 #else /* simulate for slice lost */
// Test build: nLimit is 767 and nRand lies in 0..1022, so the slice is sent
// in roughly three out of four cases and skipped otherwise.
578 int nLimit
= 1023 * 3/4;
580 nRand
= rand () % 1023;
581 if (nRand
< nLimit
) {
582 m_pzhuSock
->Send ( pzsSlice
, HLH_SLICE_LENGTH (pzsSlice
) );
587 // Release the slices sent
589 m_slSendQueue
.pop_front ();
591 // Unlock this instance
592 m_zhmMutex
.Unlock ();
595 // Check for stop condition
596 if ( zhtThread
.IsStopping () ) {
597 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("stop on request") );
607 /******************************************************************************
608 * Func : LudpSender::SendThreadFunc
609 * Desc : Send Thread Function:
610 * Thread entry point; forwards to __SendThreadFunc of the instance
 *        passed in pvThis, which sends the slices in m_slSendQueue
611 * Args : zhtThread send thread
612 * pvThis pointer to this instance
613 * Outs : Always return 0
 *        NOTE(review): the braces and the return statement are missing from
 *        this listing.
614 ******************************************************************************/
615 void * LudpSender::SendThreadFunc (HLH_Thread
&zhtThread
, void *pvThis
)
// pvThis is cast back to LudpSender; Create passes this when starting the
// thread.
617 ( (LudpSender
*) pvThis
)->__SendThreadFunc (zhtThread
);
623 /******************************************************************************
624 * Func : LudpSender::ResendSlices
625 * Desc : Resend Package: answer a resend request by queueing copies of the
 *        requested slices of a stored package
626 * Args : pzsSlice the slice sent by the receiver to request some old slices
 *        Layout of the request data (aucData), byte offsets on the first row:
628 * .0........1.||.2.......3.|.4.......5.| ...
629 * usSliceNum || usSliceSN | usSliceSN | ...
 * Outs : None
 *        NOTE(review): local declarations, the lock call and the exits taken
 *        after the error logs are missing from this listing - confirm against
 *        the complete file.
631 ******************************************************************************/
632 void LudpSender::ResendSlices (Slice
*pzsSlice
)
635 HLH_RoundU32 zhrTimestamp
;
636 HLH_RoundU32 zhrTimestampLow
;
647 SenderPackage
*pzsPkg
;
655 // Get some info from the package
656 pucData
= pzsSlice
->aucData
;
658 // Prepare resend info
659 zhrTimestamp
= pzsSlice
->unTimestamp
;
// The request data starts with a 16-bit slice count followed by that many
// 16-bit slice serial numbers.
// NOTE(review): both values are read straight from the received request;
// no check of usSliceNum against the request length is visible here.
660 usSliceNum
= * (UINT16
*) &pucData
[0];
661 pusSliceSN
= (UINT16
*) &pucData
[2];
665 // Lock this instance
668 // Check if \c zhrTimestamp is valid
// Only the last m_unPkgNum timestamps before the current one are accepted.
669 zhrTimestampLow
= (UINT32
) m_zhrTimestampCur
- m_unPkgNum
;
670 if ( zhrTimestamp
< zhrTimestampLow
|| zhrTimestamp
>= m_zhrTimestampCur
) {
671 HLH_DEBUG ( HLH_DEBUG_LUDP
,
672 ( "resend %u out of range %u ~ %u",
673 (UINT32
)zhrTimestamp
, (UINT32
)zhrTimestampLow
,
674 (UINT32
)m_zhrTimestampCur
-1 ) );
678 // Get package info from the resend list
// Step back from the current ring position by the age of the request.
679 unPos
= RoundSub ( m_unPkgNum
, m_unCurPos
, m_zhrTimestampCur
- zhrTimestamp
);
680 pzsPkg
= & m_pzsPkgList
[unPos
];
682 // Package may be absent as a result of a memory allocation error
683 if ( IS_NULL_SENDER_PACKAGE (pzsPkg
) ) {
684 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("resend of null package") );
689 ASSERT (zhrTimestamp
== pzsPkg
->zhrTimestamp
);
693 // Resend the requested slices
696 ppzsSlices
= pzsPkg
->ppzsSlices
;
698 for (usSliceIt
= 0; usSliceIt
< usSliceNum
; usSliceIt
++) {
700 // Get SliceSN from slice
701 usSliceSN
= *pusSliceSN
++;
703 HLH_DEBUG ( HLH_DEBUG_LUDP
,
704 ("resend slice %u of pkg %u", usSliceSN
, (UINT32
)zhrTimestamp
) );
// NOTE(review): the serial number is only range-checked by this ASSERT; if
// ASSERT compiles away in release builds, an out-of-range value from the
// request would index past the slice list - confirm.
706 ASSERT ( usSliceSN
< pzsPkg
->usSliceNum
);
708 // Make a copy of that slice
709 pzsSliceSrc
= ppzsSlices
[usSliceSN
];
710 pzsSliceDst
= CloneSlice (pzsSliceSrc
);
711 if (pzsSliceDst
== NULL
) {
712 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("clone slice failed") );
// Queue the copy; the original stays in the resend ring.
717 m_slSendQueue
.push_back (pzsSliceDst
);
722 // Notify send thread if needed
724 //HLH_DEBUG ( HLH_DEBUG_LUDP, ("signal send thread to send") );
725 m_zhcSendRequestCond
.Signal ();
732 m_zhmMutex
.Unlock ();
742 /******************************************************************************
743 * Func : LudpSender::OnReceived
744 * Desc : Callback function: receive data from socket
745 * Receive the slices sent by the receiver and respond to them
 *        A slice whose base type matches m_ucBaseType and whose sub type is
 *        LUDP_SLICE_SUB_TYPE_RESEND is passed to ResendSlices; otherwise
 *        "unknown slice type" is logged.
746 * Args : pvThis pointer to this instance
747 * pvBuf buffer for the data received
748 * unLen length of data received
749 * zhsPeerAddr peer address of data received
 * Outs : None
 *        NOTE(review): the pvThis parameter line, a local declaration and the
 *        else keyword before the final log are missing from this listing.
751 ******************************************************************************/
752 void LudpSender::OnReceived (
754 void *pvBuf
, UINT32 unLen
,
755 HLH_SockAddr
&zhsPeerAddr
)
757 LudpSender
*pzlsSender
;
760 ASSERT ( pvThis
!= NULL
);
761 ASSERT ( pvBuf
!= NULL
);
762 ASSERT ( unLen
!= 0 );
766 // Pointer to this instance
767 pzlsSender
= (LudpSender
*) pvThis
;
769 // Make it convenient to access slice members
// NOTE(review): the buffer is viewed as a Slice without a visible check that
// unLen covers the slice header - confirm the caller guarantees this.
770 pzsSlice
= (Slice
*) pvBuf
;
773 // If resend requested
774 if ( LUDP_SLICE_BASE_TYPE(pzsSlice
->usType
) == pzlsSender
->m_ucBaseType
775 && LUDP_SLICE_SUB_TYPE (pzsSlice
->usType
) == LUDP_SLICE_SUB_TYPE_RESEND
) {
776 pzlsSender
->ResendSlices (pzsSlice
);
778 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("unknown slice type") );