// Headers for this fragment; pip (fastq record + Spirit grammar), pack (Pack)
// and sqlite::insert_rawreads come from project headers assumed included here.
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <sqlite3.h>
#include <cstdio>
#include <ctime>
#include <iostream>
#include <string>

namespace bi = boost::interprocess;
using namespace std;

// Rationale: our inputs are huge (in the 30+ GB range, possibly higher), and
// the original technique of reading the entire file into a string would not
// work on most computers (an ACISS compute node has 72 GB of RAM), so we
// need alternative methods. Current candidates are:
// 1. Memory mapping - platform dependent, no prior experience - done,
//    ~103k inserts per second
// 2. Reading in big RAM chunks - performance might suffer (sketched below)
// 3. Capability dependent - check the system memory size, then pick a
//    method: input > memory, chunk; input < memory, read the entire file
//    into a string (appears to be the most flexible)
// Boost memory-mapped files seem to be the easiest and most performant way
// to insert large files into the db.
// Table format is:
// "CREATE TABLE rawreads(instrument TEXT,runid INT,flowcell TEXT,lane INT,tile INT,x INT,y INT,pair INT,filter INT,control INT,index_sequence TEXT,qualityformat INT,data BLOB);"
// Returns the number of rows inserted; 64-bit, since row counts here can
// exceed the range of int.
unsigned long long mm_read_and_insert(const string& filename, sqlite3* db)
{
    // create a file mapping from filename in read-only mode
    bi::file_mapping m_file(filename.c_str(), bi::read_only);
    // map the entire file into our address space, also read-only
    bi::mapped_region region(m_file, bi::read_only);
    region.advise(bi::mapped_region::advice_sequential); // inform the OS of our access pattern
    auto* addr = (char*)region.get_address(); // starting pointer
    auto size = region.get_size();
    if (size > 0) {
        auto startClock = clock();
        sqlite3_stmt* stmt;
        char* sql_error_msg = 0;
        // const char* insert_rawreads = "INSERT INTO rawreads VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13);";
        if (sqlite3_prepare_v2(db, sqlite::insert_rawreads, -1, &stmt, NULL) != SQLITE_OK) {
            cerr << sqlite3_errmsg(db) << endl;
            return 0;
        }
        // Trade durability for insert speed and wrap everything in one big
        // transaction; a crash mid-import just means rerunning the import.
        sqlite3_exec(db,
                     "PRAGMA cache_size=400000;"
                     "PRAGMA journal_mode=MEMORY;"
                     "PRAGMA locking_mode=EXCLUSIVE;"
                     "PRAGMA count_changes=OFF;"
                     "PRAGMA auto_vacuum=NONE;"
                     "PRAGMA temp_store=MEMORY;"
                     "PRAGMA synchronous=OFF;"
                     "BEGIN EXCLUSIVE TRANSACTION",
                     NULL, NULL, &sql_error_msg);
        unsigned long long n = 0; // we will be dealing with huge numbers here
        pip::fastq fq; // a struct holding one parsed record
        pip::fastq_parser<const char*> g; // boost::spirit grammar
        char const* f(addr); // first iterator/pointer
        char const* l(f + size); // last iterator/pointer
        while (parse(f, l, g, fq)) { // qi::parse (found via ADL) advances f past one record
            pack::Pack packed(fq.sequence, fq.quality, 1); // packing/compression function
            // Bind the parsed fields to the insert statement
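            // Example Illumina-style id line these fields appear to come from
            // (an assumption from the field names; the actual grammar lives
            // in pip::fastq_parser):
            //   @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG
            //    instrument:run:flowcell:lane:tile:x:y  pair:filter:control:index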
            sqlite3_bind_text(stmt, 1, fq.instrument.c_str(), -1, SQLITE_TRANSIENT);
            sqlite3_bind_int(stmt, 2, fq.run);
            sqlite3_bind_text(stmt, 3, fq.flowcell.c_str(), -1, SQLITE_TRANSIENT);
            sqlite3_bind_int(stmt, 4, fq.lane);
            sqlite3_bind_int(stmt, 5, fq.tile);
            sqlite3_bind_int(stmt, 6, fq.x);
            sqlite3_bind_int(stmt, 7, fq.y);
            sqlite3_bind_int(stmt, 8, fq.pair);
            sqlite3_bind_int(stmt, 9, fq.filter == 'Y' ? 1 : 0); // store the Y/N flag as 1/0
            sqlite3_bind_int(stmt, 10, fq.control);
            sqlite3_bind_text(stmt, 11, fq.index.c_str(), -1, SQLITE_TRANSIENT);
            sqlite3_bind_int(stmt, 12, packed.qualityFormat());
            sqlite3_bind_blob(stmt, 13, packed.rawData(), fq.sequence.length(), SQLITE_TRANSIENT);
            if (sqlite3_step(stmt) != SQLITE_DONE) // surface failed inserts instead of dropping them silently
                cerr << sqlite3_errmsg(db) << endl;
            sqlite3_reset(stmt);
            ++n;
            fq = {}; // clear the record before the next parse
            // Show an update every 100k inserts
            if (n % 100000 == 0)
                cerr << "Inserted " << n << " sequences...\r";
        }
- sqlite3_exec(db, "COMMIT TRANSACTION", NULL, NULL, &sql_error_msg);
- sqlite3_finalize(stmt);
- auto endClock = clock() - startClock;
- printf("Pip: %llu sequences imported in %4.2f seconds\n",n,endClock/(double)CLOCKS_PER_SEC);
- return n; // return number of rows inserted
- }
- return 0;
- }
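
// --- Illustrative usage ------------------------------------------------------
// A sketch only: the database and input paths are hypothetical, and the
// CREATE TABLE statement is the schema quoted in the comment above.
int main()
{
    sqlite3* db = 0;
    if (sqlite3_open("reads.db", &db) != SQLITE_OK) { // hypothetical db path
        cerr << sqlite3_errmsg(db) << endl;
        return 1;
    }
    sqlite3_exec(db,
                 "CREATE TABLE IF NOT EXISTS rawreads(instrument TEXT,runid INT,"
                 "flowcell TEXT,lane INT,tile INT,x INT,y INT,pair INT,filter INT,"
                 "control INT,index_sequence TEXT,qualityformat INT,data BLOB);",
                 NULL, NULL, NULL);
    mm_read_and_insert("sample.fastq", db); // hypothetical 30+ GB input
    sqlite3_close(db);
    return 0;
}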