rmohan

Untitled

Sep 22nd, 2025 (edited)
289
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
OCaml 4.32 KB | None | 0 0
  1. open Effect
  2. open Effect.Deep
  3. open Eio
  4.  
  5.  
  (** Effect performed by a barrier's action hook. {!barrier_action} installs
      the handler that interprets it. *)
  type _ Effect.t += Barrier_Action : unit -> unit Effect.t

  (** [barrier_action f] runs [f ()] under a handler for {!Barrier_Action}.

      Every time [f] performs [Barrier_Action ()] the handler prints
      ["Barrier action"] and then resumes [f], so code following the [perform]
      still runs. The value returned by [f] is discarded; exceptions raised by
      [f] propagate to the caller unchanged.

      The previous version wrapped [f] as [fun () -> f], which returned [f]
      without ever calling it, and dropped the continuation instead of
      resuming it. *)
  let barrier_action f =
    match_with f ()
      {
        retc = (fun _ -> ());
        exnc = (fun e -> raise e);
        effc =
          (fun (type c) (eff : c Effect.t) ->
            match eff with
            | Barrier_Action () ->
              Some
                (fun (k : (c, _) continuation) ->
                  Printf.printf "Barrier action";
                  (* Resume the suspended computation; abandoning [k] would
                     silently cut off everything after the [perform]. *)
                  continue k ())
            | _ -> None);
      }
  24.  
  (** Mutable state of a single barrier round. A fresh value is built whenever
      the barrier is created or reset. *)
  type round = {
    mutable count : int;
        (** Number of arrivals recorded so far; [await] increments it. *)
    wait_cond : Eio.Condition.t;
        (** Condition created together with the round. In the visible code it
            is only read to seed the fiber-local [wait_cond] binding. *)
    breached_cond : Eio.Condition.t;
        (** Second per-round condition; presumably meant for signalling a
            breach, but no visible code waits on it. *)
    mutable is_broken : bool;
        (** Set to [true] by [break_barrier]; [await] refuses to join a round
            whose flag is set. *)
  }
  31.  
  (** A reusable barrier for a fixed number of participants. *)
  type cyclic_barrier = {
    participants : int;
        (** Number of arrivals that completes a round. *)
    barrier_action : unit -> unit;
        (** Hook stored at creation time; [create_cyclic_barrier] sets it to a
            function that performs [Barrier_Action]. *)
    lock : Eio.Mutex.t;
        (** Mutex guarding [round] and its mutable fields. *)
    mutable round : round;
        (** State of the round currently in progress; replaced on reset. *)
  }
  38.  
  (** [create_cyclic_barrier participants] builds a barrier that trips once
      [participants] fibers have called [await]. The barrier starts with an
      empty, unbroken round, and its action hook performs [Barrier_Action].

      @raise Invalid_argument if [participants] is less than 1. Such a barrier
      could never trip: the very first arrival would already exceed the
      participant count. *)
  let create_cyclic_barrier participants =
    if participants < 1 then
      invalid_arg "create_cyclic_barrier: participants must be at least 1";
    {
      participants;
      barrier_action = (fun () -> perform (Barrier_Action ()));
      lock = Eio.Mutex.create ();
      round =
        {
          count = 0;
          wait_cond = Eio.Condition.create ();
          breached_cond = Eio.Condition.create ();
          is_broken = false;
        };
    }
  50.  
  (** Raised inside [await] when the round being joined is already marked as
      broken. The payload is a free-form message; [await] uses ["await\n"]. *)
  exception Barrier_breached of string
  52.  
  (** [break_barrier b wait_cond breached_cond should_lock] marks the current
      round of [b] as broken and broadcasts on [wait_cond] so that waiting
      fibers wake up.

      @param wait_cond condition to broadcast on; expected to be [Some _].
      @param _breached_cond accepted for call-site compatibility; unused.
      @param should_lock when [true], [b.lock] is acquired around the update.
        Pass [false] only when the caller already holds [b.lock]:
        [Eio.Mutex] is not reentrant, so locking again would deadlock.
        Previously this flag was forwarded as [~protect], which only controls
        cancellation protection, so the lock was always taken.
      @raise Failure ["break barrier error"] if [wait_cond] is [None]. The
        round is still marked as broken, and the failure is raised after the
        lock has been released so the mutex is not poisoned. *)
  let break_barrier b wait_cond _breached_cond should_lock =
    let mark_broken () =
      b.round.is_broken <- true;
      (* Broadcast the change of condition. *)
      Option.iter Eio.Condition.broadcast wait_cond
    in
    if should_lock then Eio.Mutex.use_rw ~protect:true b.lock mark_broken
    else mark_broken ();
    match wait_cond with
    | Some _ -> ()
    | None -> failwith "break barrier error"
  66.  
  (** [resets b yes_or_no wait_cond breached_cond] wakes the fibers waiting on
      [wait_cond] and installs a fresh round on [b], all under [b.lock].

      - When [yes_or_no] is [true], the waiters are simply woken.
      - When it is [false] and at least one fiber has arrived in the current
        round, that round is first marked as broken and then the waiters are
        woken.

      The round is marked as broken inline rather than through
      [break_barrier]: that function acquires [b.lock], which is already held
      here, and [Eio.Mutex] is not reentrant.

      @param _breached_cond accepted for call-site compatibility; unused.
      @raise Failure ["reset error"] or ["break barrier error"] if a wake-up
        is required but [wait_cond] is [None]. The round is not replaced in
        that case, and the failure is raised after the lock has been released
        so the mutex is not poisoned. *)
  let resets b yes_or_no wait_cond _breached_cond =
    let wake error_message =
      match wait_cond with
      | Some w ->
        Eio.Condition.broadcast w;
        Ok ()
      | None -> Error error_message
    in
    let status =
      Eio.Mutex.use_rw ~protect:true b.lock (fun () ->
          let status =
            if yes_or_no then wake "reset error"
            else if b.round.count > 0 then begin
              b.round.is_broken <- true;
              wake "break barrier error"
            end
            else Ok ()
          in
          (* Start all over again. *)
          if Result.is_ok status then
            b.round <-
              {
                count = 0;
                wait_cond = Eio.Condition.create ();
                breached_cond = Eio.Condition.create ();
                is_broken = false;
              };
          status)
    in
    match status with
    | Ok () -> ()
    | Error message -> failwith message
  89.  
  (** [reset b wait_cond breached_cond] is [resets] with its flag fixed to
      [true]: the waiters on [wait_cond] are woken and [b] gets a fresh round. *)
  let reset b wait_cond breached_cond =
    let wake_only = true in
    resets b wake_only wait_cond breached_cond
  92.  
  (** [get_participants b] is the participant count [b] was created with. *)
  let get_participants { participants; _ } = participants
  95.  
  (** [wait_list b] is the number of arrivals recorded in the current round of
      [b], read while holding [b.lock].

      The read goes through [Eio.Mutex.use_ro] because nothing is mutated, and
      the enclosing [Eio.Switch.run] was dropped: no fiber was ever attached to
      it and its [sw] binding was unused. *)
  let wait_list b = Eio.Mutex.use_ro b.lock (fun () -> b.round.count)
  99.  
  (** [is_broken b] tells whether the current round of [b] has been marked as
      broken, read while holding [b.lock].

      The read goes through [Eio.Mutex.use_ro] because nothing is mutated, and
      the enclosing [Eio.Switch.run] was dropped: no fiber was ever attached to
      it and its [sw] binding was unused. *)
  let is_broken b = Eio.Mutex.use_ro b.lock (fun () -> b.round.is_broken)
  103.  
  (** Fiber-local key for the condition that [await] blocks on and that
      [break_barrier] / [reset] are asked to broadcast on. *)
  let wait_cond : Eio.Condition.t Fiber.key = Fiber.create_key ()

  (** Fiber-local key for the breach condition. It is bound by [spawn_fibers]
      and forwarded to [break_barrier] / [reset]. *)
  let breached_cond : Eio.Condition.t Fiber.key = Fiber.create_key ()
  106.  
  (** [await b] records the calling fiber's arrival at barrier [b].

      - If fewer than [b.participants] fibers have arrived, the fiber blocks on
        the fiber-local [wait_cond] condition until the round is replaced or
        marked as broken.
      - The arrival that completes the round wakes every waiter and installs a
        fresh round, inside the same critical section.

      Failures (joining a broken round, more arrivals than participants, no
      [wait_cond] binding) are printed to stderr and the barrier is broken via
      [break_barrier], as before; they are not re-raised. Cancellation is
      reported the same way but then re-raised, so the surrounding Eio context
      still sees it.

      Changes from the previous version:
      - nothing raises inside [Eio.Mutex.use_rw] any more, because an
        exception escaping it poisons the mutex and made the error handler's
        own [break_barrier] call fail;
      - a round that trips normally is no longer flagged as broken;
      - waiters re-check their wake-up condition in a loop.

      NOTE(review): [b.barrier_action] is not invoked when the round trips
      (it was not before either) — confirm whether it should be. *)
  let await b =
    (* Runs while holding [b.lock]; reports the outcome as a tag so that no
       exception escapes the critical section. *)
    let arrive () =
      let round = b.round in
      if round.is_broken then `Broken
      else begin
        round.count <- round.count + 1;
        let c = round.count in
        Printf.printf "Count is %d\n" c;
        if c > b.participants then `Overflow
        else
          match Fiber.get wait_cond with
          | None -> `Unbound
          | Some w ->
            if c < b.participants then begin
              (* Physical equality is intended: the round is "over" once
                 [b.round] points at a different record. *)
              while b.round == round && not round.is_broken do
                Eio.Condition.await w b.lock
              done;
              if round.is_broken then `Broken else `Released
            end
            else begin
              (* Last arrival: release the waiters and start a new round. *)
              Eio.Condition.broadcast w;
              b.round <-
                {
                  count = 0;
                  wait_cond = Eio.Condition.create ();
                  breached_cond = Eio.Condition.create ();
                  is_broken = false;
                };
              `Tripped
            end
      end
    in
    try
      Fiber.check ();
      (match Eio.Mutex.use_rw ~protect:true b.lock arrive with
       | `Tripped | `Released -> ()
       | `Broken -> raise (Barrier_breached "await\n")
       | `Overflow -> failwith "c > participants fatal"
       | `Unbound -> failwith "await error")
    with e ->
      let msg = Printexc.to_string e
      and stack = Printexc.get_backtrace () in
      Printf.eprintf "there was an error: %s%s\n" msg stack;
      break_barrier b (Fiber.get wait_cond) (Fiber.get breached_cond) true;
      (* Cancellation must not be swallowed. *)
      (match e with
       | Eio.Cancel.Cancelled _ -> raise e
       | _ -> ())
  146.  
  (** Demo entry point, evaluated when the module is initialised: creates a
      four-participant barrier inside an Eio main loop, binds the fiber-local
      condition keys to the first round's conditions, and forks four fibers
      that each print their index and then call [await]. *)
  let spawn_fibers =
    let parties = 4 in
    let participant barrier id () =
      Printf.printf "count is %d\n" id;
      await barrier
    in
    Eio_main.run (fun _env ->
        Eio.Switch.run (fun sw ->
            let barrier = create_cyclic_barrier parties in
            Eio.Fiber.with_binding wait_cond barrier.round.wait_cond (fun () ->
                Eio.Fiber.with_binding breached_cond
                  barrier.round.breached_cond (fun () ->
                    for id = 1 to parties do
                      Fiber.fork ~sw (participant barrier id)
                    done))))
  159.  
Advertisement
Add Comment
Please, Sign In to add comment