Compare commits
10 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 804bf1dd8d | |
| | ee0fa21e21 | |
| | 11e1752c4f | |
| | a4709194cb | |
| | aba3a50428 | |
| | 2dc54c791d | |
| | 3f77a9352f | |
| | 764680163c | |
| | bdb36bc4a4 | |
| | b0df9d73b1 | |
@@ -0,0 +1,246 @@
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import wave
import os
import math
from scipy.fftpack.realtransforms import dct
from copy import deepcopy
from scipy.fftpack import fft, ifft
from scikits.talkbox.linpred import lpc

np.random.seed(1337)
epsilon = 0.0000000001


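# Read every frame of a WAV file and return the samples as a 1-D int16 array.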
def build_data(wav, begin=None, end=None):
    wav_in_file = wave.Wave_read(wav)
    N = wav_in_file.getnframes()
    dstr = wav_in_file.readframes(N)
    # np.fromstring is deprecated; np.frombuffer reads the same raw int16 data.
    data = np.frombuffer(dstr, np.int16)
    return data


def periodogram(x, nfft=None, fs=1):
    """Compute the periodogram of the given signal, with the given fft size.

    Parameters
    ----------
    x : array-like
        input signal
    nfft : int
        size of the fft to compute the periodogram. If None (default), the
        length of the signal is used. if nfft > n, the signal is 0 padded.
    fs : float
        Sampling rate. By default, is 1 (normalized frequency. e.g. 0.5 is the
        Nyquist limit).

    Returns
    -------
    pxx : array-like
        The psd estimate.
    fgrid : array-like
        Frequency grid over which the periodogram was estimated.

    Examples
    --------
    Generate a signal with two sinusoids, and compute its periodogram:

    >>> fs = 1000
    >>> x = np.sin(2 * np.pi * 0.1 * fs * np.linspace(0, 0.5, int(0.5 * fs)))
    >>> x += np.sin(2 * np.pi * 0.2 * fs * np.linspace(0, 0.5, int(0.5 * fs)))
    >>> px, fx = periodogram(x, 512, fs)

    Notes
    -----
    Only real signals are supported for now.

    Returns the one-sided version of the periodogram.

    Discrepancy with MATLAB: MATLAB computes the psd in units of power /
    radian / sample, while this computes it in units of power / sample: to get
    the same result as MATLAB, multiply the result from talkbox by 2 pi."""
    x = np.atleast_1d(x)
    n = x.size

    if x.ndim > 1:
        raise ValueError("Only rank 1 input supported for now.")
    if not np.isrealobj(x):
        raise ValueError("Only real input supported for now.")
    if not nfft:
        nfft = n
    if nfft < n:
        raise ValueError("nfft < signal size not supported yet")

    pxx = np.abs(fft(x, nfft)) ** 2
    # Integer division: pn is used as a slice index below.
    if nfft % 2 == 0:
        pn = nfft // 2 + 1
    else:
        pn = (nfft + 1) // 2

    fgrid = np.linspace(0, fs * 0.5, pn)
    return pxx[:pn] / (n * fs), fgrid


def arspec(x, order, nfft=None, fs=1):
    """Compute the spectral density using an AR model.

    An AR model of the signal is estimated through the Yule-Walker equations;
    the estimated AR coefficients are then used to compute the spectrum, which
    can be computed explicitly for AR models.

    Parameters
    ----------
    x : array-like
        input signal
    order : int
        Order of the LPC computation.
    nfft : int
        size of the fft to compute the periodogram. If None (default), the
        length of the signal is used. if nfft > n, the signal is 0 padded.
    fs : float
        Sampling rate. By default, is 1 (normalized frequency. e.g. 0.5 is the
        Nyquist limit).

    Returns
    -------
    pxx : array-like
        The psd estimate.
    fgrid : array-like
        Frequency grid over which the periodogram was estimated.
    """
    x = np.atleast_1d(x)
    n = x.size

    if x.ndim > 1:
        raise ValueError("Only rank 1 input supported for now.")
    if not np.isrealobj(x):
        raise ValueError("Only real input supported for now.")
    if not nfft:
        nfft = n
    a, e, k = lpc(x, order)

    # This is not enough to deal correctly with even/odd size
    if nfft % 2 == 0:
        pn = nfft // 2 + 1
    else:
        pn = (nfft + 1) // 2

    px = 1 / np.fft.fft(a, nfft)[:pn]
    pxx = np.real(np.conj(px) * px)
    pxx /= fs / e
    fx = np.linspace(0, fs * 0.5, pxx.size)
    return pxx, fx


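# Cepstral features from an AR (LPC) spectrum: log magnitude, log10, then a
# DCT; the first 30 coefficients are kept. (The Atal flag is currently unused.)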
def arspecs(input_wav, order, Atal=False):
    epsilon = 0.0000000001
    data = input_wav
    ar = []
    ars = arspec(data, order, 4096)
    for k, l in zip(ars[0], ars[1]):
        ar.append(math.log(math.sqrt((k ** 2) + (l ** 2))))
    for val in range(0, len(ar)):
        if ar[val] == 0.0:
            ar[val] = deepcopy(epsilon)
    mspec1 = np.log10(ar)
    # Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
    ar = dct(mspec1, type=2, norm='ortho', axis=-1)
    return ar[:30]


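# Pitch-synchronous averaged periodogram: the signal is cut into `samps`
# frames, their periodograms are averaged, then log magnitude, log10 and a DCT
# are applied; the first 50 cepstral coefficients are returned.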
def specPS(input_wav, pitch):
    N = len(input_wav)
    # Integer division: samps and frames are used as slice indices.
    samps = N // pitch
    if samps == 0:
        samps = 1
    frames = N // samps
    data = input_wav[0:frames]
    specs = periodogram(data, nfft=4096)
    for i in range(1, int(samps)):
        data = input_wav[frames * i:frames * (i + 1)]
        peri = periodogram(data, nfft=4096)
        for sp in range(len(peri[0])):
            specs[0][sp] += peri[0][sp]
    for s in range(len(specs[0])):
        specs[0][s] /= float(samps)
    peri = []
    for k, l in zip(specs[0], specs[1]):
        if k == 0 and l == 0:
            peri.append(epsilon)
        else:
            peri.append(math.log(math.sqrt((k ** 2) + (l ** 2))))
    # Filter the spectrum through the triangle filterbank
    mspec = np.log10(peri)
    # Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
    ceps = dct(mspec, type=2, norm='ortho', axis=-1)
    return ceps[:50]


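# One feature row per utterance: 50 averaged-periodogram cepstra followed by
# 30 AR cepstra for each LPC order 8-17 (350 values in total); NaNs are zeroed.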
def build_single_feature_row(data):
    lpcs = [8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
    arr = []
    periodo = specPS(data, 50)
    arr.extend(periodo)
    for j in lpcs:
        ars = arspecs(data, j)
        arr.extend(ars)
    for i in range(len(arr)):
        # np.float is deprecated; the builtin float behaves the same here.
        if np.isnan(float(arr[i])):
            arr[i] = 0.0
    return arr


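# Join precomputed feature rows with the first four formant targets read from
# the matching Y/ label files, then save the combined matrix.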
def get_y():
    data = np.load('timit.npy')
    timit = []
    for row in data:
        y = open('Y/' + str(row[0]).replace("timit", "VTRFormants") + ".y").readline().split()
        arr = []
        arr.append(float(y[0]))
        arr.append(float(y[1]))
        arr.append(float(y[2]))
        arr.append(float(y[3]))
        arr.extend(row)
        timit.append(arr)
    nump = np.asarray(timit)
    np.save('timit_train_arspec', nump)
    return


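# Build the test and train matrices ([name, F1..F4 targets, features] per WAV
# file) and save them as timitTest.npy and timitTrain.npy.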
def build_timit_data():
    arcep_mat = []
    path = 'X_test/'
    for file in [f for f in os.listdir(path) if f.endswith('.wav')]:
        name = file.replace('.wav', '')
        y = open('Y_test/' + str(name).replace("timit", "VTRFormants") + ".y").readline().split()
        X = build_data(path + file)
        arr = [name]
        arr.append(float(y[0]))
        arr.append(float(y[1]))
        arr.append(float(y[2]))
        arr.append(float(y[3]))
        arr.extend(build_single_feature_row(X))
        arcep_mat.append(arr)
    nump = np.asarray(arcep_mat)
    np.save('timitTest', nump)

    arcep_mat = []
    path = 'X/'
    for file in [f for f in os.listdir(path) if f.endswith('.wav')]:
        name = file.replace('.wav', '')
        y = open('Y/' + str(name).replace("timit", "VTRFormants") + ".y").readline().split()
        X = build_data(path + file)
        arr = [name]
        arr.append(float(y[0]))
        arr.append(float(y[1]))
        arr.append(float(y[2]))
        arr.append(float(y[3]))
        arr.extend(build_single_feature_row(X))
        arcep_mat.append(arr)
    nump = np.asarray(arcep_mat)
    np.save('timitTrain', nump)
    return


build_timit_data()
@@ -0,0 +1,135 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np

train_data = np.load("timitTrain.npy")
test_data = np.load("timitTest.npy")
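# Column 0 holds the utterance name, columns 1-4 the four formant targets,
# and the remaining columns the cepstral features.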
Xtrain = train_data[:, 5:].astype(np.float32)
Ytrain = train_data[:, 1:5].astype(np.float32)
Xtest = test_data[:, 5:].astype(np.float32)
Ytest = test_data[:, 1:5].astype(np.float32)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
_, D = Xtrain.shape
K = len(Ytrain)

print(D, K)

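# Fully connected regressor: three sigmoid hidden layers (1024-512-256) and a
# 4-unit linear output, one unit per formant.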
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.Dense1 = nn.Linear(D, 1024)
        self.Dense2 = nn.Linear(1024, 512)
        self.Dense3 = nn.Linear(512, 256)
        self.out = nn.Linear(256, 4)

    def forward(self, x):
        x = torch.sigmoid(self.Dense1(x))
        x = torch.sigmoid(self.Dense2(x))
        x = torch.sigmoid(self.Dense3(x))
        return self.out(x)


loss = nn.L1Loss()

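# One optimization step: forward pass, L1 loss, backprop, parameter update.
# Returns the scalar batch loss.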
def train(model, loss, optimizer, inputs, labels):
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()

    logits = model.forward(inputs)
    output = loss.forward(logits, labels)
    output.backward()
    optimizer.step()

    return output.item()


def predict(model, inputs):
    inputs = Variable(inputs)
    logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()


torch.manual_seed(0)

Xtrain = torch.from_numpy(Xtrain).float().to(device)
Ytrain = torch.from_numpy(Ytrain).float().to(device)
Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)

model = Net().to(device)

optimizer = optim.Adagrad(model.parameters(), lr=0.01)

epochs = 80
batchSize = 20
# Number of full minibatches; the original used the sample count here, which
# iterates far past the end of the training set with empty batches.
n_batches = Xtrain.size()[0] // batchSize

costs = []
test_accuracies = []
print("Starting training ")
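# Each epoch: one minibatch pass over the training set, then a full evaluation
# on the test set reporting per-formant median, max, and mean absolute errors.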
for i in range(epochs):
    cost = 0.0
    for j in range(n_batches):
        Xbatch = Xtrain[j * batchSize:(j + 1) * batchSize]
        Ybatch = Ytrain[j * batchSize:(j + 1) * batchSize]
        cost += train(model, loss, optimizer, Xbatch, Ybatch)

    loss1 = 0.0
    loss2 = 0.0
    loss3 = 0.0
    loss4 = 0.0
    max_1 = 0.0
    max_2 = 0.0
    max_3 = 0.0
    max_4 = 0.0
    list_1 = []
    list_2 = []
    list_3 = []
    list_4 = []
    print('predicting...')
    Ypred = predict(model, Xtest)
    for k in range(0, len(Ytest)):
        l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
        l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
        l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
        l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
        list_1.append(l1)
        list_2.append(l2)
        list_3.append(l3)
        list_4.append(l4)
        max_1 = max(max_1, l1)
        max_2 = max(max_2, l2)
        max_3 = max(max_3, l3)
        max_4 = max(max_4, l4)
        loss1 += l1
        loss2 += l2
        loss3 += l3
        loss4 += l4
    loss1 /= len(Ytest)
    loss2 /= len(Ytest)
    loss3 /= len(Ytest)
    loss4 /= len(Ytest)
    total_loss = loss1 + loss2 + loss3 + loss4
    total_loss /= 4.0
    print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
    print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
    print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
    # This quantity is a mean absolute error, not an accuracy.
    print("Epoch: %d, mean abs error: %.3f" % (i, total_loss))

    costs.append(cost / n_batches)
    test_accuracies.append(round(total_loss, 3))
torch.save(model.state_dict(), "LPC_NN.pt")

print(test_accuracies)
@@ -0,0 +1,114 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np

test_data = np.load("timitTest.npy")
Xtest = test_data[:, 5:].astype(np.float32)
Ytest = test_data[:, 1:5].astype(np.float32)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
_, D = Xtest.shape
print(D)

class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.Dense1 = nn.Linear(D, 1024)
        self.Dense2 = nn.Linear(1024, 512)
        self.Dense3 = nn.Linear(512, 256)
        self.out = nn.Linear(256, 4)

    def forward(self, x):
        x = torch.sigmoid(self.Dense1(x))
        x = torch.sigmoid(self.Dense2(x))
        x = torch.sigmoid(self.Dense3(x))
        return self.out(x)

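# Weighted L1 loss: the four per-formant absolute errors are scaled (F1
# weighted most, F4 least) before averaging.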
def scaledLoss(output, target):
    scale = torch.tensor([2.0, 1.0, .5, .1]).to(device)
    loss = torch.abs(output - target)
    scaled = loss * scale
    return torch.mean(scaled)


# loss = nn.L1Loss()

def train(model, optimizer, inputs, labels):
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()

    logits = model.forward(inputs)
    output = scaledLoss(logits, labels)
    output.backward()
    optimizer.step()

    return output.item()


def predict(model, inputs):
    inputs = Variable(inputs)
    logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()


torch.manual_seed(0)

Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)

model = Net().to(device)

optimizer = optim.Adagrad(model.parameters(), lr=0.01)

model.load_state_dict(torch.load("LPC_NN_scaledLoss.pt"))
model.eval()
loss1 = 0.0
loss2 = 0.0
loss3 = 0.0
loss4 = 0.0
max_1 = 0.0
max_2 = 0.0
max_3 = 0.0
max_4 = 0.0
list_1 = []
list_2 = []
list_3 = []
list_4 = []
print('predicting...')
Ypred = predict(model, Xtest)
for k in range(0, len(Ytest)):
    l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
    l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
    l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
    l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
    list_1.append(l1)
    list_2.append(l2)
    list_3.append(l3)
    list_4.append(l4)
    max_1 = max(max_1, l1)
    max_2 = max(max_2, l2)
    max_3 = max(max_3, l3)
    max_4 = max(max_4, l4)
    loss1 += l1
    loss2 += l2
    loss3 += l3
    loss4 += l4
loss1 /= len(Ytest)
loss2 /= len(Ytest)
loss3 /= len(Ytest)
loss4 /= len(Ytest)
total_loss = loss1 + loss2 + loss3 + loss4
total_loss /= 4.0
print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
# This quantity is a mean absolute error, not an accuracy.
print("mean abs error: %.3f" % (total_loss))
Binary file not shown.
@@ -0,0 +1 @@
@@ -39,8 +39,8 @@ cd ~/torch; bash install-deps;
```
luarocks install rnn
```
-The Estimation model can be downloaded here and because of size constraints the Tracking model can be obtained by download from this link
-[tracking_model.mat](https://drive.google.com/open?id=0Bxkc5_D0JjpiZWx4eTU1d0hsVXc)
+The Estimation model can be downloaded here; because of size constraints, the Tracking model can be obtained from this link:
+[tracking_model.dat.gz](https://drive.google.com/open?id=1-BwlbbHykIV52c-SL1ofcppxZ5pTTXai)

## How to use:

@@ -0,0 +1,141 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np
torch.manual_seed(1)

trainY = np.load("norm_cnn_timit_train_Y.npy")
testY = np.load("norm_cnn_timit_test_Y.npy")
Xtrain = np.load("norm_cnn_timit_train_X.npy").astype(np.float32)
Ytrain = trainY[:, 1:5].astype(np.float32)
Xtest = np.load("norm_cnn_timit_test_X.npy").astype(np.float32)
Ytest = testY[:, 1:5].astype(np.float32)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
D = Xtrain.shape[1]
K = len(Ytrain)

print(D, K)

class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.Conv1 = nn.Conv2d(in_channels=1, out_channels=96, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv2 = nn.Conv2d(in_channels=96, out_channels=32, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(5, 5), stride=1, padding=0)
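        # 43*38*64 is the flattened feature-map size; it corresponds to 55x50
        # inputs (four valid convs of size 3, 3, 3, 5 and two stride-1 2x2
        # poolings shrink 55x50 to 43x38), assuming that is the input size.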
        self.Dense5 = nn.Linear(43 * 38 * 64, 512)
        self.out = nn.Linear(512, 4)

    def forward(self, x):
        x = F.relu(self.Conv1(x))
        x = F.relu(self.Conv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        x = F.relu(self.Conv3(x))
        x = F.relu(self.Conv4(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        x = x.view(x.size(0), -1)
        x = F.relu(self.Dense5(x))
        return self.out(x)


def train(model, loss, optimizer, inputs, labels):
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()

    logits = model.forward(inputs)
    output = loss.forward(logits, labels)
    output.backward()
    optimizer.step()

    return output.item()


def predict(model, inputs):
    inputs = Variable(inputs)
    with torch.no_grad():
        logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()


Xtrain = torch.from_numpy(Xtrain).float().to(device)
Ytrain = torch.from_numpy(Ytrain).float().to(device)
Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)

model = Net().to(device)
loss = nn.L1Loss()
optimizer = optim.Adagrad(model.parameters())

epochs = 80
batchSize = 32
n_batches = int(np.floor(Xtrain.size()[0] / batchSize))

costs = []
test_accuracies = []
print("Starting training ")
for i in range(epochs):
    cost = 0.0
    for j in range(n_batches):
        Xbatch = Xtrain[j * batchSize:(j + 1) * batchSize]
        Ybatch = Ytrain[j * batchSize:(j + 1) * batchSize]
        cost += train(model, loss, optimizer, Xbatch, Ybatch)

    loss1 = 0.0
    loss2 = 0.0
    loss3 = 0.0
    loss4 = 0.0
    max_1 = 0.0
    max_2 = 0.0
    max_3 = 0.0
    max_4 = 0.0
    list_1 = []
    list_2 = []
    list_3 = []
    list_4 = []
    print('predicting...')
    Ypred = predict(model, Xtest)
    for k in range(0, len(Ytest)):
        l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
        l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
        l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
        l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
        list_1.append(l1)
        list_2.append(l2)
        list_3.append(l3)
        list_4.append(l4)
        max_1 = max(max_1, l1)
        max_2 = max(max_2, l2)
        max_3 = max(max_3, l3)
        max_4 = max(max_4, l4)
        loss1 += l1
        loss2 += l2
        loss3 += l3
        loss4 += l4
    loss1 /= len(Ytest)
    loss2 /= len(Ytest)
    loss3 /= len(Ytest)
    loss4 /= len(Ytest)
    total_loss = loss1 + loss2 + loss3 + loss4
    total_loss /= 4.0
    print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
    print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
    print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
    # This quantity is a mean absolute error, not an accuracy.
    print("Epoch: %d, mean abs error: %.3f" % (i, total_loss))

    costs.append(cost / n_batches)
    test_accuracies.append(round(total_loss, 3))
torch.save(model.state_dict(), "CNN_estimate.pt")

print(test_accuracies)
@@ -0,0 +1,121 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np
torch.manual_seed(1)

testY = np.load("norm_cnn_timit_test_Y.npy")
Xtest = np.load("norm_cnn_timit_test_X.npy").astype(np.float32)
Ytest = testY[:, 1:5].astype(np.float32)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
D = Xtest.shape
print(D)

print(Xtest.shape[1], len(Ytest))


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.Conv1 = nn.Conv2d(in_channels=1, out_channels=96, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv2 = nn.Conv2d(in_channels=96, out_channels=32, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(5, 5), stride=1, padding=0)
        self.Dense5 = nn.Linear(43 * 38 * 64, 512)
        self.out = nn.Linear(512, 4)

    def forward(self, x):
        x = F.relu(self.Conv1(x))
        x = F.relu(self.Conv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        x = F.relu(self.Conv3(x))
        x = F.relu(self.Conv4(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        x = x.view(x.size(0), -1)
        x = F.relu(self.Dense5(x))
        return self.out(x)


def train(model, loss, optimizer, inputs, labels):
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()

    logits = model.forward(inputs)
    output = loss.forward(logits, labels)
    output.backward()
    optimizer.step()

    return output.item()


def predict(model, inputs):
    inputs = Variable(inputs)
    with torch.no_grad():
        logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()

Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)

model = Net().to(device)
loss = nn.L1Loss()
optimizer = optim.Adagrad(model.parameters())

model.load_state_dict(torch.load("CNN_estimate.pt"))
model.eval()
loss1 = 0.0
loss2 = 0.0
loss3 = 0.0
loss4 = 0.0
max_1 = 0.0
max_2 = 0.0
max_3 = 0.0
max_4 = 0.0
list_1 = []
list_2 = []
list_3 = []
list_4 = []
print('predicting...')
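# Inference is run in three slices of the test set, presumably to bound the
# memory used by the large flattened CNN activations (43*38*64 per example).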
Ypred1 = predict(model, Xtest[:1000])
Ypred2 = predict(model, Xtest[1000:2000])
Ypred3 = predict(model, Xtest[2000:])
Ypred = np.concatenate((Ypred1, Ypred2, Ypred3))
for k in range(0, len(Ytest)):
    l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
    l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
    l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
    l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
    list_1.append(l1)
    list_2.append(l2)
    list_3.append(l3)
    list_4.append(l4)
    max_1 = max(max_1, l1)
    max_2 = max(max_2, l2)
    max_3 = max(max_3, l3)
    max_4 = max(max_4, l4)
    loss1 += l1
    loss2 += l2
    loss3 += l3
    loss4 += l4
loss1 /= len(Ytest)
loss2 /= len(Ytest)
loss3 /= len(Ytest)
loss4 /= len(Ytest)
total_loss = loss1 + loss2 + loss3 + loss4
total_loss /= 4.0
print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
# This quantity is a mean absolute error, not an accuracy.
print("mean abs error: %.3f" % (total_loss))