Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: UTF-8 -*-
- import olympe
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import time
- import cv2
- import asyncio
- import logging
- from os import system
- from colorama import Fore, Style
- from olympe.messages.ardrone3.Piloting import TakeOff, Landing, Emergency, moveBy
- from olympe.messages.animation import start_flip
- from olympe.messages.ardrone3.PilotingSettingsState import *
- from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
- from olympe.messages.drone_manager import discover_drones
- from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
- from olympe.enums.ardrone3.PilotingState import FlyingStateChanged_State
- from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged
- from olympe.messages.ardrone3.PilotingSettings import MaxTilt
- from olympe.messages.wifi import *
- from olympe.messages.battery import health
- from olympe.messages.ardrone3.PilotingState import SpeedChanged, AltitudeChanged, GpsLocationChanged
- from olympe.messages import gimbal
- import olympe_deps as od
- import math
- import json
- class Drone():
- def __init__(self):
- self.drone = olympe.Drone(
- "10.202.0.1", loglevel=2, drone_type=od.ARSDK_DEVICE_TYPE_ANAFI4K)
- self.file = "data.txt"
- self.lock = threading.Lock()
- def main(self):
- try:
- self.drone.connection()
- # self.drone.set_streaming_callbacks(raw_cb=self.yuv_frame_cb,)
- self.drone.start_video_streaming()
- if not self.lock.acquire(False):
- time.sleep(1)
- else:
- print("TAkEOFFF : ACQUIRE")
- try:
- self.drone(
- FlyingStateChanged(state="hovering", _policy="check")
- | (
- GPSFixStateChanged(fixed=1, _timeout=10)
- >> (
- TakeOff(_no_expect=True)
- & FlyingStateChanged(state="hovering", _policy="wait", _timeout=10)
- )
- )
- ).wait()
- finally:
- self.lock.release()
- print("TAkEOFFF : RELEASE")
- self.drone(MaxTilt(40)).wait().success()
- self.drone(gimbal.set_target(
- gimbal_id=0,
- control_mode="position",
- yaw_frame_of_reference="none", # None instead of absolute
- yaw=0.0,
- pitch_frame_of_reference="absolute",
- pitch=-45.0,
- roll_frame_of_reference="none", # None instead of absolute
- roll=0.0,
- )).wait()
- for i in range(3):
- if not self.lock.acquire(False):
- time.sleep(1)
- print("MOVING : NOPE")
- else:
- print("MOVING : ACQUIRE")
- try:
- print("Moving by ({}/3)...".format(i + 1))
- self.drone(moveBy(4, 0, 0, math.pi, _timeout=20)
- ).wait().success()
- finally:
- self.lock.release()
- print("MOVING : RELEASE")
- time.sleep(2)
- except Exception as e:
- print(e)
- self.drone.landing()
- return 1
- def landing(self):
- print("Landing...")
- self.drone(
- Landing()
- >> FlyingStateChanged(state="landed", _timeout=5)
- ).wait()
- print("Landed\n")
- self.drone.disconnection()
- def yuv_frame_cb(self, yuv_frame):
- info = yuv_frame.info()
- height, width = info["yuv"]["height"], info["yuv"]["width"]
- # convert pdraw YUV flag to OpenCV YUV flag
- cv2_cvt_color_flag = {
- olympe.PDRAW_YUV_FORMAT_I420: cv2.COLOR_YUV2BGR_I420,
- olympe.PDRAW_YUV_FORMAT_NV12: cv2.COLOR_YUV2BGR_NV12,
- }[info["yuv"]["format"]]
- cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
- cv2.imshow("live stream", cv2frame)
- cv2.waitKey(1)
- def states(self):
- final_dict = {}
- inc = 0
- dataSpeed = ["speedX", "speedY", "speedZ"]
- dataAttidude = ["roll", "pitch", "yaw"]
- dataGps = ["longitude", "latitude", "altidude"]
- dict_test = {SpeedChanged: dataSpeed,
- AltitudeChanged: dataAttidude, GpsLocationChanged: dataGps, health: ["state_of_health"]}
- if not self.lock.acquire(False):
- time.sleep(1)
- print("STATES: NOPE")
- else:
- try:
- print("STATES : ACQUIRE")
- while True:
- try:
- if self.drone.get_state(SpeedChanged) and self.drone.get_state(AltitudeChanged) and self.drone.get_state(GpsLocationChanged):
- print("initalized!")
- break
- except Exception as e:
- time.sleep(1)
- continue
- while self.drone.get_state(FlyingStateChanged)["state"] is not FlyingStateChanged_State.landing:
- inc += 1
- final_dict["ind"] = inc
- for k in dict_test:
- dictTotransform = self.drone.get_state(k)
- if dictTotransform is not None:
- for y, v in dictTotransform.items():
- final_dict[y] = v
- time.sleep(1)
- # print(final_dict)
- with open(self.file) as f:
- file_str = f.read()
- file_str = json.dumps(final_dict)
- with open(self.file, 'w', encoding='utf-8') as f:
- f.write(file_str)
- print("STATES: WRITED")
- finally:
- print("STATES : RELEASE")
- self.lock.release()
- if __name__ == "__main__":
- drone = Drone()
- with ThreadPoolExecutor(max_workers=2) as executor:
- task1 = executor.submit(drone.main())
- task2 = executor.submit(drone.states())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement