Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package dijkstra
- import java.util.*
- import java.util.concurrent.Phaser
- import java.util.concurrent.atomic.AtomicInteger
- import java.util.concurrent.locks.Lock
- import java.util.concurrent.locks.ReentrantLock
- import kotlin.Comparator
- import kotlin.collections.ArrayList
- import kotlin.concurrent.thread
- var activeNodes = AtomicInteger(0)
- private val NODE_DISTANCE_COMPARATOR = Comparator<Node> { o1, o2 -> Integer.compare(o1!!.distance, o2!!.distance) }
- class MultiPriorityQueue(size: Int, comparator: Comparator<Node>) {
- private val N: Int = size;
- private val _locks = List(2 * N) { ReentrantLock() }
- val locks: List<Lock> = ArrayList(_locks)
- private val _queues = List(2 * N) { PriorityQueue<Node>(comparator) }
- val queues: List<PriorityQueue<Node>> = ArrayList(_queues)
- fun poll(): Node? {
- val i = (0 until (2 * N - 1)).random()
- val j = (i + 1 until (2 * N)).random()
- val q1 = queues[i]
- val q2 = queues[j]
- val lock1 = locks[i]
- val lock2 = locks[j]
- var removed: Node? = null
- if (lock1.tryLock() && lock2.tryLock()) {
- val node1 = q1.peek()
- val node2 = q2.peek()
- if (node1 == null) {
- removed = node2
- q2.poll()
- } else {
- if (node2 == null || node1.distance < node2.distance) {
- removed = node1
- q1.poll()
- } else {
- removed = node2
- q2.poll()
- }
- }
- lock1.unlock()
- lock2.unlock()
- }
- return removed
- }
- fun add(elem: Node) {
- while (true) {
- val i: Int = (Random()).nextInt(2 * N)
- val q = queues[i]
- val curLock: Lock = locks[i]
- if (curLock.tryLock()) {
- q.add(elem)
- curLock.unlock();
- break
- }
- }
- }
- }
- // Returns `Integer.MAX_VALUE` if a path has not been found.
- fun shortestPathParallel(start: Node) {
- val workers = Runtime.getRuntime().availableProcessors()
- // The distance to the start node is `0`
- start.distance = 0
- // Create a priority (by distance) queue and add the start node into it
- val q = MultiPriorityQueue(workers, NODE_DISTANCE_COMPARATOR) // TODO replace me with a multi-queue based PQ!
- q.add(start)
- activeNodes.incrementAndGet()
- // Run worker threads and wait until the total work is done
- val onFinish = Phaser(workers + 1) // `arrive()` should be invoked at the end by each worker
- repeat(workers) {
- thread {
- while (true) {
- // TODO Write the required algorithm here,
- // TODO break from this loop when there is no more node to process.
- // TODO Be careful, "empty queue" != "all nodes are processed".
- val cur: Node = q.poll()
- ?: if (activeNodes.get() > 0) continue else break;
- for (e in cur.outgoingEdges) {
- while (e.to.distance > cur.distance + e.weight) {
- val initDist: Int = e.to.distance
- val dist: Int = cur.distance + e.weight
- if (initDist > dist && e.to.casDistance(initDist, dist)) {
- q.add(e.to)
- activeNodes.incrementAndGet()
- break
- }
- }
- }
- activeNodes.decrementAndGet()
- }
- onFinish.arrive()
- }
- }
- onFinish.arriveAndAwaitAdvance()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement