이현규

Convert submodule to files

youtube-8m @ e6f6bf68
Subproject commit e6f6bf682d20bb21904ea9c081c15e070809d914
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Calculate or keep track of the interpolated average precision.
It provides an interface for calculating interpolated average precision for an
entire list or the top-n ranked items. For the definition of the
(non-)interpolated average precision:
http://trec.nist.gov/pubs/trec15/appendices/CE.MEASURES06.pdf
Example usages:
1) Use it as a static function call to directly calculate average precision for
a short ranked list in the memory.
```
import random
p = np.array([random.random() for _ in xrange(10)])
a = np.array([random.choice([0, 1]) for _ in xrange(10)])
ap = average_precision_calculator.AveragePrecisionCalculator.ap(p, a)
```
2) Use it as an object for long ranked list that cannot be stored in memory or
the case where partial predictions can be observed at a time (Tensorflow
predictions). In this case, we first call the function accumulate many times
to process parts of the ranked list. After processing all the parts, we call
peek_interpolated_ap_at_n.
```
p1 = np.array([random.random() for _ in xrange(5)])
a1 = np.array([random.choice([0, 1]) for _ in xrange(5)])
p2 = np.array([random.random() for _ in xrange(5)])
a2 = np.array([random.choice([0, 1]) for _ in xrange(5)])
# interpolated average precision at 10 using 1000 break points
calculator = average_precision_calculator.AveragePrecisionCalculator(10)
calculator.accumulate(p1, a1)
calculator.accumulate(p2, a2)
ap3 = calculator.peek_ap_at_n()
```
"""
import heapq
import random
import numbers
import numpy
class AveragePrecisionCalculator(object):
"""Calculate the average precision and average precision at n."""
def __init__(self, top_n=None):
"""Construct an AveragePrecisionCalculator to calculate average precision.
This class is used to calculate the average precision for a single label.
Args:
top_n: A positive Integer specifying the average precision at n, or None
to use all provided data points.
Raises:
ValueError: An error occurred when the top_n is not a positive integer.
"""
if not ((isinstance(top_n, int) and top_n >= 0) or top_n is None):
raise ValueError("top_n must be a positive integer or None.")
self._top_n = top_n # average precision at n
self._total_positives = 0 # total number of positives have seen
self._heap = [] # max heap of (prediction, actual)
@property
def heap_size(self):
"""Gets the heap size maintained in the class."""
return len(self._heap)
@property
def num_accumulated_positives(self):
"""Gets the number of positive samples that have been accumulated."""
return self._total_positives
def accumulate(self, predictions, actuals, num_positives=None):
"""Accumulate the predictions and their ground truth labels.
After the function call, we may call peek_ap_at_n to actually calculate
the average precision.
Note predictions and actuals must have the same shape.
Args:
predictions: a list storing the prediction scores.
actuals: a list storing the ground truth labels. Any value larger than 0
will be treated as positives, otherwise as negatives. num_positives = If
the 'predictions' and 'actuals' inputs aren't complete, then it's
possible some true positives were missed in them. In that case, you can
provide 'num_positives' in order to accurately track recall.
Raises:
ValueError: An error occurred when the format of the input is not the
numpy 1-D array or the shape of predictions and actuals does not match.
"""
if len(predictions) != len(actuals):
raise ValueError("the shape of predictions and actuals does not match.")
if num_positives is not None:
if not isinstance(num_positives, numbers.Number) or num_positives < 0:
raise ValueError(
"'num_positives' was provided but it was a negative number.")
if num_positives is not None:
self._total_positives += num_positives
else:
self._total_positives += numpy.size(
numpy.where(numpy.array(actuals) > 1e-5))
topk = self._top_n
heap = self._heap
for i in range(numpy.size(predictions)):
if topk is None or len(heap) < topk:
heapq.heappush(heap, (predictions[i], actuals[i]))
else:
if predictions[i] > heap[0][0]: # heap[0] is the smallest
heapq.heappop(heap)
heapq.heappush(heap, (predictions[i], actuals[i]))
def clear(self):
"""Clear the accumulated predictions."""
self._heap = []
self._total_positives = 0
def peek_ap_at_n(self):
"""Peek the non-interpolated average precision at n.
Returns:
The non-interpolated average precision at n (default 0).
If n is larger than the length of the ranked list,
the average precision will be returned.
"""
if self.heap_size <= 0:
return 0
predlists = numpy.array(list(zip(*self._heap)))
ap = self.ap_at_n(predlists[0],
predlists[1],
n=self._top_n,
total_num_positives=self._total_positives)
return ap
@staticmethod
def ap(predictions, actuals):
"""Calculate the non-interpolated average precision.
Args:
predictions: a numpy 1-D array storing the sparse prediction scores.
actuals: a numpy 1-D array storing the ground truth labels. Any value
larger than 0 will be treated as positives, otherwise as negatives.
Returns:
The non-interpolated average precision at n.
If n is larger than the length of the ranked list,
the average precision will be returned.
Raises:
ValueError: An error occurred when the format of the input is not the
numpy 1-D array or the shape of predictions and actuals does not match.
"""
return AveragePrecisionCalculator.ap_at_n(predictions, actuals, n=None)
@staticmethod
def ap_at_n(predictions, actuals, n=20, total_num_positives=None):
"""Calculate the non-interpolated average precision.
Args:
predictions: a numpy 1-D array storing the sparse prediction scores.
actuals: a numpy 1-D array storing the ground truth labels. Any value
larger than 0 will be treated as positives, otherwise as negatives.
n: the top n items to be considered in ap@n.
total_num_positives : (optionally) you can specify the number of total
positive in the list. If specified, it will be used in calculation.
Returns:
The non-interpolated average precision at n.
If n is larger than the length of the ranked list,
the average precision will be returned.
Raises:
ValueError: An error occurred when
1) the format of the input is not the numpy 1-D array;
2) the shape of predictions and actuals does not match;
3) the input n is not a positive integer.
"""
if len(predictions) != len(actuals):
raise ValueError("the shape of predictions and actuals does not match.")
if n is not None:
if not isinstance(n, int) or n <= 0:
raise ValueError("n must be 'None' or a positive integer."
" It was '%s'." % n)
ap = 0.0
predictions = numpy.array(predictions)
actuals = numpy.array(actuals)
# add a shuffler to avoid overestimating the ap
predictions, actuals = AveragePrecisionCalculator._shuffle(
predictions, actuals)
sortidx = sorted(range(len(predictions)),
key=lambda k: predictions[k],
reverse=True)
if total_num_positives is None:
numpos = numpy.size(numpy.where(actuals > 0))
else:
numpos = total_num_positives
if numpos == 0:
return 0
if n is not None:
numpos = min(numpos, n)
delta_recall = 1.0 / numpos
poscount = 0.0
# calculate the ap
r = len(sortidx)
if n is not None:
r = min(r, n)
for i in range(r):
if actuals[sortidx[i]] > 0:
poscount += 1
ap += poscount / (i + 1) * delta_recall
return ap
@staticmethod
def _shuffle(predictions, actuals):
random.seed(0)
suffidx = random.sample(range(len(predictions)), len(predictions))
predictions = predictions[suffidx]
actuals = actuals[suffidx]
return predictions, actuals
@staticmethod
def _zero_one_normalize(predictions, epsilon=1e-7):
"""Normalize the predictions to the range between 0.0 and 1.0.
For some predictions like SVM predictions, we need to normalize them before
calculate the interpolated average precision. The normalization will not
change the rank in the original list and thus won't change the average
precision.
Args:
predictions: a numpy 1-D array storing the sparse prediction scores.
epsilon: a small constant to avoid denominator being zero.
Returns:
The normalized prediction.
"""
denominator = numpy.max(predictions) - numpy.min(predictions)
ret = (predictions - numpy.min(predictions)) / numpy.max(
denominator, epsilon)
return ret
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility to convert the output of batch prediction into a CSV submission.
It converts the JSON files created by the command
'gcloud beta ml jobs submit prediction' into a CSV file ready for submission.
"""
import json
import tensorflow as tf
from builtins import range
from tensorflow import app
from tensorflow import flags
from tensorflow import gfile
from tensorflow import logging
FLAGS = flags.FLAGS
if __name__ == "__main__":
flags.DEFINE_string(
"json_prediction_files_pattern", None,
"Pattern specifying the list of JSON files that the command "
"'gcloud beta ml jobs submit prediction' outputs. These files are "
"located in the output path of the prediction command and are prefixed "
"with 'prediction.results'.")
flags.DEFINE_string(
"csv_output_file", None,
"The file to save the predictions converted to the CSV format.")
def get_csv_header():
return "VideoId,LabelConfidencePairs\n"
def to_csv_row(json_data):
video_id = json_data["video_id"]
class_indexes = json_data["class_indexes"]
predictions = json_data["predictions"]
if isinstance(video_id, list):
video_id = video_id[0]
class_indexes = class_indexes[0]
predictions = predictions[0]
if len(class_indexes) != len(predictions):
raise ValueError(
"The number of indexes (%s) and predictions (%s) must be equal." %
(len(class_indexes), len(predictions)))
return (video_id.decode("utf-8") + "," +
" ".join("%i %f" % (class_indexes[i], predictions[i])
for i in range(len(class_indexes))) + "\n")
def main(unused_argv):
logging.set_verbosity(tf.logging.INFO)
if not FLAGS.json_prediction_files_pattern:
raise ValueError(
"The flag --json_prediction_files_pattern must be specified.")
if not FLAGS.csv_output_file:
raise ValueError("The flag --csv_output_file must be specified.")
logging.info("Looking for prediction files with pattern: %s",
FLAGS.json_prediction_files_pattern)
file_paths = gfile.Glob(FLAGS.json_prediction_files_pattern)
logging.info("Found files: %s", file_paths)
logging.info("Writing submission file to: %s", FLAGS.csv_output_file)
with gfile.Open(FLAGS.csv_output_file, "w+") as output_file:
output_file.write(get_csv_header())
for file_path in file_paths:
logging.info("processing file: %s", file_path)
with gfile.Open(file_path) as input_file:
for line in input_file:
json_data = json.loads(line)
output_file.write(to_csv_row(json_data))
output_file.flush()
logging.info("done")
if __name__ == "__main__":
app.run()
No preview for this file type
import numpy as np
import tensorflow as tf
from tensorflow import logging
from tensorflow import gfile
import esot3ria.pbutil as pbutil
def get_segments(batch_video_mtx, batch_num_frames, segment_size):
"""Get segment-level inputs from frame-level features."""
video_batch_size = batch_video_mtx.shape[0]
max_frame = batch_video_mtx.shape[1]
feature_dim = batch_video_mtx.shape[-1]
padded_segment_sizes = (batch_num_frames + segment_size - 1) // segment_size
padded_segment_sizes *= segment_size
segment_mask = (
0 < (padded_segment_sizes[:, np.newaxis] - np.arange(0, max_frame)))
# Segment bags.
frame_bags = batch_video_mtx.reshape((-1, feature_dim))
segment_frames = frame_bags[segment_mask.reshape(-1)].reshape(
(-1, segment_size, feature_dim))
# Segment num frames.
segment_start_times = np.arange(0, max_frame, segment_size)
num_segments = batch_num_frames[:, np.newaxis] - segment_start_times
num_segment_bags = num_segments.reshape((-1))
valid_segment_mask = num_segment_bags > 0
segment_num_frames = num_segment_bags[valid_segment_mask]
segment_num_frames[segment_num_frames > segment_size] = segment_size
max_segment_num = (max_frame + segment_size - 1) // segment_size
video_idxs = np.tile(
np.arange(0, video_batch_size)[:, np.newaxis], [1, max_segment_num])
segment_idxs = np.tile(segment_start_times, [video_batch_size, 1])
idx_bags = np.stack([video_idxs, segment_idxs], axis=-1).reshape((-1, 2))
video_segment_ids = idx_bags[valid_segment_mask]
return {
"video_batch": segment_frames,
"num_frames_batch": segment_num_frames,
"video_segment_ids": video_segment_ids
}
def format_prediction(video_ids, predictions, top_k, whitelisted_cls_mask=None):
batch_size = len(video_ids)
for video_index in range(batch_size):
video_prediction = predictions[video_index]
if whitelisted_cls_mask is not None:
# Whitelist classes.
video_prediction *= whitelisted_cls_mask
top_indices = np.argpartition(video_prediction, -top_k)[-top_k:]
line = [(class_index, predictions[video_index][class_index])
for class_index in top_indices]
line = sorted(line, key=lambda p: -p[1])
return (video_ids[video_index] + "," +
" ".join("%i %g" % (label, score) for (label, score) in line) +
"\n").encode("utf8")
def inference_pb(filename):
with tf.Session(config=tf.ConfigProto(allow_soft_placement=True)) as sess:
# 200527 Esot3riA
# 0. Import SequenceExample type target from pb.
target_video = pbutil.convert_pb(filename)
# 1. Load video features from pb.
video_id_batch_val = np.array([b'video'])
n_frames = len(target_video.feature_lists.feature_list['rgb'].feature)
# Restrict frame size to 300
if n_frames > 300:
n_frames = 300
video_batch_val = np.zeros((300, 1152))
for i in range(n_frames):
video_batch_rgb_raw = target_video.feature_lists.feature_list['rgb'].feature[i].bytes_list.value[0]
video_batch_rgb = np.array(tf.cast(tf.decode_raw(video_batch_rgb_raw, tf.float32), tf.float32).eval())
video_batch_audio_raw = target_video.feature_lists.feature_list['audio'].feature[i].bytes_list.value[0]
video_batch_audio = np.array(tf.cast(tf.decode_raw(video_batch_audio_raw, tf.float32), tf.float32).eval())
video_batch_val[i] = np.concatenate([video_batch_rgb, video_batch_audio], axis=0)
video_batch_val = np.array([video_batch_val])
num_frames_batch_val = np.array([n_frames])
# 200527 Esot3riA End
# Restore checkpoint and meta-graph file
checkpoint_file = '/Users/esot3ria/PycharmProjects/yt8m/models/frame' \
'/sample_model/inference_model/segment_inference_model'
if not gfile.Exists(checkpoint_file + ".meta"):
raise IOError("Cannot find %s. Did you run eval.py?" % checkpoint_file)
meta_graph_location = checkpoint_file + ".meta"
logging.info("loading meta-graph: " + meta_graph_location)
with tf.device("/cpu:0"):
saver = tf.train.import_meta_graph(meta_graph_location,
clear_devices=True)
logging.info("restoring variables from " + checkpoint_file)
saver.restore(sess, checkpoint_file)
input_tensor = tf.get_collection("input_batch_raw")[0]
num_frames_tensor = tf.get_collection("num_frames")[0]
predictions_tensor = tf.get_collection("predictions")[0]
# Workaround for num_epochs issue.
def set_up_init_ops(variables):
init_op_list = []
for variable in list(variables):
if "train_input" in variable.name:
init_op_list.append(tf.assign(variable, 1))
variables.remove(variable)
init_op_list.append(tf.variables_initializer(variables))
return init_op_list
sess.run(
set_up_init_ops(tf.get_collection_ref(tf.GraphKeys.LOCAL_VARIABLES)))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
whitelisted_cls_mask = np.zeros((predictions_tensor.get_shape()[-1],),
dtype=np.float32)
segment_label_ids_file = '../segment_label_ids.csv'
with tf.io.gfile.GFile(segment_label_ids_file) as fobj:
for line in fobj:
try:
cls_id = int(line)
whitelisted_cls_mask[cls_id] = 1.
except ValueError:
# Simply skip the non-integer line.
continue
# 200527 Esot3riA
# 2. Make segment features.
results = get_segments(video_batch_val, num_frames_batch_val, 5)
video_segment_ids = results["video_segment_ids"]
video_id_batch_val = video_id_batch_val[video_segment_ids[:, 0]]
video_id_batch_val = np.array([
"%s:%d" % (x.decode("utf8"), y)
for x, y in zip(video_id_batch_val, video_segment_ids[:, 1])
])
video_batch_val = results["video_batch"]
num_frames_batch_val = results["num_frames_batch"]
if input_tensor.get_shape()[1] != video_batch_val.shape[1]:
raise ValueError("max_frames mismatch. Please re-run the eval.py "
"with correct segment_labels settings.")
predictions_val, = sess.run([predictions_tensor],
feed_dict={
input_tensor: video_batch_val,
num_frames_tensor: num_frames_batch_val
})
logging.info(predictions_val)
logging.info("profit :D")
# result = format_prediction(video_id_batch_val, predictions_val, 10, whitelisted_cls_mask)
if __name__ == '__main__':
logging.set_verbosity(tf.logging.INFO)
filename = 'features.pb'
inference_pb(filename)
import tensorflow as tf
import numpy
def _make_bytes(int_array):
if bytes == str: # Python2
return ''.join(map(chr, int_array))
else:
return bytes(int_array)
def quantize(features, min_quantized_value=-2.0, max_quantized_value=2.0):
"""Quantizes float32 `features` into string."""
assert features.dtype == 'float32'
assert len(features.shape) == 1 # 1-D array
features = numpy.clip(features, min_quantized_value, max_quantized_value)
quantize_range = max_quantized_value - min_quantized_value
features = (features - min_quantized_value) * (255.0 / quantize_range)
features = [int(round(f)) for f in features]
return _make_bytes(features)
# for parse feature.pb
contexts = {
'AUDIO/feature/dimensions': tf.io.FixedLenFeature([], tf.int64),
'AUDIO/feature/rate': tf.io.FixedLenFeature([], tf.float32),
'RGB/feature/dimensions': tf.io.FixedLenFeature([], tf.int64),
'RGB/feature/rate': tf.io.FixedLenFeature([], tf.float32),
'clip/data_path': tf.io.FixedLenFeature([], tf.string),
'clip/end/timestamp': tf.io.FixedLenFeature([], tf.int64),
'clip/start/timestamp': tf.io.FixedLenFeature([], tf.int64)
}
features = {
'AUDIO/feature/floats': tf.io.VarLenFeature(dtype=tf.float32),
'AUDIO/feature/timestamp': tf.io.VarLenFeature(tf.int64),
'RGB/feature/floats': tf.io.VarLenFeature(dtype=tf.float32),
'RGB/feature/timestamp': tf.io.VarLenFeature(tf.int64)
}
def parse_exmp(serial_exmp):
_, sequence_parsed = tf.io.parse_single_sequence_example(
serialized=serial_exmp,
context_features=contexts,
sequence_features=features)
sequence_parsed = tf.contrib.learn.run_n(sequence_parsed)[0]
audio = sequence_parsed['AUDIO/feature/floats'].values
rgb = sequence_parsed['RGB/feature/floats'].values
# print(audio.values)
# print(type(audio.values))
# audio is 128 8bit, rgb is 1024 8bit for every second
audio_slices = [audio[128 * i: 128 * (i + 1)] for i in range(len(audio) // 128)]
rgb_slices = [rgb[1024 * i: 1024 * (i + 1)] for i in range(len(rgb) // 1024)]
byte_audio = []
byte_rgb = []
for seg in audio_slices:
# audio_seg = quantize(seg)
audio_seg = _make_bytes(seg)
byte_audio.append(audio_seg)
for seg in rgb_slices:
# rgb_seg = quantize(seg)
rgb_seg = _make_bytes(seg)
byte_rgb.append(rgb_seg)
return byte_audio, byte_rgb
def make_exmp(id, audio, rgb):
audio_features = []
rgb_features = []
for embedding in audio:
embedding_feature = tf.train.Feature(
bytes_list=tf.train.BytesList(value=[embedding]))
audio_features.append(embedding_feature)
for embedding in rgb:
embedding_feature = tf.train.Feature(
bytes_list=tf.train.BytesList(value=[embedding]))
rgb_features.append(embedding_feature)
# for construct yt8m data
seq_exmp = tf.train.SequenceExample(
context=tf.train.Features(
feature={
'id': tf.train.Feature(bytes_list=tf.train.BytesList(
value=[id.encode('utf-8')]))
}),
feature_lists=tf.train.FeatureLists(
feature_list={
'audio': tf.train.FeatureList(
feature=audio_features
),
'rgb': tf.train.FeatureList(
feature=rgb_features
)
})
)
serialized = seq_exmp.SerializeToString()
return serialized
def convert_pb(filename):
sequence_example = open(filename, 'rb').read()
audio, rgb = parse_exmp(sequence_example)
tmp_example = make_exmp('video', audio, rgb)
decoded = tf.train.SequenceExample.FromString(tmp_example)
return decoded
import tensorflow as tf
import numpy as np
frame_lvl_record = "test0000.tfrecord"
feat_rgb = []
feat_audio = []
for example in tf.python_io.tf_record_iterator(frame_lvl_record):
tf_seq_example = tf.train.SequenceExample.FromString(example)
test = tf_seq_example.SerializeToString()
n_frames = len(tf_seq_example.feature_lists.feature_list['audio'].feature)
sess = tf.InteractiveSession()
rgb_frame = []
audio_frame = []
# iterate through frames
for i in range(n_frames):
rgb_frame.append(tf.cast(tf.decode_raw(
tf_seq_example.feature_lists.feature_list['rgb']
.feature[i].bytes_list.value[0], tf.uint8)
, tf.float32).eval())
audio_frame.append(tf.cast(tf.decode_raw(
tf_seq_example.feature_lists.feature_list['audio']
.feature[i].bytes_list.value[0], tf.uint8)
, tf.float32).eval())
sess.close()
feat_audio.append(audio_frame)
feat_rgb.append(rgb_frame)
break
print('The first video has %d frames' %len(feat_rgb[0]))
\ No newline at end of file
No preview for this file type
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Binary for evaluating Tensorflow models on the YouTube-8M dataset."""
import json
import os
import time
from absl import logging
import eval_util
import frame_level_models
import losses
import readers
import tensorflow as tf
from tensorflow import flags
from tensorflow.python.lib.io import file_io
import utils
import video_level_models
FLAGS = flags.FLAGS
if __name__ == "__main__":
# Dataset flags.
flags.DEFINE_string(
"train_dir", "/tmp/yt8m_model/",
"The directory to load the model files from. "
"The tensorboard metrics files are also saved to this "
"directory.")
flags.DEFINE_string(
"eval_data_pattern", "",
"File glob defining the evaluation dataset in tensorflow.SequenceExample "
"format. The SequenceExamples are expected to have an 'rgb' byte array "
"sequence feature as well as a 'labels' int64 context feature.")
flags.DEFINE_bool(
"segment_labels", False,
"If set, then --eval_data_pattern must be frame-level features (but with"
" segment_labels). Otherwise, --eval_data_pattern must be aggregated "
"video-level features. The model must also be set appropriately (i.e. to "
"read 3D batches VS 4D batches.")
# Other flags.
flags.DEFINE_integer("batch_size", 1024,
"How many examples to process per batch.")
flags.DEFINE_integer("num_readers", 8,
"How many threads to use for reading input files.")
flags.DEFINE_boolean("run_once", False, "Whether to run eval only once.")
flags.DEFINE_integer("top_k", 20, "How many predictions to output per video.")
def find_class_by_name(name, modules):
"""Searches the provided modules for the named class and returns it."""
modules = [getattr(module, name, None) for module in modules]
return next(a for a in modules if a)
def get_input_evaluation_tensors(reader,
data_pattern,
batch_size=1024,
num_readers=1):
"""Creates the section of the graph which reads the evaluation data.
Args:
reader: A class which parses the training data.
data_pattern: A 'glob' style path to the data files.
batch_size: How many examples to process at a time.
num_readers: How many I/O threads to use.
Returns:
A tuple containing the features tensor, labels tensor, and optionally a
tensor containing the number of frames per video. The exact dimensions
depend on the reader being used.
Raises:
IOError: If no files matching the given pattern were found.
"""
logging.info("Using batch size of %d for evaluation.", batch_size)
with tf.name_scope("eval_input"):
files = tf.io.gfile.glob(data_pattern)
if not files:
raise IOError("Unable to find the evaluation files.")
logging.info("number of evaluation files: %d", len(files))
filename_queue = tf.train.string_input_producer(files,
shuffle=False,
num_epochs=1)
eval_data = [
reader.prepare_reader(filename_queue) for _ in range(num_readers)
]
return tf.train.batch_join(eval_data,
batch_size=batch_size,
capacity=3 * batch_size,
allow_smaller_final_batch=True,
enqueue_many=True)
def build_graph(reader,
model,
eval_data_pattern,
label_loss_fn,
batch_size=1024,
num_readers=1):
"""Creates the Tensorflow graph for evaluation.
Args:
reader: The data file reader. It should inherit from BaseReader.
model: The core model (e.g. logistic or neural net). It should inherit from
BaseModel.
eval_data_pattern: glob path to the evaluation data files.
label_loss_fn: What kind of loss to apply to the model. It should inherit
from BaseLoss.
batch_size: How many examples to process at a time.
num_readers: How many threads to use for I/O operations.
"""
global_step = tf.Variable(0, trainable=False, name="global_step")
input_data_dict = get_input_evaluation_tensors(reader,
eval_data_pattern,
batch_size=batch_size,
num_readers=num_readers)
video_id_batch = input_data_dict["video_ids"]
model_input_raw = input_data_dict["video_matrix"]
labels_batch = input_data_dict["labels"]
num_frames = input_data_dict["num_frames"]
tf.compat.v1.summary.histogram("model_input_raw", model_input_raw)
feature_dim = len(model_input_raw.get_shape()) - 1
# Normalize input features.
model_input = tf.nn.l2_normalize(model_input_raw, feature_dim)
with tf.compat.v1.variable_scope("tower"):
result = model.create_model(model_input,
num_frames=num_frames,
vocab_size=reader.num_classes,
labels=labels_batch,
is_training=False)
predictions = result["predictions"]
tf.compat.v1.summary.histogram("model_activations", predictions)
if "loss" in result.keys():
label_loss = result["loss"]
else:
label_loss = label_loss_fn.calculate_loss(predictions, labels_batch)
tf.compat.v1.add_to_collection("global_step", global_step)
tf.compat.v1.add_to_collection("loss", label_loss)
tf.compat.v1.add_to_collection("predictions", predictions)
tf.compat.v1.add_to_collection("input_batch", model_input)
tf.compat.v1.add_to_collection("input_batch_raw", model_input_raw)
tf.compat.v1.add_to_collection("video_id_batch", video_id_batch)
tf.compat.v1.add_to_collection("num_frames", num_frames)
tf.compat.v1.add_to_collection("labels", tf.cast(labels_batch, tf.float32))
if FLAGS.segment_labels:
tf.compat.v1.add_to_collection("label_weights",
input_data_dict["label_weights"])
tf.compat.v1.add_to_collection("summary_op", tf.compat.v1.summary.merge_all())
def evaluation_loop(fetches, saver, summary_writer, evl_metrics,
last_global_step_val):
"""Run the evaluation loop once.
Args:
fetches: a dict of tensors to be run within Session.
saver: a tensorflow saver to restore the model.
summary_writer: a tensorflow summary_writer
evl_metrics: an EvaluationMetrics object.
last_global_step_val: the global step used in the previous evaluation.
Returns:
The global_step used in the latest model.
"""
global_step_val = -1
with tf.Session(config=tf.ConfigProto(gpu_options=tf.GPUOptions(
allow_growth=True))) as sess:
latest_checkpoint = tf.train.latest_checkpoint(FLAGS.train_dir)
if latest_checkpoint:
logging.info("Loading checkpoint for eval: %s", latest_checkpoint)
# Restores from checkpoint
saver.restore(sess, latest_checkpoint)
# Assuming model_checkpoint_path looks something like:
# /my-favorite-path/yt8m_train/model.ckpt-0, extract global_step from it.
global_step_val = os.path.basename(latest_checkpoint).split("-")[-1]
# Save model
if FLAGS.segment_labels:
inference_model_name = "segment_inference_model"
else:
inference_model_name = "inference_model"
saver.save(
sess,
os.path.join(FLAGS.train_dir, "inference_model",
inference_model_name))
else:
logging.info("No checkpoint file found.")
return global_step_val
if global_step_val == last_global_step_val:
logging.info(
"skip this checkpoint global_step_val=%s "
"(same as the previous one).", global_step_val)
return global_step_val
sess.run([tf.local_variables_initializer()])
# Start the queue runners.
coord = tf.train.Coordinator()
try:
threads = []
for qr in tf.compat.v1.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
threads.extend(
qr.create_threads(sess, coord=coord, daemon=True, start=True))
logging.info("enter eval_once loop global_step_val = %s. ",
global_step_val)
evl_metrics.clear()
examples_processed = 0
while not coord.should_stop():
batch_start_time = time.time()
output_data_dict = sess.run(fetches)
seconds_per_batch = time.time() - batch_start_time
labels_val = output_data_dict["labels"]
summary_val = output_data_dict["summary"]
example_per_second = labels_val.shape[0] / seconds_per_batch
examples_processed += labels_val.shape[0]
predictions = output_data_dict["predictions"]
if FLAGS.segment_labels:
# This is a workaround to ignore the unrated labels.
predictions *= output_data_dict["label_weights"]
iteration_info_dict = evl_metrics.accumulate(predictions, labels_val,
output_data_dict["loss"])
iteration_info_dict["examples_per_second"] = example_per_second
iterinfo = utils.AddGlobalStepSummary(
summary_writer,
global_step_val,
iteration_info_dict,
summary_scope="SegEval" if FLAGS.segment_labels else "Eval")
logging.info("examples_processed: %d | %s", examples_processed,
iterinfo)
except tf.errors.OutOfRangeError as e:
logging.info(
"Done with batched inference. Now calculating global performance "
"metrics.")
# calculate the metrics for the entire epoch
epoch_info_dict = evl_metrics.get()
epoch_info_dict["epoch_id"] = global_step_val
summary_writer.add_summary(summary_val, global_step_val)
epochinfo = utils.AddEpochSummary(
summary_writer,
global_step_val,
epoch_info_dict,
summary_scope="SegEval" if FLAGS.segment_labels else "Eval")
logging.info(epochinfo)
evl_metrics.clear()
except Exception as e: # pylint: disable=broad-except
logging.info("Unexpected exception: %s", str(e))
coord.request_stop(e)
coord.request_stop()
coord.join(threads, stop_grace_period_secs=10)
logging.info("Total: examples_processed: %d", examples_processed)
return global_step_val
def evaluate():
"""Starts main evaluation loop."""
tf.compat.v1.set_random_seed(0) # for reproducibility
# Write json of flags
model_flags_path = os.path.join(FLAGS.train_dir, "model_flags.json")
if not file_io.file_exists(model_flags_path):
raise IOError(("Cannot find file %s. Did you run train.py on the same "
"--train_dir?") % model_flags_path)
flags_dict = json.loads(file_io.FileIO(model_flags_path, mode="r").read())
with tf.Graph().as_default():
# convert feature_names and feature_sizes to lists of values
feature_names, feature_sizes = utils.GetListOfFeatureNamesAndSizes(
flags_dict["feature_names"], flags_dict["feature_sizes"])
if flags_dict["frame_features"]:
reader = readers.YT8MFrameFeatureReader(
feature_names=feature_names,
feature_sizes=feature_sizes,
segment_labels=FLAGS.segment_labels)
else:
reader = readers.YT8MAggregatedFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes)
model = find_class_by_name(flags_dict["model"],
[frame_level_models, video_level_models])()
label_loss_fn = find_class_by_name(flags_dict["label_loss"], [losses])()
if not FLAGS.eval_data_pattern:
raise IOError("'eval_data_pattern' was not specified. Nothing to "
"evaluate.")
build_graph(reader=reader,
model=model,
eval_data_pattern=FLAGS.eval_data_pattern,
label_loss_fn=label_loss_fn,
num_readers=FLAGS.num_readers,
batch_size=FLAGS.batch_size)
logging.info("built evaluation graph")
# A dict of tensors to be run in Session.
fetches = {
"video_id": tf.compat.v1.get_collection("video_id_batch")[0],
"predictions": tf.compat.v1.get_collection("predictions")[0],
"labels": tf.compat.v1.get_collection("labels")[0],
"loss": tf.compat.v1.get_collection("loss")[0],
"summary": tf.compat.v1.get_collection("summary_op")[0]
}
if FLAGS.segment_labels:
fetches["label_weights"] = tf.compat.v1.get_collection("label_weights")[0]
saver = tf.compat.v1.train.Saver(tf.compat.v1.global_variables())
summary_writer = tf.compat.v1.summary.FileWriter(
os.path.join(FLAGS.train_dir, "eval"),
graph=tf.compat.v1.get_default_graph())
evl_metrics = eval_util.EvaluationMetrics(reader.num_classes, FLAGS.top_k,
None)
last_global_step_val = -1
while True:
last_global_step_val = evaluation_loop(fetches, saver, summary_writer,
evl_metrics, last_global_step_val)
if FLAGS.run_once:
break
def main(unused_argv):
logging.set_verbosity(logging.INFO)
logging.info("tensorflow version: %s", tf.__version__)
evaluate()
if __name__ == "__main__":
tf.compat.v1.app.run()
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides functions to help with evaluating models."""
import average_precision_calculator as ap_calculator
import mean_average_precision_calculator as map_calculator
import numpy
from tensorflow.python.platform import gfile
def flatten(l):
"""Merges a list of lists into a single list. """
return [item for sublist in l for item in sublist]
def calculate_hit_at_one(predictions, actuals):
"""Performs a local (numpy) calculation of the hit at one.
Args:
predictions: Matrix containing the outputs of the model. Dimensions are
'batch' x 'num_classes'.
actuals: Matrix containing the ground truth labels. Dimensions are 'batch' x
'num_classes'.
Returns:
float: The average hit at one across the entire batch.
"""
top_prediction = numpy.argmax(predictions, 1)
hits = actuals[numpy.arange(actuals.shape[0]), top_prediction]
return numpy.average(hits)
def calculate_precision_at_equal_recall_rate(predictions, actuals):
"""Performs a local (numpy) calculation of the PERR.
Args:
predictions: Matrix containing the outputs of the model. Dimensions are
'batch' x 'num_classes'.
actuals: Matrix containing the ground truth labels. Dimensions are 'batch' x
'num_classes'.
Returns:
float: The average precision at equal recall rate across the entire batch.
"""
aggregated_precision = 0.0
num_videos = actuals.shape[0]
for row in numpy.arange(num_videos):
num_labels = int(numpy.sum(actuals[row]))
top_indices = numpy.argpartition(predictions[row],
-num_labels)[-num_labels:]
item_precision = 0.0
for label_index in top_indices:
if predictions[row][label_index] > 0:
item_precision += actuals[row][label_index]
item_precision /= top_indices.size
aggregated_precision += item_precision
aggregated_precision /= num_videos
return aggregated_precision
def calculate_gap(predictions, actuals, top_k=20):
"""Performs a local (numpy) calculation of the global average precision.
Only the top_k predictions are taken for each of the videos.
Args:
predictions: Matrix containing the outputs of the model. Dimensions are
'batch' x 'num_classes'.
actuals: Matrix containing the ground truth labels. Dimensions are 'batch' x
'num_classes'.
top_k: How many predictions to use per video.
Returns:
float: The global average precision.
"""
gap_calculator = ap_calculator.AveragePrecisionCalculator()
sparse_predictions, sparse_labels, num_positives = top_k_by_class(
predictions, actuals, top_k)
gap_calculator.accumulate(flatten(sparse_predictions), flatten(sparse_labels),
sum(num_positives))
return gap_calculator.peek_ap_at_n()
def top_k_by_class(predictions, labels, k=20):
"""Extracts the top k predictions for each video, sorted by class.
Args:
predictions: A numpy matrix containing the outputs of the model. Dimensions
are 'batch' x 'num_classes'.
k: the top k non-zero entries to preserve in each prediction.
Returns:
A tuple (predictions,labels, true_positives). 'predictions' and 'labels'
are lists of lists of floats. 'true_positives' is a list of scalars. The
length of the lists are equal to the number of classes. The entries in the
predictions variable are probability predictions, and
the corresponding entries in the labels variable are the ground truth for
those predictions. The entries in 'true_positives' are the number of true
positives for each class in the ground truth.
Raises:
ValueError: An error occurred when the k is not a positive integer.
"""
if k <= 0:
raise ValueError("k must be a positive integer.")
k = min(k, predictions.shape[1])
num_classes = predictions.shape[1]
prediction_triplets = []
for video_index in range(predictions.shape[0]):
prediction_triplets.extend(
top_k_triplets(predictions[video_index], labels[video_index], k))
out_predictions = [[] for _ in range(num_classes)]
out_labels = [[] for _ in range(num_classes)]
for triplet in prediction_triplets:
out_predictions[triplet[0]].append(triplet[1])
out_labels[triplet[0]].append(triplet[2])
out_true_positives = [numpy.sum(labels[:, i]) for i in range(num_classes)]
return out_predictions, out_labels, out_true_positives
def top_k_triplets(predictions, labels, k=20):
"""Get the top_k for a 1-d numpy array.
Returns a sparse list of tuples in
(prediction, class) format
"""
m = len(predictions)
k = min(k, m)
indices = numpy.argpartition(predictions, -k)[-k:]
return [(index, predictions[index], labels[index]) for index in indices]
class EvaluationMetrics(object):
"""A class to store the evaluation metrics."""
def __init__(self, num_class, top_k, top_n):
"""Construct an EvaluationMetrics object to store the evaluation metrics.
Args:
num_class: A positive integer specifying the number of classes.
top_k: A positive integer specifying how many predictions are considered
per video.
top_n: A positive Integer specifying the average precision at n, or None
to use all provided data points.
Raises:
ValueError: An error occurred when MeanAveragePrecisionCalculator cannot
not be constructed.
"""
self.sum_hit_at_one = 0.0
self.sum_perr = 0.0
self.sum_loss = 0.0
self.map_calculator = map_calculator.MeanAveragePrecisionCalculator(
num_class, top_n=top_n)
self.global_ap_calculator = ap_calculator.AveragePrecisionCalculator()
self.top_k = top_k
self.num_examples = 0
def accumulate(self, predictions, labels, loss):
"""Accumulate the metrics calculated locally for this mini-batch.
Args:
predictions: A numpy matrix containing the outputs of the model.
Dimensions are 'batch' x 'num_classes'.
labels: A numpy matrix containing the ground truth labels. Dimensions are
'batch' x 'num_classes'.
loss: A numpy array containing the loss for each sample.
Returns:
dictionary: A dictionary storing the metrics for the mini-batch.
Raises:
ValueError: An error occurred when the shape of predictions and actuals
does not match.
"""
batch_size = labels.shape[0]
mean_hit_at_one = calculate_hit_at_one(predictions, labels)
mean_perr = calculate_precision_at_equal_recall_rate(predictions, labels)
mean_loss = numpy.mean(loss)
# Take the top 20 predictions.
sparse_predictions, sparse_labels, num_positives = top_k_by_class(
predictions, labels, self.top_k)
self.map_calculator.accumulate(sparse_predictions, sparse_labels,
num_positives)
self.global_ap_calculator.accumulate(flatten(sparse_predictions),
flatten(sparse_labels),
sum(num_positives))
self.num_examples += batch_size
self.sum_hit_at_one += mean_hit_at_one * batch_size
self.sum_perr += mean_perr * batch_size
self.sum_loss += mean_loss * batch_size
return {"hit_at_one": mean_hit_at_one, "perr": mean_perr, "loss": mean_loss}
def get(self):
"""Calculate the evaluation metrics for the whole epoch.
Raises:
ValueError: If no examples were accumulated.
Returns:
dictionary: a dictionary storing the evaluation metrics for the epoch. The
dictionary has the fields: avg_hit_at_one, avg_perr, avg_loss, and
aps (default nan).
"""
if self.num_examples <= 0:
raise ValueError("total_sample must be positive.")
avg_hit_at_one = self.sum_hit_at_one / self.num_examples
avg_perr = self.sum_perr / self.num_examples
avg_loss = self.sum_loss / self.num_examples
aps = self.map_calculator.peek_map_at_n()
gap = self.global_ap_calculator.peek_ap_at_n()
epoch_info_dict = {
"avg_hit_at_one": avg_hit_at_one,
"avg_perr": avg_perr,
"avg_loss": avg_loss,
"aps": aps,
"gap": gap
}
return epoch_info_dict
def clear(self):
"""Clear the evaluation metrics and reset the EvaluationMetrics object."""
self.sum_hit_at_one = 0.0
self.sum_perr = 0.0
self.sum_loss = 0.0
self.map_calculator.clear()
self.global_ap_calculator.clear()
self.num_examples = 0
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities to export a model for batch prediction."""
import tensorflow as tf
import tensorflow.contrib.slim as slim
from tensorflow.python.saved_model import builder as saved_model_builder
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.saved_model import signature_def_utils
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.saved_model import utils as saved_model_utils
_TOP_PREDICTIONS_IN_OUTPUT = 20
class ModelExporter(object):
def __init__(self, frame_features, model, reader):
self.frame_features = frame_features
self.model = model
self.reader = reader
with tf.Graph().as_default() as graph:
self.inputs, self.outputs = self.build_inputs_and_outputs()
self.graph = graph
self.saver = tf.train.Saver(tf.trainable_variables(), sharded=True)
def export_model(self, model_dir, global_step_val, last_checkpoint):
"""Exports the model so that it can used for batch predictions."""
with self.graph.as_default():
with tf.Session() as session:
session.run(tf.global_variables_initializer())
self.saver.restore(session, last_checkpoint)
signature = signature_def_utils.build_signature_def(
inputs=self.inputs,
outputs=self.outputs,
method_name=signature_constants.PREDICT_METHOD_NAME)
signature_map = {
signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature
}
model_builder = saved_model_builder.SavedModelBuilder(model_dir)
model_builder.add_meta_graph_and_variables(
session,
tags=[tag_constants.SERVING],
signature_def_map=signature_map,
clear_devices=True)
model_builder.save()
def build_inputs_and_outputs(self):
if self.frame_features:
serialized_examples = tf.placeholder(tf.string, shape=(None,))
fn = lambda x: self.build_prediction_graph(x)
video_id_output, top_indices_output, top_predictions_output = (tf.map_fn(
fn, serialized_examples, dtype=(tf.string, tf.int32, tf.float32)))
else:
serialized_examples = tf.placeholder(tf.string, shape=(None,))
video_id_output, top_indices_output, top_predictions_output = (
self.build_prediction_graph(serialized_examples))
inputs = {
"example_bytes":
saved_model_utils.build_tensor_info(serialized_examples)
}
outputs = {
"video_id":
saved_model_utils.build_tensor_info(video_id_output),
"class_indexes":
saved_model_utils.build_tensor_info(top_indices_output),
"predictions":
saved_model_utils.build_tensor_info(top_predictions_output)
}
return inputs, outputs
def build_prediction_graph(self, serialized_examples):
input_data_dict = (
self.reader.prepare_serialized_examples(serialized_examples))
video_id = input_data_dict["video_ids"]
model_input_raw = input_data_dict["video_matrix"]
labels_batch = input_data_dict["labels"]
num_frames = input_data_dict["num_frames"]
feature_dim = len(model_input_raw.get_shape()) - 1
model_input = tf.nn.l2_normalize(model_input_raw, feature_dim)
with tf.variable_scope("tower"):
result = self.model.create_model(model_input,
num_frames=num_frames,
vocab_size=self.reader.num_classes,
labels=labels_batch,
is_training=False)
for variable in slim.get_model_variables():
tf.summary.histogram(variable.op.name, variable)
predictions = result["predictions"]
top_predictions, top_indices = tf.nn.top_k(predictions,
_TOP_PREDICTIONS_IN_OUTPUT)
return video_id, top_indices, top_predictions
# Lint as: python3
import numpy as np
import tensorflow as tf
from tensorflow import app
from tensorflow import flags
FLAGS = flags.FLAGS
def main(unused_argv):
# Get the input tensor names to be replaced.
tf.reset_default_graph()
meta_graph_location = FLAGS.checkpoint_file + ".meta"
tf.train.import_meta_graph(meta_graph_location, clear_devices=True)
input_tensor_name = tf.get_collection("input_batch_raw")[0].name
num_frames_tensor_name = tf.get_collection("num_frames")[0].name
# Create output graph.
saver = tf.train.Saver()
tf.reset_default_graph()
input_feature_placeholder = tf.placeholder(
tf.float32, shape=(None, None, 1152))
num_frames_placeholder = tf.placeholder(tf.int32, shape=(None, 1))
saver = tf.train.import_meta_graph(
meta_graph_location,
input_map={
input_tensor_name: input_feature_placeholder,
num_frames_tensor_name: tf.squeeze(num_frames_placeholder, axis=1)
},
clear_devices=True)
predictions_tensor = tf.get_collection("predictions")[0]
with tf.Session() as sess:
print("restoring variables from " + FLAGS.checkpoint_file)
saver.restore(sess, FLAGS.checkpoint_file)
tf.saved_model.simple_save(
sess,
FLAGS.output_dir,
inputs={'rgb_and_audio': input_feature_placeholder,
'num_frames': num_frames_placeholder},
outputs={'predictions': predictions_tensor})
# Try running inference.
predictions = sess.run(
[predictions_tensor],
feed_dict={
input_feature_placeholder: np.zeros((3, 7, 1152), dtype=np.float32),
num_frames_placeholder: np.array([[7]], dtype=np.int32)})
print('Test inference:', predictions)
print('Model saved to ', FLAGS.output_dir)
if __name__ == '__main__':
flags.DEFINE_string('checkpoint_file', None, 'Path to the checkpoint file.')
flags.DEFINE_string('output_dir', None, 'SavedModel output directory.')
app.run(main)
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains a collection of models which operate on variable-length sequences."""
import math
import model_utils as utils
import models
import tensorflow as tf
from tensorflow import flags
import tensorflow.contrib.slim as slim
import video_level_models
FLAGS = flags.FLAGS
flags.DEFINE_integer("iterations", 30, "Number of frames per batch for DBoF.")
flags.DEFINE_bool("dbof_add_batch_norm", True,
"Adds batch normalization to the DBoF model.")
flags.DEFINE_bool(
"sample_random_frames", True,
"If true samples random frames (for frame level models). If false, a random"
"sequence of frames is sampled instead.")
flags.DEFINE_integer("dbof_cluster_size", 8192,
"Number of units in the DBoF cluster layer.")
flags.DEFINE_integer("dbof_hidden_size", 1024,
"Number of units in the DBoF hidden layer.")
flags.DEFINE_string(
"dbof_pooling_method", "max",
"The pooling method used in the DBoF cluster layer. "
"Choices are 'average' and 'max'.")
flags.DEFINE_string(
"dbof_activation", "sigmoid",
"The nonlinear activation method for cluster and hidden dense layer, e.g., "
"sigmoid, relu6, etc.")
flags.DEFINE_string(
"video_level_classifier_model", "MoeModel",
"Some Frame-Level models can be decomposed into a "
"generalized pooling operation followed by a "
"classifier layer")
flags.DEFINE_integer("lstm_cells", 1024, "Number of LSTM cells.")
flags.DEFINE_integer("lstm_layers", 2, "Number of LSTM layers.")
class FrameLevelLogisticModel(models.BaseModel):
"""Creates a logistic classifier over the aggregated frame-level features."""
def create_model(self, model_input, vocab_size, num_frames, **unused_params):
"""See base class.
This class is intended to be an example for implementors of frame level
models. If you want to train a model over averaged features it is more
efficient to average them beforehand rather than on the fly.
Args:
model_input: A 'batch_size' x 'max_frames' x 'num_features' matrix of
input features.
vocab_size: The number of classes in the dataset.
num_frames: A vector of length 'batch' which indicates the number of
frames for each video (before padding).
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
'batch_size' x 'num_classes'.
"""
num_frames = tf.cast(tf.expand_dims(num_frames, 1), tf.float32)
feature_size = model_input.get_shape().as_list()[2]
denominators = tf.reshape(tf.tile(num_frames, [1, feature_size]),
[-1, feature_size])
avg_pooled = tf.reduce_sum(model_input, axis=[1]) / denominators
output = slim.fully_connected(avg_pooled,
vocab_size,
activation_fn=tf.nn.sigmoid,
weights_regularizer=slim.l2_regularizer(1e-8))
return {"predictions": output}
class DbofModel(models.BaseModel):
"""Creates a Deep Bag of Frames model.
The model projects the features for each frame into a higher dimensional
'clustering' space, pools across frames in that space, and then
uses a configurable video-level model to classify the now aggregated features.
The model will randomly sample either frames or sequences of frames during
training to speed up convergence.
"""
ACT_FN_MAP = {
"sigmoid": tf.nn.sigmoid,
"relu6": tf.nn.relu6,
}
def create_model(self,
model_input,
vocab_size,
num_frames,
iterations=None,
add_batch_norm=None,
sample_random_frames=None,
cluster_size=None,
hidden_size=None,
is_training=True,
**unused_params):
"""See base class.
Args:
model_input: A 'batch_size' x 'max_frames' x 'num_features' matrix of
input features.
vocab_size: The number of classes in the dataset.
num_frames: A vector of length 'batch' which indicates the number of
frames for each video (before padding).
iterations: the number of frames to be sampled.
add_batch_norm: whether to add batch norm during training.
sample_random_frames: whether to sample random frames or random sequences.
cluster_size: the output neuron number of the cluster layer.
hidden_size: the output neuron number of the hidden layer.
is_training: whether to build the graph in training mode.
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
'batch_size' x 'num_classes'.
"""
iterations = iterations or FLAGS.iterations
add_batch_norm = add_batch_norm or FLAGS.dbof_add_batch_norm
random_frames = sample_random_frames or FLAGS.sample_random_frames
cluster_size = cluster_size or FLAGS.dbof_cluster_size
hidden1_size = hidden_size or FLAGS.dbof_hidden_size
act_fn = self.ACT_FN_MAP.get(FLAGS.dbof_activation)
assert act_fn is not None, ("dbof_activation is not valid: %s." %
FLAGS.dbof_activation)
num_frames = tf.cast(tf.expand_dims(num_frames, 1), tf.float32)
if random_frames:
model_input = utils.SampleRandomFrames(model_input, num_frames,
iterations)
else:
model_input = utils.SampleRandomSequence(model_input, num_frames,
iterations)
max_frames = model_input.get_shape().as_list()[1]
feature_size = model_input.get_shape().as_list()[2]
reshaped_input = tf.reshape(model_input, [-1, feature_size])
tf.compat.v1.summary.histogram("input_hist", reshaped_input)
if add_batch_norm:
reshaped_input = slim.batch_norm(reshaped_input,
center=True,
scale=True,
is_training=is_training,
scope="input_bn")
cluster_weights = tf.compat.v1.get_variable(
"cluster_weights", [feature_size, cluster_size],
initializer=tf.random_normal_initializer(stddev=1 /
math.sqrt(feature_size)))
tf.compat.v1.summary.histogram("cluster_weights", cluster_weights)
activation = tf.matmul(reshaped_input, cluster_weights)
if add_batch_norm:
activation = slim.batch_norm(activation,
center=True,
scale=True,
is_training=is_training,
scope="cluster_bn")
else:
cluster_biases = tf.compat.v1.get_variable(
"cluster_biases", [cluster_size],
initializer=tf.random_normal_initializer(stddev=1 /
math.sqrt(feature_size)))
tf.compat.v1.summary.histogram("cluster_biases", cluster_biases)
activation += cluster_biases
activation = act_fn(activation)
tf.compat.v1.summary.histogram("cluster_output", activation)
activation = tf.reshape(activation, [-1, max_frames, cluster_size])
activation = utils.FramePooling(activation, FLAGS.dbof_pooling_method)
hidden1_weights = tf.compat.v1.get_variable(
"hidden1_weights", [cluster_size, hidden1_size],
initializer=tf.random_normal_initializer(stddev=1 /
math.sqrt(cluster_size)))
tf.compat.v1.summary.histogram("hidden1_weights", hidden1_weights)
activation = tf.matmul(activation, hidden1_weights)
if add_batch_norm:
activation = slim.batch_norm(activation,
center=True,
scale=True,
is_training=is_training,
scope="hidden1_bn")
else:
hidden1_biases = tf.compat.v1.get_variable(
"hidden1_biases", [hidden1_size],
initializer=tf.random_normal_initializer(stddev=0.01))
tf.compat.v1.summary.histogram("hidden1_biases", hidden1_biases)
activation += hidden1_biases
activation = act_fn(activation)
tf.compat.v1.summary.histogram("hidden1_output", activation)
aggregated_model = getattr(video_level_models,
FLAGS.video_level_classifier_model)
return aggregated_model().create_model(model_input=activation,
vocab_size=vocab_size,
**unused_params)
class LstmModel(models.BaseModel):
"""Creates a model which uses a stack of LSTMs to represent the video."""
def create_model(self, model_input, vocab_size, num_frames, **unused_params):
"""See base class.
Args:
model_input: A 'batch_size' x 'max_frames' x 'num_features' matrix of
input features.
vocab_size: The number of classes in the dataset.
num_frames: A vector of length 'batch' which indicates the number of
frames for each video (before padding).
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
'batch_size' x 'num_classes'.
"""
lstm_size = FLAGS.lstm_cells
number_of_layers = FLAGS.lstm_layers
stacked_lstm = tf.contrib.rnn.MultiRNNCell([
tf.contrib.rnn.BasicLSTMCell(lstm_size, forget_bias=1.0)
for _ in range(number_of_layers)
])
_, state = tf.nn.dynamic_rnn(stacked_lstm,
model_input,
sequence_length=num_frames,
dtype=tf.float32)
aggregated_model = getattr(video_level_models,
FLAGS.video_level_classifier_model)
return aggregated_model().create_model(model_input=state[-1].h,
vocab_size=vocab_size,
**unused_params)
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Binary for generating predictions over a set of videos."""
from __future__ import print_function
import glob
import heapq
import json
import os
import tarfile
import tempfile
import time
import numpy as np
import readers
from six.moves import urllib
import tensorflow as tf
from tensorflow import app
from tensorflow import flags
from tensorflow import gfile
from tensorflow import logging
from tensorflow.python.lib.io import file_io
import utils
FLAGS = flags.FLAGS
if __name__ == "__main__":
# Input
flags.DEFINE_string(
"train_dir", "", "The directory to load the model files from. We assume "
"that you have already run eval.py onto this, such that "
"inference_model.* files already exist.")
flags.DEFINE_string(
"input_data_pattern", "",
"File glob defining the evaluation dataset in tensorflow.SequenceExample "
"format. The SequenceExamples are expected to have an 'rgb' byte array "
"sequence feature as well as a 'labels' int64 context feature.")
flags.DEFINE_string(
"input_model_tgz", "",
"If given, must be path to a .tgz file that was written "
"by this binary using flag --output_model_tgz. In this "
"case, the .tgz file will be untarred to "
"--untar_model_dir and the model will be used for "
"inference.")
flags.DEFINE_string(
"untar_model_dir", "/tmp/yt8m-model",
"If --input_model_tgz is given, then this directory will "
"be created and the contents of the .tgz file will be "
"untarred here.")
flags.DEFINE_bool(
"segment_labels", False,
"If set, then --input_data_pattern must be frame-level features (but with"
" segment_labels). Otherwise, --input_data_pattern must be aggregated "
"video-level features. The model must also be set appropriately (i.e. to "
"read 3D batches VS 4D batches.")
flags.DEFINE_integer("segment_max_pred", 100000,
"Limit total number of segment outputs per entity.")
flags.DEFINE_string(
"segment_label_ids_file",
"https://raw.githubusercontent.com/google/youtube-8m/master/segment_label_ids.csv",
"The file that contains the segment label ids.")
# Output
flags.DEFINE_string("output_file", "", "The file to save the predictions to.")
flags.DEFINE_string(
"output_model_tgz", "",
"If given, should be a filename with a .tgz extension, "
"the model graph and checkpoint will be bundled in this "
"gzip tar. This file can be uploaded to Kaggle for the "
"top 10 participants.")
flags.DEFINE_integer("top_k", 20, "How many predictions to output per video.")
# Other flags.
flags.DEFINE_integer("batch_size", 512,
"How many examples to process per batch.")
flags.DEFINE_integer("num_readers", 1,
"How many threads to use for reading input files.")
def format_lines(video_ids, predictions, top_k, whitelisted_cls_mask=None):
"""Create an information line the submission file."""
batch_size = len(video_ids)
for video_index in range(batch_size):
video_prediction = predictions[video_index]
if whitelisted_cls_mask is not None:
# Whitelist classes.
video_prediction *= whitelisted_cls_mask
top_indices = np.argpartition(video_prediction, -top_k)[-top_k:]
line = [(class_index, predictions[video_index][class_index])
for class_index in top_indices]
line = sorted(line, key=lambda p: -p[1])
yield (video_ids[video_index] + "," +
" ".join("%i %g" % (label, score) for (label, score) in line) +
"\n").encode("utf8")
def get_input_data_tensors(reader, data_pattern, batch_size, num_readers=1):
"""Creates the section of the graph which reads the input data.
Args:
reader: A class which parses the input data.
data_pattern: A 'glob' style path to the data files.
batch_size: How many examples to process at a time.
num_readers: How many I/O threads to use.
Returns:
A tuple containing the features tensor, labels tensor, and optionally a
tensor containing the number of frames per video. The exact dimensions
depend on the reader being used.
Raises:
IOError: If no files matching the given pattern were found.
"""
with tf.name_scope("input"):
files = gfile.Glob(data_pattern)
if not files:
raise IOError("Unable to find input files. data_pattern='" +
data_pattern + "'")
logging.info("number of input files: " + str(len(files)))
filename_queue = tf.train.string_input_producer(files,
num_epochs=1,
shuffle=False)
examples_and_labels = [
reader.prepare_reader(filename_queue) for _ in range(num_readers)
]
input_data_dict = (tf.train.batch_join(examples_and_labels,
batch_size=batch_size,
allow_smaller_final_batch=True,
enqueue_many=True))
video_id_batch = input_data_dict["video_ids"]
video_batch = input_data_dict["video_matrix"]
num_frames_batch = input_data_dict["num_frames"]
return video_id_batch, video_batch, num_frames_batch
def get_segments(batch_video_mtx, batch_num_frames, segment_size):
"""Get segment-level inputs from frame-level features."""
video_batch_size = batch_video_mtx.shape[0]
max_frame = batch_video_mtx.shape[1]
feature_dim = batch_video_mtx.shape[-1]
padded_segment_sizes = (batch_num_frames + segment_size - 1) // segment_size
padded_segment_sizes *= segment_size
segment_mask = (
0 < (padded_segment_sizes[:, np.newaxis] - np.arange(0, max_frame)))
# Segment bags.
frame_bags = batch_video_mtx.reshape((-1, feature_dim))
segment_frames = frame_bags[segment_mask.reshape(-1)].reshape(
(-1, segment_size, feature_dim))
# Segment num frames.
segment_start_times = np.arange(0, max_frame, segment_size)
num_segments = batch_num_frames[:, np.newaxis] - segment_start_times
num_segment_bags = num_segments.reshape((-1))
valid_segment_mask = num_segment_bags > 0
segment_num_frames = num_segment_bags[valid_segment_mask]
segment_num_frames[segment_num_frames > segment_size] = segment_size
max_segment_num = (max_frame + segment_size - 1) // segment_size
video_idxs = np.tile(
np.arange(0, video_batch_size)[:, np.newaxis], [1, max_segment_num])
segment_idxs = np.tile(segment_start_times, [video_batch_size, 1])
idx_bags = np.stack([video_idxs, segment_idxs], axis=-1).reshape((-1, 2))
video_segment_ids = idx_bags[valid_segment_mask]
return {
"video_batch": segment_frames,
"num_frames_batch": segment_num_frames,
"video_segment_ids": video_segment_ids
}
def inference(reader, train_dir, data_pattern, out_file_location, batch_size,
top_k):
"""Inference function."""
with tf.Session(config=tf.ConfigProto(
allow_soft_placement=True)) as sess, gfile.Open(out_file_location,
"w+") as out_file:
video_id_batch, video_batch, num_frames_batch = get_input_data_tensors(
reader, data_pattern, batch_size)
inference_model_name = "segment_inference_model" if FLAGS.segment_labels else "inference_model"
checkpoint_file = os.path.join(train_dir, "inference_model",
inference_model_name)
if not gfile.Exists(checkpoint_file + ".meta"):
raise IOError("Cannot find %s. Did you run eval.py?" % checkpoint_file)
meta_graph_location = checkpoint_file + ".meta"
logging.info("loading meta-graph: " + meta_graph_location)
if FLAGS.output_model_tgz:
with tarfile.open(FLAGS.output_model_tgz, "w:gz") as tar:
for model_file in glob.glob(checkpoint_file + ".*"):
tar.add(model_file, arcname=os.path.basename(model_file))
tar.add(os.path.join(train_dir, "model_flags.json"),
arcname="model_flags.json")
print("Tarred model onto " + FLAGS.output_model_tgz)
with tf.device("/cpu:0"):
saver = tf.train.import_meta_graph(meta_graph_location,
clear_devices=True)
logging.info("restoring variables from " + checkpoint_file)
saver.restore(sess, checkpoint_file)
input_tensor = tf.get_collection("input_batch_raw")[0]
num_frames_tensor = tf.get_collection("num_frames")[0]
predictions_tensor = tf.get_collection("predictions")[0]
# Workaround for num_epochs issue.
def set_up_init_ops(variables):
init_op_list = []
for variable in list(variables):
if "train_input" in variable.name:
init_op_list.append(tf.assign(variable, 1))
variables.remove(variable)
init_op_list.append(tf.variables_initializer(variables))
return init_op_list
sess.run(
set_up_init_ops(tf.get_collection_ref(tf.GraphKeys.LOCAL_VARIABLES)))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
num_examples_processed = 0
start_time = time.time()
whitelisted_cls_mask = None
if FLAGS.segment_labels:
final_out_file = out_file
out_file = tempfile.NamedTemporaryFile()
logging.info(
"Segment temp prediction output will be written to temp file: %s",
out_file.name)
if FLAGS.segment_label_ids_file:
whitelisted_cls_mask = np.zeros((predictions_tensor.get_shape()[-1],),
dtype=np.float32)
segment_label_ids_file = FLAGS.segment_label_ids_file
if segment_label_ids_file.startswith("http"):
logging.info("Retrieving segment ID whitelist files from %s...",
segment_label_ids_file)
segment_label_ids_file, _ = urllib.request.urlretrieve(
segment_label_ids_file)
with tf.io.gfile.GFile(segment_label_ids_file) as fobj:
for line in fobj:
try:
cls_id = int(line)
whitelisted_cls_mask[cls_id] = 1.
except ValueError:
# Simply skip the non-integer line.
continue
out_file.write(u"VideoId,LabelConfidencePairs\n".encode("utf8"))
try:
while not coord.should_stop():
video_id_batch_val, video_batch_val, num_frames_batch_val = sess.run(
[video_id_batch, video_batch, num_frames_batch])
if FLAGS.segment_labels:
results = get_segments(video_batch_val, num_frames_batch_val, 5)
video_segment_ids = results["video_segment_ids"]
video_id_batch_val = video_id_batch_val[video_segment_ids[:, 0]]
video_id_batch_val = np.array([
"%s:%d" % (x.decode("utf8"), y)
for x, y in zip(video_id_batch_val, video_segment_ids[:, 1])
])
video_batch_val = results["video_batch"]
num_frames_batch_val = results["num_frames_batch"]
if input_tensor.get_shape()[1] != video_batch_val.shape[1]:
raise ValueError("max_frames mismatch. Please re-run the eval.py "
"with correct segment_labels settings.")
predictions_val, = sess.run([predictions_tensor],
feed_dict={
input_tensor: video_batch_val,
num_frames_tensor: num_frames_batch_val
})
now = time.time()
num_examples_processed += len(video_batch_val)
elapsed_time = now - start_time
logging.info("num examples processed: " + str(num_examples_processed) +
" elapsed seconds: " + "{0:.2f}".format(elapsed_time) +
" examples/sec: %.2f" %
(num_examples_processed / elapsed_time))
for line in format_lines(video_id_batch_val, predictions_val, top_k,
whitelisted_cls_mask):
out_file.write(line)
out_file.flush()
except tf.errors.OutOfRangeError:
logging.info("Done with inference. The output file was written to " +
out_file.name)
finally:
coord.request_stop()
if FLAGS.segment_labels:
# Re-read the file and do heap sort.
# Create multiple heaps.
logging.info("Post-processing segment predictions...")
heaps = {}
out_file.seek(0, 0)
for line in out_file:
segment_id, preds = line.decode("utf8").split(",")
if segment_id == "VideoId":
# Skip the headline.
continue
preds = preds.split(" ")
pred_cls_ids = [int(preds[idx]) for idx in range(0, len(preds), 2)]
pred_cls_scores = [
float(preds[idx]) for idx in range(1, len(preds), 2)
]
for cls, score in zip(pred_cls_ids, pred_cls_scores):
if not whitelisted_cls_mask[cls]:
# Skip non-whitelisted classes.
continue
if cls not in heaps:
heaps[cls] = []
if len(heaps[cls]) >= FLAGS.segment_max_pred:
heapq.heappushpop(heaps[cls], (score, segment_id))
else:
heapq.heappush(heaps[cls], (score, segment_id))
logging.info("Writing sorted segment predictions to: %s",
final_out_file.name)
final_out_file.write("Class,Segments\n")
for cls, cls_heap in heaps.items():
cls_heap.sort(key=lambda x: x[0], reverse=True)
final_out_file.write("%d,%s\n" %
(cls, " ".join([x[1] for x in cls_heap])))
final_out_file.close()
out_file.close()
coord.join(threads)
sess.close()
def main(unused_argv):
logging.set_verbosity(tf.logging.INFO)
if FLAGS.input_model_tgz:
if FLAGS.train_dir:
raise ValueError("You cannot supply --train_dir if supplying "
"--input_model_tgz")
# Untar.
if not os.path.exists(FLAGS.untar_model_dir):
os.makedirs(FLAGS.untar_model_dir)
tarfile.open(FLAGS.input_model_tgz).extractall(FLAGS.untar_model_dir)
FLAGS.train_dir = FLAGS.untar_model_dir
flags_dict_file = os.path.join(FLAGS.train_dir, "model_flags.json")
if not file_io.file_exists(flags_dict_file):
raise IOError("Cannot find %s. Did you run eval.py?" % flags_dict_file)
flags_dict = json.loads(file_io.FileIO(flags_dict_file, "r").read())
# convert feature_names and feature_sizes to lists of values
feature_names, feature_sizes = utils.GetListOfFeatureNamesAndSizes(
flags_dict["feature_names"], flags_dict["feature_sizes"])
if flags_dict["frame_features"]:
reader = readers.YT8MFrameFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes)
else:
reader = readers.YT8MAggregatedFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes)
if not FLAGS.output_file:
raise ValueError("'output_file' was not specified. "
"Unable to continue with inference.")
if not FLAGS.input_data_pattern:
raise ValueError("'input_data_pattern' was not specified. "
"Unable to continue with inference.")
inference(reader, FLAGS.train_dir, FLAGS.input_data_pattern,
FLAGS.output_file, FLAGS.batch_size, FLAGS.top_k)
if __name__ == "__main__":
app.run()
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Binary for generating predictions over a set of videos."""
from __future__ import print_function
import glob
import heapq
import json
import os
import tarfile
import tempfile
import time
import numpy as np
import readers
from six.moves import urllib
import tensorflow as tf
from tensorflow import app
from tensorflow import flags
from tensorflow import gfile
from tensorflow import logging
from tensorflow.python.lib.io import file_io
import utils
from collections import Counter
FLAGS = flags.FLAGS
if __name__ == "__main__":
# Input
flags.DEFINE_string(
"train_dir", "", "The directory to load the model files from. We assume "
"that you have already run eval.py onto this, such that "
"inference_model.* files already exist.")
flags.DEFINE_string(
"input_data_pattern", "",
"File glob defining the evaluation dataset in tensorflow.SequenceExample "
"format. The SequenceExamples are expected to have an 'rgb' byte array "
"sequence feature as well as a 'labels' int64 context feature.")
flags.DEFINE_string(
"input_model_tgz", "",
"If given, must be path to a .tgz file that was written "
"by this binary using flag --output_model_tgz. In this "
"case, the .tgz file will be untarred to "
"--untar_model_dir and the model will be used for "
"inference.")
flags.DEFINE_string(
"untar_model_dir", "/tmp/yt8m-model",
"If --input_model_tgz is given, then this directory will "
"be created and the contents of the .tgz file will be "
"untarred here.")
flags.DEFINE_bool(
"segment_labels", False,
"If set, then --input_data_pattern must be frame-level features (but with"
" segment_labels). Otherwise, --input_data_pattern must be aggregated "
"video-level features. The model must also be set appropriately (i.e. to "
"read 3D batches VS 4D batches.")
flags.DEFINE_integer("segment_max_pred", 100000,
"Limit total number of segment outputs per entity.")
flags.DEFINE_string(
"segment_label_ids_file",
"https://raw.githubusercontent.com/google/youtube-8m/master/segment_label_ids.csv",
"The file that contains the segment label ids.")
# Output
flags.DEFINE_string("output_file", "", "The file to save the predictions to.")
flags.DEFINE_string(
"output_model_tgz", "",
"If given, should be a filename with a .tgz extension, "
"the model graph and checkpoint will be bundled in this "
"gzip tar. This file can be uploaded to Kaggle for the "
"top 10 participants.")
flags.DEFINE_integer("top_k", 1, "How many predictions to output per video.")
# Other flags.
flags.DEFINE_integer("batch_size", 512,
"How many examples to process per batch.")
flags.DEFINE_integer("num_readers", 1,
"How many threads to use for reading input files.")
def format_lines(video_ids, predictions, top_k, whitelisted_cls_mask=None):
"""Create an information line the submission file."""
batch_size = len(video_ids)
for video_index in range(batch_size):
video_prediction = predictions[video_index]
if whitelisted_cls_mask is not None:
# Whitelist classes.
video_prediction *= whitelisted_cls_mask
top_indices = np.argpartition(video_prediction, -top_k)[-top_k:]
line = [(class_index, predictions[video_index][class_index])
for class_index in top_indices]
line = sorted(line, key=lambda p: -p[1])
yield (video_ids[video_index] + "," +
" ".join("%i %g" % (label, score) for (label, score) in line) +
"\n").encode("utf8")
def get_input_data_tensors(reader, data_pattern, batch_size, num_readers=1):
"""Creates the section of the graph which reads the input data.
Args:
reader: A class which parses the input data.
data_pattern: A 'glob' style path to the data files.
batch_size: How many examples to process at a time.
num_readers: How many I/O threads to use.
Returns:
A tuple containing the features tensor, labels tensor, and optionally a
tensor containing the number of frames per video. The exact dimensions
depend on the reader being used.
Raises:
IOError: If no files matching the given pattern were found.
"""
with tf.name_scope("input"):
files = gfile.Glob(data_pattern)
if not files:
raise IOError("Unable to find input files. data_pattern='" +
data_pattern + "'")
logging.info("number of input files: " + str(len(files)))
filename_queue = tf.train.string_input_producer(files,
num_epochs=1,
shuffle=False)
examples_and_labels = [
reader.prepare_reader(filename_queue) for _ in range(num_readers)
]
input_data_dict = (tf.train.batch_join(examples_and_labels,
batch_size=batch_size,
allow_smaller_final_batch=True,
enqueue_many=True))
video_id_batch = input_data_dict["video_ids"]
video_batch = input_data_dict["video_matrix"]
num_frames_batch = input_data_dict["num_frames"]
return video_id_batch, video_batch, num_frames_batch
def get_segments(batch_video_mtx, batch_num_frames, segment_size):
"""Get segment-level inputs from frame-level features."""
video_batch_size = batch_video_mtx.shape[0]
max_frame = batch_video_mtx.shape[1]
feature_dim = batch_video_mtx.shape[-1]
padded_segment_sizes = (batch_num_frames + segment_size - 1) // segment_size
padded_segment_sizes *= segment_size
segment_mask = (
0 < (padded_segment_sizes[:, np.newaxis] - np.arange(0, max_frame)))
# Segment bags.
frame_bags = batch_video_mtx.reshape((-1, feature_dim))
segment_frames = frame_bags[segment_mask.reshape(-1)].reshape(
(-1, segment_size, feature_dim))
# Segment num frames.
segment_start_times = np.arange(0, max_frame, segment_size)
num_segments = batch_num_frames[:, np.newaxis] - segment_start_times
num_segment_bags = num_segments.reshape((-1))
valid_segment_mask = num_segment_bags > 0
segment_num_frames = num_segment_bags[valid_segment_mask]
segment_num_frames[segment_num_frames > segment_size] = segment_size
max_segment_num = (max_frame + segment_size - 1) // segment_size
video_idxs = np.tile(
np.arange(0, video_batch_size)[:, np.newaxis], [1, max_segment_num])
segment_idxs = np.tile(segment_start_times, [video_batch_size, 1])
idx_bags = np.stack([video_idxs, segment_idxs], axis=-1).reshape((-1, 2))
video_segment_ids = idx_bags[valid_segment_mask]
return {
"video_batch": segment_frames,
"num_frames_batch": segment_num_frames,
"video_segment_ids": video_segment_ids
}
def inference(reader, train_dir, data_pattern, out_file_location, batch_size,
top_k):
"""Inference function."""
with tf.Session(config=tf.ConfigProto(
allow_soft_placement=True)) as sess, gfile.Open(out_file_location,
"w+") as out_file:
video_id_batch, video_batch, num_frames_batch = get_input_data_tensors(
reader, data_pattern, batch_size)
inference_model_name = "segment_inference_model" if FLAGS.segment_labels else "inference_model"
checkpoint_file = os.path.join(train_dir, "inference_model",
inference_model_name)
if not gfile.Exists(checkpoint_file + ".meta"):
raise IOError("Cannot find %s. Did you run eval.py?" % checkpoint_file)
meta_graph_location = checkpoint_file + ".meta"
logging.info("loading meta-graph: " + meta_graph_location)
if FLAGS.output_model_tgz:
with tarfile.open(FLAGS.output_model_tgz, "w:gz") as tar:
for model_file in glob.glob(checkpoint_file + ".*"):
tar.add(model_file, arcname=os.path.basename(model_file))
tar.add(os.path.join(train_dir, "model_flags.json"),
arcname="model_flags.json")
print("Tarred model onto " + FLAGS.output_model_tgz)
with tf.device("/cpu:0"):
saver = tf.train.import_meta_graph(meta_graph_location,
clear_devices=True)
logging.info("restoring variables from " + checkpoint_file)
saver.restore(sess, checkpoint_file)
input_tensor = tf.get_collection("input_batch_raw")[0]
num_frames_tensor = tf.get_collection("num_frames")[0]
predictions_tensor = tf.get_collection("predictions")[0]
# Workaround for num_epochs issue.
def set_up_init_ops(variables):
init_op_list = []
for variable in list(variables):
if "train_input" in variable.name:
init_op_list.append(tf.assign(variable, 1))
variables.remove(variable)
init_op_list.append(tf.variables_initializer(variables))
return init_op_list
sess.run(
set_up_init_ops(tf.get_collection_ref(tf.GraphKeys.LOCAL_VARIABLES)))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
num_examples_processed = 0
start_time = time.time()
whitelisted_cls_mask = None
if FLAGS.segment_labels:
final_out_file = out_file
out_file = tempfile.NamedTemporaryFile()
logging.info(
"Segment temp prediction output will be written to temp file: %s",
out_file.name)
if FLAGS.segment_label_ids_file:
whitelisted_cls_mask = np.zeros((predictions_tensor.get_shape()[-1],),
dtype=np.float32)
segment_label_ids_file = FLAGS.segment_label_ids_file
if segment_label_ids_file.startswith("http"):
logging.info("Retrieving segment ID whitelist files from %s...",
segment_label_ids_file)
segment_label_ids_file, _ = urllib.request.urlretrieve(
segment_label_ids_file)
with tf.io.gfile.GFile(segment_label_ids_file) as fobj:
for line in fobj:
try:
cls_id = int(line)
whitelisted_cls_mask[cls_id] = 1.
except ValueError:
# Simply skip the non-integer line.
continue
out_file.write(u"VideoId,LabelConfidencePairs\n".encode("utf8"))
try:
while not coord.should_stop():
video_id_batch_val, video_batch_val, num_frames_batch_val = sess.run(
[video_id_batch, video_batch, num_frames_batch])
if FLAGS.segment_labels:
results = get_segments(video_batch_val, num_frames_batch_val, 5)
video_segment_ids = results["video_segment_ids"]
video_id_batch_val = video_id_batch_val[video_segment_ids[:, 0]]
video_id_batch_val = np.array([
"%s:%d" % (x.decode("utf8"), y)
for x, y in zip(video_id_batch_val, video_segment_ids[:, 1])
])
video_batch_val = results["video_batch"]
num_frames_batch_val = results["num_frames_batch"]
if input_tensor.get_shape()[1] != video_batch_val.shape[1]:
raise ValueError("max_frames mismatch. Please re-run the eval.py "
"with correct segment_labels settings.")
predictions_val, = sess.run([predictions_tensor],
feed_dict={
input_tensor: video_batch_val,
num_frames_tensor: num_frames_batch_val
})
now = time.time()
num_examples_processed += len(video_batch_val)
elapsed_time = now - start_time
logging.info("num examples processed: " + str(num_examples_processed) +
" elapsed seconds: " + "{0:.2f}".format(elapsed_time) +
" examples/sec: %.2f" %
(num_examples_processed / elapsed_time))
for line in format_lines(video_id_batch_val, predictions_val, top_k,
whitelisted_cls_mask):
out_file.write(line)
out_file.flush()
except tf.errors.OutOfRangeError:
logging.info("Done with inference. The output file was written to " +
out_file.name)
finally:
coord.request_stop()
if FLAGS.segment_labels:
# Re-read the file and do heap sort.
# Create multiple heaps.
logging.info("Post-processing segment predictions...")
segment_id_list = []
segment_classes = []
cls_result_arr = []
out_file.seek(0, 0)
for line in out_file:
segment_id, preds = line.decode("utf8").split(",")
if segment_id == "VideoId":
# Skip the headline.
continue
preds = preds.split(" ")
pred_cls_ids = [int(preds[idx]) for idx in range(0, len(preds), 2)]
# =======================================
segment_id = str(segment_id.split(":")[0])
if segment_id not in segment_id_list:
segment_id_list.append(str(segment_id))
segment_classes.append("")
index = segment_id_list.index(segment_id)
for classes in pred_cls_ids:
segment_classes[index] = str(segment_classes[index]) + str(
classes) + " " # append classes from new segment
for segs, item in zip(segment_id_list, segment_classes):
print('====== R E C O R D ======')
cls_arr = item.split(" ")[:-1]
cls_arr = list(map(int, cls_arr))
cls_arr = sorted(cls_arr)
result_string = ""
temp = Counter(cls_arr)
for item in temp:
result_string = result_string + str(item) + ":" + str(temp[item]) + ","
cls_result_arr.append(result_string[:-1])
logging.info(segs + " : " + result_string[:-1])
# =======================================
final_out_file.write("vid_id,seg_classes\n")
for seg_id, class_indcies in zip(segment_id_list, cls_result_arr):
final_out_file.write("%s,%s\n" % (seg_id, str(class_indcies)))
final_out_file.close()
out_file.close()
coord.join(threads)
sess.close()
def main(unused_argv):
logging.set_verbosity(tf.logging.INFO)
if FLAGS.input_model_tgz:
if FLAGS.train_dir:
raise ValueError("You cannot supply --train_dir if supplying "
"--input_model_tgz")
# Untar.
if not os.path.exists(FLAGS.untar_model_dir):
os.makedirs(FLAGS.untar_model_dir)
tarfile.open(FLAGS.input_model_tgz).extractall(FLAGS.untar_model_dir)
FLAGS.train_dir = FLAGS.untar_model_dir
flags_dict_file = os.path.join(FLAGS.train_dir, "model_flags.json")
if not file_io.file_exists(flags_dict_file):
raise IOError("Cannot find %s. Did you run eval.py?" % flags_dict_file)
flags_dict = json.loads(file_io.FileIO(flags_dict_file, "r").read())
# convert feature_names and feature_sizes to lists of values
feature_names, feature_sizes = utils.GetListOfFeatureNamesAndSizes(
flags_dict["feature_names"], flags_dict["feature_sizes"])
if flags_dict["frame_features"]:
reader = readers.YT8MFrameFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes)
else:
reader = readers.YT8MAggregatedFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes)
if not FLAGS.output_file:
raise ValueError("'output_file' was not specified. "
"Unable to continue with inference.")
if not FLAGS.input_data_pattern:
raise ValueError("'input_data_pattern' was not specified. "
"Unable to continue with inference.")
inference(reader, FLAGS.train_dir, FLAGS.input_data_pattern,
FLAGS.output_file, FLAGS.batch_size, FLAGS.top_k)
if __name__ == "__main__":
app.run()
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides definitions for non-regularized training or test losses."""
import tensorflow as tf
class BaseLoss(object):
"""Inherit from this class when implementing new losses."""
def calculate_loss(self, unused_predictions, unused_labels, **unused_params):
"""Calculates the average loss of the examples in a mini-batch.
Args:
unused_predictions: a 2-d tensor storing the prediction scores, in which
each row represents a sample in the mini-batch and each column
represents a class.
unused_labels: a 2-d tensor storing the labels, which has the same shape
as the unused_predictions. The labels must be in the range of 0 and 1.
unused_params: loss specific parameters.
Returns:
A scalar loss tensor.
"""
raise NotImplementedError()
class CrossEntropyLoss(BaseLoss):
"""Calculate the cross entropy loss between the predictions and labels."""
def calculate_loss(self,
predictions,
labels,
label_weights=None,
**unused_params):
with tf.name_scope("loss_xent"):
epsilon = 1e-5
float_labels = tf.cast(labels, tf.float32)
cross_entropy_loss = float_labels * tf.math.log(predictions + epsilon) + (
1 - float_labels) * tf.math.log(1 - predictions + epsilon)
cross_entropy_loss = tf.negative(cross_entropy_loss)
if label_weights is not None:
cross_entropy_loss *= label_weights
return tf.reduce_mean(tf.reduce_sum(cross_entropy_loss, 1))
class HingeLoss(BaseLoss):
"""Calculate the hinge loss between the predictions and labels.
Note the subgradient is used in the backpropagation, and thus the optimization
may converge slower. The predictions trained by the hinge loss are between -1
and +1.
"""
def calculate_loss(self, predictions, labels, b=1.0, **unused_params):
with tf.name_scope("loss_hinge"):
float_labels = tf.cast(labels, tf.float32)
all_zeros = tf.zeros(tf.shape(float_labels), dtype=tf.float32)
all_ones = tf.ones(tf.shape(float_labels), dtype=tf.float32)
sign_labels = tf.subtract(tf.scalar_mul(2, float_labels), all_ones)
hinge_loss = tf.maximum(
all_zeros,
tf.scalar_mul(b, all_ones) - sign_labels * predictions)
return tf.reduce_mean(tf.reduce_sum(hinge_loss, 1))
class SoftmaxLoss(BaseLoss):
"""Calculate the softmax loss between the predictions and labels.
The function calculates the loss in the following way: first we feed the
predictions to the softmax activation function and then we calculate
the minus linear dot product between the logged softmax activations and the
normalized ground truth label.
It is an extension to the one-hot label. It allows for more than one positive
labels for each sample.
"""
def calculate_loss(self, predictions, labels, **unused_params):
with tf.name_scope("loss_softmax"):
epsilon = 10e-8
float_labels = tf.cast(labels, tf.float32)
# l1 normalization (labels are no less than 0)
label_rowsum = tf.maximum(tf.reduce_sum(float_labels, 1, keep_dims=True),
epsilon)
norm_float_labels = tf.div(float_labels, label_rowsum)
softmax_outputs = tf.nn.softmax(predictions)
softmax_loss = tf.negative(
tf.reduce_sum(tf.multiply(norm_float_labels, tf.log(softmax_outputs)),
1))
return tf.reduce_mean(softmax_loss)
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Calculate the mean average precision.
It provides an interface for calculating mean average precision
for an entire list or the top-n ranked items.
Example usages:
We first call the function accumulate many times to process parts of the ranked
list. After processing all the parts, we call peek_map_at_n
to calculate the mean average precision.
```
import random
p = np.array([[random.random() for _ in xrange(50)] for _ in xrange(1000)])
a = np.array([[random.choice([0, 1]) for _ in xrange(50)]
for _ in xrange(1000)])
# mean average precision for 50 classes.
calculator = mean_average_precision_calculator.MeanAveragePrecisionCalculator(
num_class=50)
calculator.accumulate(p, a)
aps = calculator.peek_map_at_n()
```
"""
import average_precision_calculator
class MeanAveragePrecisionCalculator(object):
"""This class is to calculate mean average precision."""
def __init__(self, num_class, filter_empty_classes=True, top_n=None):
"""Construct a calculator to calculate the (macro) average precision.
Args:
num_class: A positive Integer specifying the number of classes.
filter_empty_classes: whether to filter classes without any positives.
top_n: A positive Integer specifying the average precision at n, or None
to use all provided data points.
Raises:
ValueError: An error occurred when num_class is not a positive integer;
or the top_n_array is not a list of positive integers.
"""
if not isinstance(num_class, int) or num_class <= 1:
raise ValueError("num_class must be a positive integer.")
self._ap_calculators = [] # member of AveragePrecisionCalculator
self._num_class = num_class # total number of classes
self._filter_empty_classes = filter_empty_classes
for _ in range(num_class):
self._ap_calculators.append(
average_precision_calculator.AveragePrecisionCalculator(top_n=top_n))
def accumulate(self, predictions, actuals, num_positives=None):
"""Accumulate the predictions and their ground truth labels.
Args:
predictions: A list of lists storing the prediction scores. The outer
dimension corresponds to classes.
actuals: A list of lists storing the ground truth labels. The dimensions
should correspond to the predictions input. Any value larger than 0 will
be treated as positives, otherwise as negatives.
num_positives: If provided, it is a list of numbers representing the
number of true positives for each class. If not provided, the number of
true positives will be inferred from the 'actuals' array.
Raises:
ValueError: An error occurred when the shape of predictions and actuals
does not match.
"""
if not num_positives:
num_positives = [None for i in range(self._num_class)]
calculators = self._ap_calculators
for i in range(self._num_class):
calculators[i].accumulate(predictions[i], actuals[i], num_positives[i])
def clear(self):
for calculator in self._ap_calculators:
calculator.clear()
def is_empty(self):
return ([calculator.heap_size for calculator in self._ap_calculators
] == [0 for _ in range(self._num_class)])
def peek_map_at_n(self):
"""Peek the non-interpolated mean average precision at n.
Returns:
An array of non-interpolated average precision at n (default 0) for each
class.
"""
aps = []
for i in range(self._num_class):
if (not self._filter_empty_classes or
self._ap_calculators[i].num_accumulated_positives > 0):
ap = self._ap_calculators[i].peek_ap_at_n()
aps.append(ap)
return aps
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains a collection of util functions for model construction."""
import numpy
import tensorflow as tf
from tensorflow import logging
from tensorflow import flags
import tensorflow.contrib.slim as slim
def SampleRandomSequence(model_input, num_frames, num_samples):
"""Samples a random sequence of frames of size num_samples.
Args:
model_input: A tensor of size batch_size x max_frames x feature_size
num_frames: A tensor of size batch_size x 1
num_samples: A scalar
Returns:
`model_input`: A tensor of size batch_size x num_samples x feature_size
"""
batch_size = tf.shape(model_input)[0]
frame_index_offset = tf.tile(tf.expand_dims(tf.range(num_samples), 0),
[batch_size, 1])
max_start_frame_index = tf.maximum(num_frames - num_samples, 0)
start_frame_index = tf.cast(
tf.multiply(tf.random_uniform([batch_size, 1]),
tf.cast(max_start_frame_index + 1, tf.float32)), tf.int32)
frame_index = tf.minimum(start_frame_index + frame_index_offset,
tf.cast(num_frames - 1, tf.int32))
batch_index = tf.tile(tf.expand_dims(tf.range(batch_size), 1),
[1, num_samples])
index = tf.stack([batch_index, frame_index], 2)
return tf.gather_nd(model_input, index)
def SampleRandomFrames(model_input, num_frames, num_samples):
"""Samples a random set of frames of size num_samples.
Args:
model_input: A tensor of size batch_size x max_frames x feature_size
num_frames: A tensor of size batch_size x 1
num_samples: A scalar
Returns:
`model_input`: A tensor of size batch_size x num_samples x feature_size
"""
batch_size = tf.shape(model_input)[0]
frame_index = tf.cast(
tf.multiply(tf.random_uniform([batch_size, num_samples]),
tf.tile(tf.cast(num_frames, tf.float32), [1, num_samples])),
tf.int32)
batch_index = tf.tile(tf.expand_dims(tf.range(batch_size), 1),
[1, num_samples])
index = tf.stack([batch_index, frame_index], 2)
return tf.gather_nd(model_input, index)
def FramePooling(frames, method, **unused_params):
"""Pools over the frames of a video.
Args:
frames: A tensor with shape [batch_size, num_frames, feature_size].
method: "average", "max", "attention", or "none".
Returns:
A tensor with shape [batch_size, feature_size] for average, max, or
attention pooling. A tensor with shape [batch_size*num_frames, feature_size]
for none pooling.
Raises:
ValueError: if method is other than "average", "max", "attention", or
"none".
"""
if method == "average":
return tf.reduce_mean(frames, 1)
elif method == "max":
return tf.reduce_max(frames, 1)
elif method == "none":
feature_size = frames.shape_as_list()[2]
return tf.reshape(frames, [-1, feature_size])
else:
raise ValueError("Unrecognized pooling method: %s" % method)
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains the base class for models."""
class BaseModel(object):
"""Inherit from this class when implementing new models."""
def create_model(self, unused_model_input, **unused_params):
raise NotImplementedError()
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides readers configured for different datasets."""
import tensorflow as tf
import utils
def resize_axis(tensor, axis, new_size, fill_value=0):
"""Truncates or pads a tensor to new_size on on a given axis.
Truncate or extend tensor such that tensor.shape[axis] == new_size. If the
size increases, the padding will be performed at the end, using fill_value.
Args:
tensor: The tensor to be resized.
axis: An integer representing the dimension to be sliced.
new_size: An integer or 0d tensor representing the new value for
tensor.shape[axis].
fill_value: Value to use to fill any new entries in the tensor. Will be cast
to the type of tensor.
Returns:
The resized tensor.
"""
tensor = tf.convert_to_tensor(tensor)
shape = tf.unstack(tf.shape(tensor))
pad_shape = shape[:]
pad_shape[axis] = tf.maximum(0, new_size - shape[axis])
shape[axis] = tf.minimum(shape[axis], new_size)
shape = tf.stack(shape)
resized = tf.concat([
tf.slice(tensor, tf.zeros_like(shape), shape),
tf.fill(tf.stack(pad_shape), tf.cast(fill_value, tensor.dtype))
], axis)
# Update shape.
new_shape = tensor.get_shape().as_list() # A copy is being made.
new_shape[axis] = new_size
resized.set_shape(new_shape)
return resized
class BaseReader(object):
"""Inherit from this class when implementing new readers."""
def prepare_reader(self, unused_filename_queue):
"""Create a thread for generating prediction and label tensors."""
raise NotImplementedError()
class YT8MAggregatedFeatureReader(BaseReader):
"""Reads TFRecords of pre-aggregated Examples.
The TFRecords must contain Examples with a sparse int64 'labels' feature and
a fixed length float32 feature, obtained from the features in 'feature_name'.
The float features are assumed to be an average of dequantized values.
"""
def __init__( # pylint: disable=dangerous-default-value
self,
num_classes=3862,
feature_sizes=[1024, 128],
feature_names=["mean_rgb", "mean_audio"]):
"""Construct a YT8MAggregatedFeatureReader.
Args:
num_classes: a positive integer for the number of classes.
feature_sizes: positive integer(s) for the feature dimensions as a list.
feature_names: the feature name(s) in the tensorflow record as a list.
"""
assert len(feature_names) == len(feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(feature_names), len(feature_sizes)))
self.num_classes = num_classes
self.feature_sizes = feature_sizes
self.feature_names = feature_names
def prepare_reader(self, filename_queue, batch_size=1024):
"""Creates a single reader thread for pre-aggregated YouTube 8M Examples.
Args:
filename_queue: A tensorflow queue of filename locations.
batch_size: batch size used for feature output.
Returns:
A dict of video indexes, features, labels, and frame counts.
"""
reader = tf.TFRecordReader()
_, serialized_examples = reader.read_up_to(filename_queue, batch_size)
tf.add_to_collection("serialized_examples", serialized_examples)
return self.prepare_serialized_examples(serialized_examples)
def prepare_serialized_examples(self, serialized_examples):
"""Parse a single video-level TF Example."""
# set the mapping from the fields to data types in the proto
num_features = len(self.feature_names)
assert num_features > 0, "self.feature_names is empty!"
assert len(self.feature_names) == len(self.feature_sizes), \
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(self.feature_names), len(self.feature_sizes))
feature_map = {
"id": tf.io.FixedLenFeature([], tf.string),
"labels": tf.io.VarLenFeature(tf.int64)
}
for feature_index in range(num_features):
feature_map[self.feature_names[feature_index]] = tf.FixedLenFeature(
[self.feature_sizes[feature_index]], tf.float32)
features = tf.parse_example(serialized_examples, features=feature_map)
labels = tf.sparse_to_indicator(features["labels"], self.num_classes)
labels.set_shape([None, self.num_classes])
concatenated_features = tf.concat(
[features[feature_name] for feature_name in self.feature_names], 1)
output_dict = {
"video_ids": features["id"],
"video_matrix": concatenated_features,
"labels": labels,
"num_frames": tf.ones([tf.shape(serialized_examples)[0]])
}
return output_dict
class YT8MFrameFeatureReader(BaseReader):
"""Reads TFRecords of SequenceExamples.
The TFRecords must contain SequenceExamples with the sparse in64 'labels'
context feature and a fixed length byte-quantized feature vector, obtained
from the features in 'feature_names'. The quantized features will be mapped
back into a range between min_quantized_value and max_quantized_value.
"""
def __init__( # pylint: disable=dangerous-default-value
self,
num_classes=3862,
feature_sizes=[1024, 128],
feature_names=["rgb", "audio"],
max_frames=300,
segment_labels=False,
segment_size=5):
"""Construct a YT8MFrameFeatureReader.
Args:
num_classes: a positive integer for the number of classes.
feature_sizes: positive integer(s) for the feature dimensions as a list.
feature_names: the feature name(s) in the tensorflow record as a list.
max_frames: the maximum number of frames to process.
segment_labels: if we read segment labels instead.
segment_size: the segment_size used for reading segments.
"""
assert len(feature_names) == len(feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(feature_names), len(feature_sizes)))
self.num_classes = num_classes
self.feature_sizes = feature_sizes
self.feature_names = feature_names
self.max_frames = max_frames
self.segment_labels = segment_labels
self.segment_size = segment_size
def get_video_matrix(self, features, feature_size, max_frames,
max_quantized_value, min_quantized_value):
"""Decodes features from an input string and quantizes it.
Args:
features: raw feature values
feature_size: length of each frame feature vector
max_frames: number of frames (rows) in the output feature_matrix
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
feature_matrix: matrix of all frame-features
num_frames: number of frames in the sequence
"""
decoded_features = tf.reshape(
tf.cast(tf.decode_raw(features, tf.uint8), tf.float32),
[-1, feature_size])
num_frames = tf.minimum(tf.shape(decoded_features)[0], max_frames)
feature_matrix = utils.Dequantize(decoded_features, max_quantized_value,
min_quantized_value)
feature_matrix = resize_axis(feature_matrix, 0, max_frames)
return feature_matrix, num_frames
def prepare_reader(self,
filename_queue,
max_quantized_value=2,
min_quantized_value=-2):
"""Creates a single reader thread for YouTube8M SequenceExamples.
Args:
filename_queue: A tensorflow queue of filename locations.
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
A dict of video indexes, video features, labels, and frame counts.
"""
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
return self.prepare_serialized_examples(serialized_example,
max_quantized_value,
min_quantized_value)
def prepare_serialized_examples(self,
serialized_example,
max_quantized_value=2,
min_quantized_value=-2):
"""Parse single serialized SequenceExample from the TFRecords."""
# Read/parse frame/segment-level labels.
context_features = {
"id": tf.io.FixedLenFeature([], tf.string),
}
if self.segment_labels:
context_features.update({
# There is no need to read end-time given we always assume the segment
# has the same size.
"segment_labels": tf.io.VarLenFeature(tf.int64),
"segment_start_times": tf.io.VarLenFeature(tf.int64),
"segment_scores": tf.io.VarLenFeature(tf.float32)
})
else:
context_features.update({"labels": tf.io.VarLenFeature(tf.int64)})
sequence_features = {
feature_name: tf.io.FixedLenSequenceFeature([], dtype=tf.string)
for feature_name in self.feature_names
}
contexts, features = tf.io.parse_single_sequence_example(
serialized_example,
context_features=context_features,
sequence_features=sequence_features)
# loads (potentially) different types of features and concatenates them
num_features = len(self.feature_names)
assert num_features > 0, "No feature selected: feature_names is empty!"
assert len(self.feature_names) == len(self.feature_sizes), (
"length of feature_names (={}) != length of feature_sizes (={})".format(
len(self.feature_names), len(self.feature_sizes)))
num_frames = -1 # the number of frames in the video
feature_matrices = [None] * num_features # an array of different features
for feature_index in range(num_features):
feature_matrix, num_frames_in_this_feature = self.get_video_matrix(
features[self.feature_names[feature_index]],
self.feature_sizes[feature_index], self.max_frames,
max_quantized_value, min_quantized_value)
if num_frames == -1:
num_frames = num_frames_in_this_feature
feature_matrices[feature_index] = feature_matrix
# cap the number of frames at self.max_frames
num_frames = tf.minimum(num_frames, self.max_frames)
# concatenate different features
video_matrix = tf.concat(feature_matrices, 1)
# Partition frame-level feature matrix to segment-level feature matrix.
if self.segment_labels:
start_times = contexts["segment_start_times"].values
# Here we assume all the segments that started at the same start time has
# the same segment_size.
uniq_start_times, seg_idxs = tf.unique(start_times,
out_idx=tf.dtypes.int64)
# TODO(zhengxu): Ensure the segment_sizes are all same.
segment_size = self.segment_size
# Range gather matrix, e.g., [[0,1,2],[1,2,3]] for segment_size == 3.
range_mtx = tf.expand_dims(uniq_start_times, axis=-1) + tf.expand_dims(
tf.range(0, segment_size, dtype=tf.int64), axis=0)
# Shape: [num_segment, segment_size, feature_dim].
batch_video_matrix = tf.gather_nd(video_matrix,
tf.expand_dims(range_mtx, axis=-1))
num_segment = tf.shape(batch_video_matrix)[0]
batch_video_ids = tf.reshape(tf.tile([contexts["id"]], [num_segment]),
(num_segment,))
batch_frames = tf.reshape(tf.tile([segment_size], [num_segment]),
(num_segment,))
# For segment labels, all labels are not exhausively rated. So we only
# evaluate the rated labels.
# Label indices for each segment, shape: [num_segment, 2].
label_indices = tf.stack([seg_idxs, contexts["segment_labels"].values],
axis=-1)
label_values = contexts["segment_scores"].values
sparse_labels = tf.sparse.SparseTensor(label_indices, label_values,
(num_segment, self.num_classes))
batch_labels = tf.sparse.to_dense(sparse_labels, validate_indices=False)
sparse_label_weights = tf.sparse.SparseTensor(
label_indices, tf.ones_like(label_values, dtype=tf.float32),
(num_segment, self.num_classes))
batch_label_weights = tf.sparse.to_dense(sparse_label_weights,
validate_indices=False)
else:
# Process video-level labels.
label_indices = contexts["labels"].values
sparse_labels = tf.sparse.SparseTensor(
tf.expand_dims(label_indices, axis=-1),
tf.ones_like(contexts["labels"].values, dtype=tf.bool),
(self.num_classes,))
labels = tf.sparse.to_dense(sparse_labels,
default_value=False,
validate_indices=False)
# convert to batch format.
batch_video_ids = tf.expand_dims(contexts["id"], 0)
batch_video_matrix = tf.expand_dims(video_matrix, 0)
batch_labels = tf.expand_dims(labels, 0)
batch_frames = tf.expand_dims(num_frames, 0)
batch_label_weights = None
output_dict = {
"video_ids": batch_video_ids,
"video_matrix": batch_video_matrix,
"labels": batch_labels,
"num_frames": batch_frames,
}
if batch_label_weights is not None:
output_dict["label_weights"] = batch_label_weights
return output_dict
"""Eval mAP@N metric from inference file."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl import app
from absl import flags
import mean_average_precision_calculator as map_calculator
import numpy as np
import tensorflow as tf
flags.DEFINE_string(
"eval_data_pattern", "",
"File glob defining the evaluation dataset in tensorflow.SequenceExample "
"format. The SequenceExamples are expected to have an 'rgb' byte array "
"sequence feature as well as a 'labels' int64 context feature.")
flags.DEFINE_string(
"label_cache", "",
"The path for the label cache file. Leave blank for not to cache.")
flags.DEFINE_string("submission_file", "",
"The segment submission file generated by inference.py.")
flags.DEFINE_integer(
"top_n", 0,
"The cap per-class predictions by a maximum of N. Use 0 for not capping.")
FLAGS = flags.FLAGS
class Labels(object):
"""Contains the class to hold label objects.
This class can serialize and de-serialize the groundtruths.
The ground truth is in a mapping from (segment_id, class_id) -> label_score.
"""
def __init__(self, labels):
"""__init__ method."""
self._labels = labels
@property
def labels(self):
"""Return the ground truth mapping. See class docstring for details."""
return self._labels
def to_file(self, file_name):
"""Materialize the GT mapping to file."""
with tf.gfile.Open(file_name, "w") as fobj:
for k, v in self._labels.items():
seg_id, label = k
line = "%s,%s,%s\n" % (seg_id, label, v)
fobj.write(line)
@classmethod
def from_file(cls, file_name):
"""Read the GT mapping from cached file."""
labels = {}
with tf.gfile.Open(file_name) as fobj:
for line in fobj:
line = line.strip().strip("\n")
seg_id, label, score = line.split(",")
labels[(seg_id, int(label))] = float(score)
return cls(labels)
def read_labels(data_pattern, cache_path=""):
"""Read labels from TFRecords.
Args:
data_pattern: the data pattern to the TFRecords.
cache_path: the cache path for the label file.
Returns:
a Labels object.
"""
if cache_path:
if tf.gfile.Exists(cache_path):
tf.logging.info("Reading cached labels from %s..." % cache_path)
return Labels.from_file(cache_path)
tf.enable_eager_execution()
data_paths = tf.gfile.Glob(data_pattern)
ds = tf.data.TFRecordDataset(data_paths, num_parallel_reads=50)
context_features = {
"id": tf.FixedLenFeature([], tf.string),
"segment_labels": tf.VarLenFeature(tf.int64),
"segment_start_times": tf.VarLenFeature(tf.int64),
"segment_scores": tf.VarLenFeature(tf.float32)
}
def _parse_se_func(sequence_example):
return tf.parse_single_sequence_example(sequence_example,
context_features=context_features)
ds = ds.map(_parse_se_func)
rated_labels = {}
tf.logging.info("Reading labels from TFRecords...")
last_batch = 0
batch_size = 5000
for cxt_feature_val, _ in ds:
video_id = cxt_feature_val["id"].numpy()
segment_labels = cxt_feature_val["segment_labels"].values.numpy()
segment_start_times = cxt_feature_val["segment_start_times"].values.numpy()
segment_scores = cxt_feature_val["segment_scores"].values.numpy()
for label, start_time, score in zip(segment_labels, segment_start_times,
segment_scores):
rated_labels[("%s:%d" % (video_id, start_time), label)] = score
batch_id = len(rated_labels) // batch_size
if batch_id != last_batch:
tf.logging.info("%d examples processed.", len(rated_labels))
last_batch = batch_id
tf.logging.info("Finish reading labels from TFRecords...")
labels_obj = Labels(rated_labels)
if cache_path:
tf.logging.info("Caching labels to %s..." % cache_path)
labels_obj.to_file(cache_path)
return labels_obj
def read_segment_predictions(file_path, labels, top_n=None):
"""Read segement predictions.
Args:
file_path: the submission file path.
labels: a Labels object containing the eval labels.
top_n: the per-class class capping.
Returns:
a segment prediction list for each classes.
"""
cls_preds = {} # A label_id to pred list mapping.
with tf.gfile.Open(file_path) as fobj:
tf.logging.info("Reading predictions from %s..." % file_path)
for line in fobj:
label_id, pred_ids_val = line.split(",")
pred_ids = pred_ids_val.split(" ")
if top_n:
pred_ids = pred_ids[:top_n]
pred_ids = [
pred_id for pred_id in pred_ids
if (pred_id, int(label_id)) in labels.labels
]
cls_preds[int(label_id)] = pred_ids
if len(cls_preds) % 50 == 0:
tf.logging.info("Processed %d classes..." % len(cls_preds))
tf.logging.info("Finish reading predictions.")
return cls_preds
def main(unused_argv):
"""Entry function of the script."""
if not FLAGS.submission_file:
raise ValueError("You must input submission file.")
eval_labels = read_labels(FLAGS.eval_data_pattern,
cache_path=FLAGS.label_cache)
tf.logging.info("Total rated segments: %d." % len(eval_labels.labels))
positive_counter = {}
for k, v in eval_labels.labels.items():
_, label_id = k
if v > 0:
positive_counter[label_id] = positive_counter.get(label_id, 0) + 1
seg_preds = read_segment_predictions(FLAGS.submission_file,
eval_labels,
top_n=FLAGS.top_n)
map_cal = map_calculator.MeanAveragePrecisionCalculator(len(seg_preds))
seg_labels = []
seg_scored_preds = []
num_positives = []
for label_id in sorted(seg_preds):
class_preds = seg_preds[label_id]
seg_label = [eval_labels.labels[(pred, label_id)] for pred in class_preds]
seg_labels.append(seg_label)
seg_scored_pred = []
if class_preds:
seg_scored_pred = [
float(x) / len(class_preds) for x in range(len(class_preds), 0, -1)
]
seg_scored_preds.append(seg_scored_pred)
num_positives.append(positive_counter[label_id])
map_cal.accumulate(seg_scored_preds, seg_labels, num_positives)
map_at_n = np.mean(map_cal.peek_map_at_n())
tf.logging.info("Num classes: %d | mAP@%d: %.6f" %
(len(seg_preds), FLAGS.top_n, map_at_n))
if __name__ == "__main__":
app.run(main)
Index
3
7
8
11
12
17
18
19
21
22
23
28
31
30
32
33
34
41
43
45
46
48
53
54
52
55
58
59
60
61
65
68
73
71
74
75
76
77
80
83
90
88
89
92
95
100
101
99
104
105
109
113
112
115
116
118
120
121
123
125
127
131
128
129
130
137
141
143
145
148
152
151
156
155
158
160
164
163
169
170
172
171
173
174
175
176
178
182
184
186
188
187
192
191
190
194
197
196
198
201
202
200
199
205
204
209
207
206
210
213
214
220
218
217
226
227
231
232
229
233
235
237
244
240
249
246
248
239
250
245
255
253
256
261
259
263
262
266
267
268
269
271
276
273
277
274
278
279
280
288
291
295
294
293
297
296
300
299
303
302
304
305
313
307
311
310
312
316
318
321
322
331
333
329
330
334
343
349
340
344
348
358
347
359
355
361
360
364
365
368
369
366
370
374
380
373
385
384
388
389
382
393
381
390
394
399
397
396
402
400
398
401
405
406
410
408
416
415
419
422
414
421
424
429
418
427
434
428
435
430
441
439
437
443
440
442
445
446
448
454
444
453
455
451
452
458
460
465
457
463
462
461
464
469
468
472
473
471
475
474
477
485
491
488
482
490
496
494
483
495
493
507
501
499
503
498
514
504
502
506
508
511
527
526
532
513
519
525
518
528
522
523
535
539
540
533
521
541
547
550
544
549
551
554
543
548
557
560
552
559
563
565
567
555
576
568
564
573
581
580
572
571
584
590
585
587
588
592
598
597
599
603
600
604
605
614
602
610
608
611
612
613
617
620
607
624
627
625
631
629
638
632
634
644
641
642
646
652
647
637
661
635
658
648
663
668
664
656
666
671
683
675
669
676
667
691
685
673
688
702
684
679
694
686
689
680
693
703
697
698
692
705
706
712
711
709
710
726
713
721
720
715
717
730
728
723
716
722
718
732
724
736
725
742
727
735
740
748
738
746
751
749
752
754
760
763
756
758
766
764
757
780
767
769
771
786
785
781
787
778
783
792
791
795
788
805
802
801
793
796
804
803
797
814
813
789
808
818
816
817
811
820
826
829
824
821
825
822
835
833
843
823
827
830
832
837
852
844
841
812
847
862
869
860
838
870
846
858
854
880
876
857
859
877
871
855
875
861
867
892
898
888
884
887
891
906
900
878
885
883
901
903
907
930
897
914
917
910
905
909
933
932
922
913
923
931
911
937
918
955
915
944
952
945
948
946
970
974
958
925
979
942
965
975
950
982
940
973
962
972
957
984
983
964
1007
971
981
954
993
991
996
1005
1015
1009
995
986
1000
985
980
1016
1011
999
1002
994
1013
1010
992
1008
1036
1025
1012
990
1037
1040
1031
1019
1052
1001
1055
1032
1069
1058
1014
1023
1030
1061
1035
1034
1053
1045
1046
1067
1060
1049
1056
1074
1066
1044
1038
1073
1077
1068
1057
1072
1104
1083
1089
1087
1099
1076
1086
1098
1094
1095
1096
1101
1107
1105
1117
1093
1106
1122
1119
1103
1128
1120
1126
1102
1115
1124
1123
1131
1136
1144
1121
1137
1132
1133
1157
1134
1143
1159
1164
1155
1142
1150
1148
1161
1165
1147
1162
1152
1174
1160
1166
1190
1175
1167
1156
1180
1171
1179
1172
1186
1188
1201
1177
1208
1183
1189
1192
1209
1214
1197
1168
1202
1205
1203
1199
1219
1217
1187
1206
1210
1241
1221
1218
1223
1236
1212
1237
1195
1216
1247
1234
1240
1257
1224
1243
1259
1242
1282
1222
1254
1227
1235
1269
1258
1290
1275
1262
1252
1248
1272
1246
1225
1245
1277
1298
1288
1271
1265
1286
1260
1266
1296
1280
1285
1293
1276
1287
1289
1261
1264
1295
1291
1283
1311
1303
1330
1315
1300
1333
1307
1325
1334
1316
1314
1317
1310
1329
1324
1339
1346
1342
1352
1321
1376
1366
1308
1345
1348
1386
1383
1372
1367
1400
1382
1375
1392
1380
1371
1393
1389
1353
1387
1374
1379
1381
1359
1360
1396
1399
1365
1424
1373
1411
1401
1397
1395
1412
1394
1368
1423
1391
1435
1409
1443
1402
1425
1415
1421
1426
1433
1420
1452
1436
1430
1408
1458
1429
1453
1454
1447
1472
1486
1468
1461
1467
1484
1457
1444
1450
1451
1459
1462
1449
1476
1470
1471
1498
1488
1442
1480
1456
1466
1505
1517
1464
1503
1490
1519
1481
1493
1463
1532
1487
1501
1500
1495
1509
1535
1506
1521
1580
1540
1502
1520
1496
1569
1515
1489
1507
1527
1545
1560
1510
1514
1526
1594
1511
1572
1548
1584
1556
1588
1628
1555
1568
1550
1622
1563
1603
1616
1576
1549
1537
1593
1618
1645
1624
1617
1634
1595
1597
1590
1632
1575
1559
1625
1615
1591
1630
1608
1621
1589
1646
1643
1652
1627
1611
1626
1613
1639
1655
1620
1602
1651
1653
1669
1638
1696
1649
1675
1660
1683
1666
1671
1703
1716
1637
1672
1676
1692
1711
1680
1641
1688
1708
1704
1690
1674
1718
1699
1723
1756
1700
1662
1715
1657
1733
1728
1670
1712
1685
1724
1735
1714
1730
1747
1656
1737
1705
1693
1713
1689
1753
1739
1721
1725
1749
1732
1743
1731
1767
1738
1831
1771
1726
1746
1776
1775
1799
1774
1780
1781
1769
1805
1788
1801
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Binary for training Tensorflow models on the YouTube-8M dataset."""
import json
import os
import time
import eval_util
import export_model
import losses
import frame_level_models
import video_level_models
import readers
import tensorflow as tf
import tensorflow.contrib.slim as slim
from tensorflow.python.lib.io import file_io
from tensorflow import app
from tensorflow import flags
from tensorflow import gfile
from tensorflow import logging
from tensorflow.python.client import device_lib
import utils
FLAGS = flags.FLAGS
if __name__ == "__main__":
# Dataset flags.
flags.DEFINE_string("train_dir", "/tmp/yt8m_model/",
"The directory to save the model files in.")
flags.DEFINE_string(
"train_data_pattern", "",
"File glob for the training dataset. If the files refer to Frame Level "
"features (i.e. tensorflow.SequenceExample), then set --reader_type "
"format. The (Sequence)Examples are expected to have 'rgb' byte array "
"sequence feature as well as a 'labels' int64 context feature.")
flags.DEFINE_string("feature_names", "mean_rgb", "Name of the feature "
"to use for training.")
flags.DEFINE_string("feature_sizes", "1024", "Length of the feature vectors.")
# Model flags.
flags.DEFINE_bool(
"frame_features", False,
"If set, then --train_data_pattern must be frame-level features. "
"Otherwise, --train_data_pattern must be aggregated video-level "
"features. The model must also be set appropriately (i.e. to read 3D "
"batches VS 4D batches.")
flags.DEFINE_bool(
"segment_labels", False,
"If set, then --train_data_pattern must be frame-level features (but with"
" segment_labels). Otherwise, --train_data_pattern must be aggregated "
"video-level features. The model must also be set appropriately (i.e. to "
"read 3D batches VS 4D batches.")
flags.DEFINE_string(
"model", "LogisticModel",
"Which architecture to use for the model. Models are defined "
"in models.py.")
flags.DEFINE_bool(
"start_new_model", False,
"If set, this will not resume from a checkpoint and will instead create a"
" new model instance.")
# Training flags.
flags.DEFINE_integer(
"num_gpu", 1, "The maximum number of GPU devices to use for training. "
"Flag only applies if GPUs are installed")
flags.DEFINE_integer("batch_size", 1024,
"How many examples to process per batch for training.")
flags.DEFINE_string("label_loss", "CrossEntropyLoss",
"Which loss function to use for training the model.")
flags.DEFINE_float(
"regularization_penalty", 1.0,
"How much weight to give to the regularization loss (the label loss has "
"a weight of 1).")
flags.DEFINE_float("base_learning_rate", 0.01,
"Which learning rate to start with.")
flags.DEFINE_float(
"learning_rate_decay", 0.95,
"Learning rate decay factor to be applied every "
"learning_rate_decay_examples.")
flags.DEFINE_float(
"learning_rate_decay_examples", 4000000,
"Multiply current learning rate by learning_rate_decay "
"every learning_rate_decay_examples.")
flags.DEFINE_integer(
"num_epochs", 5, "How many passes to make over the dataset before "
"halting training.")
flags.DEFINE_integer(
"max_steps", None,
"The maximum number of iterations of the training loop.")
flags.DEFINE_integer(
"export_model_steps", 1000,
"The period, in number of steps, with which the model "
"is exported for batch prediction.")
# Other flags.
flags.DEFINE_integer("num_readers", 8,
"How many threads to use for reading input files.")
flags.DEFINE_string("optimizer", "AdamOptimizer",
"What optimizer class to use.")
flags.DEFINE_float("clip_gradient_norm", 1.0, "Norm to clip gradients to.")
flags.DEFINE_bool(
"log_device_placement", False,
"Whether to write the device on which every op will run into the "
"logs on startup.")
def validate_class_name(flag_value, category, modules, expected_superclass):
"""Checks that the given string matches a class of the expected type.
Args:
flag_value: A string naming the class to instantiate.
category: A string used further describe the class in error messages (e.g.
'model', 'reader', 'loss').
modules: A list of modules to search for the given class.
expected_superclass: A class that the given class should inherit from.
Raises:
FlagsError: If the given class could not be found or if the first class
found with that name doesn't inherit from the expected superclass.
Returns:
True if a class was found that matches the given constraints.
"""
candidates = [getattr(module, flag_value, None) for module in modules]
for candidate in candidates:
if not candidate:
continue
if not issubclass(candidate, expected_superclass):
raise flags.FlagsError(
"%s '%s' doesn't inherit from %s." %
(category, flag_value, expected_superclass.__name__))
return True
raise flags.FlagsError("Unable to find %s '%s'." % (category, flag_value))
def get_input_data_tensors(reader,
data_pattern,
batch_size=1000,
num_epochs=None,
num_readers=1):
"""Creates the section of the graph which reads the training data.
Args:
reader: A class which parses the training data.
data_pattern: A 'glob' style path to the data files.
batch_size: How many examples to process at a time.
num_epochs: How many passes to make over the training data. Set to 'None' to
run indefinitely.
num_readers: How many I/O threads to use.
Returns:
A tuple containing the features tensor, labels tensor, and optionally a
tensor containing the number of frames per video. The exact dimensions
depend on the reader being used.
Raises:
IOError: If no files matching the given pattern were found.
"""
logging.info("Using batch size of " + str(batch_size) + " for training.")
with tf.name_scope("train_input"):
files = gfile.Glob(data_pattern)
if not files:
raise IOError("Unable to find training files. data_pattern='" +
data_pattern + "'.")
logging.info("Number of training files: %s.", str(len(files)))
filename_queue = tf.train.string_input_producer(files,
num_epochs=num_epochs,
shuffle=True)
training_data = [
reader.prepare_reader(filename_queue) for _ in range(num_readers)
]
return tf.train.shuffle_batch_join(training_data,
batch_size=batch_size,
capacity=batch_size * 5,
min_after_dequeue=batch_size,
allow_smaller_final_batch=True,
enqueue_many=True)
def find_class_by_name(name, modules):
"""Searches the provided modules for the named class and returns it."""
modules = [getattr(module, name, None) for module in modules]
return next(a for a in modules if a)
def build_graph(reader,
model,
train_data_pattern,
label_loss_fn=losses.CrossEntropyLoss(),
batch_size=1000,
base_learning_rate=0.01,
learning_rate_decay_examples=1000000,
learning_rate_decay=0.95,
optimizer_class=tf.train.AdamOptimizer,
clip_gradient_norm=1.0,
regularization_penalty=1,
num_readers=1,
num_epochs=None):
"""Creates the Tensorflow graph.
This will only be called once in the life of
a training model, because after the graph is created the model will be
restored from a meta graph file rather than being recreated.
Args:
reader: The data file reader. It should inherit from BaseReader.
model: The core model (e.g. logistic or neural net). It should inherit from
BaseModel.
train_data_pattern: glob path to the training data files.
label_loss_fn: What kind of loss to apply to the model. It should inherit
from BaseLoss.
batch_size: How many examples to process at a time.
base_learning_rate: What learning rate to initialize the optimizer with.
optimizer_class: Which optimization algorithm to use.
clip_gradient_norm: Magnitude of the gradient to clip to.
regularization_penalty: How much weight to give the regularization loss
compared to the label loss.
num_readers: How many threads to use for I/O operations.
num_epochs: How many passes to make over the data. 'None' means an unlimited
number of passes.
"""
global_step = tf.Variable(0, trainable=False, name="global_step")
local_device_protos = device_lib.list_local_devices()
gpus = [x.name for x in local_device_protos if x.device_type == "GPU"]
gpus = gpus[:FLAGS.num_gpu]
num_gpus = len(gpus)
if num_gpus > 0:
logging.info("Using the following GPUs to train: " + str(gpus))
num_towers = num_gpus
device_string = "/gpu:%d"
else:
logging.info("No GPUs found. Training on CPU.")
num_towers = 1
device_string = "/cpu:%d"
learning_rate = tf.train.exponential_decay(base_learning_rate,
global_step * batch_size *
num_towers,
learning_rate_decay_examples,
learning_rate_decay,
staircase=True)
tf.summary.scalar("learning_rate", learning_rate)
optimizer = optimizer_class(learning_rate)
input_data_dict = (get_input_data_tensors(reader,
train_data_pattern,
batch_size=batch_size * num_towers,
num_readers=num_readers,
num_epochs=num_epochs))
model_input_raw = input_data_dict["video_matrix"]
labels_batch = input_data_dict["labels"]
num_frames = input_data_dict["num_frames"]
print("model_input_shape, ", model_input_raw.shape)
tf.summary.histogram("model/input_raw", model_input_raw)
feature_dim = len(model_input_raw.get_shape()) - 1
model_input = tf.nn.l2_normalize(model_input_raw, feature_dim)
tower_inputs = tf.split(model_input, num_towers)
tower_labels = tf.split(labels_batch, num_towers)
tower_num_frames = tf.split(num_frames, num_towers)
tower_gradients = []
tower_predictions = []
tower_label_losses = []
tower_reg_losses = []
for i in range(num_towers):
# For some reason these 'with' statements can't be combined onto the same
# line. They have to be nested.
with tf.device(device_string % i):
with (tf.variable_scope(("tower"), reuse=True if i > 0 else None)):
with (slim.arg_scope([slim.model_variable, slim.variable],
device="/cpu:0" if num_gpus != 1 else "/gpu:0")):
result = model.create_model(tower_inputs[i],
num_frames=tower_num_frames[i],
vocab_size=reader.num_classes,
labels=tower_labels[i])
for variable in slim.get_model_variables():
tf.summary.histogram(variable.op.name, variable)
predictions = result["predictions"]
tower_predictions.append(predictions)
if "loss" in result.keys():
label_loss = result["loss"]
else:
label_loss = label_loss_fn.calculate_loss(predictions,
tower_labels[i])
if "regularization_loss" in result.keys():
reg_loss = result["regularization_loss"]
else:
reg_loss = tf.constant(0.0)
reg_losses = tf.losses.get_regularization_losses()
if reg_losses:
reg_loss += tf.add_n(reg_losses)
tower_reg_losses.append(reg_loss)
# Adds update_ops (e.g., moving average updates in batch normalization) as
# a dependency to the train_op.
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
if "update_ops" in result.keys():
update_ops += result["update_ops"]
if update_ops:
with tf.control_dependencies(update_ops):
barrier = tf.no_op(name="gradient_barrier")
with tf.control_dependencies([barrier]):
label_loss = tf.identity(label_loss)
tower_label_losses.append(label_loss)
# Incorporate the L2 weight penalties etc.
final_loss = regularization_penalty * reg_loss + label_loss
gradients = optimizer.compute_gradients(
final_loss, colocate_gradients_with_ops=False)
tower_gradients.append(gradients)
label_loss = tf.reduce_mean(tf.stack(tower_label_losses))
tf.summary.scalar("label_loss", label_loss)
if regularization_penalty != 0:
reg_loss = tf.reduce_mean(tf.stack(tower_reg_losses))
tf.summary.scalar("reg_loss", reg_loss)
merged_gradients = utils.combine_gradients(tower_gradients)
if clip_gradient_norm > 0:
with tf.name_scope("clip_grads"):
merged_gradients = utils.clip_gradient_norms(merged_gradients,
clip_gradient_norm)
train_op = optimizer.apply_gradients(merged_gradients,
global_step=global_step)
tf.add_to_collection("global_step", global_step)
tf.add_to_collection("loss", label_loss)
tf.add_to_collection("predictions", tf.concat(tower_predictions, 0))
tf.add_to_collection("input_batch_raw", model_input_raw)
tf.add_to_collection("input_batch", model_input)
tf.add_to_collection("num_frames", num_frames)
tf.add_to_collection("labels", tf.cast(labels_batch, tf.float32))
tf.add_to_collection("train_op", train_op)
class Trainer(object):
"""A Trainer to train a Tensorflow graph."""
def __init__(self,
cluster,
task,
train_dir,
model,
reader,
model_exporter,
log_device_placement=True,
max_steps=None,
export_model_steps=1000):
""""Creates a Trainer.
Args:
cluster: A tf.train.ClusterSpec if the execution is distributed. None
otherwise.
task: A TaskSpec describing the job type and the task index.
"""
self.cluster = cluster
self.task = task
self.is_master = (task.type == "master" and task.index == 0)
self.train_dir = train_dir
self.config = tf.ConfigProto(allow_soft_placement=True,
log_device_placement=log_device_placement)
self.config.gpu_options.allow_growth = True
self.model = model
self.reader = reader
self.model_exporter = model_exporter
self.max_steps = max_steps
self.max_steps_reached = False
self.export_model_steps = export_model_steps
self.last_model_export_step = 0
# if self.is_master and self.task.index > 0:
# raise StandardError("%s: Only one replica of master expected",
# task_as_string(self.task))
def run(self, start_new_model=False):
"""Performs training on the currently defined Tensorflow graph.
Returns:
A tuple of the training Hit@1 and the training PERR.
"""
if self.is_master and start_new_model:
self.remove_training_directory(self.train_dir)
if not os.path.exists(self.train_dir):
os.makedirs(self.train_dir)
model_flags_dict = {
"model": FLAGS.model,
"feature_sizes": FLAGS.feature_sizes,
"feature_names": FLAGS.feature_names,
"frame_features": FLAGS.frame_features,
"label_loss": FLAGS.label_loss,
}
flags_json_path = os.path.join(FLAGS.train_dir, "model_flags.json")
if file_io.file_exists(flags_json_path):
existing_flags = json.load(file_io.FileIO(flags_json_path, mode="r"))
if existing_flags != model_flags_dict:
logging.error(
"Model flags do not match existing file %s. Please "
"delete the file, change --train_dir, or pass flag "
"--start_new_model", flags_json_path)
logging.error("Ran model with flags: %s", str(model_flags_dict))
logging.error("Previously ran with flags: %s", str(existing_flags))
exit(1)
else:
# Write the file.
with file_io.FileIO(flags_json_path, mode="w") as fout:
fout.write(json.dumps(model_flags_dict))
target, device_fn = self.start_server_if_distributed()
meta_filename = self.get_meta_filename(start_new_model, self.train_dir)
with tf.Graph().as_default() as graph:
if meta_filename:
saver = self.recover_model(meta_filename)
with tf.device(device_fn):
if not meta_filename:
saver = self.build_model(self.model, self.reader)
global_step = tf.get_collection("global_step")[0]
loss = tf.get_collection("loss")[0]
predictions = tf.get_collection("predictions")[0]
labels = tf.get_collection("labels")[0]
train_op = tf.get_collection("train_op")[0]
init_op = tf.global_variables_initializer()
sv = tf.train.Supervisor(graph,
logdir=self.train_dir,
init_op=init_op,
is_chief=self.is_master,
global_step=global_step,
save_model_secs=15 * 60,
save_summaries_secs=120,
saver=saver)
logging.info("%s: Starting managed session.", task_as_string(self.task))
with sv.managed_session(target, config=self.config) as sess:
try:
logging.info("%s: Entering training loop.", task_as_string(self.task))
while (not sv.should_stop()) and (not self.max_steps_reached):
batch_start_time = time.time()
_, global_step_val, loss_val, predictions_val, labels_val = sess.run(
[train_op, global_step, loss, predictions, labels])
seconds_per_batch = time.time() - batch_start_time
examples_per_second = labels_val.shape[0] / seconds_per_batch
if self.max_steps and self.max_steps <= global_step_val:
self.max_steps_reached = True
if self.is_master and global_step_val % 10 == 0 and self.train_dir:
eval_start_time = time.time()
hit_at_one = eval_util.calculate_hit_at_one(predictions_val,
labels_val)
perr = eval_util.calculate_precision_at_equal_recall_rate(
predictions_val, labels_val)
gap = eval_util.calculate_gap(predictions_val, labels_val)
eval_end_time = time.time()
eval_time = eval_end_time - eval_start_time
logging.info("training step " + str(global_step_val) + " | Loss: " +
("%.2f" % loss_val) + " Examples/sec: " +
("%.2f" % examples_per_second) + " | Hit@1: " +
("%.2f" % hit_at_one) + " PERR: " + ("%.2f" % perr) +
" GAP: " + ("%.2f" % gap))
sv.summary_writer.add_summary(
utils.MakeSummary("model/Training_Hit@1", hit_at_one),
global_step_val)
sv.summary_writer.add_summary(
utils.MakeSummary("model/Training_Perr", perr), global_step_val)
sv.summary_writer.add_summary(
utils.MakeSummary("model/Training_GAP", gap), global_step_val)
sv.summary_writer.add_summary(
utils.MakeSummary("global_step/Examples/Second",
examples_per_second), global_step_val)
sv.summary_writer.flush()
# Exporting the model every x steps
time_to_export = ((self.last_model_export_step == 0) or
(global_step_val - self.last_model_export_step >=
self.export_model_steps))
if self.is_master and time_to_export:
self.export_model(global_step_val, sv.saver, sv.save_path, sess)
self.last_model_export_step = global_step_val
else:
logging.info("training step " + str(global_step_val) + " | Loss: " +
("%.2f" % loss_val) + " Examples/sec: " +
("%.2f" % examples_per_second))
except tf.errors.OutOfRangeError:
logging.info("%s: Done training -- epoch limit reached.",
task_as_string(self.task))
logging.info("%s: Exited training loop.", task_as_string(self.task))
sv.Stop()
def export_model(self, global_step_val, saver, save_path, session):
# If the model has already been exported at this step, return.
if global_step_val == self.last_model_export_step:
return
saver.save(session, save_path, global_step_val)
def start_server_if_distributed(self):
"""Starts a server if the execution is distributed."""
if self.cluster:
logging.info("%s: Starting trainer within cluster %s.",
task_as_string(self.task), self.cluster.as_dict())
server = start_server(self.cluster, self.task)
target = server.target
device_fn = tf.train.replica_device_setter(
ps_device="/job:ps",
worker_device="/job:%s/task:%d" % (self.task.type, self.task.index),
cluster=self.cluster)
else:
target = ""
device_fn = ""
return (target, device_fn)
def remove_training_directory(self, train_dir):
"""Removes the training directory."""
try:
logging.info("%s: Removing existing train directory.",
task_as_string(self.task))
gfile.DeleteRecursively(train_dir)
except:
logging.error(
"%s: Failed to delete directory " + train_dir +
" when starting a new model. Please delete it manually and" +
" try again.", task_as_string(self.task))
def get_meta_filename(self, start_new_model, train_dir):
if start_new_model:
logging.info("%s: Flag 'start_new_model' is set. Building a new model.",
task_as_string(self.task))
return None
latest_checkpoint = tf.train.latest_checkpoint(train_dir)
if not latest_checkpoint:
logging.info("%s: No checkpoint file found. Building a new model.",
task_as_string(self.task))
return None
meta_filename = latest_checkpoint + ".meta"
if not gfile.Exists(meta_filename):
logging.info("%s: No meta graph file found. Building a new model.",
task_as_string(self.task))
return None
else:
return meta_filename
def recover_model(self, meta_filename):
logging.info("%s: Restoring from meta graph file %s",
task_as_string(self.task), meta_filename)
return tf.train.import_meta_graph(meta_filename)
def build_model(self, model, reader):
"""Find the model and build the graph."""
label_loss_fn = find_class_by_name(FLAGS.label_loss, [losses])()
optimizer_class = find_class_by_name(FLAGS.optimizer, [tf.train])
build_graph(reader=reader,
model=model,
optimizer_class=optimizer_class,
clip_gradient_norm=FLAGS.clip_gradient_norm,
train_data_pattern=FLAGS.train_data_pattern,
label_loss_fn=label_loss_fn,
base_learning_rate=FLAGS.base_learning_rate,
learning_rate_decay=FLAGS.learning_rate_decay,
learning_rate_decay_examples=FLAGS.learning_rate_decay_examples,
regularization_penalty=FLAGS.regularization_penalty,
num_readers=FLAGS.num_readers,
batch_size=FLAGS.batch_size,
num_epochs=FLAGS.num_epochs)
return tf.train.Saver(max_to_keep=0, keep_checkpoint_every_n_hours=0.25)
def get_reader():
# Convert feature_names and feature_sizes to lists of values.
feature_names, feature_sizes = utils.GetListOfFeatureNamesAndSizes(
FLAGS.feature_names, FLAGS.feature_sizes)
if FLAGS.frame_features:
reader = readers.YT8MFrameFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes,
segment_labels=FLAGS.segment_labels)
else:
reader = readers.YT8MAggregatedFeatureReader(feature_names=feature_names,
feature_sizes=feature_sizes)
return reader
class ParameterServer(object):
"""A parameter server to serve variables in a distributed execution."""
def __init__(self, cluster, task):
"""Creates a ParameterServer.
Args:
cluster: A tf.train.ClusterSpec if the execution is distributed. None
otherwise.
task: A TaskSpec describing the job type and the task index.
"""
self.cluster = cluster
self.task = task
def run(self):
"""Starts the parameter server."""
logging.info("%s: Starting parameter server within cluster %s.",
task_as_string(self.task), self.cluster.as_dict())
server = start_server(self.cluster, self.task)
server.join()
def start_server(cluster, task):
"""Creates a Server.
Args:
cluster: A tf.train.ClusterSpec if the execution is distributed. None
otherwise.
task: A TaskSpec describing the job type and the task index.
"""
if not task.type:
raise ValueError("%s: The task type must be specified." %
task_as_string(task))
if task.index is None:
raise ValueError("%s: The task index must be specified." %
task_as_string(task))
# Create and start a server.
return tf.train.Server(tf.train.ClusterSpec(cluster),
protocol="grpc",
job_name=task.type,
task_index=task.index)
def task_as_string(task):
return "/job:%s/task:%s" % (task.type, task.index)
def main(unused_argv):
# Load the environment.
env = json.loads(os.environ.get("TF_CONFIG", "{}"))
# Load the cluster data from the environment.
cluster_data = env.get("cluster", None)
cluster = tf.train.ClusterSpec(cluster_data) if cluster_data else None
# Load the task data from the environment.
task_data = env.get("task", None) or {"type": "master", "index": 0}
task = type("TaskSpec", (object,), task_data)
# Logging the version.
logging.set_verbosity(tf.logging.INFO)
logging.info("%s: Tensorflow version: %s.", task_as_string(task),
tf.__version__)
# Dispatch to a master, a worker, or a parameter server.
if not cluster or task.type == "master" or task.type == "worker":
model = find_class_by_name(FLAGS.model,
[frame_level_models, video_level_models])()
reader = get_reader()
model_exporter = export_model.ModelExporter(
frame_features=FLAGS.frame_features, model=model, reader=reader)
Trainer(cluster, task, FLAGS.train_dir, model, reader, model_exporter,
FLAGS.log_device_placement, FLAGS.max_steps,
FLAGS.export_model_steps).run(start_new_model=FLAGS.start_new_model)
elif task.type == "ps":
ParameterServer(cluster, task).run()
else:
raise ValueError("%s: Invalid task_type: %s." %
(task_as_string(task), task.type))
if __name__ == "__main__":
app.run()
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains a collection of util functions for training and evaluating."""
import numpy
import tensorflow as tf
from tensorflow import logging
try:
xrange # Python 2
except NameError:
xrange = range # Python 3
def Dequantize(feat_vector, max_quantized_value=2, min_quantized_value=-2):
"""Dequantize the feature from the byte format to the float format.
Args:
feat_vector: the input 1-d vector.
max_quantized_value: the maximum of the quantized value.
min_quantized_value: the minimum of the quantized value.
Returns:
A float vector which has the same shape as feat_vector.
"""
assert max_quantized_value > min_quantized_value
quantized_range = max_quantized_value - min_quantized_value
scalar = quantized_range / 255.0
bias = (quantized_range / 512.0) + min_quantized_value
return feat_vector * scalar + bias
def MakeSummary(name, value):
"""Creates a tf.Summary proto with the given name and value."""
summary = tf.Summary()
val = summary.value.add()
val.tag = str(name)
val.simple_value = float(value)
return summary
def AddGlobalStepSummary(summary_writer,
global_step_val,
global_step_info_dict,
summary_scope="Eval"):
"""Add the global_step summary to the Tensorboard.
Args:
summary_writer: Tensorflow summary_writer.
global_step_val: a int value of the global step.
global_step_info_dict: a dictionary of the evaluation metrics calculated for
a mini-batch.
summary_scope: Train or Eval.
Returns:
A string of this global_step summary
"""
this_hit_at_one = global_step_info_dict["hit_at_one"]
this_perr = global_step_info_dict["perr"]
this_loss = global_step_info_dict["loss"]
examples_per_second = global_step_info_dict.get("examples_per_second", -1)
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Hit@1", this_hit_at_one),
global_step_val)
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Perr", this_perr),
global_step_val)
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Loss", this_loss),
global_step_val)
if examples_per_second != -1:
summary_writer.add_summary(
MakeSummary("GlobalStep/" + summary_scope + "_Example_Second",
examples_per_second), global_step_val)
summary_writer.flush()
info = (
"global_step {0} | Batch Hit@1: {1:.3f} | Batch PERR: {2:.3f} | Batch "
"Loss: {3:.3f} | Examples_per_sec: {4:.3f}").format(
global_step_val, this_hit_at_one, this_perr, this_loss,
examples_per_second)
return info
def AddEpochSummary(summary_writer,
global_step_val,
epoch_info_dict,
summary_scope="Eval"):
"""Add the epoch summary to the Tensorboard.
Args:
summary_writer: Tensorflow summary_writer.
global_step_val: a int value of the global step.
epoch_info_dict: a dictionary of the evaluation metrics calculated for the
whole epoch.
summary_scope: Train or Eval.
Returns:
A string of this global_step summary
"""
epoch_id = epoch_info_dict["epoch_id"]
avg_hit_at_one = epoch_info_dict["avg_hit_at_one"]
avg_perr = epoch_info_dict["avg_perr"]
avg_loss = epoch_info_dict["avg_loss"]
aps = epoch_info_dict["aps"]
gap = epoch_info_dict["gap"]
mean_ap = numpy.mean(aps)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_Avg_Hit@1", avg_hit_at_one),
global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_Avg_Perr", avg_perr),
global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_Avg_Loss", avg_loss),
global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_MAP", mean_ap), global_step_val)
summary_writer.add_summary(
MakeSummary("Epoch/" + summary_scope + "_GAP", gap), global_step_val)
summary_writer.flush()
info = ("epoch/eval number {0} | Avg_Hit@1: {1:.3f} | Avg_PERR: {2:.3f} "
"| MAP: {3:.3f} | GAP: {4:.3f} | Avg_Loss: {5:3f} | num_classes: {6}"
).format(epoch_id, avg_hit_at_one, avg_perr, mean_ap, gap, avg_loss,
len(aps))
return info
def GetListOfFeatureNamesAndSizes(feature_names, feature_sizes):
"""Extract the list of feature names and the dimensionality of each feature
from string of comma separated values.
Args:
feature_names: string containing comma separated list of feature names
feature_sizes: string containing comma separated list of feature sizes
Returns:
List of the feature names and list of the dimensionality of each feature.
Elements in the first/second list are strings/integers.
"""
list_of_feature_names = [
feature_names.strip() for feature_names in feature_names.split(",")
]
list_of_feature_sizes = [
int(feature_sizes) for feature_sizes in feature_sizes.split(",")
]
if len(list_of_feature_names) != len(list_of_feature_sizes):
logging.error("length of the feature names (=" +
str(len(list_of_feature_names)) + ") != length of feature "
"sizes (=" + str(len(list_of_feature_sizes)) + ")")
return list_of_feature_names, list_of_feature_sizes
def clip_gradient_norms(gradients_to_variables, max_norm):
"""Clips the gradients by the given value.
Args:
gradients_to_variables: A list of gradient to variable pairs (tuples).
max_norm: the maximum norm value.
Returns:
A list of clipped gradient to variable pairs.
"""
clipped_grads_and_vars = []
for grad, var in gradients_to_variables:
if grad is not None:
if isinstance(grad, tf.IndexedSlices):
tmp = tf.clip_by_norm(grad.values, max_norm)
grad = tf.IndexedSlices(tmp, grad.indices, grad.dense_shape)
else:
grad = tf.clip_by_norm(grad, max_norm)
clipped_grads_and_vars.append((grad, var))
return clipped_grads_and_vars
def combine_gradients(tower_grads):
"""Calculate the combined gradient for each shared variable across all towers.
Note that this function provides a synchronization point across all towers.
Args:
tower_grads: List of lists of (gradient, variable) tuples. The outer list is
over individual gradients. The inner list is over the gradient calculation
for each tower.
Returns:
List of pairs of (gradient, variable) where the gradient has been summed
across all towers.
"""
filtered_grads = [
[x for x in grad_list if x[0] is not None] for grad_list in tower_grads
]
final_grads = []
for i in xrange(len(filtered_grads[0])):
grads = [filtered_grads[t][i] for t in xrange(len(filtered_grads))]
grad = tf.stack([x[0] for x in grads], 0)
grad = tf.reduce_sum(grad, 0)
final_grads.append((
grad,
filtered_grads[0][i][1],
))
return final_grads
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains model definitions."""
import math
import models
import tensorflow as tf
import utils
from tensorflow import flags
import tensorflow.contrib.slim as slim
FLAGS = flags.FLAGS
flags.DEFINE_integer(
"moe_num_mixtures", 2,
"The number of mixtures (excluding the dummy 'expert') used for MoeModel.")
class LogisticModel(models.BaseModel):
"""Logistic model with L2 regularization."""
def create_model(self,
model_input,
vocab_size,
l2_penalty=1e-8,
**unused_params):
"""Creates a logistic model.
Args:
model_input: 'batch' x 'num_features' matrix of input features.
vocab_size: The number of classes in the dataset.
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
batch_size x num_classes.
"""
output = slim.fully_connected(
model_input,
vocab_size,
activation_fn=tf.nn.sigmoid,
weights_regularizer=slim.l2_regularizer(l2_penalty))
return {"predictions": output}
class MoeModel(models.BaseModel):
"""A softmax over a mixture of logistic models (with L2 regularization)."""
def create_model(self,
model_input,
vocab_size,
num_mixtures=None,
l2_penalty=1e-8,
**unused_params):
"""Creates a Mixture of (Logistic) Experts model.
The model consists of a per-class softmax distribution over a
configurable number of logistic classifiers. One of the classifiers in the
mixture is not trained, and always predicts 0.
Args:
model_input: 'batch_size' x 'num_features' matrix of input features.
vocab_size: The number of classes in the dataset.
num_mixtures: The number of mixtures (excluding a dummy 'expert' that
always predicts the non-existence of an entity).
l2_penalty: How much to penalize the squared magnitudes of parameter
values.
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
batch_size x num_classes.
"""
num_mixtures = num_mixtures or FLAGS.moe_num_mixtures
gate_activations = slim.fully_connected(
model_input,
vocab_size * (num_mixtures + 1),
activation_fn=None,
biases_initializer=None,
weights_regularizer=slim.l2_regularizer(l2_penalty),
scope="gates")
expert_activations = slim.fully_connected(
model_input,
vocab_size * num_mixtures,
activation_fn=None,
weights_regularizer=slim.l2_regularizer(l2_penalty),
scope="experts")
gating_distribution = tf.nn.softmax(
tf.reshape(
gate_activations,
[-1, num_mixtures + 1])) # (Batch * #Labels) x (num_mixtures + 1)
expert_distribution = tf.nn.sigmoid(
tf.reshape(expert_activations,
[-1, num_mixtures])) # (Batch * #Labels) x num_mixtures
final_probabilities_by_class_and_batch = tf.reduce_sum(
gating_distribution[:, :num_mixtures] * expert_distribution, 1)
final_probabilities = tf.reshape(final_probabilities_by_class_and_batch,
[-1, vocab_size])
return {"predictions": final_probabilities}