JHyunB

소스코드 제출

## Kafka and Spark for real time video processing
Ubuntu 16.04.1 LTS
kafka_2.13-2.6.0
spark-2.4.7-hadoop2.7
python 3.5.2
\ No newline at end of file
kafka-python==2.0.2
numpy==1.18.5
opencv-python==4.4.0.42
py4j==0.10.9
from kafka import KafkaProducer
import cv2
topic = "testing" # 구독할 topic
producer = KafkaProducer(bootstrap_servers='') # kafka 부트스트랩 서버 입력
video_file = "" # 입력 파일
video = cv2.VideoCapture(video_file)
while video.isOpened():
success, frame = video.read()
if not success:
break
_, img = cv2.imencode('.jpg', frame)
producer.send(topic, img.tobytes())
video.release()
\ No newline at end of file
{"class_name": "Model", "backend": "tensorflow", "keras_version": "2.3.1", "config": {"name": "model_1", "input_layers": [["conv1_input", 0, 0]], "layers": [{"name": "conv1_input", "inbound_nodes": [], "class_name": "InputLayer", "config": {"name": "conv1_input", "sparse": false, "batch_input_shape": [null, 16, 112, 112, 3], "dtype": "float32"}}, {"name": "conv1", "inbound_nodes": [[["conv1_input", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv1", "filters": 64, "activation": "relu", "batch_input_shape": [null, 16, 112, 112, 3], "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "pool1", "inbound_nodes": [[["conv1", 0, 0, {}]]], "class_name": "MaxPooling3D", "config": {"padding": "same", "strides": [1, 2, 2], "dtype": "float32", "trainable": true, "name": "pool1", "data_format": "channels_last", "pool_size": [1, 2, 2]}}, {"name": "conv2", "inbound_nodes": [[["pool1", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv2", "filters": 128, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "pool2", "inbound_nodes": [[["conv2", 0, 0, {}]]], "class_name": "MaxPooling3D", "config": {"padding": "valid", "strides": [2, 2, 2], "dtype": "float32", "trainable": true, "name": "pool2", "data_format": "channels_last", "pool_size": [2, 2, 2]}}, {"name": "conv3a", "inbound_nodes": [[["pool2", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv3a", "filters": 256, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "conv3b", "inbound_nodes": [[["conv3a", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv3b", "filters": 256, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "pool3", "inbound_nodes": [[["conv3b", 0, 0, {}]]], "class_name": "MaxPooling3D", "config": {"padding": "valid", "strides": [2, 2, 2], "dtype": "float32", "trainable": true, "name": "pool3", "data_format": "channels_last", "pool_size": [2, 2, 2]}}, {"name": "conv4a", "inbound_nodes": [[["pool3", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv4a", "filters": 512, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "conv4b", "inbound_nodes": [[["conv4a", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv4b", "filters": 512, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "pool4", "inbound_nodes": [[["conv4b", 0, 0, {}]]], "class_name": "MaxPooling3D", "config": {"padding": "valid", "strides": [2, 2, 2], "dtype": "float32", "trainable": true, "name": "pool4", "data_format": "channels_last", "pool_size": [2, 2, 2]}}, {"name": "conv5a", "inbound_nodes": [[["pool4", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv5a", "filters": 512, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "conv5b", "inbound_nodes": [[["conv5a", 0, 0, {}]]], "class_name": "Conv3D", "config": {"bias_regularizer": null, "padding": "same", "dilation_rate": [1, 1, 1], "kernel_size": [3, 3, 3], "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "conv5b", "filters": 512, "activation": "relu", "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "data_format": "channels_last", "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true, "strides": [1, 1, 1]}}, {"name": "zero_padding3d_1", "inbound_nodes": [[["conv5b", 0, 0, {}]]], "class_name": "ZeroPadding3D", "config": {"trainable": true, "name": "zero_padding3d_1", "padding": [[0, 0], [1, 1], [1, 1]], "data_format": "channels_last", "dtype": "float32"}}, {"name": "pool5", "inbound_nodes": [[["zero_padding3d_1", 0, 0, {}]]], "class_name": "MaxPooling3D", "config": {"padding": "valid", "strides": [2, 2, 2], "dtype": "float32", "trainable": true, "name": "pool5", "data_format": "channels_last", "pool_size": [2, 2, 2]}}, {"name": "flatten_1", "inbound_nodes": [[["pool5", 0, 0, {}]]], "class_name": "Flatten", "config": {"trainable": true, "name": "flatten_1", "data_format": "channels_last", "dtype": "float32"}}, {"name": "fc6", "inbound_nodes": [[["flatten_1", 0, 0, {}]]], "class_name": "Dense", "config": {"bias_regularizer": null, "activation": "relu", "kernel_initializer": {"class_name": "VarianceScaling", "config": {"mode": "fan_avg", "seed": null, "distribution": "uniform", "scale": 1.0}}, "dtype": "float32", "name": "fc6", "units": 4096, "kernel_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "activity_regularizer": null, "kernel_regularizer": null, "bias_constraint": null, "trainable": true, "use_bias": true}}], "output_layers": [["fc6", 0, 0]]}}
\ No newline at end of file
# -*- coding: utf-8 -*-
import numpy as np
from keras.utils.data_utils import get_file
from scipy.misc import imresize
from bigdl.nn.layer import *
C3D_MEAN_PATH = 'https://github.com/adamcasson/c3d/releases/download/v0.1/c3d_mean.npy'
def preprocess_input(video):
intervals = np.ceil(np.linspace(0, video.shape[0] - 1, 16)).astype(int)
frames = video[intervals]
# Reshape to 128x171
reshape_frames = np.zeros((frames.shape[0], 128, 171, frames.shape[3]))
for i, img in enumerate(frames):
img = imresize(img, (128, 171), 'bicubic')
reshape_frames[i, :, :, :] = img
mean_path = get_file('c3d_mean.npy',
C3D_MEAN_PATH,
cache_subdir='models',
md5_hash='08a07d9761e76097985124d9e8b2fe34')
mean = np.load(mean_path)
reshape_frames -= mean
# Crop to 112x112
reshape_frames = reshape_frames[:, 8:120, 30:142, :]
# Add extra dimension for samples
reshape_frames = np.expand_dims(reshape_frames, axis=0)
return reshape_frames
def c3d_feature_extractor():
feature_extractor_model = Model.load_keras(json_path='./c3d.json')
return feature_extractor_model
{"class_name": "Sequential", "config": {"name": "sequential_1", "layers": [{"class_name": "Dense", "config": {"name": "dense_1", "use_bias": true, "bias_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "kernel_regularizer": {"class_name": "L1L2", "config": {"l1": 0.0, "l2": 0.0010000000474974513}}, "kernel_initializer": {"class_name": "VarianceScaling", "config": {"distribution": "normal", "seed": null, "mode": "fan_avg", "scale": 1.0}}, "batch_input_shape": [null, 4096], "activity_regularizer": null, "activation": "relu", "kernel_constraint": null, "dtype": "float32", "trainable": true, "bias_regularizer": null, "units": 512}}, {"class_name": "Dropout", "config": {"name": "dropout_1", "dtype": "float32", "trainable": true, "seed": null, "rate": 0.6, "noise_shape": null}}, {"class_name": "Dense", "config": {"name": "dense_2", "use_bias": true, "bias_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "kernel_regularizer": {"class_name": "L1L2", "config": {"l1": 0.0, "l2": 0.0010000000474974513}}, "kernel_initializer": {"class_name": "VarianceScaling", "config": {"distribution": "normal", "seed": null, "mode": "fan_avg", "scale": 1.0}}, "activity_regularizer": null, "activation": "linear", "kernel_constraint": null, "dtype": "float32", "trainable": true, "bias_regularizer": null, "units": 32}}, {"class_name": "Dropout", "config": {"name": "dropout_2", "dtype": "float32", "trainable": true, "seed": null, "rate": 0.6, "noise_shape": null}}, {"class_name": "Dense", "config": {"name": "dense_3", "use_bias": true, "bias_constraint": null, "bias_initializer": {"class_name": "Zeros", "config": {}}, "kernel_regularizer": {"class_name": "L1L2", "config": {"l1": 0.0, "l2": 0.0010000000474974513}}, "kernel_initializer": {"class_name": "VarianceScaling", "config": {"distribution": "normal", "seed": null, "mode": "fan_avg", "scale": 1.0}}, "activity_regularizer": null, "activation": "sigmoid", "kernel_constraint": null, "dtype": "float32", "trainable": true, "bias_regularizer": null, "units": 1}}]}, "keras_version": "2.3.1", "backend": "tensorflow"}
\ No newline at end of file
from bigdl.nn.layer import *
def build_classifier_model():
model = Model.load_keras(json_path='classifier.json')
model.summary()
c3d_model_weights = './trained_models/c3d_sports1m.h5'
classifier_model_weigts = './trained_models/weights_L1L2.mat'
classifier_model_json = './trained_models/model.json'
input_folder = './input'
output_folder = './output'
sample_video_path = './input/Robbery056_x264.mp4'
frame_height = 240
frame_width = 320
channels = 3
frame_count = 16
features_per_bag = 32
kafka-python==2.0.2
numpy==1.18.5
opencv-python==4.4.0.42
py4j==0.10.9
pyspark==2.4.6
keras==2.3.1
Keras-Applications==1.0.8
Keras-Preprocessing==1.1.0
scipy==1.1.0
\ No newline at end of file
import numpy as np
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
def interpolate(features, features_per_bag):
# 기존 172, 4096
feature_size = np.array(features).shape[1]
# 32, 4096
interpolated_features = np.zeros((features_per_bag, feature_size))
interpolation_indicies = np.round(np.linspace(0, len(features) - 1, num=features_per_bag + 1))
count = 0
for index in range(0, len(interpolation_indicies) - 1):
start = int(interpolation_indicies[index])
end = int(interpolation_indicies[index + 1])
assert end >= start
if start == end:
temp_vect = features[start, :]
else:
temp_vect = np.mean(features[start:end + 1, :], axis=0)
temp_vect = temp_vect / np.linalg.norm(temp_vect)
if np.linalg.norm(temp_vect) == 0:
print("Error")
interpolated_features[count, :] = temp_vect
count = count + 1
return np.array(interpolated_features)
def extrapolate(outputs, num_frames):
extrapolated_outputs = []
extrapolation_indicies = np.round(np.linspace(0, len(outputs) - 1, num=num_frames))
for index in extrapolation_indicies:
extrapolated_outputs.append(outputs[int(index)])
return np.array(extrapolated_outputs)
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import cv2
import spark.parameters as params
from spark.c3d import *
from spark.classifier import *
from spark.utils.array_util import *
def deserializer(img):
return img[0], np.frombuffer(img[1], dtype=np.uint8)
def decode(img):
return img[0], cv2.cvtColor(cv2.imdecode(img[1], cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)
def sliding_window(arr, size, stride):
num_chunks = int((len(arr) - size) / stride) + 2
result = []
for i in range(0, num_chunks * stride, stride):
if len(arr[i:i + size]) > 0:
result.append(arr[i:i + size])
return np.array(result)
sc = SparkContext(appName="test")
ssc = StreamingContext(sc,1)
brokers, topic = sys.argv[1:]
kafka_stream = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":brokers}, valueDecoder=lambda x: x)
frames = kafka_stream.map(deserializer).map(decode).map(lambda x: x[1])
frame_list = []
frames.foreachRDD(lambda x:frame_list.append(x.collect())) # rdd -> list
video_clips = sliding_window(frame_list, params.frame_count, params.frame_count) # list -> np
# build models
feature_extractor = c3d_feature_extractor()
classifier_model = build_classifier_model()
# extract features
rgb_features = []
for i, clip in enumerate(video_clips):
clip = np.array(clip)
if len(clip) < params.frame_count:
continue
clip = preprocess_input(clip)
sc.parallelize(clip)
rgb_feature = feature_extractor.predict(clip)[0]
rgb_features.append(rgb_feature)
rgb_features = np.array(rgb_features)
# bag features
rgb_feature_bag = interpolate(rgb_features, params.features_per_bag)
# classify using the trained classifier model
sc.parallelize(rgb_feature_bag)
predictions = classifier_model.predict(rgb_feature_bag)
predictions = np.array(predictions).squeeze()
# predictions
predictions = extrapolate(predictions, len(frame_list))
frames.pprint()
ssc.start()
ssc.awaitTermination()