Guest User

Untitled

a guest
Nov 10th, 2014
162
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.43 KB | None | 0 0
  1. import scala.collection.mutable
  2. import scala.io.Source
  3. import java.util
  4.  
  5. object Main {
  6.   def main(args: Array[String]) {
  7.     val sw = System.currentTimeMillis()
  8.     val lines = Source.fromFile( """c:\data\data.swf""").getLines()
  9.     val maxProcessors = lines.find(_.contains("MaxProcs")).get.split(":\\s+")(1).toInt
  10.     val data = lines.dropWhile(_.startsWith(";")).map(parseSWFJob)
  11.     val scheduler = new FCFSScheduler(numProcessors = maxProcessors)
  12.     var sum = 0l
  13.     val results = data.map(scheduler.notify).flatMap(_ => scheduler.getDoneJobs)
  14.     for(job <- results) {
  15.         sum += job.startTime.get - job.submitTime
  16.     }
  17.     scheduler.noMoreJobs()
  18.     for(job <- scheduler.getDoneJobs) {
  19.       sum += job.startTime.get - job.submitTime
  20.     }
  21.     println("Schedule difference " + sum)
  22.     println("Time diff millis" + (System.currentTimeMillis() - sw))
  23.   }
  24.   def parseSWFJob(str:String) = {
  25.     val xs = str.trim.split("\\s+").map(_.trim)
  26.     new Job(xs(0).toLong, xs(1).toLong, xs(3).toLong, None, xs(4).toLong)
  27.   }
  28. }
  29. class Job(jobId:Long ,submittedTime:Long,jobRuntime:Long, jobStartTime: Option[Long], jobRequiredProcessors: Long){
  30.   val (id, submitTime, runtime, requiredProcessors) = (jobId, submittedTime, jobRuntime, jobRequiredProcessors)
  31.   var startTime = jobStartTime
  32.   override def toString = s"(id=$id,submit=$submitTime,run=$runtime,start=$startTime,procs=$requiredProcessors)"
  33. }
  34. class DoneJob(doneTime:Long, doneJob:Job) extends Ordered[DoneJob]{
  35.   val (job, time) = (doneJob, doneTime)
  36.   override def compare(that: DoneJob): Int = -time.compare(that.time)
  37. }
  38. class FCFSScheduler(numProcessors:Long) extends JobScheduler {
  39.   private val nextDoneTimes = new mutable.PriorityQueue[DoneJob]()
  40.   private val nextJobs = new mutable.Queue[Job]()
  41.   private var (usedProcessors, currentTime, maxTime) = (0l, 0l, 0l)
  42.   override def notify(j: Job) = {
  43.     nextJobs.enqueue(j)
  44.     maxTime = j.submitTime
  45.   }
  46.   override def getDoneJobs: Iterable[Job] = {
  47.     val doneJobs = new mutable.Queue[Job]()
  48.     while(nextJobs.nonEmpty && currentTime <= maxTime){
  49.       val canRunNextJob = nextJobs.head.requiredProcessors <= availableProcessors
  50.       val (nextJobStart, nextDone) = (nextJobs.head.submitTime, nextDoneTimes.headOption.map(_.time))
  51.       if((nextJobStart max nextDone.getOrElse(Long.MinValue)) > maxTime) return doneJobs
  52.       val doneFirst = nextDone.getOrElse(Long.MaxValue) < nextJobStart
  53.       if(canRunNextJob && !doneFirst){
  54.         val job = nextJobs.dequeue()
  55.         usedProcessors += job.requiredProcessors
  56.         currentTime = currentTime max job.submitTime
  57.         job.startTime = Option(currentTime)
  58.         doneJobs.enqueue(job)
  59.         nextDoneTimes.enqueue(new DoneJob(currentTime+job.runtime, job))
  60.       }else if(nextDoneTimes.nonEmpty){
  61.         if(nextDoneTimes.isEmpty) throw new RuntimeException("Impossible to run jobs " + canRunNextJob + " " + doneFirst + " ")
  62.         val entry = nextDoneTimes.dequeue()
  63.         if(currentTime > entry.time) throw new RuntimeException(s"Sanity check failed, currentTime > entry.time ")
  64.         currentTime = entry.time
  65.         usedProcessors -= entry.job.requiredProcessors
  66.       } else {
  67.         return doneJobs
  68.       }
  69.     }
  70.     doneJobs
  71.   }
  72.   private def availableProcessors = numProcessors - usedProcessors
  73.   override def noMoreJobs() = maxTime = Long.MaxValue
  74. }
  75. trait JobScheduler {
  76.   def notify(j: Job)
  77.   def getDoneJobs: Iterable[Job]
  78.   def noMoreJobs()
  79. }
Advertisement
Add Comment
Please, Sign In to add comment