run_tests.py
5.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#! /usr/bin/env python3
import serial, time
import subprocess
from subprocess import call, Popen
from argparse import ArgumentParser
import re
import unittest
import os
import sys
def do_test(port, baudrate, test_name):
databits = serial.EIGHTBITS
stopbits = serial.STOPBITS_ONE
parity = serial.PARITY_NONE
ser = serial.Serial(port, baudrate, databits, parity, stopbits, timeout=1)
success = False
# run test cmd
print('\n|======================================================================')
cmd = 'tests ' + test_name
print("| Running:", cmd)
print('|======================================================================')
timeout_start = time.time()
timeout = 10 # 10 seconds
# clear
ser.write("\n".encode("ascii"))
ser.flush()
ser.readline()
serial_cmd = '{0}\n'.format(cmd)
ser.write(serial_cmd.encode("ascii"))
ser.flush()
ser.readline()
# TODO: retry command
# while True:
# serial_cmd = '{0}\n'.format(cmd)
# ser.write(serial_cmd.encode("ascii"))
# ser.flush()
# serial_line = ser.readline().decode("ascii", errors='ignore')
# if cmd in serial_line:
# break
# else:
# print(serial_line.replace('\n', ''))
# if time.time() > timeout_start + timeout:
# print("Error, unable to write cmd")
# return False
# time.sleep(1)
# print results, wait for final result (PASSED or FAILED)
timeout = 180 # 3 minutes
timeout_start = time.time()
timeout_newline = timeout_start
while True:
serial_line = ser.readline().decode("ascii", errors='ignore')
if (len(serial_line) > 0):
print(serial_line, end='')
if test_name + " PASSED" in serial_line:
success = True
break
elif test_name + " FAILED" in serial_line:
success = False
break
if time.time() > timeout_start + timeout:
print("Error, timeout")
print(test_name + " FAILED")
success = False
break
# newline every 10 seconds if still running
if time.time() - timeout_newline > 10:
ser.write("\n".encode("ascii"))
timeout_newline = time.time()
ser.close()
return success
class TestHardwareMethods(unittest.TestCase):
TEST_DEVICE = 0
TEST_BAUDRATE = 0
def test_atomic_bitset(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "atomic_bitset"))
def test_bezier(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "bezier"))
def test_bitset(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "bitset"))
def test_bson(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "bson"))
# def test_dataman(self):
# self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "dataman"))
def floattest_float(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "float"))
def test_hrt(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "hrt"))
def test_IntrusiveQueue(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "IntrusiveQueue"))
def test_IntrusiveSortedList(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "IntrusiveSortedList"))
def test_List(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "List"))
def test_mathlib(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "mathlib"))
def test_matrix(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "matrix"))
def test_microbench_atomic(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "microbench_atomic"))
def test_microbench_hrt(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "microbench_hrt"))
def test_microbench_math(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "microbench_math"))
def test_microbench_matrix(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "microbench_matrix"))
def test_microbench_uorb(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "microbench_uorb"))
# def test_mixer(self):
# self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "mixer"))
def test_param(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "param"))
def test_parameters(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "parameters"))
def test_perf(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "perf"))
# def test_rc(self):
# self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "rc"))
def test_search_min(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "search_min"))
def test_sleep(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "sleep"))
def test_time(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "time"))
def test_versioning(self):
self.assertTrue(do_test(self.TEST_DEVICE, self.TEST_BAUDRATE, "versioning"))
def main():
parser = ArgumentParser(description=__doc__)
parser.add_argument('--device', "-d", nargs='?', default=None, help='', required=True)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="Mavlink port baud rate (default=57600)", default=57600)
args = parser.parse_args()
TestHardwareMethods.TEST_DEVICE = args.device
TestHardwareMethods.TEST_BAUDRATE = args.baudrate
unittest.main(__name__, failfast=True, verbosity=0, argv=['main'])
if __name__ == "__main__":
main()