48 #define QDBG_FUNC_START(x) TRACE(TICS, (">> %s\n", #x)) 49 #define QDBG_FUNC_END(x) TRACE(TICS, ("<< %s\n", #x)) 51 #define QDBG_FUNC_START(x) 52 #define QDBG_FUNC_END(x) 56 #define QUEUE_CHECK() QueueCheck() 62 #define QUEUE_PRINT() QueuePrint() 67 #define PDBG(x) TRACE(TICS, x) 73 #define MIN_THRESHOLD_SIZE 1048576 79 void SendSignalToTask(
void);
81 BOOLEAN IsDataAvailableForStreaming(
void);
/* Forward declarations of the file-private pool helpers defined further down:
 * QueueAlloc/QueueFree manage blocks inside queueMemPool, QueueCheck asserts
 * pool invariants and QueuePrint dumps the pool through DBG. */
84 static void* QueueAlloc(U32BIT size);
85 static void QueueFree(
void *block);
87 static void QueueCheck(
void);
90 static void QueuePrint(
void);
/* File-scope state of the queue manager and its memory pool. */

/* Allocation granularity; the init code sets it to sizeof(U32BIT) * 2. */
99 static U32BIT queueAlign;
/* ~(queueAlign - 1); used to round sizes to a multiple of queueAlign. */
100 static U32BIT queueAlignMask;
/* Caller-supplied buffer that all queue blocks are carved from. */
101 static U8BIT *queueMemPool;
/* Usable pool size (bufferSize & queueAlignMask); 0 means "no pool". */
102 static U32BIT queueMemPoolSize;
/* Smallest block worth keeping when splitting; set to 2 * sizeof(U32BIT). */
103 static U32BIT queueMinBlockSize;
/* Pool offset at which QueueAlloc begins its next search. */
104 static U32BIT queueLastOffset;
/* Level compared against queueBufferedBytes to compute the "resume" flag. */
105 static U32BIT queueThresholdSize;
/* Queue mutex handle; the init code reports an error if it is still NULL. */
110 static void *queueMutex = NULL;
/* TRUE while streaming from the queue is allowed. */
111 static BOOLEAN queueStreamingEnabled = FALSE;
/* Request id the head item must match to be handed out; 0 when disabled. */
112 static U32BIT queueStreamingRequestId = 0;
/* Running total of item->len for items currently in the queue. */
114 static U32BIT queueBufferedBytes;
/* Optional notification invoked after an item has been inserted. */
116 static void (*insertCallback)(U32BIT downloadId,
U64BIT base,
117 U64BIT position, U32BIT len, BOOLEAN last) = 0;
/* Optional notification invoked when item data has been released. */
118 static void (*releaseCallback)(U32BIT downloadId,
U64BIT base,
119 U64BIT position, U32BIT len, BOOLEAN last) = 0;
134 if (queueMutex == NULL)
136 TRACE(TERROR, (
"Streaming queue mutex failure"));
137 result = MHERR_SYSTEM_RESOURCE_MUTEX;
141 queueItemHead = NULL;
142 queueItemTail = NULL;
144 queueBufferedBytes = 0;
175 queueAlign =
sizeof(U32BIT) * 2;
176 queueAlignMask = ~(queueAlign - 1);
177 queueMemPool = buffer;
178 queueMemPoolSize = bufferSize & queueAlignMask;
179 queueMinBlockSize = 2 *
sizeof(U32BIT);
181 if (bufferSize < MIN_THRESHOLD_SIZE)
183 queueThresholdSize = bufferSize;
187 queueThresholdSize = MIN_THRESHOLD_SIZE + ((bufferSize-MIN_THRESHOLD_SIZE)/2);
190 if (queueMemPoolSize >= 2 * queueMinBlockSize)
195 size = queueMemPoolSize - queueMinBlockSize;
196 assert((size & 0x3) == 0);
201 *((U32BIT *)(queueMemPool + size -
sizeof(U32BIT))) = size;
208 *((U32BIT *)queueMemPool) = size;
215 *((U32BIT *)(queueMemPool + (size & ~0x3))) = queueMinBlockSize | 0x2;
220 DBG((
"QueueInit: Buffer too small"));
222 queueMemPoolSize = 0;
224 #ifdef ERROR_ON_SMALL_ICS_BUFFER 227 result = MHERR_BAD_ICS_BUFFER_SIZE;
249 item = QueueAlloc(
sizeof *item);
252 item->data = QueueAlloc(len);
253 if (item->data != NULL)
255 item->requestId = requestId;
258 item->referenceCount = 1;
286 if (queueItemHead == NULL)
288 assert(queueItemTail == NULL);
289 queueItemHead = item;
290 queueItemTail = item;
294 assert(queueItemTail != NULL);
295 queueItemTail->next = item;
296 queueItemTail = item;
299 queueBufferedBytes += item->len;
300 DBG((
"queueBufferedBytes = %d [1]", queueBufferedBytes));
308 DBG((
"%p (%p,%d) -> ", item, item->data, item->len));
313 DBG((
"tail = %p", queueItemTail));
319 if (insertCallback != NULL)
322 insertCallback(item->downloadId, item->base, item->position,
323 item->len, item->last);
344 if (queueStreamingEnabled)
346 item = queueItemHead;
349 if (item->requestId == queueStreamingRequestId)
352 ++item->referenceCount;
368 item = &queueInvalidItem;
386 BOOLEAN valid = TRUE;
388 if (item == &queueInvalidItem)
409 U32BIT requestId = 0;
413 BOOLEAN last = FALSE;
420 --item->referenceCount;
423 if (item == queueItemHead)
425 item->offset += processed;
432 requestId = item->requestId;
434 position = item->position;
441 QueueFree(item->data);
451 DBG((
"%p (%p,%d) -> ", item, item->data, item->len));
456 DBG((
"tail = %p", queueItemTail));
462 if (call && releaseCallback != NULL)
464 releaseCallback(requestId, base, position, len, last);
483 BOOLEAN last, resume;
490 requestId = item->requestId;
492 position = item->position;
497 --item->referenceCount;
500 if (item == queueItemHead)
503 queueItemHead = queueItemHead->next;
504 if (item == queueItemTail)
506 assert(queueItemHead == NULL);
507 queueItemTail = NULL;
510 queueBufferedBytes -= item->len;
511 DBG((
"queueBufferedBytes = %d [2]", queueBufferedBytes));
513 if (queueBufferedBytes < queueThresholdSize)
518 --item->referenceCount;
519 if (item->referenceCount == 0)
521 QueueFree(item->data);
527 PDBG((
"Qbytes=%u, item->referenceCount=%d, item->len=%u", queueBufferedBytes, item->referenceCount, item->len));
528 if (item->referenceCount <= 1)
530 assert(item->referenceCount == 0);
531 QueueFree(item->data);
542 DBG((
"%p (%p,%d) -> ", item, item->data, item->len));
547 DBG((
"tail = %p", queueItemTail));
553 if (releaseCallback != NULL)
556 releaseCallback(requestId, base, position, len, last);
576 return queueBufferedBytes;
591 PDBG((
"QueueEnableStreaming"));
592 enabled = queueStreamingEnabled;
593 queueStreamingEnabled = TRUE;
594 queueStreamingRequestId = requestId;
614 PDBG((
"QueueDisableStreaming"));
615 queueStreamingEnabled = FALSE;
616 queueStreamingRequestId = 0;
628 return queueStreamingEnabled;
643 queueBufferedBytes -= QueueReleaseRequestItems(&queueItemHead,
647 resume = (queueBufferedBytes < queueThresholdSize)? TRUE : FALSE;
655 DBG((
"%p (%p,%d) -> ", item, item->data, item->len));
660 DBG((
"tail = %p", queueItemTail));
688 pItem = &queueItemHead;
689 while (*pItem != NULL)
692 if (item->requestId != requestId || ULL_Compare(item->position,marker) > 0)
698 pos = item->position;
699 ULL_Add32(pos,item->len);
700 if (ULL_Compare(pos,marker) <= 0)
703 queueBufferedBytes -= item->len;
704 --item->referenceCount;
705 if (item->referenceCount == 0)
707 QueueFree(item->data);
714 ULL_Sub(pos,item->position);
715 item->offset = ULL_Get32(pos);
720 resume = (queueBufferedBytes < queueThresholdSize)? TRUE : FALSE;
723 item = queueItemHead;
726 queueItemTail = NULL;
730 while (item->next != NULL)
734 queueItemTail = item;
755 QueueReleaseAllItems(&queueItemHead, &queueItemTail);
756 queueBufferedBytes = 0;
758 DBG((
"queueBufferedBytes = 0 [4]"));
777 insertCallback = callback;
792 releaseCallback = callback;
806 queueMemPoolSize = 0;
/* Allocate a block of at least `size` bytes from queueMemPool.
 *
 * Each block starts with a U32BIT header: the block size with the two low
 * bits used as flags (0x2 = this block in use, 0x1 = previous block in use,
 * as named by QueueCheck). The returned pointer is the address just past that
 * header word. `block` stays NULL when the search loop finds nothing.
 *
 * NOTE(review): this listing omits several original lines (local
 * declarations, braces, some statements), so the exact nesting of the
 * statements below cannot be confirmed from here. */
820 static void* QueueAlloc(U32BIT size)
830 QDBG_ALLOC((
">> QueueAlloc(%d)\n", size));
/* Add room for the header word and round up to a multiple of queueAlign. */
835 requestSize = (size +
sizeof(U32BIT) + queueAlign - 1) & queueAlignMask;
836 QDBG_ALLOC((
"size = %d, requestSize = %d\n", size, requestSize));
838 QDBG_ALLOC((
"queueLastOffset = %d\n", queueLastOffset));
/* The search starts where the previous allocation left off. */
839 offset = queueLastOffset;
840 while (block == NULL && offset < queueMemPoolSize)
/* Read the header and strip the two flag bits to get the block size. */
842 blockHeader = *((U32BIT *)(queueMemPool + offset));
843 blockSize = blockHeader & ~0x3;
/* Block is unusable if it is in use (0x2) or smaller than the request. */
844 if ((blockHeader & 0x2) || (blockSize < requestSize))
849 QDBG_ALLOC((
"Nothing at %d, trying next offset %d\n",
850 offset, offset + blockSize));
/* Reached the end of the pool; presumably wraps to offset 0 - body not shown. */
852 if (offset == queueMemPoolSize)
/* Back at the starting offset; presumably ends the search - body not shown. */
857 if (offset == queueLastOffset)
866 QDBG_ALLOC((
"Found a block - blockSize = %d\n", blockSize));
/* Split only when the leftover is big enough to form a block of its own. */
867 if (blockSize - requestSize >= queueMinBlockSize)
870 QDBG_ALLOC((
"Need to split the block\n"));
/* New header for the allocated part; keeps the old "previous in use" bit.
 * NOTE(review): the in-use bit (0x2) is presumably set on an omitted line. */
871 newHeader = requestSize;
873 newHeader |= blockHeader & 0x1;
877 *((U32BIT *)(queueMemPool + offset)) = newHeader;
/* The remainder becomes a block: its size goes in its last word and header. */
880 newHeaderPtr = queueMemPool + offset + requestSize;
881 newHeader = blockSize - requestSize;
883 *((U32BIT *)(newHeaderPtr + newHeader -
sizeof(U32BIT))) = newHeader;
888 *((U32BIT *)newHeaderPtr) = newHeader;
901 QDBG_ALLOC((
"Use entire block\n"));
/* No split: the allocation takes the whole block. */
902 requestSize = blockSize;
903 newHeader = requestSize;
905 newHeader |= blockHeader & 0x1;
909 *((U32BIT *)(queueMemPool + offset)) = newHeader;
/* Tell the block that follows the allocation that its predecessor is in use. */
914 *((U32BIT *)(queueMemPool + offset + requestSize)) |= 0x1;
/* User data begins immediately after the header word. */
919 block = queueMemPool + offset +
sizeof(U32BIT);
/* Next search resumes right after this allocation. */
922 queueLastOffset = offset + requestSize;
926 QDBG_ALLOC((
"<< QueueAlloc() -> %p\n", block));
/* Return a block obtained from QueueAlloc to the pool and merge it with a
 * free neighbour on either side.
 *
 * `block` is the pointer QueueAlloc returned (one U32BIT past the header).
 * A free block stores its size in its last word as well as in its header,
 * which is how the preceding block is located when merging backwards.
 *
 * NOTE(review): this listing omits several original lines (some declarations,
 * braces, and probably the statement that clears the in-use bit), so
 * statement nesting cannot be confirmed from here. */
936 static void QueueFree(
void *block)
938 U8BIT *blockHeaderPtr;
939 U8BIT *nextBlockHeaderPtr;
940 U8BIT *prevBlockHeaderPtr;
944 U32BIT nextBlockSize;
945 U32BIT prevBlockSize;
946 U32BIT prevBlockHeader;
948 QDBG_FREE((
">> QueueFree(%p)\n", block));
/* Step back over the header word and convert to an offset into the pool. */
959 blockOffset = (U8BIT *)block -
sizeof(U32BIT) - queueMemPool;
960 assert(blockOffset < queueMemPoolSize);
962 blockHeaderPtr = queueMemPool + blockOffset;
963 blockHeader = *((U32BIT *)blockHeaderPtr);
964 blockSize = blockHeader & ~0x3;
/* Only a block currently marked in use (0x2) may be freed. */
967 assert(blockHeader & 0x2);
/* Write the header back.
 * NOTE(review): blockHeader is presumably changed on an omitted line. */
971 *((U32BIT *)blockHeaderPtr) = blockHeader;
/* Record the size in the last word of the block. */
974 *((U32BIT *)(blockHeaderPtr + blockSize -
sizeof(U32BIT))) = blockSize;
/* Clear the "previous in use" bit (0x1) in the following block's header. */
977 nextBlockHeaderPtr = blockHeaderPtr + blockSize;
978 *((U32BIT *)nextBlockHeaderPtr) &= ~0x1;
/* If the following block is not in use, absorb it into this block. */
982 if ((*((U32BIT *)nextBlockHeaderPtr) & 0x2) == 0)
985 nextBlockSize = *((U32BIT *)nextBlockHeaderPtr) & ~0x3;
988 blockSize += nextBlockSize;
989 blockHeader = blockSize | (blockHeader & 0x3);
990 *((U32BIT *)blockHeaderPtr) = blockHeader;
993 *((U32BIT *)(blockHeaderPtr + blockSize -
sizeof(U32BIT))) = blockSize;
/* The absorbed block's header no longer exists; repoint the search start. */
999 if (queueLastOffset == nextBlockHeaderPtr - queueMemPool)
1002 queueLastOffset = blockOffset;
/* If the preceding block is not in use (0x1 clear), merge this block into it. */
1007 if ((blockHeader & 0x1) == 0)
/* The word just before this header is the preceding free block's size. */
1010 prevBlockSize = *((U32BIT *)(blockHeaderPtr -
sizeof(U32BIT)));
1011 prevBlockHeaderPtr = blockHeaderPtr - prevBlockSize;
1012 prevBlockHeader = *((U32BIT *)prevBlockHeaderPtr);
1015 prevBlockSize += blockSize;
1016 prevBlockHeader = prevBlockSize | (prevBlockHeader & 0x3);
1017 *((U32BIT *)prevBlockHeaderPtr) = prevBlockHeader;
1020 *((U32BIT *)(prevBlockHeaderPtr + prevBlockSize -
sizeof(U32BIT))) = prevBlockSize;
/* This block's header is now interior to the merged block; repoint the search start. */
1022 if (queueLastOffset == blockOffset)
1025 queueLastOffset = prevBlockHeaderPtr - queueMemPool;
1030 QDBG_FREE((
"<< QueueFree()\n"));
/* Debug consistency check: walk the pool block by block and assert its
 * invariants. Invoked through the QUEUE_CHECK() macro.
 *
 * NOTE(review): this listing omits several original lines (initialisation of
 * offset/realPrevUsed, the flag-stripping of `size`, the offset advance). */
1038 static void QueueCheck(
void)
1042 U32BIT prevUsed, currUsed, realPrevUsed;
1044 BOOLEAN seenLastOffset;
1050 seenLastOffset = FALSE;
1052 while (offset < queueMemPoolSize)
/* Note whether queueLastOffset lands on a block boundary. */
1054 if (queueLastOffset == offset)
1056 seenLastOffset = TRUE;
/* Header word: bit 0 = previous block in use, bit 1 = this block in use. */
1058 size = *((U32BIT *)(queueMemPool + offset));
1059 prevUsed = size & 0x1;
1060 currUsed = (size >> 1) & 0x1;
/* The block must lie entirely inside the pool. */
1062 assert(offset + size <= queueMemPoolSize);
/* The stored "previous in use" bit must match the block actually visited before. */
1063 assert(prevUsed == realPrevUsed);
1064 realPrevUsed = currUsed;
/* A free block must repeat its size in its last word. */
1065 lastWord = *((U32BIT *)(queueMemPool + offset + size -
sizeof(U32BIT)));
1066 assert(currUsed || lastWord == size);
/* queueLastOffset must have coincided with some block header. */
1070 assert(seenLastOffset);
/* Debug dump: walk the pool and print one DBG line per block. Invoked through
 * the QUEUE_PRINT() macro.
 *
 * NOTE(review): this listing omits several original lines (initialisation of
 * offset, the flag-stripping of `size`, the offset advance). */
1081 static void QueuePrint(
void)
1085 U32BIT prevUsed, currUsed;
1088 DBG((
">> QueuePrint"));
1091 while (offset < queueMemPoolSize)
/* Header word: bit 0 = previous block in use, bit 1 = this block in use. */
1093 size = *((U32BIT *)(queueMemPool + offset));
1094 prevUsed = size & 0x1;
1095 currUsed = (size >> 1) & 0x1;
/* Last word of the block (holds the size when the block is free). */
1097 lastWord = *((U32BIT *)(queueMemPool + offset + size -
sizeof(U32BIT)));
/* The printed address is the user pointer, i.e. just past the header word. */
1098 DBG((
"%p: offset=%d, size=%d, prevUsed=%d, currUsed=%d, lastWord=%d",
1099 queueMemPool + offset +
sizeof(U32BIT),
1100 offset, size, prevUsed, currUsed, lastWord));
1104 DBG((
"<< QueuePrint"));
1125 QDBG_FUNC_START(QueueReleaseRequestItems);
1130 while (*pItem != NULL)
1133 if (item->requestId == requestId)
1135 *pItem = item->next;
1137 released += item->len;
1140 --item->referenceCount;
1141 if (item->referenceCount == 0)
1143 QueueFree(item->data);
1149 pItem = &item->next;
1155 while (*tail != NULL && (*tail)->next != NULL)
1157 *tail = (*tail)->next;
1160 QDBG_FUNC_END(QueueReleaseRequestItems);
1175 QDBG_FUNC_START(QueueReleaseAllItems);
1178 while (item != NULL)
1180 nextItem = item->next;
1183 --item->referenceCount;
1184 if (item->referenceCount == 0)
1186 QueueFree(item->data);
1196 QDBG_FUNC_END(QueueReleaseAllItems);
Task functions for IC Streamer.
Common header internal to IC streamer.
void STB_OSDeleteMutex(void *mutex)
Delete a mutex.
void MHEG5QueueUpdateItem(MHEG5QueueItem *item, U32BIT processed)
Update the number of bytes processed in a queue item. The same item will be returned by the next call...
BOOLEAN MHEG5QueueIsValidItem(MHEG5QueueItem *item)
Tell whether the item returned by MHEG5QueueGetHeadItem is valid. If the item is invalid, then streaming is disabled.
void MHEG5QueueRegisterReleaseCallback(void(*callback)(U32BIT requestId, U64BIT base, U64BIT position, U32BIT len, BOOLEAN last))
Register notification callback for item release events. If a callback is already registered for the e...
U32BIT MHEG5QueueGetBufferedBytes(void)
Return number of buffered bytes in the queue (regardless of request)
void MHEG5QueueTerm(void)
Terminate queue manager.
void STB_OSMutexUnlock(void *mutex)
Unlock a mutex (a.k.a. 'leave', 'signal' or 'release')
IC Streamer queue manager.
This file defines the profile for the MHEG engine.
void * STB_OSCreateMutex(void)
Create a mutex.
void MHEG5QueueReleaseItems(U32BIT requestId, U64BIT marker)
Release queue items that are related to a given request up to a marker.
void MHEG5StreamerSendSignalToTask(void)
Send signal to streamer task to wake it up (if it's asleep)
MHEG5QueueItem * MHEG5QueueGetHeadItem(void)
Return the item at the head of the queue (the next item to consume). The function returns an "invalid...
void MHEG5QueueInsertItem(MHEG5QueueItem *item)
Insert a queue item into the queue.
void MHEG5QueueEnableStreaming(U32BIT requestId)
Allow streaming data from the queue.
void MHEG5QueueRegisterInsertCallback(void(*callback)(U32BIT downloadId, U64BIT base, U64BIT position, U32BIT len, BOOLEAN last))
Register notification callback for item insertion event. If a callback is already registered for the ...
BOOLEAN MHEG5QueueIsStreamingEnabled(void)
Tell whether streaming is enabled.
MHEG5QueueItem * MHEG5QueueAllocItem(U32BIT requestId, U32BIT len)
Allocate a new queue item and initialise with request ID and block length.
void STB_OSMutexLock(void *mutex)
Lock a mutex (a.k.a. 'enter', 'wait' or 'get').
E_MhegErr MHEG5QueueInit(U8BIT *buffer, U32BIT bufferSize)
Initialise queue manager.
void MHEG5QueueReleaseRequestItems(U32BIT requestId)
Release all queue items that are related to a given request.
void MHEG5QueueReleaseItem(MHEG5QueueItem *item)
Release an item from the queue. If the item is no longer in the queue (because the queue has been cle...
void MHEG5QueueReleaseAllItems(void)
Release all queue items - clear the queue completely.
void MHEG5ResumeDownload(void)
Resume download of the active request. Download may or may not have been paused due to the buffer bei...
void MHEG5QueueDisableStreaming(void)
Do not allow streaming data from the queue.
IC Streamer download manager.
Header file - Function prototypes for operating system.