Replay.hpp
9.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/****************************************************************************
*
* Copyright (c) 2016-2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
#pragma once
#include <fstream>
#include <map>
#include <vector>
#include <set>
#include <string>
#include "definitions.hpp"
#include <px4_platform_common/module.h>
#include <uORB/topics/uORBTopics.hpp>
#include <uORB/topics/ekf2_timestamps.h>
namespace px4
{
/**
 * @class Replay
 * Parses a ULog file and replays it in 'real-time'. The timestamp of each replayed message is offset
 * to match the starting time of replay. It keeps a stream for each subscription to find the next message
 * to replay. This is necessary because data messages from different subscriptions don't need to be in
 * monotonic increasing order.
 */
class Replay : public ModuleBase<Replay>
{
public:
	Replay() = default;

	virtual ~Replay();

	/** @see ModuleBase */
	static int task_spawn(int argc, char *argv[]);

	/** @see ModuleBase */
	static Replay *instantiate(int argc, char *argv[]);

	/** @see ModuleBase */
	static int custom_command(int argc, char *argv[]);

	/** @see ModuleBase */
	static int print_usage(const char *reason = nullptr);

	/** @see ModuleBase::run() */
	void run() override;

	/**
	 * Apply the parameters from the log
	 * @param quiet do not print an error if true and no log file given via ENV
	 * @return 0 on success
	 */
	static int applyParams(bool quiet);

	/**
	 * Tell the replay module that we want to use replay mode.
	 * After that, only 'replay start' must be executed (typically the last step after startup).
	 * @param file_name file name of the used log replay file. Will be copied.
	 */
	static void setupReplayFile(const char *file_name);

	/**
	 * @return true if a replay file name is currently stored (i.e. _replay_file is not null)
	 */
	static bool isSetup() { return _replay_file != nullptr; }

protected:

	/**
	 * @class CompatBase
	 * Compatibility base class to convert topics to an updated format
	 */
	class CompatBase
	{
	public:
		virtual ~CompatBase() = default;

		/**
		 * apply compatibility to a topic
		 * @param data input topic (can be modified in place)
		 * @return new topic data
		 */
		virtual void *apply(void *data) = 0;
	};

	/**
	 * @class CompatSensorCombinedDtType
	 * Compatibility conversion that is configured with the offsets of the gyro and accelerometer
	 * integral dt fields, each in the logged format and in the internal format.
	 * NOTE(review): judging by the name this adapts the type of the sensor_combined dt fields;
	 * confirm against the implementation.
	 */
	class CompatSensorCombinedDtType : public CompatBase
	{
	public:
		CompatSensorCombinedDtType(int gyro_integral_dt_offset_log, int gyro_integral_dt_offset_intern,
					   int accelerometer_integral_dt_offset_log, int accelerometer_integral_dt_offset_intern);

		void *apply(void *data) override;
	private:
		int _gyro_integral_dt_offset_log;
		int _gyro_integral_dt_offset_intern;
		int _accelerometer_integral_dt_offset_log;
		int _accelerometer_integral_dt_offset_intern;
	};

	/**
	 * Per-subscription replay state. Every scalar member has a default initializer so that a
	 * freshly constructed Subscription never carries indeterminate values.
	 */
	struct Subscription {

		const orb_metadata *orb_meta = nullptr; ///< if nullptr, this subscription is invalid
		orb_advert_t orb_advert = nullptr;
		uint8_t multi_id = 0;
		int timestamp_offset = 0; ///< marks the field of the timestamp

		bool ignored = false; ///< if true, it will not be considered for publication in the main loop

		std::streampos next_read_pos;
		uint64_t next_timestamp = 0; ///< timestamp of the file

		CompatBase *compat = nullptr;

		// statistics
		int error_counter = 0;
		int publication_counter = 0;
	};

	/**
	 * Find the offset & field size in bytes for a given field name
	 * @param format format string, as specified by ULog
	 * @param field_name search for this field
	 * @param offset returned offset
	 * @param field_size returned field size
	 * @return true if found, false otherwise
	 */
	static bool findFieldOffset(const std::string &format, const std::string &field_name, int &offset, int &field_size);

	/**
	 * publish an orb topic
	 * @param sub
	 * @param data
	 * @return true if published, false otherwise
	 */
	bool publishTopic(Subscription &sub, void *data);

	/**
	 * called when entering the main replay loop
	 */
	virtual void onEnterMainLoop() {}

	/**
	 * called when exiting the main replay loop
	 */
	virtual void onExitMainLoop() {}

	/**
	 * called when a new subscription is added
	 */
	virtual void onSubscriptionAdded(Subscription &sub, uint16_t msg_id) {}

	/**
	 * handle delay until topic can be published.
	 * @param next_file_time timestamp of next message to publish
	 * @param timestamp_offset offset between file start time and replay start time
	 * @return timestamp that the message to publish should have
	 */
	virtual uint64_t handleTopicDelay(uint64_t next_file_time, uint64_t timestamp_offset);

	/**
	 * handle the publication of a topic update
	 * @return true if published, false otherwise
	 */
	virtual bool handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file);

	/**
	 * read a topic from the file (offset given by the subscription) into _read_buffer
	 */
	void readTopicDataToBuffer(const Subscription &sub, std::ifstream &replay_file);

	/**
	 * Find next data message for this subscription, starting with the stored file offset.
	 * Skip the first message, and if found, read the timestamp and store the new file offset.
	 * This also takes care of new subscriptions and parameter updates. When reaching EOF,
	 * the subscription is set to invalid.
	 * File seek position is arbitrary after this call.
	 * @return false on file error
	 */
	bool nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id);

	/**
	 * @return the constant offset (replay start time minus file start time) that is added to
	 *         the timestamps from the file
	 */
	virtual uint64_t getTimestampOffset()
	{
		//we update the timestamps from the file by a constant offset to match
		//the current replay time
		return _replay_start_time - _file_start_time;
	}

	std::vector<Subscription *> _subscriptions;
	std::vector<uint8_t> _read_buffer;

	float _speed_factor{1.f}; ///< from PX4_SIM_SPEED_FACTOR env variable (set to 0 to avoid usleep = unlimited rate)

private:
	std::set<std::string> _overridden_params;
	std::map<std::string, std::string> _file_formats; ///< all formats we read from the file

	// Both start times are read by getTimestampOffset(); default them to 0 so the result is
	// well-defined even before they have been assigned.
	uint64_t _file_start_time = 0;
	uint64_t _replay_start_time = 0;
	std::streampos _data_section_start; ///< first ADD_LOGGED_MSG message

	/** keep track of file position to avoid adding a subscription multiple times. */
	std::streampos _subscription_file_pos = 0;

	int64_t _read_until_file_position = 1ULL << 60; ///< read limit if log contains appended data

	float _accumulated_delay{0.f};

	bool readFileHeader(std::ifstream &file);

	/**
	 * Read definitions section: check formats, apply parameters and store
	 * the start of the data section.
	 * @return true on success
	 */
	bool readFileDefinitions(std::ifstream &file);

	///file parsing methods. They return false, when further parsing should be aborted.
	bool readFormat(std::ifstream &file, uint16_t msg_size);
	bool readAndAddSubscription(std::ifstream &file, uint16_t msg_size);
	bool readFlagBits(std::ifstream &file, uint16_t msg_size);

	/**
	 * Read the file header and definitions sections. Apply the parameters from this section
	 * and apply user-defined overridden parameters.
	 * @return true on success
	 */
	bool readDefinitionsAndApplyParams(std::ifstream &file);

	/**
	 * Read and handle additional messages starting at current file position, while position < end_position.
	 * This handles dropout and parameter update messages.
	 * We need to handle these separately, because they have no timestamp. We look at the file position instead.
	 * @return false on file error
	 */
	bool readAndHandleAdditionalMessages(std::ifstream &file, std::streampos end_position);
	bool readDropout(std::ifstream &file, uint16_t msg_size);
	bool readAndApplyParameter(std::ifstream &file, uint16_t msg_size);

	static const orb_metadata *findTopic(const std::string &name);

	/**
	 * Split a (possibly array) type name into its base type and array size,
	 * e.g. for float[3] the returned string is float.
	 * @param type_name_full full type name, optionally with an array suffix
	 * @param array_size output parameter for the array size
	 *        (NOTE(review): value reported for non-array types is not visible here; confirm in the implementation)
	 * @return the type name without the array suffix
	 */
	static std::string extractArraySize(const std::string &type_name_full, int &array_size);

	/** get the size of a type that is not an array */
	static size_t sizeOfType(const std::string &type_name);

	/** get the size of a type that can be an array */
	static size_t sizeOfFullType(const std::string &type_name_full);

	void setUserParams(const char *filename);

	static char *_replay_file;
};
} //namespace px4