/* -------------------------------------------------------------- AFilePackage -------------------------------------------------------------- */ AFilePackage::Stream::Stream() { ZERO_OUT(*this); } /* -------------------------------------------------------------- FileSystem -------------------------------------------------------------- */ FileSystem::FileSystem() { } FileSystem::~FileSystem() { } bool FileSystem::MountPackage( AFilePackage* package ) { chkRET_FALSE_IF_NIL(package); chkRET_FALSE_IF_NOT(!package->FindSelfInList( m_packages )); package->PrependSelfToList( &m_packages ); return true; } void FileSystem::UnmountPackage( AFilePackage* package ) { chkRET_IF_NIL(package); package->RemoveSelfFromList( &m_packages ); } AFilePackage* FileSystem::OpenFile( const AssetGuid& fileId, AFilePackage::Stream *stream ) { chkRET_FALSE_IF_NIL(stream); AFilePackage* current = m_packages; while(PtrToBool( current )) { if( current->OpenFile( fileId, stream ) ) { return current; } current = current->next; } return nil; } static void DefaultAsyncLoad( const SLoadContext& context ) { mxUNUSED(context); //nothing } static void DefaultFinalize( const SFinalizeContext& context ) { mxUNUSED(context); //nothing } SLoadRequest::SLoadRequest() { assetId = AssetGuid_GetNull(); buffer = nil; bytesToRead = 0; priority = Load_Priority_Normal; userData = nil; asyncLoad = &DefaultAsyncLoad; finalize = &DefaultFinalize; } bool SLoadRequest_CheckValid( const SLoadRequest& o ) { chkRET_FALSE_IF_NOT(AssetGuid_IsValid(o.assetId)); chkRET_FALSE_IF_NIL(o.buffer); //chkRET_FALSE_IF_NOT(o.bytesToRead > 0); enum { MAX_READ_LIMIT = 8*mxMEGABYTE }; chkRET_FALSE_IF_NOT(o.bytesToRead <= MAX_READ_LIMIT); return true; } /* -------------------------------------------------------------- AsyncReader -------------------------------------------------------------- */ // size of streaming buffer must be a multiple of disk sector size enum { DISK_SECTOR_SIZE = 2048 }; static const UINT STREAMING_BUFFER_SIZE = 4 * mxMEGABYTE; // 4 MiB enum { INITIAL_QUEUE_SIZE = 64 }; AsyncReader::AsyncReader() { m_isRunning = false; m_lastRequestId = 0; } AsyncReader::~AsyncReader() { Assert(!m_isRunning); Assert(m_pendingRequests.IsEmpty()); Assert(m_completedRequests.IsEmpty()); } void AsyncReader::Initialize() { Assert(mxTHIS_IS_MAIN_THREAD); Assert(!m_isRunning); m_pendingRequests.Reserve(INITIAL_QUEUE_SIZE); m_completedRequests.Reserve(INITIAL_QUEUE_SIZE); m_pendingRequestsCS.Initialize(); m_completedRequestsCS.Initialize(); m_isRunning = true; m_lastRequestId = 0; Thread::CInfo threadInfo; { threadInfo.entryPoint = &StaticResourceLoaderThread; threadInfo.userPointer = this; threadInfo.priority = ThreadPriority_Low; #if MX_DEBUG threadInfo.debugName = "$BackgroundLoader"; #endif // MX_DEBUG } m_IOThread.Initialize( threadInfo ); m_wakeupEvent.Initialize( EventFlag::CreateEvent_AutoReset ); } void AsyncReader::Shutdown() { Assert(mxTHIS_IS_MAIN_THREAD); Assert(m_isRunning); m_isRunning = false; this->CancelAll(); m_wakeupEvent.Signal(); m_IOThread.Join(); Assert(m_pendingRequests.IsEmpty()); Assert(m_completedRequests.IsEmpty()); m_IOThread.Shutdown(); m_pendingRequestsCS.Shutdown(); m_completedRequestsCS.Shutdown(); m_wakeupEvent.Shutdown(); m_workDoneEvent.Shutdown(); } bool AsyncReader::AddLoadRequest( const SLoadRequest& request, LoadRequestId *id ) { Assert(mxTHIS_IS_MAIN_THREAD); chkRET_FALSE_IF_NOT(SLoadRequest_CheckValid(request)); AFilePackage::Stream fileStream; AFilePackage* filePackage = gCore.files->OpenFile( request.assetId, &fileStream ); if( !filePackage ) { return false; } const UINT bytesToRead = (request.bytesToRead != nil) ? request.bytesToRead : fileStream.uncompressedSize; // Add the request to the read queue { SpinWait::Lock lockJobList( m_pendingRequestsCS ); mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding. SPendingRequest & newQueuedRequest = m_pendingRequests.Add(); { newQueuedRequest.package = filePackage; newQueuedRequest.stream = fileStream; newQueuedRequest.priority = request.priority; newQueuedRequest.buffer = request.buffer; newQueuedRequest.bytesToRead = bytesToRead; newQueuedRequest.userData = request.userData; newQueuedRequest.asyncLoad = request.asyncLoad; newQueuedRequest.finalize = request.finalize; newQueuedRequest.uid = m_lastRequestId++; if( id != nil ) { *id = newQueuedRequest.uid; } } // sort load requests by file offsets this->SortQueuedRequests_NoLock(); mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding. } // Signal that we have something to read m_wakeupEvent.Signal(); return true; } bool AsyncReader::Cancel( LoadRequestId requestId ) { Assert(mxTHIS_IS_MAIN_THREAD); // If the request hasn't been yet processed, then it must be in the list of pending requests. // Remove it from the list. { SpinWait::Lock scopedLock( m_pendingRequestsCS ); SPendingRequest* pendingRequest = this->FindPendingRequest_NoLock( requestId ); if( pendingRequest != nil ) { m_pendingRequests.RemoveContainedItem( pendingRequest ); return true; } } // Try to remove it from the list of requests awaiting finalization. { SpinWait::Lock scopedLock( m_completedRequestsCS ); SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId ); if( completedRequest != nil ) { m_completedRequests.RemoveContainedItem( completedRequest ); return true; } } // The given request couldn't be found. return true; } // Block the current thread until the load request with the specified ID completes. // bool AsyncReader::WaitFor( LoadRequestId requestId, UINT timeOutMilliseconds ) { Assert(mxTHIS_IS_MAIN_THREAD); // wait until the item is present in the m_completedRequests list or a timeout reached const UINT startTimeMSec = mxGetTimeInMilliseconds(); while(true) { { SpinWait::Lock scopedLock( m_completedRequestsCS ); SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId ); if( completedRequest != nil ) { completedRequest->Finalize(); m_completedRequests.RemoveContainedItem( completedRequest ); return true; } } //CurrentThread::Yield(); mxSleepMilliseconds(1); const UINT currentTimeMSec = mxGetTimeInMilliseconds(); if( currentTimeMSec - startTimeMSec >= timeOutMilliseconds ) { return false; } } return false; } void AsyncReader::WaitForAll( UINT milliseconds ) { Assert(mxTHIS_IS_MAIN_THREAD); m_wakeupEvent.Signal(); m_workDoneEvent.WaitTimeOut( milliseconds ); Assert(m_pendingRequests.IsEmpty()); this->FinalizeCompletedRequests(); } // One may not want to process all the completed requests at once // because this may cause a noticeable temporary slow down in the game's performance. // So, this API could be called once per frame // so that only a small number work items would be set to the device on each frame, // thereby spreading the work load of binding resources to the device over several frames. // void AsyncReader::FinalizeCompletedRequests( UINT maxNumRequests ) { Assert(mxTHIS_IS_MAIN_THREAD); SpinWait::Lock scopedLock( m_completedRequestsCS ); const UINT numCompletedRequests = m_completedRequests.Num(); const UINT numRequestsToProcess = (maxNumRequests > 0) ? smallest(maxNumRequests, numCompletedRequests) : numCompletedRequests; for( UINT iCompletedRequest = 0; iCompletedRequest < numRequestsToProcess; iCompletedRequest++ ) { SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ]; completedRequest.Finalize(); } m_completedRequests.RemoveAt( 0, numRequestsToProcess ); } void AsyncReader::CancelAll() { Assert(mxTHIS_IS_MAIN_THREAD); mxUNDONE; { SpinWait::Lock scopedLock( m_pendingRequestsCS ); m_pendingRequests.Empty(); } { SpinWait::Lock scopedLock( m_completedRequestsCS ); m_completedRequests.Empty(); } } AsyncReader::SPendingRequest* AsyncReader::FindPendingRequest_NoLock( LoadRequestId requestId ) { for( UINT iPendingRequest = 0; iPendingRequest < m_pendingRequests.Num(); iPendingRequest++ ) { SPendingRequest& pendingRequest = m_pendingRequests[ iPendingRequest ]; if( pendingRequest.uid == requestId ) { return &pendingRequest; } } return nil; } AsyncReader::SCompletedRequest* AsyncReader::FindCompletedRequest_NoLock( LoadRequestId requestId ) { for( UINT iCompletedRequest = 0; iCompletedRequest < m_completedRequests.Num(); iCompletedRequest++ ) { SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ]; if( completedRequest.uid == requestId ) { return &completedRequest; } } return nil; } void AsyncReader::SortQueuedRequests_NoLock() { // sort by priority and physical location (file offsets) if( m_pendingRequests.Num() > 1 ) { struct CompareLoadRequests { static bool Compare( const SPendingRequest& a, const SPendingRequest& b ) { if( a.priority < b.priority ) { return false; } if( a.stream.sortKey < b.stream.sortKey ) { return false; } return true; } }; InsertionSort< CompareLoadRequests >( m_pendingRequests.ToPtr(), 0, m_pendingRequests.Num()-1 ); } } UINT32 PASCAL AsyncReader::StaticResourceLoaderThread( void* userData ) { AsyncReader* loader = (AsyncReader*) userData; return loader->ResourceLoaderThread(); } UINT32 AsyncReader::ResourceLoaderThread() { while( m_isRunning ) { m_wakeupEvent.Wait(); if( !m_isRunning ) { break; } if( m_pendingRequests.NonEmpty() ) { SPendingRequest request; //<= must be copied via value ZERO_OUT(request); // Pop a request off of the queue { SpinWait::Lock lockJobList( m_pendingRequestsCS ); request = m_pendingRequests.GetFirst(); m_pendingRequests.RemoveAt( 0 ); } // Handle a read request const UINT bytesRead = request.package->ReadFile( &request.stream, 0, request.buffer, request.bytesToRead ); Assert(bytesRead == request.bytesToRead); // Invoke user callback. { SLoadContext args; { args.buffer = request.buffer; args.size = bytesRead; args.userData = request.userData; args.requestId = request.uid; //args.assetId = assetId; } (*request.asyncLoad)( args ); } // Add it to the list of completed requests. { SpinWait::Lock scopedLock( m_completedRequestsCS ); SCompletedRequest & newCompletedRequest = m_completedRequests.Add(); { newCompletedRequest.buffer = request.buffer; newCompletedRequest.bytesRead = bytesRead; // copy data for invoking callbacks newCompletedRequest.userData = request.userData; newCompletedRequest.finalize = request.finalize; newCompletedRequest.uid = request.uid; } } } else { m_workDoneEvent.Signal(); } } mxDBGMSG("Resource loader thread is exiting...\n"); return 0; }