# video_consumer.py
import sys

import cv2
import numpy as np
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import spark.parameters as params
from spark.c3d import *
from spark.classifier import *
from spark.utils.array_util import *
def deserializer(img):
    """Turn a raw Kafka record into a ``(key, uint8 array)`` pair.

    Args:
        img: Indexable record whose item 0 is the message key and whose
            item 1 is a bytes-like payload.

    Returns:
        A tuple ``(key, buffer)`` where ``buffer`` is a 1-D ``numpy.uint8``
        view over the payload bytes (no copy is made by ``np.frombuffer``).
    """
    key = img[0]
    payload = img[1]
    return key, np.frombuffer(payload, dtype=np.uint8)
def decode(img):
    """Decode an encoded image buffer into an RGB pixel array.

    Args:
        img: Indexable pair whose item 0 is the record key and whose item 1
            is the encoded image buffer handed to ``cv2.imdecode``.

    Returns:
        A tuple ``(key, image)`` where ``image`` is the decoded picture
        converted from OpenCV's BGR channel order to RGB.

    Raises:
        ValueError: If OpenCV cannot decode the buffer. ``cv2.imdecode``
            signals failure by returning ``None``; without this check the
            failure would only surface as an opaque assertion error inside
            ``cv2.cvtColor``.
    """
    key = img[0]
    bgr = cv2.imdecode(img[1], cv2.IMREAD_COLOR)
    if bgr is None:
        raise ValueError("could not decode image for key %r" % (key,))
    return key, cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
def sliding_window(arr, size, stride):
    """Cut *arr* into windows of *size* items, one every *stride* items.

    Windows start at 0, ``stride``, ``2 * stride``, ... Every full window is
    returned. If items are left over at the next start position, exactly one
    trailing shorter window holding them is returned as well, so callers
    that need fixed-length windows must filter on length.

    Args:
        arr: Sliceable sequence that supports ``len()``.
        size: Number of items per window; must be positive.
        stride: Distance between consecutive window starts; must be positive.

    Returns:
        A NumPy array of windows. When all windows have the same length this
        is ``np.array(windows)``; when the last window is shorter, a 1-D
        object array with one window per element. An empty input yields an
        empty array.

    Raises:
        ValueError: If *size* or *stride* is not positive.
    """
    if size <= 0 or stride <= 0:
        raise ValueError("size and stride must be positive")

    total = len(arr)
    windows = []
    start = 0
    while start < total:
        window = arr[start:start + size]
        windows.append(window)
        if len(window) < size:
            # Only one trailing partial window is ever emitted.
            break
        start += stride

    if len(windows) > 1 and len(windows[-1]) < size:
        # Ragged input: build the object array explicitly, because
        # np.array() rejects inhomogeneous nested sequences on recent NumPy.
        ragged = np.empty(len(windows), dtype=object)
        for index, window in enumerate(windows):
            ragged[index] = window
        return ragged
    return np.array(windows)
def main(argv=None):
    """Consume encoded video frames from Kafka and classify them per batch.

    Usage: ``video_consumer.py <brokers> <topic>``

    The stream is decoded to RGB frames. For every non-empty micro-batch the
    new frames are grouped into clips of ``params.frame_count`` frames, each
    clip is passed through the feature extractor, and the accumulated clip
    features are bagged, classified and extrapolated to the number of frames
    seen so far. The resulting predictions are printed on the driver.

    All model work happens inside the ``foreachRDD`` callback: a DStream
    produces no data until ``ssc.start()`` is called, so running the
    pipeline at module level would only ever see an empty frame list.

    Args:
        argv: Argument vector including the program name; defaults to
            ``sys.argv``.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) != 3:
        sys.exit("Usage: video_consumer.py <brokers> <topic>")
    brokers, topic = argv[1:]

    sc = SparkContext(appName="test")
    ssc = StreamingContext(sc, 1)  # batch interval of 1 second
    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [topic],
        {"metadata.broker.list": brokers},
        valueDecoder=lambda x: x,  # keep the message value as raw bytes
    )
    frames = kafka_stream.map(deserializer).map(decode).map(lambda x: x[1])

    # build models (once, on the driver)
    feature_extractor = c3d_feature_extractor()
    classifier_model = build_classifier_model()

    frame_list = []    # frames received but not yet part of a full clip
    rgb_features = []  # one feature vector per completed clip

    def process_batch(rdd):
        """Fold one micro-batch of frames into the running predictions."""
        new_frames = rdd.collect()  # rdd -> list
        if not new_frames:
            return
        frame_list.extend(new_frames)

        # extract features for every complete clip; leftover frames wait
        # for the next batch
        clip_len = params.frame_count
        usable = len(frame_list) - len(frame_list) % clip_len
        if usable:
            video_clips = sliding_window(frame_list[:usable], clip_len, clip_len)
            for clip in video_clips:
                clip = preprocess_input(np.array(clip))
                rgb_features.append(feature_extractor.predict(clip)[0])
            del frame_list[:usable]

        if not rgb_features:
            # Not enough frames for a single clip yet.
            return

        # bag features
        # NOTE(review): rgb_features grows for the lifetime of the stream;
        # cap or window it if the job is meant to run indefinitely.
        rgb_feature_bag = interpolate(np.array(rgb_features), params.features_per_bag)

        # classify using the trained classifier model
        predictions = classifier_model.predict(rgb_feature_bag)
        predictions = np.array(predictions).squeeze()

        # predictions, stretched over all frames seen so far
        total_frames = len(rgb_features) * clip_len + len(frame_list)
        predictions = extrapolate(predictions, total_frames)
        print(predictions)

    frames.foreachRDD(process_batch)
    frames.pprint()
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()