%load_ext autoreload
%autoreload 2
%matplotlib inline
%load_ext cythonmagic
%config InlineBackend.figure_format = 'svg'
import matplotlib.pyplot as plt
import numpy as np, matplotlib
from cython_lstm.network import Network
from cython_lstm.neuron import LogisticNeuron, TanhNeuron, SoftmaxNeuron
from cython_lstm.layers import LoopLayer, SliceLayer, ActivationLayer, LinearLayer
from cython_lstm.trainer import Trainer
from cython_lstm.error import MSE, CategoricalCrossEntropy, BinaryCrossEntropy
from cython_lstm.dataset import create_xor_dataset, create_digit_dataset
import cython_lstm.network
import cython_lstm.network_viewer
def test_net():
    # create a test dataset
    xor_dataset, xor_labels = create_xor_dataset()
    # create a small network:
    net = Network(metric=BinaryCrossEntropy)
    print("Initialization OK")
    first_layer = LinearLayer(xor_dataset.shape[1], 6)
    activation_layer = ActivationLayer(LogisticNeuron)
    first_layer.connect_to(activation_layer)
    second_layer = LinearLayer(6, xor_labels.shape[1])
    activation_layer.connect_to(second_layer)
    second_activation_layer = ActivationLayer(LogisticNeuron)
    second_layer.connect_to(second_activation_layer)
    net.add_layer(first_layer, input=True)
    net.add_layer(activation_layer)
    net.add_layer(second_layer)
    net.add_layer(activation_layer)
    net.add_layer(second_layer)
    net.add_layer(second_activation_layer, output=True)
    print("Construction OK")
    net.clear()
    print("Clearing OK")
    net.activate(xor_dataset)
    print("Activation OK")
    net.backpropagate(xor_labels)
    print("Backpropagation OK")
    for gparam, param in zip(net.get_gradients(), net.get_parameters()):
        assert(gparam.shape == param.shape), "Weight updates are not the same size"
    print("Updates and parameters shapes OK")
    trainer = Trainer(net, 0.3)
    print("Trainer OK")
    epochs = 2000
    for epoch in range(epochs):
        er = trainer.train(xor_dataset, xor_labels)
        if epoch > 0 and epoch % 250 == 0:
            print("epoch %d, Error %.2f" % (epoch, er))
    print("Training OK")
    net.clear()
    np.set_printoptions(precision=2)
    passed_predictions = []
    for data, prediction, label in zip(xor_dataset, net.activate(xor_dataset), xor_labels):
        passed_predictions.append(np.allclose(prediction.round(), label))
        print("%r => %r : %r" % (data.astype(np.float64), np.around(prediction.astype(np.float64), decimals=2), passed_predictions[-1]))
    if all(passed_predictions):
        print("Learning OK")
test_net()
Initialization OK
Construction OK
Clearing OK
Activation OK
Backpropagation OK
Updates and parameters shapes OK
Trainer OK
epoch 250, Error 5.53
epoch 500, Error 0.93
epoch 750, Error 0.39
epoch 1000, Error 0.28
epoch 1250, Error 0.22
epoch 1500, Error 0.19
epoch 1750, Error 0.17
Training OK
array([ 0., 0.]) => array([ 0.02, 0.98]) : True
array([ 0., 1.]) => array([ 0.98, 0.02]) : True
array([ 1., 0.]) => array([ 0.98, 0.02]) : True
array([ 1., 1.]) => array([ 0.02, 0.98]) : True
Learning OK
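`create_xor_dataset` is imported rather than defined in this notebook; judging from the predictions printed above, it returns something along the lines of the following sketch (the `_sketch` suffix marks this as an illustrative stand-in, not the cython_lstm implementation):

    import numpy as np

    # Hypothetical stand-in for cython_lstm.dataset.create_xor_dataset, inferred
    # from the output above: the four XOR input pairs and a two-column one-hot
    # encoding of the XOR result (column 0 fires when XOR is 1, column 1 when it is 0).
    def create_xor_dataset_sketch():
        data = np.array([[0., 0.], [0., 1.], [1., 0.], [1., 1.]], dtype=np.float32)
        xor = np.logical_xor(data[:, 0], data[:, 1]).astype(np.float32)
        labels = np.column_stack([xor, 1.0 - xor])
        return data, labels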
def test_softmax_net():
    # create a simple binary to decimal converter
    digit_dataset, digit_labels = create_digit_dataset()
    # create a small network:
    net = Network(metric=CategoricalCrossEntropy)
    print("Initialization OK")
    first_layer = LinearLayer(digit_dataset.shape[1], 3)
    first_layer_activation = ActivationLayer(LogisticNeuron)
    first_layer.connect_to(first_layer_activation)
    second_layer = LinearLayer(3, 11)  # 0, 1, ..., 9, 10
    first_layer_activation.connect_to(second_layer)
    second_layer_activation = ActivationLayer(SoftmaxNeuron)
    second_layer.connect_to(second_layer_activation)
    net.add_layer(first_layer, input=True)
    net.add_layer(first_layer_activation)
    net.add_layer(second_layer)
    net.add_layer(second_layer_activation, output=True)
    print("Construction OK")
    net.clear()
    print("Clearing OK")
    net.activate(digit_dataset)
    print("Activation OK")
    net.backpropagate(digit_labels)
    print("Backpropagation OK")
    for gparam, param in zip(net.get_gradients(), net.get_parameters()):
        assert(gparam.shape == param.shape), "Weight updates are not the same size"
    print("Updates and parameters shapes OK")
    trainer = Trainer(net, 0.01)
    print("Trainer OK")
    epochs = 2000
    for epoch in range(epochs):
        er = trainer.train(digit_dataset, digit_labels)
        if epoch % 250 == 0:
            print("epoch %d, Error %.2f" % (epoch, er))
    print("Training OK")
    net.clear()
    np.set_printoptions(precision=2)
    passed_predictions = []
    plt.matshow(net.activate(digit_dataset), cmap=matplotlib.cm.Blues)
    plt.xticks(np.arange(0, 11), [str(w) for w in np.arange(0, 11)])
    plt.yticks(np.arange(0, 11), [str(datum) for datum in digit_dataset])
    plt.title("Prediction distribution for decimals from binary codes")
    for data, prediction, label in zip(digit_dataset, net.activate(digit_dataset), digit_labels):
        passed_predictions.append(prediction.argmax() == label)
        print("%r => %r : %r" % (data.astype(np.float64), prediction.argmax(), passed_predictions[-1]))
    if all(passed_predictions):
        print("Learning OK")
test_softmax_net()
Initialization OK
Construction OK
Clearing OK
Activation OK
Backpropagation OK
Updates and parameters shapes OK
Trainer OK
epoch 0, Error 27.20
epoch 250, Error 0.63
epoch 500, Error 0.28
epoch 750, Error 0.18
epoch 1000, Error 0.13
epoch 1250, Error 0.10
epoch 1500, Error 0.08
epoch 1750, Error 0.07
Training OK
array([ 0., 0., 0., 0.]) => 0 : True
array([ 0., 0., 0., 1.]) => 1 : True
array([ 0., 0., 1., 0.]) => 2 : True
array([ 0., 0., 1., 1.]) => 3 : True
array([ 0., 1., 0., 0.]) => 4 : True
array([ 0., 1., 0., 1.]) => 5 : True
array([ 0., 1., 1., 0.]) => 6 : True
array([ 0., 1., 1., 1.]) => 7 : True
array([ 1., 0., 0., 0.]) => 8 : True
array([ 1., 0., 0., 1.]) => 9 : True
array([ 1., 0., 1., 0.]) => 10 : True
Learning OK
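`create_digit_dataset` is likewise imported rather than defined here; from the predictions above it appears to pair most-significant-bit-first, 4-bit binary codes for 0 through 10 with integer class labels, roughly as follows (a sketch under that assumption, not the library's implementation):

    import numpy as np

    # Hypothetical stand-in for cython_lstm.dataset.create_digit_dataset, inferred
    # from the output above: 4-bit codes for 0..10 and their integer labels
    # for the softmax classifier.
    def create_digit_dataset_sketch():
        codes = np.array([[int(b) for b in np.binary_repr(n, width=4)] for n in range(11)],
                         dtype=np.float32)
        labels = np.arange(11, dtype=np.int32)
        return codes, labels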
def test_tensor_net():
    # create a simple binary to decimal converter
    digit_dataset, digit_labels = create_digit_dataset()
    # create a small network:
    net = Network(metric=CategoricalCrossEntropy)
    print("Initialization OK")
    first_layer = LinearLayer(digit_dataset.shape[1], 3)
    first_layer_activation = ActivationLayer(LogisticNeuron)
    first_layer.connect_to(first_layer_activation)
    second_layer = LinearLayer(3, 11, tensor=True)  # 0, 1, ..., 9, 10
    first_layer_activation.connect_to(second_layer)
    second_layer_activation = ActivationLayer(SoftmaxNeuron)
    second_layer.connect_to(second_layer_activation)
    net.add_layer(first_layer, input=True)
    net.add_layer(first_layer_activation)
    net.add_layer(second_layer)
    net.add_layer(second_layer_activation, output=True)
    print("Construction OK")
    net.clear()
    print("Clearing OK")
    net.activate(digit_dataset)
    print("Activation OK")
    net.backpropagate(digit_labels)
    print("Backpropagation OK")
    for gparam, param in zip(net.get_gradients(), net.get_parameters()):
        assert(gparam.shape == param.shape), "Weight updates are not the same size"
    print("Updates and parameters shapes OK")
    trainer = Trainer(net, 0.01)
    print("Trainer OK")
    epochs = 2000
    for epoch in range(epochs):
        er = trainer.train(digit_dataset, digit_labels)
        if epoch % 250 == 0:
            print("epoch %d, Error %.2f" % (epoch, er))
    print("Training OK")
    net.clear()
    np.set_printoptions(precision=2)
    passed_predictions = []
    plt.matshow(net.activate(digit_dataset), cmap=matplotlib.cm.Blues)
    plt.xticks(np.arange(0, 11), [str(w) for w in np.arange(0, 11)])
    plt.yticks(np.arange(0, 11), [str(datum) for datum in digit_dataset])
    plt.title("Prediction distribution for decimals from binary codes")
    for data, prediction, label in zip(digit_dataset, net.activate(digit_dataset), digit_labels):
        passed_predictions.append(prediction.argmax() == label)
        print("%r => %r : %r" % (data.astype(np.float64), prediction.argmax(), passed_predictions[-1]))
    if all(passed_predictions):
        print("Learning OK")
test_tensor_net()
Initialization OK
Construction OK
Clearing OK
Activation OK
Backpropagation OK
Updates and parameters shapes OK
Trainer OK
epoch 0, Error 27.47
epoch 250, Error 0.13
epoch 500, Error 0.06
epoch 750, Error 0.04
epoch 1000, Error 0.03
epoch 1250, Error 0.02
epoch 1500, Error 0.02
epoch 1750, Error 0.02
Training OK
array([ 0., 0., 0., 0.]) => 0 : True
array([ 0., 0., 0., 1.]) => 1 : True
array([ 0., 0., 1., 0.]) => 2 : True
array([ 0., 0., 1., 1.]) => 3 : True
array([ 0., 1., 0., 0.]) => 4 : True
array([ 0., 1., 0., 1.]) => 5 : True
array([ 0., 1., 1., 0.]) => 6 : True
array([ 0., 1., 1., 1.]) => 7 : True
array([ 1., 0., 0., 0.]) => 8 : True
array([ 1., 0., 0., 1.]) => 9 : True
array([ 1., 0., 1., 0.]) => 10 : True
Learning OK
def binary_addition_data(TIMESTEPS=20,
                         DIFFERENT_OBSERVABLES=3,
                         OBSERVATION_DIMENSIONS=3,
                         NOISE_SIZE=0.03):
    recurrent_data = np.zeros([TIMESTEPS, DIFFERENT_OBSERVABLES, OBSERVATION_DIMENSIONS], dtype=np.float32)
    # random binary start state (0 or 1 in every bit) for each stream
    start_step = np.random.randint(0, 2, size=(DIFFERENT_OBSERVABLES, OBSERVATION_DIMENSIONS))

    def update_step(data, step):
        # copy the previous time step, add 1 to the lowest-order bit,
        # then propagate the carry through the three bits of every stream
        data[step, :, :] = data[step - 1, :, :]
        data[step, :, 0] += 1
        for stream in range(data.shape[1]):
            if data[step, stream, 0] > 1:
                data[step, stream, 0] = 0
                data[step, stream, 1] += 1
            if data[step, stream, 1] > 1:
                data[step, stream, 1] = 0
                data[step, stream, 2] += 1
            if data[step, stream, 2] > 1:
                data[step, stream, 2] = 0

    recurrent_data[0, :, :] = start_step
    for i in range(1, TIMESTEPS):
        update_step(recurrent_data, i)
    noisy_data = recurrent_data + NOISE_SIZE * np.random.standard_normal(recurrent_data.shape).astype(np.float32)
    return noisy_data, recurrent_data
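A quick sanity check of the generator above (this snippet only calls the function defined in this notebook; the shapes and the +1 counting behaviour follow directly from its code):

    # Each stream is a 3-bit little-endian counter that is incremented by 1 at
    # every time step; noisy_data is the same signal with Gaussian noise added.
    noisy, clean = binary_addition_data(TIMESTEPS=5, DIFFERENT_OBSERVABLES=2,
                                        OBSERVATION_DIMENSIONS=3, NOISE_SIZE=0.03)
    print(noisy.shape, clean.shape)   # (5, 2, 3) (5, 2, 3)
    print(clean[:, 0, :])             # stream 0 counting upwards, one step per row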
def one_trick_pony(network, temporal=False):
    print("Simple binary additions using network:")
    for num in range(0, 6):
        # little-endian, 3-bit binary representation of num
        bin_repr = np.binary_repr(num)[::-1][:3]
        if len(bin_repr) < 3:
            bin_repr = bin_repr + (3 - len(bin_repr)) * "0"
        if temporal:
            # the recurrent network expects a (time, stream, dimension) input
            bin_repr = np.array([[list(bin_repr)]], dtype=np.float32)
        else:
            bin_repr = np.array([list(bin_repr)], dtype=np.float32)
        print("%d + 1 ~= %d" % (num, sum(2 ** k if i > 0 else 0. for k, i in enumerate(network.activate(bin_repr)[0].round()))))
def test_recurrent_net():
    # Binary addition problem
    TIMESTEPS = 20
    DIFFERENT_OBSERVABLES = 3
    OBSERVATION_DIMENSIONS = 3
    NOISE_SIZE = 0.03
    noisy_data, recurrent_data = binary_addition_data(TIMESTEPS,
                                                      DIFFERENT_OBSERVABLES,
                                                      OBSERVATION_DIMENSIONS,
                                                      NOISE_SIZE)
    HIDDEN_DIMENSIONS = 8
    net = Network()
    input_layer = LinearLayer(OBSERVATION_DIMENSIONS, HIDDEN_DIMENSIONS)
    activ_layer = ActivationLayer(TanhNeuron)
    prediction_layer = LinearLayer(HIDDEN_DIMENSIONS, OBSERVATION_DIMENSIONS)
    output_layer = ActivationLayer(LogisticNeuron)
    input_layer.connect_to(activ_layer)
    activ_layer.connect_to(prediction_layer)
    prediction_layer.connect_to(output_layer)
    temporal_loop = LoopLayer(OBSERVATION_DIMENSIONS, input_layer)
    slice_layer = SliceLayer((-1, -1))
    temporal_loop.connect_to(slice_layer)
    net.add_layer(temporal_loop, input=True)
    net.add_layer(slice_layer, output=True)
    net.set_error(BinaryCrossEntropy)
    net.activate(noisy_data[:-1, :, :])
    net.backpropagate(recurrent_data[-1, :, :].astype(np.int32))
    print("Backpropagation Through Time OK")
    for gparam, param in zip(net.get_gradients(), net.get_parameters()):
        assert(gparam.shape == param.shape), "Weight updates are not the same size"
    print("Updates and parameters shapes OK")
    trainer = Trainer(net, method="adadelta", rho=0.95)
    print("Trainer OK")
    epochs = 5000
    subepochs = 10
    print("before we start, here's the network's view of addition:")
    one_trick_pony(net, True)
    # use the last time step for prediction.
    # Note: if you look closely, this is a really poor example, since we show
    # many useless observations and finally close with one useful one for
    # training, and context is not useful in this instance for prediction.
    er = 0.
    for epoch in range(epochs):
        for subepoch in range(subepochs):
            random_range_begin = np.random.randint(0, TIMESTEPS - 5)
            random_range_end = random_range_begin + 1  # np.random.randint(random_range_begin+4, TIMESTEPS)
            er += trainer.train(noisy_data[random_range_begin:random_range_end, :, :],
                                recurrent_data[random_range_end, :, :].astype(np.int32))
        if epoch > 0 and epoch % 1000 == 0:
            print("epoch %d, Error %.2f" % (epoch * subepochs, er))
            er = 0.
    print("Training OK")
    one_trick_pony(net, True)
    return net
def test_binary_addition_net():
    # Binary addition problem
    TIMESTEPS = 200
    DIFFERENT_OBSERVABLES = 10
    OBSERVATION_DIMENSIONS = 3
    NOISE_SIZE = 0.03
    noisy_data, recurrent_data = binary_addition_data(TIMESTEPS,
                                                      DIFFERENT_OBSERVABLES,
                                                      OBSERVATION_DIMENSIONS,
                                                      NOISE_SIZE)
    HIDDEN_DIMENSIONS = 8
    net = Network()
    input_layer = LinearLayer(OBSERVATION_DIMENSIONS, HIDDEN_DIMENSIONS)
    activ_layer = ActivationLayer(TanhNeuron)
    prediction_layer = LinearLayer(HIDDEN_DIMENSIONS, OBSERVATION_DIMENSIONS)
    output_layer = ActivationLayer(LogisticNeuron)
    input_layer.connect_to(activ_layer)
    activ_layer.connect_to(prediction_layer)
    prediction_layer.connect_to(output_layer)
    net.add_layer(input_layer, input=True)
    net.add_layer(activ_layer)
    net.add_layer(prediction_layer)
    net.add_layer(output_layer, output=True)
    net.set_error(BinaryCrossEntropy)
    net.activate(noisy_data[0, :, :])
    net.backpropagate(recurrent_data[1, :, :].astype(np.int32))
    for gparam, param in zip(net.get_gradients(), net.get_parameters()):
        assert(gparam.shape == param.shape), "Weight updates are not the same size"
    print("Updates and parameters shapes OK")
    trainer = Trainer(net, method="adadelta", rho=0.95)
    print("Trainer OK")
    epochs = 5000
    subepochs = 10
    print("before we start, here's the network's view of addition:")
    one_trick_pony(net)
    er = 0.
    for epoch in range(epochs):
        for subepoch in range(subepochs):
            random_range_begin = np.random.randint(0, TIMESTEPS - 5)
            random_range_end = random_range_begin + 1
            er += trainer.train(noisy_data[random_range_begin, :, :],
                                recurrent_data[random_range_end, :, :].astype(np.int32))
        if epoch > 0 and epoch % 1000 == 0:
            print("epoch %d, Error %.2f" % (epoch * subepochs, er))
            er = 0.
    print("Training OK")
    one_trick_pony(net)
    return net
flat_calculator_net = test_binary_addition_net()
Updates and parameters shapes OK
Trainer OK
before we start, here's the network's view of addition:
Simple binary additions using network:
0 + 1 ~= 6
1 + 1 ~= 6
2 + 1 ~= 6
3 + 1 ~= 6
4 + 1 ~= 6
5 + 1 ~= 6
epoch 10000, Error 35194.14
epoch 20000, Error 792.28
epoch 30000, Error 527.60
epoch 40000, Error 426.45
Training OK
Simple binary additions using network:
0 + 1 ~= 1
1 + 1 ~= 2
2 + 1 ~= 3
3 + 1 ~= 4
4 + 1 ~= 5
5 + 1 ~= 6
recurrent_net = test_recurrent_net()
Backpropagation Through Time OK
Updates and parameters shapes OK
Trainer OK
before we start, here's the network's view of addition:
Simple binary additions using network:
0 + 1 ~= 0
1 + 1 ~= 0
2 + 1 ~= 0
3 + 1 ~= 0
4 + 1 ~= 0
5 + 1 ~= 0
epoch 10000, Error 12438.66
epoch 20000, Error 701.64
epoch 30000, Error 329.03
epoch 40000, Error 251.96
Training OK
Simple binary additions using network:
0 + 1 ~= 1
1 + 1 ~= 2
2 + 1 ~= 3
3 + 1 ~= 4
4 + 1 ~= 5
5 + 1 ~= 6
net = Network()
first_layer = Layer(3, neuron=TanhNeuron)
net.add_layer(first_layer, input=True)
second_layer = Layer(5, 2, neuron=SoftmaxNeuron)
net.add_layer(second_layer, output=True)
first_layer.connect_to(second_layer)
cython_lstm.network_viewer.draw(net)
Prevent parameter duplication by adding uuids to each parameter, and using sets to identify them (at least initially).
SliceLayer and add_layer are clunky ways of achieving simple things. These should implicitly act on the layer and connect it. In doing so, the network should also implicitly gobble up all the resulting layers into a coherent whole.
Optimization is then a matter of preventing the saving of the untransformed input of a linear layer, and converting all these classes to either Numba or Cython for compilation, to avoid doing all sorts of polymorphic checks everywhere.
Implement RNN with memory.
Implement the gate unit by overloading the __add__ operator (a sketch of this idea follows below).
Implement the loop layer in Cython with nogil where possible.
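As a rough illustration of the `__add__` idea (and of the `ElementWiseSum` node that shows up in the topology test below), here is a minimal, standalone sketch; the class and attribute names are illustrative only and are not the cython_lstm API:

    class SketchLayer(object):
        """Toy layer used only to illustrate overloading __add__ as a gate/sum unit."""
        def __init__(self, name):
            self.name = name
            self.children = []

        def connect_to(self, other):
            self.children.append(other)

        def __add__(self, other):
            # "layer_a + layer_b" yields a node that will sum both activations,
            # mirroring the ElementWiseSum node seen in the topsort output below
            summed = SketchLayer("sum(%s, %s)" % (self.name, other.name))
            self.connect_to(summed)
            other.connect_to(summed)
            return summed

    hidden = SketchLayer("hidden")
    gate_input = SketchLayer("gate_input")
    merged = hidden + gate_input
    print(merged.name)  # sum(hidden, gate_input)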
def topology_test():
    # create a test dataset
    xor_dataset, xor_labels = create_xor_dataset()
    # create a small network:
    net = Network(metric=BinaryCrossEntropy)
    print("Initialization OK")
    first_layer = LinearLayer(xor_dataset.shape[1], 6)
    activation_layer = ActivationLayer(LogisticNeuron)
    first_layer.connect_to(activation_layer)
    second_input = cython_lstm.network.DataLayer()
    second_layer = LinearLayer(3, 6)
    second_input.connect_to(second_layer)
    third_layer = activation_layer + second_layer
    net.add_layer(first_layer, input=True)
    net.add_layer(activation_layer)
    net.add_layer(second_layer)
    net.add_layer(second_input)
    net.add_layer(third_layer, output=True)
    return net
a = topology_test()
Initialization OK
[b.layer for b in a.topsort()]
[<cython_lstm.network.DataLayer at 0x110fbed30>,
 <LinearLayer {'output_size': 6, 'activation': '', 'input_size': 3}>,
 <cython_lstm.network.DataLayer at 0x110f78a20>,
 <LinearLayer {'output_size': 6, 'activation': '', 'input_size': 2}>,
 <ActivationLayer {'output_size': '', 'activation': 'Sigmoid', 'input_size': ''}>,
 <ElementWiseSum {'output_size': '', 'activation': '', 'input_size': ''}>]