"""
.. module:: datas
:synopsis: Module for data extraction and preparation
.. moduleauthor:: Marc Javin
"""
import math
import numpy as np
import pandas as pd
import pylab as plt
import scipy as sp
from scipy import signal
from scipy.interpolate import splrep, splev
from scipy.signal import savgol_filter
from random import random as rd
DUMP_FILE= 'data'
plt.rc('ytick', labelsize=8) # fontsize of the tick labels
try:
df = pd.read_csv('data/AVAL1.csv').head(3100)
except:
df = pd.read_csv('../data/AVAL1.csv').head(3100)
[docs]def check_alpha(show=True):
"""study the hill equation
"""
d = df.head(1000)
t = d['timeVector']
i = d['inputCurrent']
trace = d['trace']
vals = np.logspace(math.log10(1.2), math.log10(20.), num=5)
idx=1
plt.subplot(6,1,1)
plt.plot(trace, 'r')
for alpha in vals:
idx += 1
k = 189.e-6
n = 3.8
bas = (-k*trace) / (trace - np.full(trace.shape, alpha))
bas[bas < 0] = 0
cac = np.power(bas, 1/n)
plt.subplot(6, 1, idx)
plt.plot(cac, label='$\\alpha$=%.2f'%alpha)
# z2 = savgol_filter(cac, 9, 3)
# plt.plot(z2, 'r', label='smooth')
plt.legend()
if (show):
plt.show()
plt.close()
[docs]def get_real_data_norm(file='data/AVAL{}.csv'):
df = pd.read_csv(file.format(1)).head(3100)
t = np.array(df['timeVector']) * 1000
trace = np.array(df['trace']) * 10
i = np.array(df['inputCurrent']) * 10
k = 189.e-6
n = 3.8
alpha = 2.
bas = (-k * trace) / (trace - np.full(trace.shape, alpha))
# bas[bas < 0] = 0
# plt.subplot(3,1,1)
# plt.plot(t, trace)
# plt.title('Calcium imaging data')
# cac = np.power(bas, 1 / n)
# plt.subplot(3,1,2)
# plt.plot(t, cac)
# plt.title('Calcium concentration')
# plt.subplot(3,1,3)
# plt.plot(t, i)
# plt.title('Input current')
# plt.show()
df2 = pd.read_csv(file.format(2)).head(3100)
trace2 = np.array(df2['trace']) * 10
bas = (-k * trace2) / (trace2 - np.full(trace2.shape, alpha))
bas[bas < 0] = 0
cac2 = np.power(bas, 1 / n)
t2 = np.array(df2['timeVector']) * 1000
i2 = np.array(df2['inputCurrent']) * 10
train = [t, i[:, np.newaxis], [None, trace[:, np.newaxis]]]
test = [t, i2[:, np.newaxis], [None, trace2[:, np.newaxis]]]
return train, test
[docs]def get_real_data(delta=500, final_time=4000., dt=0.2, show=False):
"""dump real data into our format
Args:
delta: (Default value = 500)
final_time: (Default value = 4000.)
dt(float): time step (Default value = 0.2)
Returns:
"""
# df = df.head(510)
trace = np.array(df['trace'])*10
unit_time = final_time/delta
t = sp.arange(0., final_time, dt)
i = np.array(df['inputCurrent']) * 10
intervals = [0, 0, 420, 1140, 2400]
curs = np.zeros((len(t), len(intervals)))
cas = np.zeros(curs.shape)
for j, st in enumerate(intervals):
td = np.arange(0., final_time, unit_time)
ca = trace[st:st + delta]
spl = splrep(td, ca, s=0.25)
s_ca = splev(t, spl)
spli = splrep(td, i[st:st+delta])
s_i = splev(t, spli)
plt.subplot(len(intervals), 1, j + 1)
plt.plot(td, ca)
plt.plot(t, s_ca)
curs[:,j] = s_i
cas[:,j] = s_ca
if(show):
plt.show()
plt.close()
train = [t, curs, [None, cas]]
t_all = sp.arange(0., len(trace)*unit_time, dt)
td_all = sp.arange(0., len(trace))*unit_time
spl = splrep(td_all, trace, s=0.25)
s_ca_all = splev(t_all, spl)
spli = splrep(td_all, i)
s_i_all = splev(t_all, spli)
plt.plot(td_all, trace)
plt.plot(t_all, s_ca_all)
if (show):
plt.show()
plt.close()
test = [t_all, s_i_all, [None, s_ca_all]]
return train, test
DT = 0.1
[docs]def give_train(dt=DT, nb_neuron_zero=None, max_t=1200.):
"""time and currents for optimization
Args:
dt(float): time step (Default value = DT)
nb_neuron_zero: (Default value = None)
max_t: (Default value = 1200.)
Returns:
"""
t = np.array(sp.arange(0.0, max_t, dt))
i = 10. * ((t > 100) & (t < 300)) + 20. * ((t > 400) & (t < 600)) + 40. * ((t > 800) & (t < 950))
i2 = 30. * ((t > 100) & (t < 500)) + 25. * ((t > 800) & (t < 900))
i3 = np.sum([(10. + (n * 2 / 100)) * ((t > n) & (t < n + 50)) for n in range(100, 1100, 100)], axis=0)
i4 = 15. * ((t > 400) & (t < 800))
i5 = (t - 450) * (8. / 550) * ((t > 100) & (t <= 1100))
i_fin = np.stack([i, i2, i3, i4, i5], axis=1)
i_noise = 0.1 * (np.random.rand(i_fin.shape[0], i_fin.shape[1]) - 0.5)
i_fin += i_noise
# plt.plot(i_noise[:,0])
# plt.show()
if nb_neuron_zero is not None:
i_zeros = np.zeros(i_fin.shape)
i_fin = np.stack([i_fin] + [i_zeros for _ in range(nb_neuron_zero)], axis=2)
return t, i_fin
[docs]def give_periodic(t, max_i, size, freq):
return np.sum([max_i * ((t > n) & (t < n + size)) for n in range(0, int(t[-1]), freq)], axis=0)
[docs]def give_train2(dt=DT):
t = np.array(sp.arange(0.0, 1200., dt))
b1 = 40. * t / 1200
b2 = -b1/2
b3 = 40. - b1
b4 = -b3/2
i_fin = np.stack([b1 * rd() + b2 * rd() + b3 * rd() + b4 * rd() + give_periodic(t, rd() * 15., rd()*200, int(rd() * 500)) +
give_periodic(t, rd() * 15., rd()*200, int(rd() * 500)) + give_periodic(t, rd() * 15., rd()*200, int(rd() * 500))
for _ in range(10)], axis=1)
return t, i_fin
[docs]def give_test(dt=DT, max_t=1200.):
"""time and currents for optimization
Args:
dt(float): time step (Default value = DT)
Returns:
"""
t = np.array(sp.arange(0.0, max_t, dt))
i1 = (t - 100) * (30. / 100) * ((t > 100) & (t <= 200)) + 30 * ((t > 200) & (t <= 1100)) - (t - 1000) * (
30. / 100) * ((t > 1000) & (t <= 1100))
i2 = 30. * ((t > 100) & (t < 300)) + 15. * ((t > 400) & (t < 500)) + 10. * ((t > 700) & (t < 1000))
i3 = (t - 600) * (1. / 500) * ((t > 100) & (t <= 600)) + (1100 - t) * (1. / 500) * (
(t > 600) & (t <= 1100))
i4 = np.sum([(30. - (n * 4 / 100)) * ((t > n) & (t < n + 50)) for n in range(100, 1100, 100)], axis=0)
i5 = signal.gaussian(len(t), std=len(t)/5) * 20.
i_fin = np.stack([i1, i2, i3, i4, i5], axis=1)
return t, i_fin
[docs]def full4(dt=DT, nb_neuron_zero=None, max_t=1200.):
t = np.array(sp.arange(0.0, max_t, dt))
i1 = 10. * ((t > 200) & (t < 600))
i2 = 10. * ((t > 300) & (t < 700))
i3 = 10. * ((t > 400) & (t < 800))
i4 = 10. * ((t > 500) & (t < 900))
is_ = np.stack([i1, i2, i3, i4], axis=1)
i1 = 20. * ((t > 400) & (t < 900))
i2 = 10. * ((t > 300) & (t < 700))
i3 = 30. * ((t > 200) & (t < 800))
i4 = 10. * ((t > 600) & (t < 800))
is_2 = np.stack([i1, i2, i3, i4], axis=1)
i1 = np.sum([10. * ((t > n) & (t < n + 100)) for n in range(70, 1100, 200)], axis=0)
i2 = np.sum([(10. + (n * 1 / 100)) * ((t > n) & (t < n + 50)) for n in range(100, 1100, 100)], axis=0)
i3 = np.sum([(22. - (n * 1 / 100)) * ((t > n) & (t < n + 50)) for n in range(120, 1100, 100)], axis=0)
i4 = np.sum([(10. + (n * 2 / 100)) * ((t > n) & (t < n + 20)) for n in range(100, 1100, 80)], axis=0)
is_3 = np.stack([i1, i2, i3, i4], axis=1)
i_fin = np.stack([is_, is_2, is_3], axis=1)
if nb_neuron_zero is not None:
i_zeros = np.zeros((len(t), i_fin.shape[1], nb_neuron_zero))
i_fin = np.append(i_fin, i_zeros, axis=2)
return t, i_fin
[docs]def full4_test(dt=DT, nb_neuron_zero=None, max_t=1200.):
t = np.array(sp.arange(0.0, max_t, dt))
i1 = 10. * ((t > 100) & (t < 200))
i2 = 10. * ((t > 500) & (t < 700))
i3 = 10. * ((t > 500) & (t < 800))
i4 = 10. * ((t > 700) & (t < 900))
is_ = np.stack([i1, i2, i3, i4], axis=1)
i1 = 20. * ((t > 100) & (t < 1000))
i2 = 10. * ((t > 900) & (t < 1000))
i3 = 30. * ((t > 300) & (t < 700))
i4 = 10. * ((t > 350) & (t < 950))
is_2 = np.stack([i1, i2, i3, i4], axis=1)
i1 = np.sum([5. * ((t > n) & (t < n + 100)) for n in range(70, 1100, 300)], axis=0)
i2 = np.sum([(15. + (n * 3 / 100)) * ((t > n) & (t < n + 20)) for n in range(100, 1100, 50)], axis=0)
i3 = np.sum([(12. - (n * 1 / 100)) * ((t > n) & (t < n + 50)) for n in range(120, 1100, 120)], axis=0)
i4 = np.sum([(7. + (n * 0.5 / 100)) * ((t > n) & (t < n + 70)) for n in range(100, 1100, 80)], axis=0)
is_3 = np.stack([i1, i2, i3, i4], axis=1)
i_fin = np.stack([is_, is_2, is_3], axis=1)
if nb_neuron_zero is not None:
i_zeros = np.zeros((len(t), i_fin.shape[1], nb_neuron_zero))
i_fin = np.append(i_fin, i_zeros, axis=2)
return t, i_fin
[docs]def test():
t, i = full4()
print(i.shape)
i_1 = np.zeros((len(t), i.shape[1], 6))
print(i_1.shape)
i = np.append(i, i_1, axis=2)
print(i.shape)
t_len = 5000.
t = np.array(sp.arange(0.0, t_len, DT))
i_inj = 10. * ((t > 100) & (t < 750)) + 20. * ((t > 1500) & (t < 2500)) + 40. * ((t > 3000) & (t < 4000))
v_inj = 115 * (t / t_len) - np.full(t.shape, 65)
v_inj_rev = np.full(t.shape, 50) - v_inj
i_inj = np.array(i_inj, dtype=np.float32)
t_test = np.array(sp.arange(0.0, 2000, DT))
i_test = 10. * ((t_test > 100) & (t_test < 300)) + 20. * ((t_test > 400) & (t_test < 600)) + 40. * (
(t_test > 800) & (t_test < 950)) + \
(t_test - 1200) * (50. / 500) * ((t_test > 1200) & (t_test < 1700))
if __name__ == '__main__':
# get_real_data_norm()
check_alpha()
# give_train2(0.5)
# exit(0)
#
# df = pd.read_csv('data/SMDBoxes.csv')
# df = df.head(1510)
# trace = np.array(df['Calcium_bc'])
# i = np.array(df['Input']) * 100
# tinit = np.array(df['time [s]']) * 1000
# t = np.arange(0, tinit[-1], step=2)
#
#
# t1 = time.time()
#
# # f = interp1d(tinit, l, kind='cubic')
# # z = f(t)
# t2 = time.time()
# print('lowess+interp : %s'%(t2-t1))
#
# t1 = time.time()
# exact = splrep(tinit, trace, k=1)
# spl = splrep(tinit, trace, s=0.25)
# zexact = splev(tinit, exact)
# z = splev(tinit, spl)
# t2 = time.time()
# print('splrep : %s' % (t2-t1))
#
# spli = splrep(tinit, i, k=2)
# i = splev(tinit, spli)
#
# plt.subplot(211)
# plt.plot(tinit, trace, 'r', label='trace')
# plt.plot(tinit, z, 'b', label='splrev')
# plt.legend()
# plt.subplot(212)
# plt.plot(i)
# plt.show()