Guest User

memory mapped read and insert

a guest
Apr 2nd, 2014
163
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. // Rationale: our inputs are huge (in the 30+gb range, possibly higher), and
  2. // the original technique of reading the entire file into a string would not
  3. // work on most computers (an aciss compute node has 72gb of ram), and we would
  4. // need alternative methods. Current candidates are:
  5. // 1. Memory mapping - platform dependent, and we had no prior experience with it - implemented below; achieves ~103k inserts per sec
  6. // 2. Read in big ram chunks - performance might be affected
  7. // 3. Capability dependent - the function checks for system memory size then
  8. // determines which method to use; e.g., input > memory, chunk. input < mem,
  9. // read entire file into string. (appears to be the most flexible)
  10. // Boost memory-mapped files seem to be the easiest and most performant way
  11. // to insert large files into the db.
  12.  
  13. // table format is:
  14. // "CREATE TABLE rawreads(instrument TEXT,runid INT,flowcell TEXT,lane INT,tile INT,x INT,y INT,pair INT,filter INT,control INT,index_sequence TEXT,qualityformat INT,data BLOB);"
  15. int mm_read_and_insert(const string& filename, sqlite3* db)
  16. {
  17.     // create a file mapping from filename in readonly mode
  18.     bi::file_mapping m_file(filename.c_str(),bi::read_only);
  19.     // create the mapped region of the entire file in read only mode
  20.     bi::mapped_region region(m_file,bi::read_only);
  21.     region.advise(bi::mapped_region::advice_sequential); // inform the OS of our access pattern
  22.     auto* addr = (char*)region.get_address(); // starting pointer
  23.     auto size = region.get_size();
  24.    
  25.     if (size > 0) {
  26.         auto startClock = clock();
  27.         sqlite3_stmt *stmt;
  28.         char *sql_error_msg = 0;
  29.         // const char* insert_rawreads = "INSERT INTO rawreads VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13);";
  30.         if (sqlite3_prepare_v2(db,sqlite::insert_rawreads,-1,&stmt, NULL) != SQLITE_OK) {
  31.             cout << sqlite3_errmsg(db) << endl;
  32.             return 0;
  33.         }
  34.         sqlite3_exec(db, "PRAGMA cache_size=400000;PRAGMA journal_mode=MEMORY;PRAGMA locking_mode=EXCLUSIVE;PRAGMA count_changes=OFF;PRAGMA auto_vacuum=NONE;PRAGMA temp_store = MEMORY;PRAGMA synchronous=OFF;BEGIN EXCLUSIVE TRANSACTION", NULL, NULL, &sql_error_msg);
  35.         unsigned long long n = 0; // we will be dealing with huge numbers here
  36.         pip::fastq fq; // a struct
  37.         pip::fastq_parser<const char*> g; // boost::spirit grammar
  38.         char const* f(addr); // first iterator/pointer
  39.         char const* l(f + size); // last iterator/pointer
  40.         while (parse(f,l,g,fq)) {
  41.             pack::Pack packed(fq.sequence,fq.quality,1); // packing/compression function
  42.             // Bind parameters to sequence data
  43.             sqlite3_bind_text(stmt,1,fq.instrument.c_str(),-1,SQLITE_TRANSIENT);
  44.             sqlite3_bind_int(stmt,2,fq.run);
  45.             sqlite3_bind_text(stmt,3,fq.flowcell.c_str(),-1,SQLITE_TRANSIENT);
  46.             sqlite3_bind_int(stmt,4,fq.lane);
  47.             sqlite3_bind_int(stmt,5,fq.tile);
  48.             sqlite3_bind_int(stmt,6,fq.x);
  49.             sqlite3_bind_int(stmt,7,fq.y);
  50.             sqlite3_bind_int(stmt,8,fq.pair);
  51.             sqlite3_bind_int(stmt,9,fq.filter == 'Y' ? 1:0);
  52.             sqlite3_bind_int(stmt,10,fq.control);
  53.             sqlite3_bind_text(stmt,11,fq.index.c_str(),-1,SQLITE_TRANSIENT);
  54.             sqlite3_bind_int(stmt,12,packed.qualityFormat());
  55.                 sqlite3_bind_blob(stmt,13,packed.rawData(),fq.sequence.length(),SQLITE_TRANSIENT);
  56.             sqlite3_step(stmt);
  57.             sqlite3_reset(stmt);
  58.             ++n;
  59.             fq = {};
  60.             // Show an update every 100k inserts
  61.             if (n % 100000 == 0)
  62.                 cerr << "Inserted " << n << " sequences...\r";
  63.         }
  64.         sqlite3_exec(db, "COMMIT TRANSACTION", NULL, NULL, &sql_error_msg);
  65.         sqlite3_finalize(stmt);
  66.         auto endClock = clock() - startClock;
  67.         printf("Pip: %llu sequences imported in %4.2f seconds\n",n,endClock/(double)CLOCKS_PER_SEC);
  68.         return n; // return number of rows inserted
  69.     }
  70.     return 0;
  71. }
Advertisement
Add Comment
Please, Sign In to add comment