Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- use rand::Rng;
- use std::collections::VecDeque;
- use std::time::{Duration, Instant};
- use std::path::Path;
- use std::fs::File;
- use std::error::Error;
- use std::io::prelude::*;
- use csv::Writer;
- use math::round;
- fn main() {
- let start = Instant::now();
- let path = "data/output_resp_2.csv";
- let mut writer = Writer::from_path(path).unwrap();
- let header = vec!["aborts_ps","resp_time","umpire_calls_ps","prob_sent_umpire","prob_umpire_aborts","lam"];
- writer.serialize(header).expect("CSV writer error");
- let arrival_rate = vec![700.0,800.0,900.0,1000.0,1100.0,1200.0];
- for ar in arrival_rate.iter() {
- let start1 = Instant::now();
- let mut res = alg_sim(10000.0, *ar, 100.0, 200.0, 0.2, 0.8, 51, 1000000.0);
- res.push(*ar);
- writer.serialize(res).expect("CSV writer error");
- println!("{} done",ar);
- let duration1 = start1.elapsed();
- println!("Time elapsed in alg_sim() is: {:?}", duration1);
- }
- let duration = start.elapsed();
- println!("Time elapsed in alg_sim() is: {:?}", duration);
- // let start: Instant = Instant::now();
- // let res = alg_sim(10000.0, 1000.0, 100.0, 200.0, 0.2, 0.8, 51, 1000000.0);
- // println!("Aborts/sec {}",res[0]);
- // println!("Resp. time {}",res[1]);
- // println!("Umpire calls/sec {}",res[2]);
- // let duration: Duration = start.elapsed();
- // println!("Time elapsed in alg_sim() is: {:?}", duration);
- }
/// The kind of event the simulation processes next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventType {
    /// A new transaction enters the system.
    ArrivalEvent,
    /// An active transaction attempts an update.
    UpdateEvent,
    /// The umpire processes the transaction at the head of its queue.
    UmpireEvent,
}
/// A transaction tracked by the simulation.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Transaction {
    /// Identifier; assigned from the running arrival counter when the transaction is created.
    id: u32,
    /// Total number of updates this transaction has to perform.
    updates: u32,
    /// Number of updates performed so far.
    updates_done: u32,
    /// Ids of other transactions that held a tentative write on an edge this transaction updated.
    predecessor_list: Vec<u32>
}
/// A not-yet-finalised write: which transaction wrote to which edge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TentativeWrite {
    /// Id of the transaction that performed the write.
    tnx_id:u32,
    /// Id of the edge that was written.
    edge_id:u32
}
/// Adds `1.0` to the value behind `x`.
fn increment_f64(x: &mut f64) {
    *x += 1.0;
}
/// Subtracts `1.0` from the value behind `x`.
fn decrement_f64(x: &mut f64) {
    *x -= 1.0;
}
/// Adds `1` to the counter behind `x`.
fn increment_u32(x: &mut u32) {
    *x += 1;
}
/// Builds the geometric sequence `start_value, start_value * ratio, ...`.
///
/// Each term is the previous term multiplied by `ratio`. The result has `len`
/// elements, except that it always contains at least the start value, so
/// `len == 0` still yields a one-element vector.
fn geom_seq(start_value: f64, ratio: f64, len: usize) -> Vec<f64> {
    std::iter::successors(Some(start_value), |&previous| Some(previous * ratio))
        .take(len.max(1)) // the start value is always present
        .collect()
}
/// Returns the sum of the first `index` elements of `gs`.
///
/// Accepts any slice; existing callers passing `&Vec<f64>` keep working through
/// deref coercion.
///
/// # Panics
/// Panics if `index` is greater than `gs.len()`.
fn cum_sum(gs: &[f64], index: u32) -> f64 {
    gs[..index as usize].iter().sum()
}
- fn num_updates(start_value: f64, ratio: f64, len: usize) -> u32 {
- let seq: Vec<f64> = geom_seq(start_value,ratio,len); // create geometric sequence
- let mut rng = rand::thread_rng(); // generate random number between 0 and 1
- let ran: f64 = rng.gen();
- // println!("{}",ran);
- let mut ind: u32 = 1;
- let mut cs = cum_sum(&seq,ind); // cumulative sum at first index
- // println!("{}",cs);
- while ran > cs {
- ind = ind + 1; // increment index
- cs = cum_sum(&seq,ind); // update cumulative sum
- // println!("{}",cs);
- if ind == len as u32 {
- break
- }
- }
- ind
- }
- fn calculate_conflict_prob (start_value: f64, ratio: f64, len: usize, _N:f64, lam:f64, del:f64) -> f64 {
- let mut seq: Vec<f64> = geom_seq(start_value,ratio,len); // create geometric sequence
- for e in 0..seq.len() {
- seq[e] = seq[e] * (e as f64 + 1.0); // multiple each element by index
- }
- let seq_sum: f64 = seq.iter().sum(); // sum sequence
- let xi = seq_sum * lam / _N; // particular arrival rate
- let _u = (2.5 * xi) / (xi + (2.0 * del)); // prob. of conflict
- _u
- }
- fn next_event_type(lam:f64, mu:f64, del:f64,_n: f64,_m:f64) -> EventType {
- let _d: f64 = lam + (_n * del) + (_m * mu);
- let state_prob: [f64; 3] = [lam / _d, (_n * del) / _d, (_m * mu)/_d];
- let mut rng = rand::thread_rng();
- let ran: f64 = rng.gen();
- let mut next_event = EventType::ArrivalEvent;
- if ran < state_prob[0] {
- next_event = EventType::ArrivalEvent;
- } else if ran < (state_prob[0] + state_prob[1]) {
- next_event = EventType::UpdateEvent;
- } else {
- next_event = EventType::UmpireEvent;
- }
- next_event
- }
- fn arrival_event ( _L: &mut f64, arr: &mut u32, _n: &mut f64, _m: &mut f64, tr: &mut Vec<Transaction>,start_value: f64,ratio: f64,len: usize) {
- *_L = *_L + *_n + *_m; // increment cumulative number of transactions in system
- increment_u32(arr); // increment # transactions that have arrived
- increment_f64(_n); // increment the # active transactions
- let mut new_transaction = Transaction { // create transaction
- id: *arr,
- updates: num_updates(start_value,ratio,len),
- updates_done: 0,
- predecessor_list: Vec::new()
- };
- tr.push(new_transaction);
- }
- fn update_event ( ind: u32,_q: &mut VecDeque<Transaction>,_al: &mut f64,_A: &mut u32, tr: &mut Vec<Transaction>, start_value: f64, ratio: f64, len: usize, _N: f64, lam: f64, del: f64, _n: &mut f64, _m: &mut f64,_prov: &mut Vec<TentativeWrite>) {
- let mut rng = rand::thread_rng(); // initialise random number generator
- let index = rng.gen_range(0, tr.len()); // choose a transaction to attempt an update
- let _p: f64 = rng.gen(); // random number to determine event
- let _u = calculate_conflict_prob(start_value,ratio,len,_N,lam,del); // calculate conflict probability
- if _p > _u { // no conflict
- if tr[index].updates == 1 { // single update
- tr.remove(index); // remove from active transaction list
- decrement_f64(_n); // decrement active transactions
- } else { // multiple updates
- tr[index].updates_done = tr[index].updates_done + 1; // increment updates completed
- let edge = rng.gen_range(0, _N as i32); // choose edge
- for pv in _prov.iter() {
- if pv.edge_id == edge as u32 && pv.tnx_id != tr[index].id as u32 { // tentative write on this edge: FIX 1.01
- tr[index].predecessor_list.push(pv.tnx_id); // add writing transaction to predecessor list
- }
- }
- let new_update = TentativeWrite {
- tnx_id: tr[index].id,
- edge_id: edge as u32,
- };
- _prov.push(new_update); // add update to tentative write list
- if tr[index].updates == tr[index].updates_done { // all updates are complete
- if tr[index].predecessor_list.is_empty() { // clear provisional
- _prov.retain(|pv| pv.tnx_id != tr[index].id); // clear tentative writes
- tr.remove(index); // remove from active transaction list
- } else {
- increment_f64( _m); // increment number in umpire queue
- let x = tr.remove(index); // take ownership of transaction
- _q.push_back(x); // put in umpire queue
- increment_f64(_al); // increment umpire calls
- }
- decrement_f64(_n); // decrement the number active
- } else {
- }
- }
- } else { // there is a conflict
- increment_u32(_A); // increment the number of aborts
- _prov.retain(|pv| pv.tnx_id != tr[index].id); // clear provisional
- tr.remove(index); // remove from active transaction list
- decrement_f64(_n); // decrement the number active
- }
- }
- fn umpire_event(_q: &mut VecDeque<Transaction>, _A: &mut u32, _bet: &mut f64, _hit: &mut Vec<u32>, _prov: &mut Vec<TentativeWrite>, _m: &mut f64) {
- let mut next_transaction = _q.pop_front(); // get head of queue
- match &mut next_transaction {
- Some(transaction) => {
- if _hit.contains(&transaction.id) { // check if transaction id in kill list
- increment_u32(_A); // increment aborts
- _hit.retain(|t| t != &transaction.id); // remove from kill list
- increment_f64(_bet); // increment umpire aborts
- } else {
- // fix 1.03
- // let mut to_remove = Vec::new();
- // for p in transaction.predecessor_list.iter() {
- // _q.retain(|t| {
- // let mut delete = true;
- // if t.id == *p {
- // delete = false;
- // to_remove.push(t.id);
- // };
- // delete
- // })
- // };
- // for i in to_remove.iter() {
- // transaction.predecessor_list.retain(|t| t != i);
- // increment_u32(_A);
- // increment_f64(_bet);
- // }
- _hit.append(&mut transaction.predecessor_list); // add to hit list
- }
- _prov.retain(|pv| pv.tnx_id != transaction.id); // clear provisional
- for e in _q.iter_mut() { // clean predecessor list of transactions in queue: FIX 1.02
- e.predecessor_list.retain(|t| *t != transaction.id);
- }
- decrement_f64(_m);
- }
- None => (),
- }
- }
- fn alg_sim (_number_edges:f64, _arrival_rate:f64, _service_rate:f64, _network_delay:f64, start_value: f64, ratio: f64, len: usize, _arrivals:f64) -> Vec<f64> {
- // data structures
- let mut _umpire_queue: VecDeque<Transaction> = VecDeque::new(); // umpire queue
- let mut _active_transactions: Vec<Transaction> = Vec::new(); // active transactions
- let mut _kill_list: Vec<u32> = Vec::new(); // kill list
- let mut _tentative_writes: Vec<TentativeWrite> = Vec::new(); // tentative writes
- // system variables
- let mut _number_active_transactions: f64 = 0.0; // # active transactions
- let mut _number_umpire_queue: f64 = 0.0; // # transactions in the umpire queue
- let mut _calls_umpire: f64 = 0.0; // # calls to umpire
- let mut _aborts_umpire: f64 = 0.0; // # aborts by umpire
- let mut _transactions_arrived: u32 = 0; // # transactions that have arrived
- let mut _transactions_aborted: u32 = 0; // # transactions aborted
- let mut _iteration: u32 = 0; // iteration counter
- let mut _transactions_in_system: f64 = 0.0; // cumulative sum of transactions in system
- // conflict probability
- let _conflict_prob: f64 = calculate_conflict_prob(start_value, ratio, len, _number_edges, _arrival_rate, _network_delay);
- // initialise random number generator
- let mut rng = rand::thread_rng();
- while _transactions_arrived < _arrivals as u32 {
- // println!("{}",_transactions_arrived );
- let next_event: EventType = next_event_type(_arrival_rate, _service_rate, _network_delay, _number_active_transactions, _number_umpire_queue); // calculate next event
- increment_u32(&mut _iteration); // increment iteration
- match next_event {
- EventType::ArrivalEvent => {
- arrival_event(&mut _transactions_in_system, &mut _transactions_arrived, &mut _number_active_transactions, &mut _number_umpire_queue, &mut _active_transactions, start_value, ratio, len);
- },
- EventType::UpdateEvent => {
- update_event( _iteration,&mut _umpire_queue, &mut _calls_umpire, &mut _transactions_aborted, &mut _active_transactions, start_value, ratio, len, _number_edges, _arrival_rate, _network_delay, &mut _number_active_transactions, &mut _number_umpire_queue, &mut _tentative_writes);
- },
- EventType::UmpireEvent => {
- umpire_event(&mut _umpire_queue, &mut _transactions_aborted, &mut _aborts_umpire, &mut _kill_list, &mut _tentative_writes, &mut _number_umpire_queue);
- },
- }
- }
- let aborts_ps: f64 = _transactions_aborted as f64 * (_arrival_rate / _arrivals);
- let resp_time: f64 = _transactions_in_system / (_arrivals * _arrival_rate);
- let umpire_calls_ps: f64 = _calls_umpire as f64 * (_arrival_rate / _arrivals);
- let prob_sent_umpire: f64 = round::ceil(_calls_umpire/_arrivals,4);
- let prob_umpire_aborts: f64 = round::ceil(_aborts_umpire/_calls_umpire,4);
- let output = vec![aborts_ps,resp_time,umpire_calls_ps,prob_sent_umpire,prob_umpire_aborts];
- output
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement