# model.py (7.31 KB)
from keras.models import Model

from keras.layers import Input, Reshape
from keras.layers.core import Dense, Lambda, Activation
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import GlobalAveragePooling2D, MaxPooling2D
from keras.layers.merge import concatenate, add, multiply
from keras.layers.normalization import BatchNormalization

from keras.regularizers import l2
from keras.utils.data_utils import get_file
from keras_applications.imagenet_utils import _obtain_input_shape

import keras.backend as K

import os


class SEResNeXt(Model):
    '''
        Squeeze-and-Excitation ResNeXt classifier assembled as a Keras
        functional graph (image input -> softmax over classes).
    '''

    def __init__(self, weight, input_shape=None):
        '''
            ResNext Model

            ## Args
            + weight: configuration preset, either 'cifar10' or 'imagenet'.
              It selects depth, cardinality, width, number of classes and
              the default/minimum input size. This constructor does not
              load any weight file.
            + input_shape: optional shape tuple

            ## Raises
            ValueError if weight is not one of the supported presets
        '''

        if weight not in {'cifar10', 'imagenet'}:
            raise ValueError("`weight` must be 'cifar10' or 'imagenet', got %r" % (weight,))

        self.__weight = weight

        if weight == 'cifar10':
            self.__depth = 29
            self.__cardinality = 8
            self.__width = 64
            self.__classes = 10
            default_size, min_size = 32, 8
        else:
            self.__depth = [3, 8, 36, 3]
            self.__cardinality = 32
            self.__width = 4
            self.__classes = 1000
            default_size, min_size = 224, 112

        self.__reduction_ratio = 4
        self.__weight_decay = 5e-4
        self.__channel_axis = 1 if K.image_data_format() == "channels_first" else -1

        self.__input_shape = _obtain_input_shape(input_shape, default_size=default_size, min_size=min_size, data_format=K.image_data_format(), require_flatten=True)
        self.__img_input = Input(shape=self.__input_shape)

        # Create model. The output tensor has to be built first because
        # Model.__init__ takes the graph's inputs and outputs.
        super(SEResNeXt, self).__init__(self.__img_input, self.__create_res_next(), name='seresnext')

    def __conv2d(self, filters, kernel_size, strides=1):
        '''
            Builds the Conv2D layer configuration shared by every convolution
            in this model: 'same' padding, no bias, he_normal init, L2 decay.

            ## Args
            + filters: number of output filters
            + kernel_size: kernel size tuple
            + strides: stride applied on both spatial axes

            ## Returns
            an uncalled Conv2D layer
        '''
        return Conv2D(filters, kernel_size, padding='same', use_bias=False, strides=(strides, strides), kernel_initializer='he_normal', kernel_regularizer=l2(self.__weight_decay))

    def __initial_conv_block(self):
        '''
            Adds an initial conv block, with batch norm and relu for the inception resnext

            ## Returns
            a keras tensor
        '''
        if self.__weight == 'cifar10':
            x = self.__conv2d(64, (3, 3))(self.__img_input)
            x = BatchNormalization(axis=self.__channel_axis)(x)
            x = Activation('relu')(x)
            return x

        # 'imagenet' preset: strided 7x7 stem followed by max pooling.
        x = self.__conv2d(64, (7, 7), strides=2)(self.__img_input)
        x = BatchNormalization(axis=self.__channel_axis)(x)
        x = Activation('relu')(x)
        x = MaxPooling2D((3, 3), strides=(2, 2), padding='same')(x)
        return x

    def __grouped_convolution_block(self, input, grouped_channels, strides):
        '''
            Adds a grouped convolution block. It is an equivalent block from the paper

            ## Args
            + input: input tensor
            + grouped_channels: grouped number of filters
            + strides: performs strided convolution for downscaling if > 1

            ## Returns
            a keras tensor
        '''
        channels_first = self.__channel_axis == 1

        group_list = []
        for c in range(self.__cardinality):
            start = c * grouped_channels
            stop = start + grouped_channels
            # Bind the slice bounds as default arguments so each Lambda keeps
            # its own range instead of sharing the loop variable, and slice
            # along whichever axis holds the channels.
            if channels_first:
                slicer = lambda z, start=start, stop=stop: z[:, start:stop, :, :]
            else:
                slicer = lambda z, start=start, stop=stop: z[:, :, :, start:stop]

            x = Lambda(slicer)(input)
            x = self.__conv2d(grouped_channels, (3, 3), strides=strides)(x)
            group_list.append(x)

        group_merge = concatenate(group_list, axis=self.__channel_axis)
        x = BatchNormalization(axis=self.__channel_axis)(group_merge)
        x = Activation('relu')(x)

        return x

    def __bottleneck_block(self, input, filters=64, strides=1):
        '''
            Adds a bottleneck block

            ## Args
            + input: input tensor
            + filters: number of output filters
            + strides: performs strided convolution for downsampling if > 1

            ## Returns
            a keras tensor
        '''
        init = input

        grouped_channels = filters // self.__cardinality

        # The shortcut needs a 1x1 projection whenever it cannot be added to
        # the residual branch as-is: different channel count (the branch
        # outputs 2 * filters) or different spatial size (strides > 1).
        in_channels = K.int_shape(input)[self.__channel_axis]
        if in_channels != 2 * filters or strides != 1:
            init = self.__conv2d(filters * 2, (1, 1), strides=strides)(init)
            init = BatchNormalization(axis=self.__channel_axis)(init)

        x = self.__conv2d(filters, (1, 1))(input)
        x = BatchNormalization(axis=self.__channel_axis)(x)
        x = Activation('relu')(x)
        # Read the channel count from the full (batched) static shape so the
        # channel axis index is valid for both data formats.
        x = self.__squeeze_excitation_layer(x, K.int_shape(x)[self.__channel_axis])

        x = self.__grouped_convolution_block(x, grouped_channels, strides)

        x = self.__conv2d(filters * 2, (1, 1))(x)
        x = BatchNormalization(axis=self.__channel_axis)(x)

        x = add([init, x])
        x = Activation('relu')(x)

        return x

    def __create_res_next(self):
        '''
            Creates a ResNeXt model with specified parameters

            ## Returns
            the output keras tensor (softmax over the configured classes)
        '''
        if isinstance(self.__depth, (list, tuple)):
            N = list(self.__depth)
        else:
            # A scalar depth is split evenly over three stages.
            N = [(self.__depth - 2) // 9 for _ in range(3)]

        # Filter count doubles from one stage to the next.
        base_filters = self.__cardinality * self.__width
        filters_list = [base_filters * (2 ** i) for i in range(len(N))]

        x = self.__initial_conv_block()

        for stage_idx, (n_blocks, stage_filters) in enumerate(zip(N, filters_list)):
            for block_idx in range(n_blocks):
                # The first stage never downsamples; every later stage
                # downsamples in its first block only.
                strides = 2 if (stage_idx > 0 and block_idx == 0) else 1
                x = self.__bottleneck_block(x, stage_filters, strides=strides)

        x = GlobalAveragePooling2D()(x)
        x = Dense(self.__classes, use_bias=False, kernel_regularizer=l2(self.__weight_decay), kernel_initializer='he_normal', activation='softmax')(x)

        return x

    def __squeeze_excitation_layer(self, x, out_dim):
        '''
            SE Block Function

            ## Args
            + x : input feature map
            + out_dim : dimension of output channel

            ## Returns
            a keras tensor: x multiplied by its per-channel gates
        '''
        out_dim = int(out_dim)

        squeeze = GlobalAveragePooling2D()(x)

        excitation = Dense(units=out_dim // self.__reduction_ratio)(squeeze)
        excitation = Activation('relu')(excitation)
        excitation = Dense(units=out_dim)(excitation)
        excitation = Activation('sigmoid')(excitation)

        # Shape the gates so they broadcast over the spatial axes of x.
        if self.__channel_axis == 1:
            gate_shape = (out_dim, 1, 1)
        else:
            gate_shape = (1, 1, out_dim)
        excitation = Reshape(gate_shape)(excitation)

        scale = multiply([x, excitation])

        return scale


if __name__ == '__main__':
    # Smoke test: build the model and print its layer summary.
    # The first positional argument is the preset name; the shape tuple must
    # be passed as input_shape, otherwise it is taken for the preset.
    model = SEResNeXt('imagenet', input_shape=(112, 112, 3))
    model.summary()