detectors.py
6.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#! /usr/bin/env python3
"""
detectors
"""
from typing import Optional
import numpy as np
from pyulog import ULog
class PreconditionError(Exception):
"""
a class for a Precondition Error
"""
class Airtime(object):
"""
Airtime struct.
"""
def __init__(self, take_off: float, landing: float):
self.take_off = take_off
self.landing = landing
class InAirDetector(object):
"""
handles airtime detection.
"""
def __init__(self, ulog: ULog, min_flight_time_seconds: float = 0.0,
in_air_margin_seconds: float = 0.0) -> None:
"""
initializes an InAirDetector instance.
:param ulog:
:param min_flight_time_seconds: set this value to return only airtimes that are at least
min_flight_time_seconds long
:param in_air_margin_seconds: removes a margin of in_air_margin_seconds from the airtime
to avoid ground effects.
"""
self._ulog = ulog
self._min_flight_time_seconds = min_flight_time_seconds
self._in_air_margin_seconds = in_air_margin_seconds
try:
self._vehicle_land_detected = ulog.get_dataset('vehicle_land_detected').data
self._landed = self._vehicle_land_detected['landed']
except:
self._in_air = []
raise PreconditionError(
'InAirDetector: Could not find vehicle land detected message and/or landed field'
' and thus not find any airtime.')
self._log_start = self._ulog.start_timestamp / 1.0e6
self._in_air = self._detect_airtime()
def _detect_airtime(self) -> Optional[Airtime]:
"""
detects the airtime take_off and landing of a ulog.
:return: a named tuple of ('Airtime', ['take_off', 'landing']) or None.
"""
# test whether flight was in air at all
if (self._landed > 0).all():
print('InAirDetector: always on ground.')
return []
# find the indices of all take offs and landings
take_offs = np.where(np.diff(self._landed) < 0)[0].tolist()
landings = np.where(np.diff(self._landed) > 0)[0].tolist()
# check for start in air.
if len(take_offs) == 0 or ((len(landings) > 0) and (landings[0] < take_offs[0])):
print('Started in air. Take first timestamp value as start point.')
take_offs = [-1] + take_offs
# correct for offset: add 1 to take_off list
take_offs = [take_off + 1 for take_off in take_offs]
if len(landings) < len(take_offs):
print('No final landing detected. Assume last timestamp is landing.')
landings += [len(self._landed) - 2]
# correct for offset: add 1 to landing list
landings = [landing + 1 for landing in landings]
assert len(landings) == len(take_offs), 'InAirDetector: different number of take offs' \
' and landings.'
in_air = []
for take_off, landing in zip(take_offs, landings):
if (self._vehicle_land_detected['timestamp'][landing] / 1e6 -
self._in_air_margin_seconds) - \
(self._vehicle_land_detected['timestamp'][take_off] / 1e6 +
self._in_air_margin_seconds) >= self._min_flight_time_seconds:
in_air.append(Airtime(
take_off=(self._vehicle_land_detected['timestamp'][take_off] -
self._ulog.start_timestamp) / 1.0e6 + self._in_air_margin_seconds,
landing=(self._vehicle_land_detected['timestamp'][landing] -
self._ulog.start_timestamp) / 1.0e6 - self._in_air_margin_seconds))
if len(in_air) == 0:
print('InAirDetector: no airtime detected.')
return in_air
@property
def airtimes(self):
"""
airtimes
:return:
"""
return self._in_air
@property
def take_off(self) -> Optional[float]:
"""
first take off
:return:
"""
return self._in_air[0].take_off if self._in_air else None
@property
def landing(self) -> Optional[float]:
"""
last landing
:return: the last landing of the flight.
"""
return self._in_air[-1].landing if self._in_air else None
@property
def log_start(self) -> Optional[float]:
"""
log start
:return: the start time of the log.
"""
return self._log_start
def get_take_off_to_last_landing(self, dataset) -> list:
"""
return all indices of the log file between the first take_off and the
last landing.
:param dataset:
:return:
"""
try:
data = self._ulog.get_dataset(dataset).data
except:
print('InAirDetector: {:s} not found in log.'.format(dataset))
return []
if self._in_air:
airtime = np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >=
self._in_air[0].take_off) & (
(data['timestamp'] - self._ulog.start_timestamp) /
1.0e6 < self._in_air[-1].landing))[0]
else:
airtime = []
return airtime
def get_airtime(self, dataset) -> list:
"""
return all indices of the log file that are in air
:param dataset:
:return:
"""
try:
data = self._ulog.get_dataset(dataset).data
except:
raise PreconditionError('InAirDetector: {:s} not found in log.'.format(dataset))
airtime = []
if self._in_air is not None:
for i in range(len(self._in_air)):
airtime.extend(np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >=
self._in_air[i].take_off) & (
(data['timestamp'] - self._ulog.start_timestamp) /
1.0e6 < self._in_air[i].landing))[0])
return airtime