Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from kivy.app import App
- from kivy.uix.gridlayout import GridLayout
- from kivy.uix.relativelayout import RelativeLayout
- from kivy.uix.button import Button
- from kivy.uix.label import Label
- from kivy.uix.widget import Widget
- from kivy.graphics import Ellipse, Color
- from simulations import Simulation, FailureSimulation
- # # activate FPS counter
- # from kivy.config import Config
- # Config.set('modules', 'monitor', '')
def update_preview(preview, simulation):
    """Redraw the active nodes and targets of ``simulation`` on ``preview``.

    Widgets created by the previous call (kept on ``simulation.node_widgets``
    and ``simulation.target_widgets``) are removed first.  Then one
    ``NodeWidget`` / ``TargetWidget`` is created per active node / target,
    stored back on ``simulation`` and added to ``preview``.  Coordinates are
    scaled by ``preview.width / 100`` and ``preview.height / 100``.

    Args:
        preview: Layout exposing ``width``, ``height``, ``add_widget`` and
            ``remove_widget``.
        simulation: Object exposing ``nodes.active`` and ``targets.active``;
            every item needs a ``name`` and a ``position`` with ``x``/``y``.
    """
    # Only the attribute lookup may legitimately be missing (first draw).
    # Guarding the whole loop with ``except AttributeError`` would also hide
    # errors raised while clearing and leave stale widgets on the preview.
    stale_widgets = (list(getattr(simulation, 'node_widgets', ()))
                     + list(getattr(simulation, 'target_widgets', ())))
    for widget in stale_widgets:
        clear_map_widget(preview=preview, widget=widget)

    width = preview.width
    height = preview.height

    def scaled(items):
        # NOTE(review): 100 presumably equals the environment map size - confirm.
        return [
            (item.name,
             (item.position.x * width / 100, item.position.y * height / 100))
            for item in items
        ]

    mapped_targets = scaled(simulation.targets.active)
    mapped_nodes = scaled(simulation.nodes.active)

    # TODO: Change adding new Widgets to getting old widgets
    simulation.node_widgets = [NodeWidget(pos=position, id=name)
                               for name, position in mapped_nodes]
    for node_widget in simulation.node_widgets:
        preview.add_widget(node_widget)

    # TODO: Change removing and adding widgets for stationary targets.
    simulation.target_widgets = [TargetWidget(pos=position, id=name)
                                 for name, position in mapped_targets]
    for target_widget in simulation.target_widgets:
        preview.add_widget(target_widget)
def clear_map_widget(preview, widget):
    """Detach ``widget`` from ``preview`` and clear its canvas.

    Args:
        preview: Container providing ``remove_widget``.
        widget: Widget to remove; its ``canvas.clear()`` is called afterwards.
    """
    preview.remove_widget(widget)
    widget.canvas.clear()
    # No ``del widget`` here: it would only unbind the local name and has no
    # effect on the object or on the caller's references.
def move_map_widget(widget, position):
    """Place ``widget`` and the ellipse it owns at ``position``."""
    # Targets are assigned left to right: the widget first, then its ellipse.
    widget.pos = widget.ellipse.pos = position
class NodeWidget(Widget):
    """Map marker for a node: a 5x5 ellipse coloured (1, 1, 1)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        canvas = self.canvas
        canvas.clear()
        self.ellipse_color = Color(1., 1., 1.)
        self.ellipse = Ellipse(size=(5, 5), pos=self.pos)
        # The colour instruction goes in before the ellipse, as in the
        # original add order.
        for instruction in (self.ellipse_color, self.ellipse):
            canvas.add(instruction)
class TargetWidget(Widget):
    """Map marker for a target: a 5x5 ellipse coloured (1, 0, 0)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        canvas = self.canvas
        canvas.clear()
        self.ellipse_color = Color(1., 0, 0)
        self.ellipse = Ellipse(size=(5, 5), pos=self.pos)
        # The colour instruction goes in before the ellipse, as in the
        # original add order.
        for instruction in (self.ellipse_color, self.ellipse):
            canvas.add(instruction)
class SimulationPreviewLayout(RelativeLayout):
    """Layout used as the simulation preview area."""

    def __init__(self, **kwargs):
        # Adds no state of its own; construction is left to RelativeLayout.
        super().__init__(**kwargs)
class SimulationMenuLayout(GridLayout):
    """Single-column menu holding the simulation controls.

    Contains a title label, a start/restart button, a step button (disabled
    until a simulation has been started) and a filler label.  The simulation
    and the preview are reached through ``self.parent``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.cols, self.rows = 1, 4
        self.padding = self.spacing = 10

        self.name_label = Label(text='Menu', size_hint_y=0.1)

        self.restart_button = Button(text='Start simulation', size_hint_y=0.1)
        self.restart_button.on_press = self.restart_simulation

        self.step_button = Button(text='Step', size_hint_y=0.1)
        self.step_button.on_press = self.step_simulation
        self.step_button.disabled = True

        # Filler label that takes the remaining vertical size hint.
        self.empty_space = Label(size_hint_y=0.7)

        for child in (self.name_label, self.restart_button,
                      self.step_button, self.empty_space):
            self.add_widget(child)

    def step_simulation(self, steps=1):
        """Run up to ``steps`` turns (none once ended), then redraw the preview."""
        screen = self.parent
        simulation = screen.simulation
        for _ in range(steps):
            if simulation.ended is True:
                continue
            simulation.run_next_turn(spawn=True)
        update_preview(preview=screen.preview, simulation=simulation)

    def restart_simulation(self):
        """Create a fresh ``FailureSimulation``, run one turn and draw it."""
        self.restart_button.text = 'Restart simulation'
        self.step_button.disabled = False

        self.parent.simulation = FailureSimulation(
            nodes=100,
            target_spawn_rate=18,
            targets=200,
            node_load=10,
        )
        simulation = self.simulation = self.parent.simulation
        simulation.setup()
        # Start with empty widget lists so the first redraw has nothing to clear.
        simulation.target_widgets = []
        simulation.node_widgets = []
        simulation.run_next_turn(spawn=False)

        update_preview(preview=self.parent.preview, simulation=simulation)
class StepSimulationScreen(GridLayout):
    """Root widget: the menu (1/4 width hint) next to the preview (3/4)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.cols, self.rows = 2, 1
        self.padding = 10

        self.menu = SimulationMenuLayout(size_hint_x=1/4)
        self.add_widget(self.menu)

        self.preview = SimulationPreviewLayout(size_hint_x=3/4)
        self.add_widget(self.preview)
class StepSimulationApp(App):
    """Kivy application whose root widget is a ``StepSimulationScreen``."""

    title = 'Basic Simulation'

    def build(self):
        """Return the root widget of the application."""
        root = StepSimulationScreen()
        return root


# Start the GUI only when this module is executed as a script.
if __name__ == '__main__':
    app = StepSimulationApp()
    app.run()
- import random
- from nodes import Node
- from targets import Target
class NodeManager:
    """Registry of nodes that also tracks which of them are active."""

    def __init__(self):
        self._objects = []
        self._active_objects = []

    @property
    def active(self):
        """List of currently active nodes (the internal list, not a copy)."""
        return self._active_objects

    @property
    def all(self):
        """List of every node ever created (the internal list, not a copy)."""
        return self._objects

    @property
    def active_count(self):
        """Number of active nodes."""
        return len(self.active)

    @property
    def count(self):
        """Number of all nodes, active or not."""
        return len(self.all)

    # def activate(self, node):
    #     if node.is_active is False:
    #         node.is_active = True
    #         self._active_objects.append(node)

    def deactivate(self, node):
        """Mark an active ``node`` as inactive; a no-op for inactive nodes."""
        if node.is_active is True:
            node.is_active = False
            self._active_objects.remove(node)

    def create(self, *args, **kwargs):
        """Create a ``Node``, register it and return it.

        Positional and keyword arguments are forwarded to ``Node``; a ``name``
        of the form ``Node<N>`` is supplied by the manager.
        """
        # Number by the total count: ``active_count`` shrinks on deactivation
        # and would hand out a name that is already in use.
        new_node = Node(*args, **kwargs, name=f'Node{self.count + 1}')
        self._objects.append(new_node)
        if new_node.is_active:
            self._active_objects.append(new_node)
        return new_node

    def random(self, number=1, active=True):
        """Return ``number`` distinct, randomly chosen nodes as a new list.

        Args:
            number: How many nodes to pick.  Values below 1 give an empty
                list; values of at least the pool size give the whole pool.
            active: Pick from the active nodes when ``True``, otherwise from
                all nodes.
        """
        node_pool = self._active_objects if active is True else self._objects
        if number <= 0:
            return []
        if number >= len(node_pool):
            # Copy, so callers may deactivate nodes while iterating the result.
            return list(node_pool)
        # ``sample`` draws without replacement; repeated ``choice`` calls
        # could return the same node several times.
        return random.sample(node_pool, number)

    # def delete(self, node):
    #     if node in self._objects:
    #         self._objects.remove(node)
    #
    #     if node in self._active_objects:
    #         self._active_objects.remove(node)

    def closest(self, position):
        """Return free active nodes in range of ``position``, nearest first.

        A node is free when its ``target_position`` is ``None``.
        """
        nodes = {
            node: node.distance(position)
            for node in self.active
            if node.target_position is None
            and node.is_in_range(position)
        }
        return sorted(nodes, key=nodes.get)

    def nodes_in_range(self, position, exclude=True):
        """Return the active nodes for which ``is_in_range(position)`` holds.

        Args:
            position: Position handed to each node's ``is_in_range``.
            exclude: When ``True``, nodes located exactly at ``position`` are
                left out of the result.
        """
        nodes = [node for node in self.active if node.is_in_range(position)]
        if exclude is True:
            # Build a filtered list; removing items from a list while
            # iterating over it skips the element after each removal.
            nodes = [node for node in nodes
                     if not (node.position == position)]
        return nodes
class TargetManager:
    """Registry of targets that also tracks which of them are active."""

    def __init__(self):
        self._objects = []
        self._active_objects = []
        # Number of targets created so far.  It never decreases, so names
        # derived from it stay unique after deactivations and deletions.
        self._created = 0

    @property
    def active(self):
        """List of currently active targets (the internal list, not a copy)."""
        return self._active_objects

    @property
    def all(self):
        """List of every registered target (the internal list, not a copy)."""
        return self._objects

    @property
    def active_count(self):
        """Number of active targets."""
        return len(self.active)

    @property
    def count(self):
        """Number of all registered targets, active or not."""
        return len(self.all)

    # def activate(self, target):
    #     if target.is_active is False:
    #         target.is_active = True
    #         self._active_objects.append(target)

    def deactivate(self, target, caught=True):
        """Mark an active ``target`` as inactive and record whether it was caught.

        Does nothing when the target is already inactive.
        """
        if target.is_active is True:
            target.is_active = False
            target.caught = caught
            self._active_objects.remove(target)

    def create(self, *args, **kwargs):
        """Create a ``Target``, register it and return it.

        Positional and keyword arguments are forwarded to ``Target``; a
        ``name`` of the form ``Target<N>`` (starting at ``Target0``) is
        supplied by the manager.
        """
        # Naming by ``active_count`` reused names as soon as a target was
        # deactivated; the creation counter cannot repeat.
        new_target = Target(*args, **kwargs, name=f'Target{self._created}')
        self._created += 1
        self._objects.append(new_target)
        if new_target.is_active:
            self._active_objects.append(new_target)
        return new_target

    def delete(self, target):
        """Remove ``target`` from both lists; unknown targets are ignored."""
        if target in self._objects:
            self._objects.remove(target)
        if target in self._active_objects:
            self._active_objects.remove(target)
- from environments import Environment
- from helpers import random_position
- from managers import NodeManager, TargetManager
class Simulation:
    """
    Simulations are being run as a part of the study.

    Every turn optionally spawns a target, assigns free nodes to targets that
    are not pursued yet, moves the nodes, ages the targets and finally checks
    whether the simulation has ended.
    """

    environment = None
    ended = False
    environment_model = Environment

    def __init__(self, nodes=20, node_load=10,
                 targets=30, target_spawn_rate=4):
        """Store the parameters and create the managers and the environment.

        Args:
            nodes: Number of nodes created by ``setup``.
            node_load: ``load`` passed to every node created by ``setup``.
            targets: Total number of targets to spawn.
            target_spawn_rate: A target may spawn on turns divisible by this.
        """
        # Nodes
        self.node_load = node_load
        self.node_number = nodes

        # Targets
        self.target_spawn_rate = target_spawn_rate
        self.target_number = targets
        self.targets_to_spawn = targets
        self.max_lifetime = 300

        # Managers
        self.nodes = NodeManager()
        self.targets = TargetManager()

        # Environment
        self.create_environment(nodes=self.nodes, targets=self.targets)

        self.turn = 0

    def create_environment(self, nodes, targets):
        """Instantiate ``environment_model`` with the two managers."""
        self.environment = self.environment_model(nodes=nodes, targets=targets)

    def run(self):  # pragma: no cover
        """Set up, run turns until the simulation ends, then save results."""
        self.setup()

        while not self.ended:
            self.run_next_turn()
            if self.turn % 100 == 0:
                print(f'Simulation turn: {self.turn}.')

        self.save_results()
        print('Simulation ran successfully! Results has been saved.')

    def run_next_turn(self, spawn=True):
        """Advance the simulation by one turn.

        Args:
            spawn: When ``True``, ``spawn_target`` is called first.
        """
        if spawn is True:
            self.spawn_target()

        self.nodes_pursue()
        self.move_nodes()
        self.age_targets()
        self.end_simulation_if_no_more_targets()
        self.turn += 1

    def nodes_pursue(self):
        """
        For every active target that is not being pursued, a node is
        searched for. The candidates come from ``nodes.closest`` and the
        first one without a target starts the pursuit: the target records
        the node and the node records the target's position.
        """
        for target in self.targets.active:
            # set pursuits for active targets if not set already
            if target.pursued_by is not None:
                continue

            # check the candidate nodes; if one is found, stop searching
            for node in self.nodes.closest(target.position):
                if node.target_position is None:
                    target.pursued_by = node
                    node.target_position = target.position
                    break

    def move_nodes(self):
        """
        After nodes start pursuits, if they have a target they are moved
        towards the target. Otherwise they are moved by loads (forces).

        A target is deactivated once its pursuer ends up closer to it than
        the node's ``speed``; the node's ``target_position`` is then cleared.
        """
        nodes_with_forces = dict()

        for node in self.nodes.active:
            # calculate forces by loads if node has no target
            if node.target_position is None:
                nodes_with_forces[node] = self.environment.calculate_force(node)
                continue

            # move the node towards the target position if it is set
            target = self.environment.get_target_by_position(node.target_position)
            node.move_to_position(target.position)
            if node.distance(target.position) < node.speed:
                self.targets.deactivate(target)
                node.target_position = None

        # apply calculated forces only after all of them have been computed
        for node, force in nodes_with_forces.items():
            node.move_by_force(force, map_size=self.environment.map_size)

    def setup(self):
        """Create ``node_number`` nodes at random positions inside the map."""
        map_size = self.environment.map_size
        for _ in range(self.node_number):
            self.nodes.create(load=self.node_load,
                              sight_range=self.environment.range,
                              position=random_position(1, map_size - 1),
                              speed=0.2)

    def spawn_target(self):
        """
        Spawns target if there are still targets to be spawned and if spawn
        rate aligns with current turn.
        """
        map_size = self.environment.map_size
        if self.turn % self.target_spawn_rate == 0 and self.targets_to_spawn > 0:
            target_position = random_position(1, map_size - 1)
            self.targets.create(position=target_position)
            self.targets_to_spawn -= 1

    def age_targets(self):
        """Age every active target by one turn and expire the old ones.

        A target whose lifetime reaches ``max_lifetime`` is deactivated as
        not caught, and its pursuer (if any) is released.
        """
        # Iterate over a snapshot: ``deactivate`` removes items from
        # ``targets.active``, which would otherwise skip the next target.
        for target in list(self.targets.active):
            target.age(1)
            # ``>=`` so a target that is already past the limit still expires.
            if target.lifetime >= self.max_lifetime:
                self.targets.deactivate(target, caught=False)

                # remove pursuer's target_position if given target is too old
                if target.pursued_by is not None:
                    node = target.pursued_by
                    target.pursued_by = None
                    node.target_position = None

    def end_simulation_if_no_more_targets(self):
        """Flag the simulation as ended when nothing is left to spawn or catch."""
        if self.targets_to_spawn == 0 and self.targets.active_count == 0:
            self.ended = True

    def save_results(self):  # pragma: no cover
        """Write a summary plus every node and target to ``results.txt``."""
        file_name = 'results.txt'

        all_targets = self.targets.all
        cumulative_lifespan = sum(target.lifetime for target in all_targets)
        target_count = self.targets.count
        # Guard against a simulation that never had any targets.
        average_lifespan = cumulative_lifespan / target_count if target_count else 0
        uncaught_targets = len([target for target in all_targets
                                if target.caught is False])

        with open(file_name, 'w+') as results:
            results.write(f'Simulation ended in turn {self.turn}.\n')
            results.write(f'Cumulative target lifespan was: {cumulative_lifespan}.\n')
            results.write(f'Average target lifespan was: {average_lifespan}.\n')
            results.write(f'Uncaught targets: {uncaught_targets}.\n')

            for node in self.nodes.all:
                results.write(str(node) + '\n')

            for target in all_targets:
                results.write(str(target) + '\n')
class FailureSimulation(Simulation):
    """Simulation in which a part of the nodes is deactivated during setup."""

    def setup(self, number=0.5):
        """Run the base setup, then fail ``number`` of the nodes.

        ``number`` is interpreted as described in ``partial_failure``.
        """
        super().setup()
        self.partial_failure(number=number)

    def partial_failure(self, number=None):
        """Deactivate randomly chosen active nodes.

        Args:
            number: ``None`` fails half of the active nodes, a value below 1
                is treated as a fraction of the active nodes, and anything
                else is used as the number of nodes to fail.

        Returns:
            A list with the result of each ``nodes.deactivate`` call.
        """
        if number is None:
            how_many = int(0.5 * self.nodes.active_count)
        elif number < 1:
            how_many = int(number * self.nodes.active_count)
        else:
            how_many = number

        failing_nodes = self.nodes.random(number=how_many, active=True)

        # Call deactivate for all nodes
        outcomes = []
        for node in failing_nodes:
            outcomes.append(self.nodes.deactivate(node))
        return outcomes
- import random
- import pytest
- from simulations import Simulation, FailureSimulation
- from helpers import random_position, Position
class TestSimulation:
    """Tests for the turn-by-turn behaviour of ``Simulation``."""

    def test_creating_simulation(self):
        """A new simulation has an environment and has not ended."""
        sim = Simulation()

        assert sim is not None
        assert sim.environment is not None
        assert sim.ended is False

    def test_end_simulation(self):
        """With nothing to spawn and nothing active the simulation ends."""
        sim = Simulation(targets=0)

        sim.end_simulation_if_no_more_targets()

        assert sim.ended is True

    def test_setup(self):
        """``setup`` creates exactly the requested number of nodes."""
        expected_nodes = 10
        sim = Simulation(nodes=expected_nodes)

        sim.setup()

        assert sim.node_number == expected_nodes
        assert sim.nodes.count == expected_nodes

    def test_spawn_target_on_starting_turn(self):
        """Turn 0 is aligned with every spawn rate."""
        sim = Simulation(target_spawn_rate=10)

        sim.spawn_target()

        assert sim.targets.count == 1
        assert sim.targets.active_count == 1

    def test_spawn_target_with_not_aligned_turn(self):
        """No target is spawned on a turn the rate does not divide."""
        sim = Simulation(target_spawn_rate=10)
        sim.turn = 5

        sim.spawn_target()

        assert sim.targets.count == 0
        assert sim.targets.active_count == 0

    def test_spawn_target_with_aligned_turn(self):
        """A target is spawned on a turn the rate divides."""
        sim = Simulation(target_spawn_rate=1)
        sim.turn = 5

        sim.spawn_target()

        assert sim.targets.active_count == 1
        assert sim.targets.count == 1

    def test_spawn_target_random_positions(self):
        """Spawn positions come from ``random_position`` in call order."""
        # set seed for random position function
        random.seed(123)
        sim = Simulation(target_spawn_rate=10)
        sim.spawn_target()
        sim.spawn_target()
        map_size = sim.environment.map_size

        # restart seed to get the same values
        random.seed(123)
        expected_first = random_position(1, map_size - 1)
        expected_second = random_position(1, map_size - 1)

        first, second = sim.targets.active

        assert first.position == expected_first
        assert second.position == expected_second
        assert first.position != second.position

    def test_nodes_pursue_when_node_is_close(self):
        """A free node close to a target starts pursuing it."""
        sim = Simulation()
        node = sim.nodes.create(position=(50, 51))
        target = sim.targets.create(position=(50, 50))
        before = node.target_position

        sim.nodes_pursue()
        after = node.target_position

        assert target in sim.targets.all
        assert target in sim.targets.active
        assert before is None
        assert after == target.position
        assert target.pursued_by is node

    def test_nodes_pursue_when_node_is_away(self):
        """A node far from the target does not start a pursuit."""
        sim = Simulation()
        node = sim.nodes.create(position=(90, 90))
        target = sim.targets.create(position=(50, 50))
        before = node.target_position

        sim.nodes_pursue()
        after = node.target_position

        assert target in sim.targets.active
        assert before is None
        assert after is None
        assert target.pursued_by is None

    def test_nodes_pursue_when_target_is_pursued(self):
        """An already pursued target keeps its pursuer."""
        sim = Simulation()
        node = sim.nodes.create(position=(50, 51))
        pursuer = sim.nodes.create(position=(52, 52))
        target = sim.targets.create(position=(50, 50))
        target.pursued_by = pursuer
        before = node.target_position

        sim.nodes_pursue()
        after = node.target_position

        assert target in sim.targets.active
        assert before is None
        assert after is None
        assert target.pursued_by is pursuer

    def test_age_targets(self):
        """Each call ages every active target by one."""
        sim = Simulation()

        older = sim.targets.create(position=(50, 50))
        sim.age_targets()
        younger = sim.targets.create(position=(50, 50))
        sim.age_targets()

        assert older.lifetime == 2
        assert younger.lifetime == 1

    def test_age_targets_deactivate_target_without_pursuer(self):
        """A target reaching the maximum lifetime is deactivated uncaught."""
        sim = Simulation()
        expiring = sim.targets.create(position=(50, 50))
        expiring.lifetime = sim.max_lifetime - 1

        sim.age_targets()

        assert expiring.caught is False
        assert expiring in sim.targets.all
        assert expiring not in sim.targets.active
        assert expiring.lifetime == sim.max_lifetime

    def test_age_targets_deactivate_target_with_pursuer(self):
        """Expiring a pursued target also releases its pursuer."""
        sim = Simulation()
        pursuer = sim.nodes.create(position=(50, 50))
        expiring = sim.targets.create(position=(50, 50))
        expiring.lifetime = sim.max_lifetime - 1
        pursuer.target_position = expiring.position
        expiring.pursued_by = pursuer

        sim.age_targets()

        assert expiring.caught is False
        assert expiring in sim.targets.all
        assert expiring not in sim.targets.active
        assert expiring.lifetime == sim.max_lifetime
        assert expiring.pursued_by is None
        assert pursuer.target_position is None

    def test_move_nodes_by_loads(self):
        """A lone node at (60, 60) moves towards smaller coordinates."""
        sim = Simulation()
        node = sim.nodes.create(position=(60, 60))
        start_x = node.position.x
        start_y = node.position.y

        sim.move_nodes()

        assert node.position.x < start_x
        assert node.position.y < start_y

    def test_move_nodes_by_loads_equilibrium(self):
        """Two nodes placed at thirds of the map keep their y coordinate."""
        sim = Simulation()
        map_size = sim.environment.map_size
        left = (map_size / 3,
                50)
        right = (map_size * 2 / 3,
                 50)
        left_node = sim.nodes.create(position=left)
        right_node = sim.nodes.create(position=right)

        sim.move_nodes()

        assert left_node.position.y == Position(*left).y
        assert right_node.position.y == Position(*right).y

    def test_move_nodes_by_loads_with_moving_nodes(self):
        """A free node is still moved when another node has a target."""
        sim = Simulation()
        node = sim.nodes.create(position=(50, 50))
        target = sim.targets.create(position=(90, 90))
        chaser = sim.nodes.create(position=(45, 45))
        chaser.target_position = target.position
        start = Position(*node.position)

        sim.move_nodes()

        assert node.position != start

    def test_move_nodes_deactivate_target(self):
        """A node reaching its target lands on it and drops the pursuit."""
        sim = Simulation()
        node = sim.nodes.create(position=(50, 50))
        target = sim.targets.create(position=(51, 51))
        node.target_position = target.position

        sim.move_nodes()

        assert node.position == target.position
        assert node.target_position is None

    def test_move_nodes_move_towards_target(self):
        """A node with a distant target moves towards it and keeps pursuing."""
        sim = Simulation()
        node = sim.nodes.create(position=(50, 50))
        target = sim.targets.create(position=(90, 90))
        node.target_position = target.position
        start = Position(node.position.x, node.position.y)

        sim.move_nodes()

        assert node.position.x > start.x
        assert node.position.y > start.y
        assert node.target_position is target.position

    def test_run_next_turn(self):
        """One turn: the closer node pursues, both nodes move, target ages."""
        sim = Simulation()
        near = sim.nodes.create(position=(50, 50),
                                sight_range=20,
                                speed=2)
        far = sim.nodes.create(position=(49, 49),
                               sight_range=20,
                               speed=2)
        target = sim.targets.create(position=(55, 55))

        turn_before = sim.turn
        age_before = target.lifetime
        near_start = Position(*near.position)
        far_start = Position(*far.position)

        sim.run_next_turn()

        assert sim.turn == turn_before + 1
        assert sim.ended is False
        assert target.lifetime == age_before + 1
        assert near.position != near_start
        assert near.target_position == target.position
        assert far.position != far_start
        assert far.target_position is None

    def test_run_next_turn_if_no_more_targets(self):
        """A simulation without targets ends after its first turn."""
        sim = Simulation(targets=0)
        sim.nodes.create(position=(50, 50), sight_range=20, speed=2)

        sim.run_next_turn()

        assert sim.ended is True

    def test_run_next_turn_if_0_targets_but_more_to_spawn(self):
        """Targets still waiting to spawn keep the simulation running."""
        sim = Simulation(targets=2, target_spawn_rate=200)
        sim.nodes.create(position=(50, 50), sight_range=20, speed=2)

        sim.run_next_turn()
        sim.run_next_turn()
        spawned = sim.targets.all[0]
        sim.targets.delete(spawned)

        assert sim.ended is False

    def test_run_next_turn_with_spawn_false(self):
        """Without spawning, the pending target keeps the simulation alive."""
        sim = Simulation(targets=1, target_spawn_rate=1)
        sim.nodes.create(position=(50, 50), sight_range=20, speed=2)

        sim.run_next_turn(spawn=False)
        sim.run_next_turn(spawn=False)
        sim.run_next_turn(spawn=False)

        assert sim.ended is False
class TestFailureSimulation:
    """Tests for the node failures introduced by ``FailureSimulation``.

    NOTE(review): the exact counts asserted here assume ``nodes.random``
    returns distinct nodes - confirm against ``NodeManager.random``.
    """

    @staticmethod
    def _simulation_with_nodes(nodes=10):
        """Return a ``FailureSimulation`` whose nodes exist but none has failed.

        Nodes are only created by ``setup``.  The base-class ``setup`` is
        called directly because ``FailureSimulation.setup`` would also fail
        part of the nodes, and without any nodes ``partial_failure`` has
        nothing to deactivate.
        """
        simulation = FailureSimulation(target_spawn_rate=200, nodes=nodes)
        Simulation.setup(simulation)
        return simulation

    @staticmethod
    def _inactive_nodes(simulation):
        """Return every node of ``simulation`` whose ``is_active`` is False."""
        return [node for node in simulation.nodes.all
                if node.is_active is False]

    def test_setup_without_percentage(self):
        """The default setup fails half of the nodes."""
        simulation = FailureSimulation(target_spawn_rate=200, nodes=10)

        simulation.setup()

        assert len(self._inactive_nodes(simulation)) == 5
        assert simulation.nodes.active_count == 5

    def test_setup_with_percentage(self):
        """A fraction passed to ``setup`` fails that share of the nodes."""
        simulation = FailureSimulation(target_spawn_rate=200, nodes=10)

        simulation.setup(number=0.3)

        assert len(self._inactive_nodes(simulation)) == 3
        assert simulation.nodes.active_count == 7

    def test_partial_failure(self):
        """Without an argument half of the active nodes fail."""
        simulation = self._simulation_with_nodes()

        simulation.partial_failure()

        assert len(self._inactive_nodes(simulation)) == 5
        assert simulation.nodes.active_count == 5

    def test_partial_failure_with_percentage(self):
        """A value below 1 is treated as a fraction of the active nodes."""
        simulation = self._simulation_with_nodes()

        simulation.partial_failure(number=0.3)

        assert len(self._inactive_nodes(simulation)) == 3
        assert simulation.nodes.active_count == 7

    def test_partial_failure_with_number(self):
        """A value of at least 1 is treated as a node count."""
        simulation = self._simulation_with_nodes()

        simulation.partial_failure(3)

        assert len(self._inactive_nodes(simulation)) == 3
        assert simulation.nodes.active_count == 7
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement