Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* hazardpointer.vala
- *
- * Copyright (C) 2011 Maciej Piechotka
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * Author:
- * Maciej Piechotka <uzytkownik2@gmail.com>
- */
- /**
- * Hazard pointer is a method of protecting a pointer shared by many threads.
- * If you want to use atomic pointer that may be freed you should use following code:
- *
- * {{{
- * string *shared_pointer = ...;
- * HazardPointer<string> hptr = HazardPointer.get_hazard_pointer (&shared_pointer);
- * // my_string contains value from shared_pinter. It is valid as long as hptr is alive.
- * unowned string my_string = ptr.get ();
- * // instead of delete
- * ptr.release ((ptr) => {string *sptr = ptr;string ref = (owned)sptr;});
- * });
- * }}}
- *
- * In some cases you may use helper methods which might involve copying of object (and are unsafe for unowned objects):
- * {{{
- * Gtk.Window *window = ...;
- * Gtk.Window? local_window = HazardPointer.get_pointer (&window);
- * HazardPointer.set_pointer (&window, ...)
- * local_window = HazardPointer.exchange_pointer (&window, null);
- * HazardPointer.compare_and_exchange (&window, null, local_window);
- * }}}
- *
- * The class also provides helper methods if least significant bits are used for storing flags.
- *
- * HazardPointers are not thread-safe and cannot be shared between threads.
- */
- [Compact]
- public class Gee.HazardPointer<G> { // FIXME: Make it a struct
- /**
- * Creates a hazard pointer for a pointer.
- *
- * @param ptr Protected pointer
- */
- public HazardPointer (G *ptr) {
- this._node = acquire ();
- this._node.set ((void *)ptr);
- }
- /**
- * Create a hazard pointer from Node.
- */
- internal HazardPointer.from_node (Node node) {
- this._node = node;
- }
- /**
- * Gets hazard pointer from atomic pointer safely.
- *
- * @param aptr Atomic pointer.
- * @param mask Mask of bits.
- * @param mask_out Result of mask.
- * @returns Hazard pointer containing the element.
- */
- public static HazardPointer<G>? get_hazard_pointer<G> (G **aptr, size_t mask = 0, out size_t mask_out = null) {
- unowned Node node = acquire ();
- void *rptr = null;
- void *ptr = null;
- do {
- rptr = AtomicPointer.get ((void **)aptr);
- ptr = (void *)((size_t) rptr & ~mask);
- if (&mask_out != null)
- mask_out = (size_t) rptr & mask;
- node.set (ptr);
- } while (rptr != AtomicPointer.get ((void **)aptr));
- if (ptr != null) {
- return new HazardPointer<G>.from_node (node);
- } else {
- node.release ();
- return null;
- }
- return null;
- }
- /**
- * Copy an object from atomic pointer.
- *
- * @param aptr Atomic pointer.
- * @param mask Mask of flags.
- * @param mask_out Result of mask.
- * @returns A copy of object from atomic pointer.
- */
- public static G? get_pointer<G> (G **aptr, size_t mask = 0, out size_t mask_out = null) {
- unowned Node node = acquire ();
- void *rptr = null;
- void *ptr = null;
- do {
- rptr = AtomicPointer.get ((void **)aptr);
- ptr = (void *)((size_t) rptr & ~mask);
- if (&mask_out != null)
- mask_out = (size_t) rptr & mask;
- node.set (ptr);
- } while (rptr != AtomicPointer.get ((void **)aptr));
- G? res = (G *)ptr;
- node.release ();
- return res;
- }
- /**
- * Exchange objects safly.
- *
- * @param aptr Atomic pointer.
- * @param new_ptr New value
- * @param mask Mask of flags.
- * @param new_mask New mask.
- * @param old_mask Previous mask mask.
- * @returns Hazard pointer containing old value.
- */
- public static HazardPointer<G>? exchange_hazard_pointer<G> (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0, out size_t old_mask = null) {
- unowned Node? new_node = null;
- if (new_ptr != null) {
- new_node = acquire ();
- new_node.set (new_ptr);
- }
- void *new_rptr = (void *)((size_t)((owned) new_ptr) | (mask & new_mask));
- unowned Node node = acquire ();
- void *rptr = null;
- void *ptr = null;
- do {
- rptr = AtomicPointer.get ((void **)aptr);
- ptr = (void *)((size_t) rptr & ~mask);
- if (&old_mask != null)
- old_mask = (size_t) rptr & mask;
- node.set (ptr);
- } while (!AtomicPointer.compare_and_exchange((void **)aptr, rptr, new_rptr));
- if (new_node != null)
- new_node.release ();
- if (rptr == null) {
- return new HazardPointer<G>.from_node (node);
- } else {
- node.release ();
- return null;
- }
- }
- /**
- * Sets object safely
- *
- * @param aptr Atomic pointer.
- * @param new_ptr New value
- * @param mask Mask of flags.
- * @param new_mask New mask.
- */
- public static void set_pointer<G> (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0) {
- HazardPointer<G>? ptr = exchange_hazard_pointer<G> (aptr, new_ptr, mask, new_mask, null);
- if (ptr != null)
- ptr.release (null); // FIXME: Add real DestroyNotify
- }
- /**
- * Exchange objects safly.
- *
- * @param aptr Atomic pointer.
- * @param new_ptr New value
- * @param mask Mask of flags.
- * @param new_mask New mask.
- * @param old_mask Previous mask mask.
- * @returns Value that was previously stored.
- */
- public static G? exchange_pointer<G> (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0, out size_t old_mask = null) {
- HazardPointer<G>? ptr = exchange_hazard_pointer<G> (aptr, new_ptr, mask, new_mask, out old_mask);
- G? rptr = ptr != null ? ptr.get () : null;
- return rptr;
- }
- /**
- * Compares and exchanges objects.
- *
- * @param aptr Atomic pointer.
- * @param old_ptr Old pointer.
- * @param new_ptr New value.
- * @param old_mask Old mask.
- * @param new_mask New mask.
- * @returns Value that was previously stored.
- */
- public static bool compare_and_exchange_pointer<G> (G **aptr, G? old_ptr, G? new_ptr, size_t mask = 0, size_t old_mask = 0, size_t new_mask = 0) {
- void *new_rptr = (void *)((size_t)(new_ptr) | (mask & new_mask));
- void *old_rptr = (void *)((size_t)(old_ptr) | (mask & old_mask));
- bool success = AtomicPointer.compare_and_exchange((void **)aptr, old_rptr, new_rptr);
- if (success) {
- Context.get_current_context ()->release (old_ptr, null); // FIXME: Real DestroyNotify
- G? fake = new_ptr;
- G *fake_ptr = (owned)fake;
- }
- return success;
- }
- ~HazardPointer () {
- _node.release ();
- }
- /**
- * Gets the pointer hold by hazard pointer.
- * @returns The value hold by pointer.
- */
- public inline new unowned G get () {
- return _node[false];
- }
- /**
- * Free the pointer.
- *
- * @params DestroyNotify method freeing object
- */
- internal void release (DestroyNotify notify) {
- unowned G item = _node[false];
- _node.set (null);
- Context.get_current_context ()->release (item, notify);
- }
- /**
- * Sets default policy (i.e. default policy for user-created contexts).
- * The policy must be concrete and should not be blocking.
- *
- * @params policy New default policy.
- */
- public static void set_default_policy (Policy policy) requires (policy.is_concrete ()) {
- if (policy.is_blocking ())
- warning ("Setting blocking defautl Gee.HazardPointer.Policy (there may be a deadlock).\n");
- AtomicInt.set(ref _default_policy, (int)policy);
- }
- /**
- * Sets thread exit policy (i.e. default policy for the top-most Context).
- * The policy must be concrete and should not be unsafe.
- *
- * @params policy New thread policy.
- */
- public static void set_thread_exit_policy (Policy policy) requires (policy.is_concrete ()) {
- if (!policy.is_safe ())
- warning ("Setting unsafe globale thread-exit Gee.HazardPointer.Policy (there may be a memory leak).\n");
- AtomicInt.set(ref _thread_exit_policy, (int)policy);
- }
- /**
- * Sets release (i.e. how exactly the released objects arefreed).
- *
- * The method can be only set before any objects is released and is not thread-safe.
- *
- * @params policy New release policy.
- * @returns ``true'' if policy was sucessfully changed.
- */
- public static bool set_release_policy (ReleasePolicy policy) {
- int old_policy = AtomicInt.get (ref release_policy);
- if ((old_policy & (sizeof(int) * 8 - 1)) != 0) {
- error ("Attempt to change the policy of running helper. Failing.");
- return false;
- }
- if (!AtomicInt.compare_and_exchange (ref release_policy, old_policy, (int)policy)) {
- error ("Concurrent access to release policy detected. Failing.");
- return false;
- }
- return true;
- }
- /**
- * Policy determines what happens on exit from Context.
- */
- public enum Policy {
- /**
- * Performs default action on exit from thread.
- */
- DEFAULT,
- /**
- * Performs the same action as on exit from current thread.
- */
- THREAD_EXIT,
- /**
- * Goes through the free list and attempts to free un-freed elements.
- */
- TRY_FREE,
- /**
- * Goes through the free list and attempts to free un-freed elements
- * untill all elements are freed.
- */
- FREE,
- /**
- * Release the un-freed elements to either helper thread or to main loop.
- * Please note if the operation would block it is not performed.
- */
- TRY_RELEASE,
- /**
- * Release the un-freed elements to either helper thread or to main loop.
- * Please note it may block while adding to queue.
- */
- RELEASE;
- /**
- * Checks if the policy is concrete or if it depends on global variables.
- *
- * @returns ``true'' if this policy does not depend on global variables
- */
- public bool is_concrete () {
- switch (this) {
- case DEFAULT:
- case THREAD_EXIT:
- return false;
- case TRY_FREE:
- case FREE:
- case TRY_RELEASE:
- case RELEASE:
- return true;
- default:
- assert_not_reached ();
- }
- }
- /**
- * Checks if policy blocks or is lock-free.
- * Please note that it works on a concrete policy only.
- *
- * @returns ``true'' if the policy may block the thread.
- */
- public bool is_blocking () requires (this.is_concrete ()) {
- switch (this) {
- case TRY_FREE:
- case TRY_RELEASE:
- return false;
- case FREE:
- case RELEASE:
- return true;
- default:
- assert_not_reached ();
- }
- }
- /**
- * Checks if policy guarantees freeing all elements.
- * Please note that it works on a concrete policy only.
- *
- * @returns ``true'' if the policy guarantees freeing all elements.
- */
- public bool is_safe () requires (this.is_concrete ()) {
- switch (this) {
- case TRY_FREE:
- case TRY_RELEASE:
- return false;
- case FREE:
- case RELEASE:
- return true;
- default:
- assert_not_reached ();
- }
- }
- /**
- * Finds concrete policy which corresponds to given policy.
- *
- * @returns Policy that corresponds to given policy at given time in given thread.
- */
- public Policy to_concrete () ensures (result.is_concrete ()) {
- switch (this) {
- case TRY_FREE:
- case FREE:
- case TRY_RELEASE:
- case RELEASE:
- return this;
- case DEFAULT:
- return (Policy) AtomicInt.get (ref _default_policy);
- case THREAD_EXIT:
- return (Policy) AtomicInt.get (ref _thread_exit_policy);
- default:
- assert_not_reached ();
- }
- }
- /**
- * Runs the policy.
- * @param to_free List containing elements to free.
- * @returns Non-empty list of not freed elements or ``null'' if all elements have been disposed.
- */
- internal ArrayList<FreeNode *>? perform (owned ArrayList<FreeNode *> to_free) {
- switch (this.to_concrete ()) {
- case TRY_FREE:
- return try_free (to_free) ? (owned) to_free : null;
- case FREE:
- while (try_free (to_free)) {
- Thread.yield ();
- }
- return null;
- case TRY_RELEASE:
- ReleasePolicy.ensure_start ();
- if (_queue_mutex.trylock ()) {
- _queue.offer ((owned) to_free);
- _queue_mutex.unlock ();
- return null;
- } else {
- return (owned) to_free;
- }
- case RELEASE:
- ReleasePolicy.ensure_start ();
- _queue_mutex.lock ();
- _queue.offer ((owned) to_free);
- _queue_mutex.unlock ();
- return null;
- default:
- assert_not_reached ();
- }
- }
- }
- /**
- * Release policy determines what happens with object freed by Policy.TRY_RELEASE
- * and Policy.RELEASE.
- */
- public enum ReleasePolicy {
- /**
- * Libgee spawns helper thread to free those elements.
- * This is default.
- */
- HELPER_THREAD,
- /**
- * Libgee uses GLib main loop.
- * This is recommended for application using GLib main loop.
- */
- MAIN_LOOP;
- /**
- * Ensures that helper methods are started.
- */
- public static void ensure_start () {
- int policy = AtomicInt.get (ref release_policy);
- if ((policy & (1 << (sizeof(int) * 8 - 1))) != 0)
- return;
- if (AtomicInt.compare_and_exchange (ref release_policy, policy, policy | (1 << (sizeof(int) * 8 - 1)))) {
- switch ((ReleasePolicy) (policy & ~(sizeof(int) * 8 - 1))) {
- case HELPER_THREAD:
- try {
- Thread.create<bool> (() => {
- Thread.self<bool> ().set_priority (ThreadPriority.LOW);
- while (true) {
- Thread.yield ();
- if (_queue_mutex.trylock ()) {
- Collection<ArrayList<FreeNode *>> temp = new ArrayList<ArrayList<FreeNode *>> ();
- _queue.drain (temp);
- _queue_mutex.unlock ();
- temp.foreach ((x) => {_global_to_free.add_all (x);});
- }
- try_free (_global_to_free);
- }
- }, false);
- } catch (ThreadError error) {
- assert_not_reached ();
- }
- break;
- case MAIN_LOOP:
- Idle.add (() => {
- if (_queue_mutex.trylock ()) {
- Collection<ArrayList<FreeNode *>> temp = new ArrayList<ArrayList<FreeNode *>> ();
- _queue.drain (temp);
- _queue_mutex.unlock ();
- temp.foreach ((x) => {_global_to_free.add_all (x);});
- }
- try_free (_global_to_free);
- return true;
- }, Priority.LOW);
- break;
- }
- }
- }
- }
- /**
- * Create a new context. In many cases user does not need o
- */
- [Compact]
- public class Context { // FIXME: Should be struct
- public Context (Policy? policy = null) {
- this._to_free = new ArrayList<FreeNode *> ();
- this._parent = _current_context.get ();
- if (policy == null) {
- if (_parent == null) {
- _policy = (Policy)AtomicInt.get (ref _thread_exit_policy);
- } else {
- _policy = (Policy)AtomicInt.get (ref _default_policy);
- }
- } else {
- this._policy = policy.to_concrete ();
- }
- }
- ~Context () {
- ArrayList<FreeNode *>? remaining = (_parent == null || _to_free.size >= TRESHOLD) ? _policy.perform ((owned) _to_free) : (owned) _to_free;
- if (remaining != null) {
- assert (_parent != null);
- _parent->_to_free.add_all (remaining);
- }
- _current_context.set (_parent, null);
- }
- public inline void release (void *ptr, DestroyNotify notify) {
- FreeNode *node = new FreeNode ();
- node->pointer = ptr;
- node->destroy_notify = notify;
- _to_free.add (node);
- if (_to_free.size >= TRESHOLD)
- try_free (_to_free);
- }
- public inline static Context *get_current_context () {
- return _current_context.get ();
- }
- internal Context *_parent;
- internal ArrayList<FreeNode *> _to_free;
- internal Policy? _policy;
- internal static StaticPrivate _current_context;
- internal static StaticPrivate _root_context;
- private static uint TRESHOLD = 10;
- }
- /**
- * Gets a new hazard pointer node
- *
- * @return new hazard pointer node
- */
- internal static inline unowned Node acquire () {
- for (unowned Node? curr = get_head (); curr != null; curr = curr.get_next ())
- if (curr.activate ())
- return curr;
- Node *node = new Node ();
- Node *old_head = null;
- do {
- node->set_next (old_head = (Node *)AtomicPointer.get (&_head));
- } while (!AtomicPointer.compare_and_exchange (&_head, old_head, node));
- return node;
- }
- internal static bool try_free (ArrayList<FreeNode *> to_free) {
- Collection<void *> used = new HashSet<void *>();
- for (unowned Node? current = get_head (); current != null; current = current.get_next ()) {
- used.add (current.get ());
- }
- for (int i = 0; i < to_free.size;) {
- if (used.contains (to_free[i])) {
- i++;
- } else {
- FreeNode *cur = to_free.remove_at (to_free.size - 1);
- if (i != to_free.size - 1) {
- FreeNode *temp = to_free[i];
- to_free[i] = cur;
- cur = temp;
- }
- cur->destroy_notify (cur->pointer);
- delete cur;
- }
- }
- return to_free.size > 0;
- }
- internal static unowned Node? get_head () {
- return (Node *)AtomicPointer.get(&_head);
- }
- internal unowned Node _node;
- internal static Node *_head = null;
- internal static int _default_policy = (int)Policy.TRY_FREE;
- internal static int _thread_exit_policy = (int)Policy.RELEASE;
- internal static int release_policy = 0;
- internal static Queue<ArrayList<FreeNode *>> _queue = new LinkedList<ArrayList<FreeNode *>> ();
- internal static StaticMutex _queue_mutex;
- internal static ArrayList<FreeNode *> _global_to_free;
- [Compact]
- internal class FreeNode {
- public void *pointer;
- public DestroyNotify destroy_notify;
- }
- [Compact]
- internal class Node {
- public Node () {
- AtomicPointer.set (&_hazard, null);
- AtomicInt.set (ref _active, 1);
- }
- inline ~Node () {
- delete _next;
- }
- public void release () {
- AtomicPointer.set (&_hazard, null);
- AtomicInt.set (ref _active, 0);
- }
- public inline bool is_active () {
- return AtomicInt.get (ref _active) != 0;
- }
- public inline bool activate () {
- return AtomicInt.compare_and_exchange (ref _active, 0, 1);
- }
- public inline void set (void *ptr) {
- AtomicPointer.set (&_hazard, ptr);
- }
- public inline void *get (bool safe = true) {
- if (safe) {
- return (void *)AtomicPointer.get (&_hazard);
- } else {
- return (void *)_hazard;
- }
- }
- public inline unowned Node? get_next () {
- return (Node *)AtomicPointer.get (&_next);
- }
- public inline void set_next (Node *next) {
- AtomicPointer.set (&_next, next);
- }
- public Node *_next;
- public int _active;
- public void *_hazard;
- }
- }
Add Comment
Please, Sign In to add comment