Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scala.collection.mutable
- import scala.io.Source
- import java.util
/** Entry point: replays a workload trace through an [[FCFSScheduler]] and
  * prints the summed waiting time (start time minus submit time) of all jobs
  * the scheduler reports, followed by the elapsed wall-clock time.
  */
object Main {

  /** Trace that is read when no path is passed on the command line. */
  private val DefaultTracePath = """c:\data\data.swf"""

  /** Runs the simulation.
    *
    * @param args optional; `args(0)` overrides the default trace path
    * @throws NoSuchElementException if the trace has no line containing "MaxProcs"
    */
  def main(args: Array[String]): Unit = {
    val startedAt = System.currentTimeMillis()
    val lines = readLines(args.headOption.getOrElse(DefaultTracePath))

    // Everything before the MaxProcs line is skipped; the processor count is
    // whatever follows the first ":<whitespace>" on that line.
    val fromMaxProcs = lines.dropWhile(line => !line.contains("MaxProcs"))
    val maxProcsLine = fromMaxProcs.headOption.getOrElse(
      throw new NoSuchElementException("trace contains no MaxProcs line"))
    val maxProcessors = maxProcsLine.split(":\\s+")(1).trim.toInt

    // The remaining ';' header lines are dropped. Blank lines are ignored so
    // that stray empty lines do not abort the run with a NumberFormatException.
    val jobLines = fromMaxProcs.tail
      .dropWhile(_.startsWith(";"))
      .filter(_.trim.nonEmpty)

    val scheduler = new FCFSScheduler(numProcessors = maxProcessors)
    var totalWait = 0L
    // Submission and polling are interleaved: after every submitted job the
    // scheduler is asked for the jobs it has placed so far.
    for (line <- jobLines) {
      scheduler.notify(parseSWFJob(line))
      totalWait += waitingTime(scheduler.getDoneJobs)
    }
    scheduler.noMoreJobs()
    totalWait += waitingTime(scheduler.getDoneJobs)

    println(s"Schedule difference $totalWait")
    println(s"Time diff millis${System.currentTimeMillis() - startedAt}")
  }

  /** Parses one whitespace-separated trace record into a [[Job]].
    *
    * Columns 0, 1, 3 and 4 are used as job id, submit time, runtime and
    * required processors; column 2 is ignored and the start time is left empty.
    * NOTE(review): this matches the leading columns of the Standard Workload
    * Format (job number, submit, wait, run time, allocated processors) -
    * confirm against the traces actually used.
    *
    * @param str a single data line of the trace
    * @throws NumberFormatException          if a used column is not an integer
    * @throws ArrayIndexOutOfBoundsException if the line has fewer than five columns
    */
  def parseSWFJob(str: String): Job = {
    val fields = str.trim.split("\\s+")
    new Job(fields(0).toLong, fields(1).toLong, fields(3).toLong, None, fields(4).toLong)
  }

  /** Reads the whole file into memory and always releases the file handle. */
  private def readLines(path: String): Vector[String] = {
    val source = Source.fromFile(path)
    try source.getLines().toVector
    finally source.close()
  }

  /** Sums `startTime - submitTime` over the given jobs. */
  private def waitingTime(jobs: Iterable[Job]): Long =
    jobs.iterator.map { job =>
      val start = job.startTime.getOrElse(
        throw new NoSuchElementException(s"job ${job.id} was reported without a start time"))
      start - job.submitTime
    }.sum
}
/** A job handled by the scheduler.
  *
  * The constructor arguments are exposed unchanged as `id`, `submitTime`,
  * `runtime` and `requiredProcessors`. `startTime` is the only mutable member
  * and starts out as `jobStartTime`.
  *
  * @param jobId                 exposed as `id`
  * @param submittedTime         exposed as `submitTime`
  * @param jobRuntime            exposed as `runtime`
  * @param jobStartTime          initial value of `startTime`
  * @param jobRequiredProcessors exposed as `requiredProcessors`
  */
class Job(jobId: Long, submittedTime: Long, jobRuntime: Long, jobStartTime: Option[Long], jobRequiredProcessors: Long) {
  // Plain vals instead of a tuple pattern: no intermediate Tuple4 of boxed
  // Longs is built for every job.
  val id: Long = jobId
  val submitTime: Long = submittedTime
  val runtime: Long = jobRuntime
  val requiredProcessors: Long = jobRequiredProcessors

  /** Start time of the job; reassignable after construction. */
  var startTime: Option[Long] = jobStartTime

  override def toString: String =
    s"(id=$id,submit=$submitTime,run=$runtime,start=$startTime,procs=$requiredProcessors)"
}
/** Pairs a job with a point in time and orders such pairs by that time,
  * reversed: the instance with the smaller `time` compares as the greater one.
  *
  * @param doneTime exposed as `time`
  * @param doneJob  exposed as `job`
  */
class DoneJob(doneTime: Long, doneJob: Job) extends Ordered[DoneJob] {
  // Plain vals instead of a tuple pattern: avoids building a Tuple2 per event.
  val job: Job = doneJob
  val time: Long = doneTime

  /** Reverse comparison on `time` (operands swapped instead of negating). */
  override def compare(that: DoneJob): Int = that.time.compare(time)
}
/** First-come-first-served scheduler simulation over a fixed processor pool.
  *
  * Jobs are queued with [[notify]] and are only ever started in queue order.
  * The simulation is advanced by [[getDoneJobs]], which never looks past
  * `maxTime`: the submit time of the most recently notified job, or
  * `Long.MaxValue` once [[noMoreJobs]] has been called.
  * NOTE(review): this presumably expects jobs to be notified in non-decreasing
  * submit-time order - confirm with the caller.
  *
  * @param numProcessors total number of processors in the pool
  */
class FCFSScheduler(numProcessors: Long) extends JobScheduler {
  // Completion events of running jobs; DoneJob's reversed ordering makes the
  // queue's head the event with the smallest time.
  private val nextDoneTimes = new mutable.PriorityQueue[DoneJob]()
  // Jobs that have been notified but not started yet, in arrival order.
  private val nextJobs = new mutable.Queue[Job]()
  private var usedProcessors = 0L
  private var currentTime = 0L
  private var maxTime = 0L

  /** Queues `j` and moves the simulation horizon to its submit time. */
  override def notify(j: Job): Unit = {
    nextJobs.enqueue(j)
    maxTime = j.submitTime
  }

  /** Advances the simulation up to the current horizon.
    *
    * Note that a job is added to the result at the moment it is *started*
    * (its `startTime` is set then), not when its completion event is handled.
    *
    * The call stops early when the next event (the later of the head job's
    * submit time and the earliest pending completion) lies beyond the horizon,
    * or when the head job does not fit and no running job is left to free
    * processors. In the latter case the job stays queued and blocks every job
    * behind it; nothing is reported to the caller.
    *
    * @return the jobs started during this call, in start order
    * @throws RuntimeException if a completion event lies before the current time
    */
  override def getDoneJobs: Iterable[Job] = {
    val started = new mutable.Queue[Job]()
    var stalled = false
    while (!stalled && nextJobs.nonEmpty && currentTime <= maxTime) {
      val candidate = nextJobs.head
      // `head` is the documented way to peek at a PriorityQueue; going through
      // headOption would rely on the queue's iteration order.
      val earliestDone: Option[Long] =
        if (nextDoneTimes.isEmpty) None else Some(nextDoneTimes.head.time)
      val nextEventTime = candidate.submitTime max earliestDone.getOrElse(Long.MinValue)

      if (nextEventTime > maxTime) {
        stalled = true
      } else {
        val fits = candidate.requiredProcessors <= availableProcessors
        val completionComesFirst = earliestDone.exists(_ < candidate.submitTime)
        if (fits && !completionComesFirst) started.enqueue(startNextJob())
        else if (nextDoneTimes.nonEmpty) completeEarliestJob()
        else stalled = true // head job does not fit and nothing is running
      }
    }
    started
  }

  /** Dequeues the head job, reserves its processors and schedules its completion. */
  private def startNextJob(): Job = {
    val job = nextJobs.dequeue()
    usedProcessors += job.requiredProcessors
    currentTime = currentTime max job.submitTime
    job.startTime = Some(currentTime)
    nextDoneTimes.enqueue(new DoneJob(currentTime + job.runtime, job))
    job
  }

  /** Handles the earliest completion: advances the clock and frees its processors. */
  private def completeEarliestJob(): Unit = {
    val entry = nextDoneTimes.dequeue()
    if (currentTime > entry.time)
      throw new RuntimeException("Sanity check failed, currentTime > entry.time ")
    currentTime = entry.time
    usedProcessors -= entry.job.requiredProcessors
  }

  private def availableProcessors: Long = numProcessors - usedProcessors

  /** Lifts the horizon so that the next [[getDoneJobs]] call is no longer bounded by it. */
  override def noMoreJobs(): Unit = maxTime = Long.MaxValue
}
/** Interface of a job scheduler that is fed jobs one at a time and polled for results. */
trait JobScheduler {

  /** Hands a job to the scheduler. */
  def notify(j: Job): Unit

  /** Returns the jobs the scheduler reports since the previous call. */
  def getDoneJobs: Iterable[Job]

  /** Signals that no further jobs will be passed to [[notify]]. */
  def noMoreJobs(): Unit
}
Advertisement
Add Comment
Please, Sign In to add comment