Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- open Core.Std
- open Async.Std
(** Existential wrapper: lets values of different types be stored together
    behind a single type. *)
module Existential = struct
  (** [Any v] packs [v] of an arbitrary type. The type of [v] does not appear
      in [anytype], so it cannot be recovered from the wrapper alone. *)
  type anytype =
    | Any : 'a -> anytype
end;;
(** A bundle is a string-keyed table holding one [Ivar.t] per expected key.
    Each ivar is filled (at most once) with a wrapped value, and the bundle as
    a whole can be waited on until every ivar has been filled. *)
module Bundle = struct
  open Existential

  type t = anytype Ivar.t String.Table.t

  (** [create keys] returns a fresh table that maps every element of [keys]
      to a new, empty ivar. *)
  let create (keys : string list) =
    let slots = String.Table.create () in
    List.iter keys ~f:(fun key -> Hashtbl.set slots ~key ~data:(Ivar.create ()));
    slots
  ;;

  (** [fill t key value] wraps [value] in [Any] and fills the ivar stored
      under [key]. The lookup uses [Hashtbl.find_exn], so [key] must be one of
      the keys the bundle was created with. *)
  let fill t key value = Ivar.fill (Hashtbl.find_exn t key) (Any value)

  (** [wait_complete t] becomes determined once every ivar in [t] has been
      filled; its result is a new string table mapping each key to the value
      its ivar was filled with. *)
  let wait_complete t =
    Hashtbl.to_alist t
    |> List.map ~f:(fun (key, ivar) -> Ivar.read ivar >>| fun value -> key, value)
    |> Deferred.all
    >>| String.Table.of_alist_exn
  ;;
end;;
(** State of the collector: the expected keys, the bundles currently being
    assembled, and the callback that consumes a finished bundle. *)
type t = {
  keys : string list;
      (** Keys every newly created bundle is initialised with. *)
  bundles : (int, Bundle.t) Hashtbl.t;
      (** Bundles still being filled, indexed by an integer sequence id. *)
  processor : Existential.anytype String.Table.t -> unit;
      (** Called with the key-to-value table of a bundle once it is complete. *)
}
(** [create keys ~processor] builds a collector with no bundles in flight.
    [keys] is stored for later bundle creation and [processor] is stored as
    the completion callback. *)
let create keys ~processor =
  let bundles = Hashtbl.create ~hashable:Int.hashable () in
  { keys; bundles; processor }
;;
(** [update t seq_id key value] stores [value] under [key] in the bundle
    associated with [seq_id].

    If no bundle exists yet for [seq_id], one is created from [t.keys],
    registered in [t.bundles], and a background job is started that waits for
    the bundle to complete, removes it from [t.bundles], and passes the
    resulting table to [t.processor].

    Raises whatever [Bundle.fill] raises: the key lookup uses
    [Hashtbl.find_exn], so [key] must be one of [t.keys], and filling the same
    key twice for one bundle is subject to [Ivar.fill]'s behaviour on an
    already-filled ivar. *)
let update t seq_id key value =
  let bundle =
    match Hashtbl.find t.bundles seq_id with
    | Some bundle -> bundle
    | None ->
      let bundle = Bundle.create t.keys in
      Hashtbl.set t.bundles ~key:seq_id ~data:bundle;
      (* Register the completion handler BEFORE the first fill. If the fill
         below raises (e.g. unknown key), the bundle is already in
         [t.bundles]; without a handler it would stay there forever and the
         processor would never run for this [seq_id], even if every valid key
         arrived later. *)
      Deferred.don't_wait_for
        (Bundle.wait_complete bundle
         >>| fun table ->
         Hashtbl.remove t.bundles seq_id;
         t.processor table);
      bundle
  in
  Bundle.fill bundle key value
;;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement