Implementation of the 1-dimensional kCSD method in Python. This is a brute-force port of the existing Matlab version, with deliberately minimal changes.
Let's work with two Gaussian sources: generate the CSD and calculate the resulting potentials.
The potential measured by an electrode at height $H$ above the tissue is

$$\phi(x_e) = \frac{1}{4\pi\sigma} \int_{-\infty}^{\infty} \frac{I(x)\,dx}{\sqrt{(x - x_e)^2 + H^2}}$$
%pylab inline
rcParams['figure.figsize'] = 14, 7
x = linspace(-4.0, 4.0, 400)
h = 0.7
H = 0.7
sigma = 0.1  # tissue conductivity (the same value is also reused below as the width parameter of the Gaussian sources)
true_csd = 0.7*np.exp(-(x-0.6)**2 / sigma) - 0.9*np.exp(-(x+0.6)**2 / sigma)
fig, (ax1, ax2) = plt.subplots(1, 2, sharex=True)
ax1.plot(x, true_csd)
ax1.set_title('True CSD')
def calculate_pot(x, r):
    return 1.0/(4*np.pi*sigma) * np.trapz(true_csd/np.sqrt((x - r)**2 + H**2), x)
pots = [calculate_pot(x,r) for r in x]
ax2.plot(x, pots)
ax2.set_title('Potentials')
Let's now insert 10 uniformly spaced electrodes and plot their readings.
elec_pos = [ h * i for i in xrange(-5,5)]
print elec_pos
meas_pot = [pots[int(200+r*50)] for r in elec_pos]  # map each electrode position to its grid index (x has ~50 points per unit, centred at index 200)
print len(meas_pot)
scatter(elec_pos, meas_pot)
print meas_pot
[-3.5, -2.8, -2.0999999999999996, -1.4, -0.7, 0.0, 0.7, 1.4, 2.0999999999999996, 2.8]
10
[-0.060020526557476174, -0.085156845172231349, -0.13366952756985112, -0.23511735830787805, -0.32986291502619441, -0.092151882151558534, 0.14900777994365988, 0.10477135691038296, 0.045569642538395544, 0.019849696932428045]
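The hard-coded index arithmetic above only works for this particular grid. A more robust lookup (a sketch, not in the original code) picks the nearest grid point explicitly:

meas_pot = [pots[np.argmin(np.abs(x - r))] for r in elec_pos]  # nearest grid point to each electrode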
If we were now to calculate the CSD using a finite-difference approximation of the second derivative, we would get:
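In one dimension the CSD is related to the potential by $C(x) = -\sigma\,\phi''(x)$, so the second-difference estimate is

$$C(x_i) \approx -\sigma\,\frac{\phi(x_{i-1}) - 2\phi(x_i) + \phi(x_{i+1})}{\Delta x^2}$$

Note that the code below drops the $\sigma$ factor, so it recovers the CSD only up to that constant: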
csd = np.array([ -(meas_pot[i-1]+meas_pot[i+1] -2*meas_pot[i])/(elec_pos[i]-elec_pos[i-1])**2 for i in xrange(1, len(elec_pos)-1)])
plot(elec_pos[1:9], csd)
A walk through the existing Matlab code, with a partial Python adaptation.
First, we set some basic parameters.
n_sources = 200 #looks best when n_sources == dist_density
space_extension = 0.0
estimation_area = linspace(min(elec_pos), max(elec_pos), 200)
source_space = linspace(min(elec_pos)-space_extension, max(elec_pos)+space_extension, n_sources)
max_dist = max(estimation_area)-min(estimation_area)
#extension of the source space (duplicates space_extension above)
ext = 0
#define the positions of the basis functions; note src comes out identical to source_space
xmin = min(elec_pos) - ext
xmax = max(elec_pos) + ext
src = linspace(xmin, xmax, n_sources)
Calculating contributions of sources at different distances.
def pot_intarg(src, arg, current_pos, h, R, sigma, src_type):
    # only the Gaussian basis is implemented! needs expansion for step basis functions
    # this is formula (8) from Pettersen et al., 2006
    y = (1./(2*sigma)) * (sqrt((arg - current_pos)**2 + R**2) - abs(arg - current_pos)) * gauss_rescale(src, current_pos, h)
    return y
def gauss_rescale(x, mi, three_sigma):
    """
    Value at x of a Gaussian density centred at mi with std = three_sigma/3,
    truncated to zero outside |x - mi| < three_sigma.
    """
    sigma_n2 = (three_sigma/3.0)**2
    g = 1./sqrt(2.*pi*sigma_n2) * exp(-(1./(2.*sigma_n2))*(x - mi)**2) * (abs(x - mi) < three_sigma)
    return g
We are integrating the following function of the source position:

$$f(x) = \frac{1}{2\sigma}\left(\sqrt{(\mathrm{arg} - x)^2 + R^2} - |\mathrm{arg} - x|\right)\cdot g(\mathrm{src};\, x,\, h)$$

where the rescaled Gaussian $g(x;\, \mu,\, 3\sigma)$ is defined through

$$\sigma_n^2 = \left(\frac{3\sigma}{3}\right)^2$$

$$g(x) = \frac{1}{\sqrt{2\pi\sigma_n^2}}\exp\left(-\frac{(x-\mu)^2}{2\sigma_n^2}\right)\cdot\mathbf{1}\left(|x-\mu| < 3\sigma\right)$$
It is important to note that the σ appearing in 3σ does not denote tissue conductivity here; it is just a scaling constant of the basis function.
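A quick sanity check (not in the original notebook): the truncated density should integrate to just under 1, since cutting at three standard deviations removes about 0.3% of the probability mass.

xs = linspace(-3, 3, 1000)
print np.trapz(gauss_rescale(xs, 0.0, 3.0), xs)  # ~0.9973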
Using the functions above, we build a lookup table of the potential generated by a single basis source as a function of distance.
#make distance table
dist_density = 200
# NB: h is redefined here as the inter-electrode distance (numerically the same 0.7 as before)
h = 1*(elec_pos[1] - elec_pos[0])
R = 3*(0.5)
def b_pot_quad(src, arg, h, R, sigma, src_type):
    if src_type == "gaussian":
        x = linspace(src - 4*h, src + 4*h, 60)  # manipulate the quadrature resolution here
        y = [pot_intarg(src, arg, current_pos, h, R, sigma, src_type) for current_pos in x]
        z = np.trapz(y, x)
        return z
    raise NotImplementedError("only the gaussian basis is implemented")
def create_dist_table(dist_dens, dist_max, h, R, sigma, src_type):
    dist_table = zeros(dist_dens + 1)
    for i in xrange(0, dist_dens + 1):
        dist_table[i] = b_pot_quad(0, (float(i)/dist_dens)*dist_max, h, R, sigma, src_type)
    return dist_table
dt = create_dist_table(dist_density, max_dist, h, R, sigma, "gaussian")
#print dt
plot(dt)
We can adjust the number of quadrature points used to evaluate the integrals along the dimension in which we assume the current distribution, and thus trade accuracy against performance.
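Another knob (a sketch, not in the original code): interpolate linearly into the distance table instead of snapping to the nearest entry, which reduces the quantization error for a given table size:

def b_pot_lookup(r, dist_table, dist_max):
    # linear interpolation between table entries; assumes 0 <= r <= dist_max
    rs = linspace(0, dist_max, len(dist_table))
    return np.interp(r, rs, dist_table)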
def bPotMatrixCalc(x, src, elPos, distTable, dist_dens):
    """
    Compute the matrix of potentials generated by every source basis function
    at every electrode position. This procedure can be optimized.
    bPotMatrix[j, k] = b_j(x_k)
    with x_k the k-th electrode position, b_j the j-th basis source from the paper.
    """
    nEl = len(elPos)
    nSrc = len(src)
    distMax = max(x) - min(x)
    bPotMatrix = zeros((nSrc, nEl))
    for srcInd in xrange(0, nSrc):
        currentSrc = src[srcInd]
        for argInd in xrange(0, nEl):
            arg = elPos[argInd]
            r = abs(currentSrc - arg)
            distTableInd = uint16(dist_dens * float(r) / distMax)
            bPotMatrix[srcInd, argInd] = distTable[distTableInd]
    return bPotMatrix
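As the docstring says, this can be optimized; a vectorized sketch (not in the original code) replaces the double loop with numpy broadcasting:

def bPotMatrixCalcVec(x, src, elPos, distTable, dist_dens):
    # same nearest-entry lookup as above, done in one broadcast operation
    distMax = max(x) - min(x)
    r = np.abs(np.asarray(src)[:, None] - np.asarray(elPos)[None, :])
    idx = (dist_dens * r / distMax).astype(int)
    return distTable[idx]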
bpm = bPotMatrixCalc(estimation_area, source_space, elec_pos, dt, dist_density)
#print bpm
imshow(bpm, interpolation="none")
colorbar()
KPot = dot(transpose(bpm), bpm)  # kernel matrix K from (2.14) in the paper
imshow(KPot, interpolation="none")
colorbar()
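K should come out symmetric and positive semi-definite; a quick check (a sketch, not in the original notebook):

print np.allclose(KPot, KPot.T)                  # symmetry
print np.linalg.eigvalsh(KPot).min() >= -1e-10   # no significantly negative eigenvalues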
def bSrcMatrixCalc(X, src, h):
    nSrc = len(src)
    nGx = len(X)
    bSrcMatrix = zeros((nGx, nSrc))
    for currentSrc in xrange(0, nSrc):
        bSrcMatrix[:, currentSrc] = gauss_rescale(X, src[currentSrc], h)
    return bSrcMatrix
bSrcMatrix = bSrcMatrixCalc(estimation_area, source_space, h)
imshow(bSrcMatrix, interpolation="none")
colorbar()
Note: this is definitely a sparse and potentially large matrix.
load_factor = float(count_nonzero(bSrcMatrix))/bSrcMatrix.size * 100
print "Just %f%% of elements of basis source matrix is nonzero." %load_factor
Just 21.235000% of elements of basis source matrix is nonzero.
Possibly some memory can be saved here, especially for bigger simulations.
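For instance (a sketch, assuming scipy is available), the matrix could be held in compressed sparse row form:

from scipy import sparse
bSrcSparse = sparse.csr_matrix(bSrcMatrix)
print bSrcSparse.nnz, "stored values instead of", bSrcMatrix.size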
interpCross = dot(bSrcMatrix, bpm)  # cross-kernel matrix from (2.17) in the paper
imshow(interpCross, interpolation="none")
colorbar()
def bInterpPotMatrixCalc(X, src, distTable, dist_dens):
    """
    Compute the matrix of potentials generated by every source basis function
    at every point of the estimation grid. This procedure can be optimized.
    bInterpPotMatrix[j, k] = b_j(X_k)
    with X_k the k-th estimation point, b_j the j-th basis source from the paper.
    """
    npoints = len(X)
    nSrc = len(src)
    distMax = max(X) - min(X)
    bInterpPotMatrix = zeros((nSrc, npoints))
    for srcInd in xrange(0, nSrc):
        currentSrc = src[srcInd]
        for argInd in xrange(0, npoints):
            arg = X[argInd]
            r = abs(currentSrc - arg)
            # nearest table entry (0-based; a Matlab-style "-1" here would underflow at r == 0)
            distTableInd = uint16(dist_dens * float(r) / distMax)
            bInterpPotMatrix[srcInd, argInd] = distTable[distTableInd]
    return bInterpPotMatrix
distTable = dt
bInterpPotMatrix = bInterpPotMatrixCalc(estimation_area, source_space, distTable, dist_density)
imshow(bInterpPotMatrix, interpolation="none")
colorbar()
Unfortunately this matrix is dense.
load_factor = float(count_nonzero(bInterpPotMatrix))/bInterpPotMatrix.size * 100
print "%f%% of the elements of the interpolated potential matrix are nonzero." % load_factor
100.000000% of the elements of the interpolated potential matrix are nonzero.
InterpPot = dot(transpose(bInterpPotMatrix),bpm)
imshow(InterpPot, interpolation="none")
colorbar()
Now give me my pots!
estimation_table = InterpPot
lambd = 0.0
KInv = linalg.inv(KPot + lambd * identity(KPot.shape[0]))
imshow(KInv, interpolation="none")
colorbar()
def estimate_pots():
    estimation_table = InterpPot
    lambd = 0.0
    KInv = linalg.inv(KPot + lambd * identity(KPot.shape[0]))
    beta = dot(KInv, meas_pot)
    output = dot(estimation_table, beta)
    return output
output = estimate_pots()
plot(linspace(min(elec_pos), max(elec_pos), len(output)), output)
plot(elec_pos, meas_pot) # measured potential
plot(x,pots) #real pot
legend((r'kCSD estimated potential', r'linearly interpolated potential', r'true potential'), shadow = True, loc = (0.01, 0.75))
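How close are we? A rough check (a sketch, not in the original notebook) compares the estimate with the true potential resampled onto the estimation grid:

est_grid = linspace(min(elec_pos), max(elec_pos), len(output))
true_on_grid = np.interp(est_grid, x, pots)
print sqrt(mean((output - true_on_grid)**2))  # RMS error of the potential estimate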
Now it's time for the CSD.
estimation_table = interpCross
KInv = linalg.inv(KPot + lambd *identity(KPot.shape[0]))
imshow(KInv, interpolation="none")
colorbar()
def estimate_csd():
    estimation_table = interpCross
    KInv = linalg.inv(KPot + lambd * identity(KPot.shape[0]))  # lambd is read from the global scope
    beta = dot(KInv, meas_pot)
    output = dot(estimation_table, beta)
    return output
output = estimate_csd()
plot(linspace(min(elec_pos), max(elec_pos), len(output)), output) #csd calculated with kCSD
plot(elec_pos[1:9], csd) #finite difference approximation
plot(x, true_csd)
legend((r'kCSD estimated CSD', r'finite difference approximation', r'true CSD'), shadow = True, loc = (0.01, 0.75))
This is the result for λ = 0. Not very impressive yet, but the direction is good.
Now let's calculate the regularization parameter λ.
The λ parameter balances the smoothness of the solution against fidelity to the training data: the higher λ is, the more smoothness matters and the less the measurements do.
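Concretely, writing $V$ for the vector of measured potentials, the regularised solution implemented by estimate_pots and estimate_csd above is

$$\beta = (K + \lambda I)^{-1} V, \qquad \hat{\phi} = \tilde{K}_{\phi}\,\beta, \qquad \hat{C} = \tilde{K}_{C}\,\beta$$

where $\tilde{K}_{\phi}$ is InterpPot and $\tilde{K}_{C}$ is interpCross.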
lambdas = [1/(0.5*i)**2 for i in xrange(1,30)]
plot([0]*29, lambdas, '.')
yscale('log')
def calc_err(lambd, Pot, K, Ind_test, Ind_train, norm_order, subset, subset_size):
    """Prediction error on the test electrodes for a given lambda (numpy translation of the Matlab original)."""
    Pot = np.asarray(Pot)
    if Pot.ndim == 1:
        Pot = Pot[:, None]  # a single recording counts as one time frame
    nr_of_frames = Pot.shape[1]
    if subset == 'all':
        Time_ind = np.arange(nr_of_frames)
    elif subset == 'part':
        Time_ind = np.random.permutation(nr_of_frames)[:subset_size]
    elif subset == 'one':
        diffs = Pot.max(axis=0) - Pot.min(axis=0)
        Time_ind = np.array([np.argmax(diffs)])
    K_train = K[np.ix_(Ind_train, Ind_train)]
    Pot_train = Pot[np.ix_(Ind_train, Time_ind)]
    Pot_test = Pot[np.ix_(Ind_test, Time_ind)]
    I = identity(len(Ind_train))
    beta = dot(linalg.inv(K_train + lambd * I), Pot_train)
    K_cross = K[np.ix_(Ind_test, Ind_train)]
    Pot_est = dot(K_cross, beta)
    err = linalg.norm(Pot_test - Pot_est, norm_order)
    return err
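For example (a sketch): leave electrode 0 out and predict its potential from the remaining nine:

print calc_err(0.1, meas_pot, KPot, [0], range(1, len(meas_pot)), 2, 'all', None)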
def get_choose_lambda_parameters(args):
    propertyArgIn = args
    n_folds = n_iter = sampling = None  # defaults if the caller does not set them
    while len(propertyArgIn) >= 2:
        prop = propertyArgIn[0]
        val = propertyArgIn[1]
        propertyArgIn = propertyArgIn[2:]
        if prop == 'n_folds':
            n_folds = val
        elif prop == 'n_iter':
            n_iter = val
        elif prop == 'sampling':
            sampling = val
        else:
            print 'no method defined for input: ', prop
            print 'Available inputs: n_folds, n_iter, sampling, choose_R'
    if n_folds is None:
        n_folds = len(meas_pot)  # default: leave-one-out over the electrodes
    if n_iter is None:
        n_iter = 1
    if sampling is None:
        sampling = 1
    return [n_folds, n_iter, sampling]
def cross_validation(lambd, pot, K, n_folds, Ind_perm, norm_order, subset, subset_size):
    """
    K-fold cross validation: split the permuted electrode indices into
    n_folds test sets and average the prediction error over the folds.
    """
    nEl = len(pot)
    width = int(floor(float(nEl) / n_folds))
    Ind = [Ind_perm[i*width:(i+1)*width] for i in xrange(n_folds)]
    if n_folds * width < nEl:  # leftover electrodes go into the last fold
        Ind[-1] = np.concatenate((Ind[-1], Ind_perm[n_folds*width:]))
    errors = np.zeros(n_folds)
    for i in xrange(n_folds):
        Ind_test = Ind[i]
        Ind_train = np.setdiff1d(np.arange(nEl), Ind_test)
        errors[i] = calc_err(lambd, pot, K, Ind_test, Ind_train, norm_order, subset, subset_size)
    return np.mean(errors)
def calcCvError(varargin=None):
    """
    A cross-validation estimator of the error of the estimation.
    Used in methods that choose parameters through cross-validation.
    """
    if varargin is None:
        n_folds = len(elec_pos)
    else:
        n_folds = varargin[0]
    Ind_perm = np.random.permutation(range(len(elec_pos)))
    # the kernel K is indexed by electrodes, so we cross-validate the measured potentials
    err = cross_validation(lambd, meas_pot, KPot, n_folds, Ind_perm, norm_order, subset, subset_size)
    return err
def lambda_sampling_1(n_folds, n_iter):
    """Simple sampling: evaluate the CV error for every candidate lambda."""
    global lambd  # calcCvError reads lambd from the module scope
    n = len(lambdas)
    errors = zeros(n)
    errors_iter = zeros(n_iter)
    for i in xrange(n):
        lambd = lambdas[i]
        for j in xrange(n_iter):
            errors_iter[j] = calcCvError([n_folds])
        errors[i] = mean(errors_iter)
    value = lambdas[np.argmin(errors)]
    return value
def chooseLambda(varargin):
    """
    Chooses the regularisation lambda parameter for ridge regression.
    The user can enter options by providing 'property_name', property_value pairs:
        'n_folds'  number of folds to perform cross-validation (CV)
        'n_iter'   number of iterations for the CV procedure
        'sampling' ways of looking for the optimal lambda:
                   1 - simple sampling
                   2 - using the fminbnd function
    """
    [n_folds, n_iter, sampling] = get_choose_lambda_parameters(varargin)
    lambd = lambda_sampling_1(n_folds, n_iter)
    return lambd
import sklearn
from sklearn.cross_validation import KFold
from sklearn.linear_model import RidgeCV

def chooseLambdaRidge():
    # a sklearn-based alternative, still a stub: the validator is built but the
    # selection is not wired up yet; kept under a separate name so it does not
    # shadow the CV-based chooseLambda used below
    validator = RidgeCV(alphas=lambdas, fit_intercept=True, normalize=False,
                        scoring=None, score_func=None, loss_func=None,
                        cv=None, gcv_mode=None, store_cv_values=True)
    lambd = 0.0
    return lambd
norm_order = 2
subset = 'part'
subset_size = 500
lambd = chooseLambda(['n_folds', None, 'n_iter', 2, 'sampling', None])
print 'lambda = ', lambd
csd = estimate_csd()
plot(x, true_csd)
plot(linspace(min(elec_pos), max(elec_pos), len(output)), csd)
lambd = 0.0
csd = estimate_csd()
plot(linspace(min(elec_pos), max(elec_pos), len(output)), csd)
legend((r'true CSD', r'kCSD estimated CSD, λ=4.0', r'kCSD estimated CSD, λ=0.0'), shadow=True, loc=(0.01, 0.75))
import sklearn
from sklearn.cross_validation import KFold

X = np.array([[0., 0.], [1., 1.], [-1., -1.], [2., 2.]])
Y = np.array([0, 1, 0, 1])
kf = KFold(len(Y), n_folds=2, indices=False)
print(kf)
sklearn.cross_validation.KFold(n=4, n_folds=2)
for train, test in kf:
    print("%s %s" % (train, test))
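Tying KFold back to our problem, the λ search could look like this (a sketch under the old-sklearn KFold API shown above; choose_lambda_kfold is a hypothetical helper, not part of the original code):

def choose_lambda_kfold(lambdas, V, K, n_folds=5):
    # score each candidate lambda by k-fold CV over electrodes, using the
    # kernel solution beta = (K + lambda*I)^-1 V fitted on the training fold
    V = np.asarray(V, dtype=float)
    errs = []
    for lam in lambdas:
        fold_errs = []
        for train, test in KFold(len(V), n_folds=n_folds, indices=False):
            K_tr = K[train][:, train]
            beta = np.linalg.solve(K_tr + lam * np.identity(K_tr.shape[0]), V[train])
            V_est = np.dot(K[test][:, train], beta)
            fold_errs.append(np.linalg.norm(V[test] - V_est))
        errs.append(np.mean(fold_errs))
    return lambdas[int(np.argmin(errs))]

print choose_lambda_kfold(lambdas, meas_pot, KPot)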