Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- uint32 i;
- for(i = nCurDate; i <= nEndDate; i = DataTimeUtil::GetNextDate(i))
- {
- sprintf(strFileName,"%s\\%04d\\%02d\\%d%c.txt", filePath, i/10000, (i%10000)/100, i, prefix);
- if(sys::file_exist(strFileName))
- {
- //打开实时更新文件读取第一个long数据,判断是否包含更新数据
- if(nFileHandle == -1 && (nFileHandle = _open(strFileName, _O_RDONLY | _O_BINARY, _S_IREAD)) == -1){
- log_error("Fail to open realtimefile %s, errorcode = %d(ScanRealTimeData)", strFileName, GetLastError());
- throw "Fail to open file";
- }
- //读取当前实时文件数据总数
- uint32 nTotalSize = (i == nEndDate && bOpenFlag) ? nTotalFileSize : _lseek(nFileHandle, 0, SEEK_END);
- if((int32)nTotalSize < 0) throw "Fail to _lseek RealTimeFile";
- if(nCurPos > nTotalFileSize){
- log_error(" oops! nTotalFileSize is %u,nCurPos is %u",nTotalFileSize,nCurPos);
- if (nFileHandle != -1) _close(nFileHandle);
- if((nFileHandle = _open(strFileName, _O_RDONLY | _O_BINARY, _S_IREAD)) == -1) {
- log_error("Fail to Reopen realtimefile %s, errorcode = %d(ScanRealTimeData)", strFileName, GetLastError());
- throw "Fail to Re open file";
- }
- nTotalSize = _lseek(nFileHandle, 0, SEEK_END);
- if((int32)nTotalSize < 0) throw "Fail to Re _lseek RealTimeFile";
- }
- //当前扫描到的实时文件有数据,但之前有效的实时服务器未读完没有完成切换时,不读取该文件内容
- if(i == nNowDate)
- {
- if (prefix == T_FILE && nRealServerNum != g_TFileRTServerNum)break;
- if (prefix == F_FILE && nRealServerNum != g_FFileRTServerNum)break;
- if (prefix == W_FILE && nRealServerNum != g_WFileRTServerNum)break;
- }
- //如果需要停机维护,则只同步到指定的位置
- if(bSync && i == nDstDate && nDstPos < nTotalSize) nTotalSize = nDstPos;
- //循环处理每一行实时更新数据
- while(nCurPos < nTotalSize){
- //寻址到指定的位置, 因为上次读取的数据可能不是正好对齐的,一行数据可能只被读取了一半
- if(_lseek(nFileHandle, nCurPos, SEEK_SET) == -1) throw "Fail to _lseek.";
- //获取需要读取的数据块,缺省读取5w数据,但是最后实际大小不足5w,则读取实际大小
- uint32 nSingleReadSize = nSingleMaxCount;
- if(nTotalSize - nCurPos < nSingleReadSize) nSingleReadSize = nTotalSize - nCurPos;
- //读取指定大小的实时更新文件内容到缓冲区
- int32 nActualSize = (int32)_read(nFileHandle, strFileBuffer, nSingleReadSize);
- if(nActualSize != (int32)nSingleReadSize){
- log_error("Fail to _read strFileBuffer, Start:%ld, should read %ld, Actual read %ld", nCurPos, nSingleReadSize, nActualSize);
- throw "Fail to read file";
- }
- strFileBuffer[nSingleReadSize] = '\0';
- // 处理数据
- char* pFileData = strFileBuffer;
- if ( prefix == T_FILE )
- {
- //对于T文件,先处理新增记录,然后再处理删除记录,这样需要循环两遍
- TimeRuler tm;
- uint32 nTmpPos = nCurPos;
- //处理TFile
- int32 nRetT = ProcessLineBuffer(pFileData, T_FILE, nTmpPos);
- if(nRetT == 0) throw "Fail to call ProcessLineBuffer TFile";
- //新增记录时间
- g_RealStat.t_time += (uint32)tm.elapsed_ms();
- g_nUpdateRealTimeStatus = T_FILE;
- //处理DFile
- pFileData = strFileBuffer;
- int32 nRetD = ProcessLineBuffer(pFileData, D_FILE, nCurPos);
- if(nRetD == 0) throw "Fail to call ProcessLineBuffer DFile";
- //如果字符串结束,最后可能包含的字符为不全的字符串,下次再处理
- if(nRetT == 2 || nRetD == 2) break;
- }else{
- int32 nRet = ProcessLineBuffer(pFileData, prefix, nCurPos);
- if(nRet == 0){
- log_error("Fail to call ProcessLineBuffer %cFile ", prefix);
- throw "Fail to ProcessLineBuffer";
- }
- if(nRet == 2)break;
- }
- }
- }else{
- // 如果服务器出现异常,无法正常连接时,则关闭原有文件句柄,等网络恢复后自动从断点运行
- if(nFileHandle != -1 || i != nCurDate){
- if(nFileHandle!=-1) _close(nFileHandle),nFileHandle = -1;
- if(bOpenFlag) log_error(" %s is not exists!",strFileName);
- break;
- }
- }
- nCurPos = 0, bOpenFlag = false;
- if(nFileHandle!=-1) _close(nFileHandle), nFileHandle = -1;
- }
- nCurDate = i;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement