# Author: Wen Torng and Russ B. Altman (2018)
# Network modules

import numpy

import theano
import theano.tensor as T
from theano.tensor.signal import downsample
from theano.tensor.nnet import conv
from theano.tensor.nnet import conv3d2d
from theano.tensor.extra_ops import repeat


def relu(X):
    """Rectified linear unit (ReLU)."""
    return T.maximum(0, X)


def _dropout_from_layer(rng, layer, p):
    """p is the probability of dropping a unit."""
    srng = theano.tensor.shared_randomstreams.RandomStreams(
        rng.randint(999999))
    # p=1-p because 1's indicate "keep" and p is the probability of dropping
    mask = srng.binomial(n=1, p=1 - p, size=layer.shape)
    # The cast is important: int * float32 = float64, which pulls things off the GPU
    output = layer * T.cast(mask, theano.config.floatX)
    return output


class Conv_1D(object):
    """1D convolution (a conv2d over (batch, 1, 1, width) signals) followed by ReLU."""

    def __init__(self, rng, input, filter_shape, image_shape, W=None, b=None):
        # signals_shape = (batchsize, 1, 1, 480)
        # filters_shape = (num_filters, in_channels, 1, filter_width)
        self.input = input

        # Initialize weights with Glorot-style uniform random values;
        # fan_in is hard-coded to the 480-wide input signal
        fan_in = 480
        fan_out = filter_shape[0] * numpy.prod(filter_shape[2:])
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        if W is None:
            self.W = theano.shared(numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX), borrow=True)
        else:
            self.W = W

        if b is None:
            b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
            self.b = theano.shared(value=b_values, borrow=True)
        else:
            self.b = b

        conv_out = conv.conv2d(input, filters=self.W,
                               image_shape=image_shape, filter_shape=filter_shape)
        self.output = relu(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        self.params = [self.W, self.b]


class DropoutCNNLayer(Conv_1D):
    """Conv_1D whose output is passed through dropout."""

    def __init__(self, rng, input, filter_shape, image_shape, dropout_rate,
                 use_bias=True, W=None, b=None):
        super(DropoutCNNLayer, self).__init__(
            rng=rng, input=input, filter_shape=filter_shape,
            image_shape=image_shape, W=W, b=b)
        self.output = _dropout_from_layer(rng, self.output, p=dropout_rate)
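
# --- Illustrative usage sketch (added for exposition; not part of the original
# module). Shows how DropoutCNNLayer would be wired into a graph and compiled.
# The shapes below (batch of 8 single-channel signals of width 480, 16 filters
# of width 5) and the 0.5 dropout rate are assumed example values.
def _example_conv1d_dropout():
    rng = numpy.random.RandomState(1234)
    x = T.tensor4('x')  # (batch, 1, 1, width)
    layer = DropoutCNNLayer(rng=rng, input=x,
                            filter_shape=(16, 1, 1, 5),  # 16 filters of width 5
                            image_shape=(8, 1, 1, 480),  # batch of 8 signals
                            dropout_rate=0.5)
    f = theano.function([x], layer.output)
    sample = numpy.zeros((8, 1, 1, 480), dtype=theano.config.floatX)
    return f(sample).shape  # expected: (8, 16, 1, 476) after 'valid' convolution
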
class padding_3D(object):
    """Zero-pad a 5D tensor of shape (Ns, Ts, C, Hs, Ws) by one voxel on each
    side of the time, height, and width axes."""

    def __init__(self, input, data_shape):
        self.input = input
        [Ns, Ts, C, Hs, Ws] = data_shape

        # Pad width: (Ns, Ts, C, Hs, Ws) -> (Ns, Ts, C, Hs, Ws+2)
        pad_w = theano.shared(numpy.zeros((Ns, Ts, C, Hs, 1),
                                          dtype=theano.config.floatX))
        x_pad_w = T.concatenate([pad_w, input, pad_w], axis=4)

        # Pad height: -> (Ns, Ts, C, Hs+2, Ws+2)
        pad_h = theano.shared(numpy.zeros((Ns, Ts, C, 1, Ws + 2),
                                          dtype=theano.config.floatX))
        x_pad_h = T.concatenate([pad_h, x_pad_w, pad_h], axis=3)

        # Pad time (depth): -> (Ns, Ts+2, C, Hs+2, Ws+2)
        pad_d = theano.shared(numpy.zeros((Ns, 1, C, Hs + 2, Ws + 2),
                                          dtype=theano.config.floatX))
        x_pad_d = T.concatenate([pad_d, x_pad_h, pad_d], axis=1)

        self.output = x_pad_d


class Conv_3d_Layer(object):
    """3D convolution with one-voxel zero padding, followed by ReLU."""

    def __init__(self, rng, input, filter_shape, image_shape, W=None, b=None):
        # signals_shape = (batchsize, in_time, in_channels, in_height, in_width)
        # filters_shape = (flt_channels, flt_time, in_channels, flt_height, flt_width)
        (batchsize, in_time, in_channels, in_height, in_width) = image_shape
        pad_image_shape = (batchsize, in_time + 2, in_channels,
                           in_height + 2, in_width + 2)
        self.input = input
        assert image_shape[2] == filter_shape[2]

        # Initialize weights with Glorot-style uniform random values
        fan_in = numpy.prod(filter_shape[2:])
        fan_out = filter_shape[0] * numpy.prod(filter_shape[3:])
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        if W is None:
            self.W = theano.shared(numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX), borrow=True)
        else:
            self.W = W

        if b is None:
            b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
            self.b = theano.shared(value=b_values, borrow=True)
        else:
            self.b = b

        pad_layer = padding_3D(input=input, data_shape=image_shape)
        pad_inp = pad_layer.output
        conv_out5D = conv3d2d.conv3d(signals=pad_inp, filters=self.W,
                                     signals_shape=pad_image_shape,
                                     filters_shape=filter_shape)
        # The 4D bias (1, n_filters, 1, 1) is left-padded by broadcasting to
        # (1, 1, n_filters, 1, 1), aligning it with the channel axis
        self.output = relu(conv_out5D + self.b.dimshuffle('x', 0, 'x', 'x'))
        self.params = [self.W, self.b]


class Conv_3d_Layer_nopad(object):
    """3D convolution ('valid', no padding), followed by ReLU."""

    def __init__(self, rng, input, filter_shape, image_shape, W=None, b=None):
        # signals_shape = (batchsize, in_time, in_channels, in_height, in_width)
        # filters_shape = (flt_channels, flt_time, in_channels, flt_height, flt_width)
        self.input = input
        assert image_shape[2] == filter_shape[2]

        # Initialize weights with Glorot-style uniform random values
        fan_in = numpy.prod(filter_shape[2:])
        fan_out = filter_shape[0] * numpy.prod(filter_shape[3:])
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        if W is None:
            self.W = theano.shared(numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX), borrow=True)
        else:
            self.W = W

        if b is None:
            b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
            self.b = theano.shared(value=b_values, borrow=True)
        else:
            self.b = b

        conv_out5D = conv3d2d.conv3d(signals=input, filters=self.W,
                                     signals_shape=image_shape,
                                     filters_shape=filter_shape)
        # activation
        self.output = relu(conv_out5D + self.b.dimshuffle('x', 0, 'x', 'x'))
        # store parameters of this layer
        self.params = [self.W, self.b]


class Dropout_Conv_3d_Layer(Conv_3d_Layer_nopad):
    """Unpadded 3D convolution whose output is passed through dropout."""

    def __init__(self, rng, input, filter_shape, image_shape,
                 dropout_rate=0.5, W=None, b=None):
        super(Dropout_Conv_3d_Layer, self).__init__(
            rng=rng, input=input, filter_shape=filter_shape,
            image_shape=image_shape, W=W, b=b)
        self.output = _dropout_from_layer(rng, self.output, p=dropout_rate)
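
# --- Illustrative shape check (added for exposition; not part of the original
# module). padding_3D zero-pads time/height/width by one voxel each, so a
# 3x3x3 filter in Conv_3d_Layer yields a "same"-size output. The 20-voxel box
# with 4 channels and batch size 2 below is an assumed example shape.
def _example_conv3d_same_padding():
    rng = numpy.random.RandomState(1234)
    x = T.TensorType(theano.config.floatX, (False,) * 5)('x')  # 5D input
    image_shape = (2, 20, 4, 20, 20)   # (batch, time, channels, height, width)
    filter_shape = (8, 3, 4, 3, 3)     # (n_filters, flt_time, in_channels, flt_h, flt_w)
    layer = Conv_3d_Layer(rng=rng, input=x,
                          filter_shape=filter_shape, image_shape=image_shape)
    f = theano.function([x], layer.output)
    box = numpy.zeros(image_shape, dtype=theano.config.floatX)
    return f(box).shape  # expected: (2, 20, 8, 20, 20) -- spatial size preserved
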
class Pad_Conv_Pool(object):
    """Padded 3D convolution followed by 2x2x2 max pooling."""

    def __init__(self, rng, input, filter_shape, image_shape, W=None, b=None):
        assert image_shape[2] == filter_shape[2]

        # Initialize weights with Glorot-style uniform random values
        fan_in = numpy.prod(filter_shape[2:])
        fan_out = filter_shape[0] * numpy.prod(filter_shape[3:])
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        if W is None:
            self.W = theano.shared(numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX), borrow=True)
        else:
            self.W = W

        if b is None:
            b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
            self.b = theano.shared(value=b_values, borrow=True)
        else:
            self.b = b

        self.input = input
        pad_conv_layer = Conv_3d_Layer(rng=rng, input=input,
                                       filter_shape=filter_shape,
                                       image_shape=image_shape,
                                       W=self.W, b=self.b)
        # max_pool_3d expects (..., time, height, width), so swap the time and
        # channel axes before pooling and swap them back afterwards
        pool_layer = PoolLayer3D(input=pad_conv_layer.output.dimshuffle(0, 2, 1, 3, 4),
                                 pool_shape=(2, 2, 2))
        self.output = pool_layer.output.dimshuffle(0, 2, 1, 3, 4)
        self.params = [self.W, self.b]


class LogisticRegression(object):
    """Softmax classification layer."""

    def __init__(self, input, n_in, n_out, W=None, b=None):
        # Initialize the weights W as a zero matrix of shape (n_in, n_out)
        if W is None:
            self.W = theano.shared(
                value=numpy.zeros((n_in, n_out), dtype=theano.config.floatX),
                name='W')
        else:
            self.W = W

        # Initialize the biases b as a vector of n_out zeros
        if b is None:
            self.b = theano.shared(
                value=numpy.zeros((n_out,), dtype=theano.config.floatX),
                name='b')
        else:
            self.b = b

        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.score = T.dot(input, self.W) + self.b
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def class_score(self, y):
        # Pre-softmax score of class y for the first example in the batch
        return self.score[0, y]

    def errors(self, y):
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # Check that y is of the correct datatype
        if y.dtype.startswith('int'):
            # T.neq returns a vector of 0s and 1s, where 1 marks a misprediction
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()


class HiddenLayer(object):
    """Fully connected layer with a configurable activation (tanh by default)."""

    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input

        if W is None:
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        # parameters of the model
        self.params = [self.W, self.b]
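
# --- Illustrative sketch (added for exposition; not part of the original
# module): a minimal classifier head stacking HiddenLayer and
# LogisticRegression, with a gradient step on the negative log-likelihood.
# The layer sizes and the 0.01 learning rate are assumed example values.
def _example_mlp_head():
    rng = numpy.random.RandomState(1234)
    x = T.matrix('x')   # (batch, n_in) feature matrix
    y = T.ivector('y')  # integer class labels
    hidden = HiddenLayer(rng=rng, input=x, n_in=100, n_out=50, activation=T.tanh)
    clf = LogisticRegression(input=hidden.output, n_in=50, n_out=2)
    cost = clf.negative_log_likelihood(y)
    params = hidden.params + clf.params
    grads = T.grad(cost, params)
    updates = [(p, p - 0.01 * g) for p, g in zip(params, grads)]
    return theano.function([x, y], cost, updates=updates)
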
from theano import tensor
from theano.tensor.signal.downsample import DownsampleFactorMax


def max_pool_3d(input, ds, ignore_border=False):
    """Max-pool the last three dimensions of input (..., time, height, width)
    by factors ds = (time, height, width)."""
    if input.ndim < 3:
        raise NotImplementedError('max_pool_3d requires a dimension >= 3')

    vid_dim = input.ndim
    frame_shape = input.shape[-2:]

    # Collapse all "leading" dimensions into one batch dimension,
    # stored as a 1-element vector
    batch_size = tensor.prod(input.shape[:-2])
    batch_size = tensor.shape_padright(batch_size, 1)

    # Reshape to a 4D tensor with shape (batch_size, 1, height, width)
    new_shape = tensor.cast(tensor.join(0, batch_size,
                                        tensor.as_tensor([1, ]),
                                        frame_shape), 'int32')
    input_4D = tensor.reshape(input, new_shape, ndim=4)

    # Downsample the mini-batch of videos in rows and cols
    op = DownsampleFactorMax((ds[1], ds[2]), ignore_border)
    output = op(input_4D)

    # Restore the original shape
    outshape = tensor.join(0, input.shape[:-2], output.shape[-2:])
    out = tensor.reshape(output, outshape, ndim=input.ndim)

    # Output is (time, rows, cols); shuffle so that time is in the back
    shufl = (list(range(vid_dim - 3)) + [vid_dim - 2] + [vid_dim - 1] + [vid_dim - 3])
    input_time = out.dimshuffle(shufl)

    # Collapse the leading dimensions again for the temporal pooling pass
    vid_shape = input_time.shape[-2:]
    batch_size = tensor.prod(input_time.shape[:-2])
    batch_size = tensor.shape_padright(batch_size, 1)

    # Reshape to a 4D tensor with shape (batch_size, 1, width, time)
    new_shape = tensor.cast(tensor.join(0, batch_size,
                                        tensor.as_tensor([1, ]),
                                        vid_shape), 'int32')
    input_4D_time = tensor.reshape(input_time, new_shape, ndim=4)

    # Downsample the mini-batch of videos in time
    op = DownsampleFactorMax((1, ds[0]), ignore_border)
    outtime = op(input_4D_time)

    # Restore to shape (..., rows, cols, time), then shuffle time back into place
    outshape = tensor.join(0, input_time.shape[:-2], outtime.shape[-2:])
    shufl = (list(range(vid_dim - 3)) + [vid_dim - 1] + [vid_dim - 3] + [vid_dim - 2])
    return tensor.reshape(outtime, outshape, ndim=input.ndim).dimshuffle(shufl)


class PoolLayer3D(object):
    """Subsampling and pooling layer."""

    def __init__(self, input, pool_shape, method="max"):
        """method: "max", "avg", "L2", "L4", ..."""
        # Stash the constructor arguments as attributes
        self.__dict__.update(locals())
        del self.self

        if method == "max":
            out = max_pool_3d(input, pool_shape)
        else:
            raise NotImplementedError()
        self.output = out
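
# --- Illustrative shape check (added for exposition; not part of the original
# module): PoolLayer3D with pool_shape=(2, 2, 2) halves the last three
# dimensions of its input, which it expects as (..., time, height, width).
# The 5D example shape below is an assumption.
def _example_pool3d():
    x = T.TensorType(theano.config.floatX, (False,) * 5)('x')
    pool = PoolLayer3D(input=x, pool_shape=(2, 2, 2))
    f = theano.function([x], pool.output)
    vol = numpy.zeros((2, 8, 4, 16, 16), dtype=theano.config.floatX)
    return f(vol).shape  # expected: (2, 8, 2, 8, 8) -- last three dims halved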