process_logdata_ekf.py
7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#! /usr/bin/env python3
from __future__ import print_function
import argparse
import os
import sys
import csv
from typing import Dict
from pyulog import ULog
from analyse_logdata_ekf import analyse_ekf
from plotting.pdf_report import create_pdf_report
from analysis.detectors import PreconditionError
"""
Performs a health assessment on the ecl EKF navigation estimator data contained in a an ULog file
Outputs a health assessment summary in a csv file named <inputfilename>.mdat.csv
Outputs summary plots in a pdf file named <inputfilename>.pdf
"""
def get_arguments():
parser = argparse.ArgumentParser(description='Analyse the estimator_status and ekf2_innovation message data')
parser.add_argument('filename', metavar='file.ulg', help='ULog input file')
parser.add_argument('--no-plots', action='store_true',
help='Whether to only analyse and not plot the summaries for developers.')
parser.add_argument('--check-level-thresholds', type=str, default=None,
help='The csv file of fail and warning test thresholds for analysis.')
parser.add_argument('--check-table', type=str, default=None,
help='The csv file with descriptions of the checks.')
parser.add_argument('--no-sensor-safety-margin', action='store_true',
help='Whether to not cut-off 5s after take-off and 5s before landing '
'(for certain sensors that might be influence by proximity to ground).')
return parser.parse_args()
def create_results_table(
check_table_filename: str, master_status: str, check_status: Dict[str, str],
metrics: Dict[str, float], airtime_info: Dict[str, float]) -> Dict[str, list]:
"""
creates the output results table
:param check_table_filename:
:param master_status:
:param check_status:
:param metrics:
:param airtime_info:
:return:
"""
try:
with open(check_table_filename, 'r') as file:
reader = csv.DictReader(file)
test_results_table = {
row['check_id']: [float('NaN'), row['check_description']] for row in reader}
print('Using test description loaded from {:s}'.format(check_table_filename))
except:
raise PreconditionError('could not find {:s}'.format(check_table_filename))
# store metrics
for key, value in metrics.items():
test_results_table[key][0] = value
# store check results
for key, value in check_status.items():
test_results_table[key][0] = value
# store check results
for key, value in test_results_table.items():
if key.endswith('_status'):
test_results_table[key][0] = str(value[0])
# store master status
test_results_table['master_status'][0] = master_status
# store take_off and landing information
test_results_table['in_air_transition_time'][0] = airtime_info['in_air_transition_time']
test_results_table['on_ground_transition_time'][0] = airtime_info['on_ground_transition_time']
return test_results_table
def process_logdata_ekf(
filename: str, check_level_dict_filename: str, check_table_filename: str,
plot: bool = True, sensor_safety_margins: bool = True):
## load the log and extract the necessary data for the analyses
try:
ulog = ULog(filename)
except:
raise PreconditionError('could not open {:s}'.format(filename))
ekf_instances = 1
try:
estimator_selector_status = ulog.get_dataset('estimator_selector_status',).data
print('found estimator_selector_status (multi-ekf) data')
for instances_available in estimator_selector_status['instances_available']:
if instances_available > ekf_instances:
ekf_instances = instances_available
print(ekf_instances, 'ekf instances')
except:
print('could not find estimator_selector_status data')
try:
# get the dictionary of fail and warning test thresholds from a csv file
with open(check_level_dict_filename, 'r') as file:
reader = csv.DictReader(file)
check_levels = {row['check_id']: float(row['threshold']) for row in reader}
print('Using test criteria loaded from {:s}'.format(check_level_dict_filename))
except:
raise PreconditionError('could not find {:s}'.format(check_level_dict_filename))
in_air_margin = 5.0 if sensor_safety_margins else 0.0
for multi_instance in range(ekf_instances):
print('\nestimator instance:', multi_instance)
# perform the ekf analysis
master_status, check_status, metrics, airtime_info = analyse_ekf(
ulog, check_levels, multi_instance, red_thresh=1.0, amb_thresh=0.5, min_flight_duration_seconds=5.0,
in_air_margin_seconds=in_air_margin)
test_results = create_results_table(
check_table_filename, master_status, check_status, metrics, airtime_info)
# write metadata to a .csv file
with open('{:s}-{:d}.mdat.csv'.format(filename, multi_instance), "w") as file:
file.write("name,value,description\n")
# loop through the test results dictionary and write each entry on a separate row, with data comma separated
# save data in alphabetical order
key_list = list(test_results.keys())
key_list.sort()
for key in key_list:
file.write(key + "," + str(test_results[key][0]) + "," + test_results[key][1] + "\n")
print('Test results written to {:s}-{:d}.mdat.csv'.format(filename, multi_instance))
if plot:
create_pdf_report(ulog, multi_instance, '{:s}-{:d}.pdf'.format(filename, multi_instance))
print('Plots saved to {:s}-{:d}.pdf'.format(filename, multi_instance))
return test_results
def main() -> None:
args = get_arguments()
if args.check_level_thresholds is not None:
check_level_dict_filename = args.check_level_thresholds
else:
file_dir = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
check_level_dict_filename = os.path.join(file_dir, "check_level_dict.csv")
if args.check_table is not None:
check_table_filename = args.check_table
else:
file_dir = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
check_table_filename = os.path.join(file_dir, "check_table.csv")
try:
test_results = process_logdata_ekf(
args.filename, check_level_dict_filename, check_table_filename,
plot=not args.no_plots, sensor_safety_margins=not args.no_sensor_safety_margin)
except Exception as e:
print(str(e))
sys.exit(-1)
# print master test status to console
if (test_results['master_status'][0] == 'Pass'):
print('No anomalies detected')
elif (test_results['master_status'][0] == 'Warning'):
print('Minor anomalies detected')
elif (test_results['master_status'][0] == 'Fail'):
print('Major anomalies detected')
sys.exit(-1)
if __name__ == '__main__':
main()