Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package uebung03
- import robomine.Simulation
- import scala.util.Try
- import robomine.RobotControls
- import scala.Array.canBuildFrom
- import rx.lang.scala.Observable
- import rx.lang.scala.Subscription
- import scala.concurrent.duration._
- import rx.lang.scala.Subject
- import rx.lang.scala.Observer
- import scala.concurrent.duration._
- import scala.language.postfixOps
- import scala.util.Random
/**
 * Program entry point for the exercise.
 *
 * Starts a simulation with one team, hands the last robot reported by `onReady`
 * to [[RobotBehavior.create]], and stops the simulation the first time a
 * gold-collected event reports an amount greater than 100, printing the elapsed
 * wall-clock time since start-up.
 */
object Uebung03 extends App {
  val mine = Simulation.start(teams = 1)
  // Reference timestamp (ms) taken right after the simulation was started.
  val t0 = System.currentTimeMillis()

  /** Prints the elapsed time in seconds and shuts the simulation down. */
  private def reportAndStop(): Unit = {
    val elapsedMillis = System.currentTimeMillis() - t0
    val time = elapsedMillis.toDouble / 1000
    println(f"yeehaw! it took $time%.3f seconds")
    mine.stop()
  }

  // Only the last robot of the reported collection is controlled.
  mine.onReady { robots =>
    RobotBehavior.create(Subject[Double]())(robots.last)
  }

  mine.onGoldCollected { amount =>
    if (amount > 100) reportAndStop()
  }
}
/**
 * Value-class wrapper around a single `Double` distance component.
 *
 * NOTE(review): the field name `m` suggests metres — confirm against the simulator.
 *
 * @param m the wrapped magnitude
 */
case class Length(m: Double) extends AnyVal {

  /** Sum of the two wrapped magnitudes. */
  def +(that: Length): Length = Length(m + that.m)

  /** Difference `this - that` of the wrapped magnitudes. */
  def -(that: Length): Length = Length(m - that.m)
}
/**
 * Value-class wrapper around an angle expressed in radians.
 *
 * @param rad the angle in radians (not required to be normalised)
 */
case class Angle(rad: Double) extends AnyVal {

  /**
   * The angle wrapped into the half-open interval `[-Pi, Pi)`, as a raw `Double`.
   *
   * The offset by Pi has to be added to `rad` BEFORE the modulo is taken. The
   * previous expression applied `%` to the constant `Math.PI` only (because `%`
   * binds tighter than `+`), which reduced to `rad + Pi - Pi` and therefore never
   * wrapped anything.
   *
   * @return the equivalent angle in radians within `[-Pi, Pi)`
   */
  def normal: Double = {
    val fullTurn = 2 * Math.PI
    // The second "+ fullTurn ... % fullTurn" turns a negative remainder into a positive one.
    ((rad + Math.PI) % fullTurn + fullTurn) % fullTurn - Math.PI
  }

  /** Unit vector for this angle: cosine as the first (north) component, sine as the second (west). */
  def unit = Vector(Length(Math.cos(rad)), Length(Math.sin(rad)))

  /** Sum of both angles; the result is not normalised. */
  def +(that: Angle): Angle = Angle(rad + that.rad)

  /** Difference `this - that`; the result is not normalised. */
  def -(that: Angle): Angle = Angle(rad - that.rad)
}

object Angle {

  /**
   * Builds an [[Angle]] from a value in degrees.
   *
   * @param deg the angle in degrees
   * @return the same angle expressed in radians
   */
  def fromDeg(deg: Double): Angle = Angle(deg / (180 / Math.PI))
}
/**
 * Two-component vector made of a `north` and a `west` [[Length]].
 *
 * Note that within this file the name shadows the standard library's `Vector` collection.
 *
 * @param north first component
 * @param west  second component
 */
case class Vector(north: Length, west: Length) {

  /** Component-wise difference `that - this`, i.e. the displacement from this vector to `that`. */
  def to(that: Vector): Vector = {
    val deltaNorth = that.north - north
    val deltaWest = that.west - west
    Vector(deltaNorth, deltaWest)
  }

  /**
   * `atan2(west, north)` shifted by a quarter turn (minus Pi/2), wrapped in an [[Angle]].
   *
   * NOTE(review): the quarter-turn offset presumably aligns the result with the
   * simulator's compass convention — confirm.
   */
  def angle: Angle = {
    val raw = Math.atan2(west.m, north.m)
    Angle(raw - Math.PI * 0.5)
  }
}
/**
 * One reading of the three-beam laser sensor.
 *
 * NOTE(review): the unit and meaning of the values are not visible here — confirm
 * against the simulator documentation.
 *
 * @param left   value reported for the left beam
 * @param center value reported for the centre beam
 * @param right  value reported for the right beam
 */
case class Laser(
  left: Double,
  center: Double,
  right: Double
)
/**
 * Reactive controller that wires a robot's sensor events to its wheel power levels.
 */
object RobotBehavior {

  /**
   * Extension wrapper offering a `foreach` that subscribes with handlers which
   * print errors and print "completed" on completion.
   *
   * NOTE(review): if `Observable` already declares a `foreach` member, that member
   * wins and this extension is never selected — confirm.
   */
  implicit class PimpedObservable[T](val underlying: Observable[T]) extends AnyVal {
    def foreach(f: T => Unit): Subscription =
      underlying.subscribe(f, e => println(e), () => println("completed"))
  }

  /**
   * Attaches the behaviour to one robot: it steers towards `basePosition` while the
   * battery is considered critical and towards the origin `(0, 0)` otherwise.
   *
   * The subscription created by the final `goTo` call is discarded, so the
   * behaviour cannot be cancelled from outside.
   *
   * @param eventBus currently not used by this method
   * @param controls handle used to register sensor listeners and to set power levels
   */
  def create(eventBus: Subject[Double])(controls: RobotControls): Unit = {

    /** Observables for the robot's sensors, parsed from their raw string events. */
    object sensors {

      /**
       * Turns `sensor` into a sampled stream: each round defers creation of the
       * sensor observable, waits `delay` before subscribing, takes the first value,
       * and then repeats.
       *
       * NOTE(review): presumably this keeps the listener registered only briefly to
       * save energy — confirm.
       */
      def powerSave[T](delay: Duration)(sensor: => Observable[T]) =
        Observable.defer(sensor).delaySubscription(delay).first.repeat

      val sensorScheduler = rx.lang.scala.schedulers.TrampolineScheduler()

      /**
       * Wraps the event listener API for sensor `name` in an observable.
       * Every subscription registers its own listener; unsubscribing removes it.
       * `f` parses the raw event string; exceptions thrown by `f` are not caught here.
       */
      private def sensor[T](name: String)(f: String => T) = Observable.create[T] { obs =>
        val handler = (x: String) => obs.onNext(f(x))
        controls.addEventListener(name, handler)
        Subscription(controls.removeEventListener(name, handler))
      }.observeOn(sensorScheduler)

      // Two space-separated tokens; the last character of each token is dropped before parsing.
      def gps = sensor[Vector]("gps") { raw =>
        val Array(n, w) = raw.split(" ")
        Vector(Length(n.init.toDouble), Length(w.init.toDouble))
      }

      // Last character is dropped; the remaining number is divided by 100.
      def batteryLevel = sensor[Double]("batteryLevel")(raw => raw.init.toDouble / 100)

      // Last character is dropped; the remaining number is read as degrees.
      def compass = sensor[Angle]("compass")(raw => Angle.fromDeg(raw.init.toDouble))

      // The whole string is read as degrees.
      def gyroscope = sensor[Angle]("gyroscope")(raw => Angle.fromDeg(raw.toDouble))

      // Two space-separated numbers without any suffix.
      def accelerometer = sensor[Vector]("accelerometer") { raw =>
        val Array(n, w) = raw.split(" ")
        Vector(Length(n.toDouble), Length(w.toDouble))
      }

      // Three space-separated numbers: left, centre, right.
      def laser = sensor[Laser]("laser") { raw =>
        val Array(l, c, r) = raw.split(" ")
        Laser(l.toDouble, c.toDouble, r.toDouble)
      }

      def goldDetector = sensor[Double]("goldDetector")(raw => raw.toDouble)
    }

    /** Observers that forward received values as power levels to the robot's actuators. */
    object outputs {

      // Each value sets the power level of `name`; on completion the level is reset to 0.0.
      private def output(name: String) = Observer[Double](
        (level: Double) => controls.setPowerLevel(name, level),
        () => controls.setPowerLevel(name, 0.0))

      def leftWheel = output("leftWheel")
      def rightWheel = output("rightWheel")

      object laser {
        def left = output("laserL")
        def center = output("laserC")
        def right = output("laserR")
      }
    }

    // Each sampled compass reading (at most one per 2-second round) seeds a running
    // sum of gyroscope readings; `switch` moves on to the newest such stream.
    val orientation =
      sensors.powerSave(2 seconds)(sensors.compass).map(keyframe => sensors.gyroscope.scan(keyframe)(_ + _)).switch

    /**
     * Drives both wheels from `speed` and the heading error.
     *
     * `c` is the normalised difference (desired - current) divided by Pi. The left
     * wheel receives `s - s * c`, the right wheel `s + s * c`.
     *
     * NOTE(review): `c` is subscribed twice (once per wheel), and only the right
     * wheel's subscription is returned; the left wheel's subscription is dropped.
     */
    def steer(speed: Observable[Double], desiredDirection: Observable[Angle]) = {
      val c = orientation.combineLatest[Angle, Double](desiredDirection, ((a, r) => (r - a).normal / Math.PI))
      speed.combineLatest[Double, Double](c, (s, c) => s + s * -c).subscribe(outputs.leftWheel)
      speed.combineLatest[Double, Double](c, (s, c) => s + s * c).subscribe(outputs.rightWheel)
    }

    /**
     * Steers at constant speed 1 using the angle derived from the latest GPS fix and
     * the latest target position.
     *
     * NOTE(review): the direction is computed as `(pos to gps)`, i.e. from the target
     * towards the current position — confirm this matches the intended sign convention.
     */
    def goTo(pos: Observable[Vector]) = {
      def desiredDirection = sensors.gps.combineLatest[Vector, Angle](pos, (gps, pos) => (pos to gps).angle)
      steer(Observable.items(1), desiredDirection)
    }

    // Hysteresis: readings between 0.4 and 0.95 are ignored, so the flag becomes true
    // below 0.4 and false again only above 0.95; repeated equal flags are suppressed.
    def criticalBattery =
      sensors.batteryLevel.filter(x => x < 0.4 || x > 0.95).map(_ < 0.5)
        .distinctUntilChanged

    val basePosition = Vector(Length(60), Length(60))

    goTo(criticalBattery.map(critical => if (critical) basePosition else Vector(Length(0), Length(0))))
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement