Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package dijkstra
- import kotlinx.atomicfu.atomic
- import java.lang.Math.abs
- import java.util.*
- import java.util.Collections.swap
- import java.util.concurrent.Phaser
- import java.util.concurrent.locks.ReentrantLock
- import kotlin.Comparator
- import kotlin.collections.ArrayList
- import kotlin.collections.RandomAccess
- import kotlin.concurrent.thread
// Orders nodes by ascending `distance`; used as the priority of every sub-queue in the multi-queue.
// NOTE(review): the original template comment that sat here read "Returns `Integer.MAX_VALUE` if a path
// has not been found." — presumably it describes the `distance` of an unreachable node; confirm against `Node`.
private val NODE_DISTANCE_COMPARATOR = Comparator<Node> { left, right ->
    left.distance.compareTo(right.distance)
}
/**
 * A relaxed concurrent priority queue ("multi-queue") built from [workers] independent
 * [PriorityQueue]s, each guarded by its own [ReentrantLock].
 *
 * - [add] inserts into one randomly chosen sub-queue.
 * - [pop] samples two random sub-queues and removes the smaller of their heads (according to
 *   [comparator]); the result is therefore *not* guaranteed to be the global minimum.
 *
 * Locks are only ever taken with `tryLock`, so no operation blocks on a lock; on contention the
 * operation simply retries with fresh random indices.
 *
 * @param workers number of sub-queues; must be positive.
 * @param comparator ordering used inside every sub-queue and when comparing two sampled heads.
 * @throws IllegalArgumentException if [workers] is not positive.
 */
class MyQueue(workers: Int, private val comparator: Comparator<Node>) {
    init {
        require(workers > 0) { "workers must be positive, was $workers" }
    }

    /** Number of sub-queues. */
    val workers: Int = workers

    /** Total number of elements currently stored across all sub-queues. */
    var queueSzie = atomic(0)

    /** Not used by this class's own logic; kept so existing callers keep compiling. */
    val sizeLock: ReentrantLock = ReentrantLock()

    /** Identity index array `[0, 1, ..., workers - 1]`; kept for compatibility. */
    val arr = IntArray(workers) { it }

    /** List view of [arr]; kept for compatibility. */
    var myList: List<Int> = arr.toList()

    /** Number of sub-queues that currently hold at least one element. */
    val sizeGroups = atomic(0)

    /** The sub-queues; `qu[i]` must only be touched while holding `loksStatus[i]`. */
    val qu: ArrayList<PriorityQueue<Node>> = ArrayList()

    /** One lock per sub-queue, index-aligned with [qu]. */
    val loksStatus: ArrayList<ReentrantLock> = ArrayList()

    init {
        repeat(workers) {
            qu.add(PriorityQueue(1, comparator))
            loksStatus.add(ReentrantLock())
        }
    }

    /**
     * Removes and returns an element, or `null` once the queue is observed empty.
     *
     * Two sub-queue indices are drawn at random. If both can be locked, the smaller head wins;
     * if only one is available (same index drawn twice, or the second lock is busy, or one of
     * the queues is empty) the element is taken from the one that has something to offer.
     *
     * @return a removed node, or `null` if [isEmpty] became true before anything could be removed.
     */
    fun pop(): Node? {
        // ThreadLocalRandom.nextInt(bound) is always in [0, bound), unlike abs(nextInt()) % bound,
        // which is negative when nextInt() returns Int.MIN_VALUE.
        val random = java.util.concurrent.ThreadLocalRandom.current()
        while (!isEmpty()) {
            val first = random.nextInt(workers)
            val second = random.nextInt(workers)

            val firstLock = loksStatus[first]
            if (!firstLock.tryLock()) continue
            try {
                val secondLock = loksStatus[second]
                val secondHeld = second != first && secondLock.tryLock()
                try {
                    val source = pickSource(first, if (secondHeld) second else first)
                    if (source >= 0) return removeHead(source)
                } finally {
                    if (secondHeld) secondLock.unlock()
                }
            } finally {
                firstLock.unlock()
            }
        }
        return null
    }

    /**
     * Inserts [x] into a randomly chosen sub-queue, retrying with a new index while the chosen
     * sub-queue's lock is busy.
     */
    fun add(x: Node) {
        val random = java.util.concurrent.ThreadLocalRandom.current()
        while (true) {
            val index = random.nextInt(workers)
            val lock = loksStatus[index]
            if (!lock.tryLock()) continue
            try {
                val queue = qu[index]
                if (queue.isEmpty()) sizeGroups.incrementAndGet()
                // Counter is bumped before the insert so a concurrent isEmpty() never reports
                // "empty" for an element that is about to appear.
                queueSzie.incrementAndGet()
                queue.add(x)
                return
            } finally {
                lock.unlock()
            }
        }
    }

    /** Returns `true` when the element counter [queueSzie] is not positive. */
    fun isEmpty(): Boolean = queueSzie.value <= 0

    /**
     * Chooses which of the sub-queues [a] / [b] to remove from; the caller must hold their locks.
     *
     * @return the index whose head is smaller (ties go to [a]), the only non-empty one,
     *         or `-1` if neither has an element.
     */
    private fun pickSource(a: Int, b: Int): Int {
        val headA = qu[a].peek()
        val headB = if (b == a) null else qu[b].peek()
        return when {
            headA == null && headB == null -> -1
            headB == null -> a
            headA == null -> b
            comparator.compare(headA, headB) > 0 -> b
            else -> a
        }
    }

    /**
     * Removes the head of the non-empty sub-queue [index] and updates both counters;
     * the caller must hold `loksStatus[index]`.
     */
    private fun removeHead(index: Int): Node {
        val queue = qu[index]
        val node = queue.remove()
        if (queue.isEmpty()) sizeGroups.decrementAndGet()
        queueSzie.decrementAndGet()
        return node
    }
}
/**
 * Runs a parallel Dijkstra-style relaxation from [start], using one worker thread per available
 * processor and a shared [MyQueue] ordered by [NODE_DISTANCE_COMPARATOR].
 *
 * Each worker pops a node and, for every outgoing edge, tries to lower the target's `distance`
 * with `casDistance`; on success the target is queued (again). The call returns after all
 * workers have finished.
 *
 * Termination is decided by a counter of nodes that are queued or currently being processed,
 * not by the queue looking empty: a momentarily empty queue only means other workers may still
 * be about to add nodes.
 *
 * @param start the source node; its `distance` is set to `0`.
 */
fun shortestPathParallel(start: Node) {
    val workers = Runtime.getRuntime().availableProcessors()
    // The distance to the start node is `0`.
    start.distance = 0

    val q = MyQueue(workers, NODE_DISTANCE_COMPARATOR)
    // Nodes that are in the queue or being relaxed right now; starts at 1 for `start`.
    val pendingNodes = java.util.concurrent.atomic.AtomicInteger(1)
    q.add(start)

    // Every worker arrives once when done; the caller is the extra (+1) party.
    val onFinish = Phaser(workers + 1)
    repeat(workers) {
        thread {
            try {
                while (pendingNodes.get() > 0) {
                    val cur: Node? = q.pop()
                    if (cur == null) {
                        // Queue is empty for now but work is still in flight elsewhere.
                        Thread.yield()
                        continue
                    }
                    try {
                        for (e in cur.outgoingEdges) {
                            while (true) {
                                val targetDistance = e.to.distance
                                val candidate = cur.distance + e.weight
                                if (targetDistance <= candidate) break
                                if (e.to.casDistance(targetDistance, candidate)) {
                                    // Count the node before publishing it so the counter can
                                    // never reach zero while it sits in the queue.
                                    pendingNodes.incrementAndGet()
                                    q.add(e.to)
                                    break
                                }
                                // CAS lost a race: re-read both distances and retry.
                            }
                        }
                    } finally {
                        // `cur` is fully processed (or processing failed); release its slot.
                        pendingNodes.decrementAndGet()
                    }
                }
            } finally {
                // Always arrive, otherwise the caller would wait forever if a worker throws.
                onFinish.arrive()
            }
        }
    }
    onFinish.arriveAndAwaitAdvance()
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement