Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package dijkstra
- import kotlinx.atomicfu.atomic
- import java.util.*
- import java.util.concurrent.Phaser
- import java.util.concurrent.atomic.AtomicInteger
- import java.util.concurrent.locks.Lock
- import java.util.concurrent.locks.ReentrantLock
- import kotlin.Comparator
- import kotlin.collections.ArrayList
- import kotlin.concurrent.thread
- import kotlin.random.Random
- private val NODE_DISTANCE_COMPARATOR = Comparator<Node> { o1, o2 -> Integer.compare(o1!!.distance, o2!!.distance) }
- private val DISTANCE_COMPARATOR = Comparator<Int> { o1, o2 -> o1.compareTo(o2) }
- class MultiQueue(val workers: Int, val comparator: java.util.Comparator<Node>) {
- private val N = 10;
- private val nonEmptyCnt = atomic(0)
- private val queue: Array<PriorityQueue<Node>> = Array(N) { PriorityQueue<Node>(comparator) }
- private val locks: Array<Lock> = Array(N) { ReentrantLock() }
- fun add(node: Node) {
- while (true) {
- val qCnt = Random.nextInt(0, N)
- if (locks[qCnt].tryLock()) {
- queue[qCnt].add(node)
- if (queue[qCnt].size == 1) {
- nonEmptyCnt.incrementAndGet()
- }
- locks[qCnt].unlock()
- return
- }
- }
- }
- fun remove(): Node? {
- loop@ while (true) {
- if (nonEmptyCnt.compareAndSet(0, 0)) {
- return null
- }
- val f = Random.nextInt(0, N)
- if (!locks[f].tryLock()) {
- continue;
- }
- val s = Random.nextInt(0, N)
- if (!locks[s].tryLock()) {
- locks[f].unlock()
- continue
- }
- val res: Node;
- if (queue[f].isEmpty() && queue[s].isEmpty()) {
- continue@loop
- }
- if (queue[f].isEmpty()) {
- return removedElement(s)
- } else
- if (queue[s].isEmpty()) {
- return removedElement(f)
- } else
- if (queue[f].first().distance < queue[s].first().distance) {
- res = removedElement(f)
- } else {
- res = removedElement(s)
- }
- locks[f].unlock()
- locks[s].unlock()
- return res
- }
- }
- private fun removedElement(i: Int): Node {
- if (queue[i].size == 1) nonEmptyCnt.decrementAndGet()
- return queue[i].remove()
- }
- val queues: List<PriorityQueue<Node>> = ArrayList(Collections.nCopies(N, PriorityQueue<Node>(comparator)))
- }
- /**
- * Returns `Integer.MAX_VALUE` if a path has not been found.
- */
- fun shortestPathParallel(start: Node) {
- val workers = Runtime.getRuntime().availableProcessors()
- val q = MultiQueue(workers, NODE_DISTANCE_COMPARATOR).apply {
- add(start.apply {
- distanceMutable.value = 0;
- })
- }
- val activeNodes = AtomicInteger(1)
- val onFinish = Phaser(workers + 1)
- repeat(workers) {
- thread {
- while (activeNodes.get() > 0) {
- q.remove()?.let { node ->
- val distance = node.distance
- for (edge in node.outgoingEdges) {
- val update = distance + edge.weight
- while (true) {
- val oldValue = edge.to.distance
- if (DISTANCE_COMPARATOR.compare(update, oldValue) >= 0) {
- break
- }
- if (edge.to.distanceMutable.compareAndSet(oldValue, update)) {
- q.add(edge.to)
- activeNodes.incrementAndGet()
- break
- }
- }
- }
- activeNodes.decrementAndGet()
- }
- }
- onFinish.arrive()
- }
- }
- onFinish.arriveAndAwaitAdvance()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement