1 /*******************************************************************************
2 * File Name : LudpReceiver.cpp
5 * Created Time : 2009-10-13 9:02:39
7 ******************************************************************************/
10 /*******************************************************************************
11 * Desc : Includes Files
12 ******************************************************************************/
14 #include "LudpReceiver.h"
17 /*******************************************************************************
18 * Desc : Macro Definations
19 ******************************************************************************/
22 /*******************************************************************************
23 * Desc : Type Definations
24 ******************************************************************************/
27 /*******************************************************************************
28 * Desc : Global Variables
29 ******************************************************************************/
32 /*******************************************************************************
33 * Desc : File Variables
34 ******************************************************************************/
41 /******************************************************************************
46 ******************************************************************************/
47 static inline UINT32
RoundAdd (UINT32 unRound
, UINT32 unVal1
, UINT32 unVal2
)
51 unResult
= (unVal1
+ unVal2
) % unRound
;
57 /******************************************************************************
62 ******************************************************************************/
63 static inline UINT32
RoundSub (UINT32 unRound
, UINT32 unVal1
, UINT32 unVal2
)
67 if (unVal1
>= unVal2
) {
68 unResult
= unVal1
- unVal2
;
70 unResult
= unRound
+ unVal1
- unVal2
;
81 /******************************************************************************
82 * Desc : Constructor / Deconstructor
83 ******************************************************************************/
90 /******************************************************************************
92 * Desc : Constructor of LudpReceiver
95 ******************************************************************************/
96 LudpReceiver::LudpReceiver ()
102 // Notify that this instance hasn't been created
109 /******************************************************************************
114 ******************************************************************************/
115 LudpReceiver::~LudpReceiver ()
125 /******************************************************************************
127 ******************************************************************************/
135 /******************************************************************************
136 * Func : LudpReceiver::Create
137 * Desc : Create instance of LudpReceiver
138 * Args : pzhuSock Socket of this receiver (shared)
139 * ucBaseType Base type process by this receiver
140 * unPkgNum Number of package buffer to be used
141 * unTimeoutMs Timeout of package since last reciption
142 * unInterval Interval of timestamp to be considered as valid
143 * Outs : If success return 0, otherwise return error code
144 ******************************************************************************/
145 int LudpReceiver::Create (
146 HLH_UDPSock
&zhuSock
,
148 OnLudpRecvFunc zolOnLudpRecv
,
149 void *pvOnLudpRecvParam
,
157 ASSERT (zolOnLudpRecv
!= NULL
);
158 ASSERT (unPkgNum
> 0);
159 ASSERT (unTimeTickUs
> 0);
160 ASSERT (unTimeout
> 0);
163 // Lock this instance
166 // Check if already created
168 // Unlock this instance
169 m_zhmMutex
.Unlock ();
170 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("already created") );
171 return LUDP_RECEIVER_ERR_ALREADY_CREATED
;
175 // Set socket pointer (share with other)
176 m_pzhuSock
= &zhuSock
;
177 zhuSock
.AddRecvHandler (OnReceived
, this);
179 m_ucBaseType
= ucBaseType
;
183 m_zhtTimeTick
.SetTime ( unTimeTickUs
/ 1000000, unTimeTickUs
% 1000000 );
184 m_unMaxTimeout
= unTimeout
;
185 m_unMaxRetry
= unMaxRetry
;
189 // Init timestamp receive range high
190 m_zhrTimestampHigh
= unTimestampInit
+ unPkgNum
- 1;
195 // Init package array
196 m_unPkgNum
= unPkgNum
;
197 m_unHighPos
= unPkgNum
/2;
199 m_pzrPkgList
= (ReceiverPackage
*) malloc ( sizeof(ReceiverPackage
) * unPkgNum
);
200 if (m_pzrPkgList
== NULL
) {
201 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("memory allocate package list failed") );
204 memset ( m_pzrPkgList
, 0, sizeof(ReceiverPackage
) * unPkgNum
);
206 // Start timeout thread
207 m_zolOnLudpRecv
= zolOnLudpRecv
;
208 m_pvOnLudpRecvParam
= pvOnLudpRecvParam
;
209 m_zhtTimeoutThread
.Start (TimeoutThreadFunc
, this);
211 // Notify that this instance created
214 // Unlock this instance
215 m_zhmMutex
.Unlock ();
220 // Unlock this instance
221 m_zhmMutex
.Unlock ();
223 return LUDP_RECEIVER_ERR_CANT_CREATE
;
227 /******************************************************************************
228 * Func : LudpReceiver::IsCreated
229 * Desc : Whether this instance created
231 * Outs : If created return true, otherwise return false
232 ******************************************************************************/
233 bool LudpReceiver::IsCreated ()
238 bCreated
= m_bCreated
;
239 m_zhmMutex
.Unlock ();
247 /******************************************************************************
248 * Func : LudpReceiver::Destroy
249 * Desc : Destory this LudpReceiver
252 ******************************************************************************/
253 void LudpReceiver::Destroy ()
256 // Stop Timeout Thread if started
257 m_zhtTimeoutThread
.Stop ();
259 // Lock this instance
263 // Unlock this instance
264 m_zhmMutex
.Unlock ();
268 ////////////////////////////////////////////////////////////////////////////////
269 // Release package list
271 if (m_pzrPkgList
!= NULL
) {
272 // Release package list
273 ClearPkgListRange (0, m_unPkgNum
-1);
279 ////////////////////////////////////////////////////////////////////////////////
280 // Notify this LudpReceiver not created
284 // Unlock this instance
285 m_zhmMutex
.Unlock ();
293 /******************************************************************************
294 * Func : LudpReceiver::IsRecvRange
295 * Desc : Whether zhrTimestamp in current receive range
296 * Args : zhrTimestamp Timestamp to check
297 * Outs : if zhrTimestamp in current receive range, return true
298 * otherwise return false
299 ******************************************************************************/
300 bool LudpReceiver::IsInRecvRange (HLH_RoundU32 zhrTimestamp
)
302 return ( zhrTimestamp
>= m_zhrTimestampHigh
- HLH_RoundU32(m_unPkgNum
)
303 && zhrTimestamp
<= m_zhrTimestampHigh
);
308 /******************************************************************************
309 * Func : LudpReceiver::UpdateRecvRange
310 * Desc : Update timestamp range and package list if need according to zhrTimestamp
311 * Args : zhrTimestamp Timestamp received
312 * Outs : if updated return true, otherwise return false
313 ******************************************************************************/
314 bool LudpReceiver::UpdateRecvRange (HLH_RoundU32 zhrTimestamp
)
317 HLH_RoundU32 zhrTimestampLow
;
318 HLH_RoundU32 zhrOldTimestampHigh
;
322 // Update only if new timestamp comming
323 if ( zhrTimestamp
> m_zhrTimestampHigh
) {
324 HLH_DEBUG ( HLH_DEBUG_LUDP
,
325 ("before update: m_zhrTimestampHigh = %u, m_unHighPos = %u",
326 (UINT32
)m_zhrTimestampHigh
, m_unHighPos
) );
330 zhrOldTimestampHigh
= m_zhrTimestampHigh
;
332 zhrTimestampLow
= (UINT32
)zhrTimestamp
- m_unPkgNum
+ 1;
333 m_zhrTimestampHigh
= zhrTimestamp
;
337 unOldHighPos
= m_unHighPos
;
338 m_unHighPos
= RoundAdd (m_unPkgNum
, m_unHighPos
,
339 m_zhrTimestampHigh
- zhrOldTimestampHigh
);
343 if ( zhrOldTimestampHigh
>= zhrTimestampLow
344 && zhrOldTimestampHigh
< m_zhrTimestampHigh
) {
345 // As zhrTimestamp > m_zhrTimestampHigh,
346 // zhrOldTimestampHigh won't equal m_zhrTimestampHigh
348 // If new range interleave with old range,
349 // clear some of package out of range.
351 if ( m_unHighPos
< unOldHighPos
) {
352 if ( unOldHighPos
+ 1 != m_unPkgNum
) {
353 ClearPkgListRange ( unOldHighPos
+1, m_unPkgNum
-1 );
355 ClearPkgListRange ( 0, m_unHighPos
);
356 } else { // m_unHighPos > unOldHighPos
357 ClearPkgListRange ( unOldHighPos
+1, m_unHighPos
);
361 // Otherwise clear all package
362 ClearPkgListRange ( 0, m_unPkgNum
- 1 );
365 HLH_DEBUG ( HLH_DEBUG_LUDP
,
366 ("after update: m_zhrTimestampHigh = %u, m_unHighPos = %u",
367 (UINT32
)m_zhrTimestampHigh
, m_unHighPos
) );
377 /******************************************************************************
378 * Func : LudpReceiver::ClearPkgListRange
379 * Desc : Clear package list in range (unPosLow ~ unPosHigh)
380 * Args : unPosLow low limit of range (included)
381 * unPosHigh high limit of range (included)
383 ******************************************************************************/
384 void LudpReceiver::ClearPkgListRange (UINT32 unPosLow
, UINT32 unPosHigh
)
389 ReceiverPackage
*pzspPkg
;
393 ASSERT ( unPosLow
< m_unPkgNum
);
394 ASSERT ( unPosHigh
< m_unPkgNum
);
395 ASSERT ( unPosLow
<= unPosHigh
);
397 HLH_DEBUG ( HLH_DEBUG_LUDP
,
398 ("clear range %u - %u", unPosLow
, unPosHigh
) );
400 for (unPkgIt
= unPosLow
; unPkgIt
<= unPosHigh
; unPkgIt
++) {
401 pzspPkg
= & m_pzrPkgList
[unPkgIt
];
402 ppzsSlices
= pzspPkg
->ppzsSlices
;
403 if (ppzsSlices
!= NULL
) {
405 for (usSliceIt
= 0; usSliceIt
< pzspPkg
->usSliceNum
; usSliceIt
++) {
406 pzsSlice
= ppzsSlices
[usSliceIt
];
407 if (pzsSlice
!= NULL
) {
415 // Clear content of packages
416 pzspPkg
= & m_pzrPkgList
[unPosLow
];
418 sizeof (ReceiverPackage
) * (unPosHigh
- unPosLow
+ 1) );
425 /******************************************************************************
426 * Func : ProcessSlice
427 * Desc : Process Slice received
428 * Args : pzsSlice slice to be process
429 * Outs : If process operation successed, return 0, otherwise return -1
430 ******************************************************************************/
431 int LudpReceiver::ProcessSlice (Slice
* pzsSlice
)
435 ReceiverPackage
*pzrpPkg
;
439 ReceiverPackage zrpPkg
;
442 ASSERT (pzsSlice
!= NULL
);
444 HLH_DEBUG ( HLH_DEBUG_LUDP
,
445 ("new slice %u/%u of package %u arrived",
446 pzsSlice
->usSliceSN
, pzsSlice
->usSliceNum
, pzsSlice
->unTimestamp
) );
448 // Lock this instance
451 // Check if this instance created
453 // Unlock this instance
454 m_zhmMutex
.Unlock ();
455 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("not created") );
456 return LUDP_RECEIVER_ERR_NOT_CREATED
;
460 // Set timestamp if first slice
463 m_zhrTimestampHigh
= pzsSlice
->unTimestamp
+ m_unPkgNum
/2;
466 if ( !IsInRecvRange (pzsSlice
->unTimestamp
) ) {
467 HLH_DEBUG ( HLH_DEBUG_LUDP
,
468 ("pkg %u not in range %u - %u",
469 pzsSlice
->unTimestamp
,
470 (UINT32
)m_zhrTimestampHigh
-m_unPkgNum
+1,
471 (UINT32
)m_zhrTimestampHigh
) );
474 // Whether need to process this slice
476 if ( IsInRecvRange (pzsSlice
->unTimestamp
)
477 || UpdateRecvRange (pzsSlice
->unTimestamp
) ) {
479 // Caculate the position of slice
480 unPos
= RoundSub (m_unPkgNum
, m_unHighPos
,
481 (UINT32
)m_zhrTimestampHigh
- pzsSlice
->unTimestamp
);
483 pzrpPkg
= & m_pzrPkgList
[unPos
];
485 // Check if this package finish receiption
486 if (pzrpPkg
->bFinished
) {
487 HLH_DEBUG ( HLH_DEBUG_LUDP
,
488 ("receive slice of package already finished") );
492 // Allocate package if need
494 ppzsSlices
= pzrpPkg
->ppzsSlices
;
495 if (ppzsSlices
== NULL
) {
496 // Allocate slice array
497 ppzsSlices
= (Slice
**) malloc ( sizeof (Slice
*) * pzsSlice
->usSliceNum
);
498 if (ppzsSlices
== NULL
) {
499 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("allocate slice failed") );
502 memset ( ppzsSlices
, 0, sizeof (Slice
*) * pzsSlice
->usSliceNum
);
505 pzrpPkg
->ppzsSlices
= ppzsSlices
;
506 pzrpPkg
->usSliceNum
= pzsSlice
->usSliceNum
;
507 pzrpPkg
->usSliceLeft
= pzsSlice
->usSliceNum
;
508 pzrpPkg
->unTimestamp
= pzsSlice
->unTimestamp
;
509 pzrpPkg
->unTimeout
= m_unMaxTimeout
;
510 pzrpPkg
->unRetry
= m_unMaxRetry
;
514 // Check if there is something wrong
515 ASSERT ( pzsSlice
->usSliceSN
< pzsSlice
->usSliceNum
);
516 ASSERT ( pzrpPkg
->usSliceNum
== pzsSlice
->usSliceNum
);
517 if (pzrpPkg
->unTimestamp
!= pzsSlice
->unTimestamp
) {
518 HLH_DEBUG ( HLH_DEBUG_MAIN
,
519 ("pzrpPkg->unTimestamp = %u, pzsSlice->unTimestamp = %u",
520 pzrpPkg
->unTimestamp
, pzsSlice
->unTimestamp
) );
522 ASSERT ( pzrpPkg
->unTimestamp
== pzsSlice
->unTimestamp
);
523 ASSERT ( pzrpPkg
->usSliceLeft
!= 0 );
527 // Insert this slice into package list
529 // Check if already received
530 if (ppzsSlices
[pzsSlice
->usSliceSN
] != NULL
) {
531 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("slice already received") );
536 pzsSliceNew
= CloneSlice (pzsSlice
);
537 if (pzsSliceNew
== NULL
) {
538 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("clone slice failed") );
541 ppzsSlices
[pzsSlice
->usSliceSN
] = pzsSliceNew
;
543 // Check if finish receiving of this package
544 if ( --pzrpPkg
->usSliceLeft
== 0 ) {
545 HLH_DEBUG ( HLH_DEBUG_LUDP
,
546 ("package %u finished", pzrpPkg
->unTimestamp
) );
547 // Make a copy of that package
548 pzrpPkg
->bFinished
= true;
549 memcpy ( &zrpPkg
, pzrpPkg
, sizeof (ReceiverPackage
) );
550 pzrpPkg
->ppzsSlices
= NULL
;
552 // Unlock this instance
553 m_zhmMutex
.Unlock ();
555 // Call callback function
556 m_zolOnLudpRecv (m_pvOnLudpRecvParam
, zrpPkg
);
559 // Unlock this instance
560 m_zhmMutex
.Unlock ();
567 // Unlock this instance
568 m_zhmMutex
.Unlock ();
570 return LUDP_RECEIVER_ERR_FAILED
;
576 /******************************************************************************
577 * Func : ReceiverThread
578 * Desc : Receive data from socket, callback function of socket
579 * Args : pvThis pointer to this instance
580 * pvBuf buffer for the data received
581 * unLen length of data received
582 * zhsPeerAddr peer address of data received
584 ******************************************************************************/
585 void LudpReceiver::OnReceived (
587 void *pvBuf
, UINT32 unLen
,
588 HLH_SockAddr
&zhsPeerAddr
)
591 LudpReceiver
*pzlrReceiver
;
594 ASSERT (pvThis
!= NULL
);
595 ASSERT (pvBuf
!= NULL
);
600 pzlrReceiver
= (LudpReceiver
*) pvThis
;
601 pzsSlice
= (Slice
*) pvBuf
;
603 // Check for slice length
604 if ( unLen
< HLH_SLICE_LENGTH (pzsSlice
) ) {
605 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("length of slice too short") );
609 // Process slice according to type
610 if ( LUDP_SLICE_BASE_TYPE (pzsSlice
->usType
) == pzlrReceiver
->m_ucBaseType
611 && LUDP_SLICE_SUB_TYPE (pzsSlice
->usType
) == LUDP_SLICE_SUB_TYPE_DATA
) {
612 pzlrReceiver
->ProcessSlice (pzsSlice
);
614 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("unknown slice type") );
620 /******************************************************************************
622 * Desc : Send Request to sender to resend some slice of package (unTimestamp)
624 * .0........1.||.2.......3.|.4.......5.| ...
625 * usSliceNum || usSliceSN | usSliceSN | ...
626 * Args : unTimestamp timestamp of package to be requested to be resent
627 * Outs : If operation successed, return 0, otherwise return -1
628 ******************************************************************************/
629 int LudpReceiver::SendRequest (UINT32 unTimestamp
)
637 ReceiverPackage
*pzrpPkg
;
640 UINT8 aucBuf
[ sizeof (Slice
) - 4 + 2 + LUDP_RECEIVER_MAX_RESEND_SLICE_SN_PER_SLICE
*2 ];
644 // Check if requested unTimestamp is in current receive range
645 if ( !IsInRecvRange (unTimestamp
) ) {
646 return LUDP_RECEIVER_ERR_FAILED
;
649 // Get info of package
650 unPos
= RoundSub ( m_unPkgNum
, m_unHighPos
,
651 (UINT32
) m_zhrTimestampHigh
- unTimestamp
);
652 pzrpPkg
= & m_pzrPkgList
[unPos
];
654 ASSERT (unTimestamp
== pzrpPkg
->unTimestamp
);
655 ASSERT (pzrpPkg
->ppzsSlices
!= NULL
);
656 ASSERT (pzrpPkg
->usSliceLeft
<= pzrpPkg
->usSliceNum
);
658 usSliceLeft
= pzrpPkg
->usSliceLeft
;
660 usSliceNum
= pzrpPkg
->usSliceNum
;
661 ppzsSlices
= pzrpPkg
->ppzsSlices
;
663 while (usSliceLeft
> 0) {
665 // Caculate number of slices in current request slice
667 = usSliceLeft
>= LUDP_RECEIVER_MAX_RESEND_SLICE_SN_PER_SLICE
668 ? LUDP_RECEIVER_MAX_RESEND_SLICE_SN_PER_SLICE
: usSliceLeft
;
669 usSliceLeft
-= usSliceCur
;
671 // Fill resend request
672 pzsSlice
= (Slice
*) aucBuf
;
673 pusSliceSN
= (UINT16
*) &pzsSlice
->aucData
[2];
675 pzsSlice
->usType
= LUDP_SLICE_TYPE (m_ucBaseType
, LUDP_SLICE_SUB_TYPE_RESEND
);
676 pzsSlice
->usDataLen
= 2 + usSliceCur
*2;
677 pzsSlice
->usSliceNum
= 1;
678 pzsSlice
->usSliceSN
= 0;
679 pzsSlice
->unTimestamp
= unTimestamp
;
681 // Number of slices to resend requested by this slice
682 *(UINT16
*) (pzsSlice
->aucData
) = usSliceCur
;
684 for ( ; usSliceIt
< usSliceNum
; usSliceIt
++ ) {
685 if (ppzsSlices
[usSliceIt
] == NULL
) {
687 *pusSliceSN
++ = usSliceIt
;
688 HLH_DEBUG ( HLH_DEBUG_LUDP
,
689 ("request resend of slice %u of pkg %u",
690 usSliceIt
, unTimestamp
) );
692 if (--usSliceCur
== 0) {
698 ASSERT (usSliceCur
== 0);
700 // Send the request slice
701 m_pzhuSock
->Send ( pzsSlice
, HLH_SLICE_LENGTH (pzsSlice
) );
710 /******************************************************************************
711 * Func : LudpReceiver::CheckTimeout
712 * Desc : Check for timeout and Send request to sender to resend some slice
713 * Args : zhtTimeoutThread The thread which called this function
714 * Outs : If success, return (void*) 0, otherwise return (void*) (error code)
715 ******************************************************************************/
716 void LudpReceiver::CheckTimeout (HLH_Thread
&zhtTimeoutThread
)
722 ReceiverPackage
*pzrpPkg
;
724 HLH_Time zhtTimeStart
;
728 // Notify that this function started
729 zhtTimeoutThread
.ThreadStarted ();
731 // Lock this instance
735 // Unlock this instance
736 m_zhmMutex
.Unlock ();
737 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("not created") );
741 // Unlock this instance
742 m_zhmMutex
.Unlock ();
749 // Lock this instance
753 zhtTimeStart
= HLH_Time::GetCurrentTime ();
756 unPkgNum
= m_unPkgNum
;
757 for (unPkgIt
= 0; unPkgIt
< unPkgNum
; unPkgIt
++) {
758 pzrpPkg
= & m_pzrPkgList
[unPkgIt
];
759 // We only request to resend package with some slice received
760 if ( pzrpPkg
->ppzsSlices
!= NULL
) {
761 if (pzrpPkg
->unTimeout
!= 0) {
762 pzrpPkg
->unTimeout
--;
764 if (pzrpPkg
->unTimeout
== 0) {
765 if (pzrpPkg
->unRetry
!= 0) {
767 pzrpPkg
->unTimeout
= m_unMaxTimeout
;
769 SendRequest (pzrpPkg
->unTimestamp
);
770 HLH_DEBUG ( HLH_DEBUG_LUDP
,
771 ("request resend of pkg %u:", pzrpPkg
->unTimestamp
) );
773 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("abort resend of pkg %u.") );
781 zhtTimeEnd
= HLH_Time::GetCurrentTime ();
783 // Wait until next time tick
785 zhtTime
= zhtTimeEnd
- zhtTimeStart
;
786 if (m_zhtTimeTick
> zhtTime
) {
787 // Cacultate the time to wait
788 zhtTime
= m_zhtTimeTick
- zhtTime
;
790 // Unlock this instance
791 m_zhmMutex
.Unlock ();
793 // Wait for another time tick
794 HLH_Time::Wait (zhtTime
);
796 // Unlock this instance
797 m_zhmMutex
.Unlock ();
800 // Check for stop request.
801 if ( zhtTimeoutThread
.IsStopping () ) {
802 HLH_DEBUG ( HLH_DEBUG_LUDP
, ("stop on request") );
813 /******************************************************************************
814 * Func : TimeoutThreadFunc
815 * Desc : Simple wrapper of CheckTimeout to make it easy to call by thread
816 * Args : pvParam pointer to LudpReceiver
817 * Outs : Always return (void*)0
818 ******************************************************************************/
819 void * LudpReceiver::TimeoutThreadFunc (HLH_Thread
&zhtTimeoutThread
, void *pvThis
)
821 ASSERT (pvThis
!= NULL
);
822 ( (LudpReceiver
*) pvThis
)->CheckTimeout (zhtTimeoutThread
);