Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

Asynchronous resource loading - source file

By: a guest on Nov 20th, 2012  |  syntax: C++  |  size: 11.20 KB  |  views: 26  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. /*
  2. --------------------------------------------------------------
  3.         AFilePackage
  4. --------------------------------------------------------------
  5. */
  6. AFilePackage::Stream::Stream()
  7. {
  8.         ZERO_OUT(*this);
  9. }
  10.  
  11. /*
  12. --------------------------------------------------------------
  13.         FileSystem
  14. --------------------------------------------------------------
  15. */
  16. FileSystem::FileSystem()
  17. {
  18.  
  19. }
  20.  
  21. FileSystem::~FileSystem()
  22. {
  23.  
  24. }
  25.  
  26. bool FileSystem::MountPackage( AFilePackage* package )
  27. {
  28.         chkRET_FALSE_IF_NIL(package);
  29.         chkRET_FALSE_IF_NOT(!package->FindSelfInList( m_packages ));
  30.         package->PrependSelfToList( &m_packages );
  31.         return true;
  32. }
  33.  
  34. void FileSystem::UnmountPackage( AFilePackage* package )
  35. {
  36.         chkRET_IF_NIL(package);
  37.         package->RemoveSelfFromList( &m_packages );
  38. }
  39.  
  40. AFilePackage* FileSystem::OpenFile( const AssetGuid& fileId, AFilePackage::Stream *stream )
  41. {
  42.         chkRET_FALSE_IF_NIL(stream);
  43.  
  44.         AFilePackage* current = m_packages;
  45.  
  46.         while(PtrToBool( current ))
  47.         {
  48.                 if( current->OpenFile( fileId, stream ) )
  49.                 {
  50.                         return current;
  51.                 }
  52.                 current = current->next;
  53.         }
  54.  
  55.         return nil;
  56. }
  57.  
  58. static void DefaultAsyncLoad( const SLoadContext& context )
  59. {
  60.         mxUNUSED(context);
  61.         //nothing
  62. }
  63. static void DefaultFinalize( const SFinalizeContext& context )
  64. {
  65.         mxUNUSED(context);
  66.         //nothing
  67. }
  68.  
  69. SLoadRequest::SLoadRequest()
  70. {
  71.         assetId = AssetGuid_GetNull();
  72.         buffer = nil;
  73.         bytesToRead = 0;
  74.  
  75.         priority = Load_Priority_Normal;
  76.  
  77.         userData = nil;
  78.         asyncLoad = &DefaultAsyncLoad;
  79.         finalize = &DefaultFinalize;
  80. }
  81.  
  82. bool SLoadRequest_CheckValid( const SLoadRequest& o )
  83. {
  84.         chkRET_FALSE_IF_NOT(AssetGuid_IsValid(o.assetId));
  85.         chkRET_FALSE_IF_NIL(o.buffer);
  86.         //chkRET_FALSE_IF_NOT(o.bytesToRead > 0);
  87.         enum { MAX_READ_LIMIT = 8*mxMEGABYTE };
  88.         chkRET_FALSE_IF_NOT(o.bytesToRead <= MAX_READ_LIMIT);
  89.         return true;
  90. }
  91.  
  92. /*
  93. --------------------------------------------------------------
  94.         AsyncReader
  95. --------------------------------------------------------------
  96. */
  97. // size of streaming buffer must be a multiple of disk sector size
  98. enum { DISK_SECTOR_SIZE = 2048 };
  99.  
  100. static const UINT STREAMING_BUFFER_SIZE = 4 * mxMEGABYTE;       // 4 MiB
  101.  
  102. enum { INITIAL_QUEUE_SIZE = 64 };
  103.  
  104. AsyncReader::AsyncReader()
  105. {
  106.         m_isRunning = false;
  107.         m_lastRequestId = 0;
  108. }
  109.  
  110. AsyncReader::~AsyncReader()
  111. {
  112.         Assert(!m_isRunning);
  113.         Assert(m_pendingRequests.IsEmpty());
  114.         Assert(m_completedRequests.IsEmpty());
  115. }
  116.  
  117. void AsyncReader::Initialize()
  118. {
  119.         Assert(mxTHIS_IS_MAIN_THREAD);
  120.         Assert(!m_isRunning);
  121.  
  122.         m_pendingRequests.Reserve(INITIAL_QUEUE_SIZE);
  123.         m_completedRequests.Reserve(INITIAL_QUEUE_SIZE);
  124.  
  125.         m_pendingRequestsCS.Initialize();
  126.         m_completedRequestsCS.Initialize();
  127.  
  128.         m_isRunning = true;
  129.         m_lastRequestId = 0;
  130.  
  131.         Thread::CInfo   threadInfo;
  132.         {
  133.                 threadInfo.entryPoint = &StaticResourceLoaderThread;
  134.                 threadInfo.userPointer = this;
  135.                 threadInfo.priority = ThreadPriority_Low;
  136. #if MX_DEBUG
  137.                 threadInfo.debugName = "$BackgroundLoader";
  138. #endif // MX_DEBUG
  139.         }
  140.         m_IOThread.Initialize( threadInfo );
  141.  
  142.         m_wakeupEvent.Initialize( EventFlag::CreateEvent_AutoReset );
  143. }
  144.  
  145. void AsyncReader::Shutdown()
  146. {
  147.         Assert(mxTHIS_IS_MAIN_THREAD);
  148.         Assert(m_isRunning);
  149.  
  150.         m_isRunning = false;
  151.  
  152.         this->CancelAll();
  153.         m_wakeupEvent.Signal();
  154.         m_IOThread.Join();
  155.  
  156.         Assert(m_pendingRequests.IsEmpty());
  157.         Assert(m_completedRequests.IsEmpty());
  158.  
  159.         m_IOThread.Shutdown();
  160.  
  161.         m_pendingRequestsCS.Shutdown();
  162.         m_completedRequestsCS.Shutdown();
  163.  
  164.         m_wakeupEvent.Shutdown();
  165.         m_workDoneEvent.Shutdown();
  166. }
  167.  
  168. bool AsyncReader::AddLoadRequest( const SLoadRequest& request, LoadRequestId *id )
  169. {
  170.         Assert(mxTHIS_IS_MAIN_THREAD);
  171.  
  172.         chkRET_FALSE_IF_NOT(SLoadRequest_CheckValid(request));
  173.  
  174.         AFilePackage::Stream fileStream;
  175.         AFilePackage* filePackage = gCore.files->OpenFile( request.assetId, &fileStream );
  176.         if( !filePackage )
  177.         {
  178.                 return false;
  179.         }
  180.  
  181.         const UINT bytesToRead = (request.bytesToRead != nil) ? request.bytesToRead : fileStream.uncompressedSize;
  182.  
  183.         // Add the request to the read queue
  184.         {
  185.                 SpinWait::Lock  lockJobList( m_pendingRequestsCS );
  186.  
  187.                 mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
  188.  
  189.                 SPendingRequest & newQueuedRequest = m_pendingRequests.Add();
  190.                 {
  191.                         newQueuedRequest.package = filePackage;
  192.                         newQueuedRequest.stream = fileStream;
  193.                         newQueuedRequest.priority = request.priority;
  194.  
  195.                         newQueuedRequest.buffer = request.buffer;
  196.                         newQueuedRequest.bytesToRead = bytesToRead;
  197.  
  198.                         newQueuedRequest.userData = request.userData;
  199.                         newQueuedRequest.asyncLoad = request.asyncLoad;
  200.                         newQueuedRequest.finalize = request.finalize;
  201.  
  202.                         newQueuedRequest.uid = m_lastRequestId++;
  203.                         if( id != nil ) {
  204.                                 *id = newQueuedRequest.uid;
  205.                         }
  206.                 }
  207.  
  208.                 // sort load requests by file offsets
  209.                 this->SortQueuedRequests_NoLock();
  210.  
  211.                 mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
  212.         }
  213.  
  214.         // Signal that we have something to read
  215.         m_wakeupEvent.Signal();
  216.  
  217.         return true;
  218. }
  219.  
  220. bool AsyncReader::Cancel( LoadRequestId requestId )
  221. {
  222.         Assert(mxTHIS_IS_MAIN_THREAD);
  223.  
  224.         // If the request hasn't been yet processed, then it must be in the list of pending requests.
  225.         // Remove it from the list.
  226.         {
  227.                 SpinWait::Lock  scopedLock( m_pendingRequestsCS );
  228.  
  229.                 SPendingRequest* pendingRequest = this->FindPendingRequest_NoLock( requestId );
  230.                 if( pendingRequest != nil )
  231.                 {
  232.                         m_pendingRequests.RemoveContainedItem( pendingRequest );
  233.                         return true;
  234.                 }
  235.         }
  236.  
  237.         // Try to remove it from the list of requests awaiting finalization.
  238.         {
  239.                 SpinWait::Lock  scopedLock( m_completedRequestsCS );
  240.  
  241.                 SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
  242.                 if( completedRequest != nil )
  243.                 {
  244.                         m_completedRequests.RemoveContainedItem( completedRequest );
  245.                         return true;
  246.                 }
  247.         }
  248.  
  249.         // The given request couldn't be found.
  250.         return true;
  251. }
  252.  
  253. // Block the current thread until the load request with the specified ID completes.
  254. //
  255. bool AsyncReader::WaitFor( LoadRequestId requestId, UINT timeOutMilliseconds )
  256. {
  257.         Assert(mxTHIS_IS_MAIN_THREAD);
  258.  
  259.         // wait until the item is present in the m_completedRequests list or a timeout reached
  260.         const UINT startTimeMSec = mxGetTimeInMilliseconds();
  261.         while(true)
  262.         {
  263.                 {
  264.                         SpinWait::Lock  scopedLock( m_completedRequestsCS );
  265.  
  266.                         SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
  267.                         if( completedRequest != nil )
  268.                         {
  269.                                 completedRequest->Finalize();
  270.                                 m_completedRequests.RemoveContainedItem( completedRequest );
  271.                                 return true;
  272.                         }
  273.                 }
  274.  
  275.                 //CurrentThread::Yield();
  276.                 mxSleepMilliseconds(1);
  277.  
  278.                 const UINT currentTimeMSec = mxGetTimeInMilliseconds();
  279.                 if( currentTimeMSec - startTimeMSec >= timeOutMilliseconds ) {
  280.                         return false;
  281.                 }
  282.         }
  283.  
  284.         return false;
  285. }
  286.  
  287. void AsyncReader::WaitForAll( UINT milliseconds )
  288. {
  289.         Assert(mxTHIS_IS_MAIN_THREAD);
  290.  
  291.         m_wakeupEvent.Signal();
  292.  
  293.         m_workDoneEvent.WaitTimeOut( milliseconds );
  294.  
  295.         Assert(m_pendingRequests.IsEmpty());
  296.  
  297.         this->FinalizeCompletedRequests();
  298. }
  299.  
  300. // One may not want to process all the completed requests at once
  301. // because this may cause a noticeable temporary slow down in the game's performance.
  302. // So, this API could be called once per frame
  303. // so that only a small number work items would be set to the device on each frame,
  304. // thereby spreading the work load of binding resources to the device over several frames.
  305. //
  306. void AsyncReader::FinalizeCompletedRequests( UINT maxNumRequests )
  307. {
  308.         Assert(mxTHIS_IS_MAIN_THREAD);
  309.  
  310.         SpinWait::Lock  scopedLock( m_completedRequestsCS );
  311.  
  312.         const UINT numCompletedRequests = m_completedRequests.Num();
  313.         const UINT numRequestsToProcess = (maxNumRequests > 0) ? smallest(maxNumRequests, numCompletedRequests) : numCompletedRequests;
  314.  
  315.         for( UINT iCompletedRequest = 0; iCompletedRequest < numRequestsToProcess; iCompletedRequest++ )
  316.         {
  317.                 SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ];
  318.                 completedRequest.Finalize();
  319.         }
  320.  
  321.         m_completedRequests.RemoveAt( 0, numRequestsToProcess );
  322. }
  323.  
  324. void AsyncReader::CancelAll()
  325. {
  326.         Assert(mxTHIS_IS_MAIN_THREAD);
  327.         mxUNDONE;
  328.         {
  329.                 SpinWait::Lock  scopedLock( m_pendingRequestsCS );
  330.                 m_pendingRequests.Empty();
  331.         }
  332.         {
  333.                 SpinWait::Lock  scopedLock( m_completedRequestsCS );
  334.                 m_completedRequests.Empty();
  335.         }
  336. }
  337.  
  338. AsyncReader::SPendingRequest* AsyncReader::FindPendingRequest_NoLock( LoadRequestId requestId )
  339. {
  340.         for( UINT iPendingRequest = 0; iPendingRequest < m_pendingRequests.Num(); iPendingRequest++ )
  341.         {
  342.                 SPendingRequest& pendingRequest = m_pendingRequests[ iPendingRequest ];
  343.                 if( pendingRequest.uid == requestId ) {
  344.                         return &pendingRequest;
  345.                 }
  346.         }
  347.         return nil;
  348. }
  349.  
  350. AsyncReader::SCompletedRequest* AsyncReader::FindCompletedRequest_NoLock( LoadRequestId requestId )
  351. {
  352.         for( UINT iCompletedRequest = 0; iCompletedRequest < m_completedRequests.Num(); iCompletedRequest++ )
  353.         {
  354.                 SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ];
  355.                 if( completedRequest.uid == requestId ) {
  356.                         return &completedRequest;
  357.                 }
  358.         }
  359.         return nil;
  360. }
  361.  
  362. void AsyncReader::SortQueuedRequests_NoLock()
  363. {
  364.         // sort by priority and physical location (file offsets)
  365.         if( m_pendingRequests.Num() > 1 )
  366.         {
  367.                 struct CompareLoadRequests
  368.                 {
  369.                         static bool Compare( const SPendingRequest& a, const SPendingRequest& b )
  370.                         {
  371.                                 if( a.priority < b.priority ) {
  372.                                         return false;
  373.                                 }
  374.                                 if( a.stream.sortKey < b.stream.sortKey ) {
  375.                                         return false;
  376.                                 }
  377.                                 return true;
  378.                         }
  379.                 };
  380.  
  381.                 InsertionSort< CompareLoadRequests >( m_pendingRequests.ToPtr(), 0, m_pendingRequests.Num()-1 );
  382.         }
  383. }
  384.  
  385. UINT32 PASCAL AsyncReader::StaticResourceLoaderThread( void* userData )
  386. {
  387.         AsyncReader* loader = (AsyncReader*) userData;
  388.         return loader->ResourceLoaderThread();
  389. }
  390.  
  391. UINT32 AsyncReader::ResourceLoaderThread()
  392. {
  393.         while( m_isRunning )
  394.         {
  395.                 m_wakeupEvent.Wait();
  396.  
  397.                 if( !m_isRunning ) {
  398.                         break;
  399.                 }
  400.  
  401.                 if( m_pendingRequests.NonEmpty() )
  402.                 {
  403.                         SPendingRequest request;        //<= must be copied via value
  404.                         ZERO_OUT(request);
  405.  
  406.                         // Pop a request off of the queue
  407.                         {
  408.                                 SpinWait::Lock  lockJobList( m_pendingRequestsCS );
  409.                                 request = m_pendingRequests.GetFirst();
  410.                                 m_pendingRequests.RemoveAt( 0 );
  411.                         }
  412.  
  413.                         // Handle a read request
  414.                         const UINT bytesRead = request.package->ReadFile( &request.stream, 0, request.buffer, request.bytesToRead );
  415.                         Assert(bytesRead == request.bytesToRead);
  416.  
  417.                         // Invoke user callback.
  418.                         {
  419.                                 SLoadContext    args;
  420.                                 {
  421.                                         args.buffer             = request.buffer;
  422.                                         args.size               = bytesRead;
  423.                                         args.userData   = request.userData;
  424.                                         args.requestId  = request.uid;
  425.                                         //args.assetId = assetId;
  426.                                 }
  427.                                 (*request.asyncLoad)( args );
  428.                         }
  429.  
  430.                         // Add it to the list of completed requests.
  431.                         {
  432.                                 SpinWait::Lock  scopedLock( m_completedRequestsCS );
  433.  
  434.                                 SCompletedRequest & newCompletedRequest = m_completedRequests.Add();
  435.                                 {
  436.                                         newCompletedRequest.buffer = request.buffer;
  437.                                         newCompletedRequest.bytesRead = bytesRead;
  438.  
  439.                                         // copy data for invoking callbacks
  440.                                         newCompletedRequest.userData = request.userData;
  441.                                         newCompletedRequest.finalize = request.finalize;
  442.  
  443.                                         newCompletedRequest.uid = request.uid;
  444.                                 }
  445.                         }
  446.                 }
  447.                 else
  448.                 {
  449.                         m_workDoneEvent.Signal();
  450.                 }
  451.         }
  452.         mxDBGMSG("Resource loader thread is exiting...\n");
  453.         return 0;
  454. }