/*
--------------------------------------------------------------
AFilePackage
--------------------------------------------------------------
*/
// Default constructor: applies ZERO_OUT to the whole object
// (presumably a byte-wise zero fill - confirm against the macro definition).
AFilePackage::Stream::Stream()
{
ZERO_OUT(*this);
}
/*
--------------------------------------------------------------
FileSystem
--------------------------------------------------------------
*/
// Default constructor: performs no work of its own.
FileSystem::FileSystem()
{
}
// Destructor: performs no work of its own; in particular it does not
// unmount any packages that are still linked into the package list.
FileSystem::~FileSystem()
{
}
// Adds the given package to the front of the list of mounted packages.
// Because the package is prepended, OpenFile() visits the most recently
// mounted package first.
// Returns false if 'package' is nil or if it is already in the list.
bool FileSystem::MountPackage( AFilePackage* package )
{
chkRET_FALSE_IF_NIL(package);
// refuse to mount the same package twice
chkRET_FALSE_IF_NOT(!package->FindSelfInList( m_packages ));
package->PrependSelfToList( &m_packages );
return true;
}
// Unlinks the given package from the list of mounted packages.
// Does nothing (beyond what the check macro reports) if 'package' is nil.
// NOTE(review): unlike MountPackage(), list membership is not verified first -
// confirm that RemoveSelfFromList() tolerates a package that was never mounted.
void FileSystem::UnmountPackage( AFilePackage* package )
{
chkRET_IF_NIL(package);
package->RemoveSelfFromList( &m_packages );
}
// Walks the list of mounted packages from its head and asks each one to open
// the file with the given id. The first package that succeeds fills in
// '*stream' and is returned; nil is returned if no package could open the file
// (or if 'stream' itself is nil).
AFilePackage* FileSystem::OpenFile( const AssetGuid& fileId, AFilePackage::Stream *stream )
{
    chkRET_FALSE_IF_NIL(stream);

    for( AFilePackage* candidate = m_packages; PtrToBool( candidate ); candidate = candidate->next )
    {
        const bool opened = current_package_opened:
            false;
    }
    return nil;
}
// Default 'asyncLoad' callback installed by SLoadRequest's constructor:
// intentionally a no-op, so that the callback pointer is never nil.
static void DefaultAsyncLoad( const SLoadContext& context )
{
mxUNUSED(context);
// intentionally empty
}
// Default 'finalize' callback installed by SLoadRequest's constructor:
// intentionally a no-op, so that the callback pointer is never nil.
static void DefaultFinalize( const SFinalizeContext& context )
{
mxUNUSED(context);
// intentionally empty
}
// Builds an "empty" request: null asset id, no destination buffer, zero bytes
// to read, normal priority, no user data, and no-op callbacks.
// The caller is expected to fill in at least the asset id and the buffer
// before submitting it (see SLoadRequest_CheckValid).
SLoadRequest::SLoadRequest()
{
    // what to load
    assetId     = AssetGuid_GetNull();
    priority    = Load_Priority_Normal;

    // where to put it
    buffer      = nil;
    bytesToRead = 0;

    // user hooks; the callbacks default to no-ops so they can always be invoked
    asyncLoad   = &DefaultAsyncLoad;
    finalize    = &DefaultFinalize;
    userData    = nil;
}
// Sanity-checks a load request before it is queued.
// Returns true only if the asset id is valid, a destination buffer is provided
// and the requested size does not exceed MAX_READ_LIMIT (8 * mxMEGABYTE).
// A size of zero is deliberately accepted (the check below is disabled);
// AsyncReader::AddLoadRequest substitutes the stream's uncompressed size for it.
bool SLoadRequest_CheckValid( const SLoadRequest& o )
{
chkRET_FALSE_IF_NOT(AssetGuid_IsValid(o.assetId));
chkRET_FALSE_IF_NIL(o.buffer);
//chkRET_FALSE_IF_NOT(o.bytesToRead > 0);
// upper bound on the size of a single read request
enum { MAX_READ_LIMIT = 8*mxMEGABYTE };
chkRET_FALSE_IF_NOT(o.bytesToRead <= MAX_READ_LIMIT);
return true;
}
/*
--------------------------------------------------------------
AsyncReader
--------------------------------------------------------------
*/
// The size of the streaming buffer must be a multiple of the disk sector size.
// (With the values below: 4 * mxMEGABYTE vs. 2048 - this holds provided
// mxMEGABYTE is itself a multiple of 2048; TODO confirm.)
enum { DISK_SECTOR_SIZE = 2048 };
static const UINT STREAMING_BUFFER_SIZE = 4 * mxMEGABYTE; // 4 MiB
// Number of slots reserved up front in the pending/completed request queues
// (see AsyncReader::Initialize).
enum { INITIAL_QUEUE_SIZE = 64 };
// Constructs an idle reader: not running, request id counter at zero.
// No thread or synchronization object is created here; that is done by Initialize().
AsyncReader::AsyncReader()
{
m_isRunning = false;
m_lastRequestId = 0;
}
// Destructor: releases nothing itself, only asserts that the reader has been
// stopped (Shutdown() clears m_isRunning) and that both request queues are empty.
AsyncReader::~AsyncReader()
{
Assert(!m_isRunning);
Assert(m_pendingRequests.IsEmpty());
Assert(m_completedRequests.IsEmpty());
}
void AsyncReader::Initialize()
{
Assert(mxTHIS_IS_MAIN_THREAD);
Assert(!m_isRunning);
m_pendingRequests.Reserve(INITIAL_QUEUE_SIZE);
m_completedRequests.Reserve(INITIAL_QUEUE_SIZE);
m_pendingRequestsCS.Initialize();
m_completedRequestsCS.Initialize();
m_isRunning = true;
m_lastRequestId = 0;
Thread::CInfo threadInfo;
{
threadInfo.entryPoint = &StaticResourceLoaderThread;
threadInfo.userPointer = this;
threadInfo.priority = ThreadPriority_Low;
#if MX_DEBUG
threadInfo.debugName = "$BackgroundLoader";
#endif // MX_DEBUG
}
m_IOThread.Initialize( threadInfo );
m_wakeupEvent.Initialize( EventFlag::CreateEvent_AutoReset );
}
// Stops the background I/O thread, discards every queued and completed request
// (their 'finalize' callbacks are NOT invoked) and destroys the synchronization
// objects. Must be called from the main thread on a running reader.
void AsyncReader::Shutdown()
{
    Assert(mxTHIS_IS_MAIN_THREAD);
    Assert(m_isRunning);

    // Ask the I/O thread to exit and wake it up in case it is sleeping.
    m_isRunning = false;
    m_wakeupEvent.Signal();
    m_IOThread.Join();

    // Purge the queues only AFTER the thread has terminated: a request that was
    // already being read when shutdown began is appended to m_completedRequests
    // once the read finishes, which would otherwise re-populate a queue that
    // had just been emptied and trip the asserts below.
    this->CancelAll();

    Assert(m_pendingRequests.IsEmpty());
    Assert(m_completedRequests.IsEmpty());

    m_IOThread.Shutdown();
    m_pendingRequestsCS.Shutdown();
    m_completedRequestsCS.Shutdown();
    m_wakeupEvent.Shutdown();
    m_workDoneEvent.Shutdown();
}
// Queues an asynchronous read of the asset described by 'request'.
//
// The file is opened immediately (on the calling thread) through the global
// file system; the actual read is performed later by the background I/O thread.
// If request.bytesToRead is zero, the stream's uncompressed size is read instead.
//
// 'id' is optional; when non-nil it receives the unique id of the new request,
// which can be passed to Cancel() and WaitFor().
//
// Returns false if the request is invalid or the file could not be opened.
// Must be called from the main thread.
bool AsyncReader::AddLoadRequest( const SLoadRequest& request, LoadRequestId *id )
{
    Assert(mxTHIS_IS_MAIN_THREAD);
    chkRET_FALSE_IF_NOT(SLoadRequest_CheckValid(request));

    AFilePackage::Stream fileStream;
    AFilePackage* filePackage = gCore.files->OpenFile( request.assetId, &fileStream );
    if( !filePackage )
    {
        return false;
    }

    // bytesToRead is a byte count, not a pointer: compare it with 0 rather than nil.
    // Zero means "read the whole file".
    const UINT bytesToRead = (request.bytesToRead != 0) ? request.bytesToRead : fileStream.uncompressedSize;

    // Add the request to the read queue
    {
        SpinWait::Lock lockJobList( m_pendingRequestsCS );
        mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.

        SPendingRequest & newQueuedRequest = m_pendingRequests.Add();
        {
            newQueuedRequest.package = filePackage;
            newQueuedRequest.stream = fileStream;
            newQueuedRequest.priority = request.priority;
            newQueuedRequest.buffer = request.buffer;
            newQueuedRequest.bytesToRead = bytesToRead;
            newQueuedRequest.userData = request.userData;
            newQueuedRequest.asyncLoad = request.asyncLoad;
            newQueuedRequest.finalize = request.finalize;
            newQueuedRequest.uid = m_lastRequestId++;
            if( id != nil ) {
                *id = newQueuedRequest.uid;
            }
        }
        // NOTE: 'newQueuedRequest' must not be used past this point -
        // sorting moves the elements of the queue around.

        // sort load requests by priority and file offsets
        this->SortQueuedRequests_NoLock();
        mxREAD_WRITE_BARRIER;//ensure previous writes are complete before proceeding.
    }

    // Signal that we have something to read
    m_wakeupEvent.Signal();
    return true;
}
// Cancels the load request with the given id.
//
// The request is removed from whichever queue currently holds it: the queue of
// pending reads, or the queue of completed reads awaiting finalization (in
// which case its 'finalize' callback will not be invoked).
//
// Returns true if the request was found and removed; false if it is in neither
// queue (unknown id, already finalized, or currently being read by the I/O thread).
// Must be called from the main thread.
bool AsyncReader::Cancel( LoadRequestId requestId )
{
    Assert(mxTHIS_IS_MAIN_THREAD);

    // If the request hasn't been processed yet, then it must be in the list of pending requests.
    // Remove it from the list.
    {
        SpinWait::Lock scopedLock( m_pendingRequestsCS );
        SPendingRequest* pendingRequest = this->FindPendingRequest_NoLock( requestId );
        if( pendingRequest != nil )
        {
            m_pendingRequests.RemoveContainedItem( pendingRequest );
            return true;
        }
    }

    // Try to remove it from the list of requests awaiting finalization.
    {
        SpinWait::Lock scopedLock( m_completedRequestsCS );
        SCompletedRequest* completedRequest = this->FindCompletedRequest_NoLock( requestId );
        if( completedRequest != nil )
        {
            m_completedRequests.RemoveContainedItem( completedRequest );
            return true;
        }
    }

    // The given request couldn't be found - report that nothing was cancelled
    // (previously this path also returned true, making the result meaningless).
    return false;
}
// Blocks the calling (main) thread until the load request with the given id
// has been read, or until the time-out expires.
//
// The completed-requests queue is polled roughly once per millisecond. As soon
// as the request shows up there, its Finalize() is invoked (while the queue
// lock is held), the request is removed from the queue and true is returned.
// Returns false if 'timeOutMilliseconds' elapse first.
//
bool AsyncReader::WaitFor( LoadRequestId requestId, UINT timeOutMilliseconds )
{
    Assert(mxTHIS_IS_MAIN_THREAD);

    const UINT waitStartedAtMSec = mxGetTimeInMilliseconds();

    for(;;)
    {
        // Has the I/O thread finished this request yet?
        {
            SpinWait::Lock scopedLock( m_completedRequestsCS );
            SCompletedRequest* finished = this->FindCompletedRequest_NoLock( requestId );
            if( finished != nil )
            {
                finished->Finalize();
                m_completedRequests.RemoveContainedItem( finished );
                return true;
            }
        }

        // Not yet: give up the CPU for a moment, then check the deadline.
        //CurrentThread::Yield();
        mxSleepMilliseconds(1);

        const UINT nowMSec = mxGetTimeInMilliseconds();
        const UINT elapsedMSec = nowMSec - waitStartedAtMSec;
        if( elapsedMSec >= timeOutMilliseconds ) {
            return false;
        }
    }
}
// Wakes the I/O thread, waits (for at most 'milliseconds') until it signals
// m_workDoneEvent, then finalizes the completed requests.
// Must be called from the main thread.
// NOTE(review): the assert below assumes the wait succeeded; if WaitTimeOut()
// returns because of a time-out, requests may still be pending - its result is
// not checked here.
void AsyncReader::WaitForAll( UINT milliseconds )
{
Assert(mxTHIS_IS_MAIN_THREAD);
// make sure the I/O thread is not sleeping while we wait for it
m_wakeupEvent.Signal();
m_workDoneEvent.WaitTimeOut( milliseconds );
Assert(m_pendingRequests.IsEmpty());
this->FinalizeCompletedRequests();
}
// Runs the 'finalize' step of completed requests on the main thread and removes
// them from the completed queue.
//
// Finalizing every completed request at once may cause a noticeable temporary
// slow-down, so the amount of work can be capped: call this once per frame with
// a small 'maxNumRequests' so that only a small number of work items are sent
// to the device on each frame, spreading the load of binding resources to the
// device over several frames.
//
// maxNumRequests - upper limit on the number of requests to finalize;
//                  zero means "no limit" (finalize everything that is ready).
//
void AsyncReader::FinalizeCompletedRequests( UINT maxNumRequests )
{
    Assert(mxTHIS_IS_MAIN_THREAD);

    SpinWait::Lock scopedLock( m_completedRequestsCS );

    // Decide how many requests from the front of the queue to handle this time.
    const UINT numReady = m_completedRequests.Num();
    UINT numToFinalize = numReady;
    if( maxNumRequests > 0 ) {
        numToFinalize = smallest(maxNumRequests, numReady);
    }

    for( UINT i = 0; i < numToFinalize; i++ )
    {
        m_completedRequests[ i ].Finalize();
    }

    // Drop the finalized requests from the front of the queue.
    m_completedRequests.RemoveAt( 0, numToFinalize );
}
// Discards every pending request and every completed request awaiting
// finalization; no 'finalize' callback is invoked for any of them.
// Each queue is emptied under its own lock. Must be called from the main thread.
// NOTE(review): a request that the I/O thread has already popped and is
// currently reading is in neither queue and is therefore not affected.
void AsyncReader::CancelAll()
{
Assert(mxTHIS_IS_MAIN_THREAD);
// marked as unfinished by the original author
mxUNDONE;
{
SpinWait::Lock scopedLock( m_pendingRequestsCS );
m_pendingRequests.Empty();
}
{
SpinWait::Lock scopedLock( m_completedRequestsCS );
m_completedRequests.Empty();
}
}
// Linear search of the pending queue for the request with the given id.
// Returns a pointer to the queue element, or nil if there is no such request.
// Takes no lock itself: the caller must already hold m_pendingRequestsCS.
AsyncReader::SPendingRequest* AsyncReader::FindPendingRequest_NoLock( LoadRequestId requestId )
{
    const UINT numPending = m_pendingRequests.Num();
    for( UINT i = 0; i < numPending; i++ )
    {
        SPendingRequest* candidate = &m_pendingRequests[ i ];
        if( candidate->uid == requestId ) {
            return candidate;
        }
    }
    return nil;
}
// Linear search of the completed queue for the request with the given id.
// Returns a pointer to the queue element, or nil if there is no such request.
// Takes no lock itself: the caller must already hold m_completedRequestsCS.
AsyncReader::SCompletedRequest* AsyncReader::FindCompletedRequest_NoLock( LoadRequestId requestId )
{
    const UINT numCompleted = m_completedRequests.Num();
    for( UINT i = 0; i < numCompleted; i++ )
    {
        SCompletedRequest* candidate = &m_completedRequests[ i ];
        if( candidate->uid == requestId ) {
            return candidate;
        }
    }
    return nil;
}
// Re-sorts the queue of pending requests by priority first and, among requests
// of equal priority, by physical location (the stream's sort key).
// Takes no lock itself: the caller must already hold m_pendingRequestsCS.
void AsyncReader::SortQueuedRequests_NoLock()
{
    if( m_pendingRequests.Num() > 1 )
    {
        struct CompareLoadRequests
        {
            // Lexicographic comparison on (priority, sortKey).
            //
            // The previous version tested the two keys independently, so a
            // smaller sort key could veto a higher priority. For such pairs
            // both Compare(a,b) and Compare(b,a) were false, which is not a
            // consistent ordering and made the resulting order depend on the
            // insertion sequence.
            //
            // The direction of each key and the result for fully equal
            // elements (true) are kept exactly as before.
            static bool Compare( const SPendingRequest& a, const SPendingRequest& b )
            {
                // primary key: priority
                if( a.priority != b.priority ) {
                    return a.priority > b.priority;
                }
                // secondary key: physical location, consulted only for equal priorities
                return !( a.stream.sortKey < b.stream.sortKey );
            }
        };
        InsertionSort< CompareLoadRequests >( m_pendingRequests.ToPtr(), 0, m_pendingRequests.Num()-1 );
    }
}
// Thread entry point. 'userData' is the AsyncReader that created the thread
// (Initialize() stores 'this' in the thread's user pointer); the call is simply
// forwarded to that instance's ResourceLoaderThread().
UINT32 PASCAL AsyncReader::StaticResourceLoaderThread( void* userData )
{
    AsyncReader* const self = static_cast< AsyncReader* >( userData );
    const UINT32 exitCode = self->ResourceLoaderThread();
    return exitCode;
}
// Body of the background I/O thread.
//
// Sleeps until m_wakeupEvent is signalled, then services pending requests one
// by one until the queue is empty (or shutdown is requested). For each request:
// reads the data into the caller's buffer, invokes the 'asyncLoad' callback on
// this thread, and appends the request to the completed queue, to be finalized
// later on the main thread. When the queue has been drained, m_workDoneEvent
// is signalled.
//
// Always returns 0.
UINT32 AsyncReader::ResourceLoaderThread()
{
    while( m_isRunning )
    {
        m_wakeupEvent.Wait();

        // Drain the whole queue on every wake-up. The wake-up event is
        // auto-reset, so several AddLoadRequest() calls made in quick
        // succession may result in a single wake-up; handling only one request
        // per wake-up would leave the remaining ones stuck in the queue until
        // some later, unrelated signal.
        while( m_isRunning )
        {
            SPendingRequest request; //<= must be copied via value
            ZERO_OUT(request);

            // Pop a request off of the queue.
            // The emptiness test is made under the lock: the main thread may
            // cancel requests at any time, so a test made outside the lock
            // could be stale by the time the first element is fetched.
            bool gotRequest = false;
            {
                SpinWait::Lock lockJobList( m_pendingRequestsCS );
                if( m_pendingRequests.NonEmpty() )
                {
                    request = m_pendingRequests.GetFirst();
                    m_pendingRequests.RemoveAt( 0 );
                    gotRequest = true;
                }
            }

            if( !gotRequest )
            {
                // Nothing left to read - let WaitForAll() know.
                m_workDoneEvent.Signal();
                break;
            }

            // Handle a read request
            const UINT bytesRead = request.package->ReadFile( &request.stream, 0, request.buffer, request.bytesToRead );
            Assert(bytesRead == request.bytesToRead);

            // Invoke user callback.
            {
                SLoadContext args;
                {
                    args.buffer = request.buffer;
                    args.size = bytesRead;
                    args.userData = request.userData;
                    args.requestId = request.uid;
                    //args.assetId = assetId;
                }
                (*request.asyncLoad)( args );
            }

            // Add it to the list of completed requests.
            {
                SpinWait::Lock scopedLock( m_completedRequestsCS );
                SCompletedRequest & newCompletedRequest = m_completedRequests.Add();
                {
                    newCompletedRequest.buffer = request.buffer;
                    newCompletedRequest.bytesRead = bytesRead;
                    // copy data for invoking callbacks
                    newCompletedRequest.userData = request.userData;
                    newCompletedRequest.finalize = request.finalize;
                    newCompletedRequest.uid = request.uid;
                }
            }
        }
    }
    mxDBGMSG("Resource loader thread is exiting...\n");
    return 0;
}