Advertisement
rosuav

durable.pike

Oct 24th, 2012
2,830
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Pike 2.53 KB | None | 0 0
  1. //Test PostgreSQL transaction durability
  2.  
  3. int pool=100,nthrd=80,testlength=60; //TODO: Support command-line parameters for these
  4. mapping(int:int) possible=([]);
  5. mapping(int:int) confirmed=([]);
  6.  
  7. object getdb()
  8. {
  9.     object db=Sql.Sql("pgsql://USERNAME:PASSWORD@HOST/DATABASE",(["reconnect":0]));
  10.     db->master_sql->settimeout(10);
  11.     return db;
  12. }
  13.  
  14. void transact()
  15. {
  16.     object db=getdb();
  17.     mixed ex=catch
  18.     {
  19.         while (testlength)
  20.         {
  21.             db->big_query("begin");
  22.             array(int) cur=({random(pool)+1,0,time()});
  23.             db->big_query("update public.durability set value=value+1 where id=:id",(["id":cur[0]]));
  24.             db->big_query("update public.durability set value=value+1 where id=0");
  25.             possible[cur[0]]++; possible[0]++;
  26.             db->big_query("commit");
  27.             confirmed[cur[0]]++; confirmed[0]++;
  28.         }
  29.     };
  30.     //Currently, I don't actually much care about exceptions. Should probably do something with them.
  31.     catch {db->master_sql->close(); destruct(db);}; //Ignore errors on closing connection
  32. }
  33.  
  34. int main()
  35. {
  36.     object db=getdb(); //Initialize in autocommit mode
  37.     db->big_query("create table if not exists public.durability (id integer primary key,value integer not null default 0)");
  38.     db->big_query("truncate public.durability");
  39.     db->big_query("insert into public.durability (id) select generate_series(0,:pool)",(["pool":pool]));
  40.     db->master_sql->close(); destruct(db);
  41.     signal(2,lambda() {error("Ctrl-C!");});
  42.     int start=time();
  43.     write("Durability test: thrds %d, pool %d, time %d, start %s",nthrd,pool,testlength,ctime(start));
  44.     array threads=({ }); while (nthrd--) threads+=({Thread.Thread(transact)});
  45.     catch {while (has_value(threads->status(),0))
  46.     {
  47.         float tm=time(start);
  48.         werror("%d/%fs=%dtps...\r",confirmed[0],tm,(int)(confirmed[0]/tm));
  49.         if (tm>testlength) break;
  50.         sleep(1);
  51.     }};
  52.     signal(2); //Reset Ctrl-C handler
  53.     testlength=0; threads->kill();
  54.     write("Completed %d transactions in %fs, %f tps.\n",confirmed[0],time(start),confirmed[0]/time(start));
  55.     write("Reconnecting to db..."); while (catch {db=getdb();} || !db) {sleep(1); write(".");}
  56.     write("\nReconnected, checking.\n");
  57.     int sum=0;
  58.     foreach (db->typed_query("select * from public.durability order by id"),mapping row)
  59.     {
  60.         if (row->id) sum+=row->value; else sum-=row->value;
  61.         if (row->value<confirmed[row->id] || row->value>possible[row->id]) write("Fail: Row %d [%d] s/be %d-%d\n",row->id,row->value,confirmed[row->id],possible[row->id]);
  62.     }
  63.     if (sum) write("*** ERROR: Inconsistent data [sum is wrong by %d] ***\n",sum);
  64.     write("Verification complete.\n");
  65.     destruct(db);
  66. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement