Advertisement
Guest User

Asynchronous resource loading - source file

a guest
Nov 20th, 2012
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.20 KB | None | 0 0
  1. /*
  2. --------------------------------------------------------------
  3.     AFilePackage
  4. --------------------------------------------------------------
  5. */
  6. AFilePackage::Stream::Stream()
  7. {
  8.     ZERO_OUT(*this);
  9. }
  10.  
  11. /*
  12. --------------------------------------------------------------
  13.     FileSystem
  14. --------------------------------------------------------------
  15. */
  16. FileSystem::FileSystem()
  17. {
  18.  
  19. }
  20.  
  21. FileSystem::~FileSystem()
  22. {
  23.  
  24. }
  25.  
  26. bool FileSystem::MountPackage( AFilePackage* package )
  27. {
  28.     chkRET_FALSE_IF_NIL(package);
  29.     chkRET_FALSE_IF_NOT(!package->FindSelfInList( m_packages ));
  30.     package->PrependSelfToList( &m_packages );
  31.     return true;
  32. }
  33.  
  34. void FileSystem::UnmountPackage( AFilePackage* package )
  35. {
  36.     chkRET_IF_NIL(package);
  37.     package->RemoveSelfFromList( &m_packages );
  38. }
  39.  
  40. AFilePackage* FileSystem::OpenFile( const AssetGuid& fileId, AFilePackage::Stream *stream )
  41. {
  42.     chkRET_FALSE_IF_NIL(stream);
  43.  
  44.     AFilePackage* current = m_packages;
  45.  
  46.     while(PtrToBool( current ))
  47.     {
  48.         if( current->OpenFile( fileId, stream ) )
  49.         {
  50.             return current;
  51.         }
  52.         current = current->next;
  53.     }
  54.  
  55.     return nil;
  56. }
  57.  
  58. static void DefaultAsyncLoad( const SLoadContext& context )
  59. {
  60.     mxUNUSED(context);
  61.     //nothing
  62. }
  63. static void DefaultFinalize( const SFinalizeContext& context )
  64. {
  65.     mxUNUSED(context);
  66.     //nothing
  67. }
  68.  
  69. SLoadRequest::SLoadRequest()
  70. {
  71.     assetId = AssetGuid_GetNull();
  72.     buffer = nil;
  73.     bytesToRead = 0;
  74.  
  75.     priority = Load_Priority_Normal;
  76.  
  77.     userData = nil;
  78.     asyncLoad = &DefaultAsyncLoad;
  79.     finalize = &DefaultFinalize;
  80. }
  81.  
  82. bool SLoadRequest_CheckValid( const SLoadRequest& o )
  83. {
  84.     chkRET_FALSE_IF_NOT(AssetGuid_IsValid(o.assetId));
  85.     chkRET_FALSE_IF_NIL(o.buffer);
  86.     //chkRET_FALSE_IF_NOT(o.bytesToRead > 0);
  87.     enum { MAX_READ_LIMIT = 8*mxMEGABYTE };
  88.     chkRET_FALSE_IF_NOT(o.bytesToRead <= MAX_READ_LIMIT);
  89.     return true;
  90. }
  91.  
  92. /*
  93. --------------------------------------------------------------
  94.     AsyncReader
  95. --------------------------------------------------------------
  96. */
  97. // size of streaming buffer must be a multiple of disk sector size
  98. enum { DISK_SECTOR_SIZE = 2048 };
  99.  
  100. static const UINT STREAMING_BUFFER_SIZE = 4 * mxMEGABYTE;   // 4 MiB
  101.  
  102. enum { INITIAL_QUEUE_SIZE = 64 };
  103.  
  104. AsyncReader::AsyncReader()
  105. {
  106.     m_isRunning = false;
  107.     m_lastRequestId = 0;
  108. }
  109.  
  110. AsyncReader::~AsyncReader()
  111. {
  112.     Assert(!m_isRunning);
  113.     Assert(m_pendingRequests.IsEmpty());
  114.     Assert(m_completedRequests.IsEmpty());
  115. }
  116.  
  117. void AsyncReader::Initialize()
  118. {
  119.     Assert(mxTHIS_IS_MAIN_THREAD);
  120.     Assert(!m_isRunning);
  121.  
  122.     m_pendingRequests.Reserve(INITIAL_QUEUE_SIZE);
  123.     m_completedRequests.Reserve(INITIAL_QUEUE_SIZE);
  124.  
  125.     m_pendingRequestsCS.Initialize();
  126.     m_completedRequestsCS.Initialize();
  127.  
  128.     m_isRunning = true;
  129.     m_lastRequestId = 0;
  130.  
  131.     Thread::CInfo   threadInfo;
  132.     {
  133.         threadInfo.entryPoint = &StaticResourceLoaderThread;
  134.         threadInfo.userPointer = this;
  135.         threadInfo.priority = ThreadPriority_Low;
  136. #if MX_DEBUG
  137.         threadInfo.debugName = "$BackgroundLoader";
  138. #endif // MX_DEBUG
  139.     }
  140.     m_IOThread.Initialize( threadInfo );
  141.  
  142.     m_wakeupEvent.Initialize( EventFlag::CreateEvent_AutoReset );
  143. }
  144.  
  145. void AsyncReader::Shutdown()
  146. {
  147.     Assert(mxTHIS_IS_MAIN_THREAD);
  148.     Assert(m_isRunning);
  149.  
  150.     m_isRunning = false;
  151.  
  152.     this->CancelAll();
  153.     m_wakeupEvent.Signal();
  154.     m_IOThread.Join();
  155.  
  156.     Assert(m_pendingRequests.IsEmpty());
  157.     Assert(m_completedRequests.IsEmpty());
  158.  
  159.     m_IOThread.Shutdown();
  160.  
  161.     m_pendingRequestsCS.Shutdown();
  162.     m_completedRequestsCS.Shutdown();
  163.  
  164.     m_wakeupEvent.Shutdown();
  165.     m_workDoneEvent.Shutdown();
  166. }
  167.  
  168. bool AsyncReader::AddLoadRequest( const SLoadRequest& request, LoadRequestId *id )
  169. {
  170.     Assert(mxTHIS_IS_MAIN_THREAD);
  171.  
  172.     chkRET_FALSE_IF_NOT(SLoadRequest_CheckValid(request));
  173.  
  174.     AFilePackage::Stream fileStream;
  175.     AFilePackage* filePackage = gCore.files->OpenFile( request.assetId, &fileStream );
  176.     if( !filePackage )
  177.     {
  178.         return false;
  179.     }
  180.  
  181.     const UINT bytesToRead = (request.bytesToRead != nil) ? request.bytesToRead : fileStream.uncompressedSize;
  182.  
  183.     // Add the request to the read queue
  184.     {
  185.         SpinWait::Lock  lockJobList( m_pendingRequestsCS );
  186.  
  187.         mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
  188.  
  189.         SPendingRequest & newQueuedRequest = m_pendingRequests.Add();
  190.         {
  191.             newQueuedRequest.package = filePackage;
  192.             newQueuedRequest.stream = fileStream;
  193.             newQueuedRequest.priority = request.priority;
  194.  
  195.             newQueuedRequest.buffer = request.buffer;
  196.             newQueuedRequest.bytesToRead = bytesToRead;
  197.  
  198.             newQueuedRequest.userData = request.userData;
  199.             newQueuedRequest.asyncLoad = request.asyncLoad;
  200.             newQueuedRequest.finalize = request.finalize;
  201.  
  202.             newQueuedRequest.uid = m_lastRequestId++;
  203.             if( id != nil ) {
  204.                 *id = newQueuedRequest.uid;
  205.             }
  206.         }
  207.  
  208.         // sort load requests by file offsets
  209.         this->SortQueuedRequests_NoLock();
  210.  
  211.         mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
  212.     }
  213.  
  214.     // Signal that we have something to read
  215.     m_wakeupEvent.Signal();
  216.  
  217.     return true;
  218. }
  219.  
  220. bool AsyncReader::Cancel( LoadRequestId requestId )
  221. {
  222.     Assert(mxTHIS_IS_MAIN_THREAD);
  223.  
  224.     // If the request hasn't been yet processed, then it must be in the list of pending requests.
  225.     // Remove it from the list.
  226.     {
  227.         SpinWait::Lock  scopedLock( m_pendingRequestsCS );
  228.  
  229.         SPendingRequest* pendingRequest = this->FindPendingRequest_NoLock( requestId );
  230.         if( pendingRequest != nil )
  231.         {
  232.             m_pendingRequests.RemoveContainedItem( pendingRequest );
  233.             return true;
  234.         }
  235.     }
  236.  
  237.     // Try to remove it from the list of requests awaiting finalization.
  238.     {
  239.         SpinWait::Lock  scopedLock( m_completedRequestsCS );
  240.  
  241.         SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
  242.         if( completedRequest != nil )
  243.         {
  244.             m_completedRequests.RemoveContainedItem( completedRequest );
  245.             return true;
  246.         }
  247.     }
  248.  
  249.     // The given request couldn't be found.
  250.     return true;
  251. }
  252.  
  253. // Block the current thread until the load request with the specified ID completes.
  254. //
  255. bool AsyncReader::WaitFor( LoadRequestId requestId, UINT timeOutMilliseconds )
  256. {
  257.     Assert(mxTHIS_IS_MAIN_THREAD);
  258.  
  259.     // wait until the item is present in the m_completedRequests list or a timeout reached
  260.     const UINT startTimeMSec = mxGetTimeInMilliseconds();
  261.     while(true)
  262.     {
  263.         {
  264.             SpinWait::Lock  scopedLock( m_completedRequestsCS );
  265.  
  266.             SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
  267.             if( completedRequest != nil )
  268.             {
  269.                 completedRequest->Finalize();
  270.                 m_completedRequests.RemoveContainedItem( completedRequest );
  271.                 return true;
  272.             }
  273.         }
  274.  
  275.         //CurrentThread::Yield();
  276.         mxSleepMilliseconds(1);
  277.  
  278.         const UINT currentTimeMSec = mxGetTimeInMilliseconds();
  279.         if( currentTimeMSec - startTimeMSec >= timeOutMilliseconds ) {
  280.             return false;
  281.         }
  282.     }
  283.  
  284.     return false;
  285. }
  286.  
  287. void AsyncReader::WaitForAll( UINT milliseconds )
  288. {
  289.     Assert(mxTHIS_IS_MAIN_THREAD);
  290.  
  291.     m_wakeupEvent.Signal();
  292.  
  293.     m_workDoneEvent.WaitTimeOut( milliseconds );
  294.  
  295.     Assert(m_pendingRequests.IsEmpty());
  296.  
  297.     this->FinalizeCompletedRequests();
  298. }
  299.  
  300. // One may not want to process all the completed requests at once
  301. // because this may cause a noticeable temporary slow down in the game's performance.
  302. // So, this API could be called once per frame
  303. // so that only a small number work items would be set to the device on each frame,
  304. // thereby spreading the work load of binding resources to the device over several frames.
  305. //
  306. void AsyncReader::FinalizeCompletedRequests( UINT maxNumRequests )
  307. {
  308.     Assert(mxTHIS_IS_MAIN_THREAD);
  309.  
  310.     SpinWait::Lock  scopedLock( m_completedRequestsCS );
  311.  
  312.     const UINT numCompletedRequests = m_completedRequests.Num();
  313.     const UINT numRequestsToProcess = (maxNumRequests > 0) ? smallest(maxNumRequests, numCompletedRequests) : numCompletedRequests;
  314.  
  315.     for( UINT iCompletedRequest = 0; iCompletedRequest < numRequestsToProcess; iCompletedRequest++ )
  316.     {
  317.         SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ];
  318.         completedRequest.Finalize();
  319.     }
  320.  
  321.     m_completedRequests.RemoveAt( 0, numRequestsToProcess );
  322. }
  323.  
  324. void AsyncReader::CancelAll()
  325. {
  326.     Assert(mxTHIS_IS_MAIN_THREAD);
  327.     mxUNDONE;
  328.     {
  329.         SpinWait::Lock  scopedLock( m_pendingRequestsCS );
  330.         m_pendingRequests.Empty();
  331.     }
  332.     {
  333.         SpinWait::Lock  scopedLock( m_completedRequestsCS );
  334.         m_completedRequests.Empty();
  335.     }
  336. }
  337.  
  338. AsyncReader::SPendingRequest* AsyncReader::FindPendingRequest_NoLock( LoadRequestId requestId )
  339. {
  340.     for( UINT iPendingRequest = 0; iPendingRequest < m_pendingRequests.Num(); iPendingRequest++ )
  341.     {
  342.         SPendingRequest& pendingRequest = m_pendingRequests[ iPendingRequest ];
  343.         if( pendingRequest.uid == requestId ) {
  344.             return &pendingRequest;
  345.         }
  346.     }
  347.     return nil;
  348. }
  349.  
  350. AsyncReader::SCompletedRequest* AsyncReader::FindCompletedRequest_NoLock( LoadRequestId requestId )
  351. {
  352.     for( UINT iCompletedRequest = 0; iCompletedRequest < m_completedRequests.Num(); iCompletedRequest++ )
  353.     {
  354.         SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ];
  355.         if( completedRequest.uid == requestId ) {
  356.             return &completedRequest;
  357.         }
  358.     }
  359.     return nil;
  360. }
  361.  
  362. void AsyncReader::SortQueuedRequests_NoLock()
  363. {
  364.     // sort by priority and physical location (file offsets)
  365.     if( m_pendingRequests.Num() > 1 )
  366.     {
  367.         struct CompareLoadRequests
  368.         {
  369.             static bool Compare( const SPendingRequest& a, const SPendingRequest& b )
  370.             {
  371.                 if( a.priority < b.priority ) {
  372.                     return false;
  373.                 }
  374.                 if( a.stream.sortKey < b.stream.sortKey ) {
  375.                     return false;
  376.                 }
  377.                 return true;
  378.             }
  379.         };
  380.  
  381.         InsertionSort< CompareLoadRequests >( m_pendingRequests.ToPtr(), 0, m_pendingRequests.Num()-1 );
  382.     }
  383. }
  384.  
  385. UINT32 PASCAL AsyncReader::StaticResourceLoaderThread( void* userData )
  386. {
  387.     AsyncReader* loader = (AsyncReader*) userData;
  388.     return loader->ResourceLoaderThread();
  389. }
  390.  
  391. UINT32 AsyncReader::ResourceLoaderThread()
  392. {
  393.     while( m_isRunning )
  394.     {
  395.         m_wakeupEvent.Wait();
  396.  
  397.         if( !m_isRunning ) {
  398.             break;
  399.         }
  400.  
  401.         if( m_pendingRequests.NonEmpty() )
  402.         {
  403.             SPendingRequest request;    //<= must be copied via value
  404.             ZERO_OUT(request);
  405.  
  406.             // Pop a request off of the queue
  407.             {
  408.                 SpinWait::Lock  lockJobList( m_pendingRequestsCS );
  409.                 request = m_pendingRequests.GetFirst();
  410.                 m_pendingRequests.RemoveAt( 0 );
  411.             }
  412.  
  413.             // Handle a read request
  414.             const UINT bytesRead = request.package->ReadFile( &request.stream, 0, request.buffer, request.bytesToRead );
  415.             Assert(bytesRead == request.bytesToRead);
  416.  
  417.             // Invoke user callback.
  418.             {
  419.                 SLoadContext    args;
  420.                 {
  421.                     args.buffer     = request.buffer;
  422.                     args.size       = bytesRead;
  423.                     args.userData   = request.userData;
  424.                     args.requestId  = request.uid;
  425.                     //args.assetId = assetId;
  426.                 }
  427.                 (*request.asyncLoad)( args );
  428.             }
  429.  
  430.             // Add it to the list of completed requests.
  431.             {
  432.                 SpinWait::Lock  scopedLock( m_completedRequestsCS );
  433.  
  434.                 SCompletedRequest & newCompletedRequest = m_completedRequests.Add();
  435.                 {
  436.                     newCompletedRequest.buffer = request.buffer;
  437.                     newCompletedRequest.bytesRead = bytesRead;
  438.  
  439.                     // copy data for invoking callbacks
  440.                     newCompletedRequest.userData = request.userData;
  441.                     newCompletedRequest.finalize = request.finalize;
  442.  
  443.                     newCompletedRequest.uid = request.uid;
  444.                 }
  445.             }
  446.         }
  447.         else
  448.         {
  449.             m_workDoneEvent.Signal();
  450.         }
  451.     }
  452.     mxDBGMSG("Resource loader thread is exiting...\n");
  453.     return 0;
  454. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement