readers.py
13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides readers configured for different datasets."""
import tensorflow as tf
import utils
def resize_axis(tensor, axis, new_size, fill_value=0):
"""Truncates or pads a tensor to new_size on on a given axis.
Truncate or extend tensor such that tensor.shape[axis] == new_size. If the
size increases, the padding will be performed at the end, using fill_value.
Args:
tensor: The tensor to be resized.
axis: An integer representing the dimension to be sliced.
new_size: An integer or 0d tensor representing the new value for
tensor.shape[axis].
fill_value: Value to use to fill any new entries in the tensor. Will be cast
to the type of tensor.
Returns:
The resized tensor.
"""
tensor = tf.convert_to_tensor(tensor)
shape = tf.unstack(tf.shape(tensor))
pad_shape = shape[:]
pad_shape[axis] = tf.maximum(0, new_size - shape[axis])
shape[axis] = tf.minimum(shape[axis], new_size)
shape = tf.stack(shape)
resized = tf.concat([
tf.slice(tensor, tf.zeros_like(shape), shape),
tf.fill(tf.stack(pad_shape), tf.cast(fill_value, tensor.dtype))
], axis)
# Update shape.
new_shape = tensor.get_shape().as_list() # A copy is being made.
new_shape[axis] = new_size
resized.set_shape(new_shape)
return resized
class BaseReader(object):
"""Inherit from this class when implementing new readers."""
def prepare_reader(self, unused_filename_queue):
"""Create a thread for generating prediction and label tensors."""
raise NotImplementedError()
class YT8MAggregatedFeatureReader(BaseReader):
"""Reads TFRecords of pre-aggregated Examples.
The TFRecords must contain Examples with a sparse int64 'labels' feature and
a fixed length float32 feature, obtained from the features in 'feature_name'.
The float features are assumed to be an average of dequantized values.
"""
def __init__( # pylint: disable=dangerous-default-value
self,
num_classes=3862,
feature_sizes=[1024, 128],
feature_names=["mean_rgb", "mean_audio"]):
"""Construct a YT8MAggregatedFeatureReader.
Args:
num_classes: a positive integer for the number of classes.
feature_sizes: positive integer(s) for the feature dimensions as a list.
feature_names: the feature name(s) in the tensorflow record as a list.
"""
assert len(feature_names) == len(feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(feature_names), len(feature_sizes)))
self.num_classes = num_classes
self.feature_sizes = feature_sizes
self.feature_names = feature_names
def prepare_reader(self, filename_queue, batch_size=1024):
"""Creates a single reader thread for pre-aggregated YouTube 8M Examples.
Args:
filename_queue: A tensorflow queue of filename locations.
batch_size: batch size used for feature output.
Returns:
A dict of video indexes, features, labels, and frame counts.
"""
reader = tf.TFRecordReader()
_, serialized_examples = reader.read_up_to(filename_queue, batch_size)
tf.add_to_collection("serialized_examples", serialized_examples)
return self.prepare_serialized_examples(serialized_examples)
def prepare_serialized_examples(self, serialized_examples):
"""Parse a single video-level TF Example."""
# set the mapping from the fields to data types in the proto
num_features = len(self.feature_names)
assert num_features > 0, "self.feature_names is empty!"
assert len(self.feature_names) == len(self.feature_sizes), \
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(self.feature_names), len(self.feature_sizes))
feature_map = {
"id": tf.io.FixedLenFeature([], tf.string),
"labels": tf.io.VarLenFeature(tf.int64)
}
for feature_index in range(num_features):
feature_map[self.feature_names[feature_index]] = tf.FixedLenFeature(
[self.feature_sizes[feature_index]], tf.float32)
features = tf.parse_example(serialized_examples, features=feature_map)
labels = tf.sparse_to_indicator(features["labels"], self.num_classes)
labels.set_shape([None, self.num_classes])
concatenated_features = tf.concat(
[features[feature_name] for feature_name in self.feature_names], 1)
output_dict = {
"video_ids": features["id"],
"video_matrix": concatenated_features,
"labels": labels,
"num_frames": tf.ones([tf.shape(serialized_examples)[0]])
}
return output_dict
class YT8MFrameFeatureReader(BaseReader):
"""Reads TFRecords of SequenceExamples.
The TFRecords must contain SequenceExamples with the sparse in64 'labels'
context feature and a fixed length byte-quantized feature vector, obtained
from the features in 'feature_names'. The quantized features will be mapped
back into a range between min_quantized_value and max_quantized_value.
"""
def __init__( # pylint: disable=dangerous-default-value
self,
num_classes=3862,
feature_sizes=[1024, 128],
feature_names=["rgb", "audio"],
max_frames=300,
segment_labels=False,
segment_size=5):
"""Construct a YT8MFrameFeatureReader.
Args:
num_classes: a positive integer for the number of classes.
feature_sizes: positive integer(s) for the feature dimensions as a list.
feature_names: the feature name(s) in the tensorflow record as a list.
max_frames: the maximum number of frames to process.
segment_labels: if we read segment labels instead.
segment_size: the segment_size used for reading segments.
"""
assert len(feature_names) == len(feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(feature_names), len(feature_sizes)))
self.num_classes = num_classes
self.feature_sizes = feature_sizes
self.feature_names = feature_names
self.max_frames = max_frames
self.segment_labels = segment_labels
self.segment_size = segment_size
def get_video_matrix(self, features, feature_size, max_frames,
max_quantized_value, min_quantized_value):
"""Decodes features from an input string and quantizes it.
Args:
features: raw feature values
feature_size: length of each frame feature vector
max_frames: number of frames (rows) in the output feature_matrix
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
feature_matrix: matrix of all frame-features
num_frames: number of frames in the sequence
"""
decoded_features = tf.reshape(
tf.cast(tf.decode_raw(features, tf.uint8), tf.float32),
[-1, feature_size])
num_frames = tf.minimum(tf.shape(decoded_features)[0], max_frames)
feature_matrix = utils.Dequantize(decoded_features, max_quantized_value,
min_quantized_value)
feature_matrix = resize_axis(feature_matrix, 0, max_frames)
return feature_matrix, num_frames
def prepare_reader(self,
filename_queue,
max_quantized_value=2,
min_quantized_value=-2):
"""Creates a single reader thread for YouTube8M SequenceExamples.
Args:
filename_queue: A tensorflow queue of filename locations.
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
A dict of video indexes, video features, labels, and frame counts.
"""
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
return self.prepare_serialized_examples(serialized_example,
max_quantized_value,
min_quantized_value)
def prepare_serialized_examples(self,
serialized_example,
max_quantized_value=2,
min_quantized_value=-2):
"""Parse single serialized SequenceExample from the TFRecords."""
# Read/parse frame/segment-level labels.
context_features = {
"id": tf.io.FixedLenFeature([], tf.string),
}
if self.segment_labels:
context_features.update({
# There is no need to read end-time given we always assume the segment
# has the same size.
"segment_labels": tf.io.VarLenFeature(tf.int64),
"segment_start_times": tf.io.VarLenFeature(tf.int64),
"segment_scores": tf.io.VarLenFeature(tf.float32)
})
else:
context_features.update({"labels": tf.io.VarLenFeature(tf.int64)})
sequence_features = {
feature_name: tf.io.FixedLenSequenceFeature([], dtype=tf.string)
for feature_name in self.feature_names
}
contexts, features = tf.io.parse_single_sequence_example(
serialized_example,
context_features=context_features,
sequence_features=sequence_features)
# loads (potentially) different types of features and concatenates them
num_features = len(self.feature_names)
assert num_features > 0, "No feature selected: feature_names is empty!"
assert len(self.feature_names) == len(self.feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(self.feature_names), len(self.feature_sizes)))
num_frames = -1 # the number of frames in the video
feature_matrices = [None] * num_features # an array of different features
for feature_index in range(num_features):
feature_matrix, num_frames_in_this_feature = self.get_video_matrix(
features[self.feature_names[feature_index]],
self.feature_sizes[feature_index], self.max_frames,
max_quantized_value, min_quantized_value)
if num_frames == -1:
num_frames = num_frames_in_this_feature
feature_matrices[feature_index] = feature_matrix
# cap the number of frames at self.max_frames
num_frames = tf.minimum(num_frames, self.max_frames)
# concatenate different features
video_matrix = tf.concat(feature_matrices, 1)
# Partition frame-level feature matrix to segment-level feature matrix.
if self.segment_labels:
start_times = contexts["segment_start_times"].values
# Here we assume all the segments that started at the same start time has
# the same segment_size.
uniq_start_times, seg_idxs = tf.unique(start_times,
out_idx=tf.dtypes.int64)
# TODO(zhengxu): Ensure the segment_sizes are all same.
segment_size = self.segment_size
# Range gather matrix, e.g., [[0,1,2],[1,2,3]] for segment_size == 3.
range_mtx = tf.expand_dims(uniq_start_times, axis=-1) + tf.expand_dims(
tf.range(0, segment_size, dtype=tf.int64), axis=0)
# Shape: [num_segment, segment_size, feature_dim].
batch_video_matrix = tf.gather_nd(video_matrix,
tf.expand_dims(range_mtx, axis=-1))
num_segment = tf.shape(batch_video_matrix)[0]
batch_video_ids = tf.reshape(tf.tile([contexts["id"]], [num_segment]),
(num_segment,))
batch_frames = tf.reshape(tf.tile([segment_size], [num_segment]),
(num_segment,))
# For segment labels, all labels are not exhausively rated. So we only
# evaluate the rated labels.
# Label indices for each segment, shape: [num_segment, 2].
label_indices = tf.stack([seg_idxs, contexts["segment_labels"].values],
axis=-1)
label_values = contexts["segment_scores"].values
sparse_labels = tf.sparse.SparseTensor(label_indices, label_values,
(num_segment, self.num_classes))
batch_labels = tf.sparse.to_dense(sparse_labels, validate_indices=False)
sparse_label_weights = tf.sparse.SparseTensor(
label_indices, tf.ones_like(label_values, dtype=tf.float32),
(num_segment, self.num_classes))
batch_label_weights = tf.sparse.to_dense(sparse_label_weights,
validate_indices=False)
else:
# Process video-level labels.
label_indices = contexts["labels"].values
sparse_labels = tf.sparse.SparseTensor(
tf.expand_dims(label_indices, axis=-1),
tf.ones_like(contexts["labels"].values, dtype=tf.bool),
(self.num_classes,))
labels = tf.sparse.to_dense(sparse_labels,
default_value=False,
validate_indices=False)
# convert to batch format.
batch_video_ids = tf.expand_dims(contexts["id"], 0)
batch_video_matrix = tf.expand_dims(video_matrix, 0)
batch_labels = tf.expand_dims(labels, 0)
batch_frames = tf.expand_dims(num_frames, 0)
batch_label_weights = None
output_dict = {
"video_ids": batch_video_ids,
"video_matrix": batch_video_matrix,
"labels": batch_labels,
"num_frames": batch_frames,
}
if batch_label_weights is not None:
output_dict["label_weights"] = batch_label_weights
return output_dict