Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.*
- import java.util.concurrent.Phaser
- import java.util.concurrent.atomic.AtomicInteger
- import java.util.concurrent.locks.Lock
- import java.util.concurrent.locks.ReentrantLock
- import kotlin.Comparator
- import kotlin.collections.ArrayList
- import kotlin.concurrent.thread
- var isDone = AtomicInteger(0)
- private val NODE_DISTANCE_COMPARATOR = Comparator<Node> { o1, o2 -> Integer.compare(o1!!.distance, o2!!.distance) }
- class MultiPriorityQueue(size: Int, comparator: Comparator<Node>) {
- private val numOfQueues: Int = size;
- private val _locks = List(2 * numOfQueues) { ReentrantLock() }
- val lockList: List<Lock> = ArrayList(_locks)
- private val _queues = List(2 * numOfQueues) { PriorityQueue<Node>(comparator) }
- val queueList: List<PriorityQueue<Node>> = ArrayList(_queues)
- fun poll(): Node? {
- val random1 = (1 until numOfQueues).random()
- val random2 = (0 until random1).random()
- val q = queueList[random1]
- val p = queueList[random2]
- val lockQ = lockList[random1]
- val lockP = lockList[random2]
- var ans: Node? = null
- if (lockQ.tryLock() && lockP.tryLock()) {
- try {
- if (p.isEmpty()) {
- if (!q.isEmpty()) {
- ans = q.poll()
- }
- } else {
- if (q.isEmpty()) {
- ans = p.poll()
- } else {
- if (p.peek().distance < q.peek().distance) {
- ans = p.poll()
- } else {
- ans = q.poll()
- }
- }
- }
- } finally {
- lockQ.unlock()
- lockP.unlock()
- }
- }
- return ans
- }
- fun add(n: Node){
- while (true) {
- val r = (0 until numOfQueues).random()
- val q = queueList[r]
- val l = lockList[r]
- if (l.tryLock()) {
- try {
- q.add(n)
- } finally {
- l.unlock();
- }
- break
- }
- }
- }
- }
- // Returns `Integer.MAX_VALUE` if a path has not been found.
- fun shortestPathParallel(start: Node) {
- val workers = Runtime.getRuntime().availableProcessors()
- // The distance to the start node is `0`
- start.distance = 0
- // Create a priority (by distance) queue and add the start node into it
- val q = MultiPriorityQueue(workers, NODE_DISTANCE_COMPARATOR) // TODO replace me with a multi-queue based PQ!
- q.add(start)
- isDone.incrementAndGet()
- // Run worker threads and wait until the total work is done
- val onFinish = Phaser(workers + 1) // `arrive()` should be invoked at the end by each worker
- repeat(workers) {
- thread {
- while (true) {
- val cur: Node = q.poll()
- ?: if (isDone.get() <= 0) {
- break
- } else {
- continue
- }
- for (e in cur.outgoingEdges) {
- while (e.to.distance > cur.distance + e.weight) {
- val curD: Int = e.to.distance
- val newD: Int = cur.distance + e.weight
- if (curD > newD) {
- if (e.to.casDistance(curD, newD)) {
- q.add(e.to)
- isDone.incrementAndGet()
- break
- }
- }
- }
- }
- isDone.decrementAndGet()
- }
- onFinish.arrive()
- }
- }
- onFinish.arriveAndAwaitAdvance()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement