Advertisement
Guest User

Untitled

a guest
Sep 15th, 2019
227
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 12.16 KB | None | 0 0
  1. use rand::Rng;
  2. use std::collections::VecDeque;
  3. use std::time::{Duration, Instant};
  4. use std::path::Path;
  5. use std::fs::File;
  6. use std::error::Error;
  7. use std::io::prelude::*;
  8. use csv::Writer;
  9. use math::round;
  10.  
  11.  
  12.  
  13. fn main() {
  14.  
  15. let start = Instant::now();
  16. let path = "data/output_resp_2.csv";
  17. let mut writer = Writer::from_path(path).unwrap();
  18. let header = vec!["aborts_ps","resp_time","umpire_calls_ps","prob_sent_umpire","prob_umpire_aborts","lam"];
  19. writer.serialize(header).expect("CSV writer error");
  20. let arrival_rate = vec![700.0,800.0,900.0,1000.0,1100.0,1200.0];
  21. for ar in arrival_rate.iter() {
  22. let start1 = Instant::now();
  23. let mut res = alg_sim(10000.0, *ar, 100.0, 200.0, 0.2, 0.8, 51, 1000000.0);
  24. res.push(*ar);
  25. writer.serialize(res).expect("CSV writer error");
  26. println!("{} done",ar);
  27. let duration1 = start1.elapsed();
  28. println!("Time elapsed in alg_sim() is: {:?}", duration1);
  29. }
  30. let duration = start.elapsed();
  31. println!("Time elapsed in alg_sim() is: {:?}", duration);
  32.  
  33.  
  34. // let start: Instant = Instant::now();
  35. // let res = alg_sim(10000.0, 1000.0, 100.0, 200.0, 0.2, 0.8, 51, 1000000.0);
  36. // println!("Aborts/sec {}",res[0]);
  37. // println!("Resp. time {}",res[1]);
  38. // println!("Umpire calls/sec {}",res[2]);
  39. // let duration: Duration = start.elapsed();
  40. // println!("Time elapsed in alg_sim() is: {:?}", duration);
  41.  
  42.  
  43. }
  44.  
  45. enum EventType {
  46. ArrivalEvent,
  47. UpdateEvent,
  48. UmpireEvent,
  49. }
  50.  
  51. struct Transaction {
  52. id: u32,
  53. updates: u32,
  54. updates_done: u32,
  55. predecessor_list: Vec<u32>
  56. }
  57.  
  58. struct TentativeWrite {
  59. tnx_id:u32,
  60. edge_id:u32
  61. }
  62.  
  63. fn increment_f64 (x: &mut f64) -> () {
  64. *x = *x + 1.0;
  65. }
  66.  
  67. fn decrement_f64 (x: &mut f64) -> () {
  68. *x = *x - 1.0;
  69. }
  70.  
  71. fn increment_u32 (x: &mut u32) -> () {
  72. *x = *x + 1;
  73. }
  74.  
  75. fn geom_seq (start_value: f64, ratio: f64, len: usize) -> Vec<f64> {
  76.  
  77. let mut geom_seq: Vec<f64> = Vec::new(); // initialise vector to hold sequence
  78. geom_seq.push(start_value); // push start value to sequence
  79. for i in 1..len {
  80. let next_term = geom_seq[i-1] * ratio; // create next term
  81. geom_seq.push(next_term) ; // push next value to sequence
  82. }
  83.  
  84. geom_seq
  85.  
  86. }
  87.  
  88. fn cum_sum(gs: &Vec<f64>, index: u32) -> f64 {
  89.  
  90. let slice = &gs[0..index as usize];
  91. let cs:f64 = slice.iter().sum();
  92. cs
  93.  
  94. }
  95.  
  96. fn num_updates(start_value: f64, ratio: f64, len: usize) -> u32 {
  97.  
  98. let seq: Vec<f64> = geom_seq(start_value,ratio,len); // create geometric sequence
  99.  
  100. let mut rng = rand::thread_rng(); // generate random number between 0 and 1
  101. let ran: f64 = rng.gen();
  102. // println!("{}",ran);
  103.  
  104. let mut ind: u32 = 1;
  105. let mut cs = cum_sum(&seq,ind); // cumulative sum at first index
  106. // println!("{}",cs);
  107.  
  108. while ran > cs {
  109.  
  110. ind = ind + 1; // increment index
  111. cs = cum_sum(&seq,ind); // update cumulative sum
  112. // println!("{}",cs);
  113. if ind == len as u32 {
  114. break
  115. }
  116. }
  117.  
  118. ind
  119.  
  120. }
  121.  
  122. fn calculate_conflict_prob (start_value: f64, ratio: f64, len: usize, _N:f64, lam:f64, del:f64) -> f64 {
  123.  
  124. let mut seq: Vec<f64> = geom_seq(start_value,ratio,len); // create geometric sequence
  125. for e in 0..seq.len() {
  126. seq[e] = seq[e] * (e as f64 + 1.0); // multiple each element by index
  127. }
  128. let seq_sum: f64 = seq.iter().sum(); // sum sequence
  129. let xi = seq_sum * lam / _N; // particular arrival rate
  130. let _u = (2.5 * xi) / (xi + (2.0 * del)); // prob. of conflict
  131. _u
  132.  
  133. }
  134.  
  135. fn next_event_type(lam:f64, mu:f64, del:f64,_n: f64,_m:f64) -> EventType {
  136.  
  137. let _d: f64 = lam + (_n * del) + (_m * mu);
  138.  
  139. let state_prob: [f64; 3] = [lam / _d, (_n * del) / _d, (_m * mu)/_d];
  140.  
  141. let mut rng = rand::thread_rng();
  142. let ran: f64 = rng.gen();
  143.  
  144. let mut next_event = EventType::ArrivalEvent;
  145. if ran < state_prob[0] {
  146. next_event = EventType::ArrivalEvent;
  147. } else if ran < (state_prob[0] + state_prob[1]) {
  148. next_event = EventType::UpdateEvent;
  149. } else {
  150. next_event = EventType::UmpireEvent;
  151. }
  152.  
  153. next_event
  154.  
  155. }
  156.  
  157. fn arrival_event ( _L: &mut f64, arr: &mut u32, _n: &mut f64, _m: &mut f64, tr: &mut Vec<Transaction>,start_value: f64,ratio: f64,len: usize) {
  158.  
  159. *_L = *_L + *_n + *_m; // increment cumulative number of transactions in system
  160. increment_u32(arr); // increment # transactions that have arrived
  161. increment_f64(_n); // increment the # active transactions
  162. let mut new_transaction = Transaction { // create transaction
  163. id: *arr,
  164. updates: num_updates(start_value,ratio,len),
  165. updates_done: 0,
  166. predecessor_list: Vec::new()
  167. };
  168. tr.push(new_transaction);
  169.  
  170. }
  171.  
  172. fn update_event ( ind: u32,_q: &mut VecDeque<Transaction>,_al: &mut f64,_A: &mut u32, tr: &mut Vec<Transaction>, start_value: f64, ratio: f64, len: usize, _N: f64, lam: f64, del: f64, _n: &mut f64, _m: &mut f64,_prov: &mut Vec<TentativeWrite>) {
  173.  
  174. let mut rng = rand::thread_rng(); // initialise random number generator
  175. let index = rng.gen_range(0, tr.len()); // choose a transaction to attempt an update
  176. let _p: f64 = rng.gen(); // random number to determine event
  177. let _u = calculate_conflict_prob(start_value,ratio,len,_N,lam,del); // calculate conflict probability
  178.  
  179. if _p > _u { // no conflict
  180. if tr[index].updates == 1 { // single update
  181. tr.remove(index); // remove from active transaction list
  182.  
  183. decrement_f64(_n); // decrement active transactions
  184.  
  185. } else { // multiple updates
  186. tr[index].updates_done = tr[index].updates_done + 1; // increment updates completed
  187. let edge = rng.gen_range(0, _N as i32); // choose edge
  188. for pv in _prov.iter() {
  189. if pv.edge_id == edge as u32 && pv.tnx_id != tr[index].id as u32 { // tentative write on this edge: FIX 1.01
  190. tr[index].predecessor_list.push(pv.tnx_id); // add writing transaction to predecessor list
  191. }
  192. }
  193. let new_update = TentativeWrite {
  194. tnx_id: tr[index].id,
  195. edge_id: edge as u32,
  196. };
  197. _prov.push(new_update); // add update to tentative write list
  198. if tr[index].updates == tr[index].updates_done { // all updates are complete
  199. if tr[index].predecessor_list.is_empty() { // clear provisional
  200. _prov.retain(|pv| pv.tnx_id != tr[index].id); // clear tentative writes
  201. tr.remove(index); // remove from active transaction list
  202.  
  203. } else {
  204. increment_f64( _m); // increment number in umpire queue
  205. let x = tr.remove(index); // take ownership of transaction
  206. _q.push_back(x); // put in umpire queue
  207. increment_f64(_al); // increment umpire calls
  208. }
  209. decrement_f64(_n); // decrement the number active
  210. } else {
  211.  
  212. }
  213. }
  214. } else { // there is a conflict
  215. increment_u32(_A); // increment the number of aborts
  216. _prov.retain(|pv| pv.tnx_id != tr[index].id); // clear provisional
  217. tr.remove(index); // remove from active transaction list
  218. decrement_f64(_n); // decrement the number active
  219. }
  220.  
  221. }
  222.  
  223. fn umpire_event(_q: &mut VecDeque<Transaction>, _A: &mut u32, _bet: &mut f64, _hit: &mut Vec<u32>, _prov: &mut Vec<TentativeWrite>, _m: &mut f64) {
  224.  
  225. let mut next_transaction = _q.pop_front(); // get head of queue
  226.  
  227. match &mut next_transaction {
  228. Some(transaction) => {
  229. if _hit.contains(&transaction.id) { // check if transaction id in kill list
  230. increment_u32(_A); // increment aborts
  231. _hit.retain(|t| t != &transaction.id); // remove from kill list
  232. increment_f64(_bet); // increment umpire aborts
  233. } else {
  234. // fix 1.03
  235. // let mut to_remove = Vec::new();
  236. // for p in transaction.predecessor_list.iter() {
  237. // _q.retain(|t| {
  238. // let mut delete = true;
  239. // if t.id == *p {
  240. // delete = false;
  241. // to_remove.push(t.id);
  242. // };
  243. // delete
  244. // })
  245. // };
  246. // for i in to_remove.iter() {
  247. // transaction.predecessor_list.retain(|t| t != i);
  248. // increment_u32(_A);
  249. // increment_f64(_bet);
  250. // }
  251.  
  252. _hit.append(&mut transaction.predecessor_list); // add to hit list
  253. }
  254. _prov.retain(|pv| pv.tnx_id != transaction.id); // clear provisional
  255. for e in _q.iter_mut() { // clean predecessor list of transactions in queue: FIX 1.02
  256. e.predecessor_list.retain(|t| *t != transaction.id);
  257. }
  258. decrement_f64(_m);
  259. }
  260. None => (),
  261. }
  262.  
  263. }
  264.  
  265. fn alg_sim (_number_edges:f64, _arrival_rate:f64, _service_rate:f64, _network_delay:f64, start_value: f64, ratio: f64, len: usize, _arrivals:f64) -> Vec<f64> {
  266.  
  267. // data structures
  268. let mut _umpire_queue: VecDeque<Transaction> = VecDeque::new(); // umpire queue
  269. let mut _active_transactions: Vec<Transaction> = Vec::new(); // active transactions
  270. let mut _kill_list: Vec<u32> = Vec::new(); // kill list
  271. let mut _tentative_writes: Vec<TentativeWrite> = Vec::new(); // tentative writes
  272.  
  273. // system variables
  274. let mut _number_active_transactions: f64 = 0.0; // # active transactions
  275. let mut _number_umpire_queue: f64 = 0.0; // # transactions in the umpire queue
  276. let mut _calls_umpire: f64 = 0.0; // # calls to umpire
  277. let mut _aborts_umpire: f64 = 0.0; // # aborts by umpire
  278. let mut _transactions_arrived: u32 = 0; // # transactions that have arrived
  279. let mut _transactions_aborted: u32 = 0; // # transactions aborted
  280. let mut _iteration: u32 = 0; // iteration counter
  281. let mut _transactions_in_system: f64 = 0.0; // cumulative sum of transactions in system
  282.  
  283.  
  284.  
  285. // conflict probability
  286. let _conflict_prob: f64 = calculate_conflict_prob(start_value, ratio, len, _number_edges, _arrival_rate, _network_delay);
  287.  
  288. // initialise random number generator
  289. let mut rng = rand::thread_rng();
  290.  
  291. while _transactions_arrived < _arrivals as u32 {
  292. // println!("{}",_transactions_arrived );
  293. let next_event: EventType = next_event_type(_arrival_rate, _service_rate, _network_delay, _number_active_transactions, _number_umpire_queue); // calculate next event
  294. increment_u32(&mut _iteration); // increment iteration
  295.  
  296. match next_event {
  297. EventType::ArrivalEvent => {
  298. arrival_event(&mut _transactions_in_system, &mut _transactions_arrived, &mut _number_active_transactions, &mut _number_umpire_queue, &mut _active_transactions, start_value, ratio, len);
  299. },
  300. EventType::UpdateEvent => {
  301. update_event( _iteration,&mut _umpire_queue, &mut _calls_umpire, &mut _transactions_aborted, &mut _active_transactions, start_value, ratio, len, _number_edges, _arrival_rate, _network_delay, &mut _number_active_transactions, &mut _number_umpire_queue, &mut _tentative_writes);
  302. },
  303. EventType::UmpireEvent => {
  304. umpire_event(&mut _umpire_queue, &mut _transactions_aborted, &mut _aborts_umpire, &mut _kill_list, &mut _tentative_writes, &mut _number_umpire_queue);
  305. },
  306. }
  307. }
  308.  
  309.  
  310.  
  311. let aborts_ps: f64 = _transactions_aborted as f64 * (_arrival_rate / _arrivals);
  312. let resp_time: f64 = _transactions_in_system / (_arrivals * _arrival_rate);
  313. let umpire_calls_ps: f64 = _calls_umpire as f64 * (_arrival_rate / _arrivals);
  314. let prob_sent_umpire: f64 = round::ceil(_calls_umpire/_arrivals,4);
  315. let prob_umpire_aborts: f64 = round::ceil(_aborts_umpire/_calls_umpire,4);
  316. let output = vec![aborts_ps,resp_time,umpire_calls_ps,prob_sent_umpire,prob_umpire_aborts];
  317. output
  318.  
  319. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement