import numpy

import theano
import theano.tensor as T
from theano import tensor
from theano.tensor.nnet import conv3d2d
from theano.tensor.signal.downsample import DownsampleFactorMax


def shared_dataset(data_x, data_y, borrow=True):
    """Load a dataset into shared variables so Theano can move it to the GPU
    in a single transfer. Labels are stored as floatX and cast back to int32."""
    shared_x = theano.shared(numpy.asarray(data_x, dtype=theano.config.floatX),
                             borrow=borrow)
    shared_y = theano.shared(numpy.asarray(data_y, dtype=theano.config.floatX),
                             borrow=borrow)
    return shared_x, T.cast(shared_y, 'int32')


def relu(X):
    """Rectified linear unit (ReLU)."""
    return T.maximum(0, X)


def _dropout_from_layer(rng, layer, p):
    """Zero out units of `layer` with probability p. No 1/(1-p) rescaling is
    applied here; compensating at test time is left to the caller."""
    srng = theano.tensor.shared_randomstreams.RandomStreams(
        rng.randint(999999))
    # p is the probability of dropping a unit, so keep with probability 1-p
    mask = srng.binomial(n=1, p=1 - p, size=layer.shape)
    output = layer * T.cast(mask, theano.config.floatX)
    return output


class Conv_3d_Layer(object):
    """3D convolution layer with ReLU activation, built on conv3d2d.conv3d.

    signals_shape = (batch, in_time, in_channels, in_height, in_width)
    filters_shape = (flt_channels, flt_time, in_channels, flt_height, flt_width)
    """

    def __init__(self, rng, input, filter_shape, image_shape, W=None, b=None):
        self.input = input
        # the number of input channels must agree between signals and filters
        assert image_shape[2] == filter_shape[2]

        # Glorot-style uniform initialisation
        fan_in = numpy.prod(filter_shape[2:])
        fan_out = filter_shape[0] * numpy.prod(filter_shape[3:])
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        if W is None:
            self.W = theano.shared(
                numpy.asarray(
                    rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                    dtype=theano.config.floatX),
                borrow=True, name='W')
        else:
            self.W = W

        if b is None:
            b_values = numpy.zeros((filter_shape[0],),
                                   dtype=theano.config.floatX)
            self.b = theano.shared(value=b_values, borrow=True, name='b')
        else:
            self.b = b

        conv_out5D = conv3d2d.conv3d(signals=input,
                                     filters=self.W,
                                     signals_shape=image_shape,
                                     filters_shape=filter_shape)
        # broadcast the bias over the channel axis of the 5D output
        # (batch, out_time, out_channels, out_height, out_width); the explicit
        # 5D pattern is equivalent to the right-aligned 4D broadcast
        self.output = relu(conv_out5D + self.b.dimshuffle('x', 'x', 0, 'x', 'x'))
        self.params = [self.W, self.b]


class Dropout_Conv_3d_Layer(Conv_3d_Layer):
    """Conv_3d_Layer with dropout applied to its output."""

    def __init__(self, rng, input, filter_shape, image_shape,
                 dropout_rate=0.5, W=None, b=None):
        super(Dropout_Conv_3d_Layer, self).__init__(
            rng=rng, input=input, filter_shape=filter_shape,
            image_shape=image_shape, W=W, b=b)
        self.output = _dropout_from_layer(rng, self.output, p=dropout_rate)


class LogisticRegression(object):
    """Multi-class logistic regression (softmax) classifier."""

    def __init__(self, input, n_in, n_out, W=None, b=None):
        # initialize the weights W as a zero matrix of shape (n_in, n_out)
        if W is None:
            self.W = theano.shared(
                value=numpy.zeros((n_in, n_out), dtype=theano.config.floatX),
                name='W')
        else:
            self.W = W
        # initialize the biases b as a vector of n_out zeros
        if b is None:
            self.b = theano.shared(
                value=numpy.zeros((n_out,), dtype=theano.config.floatX),
                name='b')
        else:
            self.b = b

        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.score = T.dot(input, self.W) + self.b
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def class_score(self, y):
        # pre-softmax score of class y for the first example in the batch
        return self.score[0, y]

    def errors(self, y):
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()


class HiddenLayer(object):
    """Fully connected layer with a configurable activation (tanh by default)."""

    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input

        if W is None:
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            # sigmoid units work better with 4x larger initial weights
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        # parameters of the model
        self.params = [self.W, self.b]
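# Illustrative sketch (not part of the original pipeline; the function name,
# shapes and class count below are hypothetical examples): wiring
# Conv_3d_Layer into LogisticRegression. conv3d2d.conv3d takes signals of
# shape (batch, time, channels, height, width) and, with 'valid' borders,
# returns a 5D output that is flattened per example before the softmax.
def build_demo_classifier(rng, x,
                          image_shape=(2, 5, 3, 10, 10),
                          filter_shape=(4, 3, 3, 3, 3),
                          n_classes=10):
    conv = Conv_3d_Layer(rng, input=x,
                         filter_shape=filter_shape,
                         image_shape=image_shape)
    # 'valid' convolution shrinks each convolved axis by (filter size - 1)
    out_time = image_shape[1] - filter_shape[1] + 1
    out_h = image_shape[3] - filter_shape[3] + 1
    out_w = image_shape[4] - filter_shape[4] + 1
    n_in = out_time * filter_shape[0] * out_h * out_w
    clf = LogisticRegression(input=conv.output.flatten(2),
                             n_in=n_in, n_out=n_classes)
    return conv, clf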
class DropoutHiddenLayer(HiddenLayer):
    """HiddenLayer with dropout applied to its output."""

    def __init__(self, rng, input, n_in, n_out, activation, dropout_rate,
                 use_bias=True, W=None, b=None):
        super(DropoutHiddenLayer, self).__init__(
            rng=rng, input=input, n_in=n_in, n_out=n_out, W=W, b=b,
            activation=activation)
        self.output = _dropout_from_layer(rng, self.output, p=dropout_rate)


def max_pool_3d(input, ds, ignore_border=False):
    """Downscale an N-D (N >= 3) video tensor by taking the max over
    non-overlapping patches of size ds = (time, height, width). Pooling is
    done in two passes of the 2D DownsampleFactorMax op: first over the two
    trailing spatial axes, then over the time axis (third from the end)."""
    if input.ndim < 3:
        raise NotImplementedError('max_pool_3d requires a dimension >= 3')

    # extract nr dimensions
    vid_dim = input.ndim
    frame_shape = input.shape[-2:]

    # collapse all leading dimensions into a single "batch" axis
    batch_size = tensor.prod(input.shape[:-2])
    batch_size = tensor.shape_padright(batch_size, 1)

    # reshape to a 4D tensor (batch, 1, height, width) and pool rows/cols
    new_shape = tensor.cast(tensor.join(0, batch_size,
                                        tensor.as_tensor([1, ]),
                                        frame_shape), 'int32')
    input_4D = tensor.reshape(input, new_shape, ndim=4)
    op = DownsampleFactorMax((ds[1], ds[2]), ignore_border)
    output = op(input_4D)

    # restore to original shape
    outshape = tensor.join(0, input.shape[:-2], output.shape[-2:])
    out = tensor.reshape(output, outshape, ndim=input.ndim)

    # shuffle the time axis to the back so it can be pooled as "columns"
    shufl = (list(range(vid_dim - 3)) + [vid_dim - 2] + [vid_dim - 1]
             + [vid_dim - 3])
    input_time = out.dimshuffle(shufl)

    # collapse leading dimensions again and pool over time
    vid_shape = input_time.shape[-2:]
    batch_size = tensor.prod(input_time.shape[:-2])
    batch_size = tensor.shape_padright(batch_size, 1)
    new_shape = tensor.cast(tensor.join(0, batch_size,
                                        tensor.as_tensor([1, ]),
                                        vid_shape), 'int32')
    input_4D_time = tensor.reshape(input_time, new_shape, ndim=4)
    op = DownsampleFactorMax((1, ds[0]), ignore_border)
    outtime = op(input_4D_time)

    # restore the original axis order (..., time, height, width)
    outshape = tensor.join(0, input_time.shape[:-2], outtime.shape[-2:])
    shufl = (list(range(vid_dim - 3)) + [vid_dim - 1] + [vid_dim - 3]
             + [vid_dim - 2])
    return tensor.reshape(outtime, outshape, ndim=input.ndim).dimshuffle(shufl)


class PoolLayer3D(object):
    """Thin wrapper around max_pool_3d; only "max" pooling is implemented."""

    def __init__(self, input, pool_shape, method="max"):
        self.__dict__.update(locals())
        del self.self
        if method == "max":
            out = max_pool_3d(input, pool_shape)
        else:
            raise NotImplementedError()
        self.output = out
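# Quick smoke test (illustrative sketch; the shapes are arbitrary examples):
# max_pool_3d applies ds[0] to the third-from-last axis and (ds[1], ds[2]) to
# the last two, so the time axis is expected third from the end, as in
# (batch, channels, time, height, width).
if __name__ == '__main__':
    x = tensor.TensorType(theano.config.floatX, (False,) * 5)('x')
    pool = PoolLayer3D(x, pool_shape=(2, 2, 2))
    f = theano.function([x], pool.output)
    video = numpy.random.rand(2, 3, 4, 8, 10).astype(theano.config.floatX)
    print(f(video).shape)  # expected: (2, 3, 2, 4, 5)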