Create Project for repo.or.cz
[vp.git] / src / ludp / LudpSender.cpp
blob20af9824b370b9bc0cea233fe9565ca828084553
1 /*******************************************************************************
2 * File Name : LudpSender.cpp
3 *
4 * Author : Henry He
5 * Created Time : 2009-10-13 8:58:38
6 * Description :
7 ******************************************************************************/
10 /*******************************************************************************
11 * Desc : Includes Files
12 ******************************************************************************/
13 #include "LudpSender.h"
/*******************************************************************************
 * Desc : Macro Definitions
 ******************************************************************************/
/*******************************************************************************
 * Desc : Type Definitions
 ******************************************************************************/
26 /*******************************************************************************
27 * Desc : Global Variables
28 ******************************************************************************/
31 /*******************************************************************************
32 * Desc : File Variables
33 ******************************************************************************/
/******************************************************************************
 * Func : RoundAdd
 * Desc : Add two values on a ring of unRound positions.
 *        The sum is formed in 64 bits so that it cannot wrap at 2^32 before
 *        the modulo is applied.
 * Args : unRound   Size of the ring (number of positions)
 *        unVal1    First addend
 *        unVal2    Second addend
 * Outs : (unVal1 + unVal2) mod unRound.
 *        If unRound is 0 the plain 32 bit sum is returned instead of
 *        dividing by zero.
 ******************************************************************************/
static inline UINT32 RoundAdd (UINT32 unRound, UINT32 unVal1, UINT32 unVal2)
{
	unsigned long long ullSum;

	// Degenerate ring: there is nothing to wrap around
	if (unRound == 0) {
		return unVal1 + unVal2;
	}

	// Widen first, otherwise unVal1 + unVal2 may overflow UINT32
	ullSum = (unsigned long long) unVal1 + unVal2;

	return (UINT32) (ullSum % unRound);
}
/******************************************************************************
 * Func : RoundSub
 * Desc : Subtract unVal2 from unVal1 on a ring of unRound positions.
 *        Both operands are reduced modulo unRound first, so the result
 *        always lies in [0, unRound) and the unsigned subtraction can
 *        never underflow.
 * Args : unRound   Size of the ring (number of positions)
 *        unVal1    Value to subtract from
 *        unVal2    Value to subtract
 * Outs : (unVal1 - unVal2) mod unRound.
 *        If unRound is 0 the plain (wrapping) 32 bit difference is returned.
 ******************************************************************************/
static inline UINT32 RoundSub (UINT32 unRound, UINT32 unVal1, UINT32 unVal2)
{
	UINT32 unMinuend;
	UINT32 unSubtrahend;

	// Degenerate ring: there is nothing to wrap around
	if (unRound == 0) {
		return unVal1 - unVal2;
	}

	unMinuend    = unVal1 % unRound;
	unSubtrahend = unVal2 % unRound;

	if (unMinuend >= unSubtrahend) {
		return unMinuend - unSubtrahend;
	}

	// Borrow one full turn of the ring; unSubtrahend < unRound so this is safe
	return unRound - unSubtrahend + unMinuend;
}
/******************************************************************************
 * Desc : Constructor / Destructor of LudpSender
 ******************************************************************************/
89 /******************************************************************************
90 * Func : LudpSender
91 * Desc : Constructor of LudpSender
92 * Args : NONE
93 * Outs : NONE
94 ******************************************************************************/
95 LudpSender::LudpSender ()
98 // Init mutex and condition
99 m_zhmMutex.Init ();
100 m_zhcSendRequestCond.Init ();
102 // Notify that this instance not created
103 m_bCreated = false;
109 /******************************************************************************
110 * Func : LudpSender::~LudpSender
111 * Desc : Deconstructor of LudpSender
112 * Args : NONE
113 * Outs : NONE
114 ******************************************************************************/
115 LudpSender::~LudpSender ()
117 Destroy ();
124 /******************************************************************************
125 * Desc : Operations
126 ******************************************************************************/
132 /******************************************************************************
133 * Func : LudpSender::Create
134 * Desc : Create instance of LudpSender
135 * Args : zhuSock Socket used by this LudpSender (shared)
136 * ucBaseType Base type of data process by this sender
137 * used by OnReceived to filter packages
138 * unTimestamp Initial timestamp
139 * unPkgNum Number of package to be store for resend
140 * usMaxSliceDataLen Max length of Slice Data
141 * Outs : If successe, return 0, otherwise return error code
142 ******************************************************************************/
143 int LudpSender::Create (
144 HLH_UDPSock &zhuSock,
145 UINT8 ucBaseType,
146 UINT32 unTimestamp,
147 UINT32 unPkgNum,
148 UINT16 usMaxSliceDataLen
152 int nRetVal;
154 // Check arguments
155 ASSERT (unPkgNum > 0);
156 ASSERT (usMaxSliceDataLen > 0);
158 // Lock this instance
159 m_zhmMutex.Lock ();
161 // Check if already created
162 if (m_bCreated) {
163 // Unlock this instance
164 m_zhmMutex.Unlock ();
165 HLH_DEBUG ( HLH_DEBUG_LUDP, ("already created") );
166 return LUDP_SENDER_ERR_ALREADY_CREATED;
169 // Set socket pointer (share with other)
170 m_pzhuSock = &zhuSock;
171 zhuSock.AddRecvHandler (OnReceived, this);
173 m_ucBaseType = ucBaseType;
175 // Initial Max Slice Data Len
176 m_usMaxSliceDataLen = usMaxSliceDataLen;
178 // Initial timestamp of package to be send
179 m_zhrTimestampCur = unTimestamp;
181 // Initial Send Queue
182 m_slSendQueue.clear ();
184 // Initial Package List
185 m_unPkgNum = unPkgNum;
186 m_unCurPos = 0;
188 m_pzsPkgList = (SenderPackage *) malloc ( sizeof (SenderPackage) * unPkgNum );
189 if (m_pzsPkgList == NULL) {
190 HLH_DEBUG ( HLH_DEBUG_LUDP, ("memory allocate send pkg list failed") );
191 goto failed;
193 memset ( m_pzsPkgList, 0, sizeof(SenderPackage) * unPkgNum );
195 // Start Send thread
196 nRetVal = m_zhtSendThread.Start (SendThreadFunc, this);
197 if (nRetVal < 0) {
198 HLH_DEBUG ( HLH_DEBUG_LUDP, ("start send thread failed") );
199 goto failed;
202 // Notify that this instance created
203 m_bCreated = true;
205 // Unlock this instance
206 m_zhmMutex.Unlock ();
208 return 0;
210 failed:
212 if (m_pzsPkgList != NULL) {
213 free (m_pzsPkgList);
214 m_pzsPkgList = NULL;
217 // Unlock this instance
218 m_zhmMutex.Unlock ();
220 return LUDP_SENDER_ERR_CANT_CREATE;
225 /******************************************************************************
226 * Func : LudpSender::IsCreated
227 * Desc : Whether this instance created
228 * Args : NONE
229 * Outs : If created return true, otherwise return false.
230 ******************************************************************************/
231 bool LudpSender::IsCreated ()
233 bool bCreated;
235 m_zhmMutex.Lock ();
236 bCreated = m_bCreated;
237 m_zhmMutex.Unlock ();
239 return bCreated;
242 /******************************************************************************
243 * Func : LudpSender::Stop
244 * Desc : Stop Ludp Sender Connection
245 * Args : NONE
246 * Outs : If success return 0, otherwise return error code
247 ******************************************************************************/
248 int LudpSender::Destroy ()
250 UINT32 unPkgIt;
251 UINT16 usSliceIt;
253 SenderPackage *pzspPkg;
254 Slice **ppzsSlices;
255 Slice *pzsSlice;
257 ////////////////////////////////////////////////////////////////////////////////
258 // Stop send thread if started
259 m_zhtSendThread.Stop ();
262 // Lock this instance
263 m_zhmMutex.Lock ();
265 if (!m_bCreated) {
266 // Unlock this instance
267 m_zhmMutex.Unlock ();
268 return LUDP_SENDER_ERR_NOT_CREATED;
271 ////////////////////////////////////////////////////////////////////////////////
272 // Release package list
273 if (m_pzsPkgList != NULL) {
275 // Release packages
276 for (unPkgIt = 0; unPkgIt < m_unPkgNum; unPkgIt ++) {
277 pzspPkg = & m_pzsPkgList [unPkgIt];
278 ppzsSlices = pzspPkg->ppzsSlices;
279 if (ppzsSlices != NULL) {
280 // Release slices
281 for (usSliceIt = 0; usSliceIt < pzspPkg->usSliceNum; usSliceIt ++) {
282 free (ppzsSlices [usSliceIt]);
284 // Release the slice pointer list
285 free (ppzsSlices);
289 free (m_pzsPkgList);
290 m_pzsPkgList = NULL;
294 ////////////////////////////////////////////////////////////////////////////////
295 // Release send queue
297 while ( !m_slSendQueue.empty() ) {
298 pzsSlice = m_slSendQueue.front ();
299 free (pzsSlice);
300 m_slSendQueue.pop_front ();
303 ////////////////////////////////////////////////////////////////////////////////
304 // Notify that this instance not created
305 m_bCreated = false;
308 // Unlock this instance
309 m_zhmMutex.Unlock ();
311 return 0;
315 /******************************************************************************
316 * Func : SendPackage
317 * Desc : Divide package into slice and put them into transmit queue
318 * Args : pucBuffer Buffer of package to be send
319 * unLen Length of package
320 * Outs : If operation successed, return 0, otherwise return error code
321 ******************************************************************************/
322 int LudpSender::SendPackage (UINT8 *pucBuffer, UINT32 unLen)
324 UINT32 unSliceNum;
325 UINT32 unSliceIt;
327 UINT32 unSliceLen;
329 Slice **ppzsSlices;
330 Slice *pzsSlice;
332 UINT8 *pucData;
334 bool bSend = false;
336 // Lock this instance
337 m_zhmMutex.Lock ();
339 if (!m_bCreated) {
340 // Unlock this instance
341 m_zhmMutex.Unlock ();
342 HLH_DEBUG ( HLH_DEBUG_LUDP, ("not created") );
343 return LUDP_SENDER_ERR_NOT_CREATED;
346 ////////////////////////////////////////////////////////////////////////////////
348 // If current position isn't empty, free resource used by that package
349 unSliceNum = m_pzsPkgList [m_unCurPos].usSliceNum;
350 ppzsSlices = m_pzsPkgList [m_unCurPos].ppzsSlices;
351 if ( unSliceNum != 0 && ppzsSlices != NULL ) {
353 for (unSliceIt = 0; unSliceIt < unSliceNum; unSliceIt ++) {
354 free ( ppzsSlices [unSliceIt] );
357 free (ppzsSlices);
359 m_pzsPkgList [m_unCurPos].usSliceNum = 0;
360 m_pzsPkgList [m_unCurPos].ppzsSlices = NULL;
364 ////////////////////////////////////////////////////////////////////////////////
366 // Calculate the number of slices
367 unSliceNum = (unLen + m_usMaxSliceDataLen - 1) / m_usMaxSliceDataLen;
370 // Allocate space to store the slice pointers
371 ppzsSlices = (Slice **) malloc ( sizeof (Slice *) * unSliceNum );
372 if (ppzsSlices == NULL) {
373 HLH_DEBUG ( HLH_DEBUG_LUDP, ("allocate slices failed") );
374 goto failed;
377 memset ( ppzsSlices, 0, sizeof (Slice *) * unSliceNum );
379 m_pzsPkgList [m_unCurPos].zhrTimestamp = m_zhrTimestampCur;
380 m_pzsPkgList [m_unCurPos].usSliceNum = unSliceNum;
381 m_pzsPkgList [m_unCurPos].ppzsSlices = ppzsSlices;
383 // Process all the slices
384 pucData = pucBuffer;
386 for (unSliceIt = 0; unSliceIt < unSliceNum; unSliceIt++) {
388 // Caculate length of slice data
389 unSliceLen = unLen < m_usMaxSliceDataLen ? unLen : m_usMaxSliceDataLen;
391 ////////////////////////////////////////////////////////////////////////////////
392 // Put slice into resend list
394 // Allocate one Slice
395 pzsSlice = (Slice *) malloc (sizeof (Slice) - 4 + unSliceLen);
396 if (pzsSlice == NULL) {
397 goto failed;
400 // Set info of slice
401 pzsSlice->usType = LUDP_SLICE_TYPE (m_ucBaseType, LUDP_SLICE_SUB_TYPE_DATA);
402 pzsSlice->usDataLen = unSliceLen;
403 pzsSlice->usSliceNum = unSliceNum;
404 pzsSlice->usSliceSN = unSliceIt;
405 pzsSlice->unTimestamp = m_zhrTimestampCur;
407 // Copy data to buffer
408 memcpy (pzsSlice->aucData, pucData, unSliceLen);
410 // Put it into list
411 ppzsSlices [unSliceIt] = pzsSlice;
414 ////////////////////////////////////////////////////////////////////////////////
415 // Put slice into send queue
417 // Allocate one Slice
418 pzsSlice = CloneSlice (pzsSlice);
419 if (pzsSlice == NULL) {
420 HLH_DEBUG ( HLH_DEBUG_LUDP, ("clone slice failed") );
421 goto failed;
424 // Put slice into queue
425 m_slSendQueue.push_back (pzsSlice);
427 bSend = true;
429 ////////////////////////////////////////////////////////////////////////////////
430 // Update pointers / counters
432 pucData += unSliceLen;
433 unLen -= unSliceLen;
437 // Increase timestamp (auto round up)
438 m_zhrTimestampCur ++;
440 // Move ring buffer pointer
441 m_unCurPos = RoundAdd (m_unPkgNum, m_unCurPos, 1);
444 // Notify send thread if need
445 if (bSend) {
446 //HLH_DEBUG ( HLH_DEBUG_LUDP, ("signal send thread to send") );
447 m_zhcSendRequestCond.Signal ();
450 // Unlock this instance
451 m_zhmMutex.Unlock ();
453 // Successed
454 return 0;
456 failed:
458 ////////////////////////////////////////////////////////////////////////////////
459 // If we can't allocate memory successfully, just abort this package sending
460 ////////////////////////////////////////////////////////////////////////////////
462 unSliceNum = m_pzsPkgList [m_unCurPos].usSliceNum;
463 ppzsSlices = m_pzsPkgList [m_unCurPos].ppzsSlices;
465 if (ppzsSlices != NULL) {
466 // Remove slice of current timestamp from list
467 for (unSliceIt = 0; unSliceIt < unSliceNum; unSliceIt++) {
468 free ( ppzsSlices [unSliceIt] );
471 free (ppzsSlices);
474 m_pzsPkgList [m_unCurPos].usSliceNum = 0;
475 m_pzsPkgList [m_unCurPos].ppzsSlices = NULL;
477 // Remove slice of current timestamp from queue
478 while ( ( pzsSlice = m_slSendQueue.back () ) != NULL
479 && pzsSlice->unTimestamp == (UINT32) m_zhrTimestampCur ) {
480 free (pzsSlice);
481 m_slSendQueue.pop_back ();
484 // Unlock this instance
485 m_zhmMutex.Unlock ();
487 return LUDP_SENDER_ERR_FAILED;
490 /******************************************************************************
491 * Func : LudpSender::__SendThreadFunc
492 * Desc : Send Thread Function:
493 * Send the slices in pszSendQueue
494 * Args : zhtThread send thread
495 * Outs : If success started, return (void*)0, otherwise return (void*)-1
496 ******************************************************************************/
497 void LudpSender::__SendThreadFunc (HLH_Thread &zhtThread)
500 int nRetVal;
502 Slice *pzsSlice;
503 UINT32 unPollType;
504 HLH_Time zhtTime;
506 // Notify zhtThread that SendThreadFunc () started
507 zhtThread.ThreadStarted ();
509 if ( !IsCreated() ) {
510 HLH_DEBUG ( HLH_DEBUG_LUDP, ("not created") );
511 return;
514 do {
516 ////////////////////////////////////////////////////////////////////////////////
517 // Lock this instance
518 m_zhmMutex.Lock ();
520 // Check if send request
522 // If no send request, wait for a while
523 if ( m_slSendQueue.empty () ) {
524 m_zhcSendRequestCond.TimeWait ( m_zhmMutex,
525 HLH_Time (LUDP_SENDER_TIMEOUT_SEND_COND_WAIT_SECONDS, 0) );
527 // If there is still no send,
528 if ( m_slSendQueue.empty () ) {
529 HLH_DEBUG ( HLH_DEBUG_LUDP, ("timeout and send queue still empty") );
530 // Unlock this instance
531 m_zhmMutex.Unlock ();
532 // Check for stop condition
533 goto check_stop;
537 // Unlock this instance
538 m_zhmMutex.Unlock ();
541 ////////////////////////////////////////////////////////////////////////////////
542 // There are some slices to send
544 // Whether socket is ready for send until timeout
545 unPollType = HLH_SOCK_POLL_WRITE;
546 zhtTime = HLH_Time (LUDP_SENDER_TIMEOUT_SOCK_WRITE_WAIT_SECONDS, 0);
547 nRetVal = m_pzhuSock->PollWait ( unPollType, zhtTime);
548 if ( nRetVal < 0 ) {
549 HLH_DEBUG ( HLH_DEBUG_LUDP, ("poll failed") );
550 // Socket hasn't ready for send yet
551 goto check_stop;
555 ////////////////////////////////////////////////////////////////////////////////
556 // Send the slice now
558 // Lock this instance
559 m_zhmMutex.Lock ();
561 // Maybe send queue changed
562 if ( m_slSendQueue.empty () ) {
563 // Unlock this instance
564 m_zhmMutex.Unlock ();
566 goto check_stop;
569 // Send the slice
570 pzsSlice = m_slSendQueue.front ();
571 ASSERT (pzsSlice != NULL);
573 #if 1
574 m_pzhuSock->Send ( pzsSlice, HLH_SLICE_LENGTH (pzsSlice) );
575 #else /* simulate for slice lost */
577 int nRand;
578 int nLimit = 1023 * 3/4;
580 nRand = rand () % 1023;
581 if (nRand < nLimit) {
582 m_pzhuSock->Send ( pzsSlice, HLH_SLICE_LENGTH (pzsSlice) );
585 #endif
587 // Release the slices sent
588 free (pzsSlice);
589 m_slSendQueue.pop_front ();
591 // Unlock this instance
592 m_zhmMutex.Unlock ();
594 check_stop:
595 // Check for stop condition
596 if ( zhtThread.IsStopping () ) {
597 HLH_DEBUG ( HLH_DEBUG_LUDP, ("stop on request") );
598 // Exit the thread
599 return;
602 } while (1);
607 /******************************************************************************
608 * Func : LudpSender::SendThreadFunc
609 * Desc : Send Thread Function:
610 * Send the slices in pszSendQueue
611 * Args : zhtThread send thread
612 * pvThis pointer to this instance
613 * Outs : Always return 0
614 ******************************************************************************/
615 void * LudpSender::SendThreadFunc (HLH_Thread &zhtThread, void *pvThis)
617 ( (LudpSender*) pvThis )->__SendThreadFunc (zhtThread);
619 return (void*) 0;
623 /******************************************************************************
624 * Func : ResendSlices
625 * Desc : Resend Package: Respond the slice which request to resend slices
626 * Args : pzsSlice the slice send by receiver to request some old slices
627 * Data Format:
628 * .0........1.||.2.......3.|.4.......5.| ...
629 * usSliceNum || usSliceSN | usSliceSN | ...
630 * Outs : NONE
631 ******************************************************************************/
632 void LudpSender::ResendSlices (Slice *pzsSlice)
635 HLH_RoundU32 zhrTimestamp;
636 HLH_RoundU32 zhrTimestampLow;
638 UINT16 usSliceNum;
639 UINT16 usSliceIt;
640 UINT16 usSliceSN;
642 UINT8 *pucData;
643 UINT16 *pusSliceSN;
645 UINT32 unPos;
647 SenderPackage *pzsPkg;
649 Slice **ppzsSlices;
650 Slice *pzsSliceSrc;
651 Slice *pzsSliceDst;
653 bool bSend = false;
655 // Get Some Info from package
656 pucData = pzsSlice->aucData;
658 // Prepare resend info
659 zhrTimestamp = pzsSlice->unTimestamp;
660 usSliceNum = * (UINT16*) &pucData[0];
661 pusSliceSN = (UINT16*) &pucData[2];
665 // Lock this instance
666 m_zhmMutex.Lock ();
668 // Check if \c zhrTimestamp is valid
669 zhrTimestampLow = (UINT32) m_zhrTimestampCur - m_unPkgNum;
670 if ( zhrTimestamp < zhrTimestampLow || zhrTimestamp >= m_zhrTimestampCur) {
671 HLH_DEBUG ( HLH_DEBUG_LUDP,
672 ( "resend %u out of range %u ~ %u",
673 (UINT32)zhrTimestamp, (UINT32)zhrTimestampLow,
674 (UINT32)m_zhrTimestampCur-1 ) );
675 goto failed;
678 // Get Package info from resend list
679 unPos = RoundSub ( m_unPkgNum, m_unCurPos, m_zhrTimestampCur - zhrTimestamp );
680 pzsPkg = & m_pzsPkgList [unPos];
682 // Package may be absend as a result of memory allocate error
683 if ( IS_NULL_SENDER_PACKAGE (pzsPkg) ) {
684 HLH_DEBUG ( HLH_DEBUG_LUDP, ("resend of null package") );
685 goto failed;
688 // Check for valid
689 ASSERT (zhrTimestamp == pzsPkg->zhrTimestamp);
693 // Resend the request slices
696 ppzsSlices = pzsPkg->ppzsSlices;
698 for (usSliceIt = 0; usSliceIt < usSliceNum; usSliceIt++) {
700 // Get SliceSN from slice
701 usSliceSN = *pusSliceSN++;
703 HLH_DEBUG ( HLH_DEBUG_LUDP,
704 ("resend slice %u of pkg %u", usSliceSN, (UINT32)zhrTimestamp) );
705 // Check for valid
706 ASSERT ( usSliceSN < pzsPkg->usSliceNum );
708 // Make a copy of that slice
709 pzsSliceSrc = ppzsSlices [usSliceSN];
710 pzsSliceDst = CloneSlice (pzsSliceSrc);
711 if (pzsSliceDst == NULL) {
712 HLH_DEBUG ( HLH_DEBUG_LUDP, ("clone slice failed") );
713 goto failed;
716 // Queue that slice
717 m_slSendQueue.push_back (pzsSliceDst);
718 bSend = true;
722 // Notify send thread if need
723 if (bSend) {
724 //HLH_DEBUG ( HLH_DEBUG_LUDP, ("signal send thread to send") );
725 m_zhcSendRequestCond.Signal ();
728 // Finished
729 // Falling through
731 failed:
732 m_zhmMutex.Unlock ();
733 return;
742 /******************************************************************************
743 * Func : LudpSender::OnReceived
744 * Desc : Callback function: receive data from socket
745 * Receive the slices sent by receiver and respond to them
746 * Args : pvThis pointer to this instance
747 * pvBuf buffer for the data received
748 * unLen length of data received
749 * zhsPeerAddr peer address of data received
750 * Outs :
751 ******************************************************************************/
752 void LudpSender::OnReceived (
753 void *pvThis,
754 void *pvBuf, UINT32 unLen,
755 HLH_SockAddr &zhsPeerAddr)
757 LudpSender *pzlsSender;
758 Slice *pzsSlice;
760 ASSERT ( pvThis != NULL );
761 ASSERT ( pvBuf != NULL );
762 ASSERT ( unLen != 0 );
764 (void)zhsPeerAddr;
766 // Pointer to this instance
767 pzlsSender = (LudpSender *) pvThis;
769 // Make it convenience to access slice member
770 pzsSlice = (Slice *) pvBuf;
773 // If resend requested
774 if ( LUDP_SLICE_BASE_TYPE(pzsSlice->usType) == pzlsSender->m_ucBaseType
775 && LUDP_SLICE_SUB_TYPE (pzsSlice->usType) == LUDP_SLICE_SUB_TYPE_RESEND) {
776 pzlsSender->ResendSlices (pzsSlice);
777 } else {
778 HLH_DEBUG ( HLH_DEBUG_LUDP, ("unknown slice type") );