Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #![feature(integer_atomics)]
- use std::cell::RefCell;
- use std::thread;
- use std::io;
- extern crate crossbeam;
- use crossbeam::sync::MsQueue;
- use crossbeam::scope;
- extern crate time;
- struct Point {
- x: i64
- }
- use std::sync::atomic::{AtomicUsize, AtomicPtr, Ordering};
- use std::mem;
- use std::ptr;
- use std::boxed::Box;
- struct Node<T> {
- cells: [AtomicPtr<T>; 4],
- next: AtomicPtr<Node<T>>
- }
- struct Queue_<T> {
- node: Node<T>,
- put_index: AtomicUsize,
- pop_index: AtomicUsize,
- default_T: AtomicPtr<T>,
- default_node: AtomicPtr<Node<T>>
- }
- fn test_scoped_thread() {
- let node: Node<Point> = Node { cells: [AtomicPtr::default(), AtomicPtr::default()
- , AtomicPtr::default(), AtomicPtr::default()], next: AtomicPtr::default()};
- let queue: Queue_<Point> = Queue_ {node: node, put_index: AtomicUsize::new(0)
- , pop_index: AtomicUsize::new(0)
- , default_T: AtomicPtr::default()
- , default_node: AtomicPtr::default()};
- let shared_queue = &queue;
- println!("");
- scope(|scope| {
- scope.spawn(move || {
- let shared_node = &shared_queue.node;
- let mut vals: [usize; 4] = [0, 0, 0, 0];
- for i in 0..4 {
- let put_index = shared_queue.put_index.fetch_add(1, Ordering::Relaxed);
- if shared_queue.default_T.load(Ordering::Relaxed) == shared_node.cells[put_index].load(Ordering::Relaxed) {
- shared_node.cells[put_index].compare_and_swap(shared_queue.default_T.load(Ordering::Relaxed), &mut Point{x: i}, Ordering::Relaxed);
- }
- }
- thread::sleep_ms(3000);
- println!("over");
- });
- thread::sleep_ms(1000);
- scope.spawn(move || {
- let shared_node = &shared_queue.node;
- for _ in 0..4 {
- let pop_index = shared_queue.pop_index.fetch_add(1, Ordering::Relaxed);
- let val = &shared_node.cells[pop_index];
- while shared_queue.default_T.load(Ordering::Relaxed) == val.load(Ordering::Relaxed){
- // println!("value is {:?}", val.load(Ordering::Relaxed));
- }
- println!("value address is {:?} ", val.load(Ordering::Relaxed));
- let vp = unsafe{ &*val.load(Ordering::Relaxed) };
- println!("value is {}", vp.x );
- }
- });
- });
- println!("put {}", shared_queue.put_index.load(Ordering::Relaxed));
- println!("put {}", shared_queue.put_index.load(Ordering::Relaxed));
- }
- fn test_vec() {
- // 迭代器可以收集到 vector
- let mut collected_iterator: Vec<i32> = (2..10).collect();
- println!("Collected (0..10) into: {:?}", collected_iterator);
- // `vec!` 宏可用来初始化一个 vector
- let mut xs = vec![1i32, 2, 3];
- println!("Initial vector: {:?}", xs);
- // 在 vector 的尾部插入一个新的元素
- println!("Push 4 into the vector");
- xs.push(4);
- println!("Vector: {:?}", xs);
- // 报错!不可变 vector 不可增长
- {
- for i in 1..10 {
- let m = i;
- collected_iterator.push(m);
- }
- }
- // 改正 ^ 将此行注释掉
- // `len` 方法获得一个 vector 的当前大小
- println!("Vector size: {}", xs.len());
- // 在中括号上加索引(索引从 0 开始)
- println!("Second element: {}", xs[1]);
- // `pop` 移除 vector 的最后一个元素并将它返回
- println!("Pop last element: {:?}", xs.pop());
- // 超出索引范围将抛出一个 panic
- // println!("Fourth element: {}", xs[3]);
- println!("one: {:?}", collected_iterator.pop());
- println!("one: {:?}", collected_iterator.pop());
- }
- fn test_crossbeam() {
- let q: MsQueue<i64> = MsQueue::new();
- assert!(q.is_empty());
- const CONC_COUNT: i64 = 100000000;
- const THREAD_NUM: i64 = 4;
- const CONC_C: i64 = CONC_COUNT / THREAD_NUM;
- println!("\nstart");
- let start = time::now();
- scope(|scope| {
- for _ in 0..THREAD_NUM {
- scope.spawn(|| {
- for i in 0..CONC_C {
- q.push(i);
- q.try_pop();
- }
- });
- }
- });
- let end = time::now();//获取结束时间
- println!("done!duration: {:?}\n", (end-start));
- let j: i64 = 8;
- let k: Point = Point { x: j};
- println!("j : {} {}", k.x, j);
- }
- fn main() {
- //test_scoped_thread();
- //test_vec();
- test_crossbeam();
- }
Add Comment
Please, Sign In to add comment