fwoodruff

Atomic Vector (draft)

Dec 4th, 2020 (edited)
/**
Copyright 2020 Frederick Woodruff

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#include <atomic>
#include <type_traits>
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <new>
#include <thread>

#pragma once

namespace fbw {

template<int unused>
std::atomic<void*>& hazard_pointer_for_this_thread();

template<int id> void release_hazard();
template<typename T> void reclaim(T*);
void delete_nodes_with_no_hazards();

// CRTP mixin providing fetch_add etc.; defined at the bottom of this file.
template <typename H, typename T>
class fetch_op_support;

template<typename T>
class atomic_vector {
private:
  struct buffer;

  // per-element marker; block_complete is used by transfer_block below
  enum class state : unsigned char {
      active,
      locked,
      uninitialised,
      block_complete
  };

  // a value together with its marker, stored as std::atomic<pack>
  struct pack {
    T value;
    state mark;
  };

  // the current buffer plus the buffer being migrated away from, if any
  struct buffer_handle {
    buffer* ptr;
    buffer* old_ptr;
  };

  buffer_handle load_buffers();
  void release_buffers();
  size_t capacity_hint();
  bool cas_buffers(size_t& expected_capacity, buffer*);
  bool cas_index_weak(size_t, T&, T);
  bool cas_index_strong(size_t, T&, T);
  static constexpr size_t INITIAL_CAPACITY = 30;
public:
  class atomic_reference;

  atomic_vector(const atomic_vector&) = delete;
  atomic_vector& operator=(const atomic_vector&) = delete;

  static constexpr bool is_always_lock_free =
    std::atomic<pack>::is_always_lock_free &&
    std::atomic<buffer_handle>::is_always_lock_free;

  atomic_vector() {
    auto* buff = new (INITIAL_CAPACITY) buffer;
    atomic_handle.store({buff, nullptr});
    element_count.store(0);
  }

  ~atomic_vector() {
    auto buffers = atomic_handle.load();
    assert(!buffers.old_ptr);
    delete buffers.ptr;
  }

  /* Returns the index the object was placed at.
     This lets us use the container as a
     wait-free stack if a sufficiently
     large buffer has been reserved
     (see the usage sketch after fetch_op_support below).

     push_back is not possible because
     atomics are not moveable.
  */
  size_t emplace_back(T val) {
    auto expected_cap = capacity_hint();
    auto idx = static_cast<size_t>(element_count.fetch_add(1));
    auto new_cap = std::max<size_t>(idx + 1, expected_cap * 2);
    if(idx >= expected_cap) {
       auto* pbuffer = new (new_cap) buffer;
       bool success_code;
       do {
         success_code = cas_buffers(expected_cap, pbuffer);
       } while (idx >= expected_cap && !success_code
               && static_cast<size_t>(element_count.load()) > idx);
       if(!success_code) {
          delete pbuffer;
       }
    }
    T temp{};
    cas_index_strong(idx, temp, temp);
    cas_index_strong(idx, temp, val);
    return idx;
  }

  void pop_back() {
    element_count--;
  }

  void reserve(size_t capacity) {
    auto expected_cap = capacity_hint();
    if(capacity > expected_cap) {
       auto* pbuffer = new (capacity) buffer;
       bool success_code;
       do {
         success_code = cas_buffers(expected_cap, pbuffer);
       } while (capacity > expected_cap && !success_code);
       if(!success_code) {
          delete pbuffer;
       }
    }
  }

  void resize(size_t siz) {
    reserve(siz);
    element_count.store(siz);
  }

  void shrink_to_fit() {
    auto cap = capacity_hint();
    auto idx = size();
    if(idx < cap) {
       auto* shrunk_buffer = new (idx) buffer;
       if(!cas_buffers(cap, shrunk_buffer)) {
          delete shrunk_buffer;
       }
    }
  }

  size_t size() {
    auto s = element_count.load();
    return static_cast<size_t>(std::max<intptr_t>(s, 0));
  }

  /*
   we lose linearisability
   but not correctness when shrink_to_fit
   and emplace_back are called concurrently
  */
  size_t capacity() {
    return capacity_hint();
  }

  void clear() {
    element_count.store(0);
    auto* buff = new (1) buffer;
    auto expected_cap = capacity_hint();
    while(!cas_buffers(expected_cap, buff));
  }

  atomic_reference operator[](size_t idx) {
    return atomic_reference(idx, this);
  }

  atomic_reference back() {
    auto idx = element_count.load() - 1;
    assert(idx >= 0);
    return atomic_reference(static_cast<size_t>(idx), this);
  }

  atomic_reference front() {
    return atomic_reference(0, this);
  }

  void assign(size_t count, const T& value) {
     size_t cap = (count * 3) / 2 + 1;
     auto* pbuffer = new (cap) buffer;
     for(size_t i = 0; i < count; ++i) {
        pbuffer->array[i] = {value, state::active};
     }
     auto expected_cap = capacity_hint();
     while(!cas_buffers(expected_cap, pbuffer));
     element_count.store(count);
  }


  class atomic_reference : public fetch_op_support<atomic_reference, T> {
    size_t idx;
    atomic_vector* v;

    friend class atomic_vector;
    atomic_reference(size_t idx, atomic_vector* v) : idx(idx), v(v) {}
  public:
    atomic_reference(const atomic_reference&) = delete;
    atomic_reference&
        operator=(const atomic_reference&) = delete;

    bool compare_exchange_weak(
             T& expected,
             T desired) {
      assert(idx < v->size());
      return v->cas_index_weak(idx, expected, desired);
    }

    bool compare_exchange_strong(
             T& expected,
             T desired) {
       assert(idx < v->size());
       return v->cas_index_strong(idx, expected, desired);
    }

    T load() {
      T ret{};
      compare_exchange_strong(ret, ret);
      return ret;
    }

    operator T() {
      return load();
    }

    T exchange(T new_value) {
      T temp{};
      while(!compare_exchange_weak(temp, new_value));
      return temp;
    }

    void store(T new_value) {
      return (void)exchange(new_value);
    }
  };

  friend class atomic_reference;

private:
  std::atomic<buffer_handle> atomic_handle;
  std::atomic<intptr_t> element_count;

  // Coroutine support (draft): the awaitable node type itself
  // (value, next, internal_count, co_handle) is not defined in this file.
  struct awaitable;
  struct awaitable_ptr {
    awaitable* ptr;
    long count;
  };
  std::atomic<awaitable_ptr> awaitables;

  void push_awaitable(awaitable* waitable) {
     awaitable_ptr old_head = awaitables.load();
     do {
        waitable->next = old_head.ptr;
     } while(!awaitables.compare_exchange_weak(old_head,
                                               {waitable, 1}));
  }

  bool pop_awaitable(T value) {
    awaitable_ptr old_head = awaitables.load();
    if(!old_head.ptr) {
       return false;
    }
    while(true) {
       // take a reference to the head node by bumping the external count
       while(!awaitables.
        compare_exchange_weak(old_head, {old_head.ptr,
                                         old_head.count + 1}));
       if(!old_head.ptr) {
           return false;
       }
       if(awaitables.
         compare_exchange_strong(old_head,
                                {old_head.ptr->next, 1})) {
          /* here we have ownership of the awaitable
             and can set its value but it is not safe
             to resume the coroutine because another
             thread may pathologically dereference
             old_head.ptr->next after its coroutine
             frame has been deleted
          */
          old_head.ptr->value = value;
          long increase = old_head.count - 2;
          if(!(old_head.ptr->internal_count += increase)) {
            old_head.ptr->co_handle.resume();
          }
          return true;
       } else if(!--old_head.ptr->internal_count) {
          old_head.ptr->co_handle.resume();
       }
    }
  }

  // Fixed-capacity element storage. The extra elements live immediately
  // after the object, flexible-array style; the class-specific operator new
  // sizes and initialises them, and the draft relies on the subsequent
  // default construction of `buffer` leaving these members untouched.
  struct buffer {
    size_t capacity;
    std::atomic<pack> array[1];
    static void* operator new(size_t sz, size_t requested_capacity) {
      auto capacity = std::max<size_t>(requested_capacity, 1);
      auto* pbuff = static_cast<buffer*>(
        ::operator new(sz +
               (capacity - 1) * sizeof(std::atomic<pack>)));
      for(size_t i = 0; i < capacity; ++i) {
         new (&pbuff->array[i]) std::atomic<pack>();
         pbuff->array[i].store({T(), state::uninitialised});
      }
      pbuff->capacity = capacity;
      return pbuff;
    }
    // keep delete-expressions symmetric with the custom allocation above
    static void operator delete(void* p) { ::operator delete(p); }
  };


  // Finish any in-progress migration so that only one buffer remains live.
  buffer_handle sync_buffer() {
      auto buffers = load_buffers();
      while(buffers.old_ptr) {
        transfer_buffers(buffers.old_ptr,
                         buffers.ptr);
        release_buffers();
        if(atomic_handle.compare_exchange_strong(
              buffers,
             {buffers.ptr, nullptr})) {

           reclaim(buffers.old_ptr);
           buffers.old_ptr = nullptr;
           return buffers;
        }
        buffers = load_buffers();
     }
     return buffers;
   }

  bool cas_buffers(size_t& expected,
                   buffer* new_buffer) {
    auto cap = capacity_hint();
    if(cap != expected) {
      expected = cap;
      return false;
    }
    auto buffers = sync_buffer();
    buffer_handle new_buffers = {new_buffer, buffers.ptr};

    if(!atomic_handle.compare_exchange_strong(
                        buffers,
                        new_buffers)) {
        expected = capacity_hint();
        return false;
    }
    sync_buffer();
    delete_nodes_with_no_hazards();
    return true;
  }

  template<bool strong>
  bool cas_index(size_t idx,
                 T& expected,
                 T desired) {
    while(true) {
      auto buffers = load_buffers();
      if(idx >= buffers.ptr->capacity) {
        release_buffers();
        expected = T();
        return false;
      }
      if(buffers.ptr) {
        // make sure the slot is initialised / migrated before we CAS on it
        transfer_buffer_index(idx, buffers.old_ptr,
                              buffers.ptr);
      }
      release_old_buffer();
      pack mark_expected = { expected, state::active };
      pack mark_desired  = { desired,  state::active };
      auto& ref = buffers.ptr->array[idx];
      if constexpr (strong) {
        if(ref.compare_exchange_strong(mark_expected,
                                       mark_desired)) {
          release_new_buffer();
          return true;
        }
      } else {
        if(ref.compare_exchange_weak(mark_expected,
                                     mark_desired)) {
          release_new_buffer();
          return true;
        }
      }
      release_new_buffer();
      expected = mark_expected.value;
      if(mark_expected.mark == state::active) {
        return false;
      }
    }
  }

  bool cas_index_weak(size_t idx, T& expected, T desired) {
    return cas_index<false>(idx, expected, desired);
  }

  bool cas_index_strong(size_t idx, T& expected, T desired) {
    return cas_index<true>(idx, expected, desired);
  }

  // Move the value at idx from old_buffer (if it covers idx) into new_buffer,
  // locking the old slot so no writer can race the migration.
  void transfer_buffer_index(size_t idx, buffer* old_buffer,
                             buffer* new_buffer) {
    auto cap = old_buffer ? old_buffer->capacity : size_t(0);
    pack old_val = (idx < cap) ?
               old_buffer->array[idx].load() :
               pack{T(), state::locked};
    if (idx < cap) {
       while(old_val.mark == state::active &&
            !old_buffer->array[idx].compare_exchange_weak(
                old_val, {old_val.value, state::locked}));
     }
     pack new_val = new_buffer->array[idx].load();

     while(new_val.mark == state::uninitialised &&
           !new_buffer->array[idx].compare_exchange_weak(
               new_val, {old_val.value, state::active}));
  }

 /* TODO after profiling:
    Each thread should perform transfers on different
    large chunks of values then CAS the first marker from each
    chunk from state::locked to state::block_complete.
    Reading off each 'block_complete' marker,
    any thread can quickly skip through, determining
    what needs to be done to finish transferring a block.

    Alternatively I may find that allocations
    are my bottleneck or maybe memory ordering.

    Replacing the marked values with immutable pointers would
    allow larger lock-free types and fetch_add support but
    involves lots of dereferencing.

    I should find a motivating use-case then implement
    iterator-based methods like insert() using some
    similar 'marking' scheme.
 */

  void transfer_buffers(buffer* old_buffer,
                        buffer* new_buffer) {
    for(size_t i = 0; i < new_buffer->capacity; ++i) {
        transfer_buffer_index(i, old_buffer, new_buffer);
    }
  }

  void transfer_block(int block_size, int block_id,
                      buffer* old_buffer, buffer* new_buffer) {
     size_t start = block_size * block_id;
     size_t end = std::min(new_buffer->capacity, start + block_size);
     auto& ref = old_buffer->array[start];
     auto val = ref.load();
     if(val.mark == state::block_complete) {
        return;
     }

     for(size_t i = start; i < end; ++i) {
        transfer_buffer_index(i, old_buffer, new_buffer);
     }
     val = ref.load();
     ref.store({val.value, state::block_complete});
  }

  size_t capacity_hint() {
    auto cap = load_new_buffer()->capacity;
    release_new_buffer();
    return cap;
  }

  // Load the current buffer and protect it with hazard pointer 0.
  buffer* load_new_buffer() {
    buffer_handle temp;
    auto buffers = atomic_handle.load();
    do {
      temp = buffers;
      auto& hp0 = hazard_pointer_for_this_thread<0>();
      hp0.store(temp.ptr);
      buffers = atomic_handle.load();
    } while (buffers.ptr != temp.ptr);
    return buffers.ptr;
  }

  // Load both buffers and protect them with hazard pointers 0 and 1.
  buffer_handle load_buffers() {
    buffer_handle temp;
    auto buffers = atomic_handle.load();
    do {
      temp = buffers;
      auto& hp0 = hazard_pointer_for_this_thread<0>();
      auto& hp1 = hazard_pointer_for_this_thread<1>();
      hp0.store(temp.ptr);
      hp1.store(temp.old_ptr);
      buffers = atomic_handle.load();
    } while (buffers.ptr != temp.ptr || buffers.old_ptr != temp.old_ptr);
    return buffers;
  }

  void release_buffers() {
    release_hazard<0>();
    release_hazard<1>();
  }

  void release_new_buffer() {
    release_hazard<0>();
  }

  void release_old_buffer() {
    release_hazard<1>();
  }
};

// Most of this is from Anthony Williams'
// C++ Concurrency in Action.
// I have replaced the fixed size array of hazard
// pointers with a linked block list to avoid an artificial
// limit on the number of threads.

struct hazard_pointer {
    std::atomic<std::thread::id> id;
    std::atomic<void*> pointer;
};

// this should be a soft upper bound on the
// number of active threads at any one time x2
constexpr size_t HAZARD_BLOCK = 64;

struct hazard_block {
    hazard_pointer block[HAZARD_BLOCK];
    std::atomic<hazard_block*> next = nullptr;
};

inline hazard_block hazard_ptrs;

class hp_owner {
    hazard_pointer* hp;
public:
    hp_owner(hp_owner const&) = delete;
    hp_owner& operator=(hp_owner const&) = delete;

    hp_owner() : hp(nullptr) {
      hazard_block* hb = &hazard_ptrs;
      do {
        // claim the first free slot in this block
        for(size_t i = 0; i < HAZARD_BLOCK; ++i) {
          std::thread::id old_id;
          if(hb->block[i].id.compare_exchange_strong(
              old_id, std::this_thread::get_id())) {
            hp = &hb->block[i];
            return;
          }
        }
        // block full: follow the next block or append a new one
        auto* temp = hb->next.load();
        if(temp) {
            hb = temp;
        } else {
          hazard_block* new_block = new hazard_block();
          if(hb->next.compare_exchange_strong(
              temp, new_block)) {
            hb = new_block;
          } else {
            delete new_block;
            hb = temp;
          }
        }
      } while(true);
    }

    std::atomic<void*>& get_pointer() {
       return hp->pointer;
    }

    ~hp_owner() {
      hp->pointer.store(nullptr);
      hp->id.store(std::thread::id());
    }
};

template<int unused>
std::atomic<void*>& hazard_pointer_for_this_thread() {
  thread_local hp_owner hazard;
  return hazard.get_pointer();
}

template<int id>
void release_hazard() {
  auto& hp = hazard_pointer_for_this_thread<id>();
  hp.store(nullptr);
}

template<typename T>
void do_delete(void* p)
{
    delete static_cast<T*>(p);
}

struct data_to_reclaim {
    void* data;
    std::function<void(void*)> deleter;
    data_to_reclaim* next;

    template<typename T>
    data_to_reclaim(T* p) : data(p),
                            deleter(&do_delete<T>),
                            next(nullptr) {}

    ~data_to_reclaim() {
      deleter(data);
    }
};

// Walk the linked list of hazard blocks looking for p.
inline bool outstanding_hazard_pointers_for(void* p) {
    for(hazard_block* hb = &hazard_ptrs; hb; hb = hb->next.load()) {
      for(size_t i = 0; i < HAZARD_BLOCK; ++i) {
        if(hb->block[i].pointer.load() == p) {
           return true;
        }
      }
    }
    return false;
}

inline std::atomic<data_to_reclaim*> nodes_to_reclaim;

inline void add_to_reclaim_list(data_to_reclaim* node) {
  node->next = nodes_to_reclaim.load();
  while(!nodes_to_reclaim.
        compare_exchange_weak(node->next, node));
}

template<typename T>
void reclaim(T* data) {
    if(outstanding_hazard_pointers_for(data)) {
      add_to_reclaim_list(new data_to_reclaim(data));
    } else {
      delete data;
    }
}

inline void delete_nodes_with_no_hazards() {
  data_to_reclaim* current = nodes_to_reclaim.exchange(nullptr);
  while(current) {
    data_to_reclaim* const next = current->next;
    if(!outstanding_hazard_pointers_for(current->data)) {
      delete current;
    } else {
      add_to_reclaim_list(current);
    }
    current = next;
  }
}
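
/* A usage sketch of the hazard-pointer helpers above (not part of the
   original draft). It shows the protect / re-check / retire protocol on a
   hypothetical std::atomic<int*> named `shared`; the function names here
   are illustrative assumptions only. */
inline int read_through_hazard(std::atomic<int*>& shared) {
  auto& hp = hazard_pointer_for_this_thread<0>();
  int* p;
  do {
    p = shared.load();
    hp.store(p);                  // publish the pointer we intend to dereference
  } while (shared.load() != p);   // retry if it was swapped out before we published
  int value = p ? *p : 0;         // safe: a published pointer is never freed
  release_hazard<0>();
  return value;
}

inline void replace_through_hazard(std::atomic<int*>& shared, int* desired) {
  int* old = shared.exchange(desired);
  if (old) {
    reclaim(old);                    // deferred if some thread still protects it
    delete_nodes_with_no_hazards();  // opportunistically free what is now safe
  }
}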

// support for fetch_add etc.

template <typename U, typename T>
struct fetch_op_support {
    // Arg/Arg2/Ret restrict these operations to integral T
    // (or pointer T, via intptr_t offsets, for add and sub).
    using Arg = std::conditional_t<
                    std::is_integral_v<T>, T, intptr_t>;
    using Arg2 = std::enable_if_t<
                    std::is_integral_v<T>, T>;
    using Ret = std::conditional_t<
                   std::is_integral_v<T>, T,
                std::enable_if_t<
                 std::is_pointer_v<T>, T> >;
private:
    // read-modify-write loop built on the derived class's compare_exchange_weak
    template<typename A, typename Op>
    Ret op(A value, Op&& mod) {
        Ret expected{};
        while(!
           static_cast<U*>(this)->
            compare_exchange_weak(
            expected, mod(expected, value)));
        return expected;
    }

public:
    Ret fetch_add(Arg v) {return op(v,[](Ret a, Arg b){return a + b;});}
    Ret fetch_sub(Arg v) {return op(v,[](Ret a, Arg b){return a - b;});}
    Ret fetch_and(Arg2 v){return op(v,[](Ret a,Arg2 b){return a & b;});}
    Ret fetch_or (Arg2 v){return op(v,[](Ret a,Arg2 b){return a | b;});}
    Ret fetch_xor(Arg2 v){return op(v,[](Ret a,Arg2 b){return a ^ b;});}
    Ret operator+=(Arg  val) { return fetch_add(val) + val; }
    Ret operator-=(Arg  val) { return fetch_sub(val) - val; }
    Ret operator&=(Arg2 val) { return fetch_and(val) & val; }
    Ret operator|=(Arg2 val) { return fetch_or (val) | val; }
    Ret operator^=(Arg2 val) { return fetch_xor(val) ^ val; }
    Ret operator++()    { return *this += 1; }
    Ret operator--()    { return *this -= 1; }
    Ret operator++(int) { return fetch_add(1); }
    Ret operator--(int) { return fetch_sub(1); }
};
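
/* A usage sketch (not part of the original draft): how the container above
   is meant to be driven from one thread. The function name and the int
   payload are illustrative assumptions only. */
inline void atomic_vector_usage_sketch() {
  atomic_vector<int> v;
  v.reserve(1024);                 // pre-size so emplace_back need not reallocate

  // emplace_back returns the index it claimed, so with a reserved buffer
  // the container can double as a wait-free stack of claimed slots.
  size_t idx = v.emplace_back(41);

  int expected = 41;
  v[idx].compare_exchange_strong(expected, 42);  // per-element CAS
  v[idx].fetch_add(1);                           // via fetch_op_support
  int current = v[idx].load();                   // now 43
  (void)current;
}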

} // namespace fbw
  671.  