% load_ext autoreload % autoreload 2 % matplotlib inline % load_ext cythonmagic % config InlineBackend.figure_format = 'svg' import matplotlib.pyplot as plt import numpy as np, matplotlib from cython_lstm.network import Network from cython_lstm.neuron import LogisticNeuron, TanhNeuron, SoftmaxNeuron from cython_lstm.layers import LoopLayer, SliceLayer, ActivationLayer, LinearLayer from cython_lstm.trainer import Trainer from cython_lstm.error import MSE, CategoricalCrossEntropy, BinaryCrossEntropy from cython_lstm.dataset import create_xor_dataset, create_digit_dataset import cython_lstm.network_viewer def test_net(): # create a test dataset xor_dataset, xor_labels = create_xor_dataset() # create a small network: net = Network(metric= BinaryCrossEntropy) print("Initialization OK") first_layer = LinearLayer(xor_dataset.shape[1], 6) activation_layer = ActivationLayer(LogisticNeuron) first_layer.connect_to(activation_layer) second_layer = LinearLayer(6, xor_labels.shape[1]) activation_layer.connect_to(second_layer) second_activation_layer = ActivationLayer(LogisticNeuron) second_layer.connect_to(second_activation_layer) net.add_layer(first_layer, input=True) net.add_layer(activation_layer) net.add_layer(second_layer) net.add_layer(activation_layer) net.add_layer(second_layer) net.add_layer(second_activation_layer, output=True) print("Construction OK") net.clear() print("Clearing OK") net.activate(xor_dataset) print("Activation OK") net.backpropagate(xor_labels) print("Backpropagation OK") for gparam, param in zip(net.get_gradients(), net.get_parameters()): assert(gparam.shape == param.shape), "Weight updates are not the same size" print("Updates and parameters shapes OK") trainer = Trainer(net, 0.3) print("Trainer OK") epochs = 2000 for epoch in range(epochs): er = trainer.train(xor_dataset, xor_labels) if epoch > 0 and epoch % 250 == 0: print("epoch %d, Error %.2f" % (epoch, er)) print("Training OK") net.clear() np.set_printoptions(precision=2) passed_predictions = [] for data, prediction, label in zip(xor_dataset, net.activate(xor_dataset), xor_labels): passed_predictions.append(np.allclose(prediction.round(), label)) print("%r => %r : %r" % (data.astype(np.float64), np.around(prediction.astype(np.float64), decimals=2), passed_predictions[-1])) if all(passed_predictions): print("Learning OK") test_net() def test_softmax_net(): # create a simple binary to decimal converter digit_dataset, digit_labels = create_digit_dataset() # create a small network: net = Network(metric = CategoricalCrossEntropy) print("Initialization OK") first_layer = LinearLayer(digit_dataset.shape[1], 3) first_layer_activation = ActivationLayer(LogisticNeuron) first_layer.connect_to(first_layer_activation) second_layer = LinearLayer(3, 11) # 0, 1, ..., 9, 10 first_layer_activation.connect_to(second_layer) second_layer_activation = ActivationLayer(SoftmaxNeuron) second_layer.connect_to(second_layer_activation) net.add_layer(first_layer, input=True) net.add_layer(first_layer_activation) net.add_layer(second_layer) net.add_layer(second_layer_activation, output=True) print("Construction OK") net.clear() print("Clearing OK") net.activate(digit_dataset) print("Activation OK") net.backpropagate(digit_labels) print("Backpropagation OK") for gparam, param in zip(net.get_gradients(), net.get_parameters()): assert(gparam.shape == param.shape), "Weight updates are not the same size" print("Updates and parameters shapes OK") trainer = Trainer(net, 0.01) print("Trainer OK") epochs = 2000 for epoch in range(epochs): er = trainer.train(digit_dataset, digit_labels) if epoch % 250 == 0: print("epoch %d, Error %.2f" % (epoch, er)) print("Training OK") net.clear() np.set_printoptions(precision=2) passed_predictions = [] plt.matshow(net.activate(digit_dataset), cmap = matplotlib.cm.Blues) plt.xticks(np.arange(0,11), [str(w) for w in list(np.arange(0,11))]) plt.yticks(np.arange(0,11), [str(datum) for datum in digit_dataset]) plt.title("Prediction distribution for decimals from binary codes") for data, prediction, label in zip(digit_dataset, net.activate(digit_dataset), digit_labels): passed_predictions.append(prediction.argmax() == label) print("%r => %r : %r" % (data.astype(np.float64), prediction.argmax(), passed_predictions[-1])) if all(passed_predictions): print("Learning OK") test_softmax_net() def test_tensor_net(): # create a simple binary to decimal converter digit_dataset, digit_labels = create_digit_dataset() # create a small network: net = Network(metric=CategoricalCrossEntropy) print("Initialization OK") first_layer = LinearLayer(digit_dataset.shape[1], 3) first_layer_activation = ActivationLayer(LogisticNeuron) first_layer.connect_to(first_layer_activation) second_layer = LinearLayer(3, 11, tensor=True) # 0, 1, ..., 9, 10 first_layer_activation.connect_to(second_layer) second_layer_activation = ActivationLayer(SoftmaxNeuron) second_layer.connect_to(second_layer_activation) net.add_layer(first_layer, input=True) net.add_layer(first_layer_activation) net.add_layer(second_layer) net.add_layer(second_layer_activation, output=True) print("Construction OK") net.clear() print("Clearing OK") net.activate(digit_dataset) print("Activation OK") net.backpropagate(digit_labels) print("Backpropagation OK") for gparam, param in zip(net.get_gradients(), net.get_parameters()): assert(gparam.shape == param.shape), "Weight updates are not the same size" print("Updates and parameters shapes OK") trainer = Trainer(net, 0.01) print("Trainer OK") epochs = 2000 for epoch in range(epochs): er = trainer.train(digit_dataset, digit_labels) if epoch % 250 == 0: print("epoch %d, Error %.2f" % (epoch, er)) print("Training OK") net.clear() np.set_printoptions(precision=2) passed_predictions = [] plt.matshow(net.activate(digit_dataset), cmap = matplotlib.cm.Blues) plt.xticks(np.arange(0,11), [str(w) for w in list(np.arange(0,11))]) plt.yticks(np.arange(0,11), [str(datum) for datum in digit_dataset]) plt.title("Prediction distribution for decimals from binary codes") for data, prediction, label in zip(digit_dataset, net.activate(digit_dataset), digit_labels): passed_predictions.append(prediction.argmax() == label) print("%r => %r : %r" % (data.astype(np.float64), prediction.argmax(), passed_predictions[-1])) if all(passed_predictions): print("Learning OK") test_tensor_net() def update_step(data, step): data[step,:,:] = data[step-1,:,:] data[step,:,0] += 1 for stream in range(data.shape[1]): if data[step,stream,0] > 1: data[step,stream,0] = 0 data[step,stream,1] += 1 if data[step,stream,1] > 1: data[step,stream,1] = 0 data[step,stream,2] += 1 if data[step,stream,2] > 1: data[step,stream,2] = 0 if data[step,stream,1] > 1: data[step,stream,1] = 0 data[step,stream,2] += 1 if data[step,stream,2] > 1: data[step,stream,2] = 0 if data[step,stream,2] > 1: data[step,stream,2] = 0 def binary_addition_data(TIMESTEPS = 20, DIFFERENT_OBSERVABLES = 3, OBSERVATION_DIMENSIONS = 3, NOISE_SIZE = 0.03): recurrent_data = np.zeros([TIMESTEPS, DIFFERENT_OBSERVABLES, OBSERVATION_DIMENSIONS], dtype=np.float32) start_step = np.random.randint(0, 1, size=(DIFFERENT_OBSERVABLES, OBSERVATION_DIMENSIONS)) def update_step(data, step): data[step,:,:] = data[step-1,:,:] data[step,:,0] += 1 for stream in range(data.shape[1]): if data[step,stream,0] > 1: data[step,stream,0] = 0 data[step,stream,1] += 1 if data[step,stream,1] > 1: data[step,stream,1] = 0 data[step,stream,2] += 1 if data[step,stream,2] > 1: data[step,stream,2] = 0 if data[step,stream,1] > 1: data[step,stream,1] = 0 data[step,stream,2] += 1 if data[step,stream,2] > 1: data[step,stream,2] = 0 if data[step,stream,2] > 1: data[step,stream,2] = 0 recurrent_data[0,:,:] = start_step for i in range(1, TIMESTEPS): update_step(recurrent_data, i) noisy_data = recurrent_data + NOISE_SIZE * np.random.standard_normal(recurrent_data.shape).astype(np.float32) return noisy_data, recurrent_data def one_trick_pony(network, temporal=False): print("Simple binary additions using network:") for num in range(0, 6): bin_repr = np.binary_repr(num)[::-1][:3] if len(bin_repr) < 3: bin_repr = bin_repr + (3 - len(bin_repr)) * "0" if temporal: bin_repr = np.array([[list(bin_repr)]]) else: bin_repr = np.array([list(bin_repr)]) print("%d + 1 ~= %d" % (num, sum(2 ** k if i > 0 else 0. for k, i in enumerate(network.activate(bin_repr)[0].round())))) def test_reccurent_net(): # Binary addition problem TIMESTEPS = 20 DIFFERENT_OBSERVABLES = 3 OBSERVATION_DIMENSIONS = 3 NOISE_SIZE = 0.03 noisy_data, recurrent_data = binary_addition_data(TIMESTEPS, DIFFERENT_OBSERVABLES, OBSERVATION_DIMENSIONS, NOISE_SIZE) HIDDEN_DIMENSIONS = 8 net = Network() input_layer = LinearLayer(OBSERVATION_DIMENSIONS, HIDDEN_DIMENSIONS) activ_layer = ActivationLayer(TanhNeuron) prediction_layer = LinearLayer(HIDDEN_DIMENSIONS, OBSERVATION_DIMENSIONS) output_layer = ActivationLayer(LogisticNeuron) input_layer.connect_to(activ_layer) activ_layer.connect_to(prediction_layer) prediction_layer.connect_to(output_layer) temporal_loop = LoopLayer(OBSERVATION_DIMENSIONS, input_layer) slice_layer = SliceLayer((-1,-1)) temporal_loop.connect_to(slice_layer) net.add_layer(temporal_loop, input=True) net.add_layer(slice_layer, output=True) net.set_error(BinaryCrossEntropy) net.activate(noisy_data[:-1,:,:]) net.backpropagate(recurrent_data[-1,:,:].astype(np.int32)) print("Backpropagation Through Time OK") for gparam, param in zip(net.get_gradients(), net.get_parameters()): assert(gparam.shape == param.shape), "Weight updates are not the same size" print("Updates and parameters shapes OK") trainer = Trainer(net, method="adadelta", rho=0.95) print("Trainer OK") epochs = 5000 subepochs = 10 print("before we start, here's the network's view of addition:") one_trick_pony(net, True) # use last time step for prediction # Note: if you look closely, this is a really poor # example, since we are showing many useless observations # and finally closing with one useful one for training # and context is not useful in this instance for prediction. er = 0. for epoch in range(epochs): for subepoch in range(subepochs): random_range_begin = np.random.randint(0, TIMESTEPS-5) random_range_end = random_range_begin + 1#np.random.randint(random_range_begin+4, TIMESTEPS) er += trainer.train(noisy_data[random_range_begin:random_range_end,:,:], recurrent_data[random_range_end,:,:].astype(np.int32)) if epoch > 0 and epoch % 1000 == 0: print("epoch %d, Error %.2f" % (epoch * subepochs, er)) er = 0. print("Training OK") one_trick_pony(net, True) return net def test_binary_addition_net(): # Binary addition problem TIMESTEPS = 200 DIFFERENT_OBSERVABLES = 10 OBSERVATION_DIMENSIONS = 3 NOISE_SIZE = 0.03 noisy_data, recurrent_data = binary_addition_data(TIMESTEPS, DIFFERENT_OBSERVABLES, OBSERVATION_DIMENSIONS, NOISE_SIZE) HIDDEN_DIMENSIONS = 8 net = Network() input_layer = LinearLayer(OBSERVATION_DIMENSIONS, HIDDEN_DIMENSIONS) activ_layer = ActivationLayer(TanhNeuron) prediction_layer = LinearLayer(HIDDEN_DIMENSIONS, OBSERVATION_DIMENSIONS) output_layer = ActivationLayer(LogisticNeuron) input_layer.connect_to(activ_layer) activ_layer.connect_to(prediction_layer) prediction_layer.connect_to(output_layer) net.add_layer(input_layer, input=True) net.add_layer(activ_layer) net.add_layer(prediction_layer) net.add_layer(output_layer, output=True) net.set_error(BinaryCrossEntropy) net.activate(noisy_data[0,:,:]) net.backpropagate(recurrent_data[1,:,:].astype(np.int32)) for gparam, param in zip(net.get_gradients(), net.get_parameters()): assert(gparam.shape == param.shape), "Weight updates are not the same size" print("Updates and parameters shapes OK") trainer = Trainer(net, method="adadelta", rho=0.95) print("Trainer OK") epochs = 5000 subepochs = 10 print("before we start, here's the network's view of addition:") one_trick_pony(net) er = 0. for epoch in range(epochs): for subepoch in range(subepochs): random_range_begin = np.random.randint(0, TIMESTEPS-5) random_range_end = random_range_begin +1 er += trainer.train(noisy_data[random_range_begin,:,:], recurrent_data[random_range_end,:,:].astype(np.int32)) if epoch > 0 and epoch % 1000 == 0: print("epoch %d, Error %.2f" % (epoch * subepochs, er)) er = 0. print("Training OK") one_trick_pony(net) return net flat_calculator_net = test_binary_addition_net() recurrent_net = test_reccurent_net() net = Network() first_layer = Layer(3, neuron=TanhNeuron) net.add_layer(first_layer, input=True) second_layer = Layer(5, 2, neuron=SoftmaxNeuron) net.add_layer(second_layer, output=True) first_layer.connect_to(second_layer) cython_lstm.network_viewer.draw(net) def topology_test(): # create a test dataset xor_dataset, xor_labels = create_xor_dataset() # create a small network: net = Network(metric = BinaryCrossEntropy) print("Initialization OK") first_layer = LinearLayer(xor_dataset.shape[1], 6) activation_layer = ActivationLayer(LogisticNeuron) first_layer.connect_to(activation_layer) second_input = cython_lstm.network.DataLayer() second_layer = LinearLayer(3, 6) second_input.connect_to(second_layer) third_layer = activation_layer + second_layer net.add_layer(first_layer, input=True) net.add_layer(activation_layer) net.add_layer(second_layer) net.add_layer(second_input) net.add_layer(third_layer, output=True) return net a = topology_test() [b.layer for b in a.topsort()]