detectors.py 6.08 KB
#! /usr/bin/env python3
"""
detectors
"""
from typing import Optional
import numpy as np
from pyulog import ULog


class PreconditionError(Exception):
    """
    a class for a Precondition Error
    """


class Airtime(object):
    """
    Airtime struct.
    """
    def __init__(self, take_off: float, landing: float):
        self.take_off = take_off
        self.landing = landing


class InAirDetector(object):
    """
    handles airtime detection.
    """

    def __init__(self, ulog: ULog, min_flight_time_seconds: float = 0.0,
                 in_air_margin_seconds: float = 0.0) -> None:
        """
        initializes an InAirDetector instance.
        :param ulog:
        :param min_flight_time_seconds: set this value to return only airtimes that are at least
        min_flight_time_seconds long
        :param in_air_margin_seconds: removes a margin of in_air_margin_seconds from the airtime
        to avoid ground effects.
        """

        self._ulog = ulog
        self._min_flight_time_seconds = min_flight_time_seconds
        self._in_air_margin_seconds = in_air_margin_seconds

        try:
            self._vehicle_land_detected = ulog.get_dataset('vehicle_land_detected').data
            self._landed = self._vehicle_land_detected['landed']
        except:
            self._in_air = []
            raise PreconditionError(
                'InAirDetector: Could not find vehicle land detected message and/or landed field'
                ' and thus not find any airtime.')

        self._log_start = self._ulog.start_timestamp / 1.0e6

        self._in_air = self._detect_airtime()


    def _detect_airtime(self) -> Optional[Airtime]:
        """
        detects the airtime take_off and landing of a ulog.
        :return: a named tuple of ('Airtime', ['take_off', 'landing']) or None.
        """

        # test whether flight was in air at all
        if (self._landed > 0).all():
            print('InAirDetector: always on ground.')
            return []

        # find the indices of all take offs and landings
        take_offs = np.where(np.diff(self._landed) < 0)[0].tolist()
        landings = np.where(np.diff(self._landed) > 0)[0].tolist()

        # check for start in air.
        if len(take_offs) == 0 or ((len(landings) > 0) and (landings[0] < take_offs[0])):

            print('Started in air. Take first timestamp value as start point.')
            take_offs = [-1] + take_offs

        # correct for offset: add 1 to take_off list
        take_offs = [take_off + 1 for take_off in take_offs]
        if len(landings) < len(take_offs):
            print('No final landing detected. Assume last timestamp is landing.')
            landings += [len(self._landed) - 2]
        # correct for offset: add 1 to landing list
        landings = [landing + 1 for landing in landings]

        assert len(landings) == len(take_offs), 'InAirDetector: different number of take offs' \
                                                ' and landings.'

        in_air = []
        for take_off, landing in zip(take_offs, landings):
            if (self._vehicle_land_detected['timestamp'][landing] / 1e6 -
                    self._in_air_margin_seconds) - \
                    (self._vehicle_land_detected['timestamp'][take_off] / 1e6 +
                     self._in_air_margin_seconds) >= self._min_flight_time_seconds:
                in_air.append(Airtime(
                    take_off=(self._vehicle_land_detected['timestamp'][take_off] -
                              self._ulog.start_timestamp) / 1.0e6 + self._in_air_margin_seconds,
                    landing=(self._vehicle_land_detected['timestamp'][landing] -
                             self._ulog.start_timestamp) / 1.0e6 - self._in_air_margin_seconds))
        if len(in_air) == 0:
            print('InAirDetector: no airtime detected.')

        return in_air

    @property
    def airtimes(self):
        """
        airtimes
        :return:
        """
        return self._in_air

    @property
    def take_off(self) -> Optional[float]:
        """
        first take off
        :return:
        """
        return self._in_air[0].take_off if self._in_air else None

    @property
    def landing(self) -> Optional[float]:
        """
        last landing
        :return: the last landing of the flight.
        """
        return self._in_air[-1].landing if self._in_air else None

    @property
    def log_start(self) -> Optional[float]:
        """
        log start
        :return: the start time of the log.
        """
        return self._log_start

    def get_take_off_to_last_landing(self, dataset) -> list:
        """
        return all indices of the log file between the first take_off and the
        last landing.
        :param dataset:
        :return:
        """
        try:
            data = self._ulog.get_dataset(dataset).data
        except:
            print('InAirDetector: {:s} not found in log.'.format(dataset))
            return []

        if self._in_air:
            airtime = np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >=
                                self._in_air[0].take_off) & (
                                    (data['timestamp'] - self._ulog.start_timestamp) /
                                    1.0e6 < self._in_air[-1].landing))[0]

        else:
            airtime = []

        return airtime

    def get_airtime(self, dataset) -> list:
        """
        return all indices of the log file that are in air
        :param dataset:
        :return:
        """
        try:
            data = self._ulog.get_dataset(dataset).data
        except:
            raise PreconditionError('InAirDetector: {:s} not found in log.'.format(dataset))

        airtime = []
        if self._in_air is not None:
            for i in range(len(self._in_air)):
                airtime.extend(np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >=
                                         self._in_air[i].take_off) & (
                                             (data['timestamp'] - self._ulog.start_timestamp) /
                                             1.0e6 < self._in_air[i].landing))[0])

        return airtime