Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- --------------------------------------------------------------
- AFilePackage
- --------------------------------------------------------------
- */
- AFilePackage::Stream::Stream()
- {
- ZERO_OUT(*this);
- }
- /*
- --------------------------------------------------------------
- FileSystem
- --------------------------------------------------------------
- */
- FileSystem::FileSystem()
- {
- }
- FileSystem::~FileSystem()
- {
- }
- bool FileSystem::MountPackage( AFilePackage* package )
- {
- chkRET_FALSE_IF_NIL(package);
- chkRET_FALSE_IF_NOT(!package->FindSelfInList( m_packages ));
- package->PrependSelfToList( &m_packages );
- return true;
- }
- void FileSystem::UnmountPackage( AFilePackage* package )
- {
- chkRET_IF_NIL(package);
- package->RemoveSelfFromList( &m_packages );
- }
- AFilePackage* FileSystem::OpenFile( const AssetGuid& fileId, AFilePackage::Stream *stream )
- {
- chkRET_FALSE_IF_NIL(stream);
- AFilePackage* current = m_packages;
- while(PtrToBool( current ))
- {
- if( current->OpenFile( fileId, stream ) )
- {
- return current;
- }
- current = current->next;
- }
- return nil;
- }
- static void DefaultAsyncLoad( const SLoadContext& context )
- {
- mxUNUSED(context);
- //nothing
- }
- static void DefaultFinalize( const SFinalizeContext& context )
- {
- mxUNUSED(context);
- //nothing
- }
- SLoadRequest::SLoadRequest()
- {
- assetId = AssetGuid_GetNull();
- buffer = nil;
- bytesToRead = 0;
- priority = Load_Priority_Normal;
- userData = nil;
- asyncLoad = &DefaultAsyncLoad;
- finalize = &DefaultFinalize;
- }
- bool SLoadRequest_CheckValid( const SLoadRequest& o )
- {
- chkRET_FALSE_IF_NOT(AssetGuid_IsValid(o.assetId));
- chkRET_FALSE_IF_NIL(o.buffer);
- //chkRET_FALSE_IF_NOT(o.bytesToRead > 0);
- enum { MAX_READ_LIMIT = 8*mxMEGABYTE };
- chkRET_FALSE_IF_NOT(o.bytesToRead <= MAX_READ_LIMIT);
- return true;
- }
- /*
- --------------------------------------------------------------
- AsyncReader
- --------------------------------------------------------------
- */
- // size of streaming buffer must be a multiple of disk sector size
- enum { DISK_SECTOR_SIZE = 2048 };
- static const UINT STREAMING_BUFFER_SIZE = 4 * mxMEGABYTE; // 4 MiB
- enum { INITIAL_QUEUE_SIZE = 64 };
- AsyncReader::AsyncReader()
- {
- m_isRunning = false;
- m_lastRequestId = 0;
- }
- AsyncReader::~AsyncReader()
- {
- Assert(!m_isRunning);
- Assert(m_pendingRequests.IsEmpty());
- Assert(m_completedRequests.IsEmpty());
- }
- void AsyncReader::Initialize()
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- Assert(!m_isRunning);
- m_pendingRequests.Reserve(INITIAL_QUEUE_SIZE);
- m_completedRequests.Reserve(INITIAL_QUEUE_SIZE);
- m_pendingRequestsCS.Initialize();
- m_completedRequestsCS.Initialize();
- m_isRunning = true;
- m_lastRequestId = 0;
- Thread::CInfo threadInfo;
- {
- threadInfo.entryPoint = &StaticResourceLoaderThread;
- threadInfo.userPointer = this;
- threadInfo.priority = ThreadPriority_Low;
- #if MX_DEBUG
- threadInfo.debugName = "$BackgroundLoader";
- #endif // MX_DEBUG
- }
- m_IOThread.Initialize( threadInfo );
- m_wakeupEvent.Initialize( EventFlag::CreateEvent_AutoReset );
- }
- void AsyncReader::Shutdown()
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- Assert(m_isRunning);
- m_isRunning = false;
- this->CancelAll();
- m_wakeupEvent.Signal();
- m_IOThread.Join();
- Assert(m_pendingRequests.IsEmpty());
- Assert(m_completedRequests.IsEmpty());
- m_IOThread.Shutdown();
- m_pendingRequestsCS.Shutdown();
- m_completedRequestsCS.Shutdown();
- m_wakeupEvent.Shutdown();
- m_workDoneEvent.Shutdown();
- }
- bool AsyncReader::AddLoadRequest( const SLoadRequest& request, LoadRequestId *id )
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- chkRET_FALSE_IF_NOT(SLoadRequest_CheckValid(request));
- AFilePackage::Stream fileStream;
- AFilePackage* filePackage = gCore.files->OpenFile( request.assetId, &fileStream );
- if( !filePackage )
- {
- return false;
- }
- const UINT bytesToRead = (request.bytesToRead != nil) ? request.bytesToRead : fileStream.uncompressedSize;
- // Add the request to the read queue
- {
- SpinWait::Lock lockJobList( m_pendingRequestsCS );
- mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
- SPendingRequest & newQueuedRequest = m_pendingRequests.Add();
- {
- newQueuedRequest.package = filePackage;
- newQueuedRequest.stream = fileStream;
- newQueuedRequest.priority = request.priority;
- newQueuedRequest.buffer = request.buffer;
- newQueuedRequest.bytesToRead = bytesToRead;
- newQueuedRequest.userData = request.userData;
- newQueuedRequest.asyncLoad = request.asyncLoad;
- newQueuedRequest.finalize = request.finalize;
- newQueuedRequest.uid = m_lastRequestId++;
- if( id != nil ) {
- *id = newQueuedRequest.uid;
- }
- }
- // sort load requests by file offsets
- this->SortQueuedRequests_NoLock();
- mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
- }
- // Signal that we have something to read
- m_wakeupEvent.Signal();
- return true;
- }
- bool AsyncReader::Cancel( LoadRequestId requestId )
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- // If the request hasn't been yet processed, then it must be in the list of pending requests.
- // Remove it from the list.
- {
- SpinWait::Lock scopedLock( m_pendingRequestsCS );
- SPendingRequest* pendingRequest = this->FindPendingRequest_NoLock( requestId );
- if( pendingRequest != nil )
- {
- m_pendingRequests.RemoveContainedItem( pendingRequest );
- return true;
- }
- }
- // Try to remove it from the list of requests awaiting finalization.
- {
- SpinWait::Lock scopedLock( m_completedRequestsCS );
- SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
- if( completedRequest != nil )
- {
- m_completedRequests.RemoveContainedItem( completedRequest );
- return true;
- }
- }
- // The given request couldn't be found.
- return true;
- }
- // Block the current thread until the load request with the specified ID completes.
- //
- bool AsyncReader::WaitFor( LoadRequestId requestId, UINT timeOutMilliseconds )
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- // wait until the item is present in the m_completedRequests list or a timeout reached
- const UINT startTimeMSec = mxGetTimeInMilliseconds();
- while(true)
- {
- {
- SpinWait::Lock scopedLock( m_completedRequestsCS );
- SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
- if( completedRequest != nil )
- {
- completedRequest->Finalize();
- m_completedRequests.RemoveContainedItem( completedRequest );
- return true;
- }
- }
- //CurrentThread::Yield();
- mxSleepMilliseconds(1);
- const UINT currentTimeMSec = mxGetTimeInMilliseconds();
- if( currentTimeMSec - startTimeMSec >= timeOutMilliseconds ) {
- return false;
- }
- }
- return false;
- }
- void AsyncReader::WaitForAll( UINT milliseconds )
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- m_wakeupEvent.Signal();
- m_workDoneEvent.WaitTimeOut( milliseconds );
- Assert(m_pendingRequests.IsEmpty());
- this->FinalizeCompletedRequests();
- }
- // One may not want to process all the completed requests at once
- // because this may cause a noticeable temporary slow down in the game's performance.
- // So, this API could be called once per frame
- // so that only a small number work items would be set to the device on each frame,
- // thereby spreading the work load of binding resources to the device over several frames.
- //
- void AsyncReader::FinalizeCompletedRequests( UINT maxNumRequests )
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- SpinWait::Lock scopedLock( m_completedRequestsCS );
- const UINT numCompletedRequests = m_completedRequests.Num();
- const UINT numRequestsToProcess = (maxNumRequests > 0) ? smallest(maxNumRequests, numCompletedRequests) : numCompletedRequests;
- for( UINT iCompletedRequest = 0; iCompletedRequest < numRequestsToProcess; iCompletedRequest++ )
- {
- SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ];
- completedRequest.Finalize();
- }
- m_completedRequests.RemoveAt( 0, numRequestsToProcess );
- }
- void AsyncReader::CancelAll()
- {
- Assert(mxTHIS_IS_MAIN_THREAD);
- mxUNDONE;
- {
- SpinWait::Lock scopedLock( m_pendingRequestsCS );
- m_pendingRequests.Empty();
- }
- {
- SpinWait::Lock scopedLock( m_completedRequestsCS );
- m_completedRequests.Empty();
- }
- }
- AsyncReader::SPendingRequest* AsyncReader::FindPendingRequest_NoLock( LoadRequestId requestId )
- {
- for( UINT iPendingRequest = 0; iPendingRequest < m_pendingRequests.Num(); iPendingRequest++ )
- {
- SPendingRequest& pendingRequest = m_pendingRequests[ iPendingRequest ];
- if( pendingRequest.uid == requestId ) {
- return &pendingRequest;
- }
- }
- return nil;
- }
- AsyncReader::SCompletedRequest* AsyncReader::FindCompletedRequest_NoLock( LoadRequestId requestId )
- {
- for( UINT iCompletedRequest = 0; iCompletedRequest < m_completedRequests.Num(); iCompletedRequest++ )
- {
- SCompletedRequest& completedRequest = m_completedRequests[ iCompletedRequest ];
- if( completedRequest.uid == requestId ) {
- return &completedRequest;
- }
- }
- return nil;
- }
- void AsyncReader::SortQueuedRequests_NoLock()
- {
- // sort by priority and physical location (file offsets)
- if( m_pendingRequests.Num() > 1 )
- {
- struct CompareLoadRequests
- {
- static bool Compare( const SPendingRequest& a, const SPendingRequest& b )
- {
- if( a.priority < b.priority ) {
- return false;
- }
- if( a.stream.sortKey < b.stream.sortKey ) {
- return false;
- }
- return true;
- }
- };
- InsertionSort< CompareLoadRequests >( m_pendingRequests.ToPtr(), 0, m_pendingRequests.Num()-1 );
- }
- }
- UINT32 PASCAL AsyncReader::StaticResourceLoaderThread( void* userData )
- {
- AsyncReader* loader = (AsyncReader*) userData;
- return loader->ResourceLoaderThread();
- }
- UINT32 AsyncReader::ResourceLoaderThread()
- {
- while( m_isRunning )
- {
- m_wakeupEvent.Wait();
- if( !m_isRunning ) {
- break;
- }
- if( m_pendingRequests.NonEmpty() )
- {
- SPendingRequest request; //<= must be copied via value
- ZERO_OUT(request);
- // Pop a request off of the queue
- {
- SpinWait::Lock lockJobList( m_pendingRequestsCS );
- request = m_pendingRequests.GetFirst();
- m_pendingRequests.RemoveAt( 0 );
- }
- // Handle a read request
- const UINT bytesRead = request.package->ReadFile( &request.stream, 0, request.buffer, request.bytesToRead );
- Assert(bytesRead == request.bytesToRead);
- // Invoke user callback.
- {
- SLoadContext args;
- {
- args.buffer = request.buffer;
- args.size = bytesRead;
- args.userData = request.userData;
- args.requestId = request.uid;
- //args.assetId = assetId;
- }
- (*request.asyncLoad)( args );
- }
- // Add it to the list of completed requests.
- {
- SpinWait::Lock scopedLock( m_completedRequestsCS );
- SCompletedRequest & newCompletedRequest = m_completedRequests.Add();
- {
- newCompletedRequest.buffer = request.buffer;
- newCompletedRequest.bytesRead = bytesRead;
- // copy data for invoking callbacks
- newCompletedRequest.userData = request.userData;
- newCompletedRequest.finalize = request.finalize;
- newCompletedRequest.uid = request.uid;
- }
- }
- }
- else
- {
- m_workDoneEvent.Signal();
- }
- }
- mxDBGMSG("Resource loader thread is exiting...\n");
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement