Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import utils
- sched = utils.Scheduler()
- import wws_lite
- from IPython.display import Image
- import pprint
- pp = pprint.PrettyPrinter(indent=4)
- import math
- from time import sleep
- import matplotlib.pyplot as plt
- import urllib.request
- import numpy as np
- import orspy
- import pickle
- class State:
- def __init__(self, sched, simulated=False):
- self.sched = sched
- # self.sched.add_robot('codebox1', utils.bottom_I_close2wall)
- self.waypoints = [
- {"coord": orspy.EulerPose(64, 96, 0, 0, 0, 0)},
- {"coord": orspy.EulerPose(69, 95, 0, 0, 0, 0)},
- {"coord": orspy.EulerPose(68.5, 104, 0, 0, 0, 0)},
- {"coord": orspy.EulerPose(65, 108, 0, 0, 0, 0)},
- {"coord": orspy.EulerPose(71, 113, 0, 0, 0, 0)},
- ] # list of positions
- self.simulated = simulated
- sched.use_simulated(simulated)
- self.poses = []
- self.images = []
- self.get_image = False
- def navigate_to(self, location):
- return self.sched.goto(location,
- allowed_trans_error=0.5, allowed_rot_error=6.)
- def rotate(self):
- steps = 4
- for _ in range(steps * 2):
- sched.rotate(speed=math.pi/8, maxrot=math.pi/steps)# - math.pi / (steps * 16))
- def start(self):
- self.topic_img = "{}/object_img".format(self.sched.wws_topic_prefix())
- self.topic_pose = "{}/object_pose".format(self.sched.wws_topic_prefix())
- wws_lite.subscribe(self.topic_img, lambda x: self.wws_detect_img_cb(x))
- wws_lite.subscribe(self.topic_pose, lambda x: self.detect_cb_pose(x))
- def clear_action(self):
- self.sched.stop_action(self.action)
- def end(self):
- wws_lite.unsubscribe(self.topic_img)
- wws_lite.unsubscribe(self.topic_pose)
- def detect_cb_pose(self, m):
- if self.simulated:
- if len(m['object']['payload']['objects']['detections']) > 0:
- # print(m['object']['payload']['objects']['detections'])
- self.class_name = m['object']['payload']['objects']['detections'][0]['class_name']
- self.poses.append(m)
- self.get_image = True
- else:
- if len(m['object']['payload']['detections']) > 0:
- self.class_name = m['object']['payload']['detections'][0]['class']['name']
- self.poses.append(m)
- self.get_image = True
- # print(f"object type- {m['object']['payload']['objects']['detections'][0]['class_name']}")
- def wws_detect_img_cb(self, m):
- detect = m['image']['url']
- f = urllib.request.urlopen(detect)
- a = plt.imread(f, 0)
- # print(len(self.images), len(self.poses))
- if self.get_image:
- # print('adding image')
- self.images.append(a)
- self.get_image = False
- def localize_all_objects(self):
- pass
- def control_loop(self):
- self.start()
- count = 0
- for _ in range(1):
- for way in self.waypoints:
- loc = way["coord"]
- self.navigate_to(loc)
- pickle.dump((self.poses, self.images), open(f'/home/ros/spinachbots/jupyter/data_{count}.pickle', 'wb'))
- count += 1
- self.end()
- sched = utils.Scheduler()
- # sched.add_robot('codebox1', utils.bottom_I_close2wall)
- state = State(sched, False)
- # sched.goto(orspy.EulerPose(68.73, 105.66, 0, 0, 0, 0),
- # allowed_trans_error=0.3, allowed_rot_error=0.5)
- state.control_loop()
- print(state.poses)
- print(state.images)
- # for i in range(100):
- # state.sched.stop_action(i)
- # state.delete_state()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement