model.py 6.96 KB
from keras.models import Model

from keras.layers import Input, Reshape
from keras.layers.core import Dense, Lambda, Activation
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import GlobalAveragePooling2D, MaxPooling2D
from keras.layers.merge import concatenate, add, multiply
from keras.layers.normalization import BatchNormalization

from keras.regularizers import l2
from keras.utils.data_utils import get_file
from keras_applications.imagenet_utils import _obtain_input_shape

import keras.backend as K

import os


class SEResNeXt(Model):
    def __init__(self, weight, input_shape=None, depth=[3, 8, 36, 3], cardinality=32, width=4, reduction_ratio=4, weight_decay=5e-4, classes=1000, channel_axis=None):
        '''
            ResNext Model

            ## Args
            + input_shape: optional shape tuple
            + depth: number or layers in the each block, defined as a list
            + cardinality: the size of the set of transformations
            + width: multiplier to the ResNeXt width (number of filters)
            + redution_ratio: ratio of reducition in SE Block 
            + weight_decay: weight decay (l2 norm)
            + classes: number of classes to classify images into
            + channel_axis: channel axis in keras.backend.image_data_format()
        '''
        self.__weight = weight
        self.__depth = depth
        self.__cardinality = cardinality
        self.__width = width
        self.__reduction_ratio = reduction_ratio
        self.__weight_decay = weight_decay
        self.__classes = classes
        self.__channel_axis = 1 if K.image_data_format() == "channels_first" else -1

        self.__input_shape = _obtain_input_shape(input_shape, default_size = 224, min_size = 112, data_format=K.image_data_format(), require_flatten=True)
        self.__img_input = Input(shape=self.__input_shape)

        # Create model.
        super(SEResNeXt, self).__init__(self.__img_input, self.__create_res_next(), name='seresnext')

    def __initial_conv_block(self):
        '''
            Adds an initial conv block, with batch norm and relu for the inception resnext
        '''
        channel_axis = -1

        x = Conv2D(64, (7, 7), padding='same', use_bias=False, kernel_initializer='he_normal', kernel_regularizer=l2(self.__weight_decay), strides=(2, 2))(self.__img_input)
        x = BatchNormalization(axis=channel_axis)(x)
        x = Activation('relu')(x)

        x = MaxPooling2D((3, 3), strides=(2, 2), padding='same')(x)

        return x

    def __grouped_convolution_block(self, input, grouped_channels, strides):
        ''' 
            Adds a grouped convolution block. It is an equivalent block from the paper

            ## Args
            + input: input tensor
            + grouped_channels: grouped number of filters
            + strides: performs strided convolution for downscaling if > 1

            ## Returns
            a keras tensor
        '''
        init = input

        group_list = []
        for c in range(self.__cardinality):
            x = Lambda(lambda z: z[:, :, :, c * grouped_channels:(c + 1) * grouped_channels])(input)
            x = Conv2D(grouped_channels, (3, 3), padding='same', use_bias=False, strides=(strides, strides), kernel_initializer='he_normal', kernel_regularizer=l2(self.__weight_decay))(x)
            group_list.append(x)

        group_merge = concatenate(group_list, axis=self.__channel_axis)
        x = BatchNormalization(axis=self.__channel_axis)(group_merge)
        x = Activation('relu')(x)

        return x

    def __bottleneck_block(self, input, filters=64, strides=1):
        '''
            Adds a bottleneck block
            
            ## Args
            + input: input tensor
            + filters: number of output filters
            + strides: performs strided convolution for downsampling if > 1

            ## Returns
            a keras tensor
        '''
        init = input

        grouped_channels = int(filters / self.__cardinality)

        # Check if input number of filters is same as 16 * k, else create convolution2d for this input
        if init._keras_shape[-1] != 2 * filters:
            init = Conv2D(filters * 2, (1, 1), padding='same', strides=(strides, strides), use_bias=False, kernel_initializer='he_normal', kernel_regularizer=l2(self.__weight_decay))(init)
            init = BatchNormalization(axis=self.__channel_axis)(init)

        x = Conv2D(filters, (1, 1), padding='same', use_bias=False, kernel_initializer='he_normal', kernel_regularizer=l2(self.__weight_decay))(input)
        x = BatchNormalization(axis=self.__channel_axis)(x)
        x = Activation('relu')(x)
        x = self.__squeeze_excitation_layer(x, x[0].get_shape()[self.__channel_axis])

        x = self.__grouped_convolution_block(x, grouped_channels, strides)

        x = Conv2D(filters * 2, (1, 1), padding='same', use_bias=False, kernel_initializer='he_normal', kernel_regularizer=l2(self.__weight_decay))(x)
        x = BatchNormalization(axis=self.__channel_axis)(x)

        x = add([init, x])
        x = Activation('relu')(x)

        return x

    def __create_res_next(self):
        ''' 
            Creates a ResNeXt model with specified parameters
        '''

        N = list(self.__depth)
        
        filters = self.__cardinality * self.__width
        filters_list = []
        for i in range(len(N)):
            filters_list.append(filters)
            filters *= 2  # double the size of the filters

        x = self.__initial_conv_block()

        # block 1 (no pooling)
        for i in range(N[0]):
            x = self.__bottleneck_block(x, filters_list[0], strides=1)

        N = N[1:]  # remove the first block from block definition list
        filters_list = filters_list[1:]  # remove the first filter from the filter list

        # block 2 to N
        for block_idx, n_i in enumerate(N):
            for i in range(n_i):
                if i == 0:
                    x = self.__bottleneck_block(x, filters_list[block_idx], strides=2)
                else:
                    x = self.__bottleneck_block(x, filters_list[block_idx], strides=1)

        
        x = GlobalAveragePooling2D()(x)
        x = Dense(self.__classes, use_bias=False, kernel_regularizer=l2(self.__weight_decay), kernel_initializer='he_normal', activation='softmax')(x)
        
        return x

    def __squeeze_excitation_layer(self, x, out_dim):
        '''
            SE Block Function

            ## Args
            + x : input feature map
            + out_dim : dimention of output channel
        '''
        squeeze = GlobalAveragePooling2D()(x)
        
        excitation = Dense(units=out_dim // self.__reduction_ratio)(squeeze)
        excitation = Activation('relu')(excitation)
        excitation = Dense(units=out_dim)(excitation)
        excitation = Activation('sigmoid')(excitation)
        excitation = Reshape((1,1,out_dim))(excitation)
        
        scale = multiply([x,excitation])
        
        return scale


if __name__ == '__main__':
    model = SEResNeXt((112, 112, 3))
    model.summary()