Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package dijkstra
- import kotlinx.atomicfu.AtomicInt
- import kotlinx.atomicfu.atomic
- import java.util.*
- import java.util.Collections.nCopies
- import java.util.concurrent.Phaser
- import java.util.concurrent.atomic.AtomicInteger
- import kotlin.Comparator
- import kotlin.concurrent.thread
- import java.util.concurrent.locks.ReentrantLock
- private val NODE_DISTANCE_COMPARATOR = Comparator<Node> { o1, o2 -> o1!!.distance.compareTo(o2!!.distance) }
- private val random = Random()
- // Returns `Integer.MAX_VALUE` if a path has not been found.
- fun shortestPathParallel(start: Node) {
- val workers = Runtime.getRuntime().availableProcessors()
- val elements = workers * 2
- // The distance to the start node is `0`
- start.distanceMutable.value = 0
- // Create a priority (by distance) queue and add the start node into it
- val queues = nCopies(elements, PriorityQueue(elements, NODE_DISTANCE_COMPARATOR))
- val mutexes = nCopies(elements, ReentrantLock(true))
- val activeNodes = AtomicInteger(0)
- queues[0].add(start)
- activeNodes.incrementAndGet()
- // Run worker threads and wait until the total work is done
- val onFinish = Phaser(workers + 1) // `arrive()` should be invoked at the end by each worker
- repeat(workers) {
- thread {
- while (true) {
- // TODO Write the required algorithm here,
- // TODO break from this loop when there is no more node to process.
- // TODO Be careful, "empty queue" != "all nodes are processed".
- var cur: Node? = null
- while (activeNodes.get() > 0) {
- var firstInd = random.nextInt(elements)
- var secondInd = random.nextInt(elements)
- if (firstInd == secondInd) {
- continue
- }
- if (firstInd > secondInd) firstInd = secondInd.also { secondInd = firstInd }
- while (true) {
- mutexes[firstInd].lock()
- if (mutexes[secondInd].tryLock()) {
- break
- }
- mutexes[firstInd].unlock()
- }
- try {
- val firstNode: Node? = queues[firstInd].firstOrNull()
- val secondNode: Node? = queues[secondInd].firstOrNull()
- if (firstNode == null && secondNode == null) {
- continue
- }
- if (firstNode == null) {
- cur = queues[secondInd].poll()
- break
- }
- if (secondNode == null) {
- cur = queues[firstInd].poll()
- break
- }
- cur = if (firstNode.distance < secondNode.distance) {
- queues[firstInd].poll()
- } else {
- queues[secondInd].poll()
- }
- break
- } finally {
- mutexes[secondInd].unlock()
- mutexes[firstInd].unlock()
- }
- }
- if (cur == null) {
- // if (activeNodes.get() == 0) {
- break
- // } else {
- // continue
- // }
- }
- var distE: Int
- for (e in cur.outgoingEdges) {
- distE = e.to.distance
- while (distE > cur.distance + e.weight) {
- if (e.to.distanceMutable.compareAndSet(distE, cur.distance + e.weight)) {
- while (true) {
- val index = random.nextInt(elements)
- if (mutexes[index].tryLock()) {
- try {
- activeNodes.incrementAndGet()
- queues[index].add(e.to)
- break
- } finally {
- mutexes[index].unlock()
- }
- }
- }
- }
- distE = e.to.distance
- }
- }
- activeNodes.decrementAndGet()
- }
- onFinish.arrive()
- }
- }
- onFinish.arriveAndAwaitAdvance()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement