1 Commit

Author SHA1 Message Date
Daniel Richard G e759570f98 Get DeepFormants working again
* Minor syntax tweaks to make the code Python 3 compatible

* Fixes for various NumPy warnings/errors, either due to use of "float"
  where "int" is required, or domain errors on log functions

* Replaced the use of the obsolete Python-2-only scikits.talkbox
  library with a compatible LPC implementation from the Conch project

* Documentation update to indicate that an old version of "rnn" is
  required

* Invoke Lua scripts via "luajit" directly, instead of going through
  the "th" frontend (to reduce the dependency footprint)
2020-01-17 15:57:00 -05:00
12 changed files with 317 additions and 788 deletions
-246
View File
@@ -1,246 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import wave
import os
import math
from scipy.fftpack.realtransforms import dct
from copy import deepcopy
from scipy.fftpack import fft, ifft
from scikits.talkbox.linpred import lpc
np.random.seed(1337)
epsilon = 0.0000000001
def build_data(wav, begin=None, end=None):
    """Read every sample of *wav* as a 1-D array of signed 16-bit ints.

    ``begin``/``end`` are accepted for interface compatibility with other
    call sites but are not used here; the whole file is returned.
    """
    wav_in_file = wave.Wave_read(wav)
    n_frames = wav_in_file.getnframes()
    dstr = wav_in_file.readframes(n_frames)
    # Close the reader explicitly (the original leaked the file handle).
    wav_in_file.close()
    # np.fromstring is deprecated/removed for binary input; np.frombuffer
    # is the supported equivalent (read-only view over the byte string).
    return np.frombuffer(dstr, dtype=np.int16)
def periodogram(x, nfft=None, fs=1):
    """Compute the one-sided periodogram of a real rank-1 signal.

    Parameters
    ----------
    x : array-like
        Input signal.
    nfft : int, optional
        FFT size; defaults to the signal length. Zero-pads if nfft > n.
    fs : float, optional
        Sampling rate (default 1, i.e. normalized frequency).

    Returns
    -------
    pxx : ndarray
        PSD estimate in units of power / sample.
    fgrid : ndarray
        Frequency grid over which the periodogram was estimated.
    """
    x = np.atleast_1d(x)
    n = x.size
    if x.ndim > 1:
        raise ValueError("Only rank 1 input supported for now.")
    if not np.isrealobj(x):
        raise ValueError("Only real input supported for now.")
    if not nfft:
        nfft = n
    if nfft < n:
        raise ValueError("nfft < signal size not supported yet")
    pxx = np.abs(fft(x, nfft)) ** 2
    # Number of one-sided bins.  Must use floor division: under Python 3
    # "/" yields a float, which breaks the pxx[:pn] slice below.
    if nfft % 2 == 0:
        pn = nfft // 2 + 1
    else:
        pn = (nfft + 1) // 2
    fgrid = np.linspace(0, fs * 0.5, pn)
    return pxx[:pn] / (n * fs), fgrid
def arspec(x, order, nfft=None, fs=1):
    """Compute the spectral density of a real rank-1 signal via an AR model.

    An AR model is fitted with lpc(); the AR coefficients then give the
    spectrum in closed form.

    Parameters
    ----------
    x : array-like
        Input signal.
    order : int
        Order of the LPC computation.
    nfft : int, optional
        FFT size; defaults to the signal length.
    fs : float, optional
        Sampling rate (default 1, normalized frequency).

    Returns
    -------
    pxx : ndarray
        The PSD estimate.
    fx : ndarray
        Frequency grid over which the spectrum was estimated.
    """
    x = np.atleast_1d(x)
    n = x.size
    if x.ndim > 1:
        raise ValueError("Only rank 1 input supported for now.")
    if not np.isrealobj(x):
        raise ValueError("Only real input supported for now.")
    if not nfft:
        nfft = n
    a, e, k = lpc(x, order)
    # Number of one-sided bins; floor division keeps pn an int so the
    # [:pn] slice below works under Python 3.
    # This is not enough to deal correctly with even/odd size
    if nfft % 2 == 0:
        pn = nfft // 2 + 1
    else:
        pn = (nfft + 1) // 2
    px = 1 / np.fft.fft(a, nfft)[:pn]
    pxx = np.real(np.conj(px) * px)
    pxx /= fs / e
    fx = np.linspace(0, fs * 0.5, pxx.size)
    return pxx, fx
def arspecs(input_wav, order, Atal=False):
    """Return the first 30 cepstral coefficients of the AR spectrum.

    ``Atal`` is accepted for interface compatibility but unused in this
    version of the function.
    """
    epsilon = 0.0000000001
    data = input_wav
    ar = []
    ars = arspec(data, order, 4096)
    for k, l in zip(ars[0], ars[1]):
        ar.append(math.log(math.sqrt((k ** 2) + (l ** 2))))
    for val in range(len(ar)):
        # Guard np.log10 below: a zero would give -inf and a negative value
        # a domain error/NaN warning.  Negatives are mapped to NaN (cleaned
        # up by the caller); zero is nudged to epsilon.  The original
        # deepcopy(epsilon) was a no-op on a float and has been dropped.
        if ar[val] < 0.0:
            ar[val] = np.nan
        elif ar[val] == 0.0:
            ar[val] = epsilon
    mspec1 = np.log10(ar)
    # Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
    ar = dct(mspec1, type=2, norm='ortho', axis=-1)
    return ar[:30]
def specPS(input_wav, pitch):
    """Pitch-synchronous averaged periodogram, compressed to 50 cepstra.

    The signal is cut into ``samps`` frames of ``frames`` samples each,
    their periodograms averaged, log-compressed and DCT'd.
    """
    N = len(input_wav)
    # Floor division: under Python 3 "/" would make samps/frames floats,
    # which breaks both range() and the slice arithmetic below.
    samps = N // pitch
    if samps == 0:
        samps = 1
    frames = N // samps
    data = input_wav[0:frames]
    specs = periodogram(data, nfft=4096)
    for i in range(1, int(samps)):
        data = input_wav[frames * i:frames * (i + 1)]
        peri = periodogram(data, nfft=4096)
        for sp in range(len(peri[0])):
            specs[0][sp] += peri[0][sp]
    for s in range(len(specs[0])):
        specs[0][s] /= float(samps)
    peri = []
    for k, l in zip(specs[0], specs[1]):
        m = math.sqrt((k ** 2) + (l ** 2))
        # Guard the log chain: log() of 0 would raise, and a negative log
        # magnitude would later make np.log10 emit NaN warnings — map it to
        # NaN explicitly (callers zero NaNs out).
        if m > 0:
            m = math.log(m)
        if m == 0:
            m = epsilon
        elif m < 0:
            m = np.nan
        peri.append(m)
    # Filter the spectrum through the triangle filterbank
    mspec = np.log10(peri)
    # Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
    ceps = dct(mspec, type=2, norm='ortho', axis=-1)
    return ceps[:50]
def build_single_feature_row(data):
    """Build one feature vector: 50 pitch-sync cepstra + 10 AR cepstra sets.

    NaNs produced by the log-domain guards in specPS/arspecs are zeroed so
    the row is finite.
    """
    lpc_orders = [8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
    arr = []
    arr.extend(specPS(data, 50))
    for order in lpc_orders:
        arr.extend(arspecs(data, order))
    for i in range(len(arr)):
        # np.float was removed in NumPy 1.24; the builtin float is what
        # was always meant here.
        if np.isnan(float(arr[i])):
            arr[i] = 0.0
    return arr
def get_y():
    # Join each row of timit.npy with its four formant targets (F1-F4) read
    # from the matching Y/<name>.y file, then save the combined matrix as
    # timit_train_arspec.npy.
    # NOTE(review): paths are relative to the CWD — presumably the repo root.
    data = np.load('timit.npy')
    timit = []
    for row in data:
        # row[0] appears to be the utterance name; map it to its target file.
        y = open('Y/' + str(row[0]).replace("timit", "VTRFormants") + ".y").readline().split()
        arr = []
        arr.append(float(y[0]))
        arr.append(float(y[1]))
        arr.append(float(y[2]))
        arr.append(float(y[3]))
        arr.extend(row)
        timit.append(arr)
    nump = np.asarray(timit)
    np.save('timit_train_arspec',nump)
    return
def build_timit_data():
    # Build the feature matrices for the TIMIT test (X_test/) and train (X/)
    # wav directories.  Each row is [name, F1..F4 targets, features...];
    # results are saved to timitTest.npy and timitTrain.npy.
    arcep_mat = []
    path = 'X_test/'
    for file in [f for f in os.listdir(path) if f.endswith('.wav')]:
        name = file.replace('.wav', '')
        # Target formants live in Y_test/<name>.y (one line, 4 values).
        y = open('Y_test' + '/' + str(name).replace("timit", "VTRFormants") + ".y").readline().split()
        X = build_data(path + file)
        arr = [name]
        arr.append(float(y[0]))
        arr.append(float(y[1]))
        arr.append(float(y[2]))
        arr.append(float(y[3]))
        arr.extend(build_single_feature_row(X))
        arcep_mat.append(arr)
    nump = np.asarray(arcep_mat)
    np.save('timitTest',nump)
    arcep_mat = []
    path = 'X/'
    for file in [f for f in os.listdir(path) if f.endswith('.wav')]:
        name = file.replace('.wav', '')
        y = open('Y/' + str(name).replace("timit", "VTRFormants") + ".y").readline().split()
        X = build_data(path + file)
        arr = [name]
        arr.append(float(y[0]))
        arr.append(float(y[1]))
        arr.append(float(y[2]))
        arr.append(float(y[3]))
        arr.extend(build_single_feature_row(X))
        arcep_mat.append(arr)
    nump = np.asarray(arcep_mat)
    np.save('timitTrain',nump)
    return


# Side effect at import time: the script builds both matrices immediately.
build_timit_data()
-135
View File
@@ -1,135 +0,0 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np
train_data = np.load("timitTrain.npy")
test_data = np.load("timitTest.npy")
Xtrain = train_data[:,5:].astype(np.float32)
Ytrain = train_data[:,1:5].astype(np.float32)
Xtest = test_data[:,5:].astype(np.float32)
Ytest = test_data[:,1:5].astype(np.float32)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
_, D = Xtrain.shape
K = len(Ytrain)
print(D, K)
class Net(nn.Module):
    # Fully-connected regressor: D -> 1024 -> 512 -> 256 -> 4 outputs
    # (the four formant values).  D is the module-level feature dimension
    # taken from the training matrix.
    def __init__(self):
        super(Net, self).__init__()
        self.Dense1 = nn.Linear(D, 1024)
        self.Dense2 = nn.Linear(1024, 512)
        self.Dense3 = nn.Linear(512, 256)
        self.out = nn.Linear(256, 4)

    def forward(self, x):
        # Sigmoid activations on the hidden layers; linear output head.
        x = torch.sigmoid(self.Dense1(x))
        x = torch.sigmoid(self.Dense2(x))
        x = torch.sigmoid(self.Dense3(x))
        return self.out(x)
loss = nn.L1Loss()
def train(model, loss, optimizer, inputs, labels):
    # One optimization step on a single mini-batch; returns the batch loss
    # as a Python float.  `device` is the module-level torch.device.
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()
    logits = model.forward(inputs)
    output = loss.forward(logits, labels)
    output.backward()
    optimizer.step()
    return output.item()
def predict(model, inputs):
    # Forward pass; moves inputs to `device` and returns a NumPy array.
    # NOTE(review): no torch.no_grad() here, so autograd history is built
    # even during evaluation — confirm whether that is intended.
    inputs = Variable(inputs)
    logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()
torch.manual_seed(0)
Xtrain = torch.from_numpy(Xtrain).float().to(device)
Ytrain = torch.from_numpy(Ytrain).float().to(device)
Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)
model = Net().to(device)
optimizer = optim.Adagrad(model.parameters(), lr=0.01)
epochs = 80
batchSize = 20
n_batches = Xtrain.size()[0]
costs = []
test_accuracies = []
print("Starting training ")
for i in range(epochs):
cost = 0.0
for j in range(n_batches):
Xbatch = Xtrain[j*batchSize:(j+1)*batchSize]
Ybatch = Ytrain[j*batchSize:(j+1)*batchSize]
cost += train(model, loss, optimizer, Xbatch, Ybatch)
loss1 = 0.0
loss2 = 0.0
loss3 = 0.0
loss4 = 0.0
max_1 = 0.0
max_2 = 0.0
max_3 = 0.0
max_4 = 0.0
list_1 = []
list_2 = []
list_3 = []
list_4 = []
print('predicting...')
Ypred = predict(model, Xtest)
for k in range(0, len(Ytest)):
# print(y_hat[i])
l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
list_1.append(l1)
list_2.append(l2)
list_3.append(l3)
list_4.append(l4)
max_1 = max(max_1, l1)
max_2 = max(max_2, l2)
max_3 = max(max_3, l3)
max_4 = max(max_4, l4)
loss1 += l1
loss2 += l2
loss3 += l3
loss4 += l4
loss1 /= len(Ytest)
loss2 /= len(Ytest)
loss3 /= len(Ytest)
loss4 /= len(Ytest)
total_loss = loss1 + loss2 + loss3 + loss4
total_loss /= 4.0
print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
print("Epoch: %d, acc: %.3f" % (i, total_loss))
costs.append(cost/n_batches)
test_accuracies.append(round(total_loss, 3))
torch.save(model.state_dict(), "LPC_NN.pt")
print(test_accuracies)
-114
View File
@@ -1,114 +0,0 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np
test_data = np.load("timitTest.npy")
Xtest = test_data[:,5:].astype(np.float32)
Ytest = test_data[:,1:5].astype(np.float32)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
_, D = Xtest.shape
print(D)
class Net(nn.Module):
    # Same fully-connected regressor as the training script:
    # D -> 1024 -> 512 -> 256 -> 4 formant values.  D comes from the
    # module-level test matrix shape.
    def __init__(self):
        super(Net, self).__init__()
        self.Dense1 = nn.Linear(D, 1024)
        self.Dense2 = nn.Linear(1024, 512)
        self.Dense3 = nn.Linear(512, 256)
        self.out = nn.Linear(256, 4)

    def forward(self, x):
        # Sigmoid hidden activations, linear output head.
        x = torch.sigmoid(self.Dense1(x))
        x = torch.sigmoid(self.Dense2(x))
        x = torch.sigmoid(self.Dense3(x))
        return self.out(x)
def scaledLoss(output, target):
    """Weighted L1 loss: per-formant absolute errors scaled by fixed weights.

    Lower formants are weighted more heavily (F1 x2.0 down to F4 x0.1),
    then the scaled errors are averaged into a single scalar.
    """
    weights = torch.tensor([2.0, 1.0, .5, .1]).to(device)
    abs_err = torch.abs(output - target)
    return torch.mean(abs_err * weights)
#loss = nn.L1Loss()
def train(model, optimizer, inputs, labels):
    # One optimization step using the weighted L1 objective scaledLoss();
    # returns the batch loss as a Python float.
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()
    logits = model.forward(inputs)
    output = scaledLoss(logits, labels)
    output.backward()
    optimizer.step()
    return output.item()
def predict(model, inputs):
    # Forward pass; moves inputs to `device` and returns a NumPy array.
    # NOTE(review): no torch.no_grad() guard — gradients are tracked.
    inputs = Variable(inputs)
    logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()
torch.manual_seed(0)
Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)
model = Net().to(device)
optimizer = optim.Adagrad(model.parameters(), lr=0.01)
model.load_state_dict(torch.load("LPC_NN_scaledLoss.pt"))
model.eval()
loss1 = 0.0
loss2 = 0.0
loss3 = 0.0
loss4 = 0.0
max_1 = 0.0
max_2 = 0.0
max_3 = 0.0
max_4 = 0.0
list_1 = []
list_2 = []
list_3 = []
list_4 = []
print('predicting...')
Ypred = predict(model, Xtest)
for k in range(0, len(Ytest)):
# print(y_hat[i])
l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
list_1.append(l1)
list_2.append(l2)
list_3.append(l3)
list_4.append(l4)
max_1 = max(max_1, l1)
max_2 = max(max_2, l2)
max_3 = max(max_3, l3)
max_4 = max(max_4, l4)
loss1 += l1
loss2 += l2
loss3 += l3
loss4 += l4
loss1 /= len(Ytest)
loss2 /= len(Ytest)
loss3 /= len(Ytest)
loss4 /= len(Ytest)
total_loss = loss1 + loss2 + loss3 + loss4
total_loss /= 4.0
print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
print("acc: %.3f" % (total_loss))
BIN
View File
Binary file not shown.
-1
View File
@@ -1 +0,0 @@
+8 -9
View File
@@ -24,9 +24,7 @@ Download the code. The code is based on signal processing package in Python call
Dependencies:
Run these lines in a terminal to install everything necessary for feature extraction.
```
sudo apt-get install python-numpy python-scipy python-nose python-pip
sudo pip install scikits.talkbox
sudo apt-get install python3-numpy python3-scipy python3-nose
```
Next for the installation of Torch for loading the models run this.
```
@@ -37,30 +35,31 @@ cd ~/torch; bash install-deps;
./install.sh
```
```
luarocks install rnn
git clone https://github.com/Element-Research/rnn.git old-rnn
cd old-rnn; luarocks make rocks/rnn-scm-1.rockspec
```
The Estimation model can be downloaded here and because of size constraints the Tracking model can be abtained by download from this link
[tracking_model.dat.gz](https://drive.google.com/open?id=1-BwlbbHykIV52c-SL1ofcppxZ5pTTXai)
The Estimation model can be downloaded here and because of size constraints the Tracking model can be obtained by download from this link:
[tracking_model.mat](https://drive.google.com/open?id=0Bxkc5_D0JjpiZWx4eTU1d0hsVXc)
## How to use:
For vowel formant estimation, call the main script in a terminal with the following inputs: wav file, formant output filename, and the vowel begin and end times:
```
python formants.py data/Example.wav data/ExamplePredictions.csv --begin 1.2 --end 1.3
python3 formants.py data/Example.wav data/ExamplePredictions.csv --begin 1.2 --end 1.3
```
or the vowel begin and end times can be taken from a TextGrid file (here the name of the TextGrid is Example.TextGrid and the vowel is taken from a tier called "VOWEL"):
```
python formants.py data/Example.wav data/examplePredictions.csv --textgrid_filename data/Example.TextGrid \
python3 formants.py data/Example.wav data/examplePredictions.csv --textgrid_filename data/Example.TextGrid \
--textgrid_tier VOWEL
```
For formant tracking, just call the script with the wav file and output filename:
```
python formants.py data/Example.wav data/ExamplePredictions.csv
python3 formants.py data/Example.wav data/ExamplePredictions.csv
```
+17 -15
View File
@@ -9,9 +9,9 @@ from os.path import isfile, join
import math
from scipy.fftpack.realtransforms import dct
from scipy.signal import lfilter, hamming
from copy import deepcopy
from scipy.fftpack import fft, ifft
from scikits.talkbox.linpred import lpc
#from scikits.talkbox.linpred import lpc # obsolete
from helpers.conch_lpc import lpc
import shutil
from helpers.utilities import *
@@ -88,9 +88,9 @@ def periodogram(x, nfft=None, fs=1):
pxx = np.abs(fft(x, nfft)) ** 2
if nfft % 2 == 0:
pn = nfft / 2 + 1
pn = nfft // 2 + 1
else:
pn = (nfft + 1 )/ 2
pn = (nfft + 1) // 2
fgrid = np.linspace(0, fs * 0.5, pn)
return pxx[:pn] / (n * fs), fgrid
@@ -137,9 +137,9 @@ def arspec(x, order, nfft=None, fs=1):
# This is not enough to deal correctly with even/odd size
if nfft % 2 == 0:
pn = nfft / 2 + 1
pn = nfft // 2 + 1
else:
pn = (nfft + 1 )/ 2
pn = (nfft + 1) // 2
px = 1 / np.fft.fft(a, nfft)[:pn]
pxx = np.real(np.conj(px) * px)
@@ -200,7 +200,6 @@ def preemp(input, p):
def arspecs(input_wav,order,Atal=False):
epsilon = 0.0000000001
data = input_wav
if Atal:
ar = atal(data, order, 30)
@@ -211,8 +210,10 @@ def arspecs(input_wav,order,Atal=False):
for k, l in zip(ars[0], ars[1]):
ar.append(math.log(math.sqrt((k**2)+(l**2))))
for val in range(0,len(ar)):
if ar[val] == 0.0:
ar[val] = deepcopy(epsilon)
if ar[val] < 0.0:
ar[val] = np.nan
elif ar[val] == 0.0:
ar[val] = epsilon
mspec1 = np.log10(ar)
# Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
ar = dct(mspec1, type=2, norm='ortho', axis=-1)
@@ -221,10 +222,10 @@ def arspecs(input_wav,order,Atal=False):
def specPS(input_wav,pitch):
N = len(input_wav)
samps = N/pitch
samps = N // pitch
if samps == 0:
samps = 1
frames = N/samps
frames = N // samps
data = input_wav[0:frames]
specs = periodogram(data,nfft=4096)
for i in range(1,int(samps)):
@@ -236,10 +237,11 @@ def specPS(input_wav,pitch):
specs[0][s] /= float(samps)
peri = []
for k, l in zip(specs[0], specs[1]):
if k == 0 and l == 0:
peri.append(epsilon)
else:
peri.append(math.log(math.sqrt((k ** 2) + (l ** 2))))
m = math.sqrt((k ** 2) + (l ** 2))
if m > 0: m = math.log(m)
if m == 0: m = epsilon
elif m < 0: m = np.nan
peri.append(m)
# Filter the spectrum through the triangle filterbank
mspec = np.log10(peri)
# Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
+4 -4
View File
@@ -9,19 +9,19 @@ import shutil
def predict_from_times(wav_filename, preds_filename, begin, end):
tmp_features_filename = tempfile._get_default_tempdir() + "/" + next(tempfile._get_candidate_names()) + ".txt"
print tmp_features_filename
print(tmp_features_filename)
if begin > 0.0 or end > 0.0:
features.create_features(wav_filename, tmp_features_filename, begin, end)
easy_call("th load_estimation_model.lua " + tmp_features_filename + ' ' + preds_filename)
easy_call("luajit load_estimation_model.lua " + tmp_features_filename + ' ' + preds_filename)
else:
features.create_features(wav_filename, tmp_features_filename)
easy_call("th load_tracking_model.lua " + tmp_features_filename + ' ' + preds_filename)
easy_call("luajit load_tracking_model.lua " + tmp_features_filename + ' ' + preds_filename)
def predict_from_textgrid(wav_filename, preds_filename, textgrid_filename, textgrid_tier):
print wav_filename
print(wav_filename)
if os.path.exists(preds_filename):
os.remove(preds_filename)
+2 -2
View File
@@ -4,12 +4,12 @@ if [ $# -eq 2 ]
then
tempfile=`mktemp -t txt`
python extract_features.py $1 $tempfile
th load_estimation_model.lua $tempfile $2
luajit load_estimation_model.lua $tempfile $2
elif [ $# -eq 4 ]
then
tempfile=`mktemp -t txt`
python extract_features.py $1 $tempfile --begin $3 --end $4
th load_estimation_model.lua $tempfile $2
luajit load_estimation_model.lua $tempfile $2
else
echo "$0 wav_filename pred_csv_filename [begin_time end_time]"
fi
+286
View File
@@ -0,0 +1,286 @@
# This file has been copied (with minor changes) from Michael
# McAuliffe's Conch project, to provide a compatible replacement
# implementation of the lpc() function from the obsolete Python-2-only
# scikits.talkbox library.
#
# Conch repository: https://github.com/mmcauliffe/Conch-sounds
# Source: https://github.com/mmcauliffe/Conch-sounds/blob/master/conch/analysis/formants/lpc.py
# Copyright (c) 2015 Michael McAuliffe
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#import librosa
import numpy as np
import scipy as sp
from scipy.signal import lfilter
from scipy.fftpack import fft, ifft
from scipy.signal import gaussian
#from ..helper import nextpow2
#from ..functions import BaseAnalysisFunction
# Source: https://github.com/mmcauliffe/Conch-sounds/blob/master/conch/analysis/helper.py
def nextpow2(x):
    """Return the first integer N such that 2**N >= abs(x) (as a float)."""
    magnitude = np.abs(x)
    return np.ceil(np.log2(magnitude))
def lpc_ref(signal, order):
    """Compute the Linear Prediction Coefficients by direct inversion.

    Return the order + 1 LPC coefficients for the signal. c = lpc(x, k) will
    find the k+1 coefficients of a k order linear filter:

        xp[n] = -c[1] * x[n-2] - ... - c[k-1] * x[n-k-1]

    such that the sum of the squared errors e[i] = xp[i] - x[i] is minimized.

    Parameters
    ----------
    signal : array_like
        Input signal (rank 1).
    order : int
        LPC order (the output will have order + 1 items).

    Notes
    -----
    This is just for reference, as it is using the direct inversion of the
    Toeplitz matrix, which is really slow.
    """
    if signal.ndim > 1:
        raise ValueError("Array of rank > 1 not supported yet")
    if order > signal.size:
        # Fixed typo in the original message ("lenght").
        raise ValueError("Input signal must have a length >= lpc order")
    if order > 0:
        p = order + 1
        r = np.zeros(p, 'float32')
        # Number of non zero values in autocorrelation one needs for p LPC
        # coefficients
        nx = np.min([p, signal.size])
        x = np.correlate(signal, signal, 'full')
        # Take lags 0..order of the full autocorrelation (its centre is at
        # index signal.size - 1).
        r[:nx] = x[signal.size - 1:signal.size + order]
        phi = np.dot(sp.linalg.inv(sp.linalg.toeplitz(r[:-1])), -r[1:])
        return np.concatenate(([1.], phi))
    else:
        return np.ones(1, dtype='float32')
# @jit
def levinson_1d(r, order):
    """Levinson-Durbin recursion, to efficiently solve symmetric linear
    systems with Toeplitz structure.

    Parameters
    ----------
    r : array-like
        Input array to invert (since the matrix is symmetric Toeplitz, the
        corresponding pxp matrix is defined by p items only). Generally the
        autocorrelation of the signal for linear prediction coefficients
        estimation. The first item must be a non zero real.
    order : int
        Recursion order (number of coefficients to solve for).

    Returns
    -------
    a : ndarray
        Filter coefficients, a[0] == 1.
    e : float
        Final prediction error.
    k : ndarray
        Reflection coefficients.

    Notes
    -----
    Solves the Hermitian Toeplitz system in O(p^2) instead of O(p^3) by
    exploiting the matrix symmetry.  Pure-Python reference implementation.
    """
    r = np.atleast_1d(r)
    if r.ndim > 1:
        raise ValueError("Only rank 1 are supported for now.")
    n = r.size
    if n < 1:
        raise ValueError("Cannot operate on empty array !")
    elif order > n - 1:
        raise ValueError("Order should be <= size-1")
    if not np.isreal(r[0]):
        raise ValueError("First item of input must be real.")
    elif not np.isfinite(1 / r[0]):
        raise ValueError("First item should be != 0")
    # Estimated coefficients
    a = np.empty(order + 1, 'float32')
    # temporary array
    t = np.empty(order + 1, 'float32')
    # Reflection coefficients
    k = np.empty(order, 'float32')
    a[0] = 1.
    e = r[0]
    for i in range(1, order + 1):
        acc = r[i]
        for j in range(1, i):
            acc += a[j] * r[i - j]
        k[i - 1] = -acc / e
        a[i] = k[i - 1]
        # Snapshot of the current coefficients before the in-place update.
        for j in range(order):
            t[j] = a[j]
        for j in range(1, i):
            a[j] += k[i - 1] * np.conj(t[i - j])
        # Update the prediction error for this order.
        e *= 1 - k[i - 1] * np.conj(k[i - 1])
    return a, e, k
# @jit
def _acorr_last_axis(x, nfft, maxlag):
a = np.real(ifft(np.abs(fft(x, n=nfft) ** 2)))
return a[..., :maxlag + 1] / x.shape[-1]
# @jit
def acorr_lpc(x, axis=-1):
    """Compute autocorrelation of x along the given axis.

    This computes the biased autocorrelation estimator (divided by the size
    of the input signal), via the FFT for speed.
    """
    if not np.isrealobj(x):
        raise ValueError("Complex input not supported yet")
    maxlag = x.shape[axis]
    # Zero-pad to a power of two >= 2*maxlag - 1 so the circular FFT
    # correlation equals the linear one over the lags we keep.
    nfft = int(2 ** nextpow2(2 * maxlag - 1))
    if axis != -1:
        x = np.swapaxes(x, -1, axis)
    a = _acorr_last_axis(x, nfft, maxlag)
    if axis != -1:
        a = np.swapaxes(a, -1, axis)
    return a
# @jit
def lpc(signal, order, axis=-1):
    """Compute the Linear Prediction Coefficients.

    Return the order + 1 LPC coefficients for the signal. c = lpc(x, k) will
    find the k+1 coefficients of a k order linear filter:

        xp[n] = -c[1] * x[n-2] - ... - c[k-1] * x[n-k-1]

    such that the sum of the squared errors e[i] = xp[i] - x[i] is minimized.

    Parameters
    ----------
    signal : array_like
        Input signal.
    order : int
        LPC order (the output will have order + 1 items).

    Returns
    -------
    a : array-like
        The solution of the inversion.
    e : array-like
        The prediction error.
    k : array-like
        Reflection coefficients.

    Notes
    -----
    Uses Levinson-Durbin recursion for the autocorrelation matrix
    inversion, and the FFT for the autocorrelation computation.
    For small order (order << signal size), direct computation of the
    autocorrelation is faster: use levinson and correlate in that case.
    """
    n = signal.shape[axis]
    if order > n:
        raise ValueError("Input signal must have length >= order")
    r = acorr_lpc(signal, axis)
    return levinson_1d(r, order)
def process_frame(X, window, num_formants, new_sr):
    # Window the frame, fit an LPC model of order 2*num_formants, and turn
    # the prediction-filter roots into formant frequencies (Hz) and
    # bandwidths, sorted by ascending frequency.
    X = X * window
    A, e, k = lpc(X, num_formants * 2)
    rts = np.roots(A)
    # Keep one root per conjugate pair (upper half plane only).
    rts = rts[np.where(np.imag(rts) >= 0)]
    angz = np.arctan2(np.imag(rts), np.real(rts))
    # Root angle -> frequency in Hz at the analysis sample rate.
    frqs = angz * (new_sr / (2 * np.pi))
    frq_inds = np.argsort(frqs)
    frqs = frqs[frq_inds]
    # Bandwidth from the root magnitude (closer to the unit circle ->
    # narrower resonance).
    bw = -1 / 2 * (new_sr / (2 * np.pi)) * np.log(np.abs(rts[frq_inds]))
    return frqs, bw
def lpc_formants(signal, sr, num_formants, max_freq, time_step,
                 win_len, window_shape='gaussian'):
    """Track formants over *signal* using frame-wise LPC analysis.

    Returns a dict mapping frame centre time (seconds) to a list of
    ``num_formants`` (frequency, bandwidth) tuples, padded with
    (None, None) when fewer usable roots are found.
    """
    output = {}
    new_sr = 2 * max_freq
    # Pre-emphasis: first-order filter attenuating content below ~50 Hz.
    alpha = np.exp(-2 * np.pi * 50 * (1 / new_sr))
    proc = lfilter([1., -alpha], 1, signal)
    if sr > new_sr:
        # The original called librosa.resample() here, but the librosa
        # import is commented out at the top of this file, so that path
        # died with a NameError.  Fail with an explicit message instead.
        raise RuntimeError(
            "Resampling from %s Hz to %s Hz requires librosa, which is "
            "not available in this build" % (sr, new_sr))
    nperseg = int(win_len * new_sr)
    nperstep = int(time_step * new_sr)
    if window_shape == 'gaussian':
        window = gaussian(nperseg + 2, 0.45 * (nperseg - 1) / 2)[1:nperseg + 1]
    else:
        window = np.hanning(nperseg + 2)[1:nperseg + 1]
    # Frame centres, spaced nperstep apart, fully inside the signal.
    indices = np.arange(int(nperseg / 2), proc.shape[0] - int(nperseg / 2) + 1, nperstep)
    num_frames = len(indices)
    for i in range(num_frames):
        if nperseg % 2 != 0:
            X = proc[indices[i] - int(nperseg / 2):indices[i] + int(nperseg / 2) + 1]
        else:
            X = proc[indices[i] - int(nperseg / 2):indices[i] + int(nperseg / 2)]
        frqs, bw = process_frame(X, window, num_formants, new_sr)
        formants = []
        for j, f in enumerate(frqs):
            # Discard sub-50 Hz artefacts and roots hugging the Nyquist edge.
            if f < 50:
                continue
            if f > max_freq - 50:
                continue
            # np.asscalar() was removed in NumPy 1.23; .item() is the
            # documented equivalent.
            formants.append((f.item(), bw[j].item()))
        missing = num_formants - len(formants)
        if missing:
            formants += [(None, None)] * missing
        output[indices[i] / new_sr] = formants
    return output
#class FormantTrackFunction(BaseAnalysisFunction):
# def __init__(self, num_formants=5, max_frequency=5000,
# time_step=0.01, window_length=0.025, window_shape='gaussian'):
# super(FormantTrackFunction, self).__init__()
# self.arguments = [num_formants, max_frequency, time_step, window_length, window_shape]
# self._function = lpc_formants
# self.requires_file = False
-141
View File
@@ -1,141 +0,0 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np
torch.manual_seed(1)
trainY = np.load("norm_cnn_timit_train_Y.npy")
testY = np.load("norm_cnn_timit_test_Y.npy")
Xtrain = np.load("norm_cnn_timit_train_X.npy").astype(np.float32)
Ytrain = trainY[:,1:5].astype(np.float32)
Xtest = np.load("norm_cnn_timit_test_X.npy").astype(np.float32)
Ytest = testY[:,1:5].astype(np.float32)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
D = Xtrain.shape[1]
K = len(Ytrain)
print(D, K)
class Net(nn.Module):
    # CNN regressor: four conv layers with two max-pools, then a dense head
    # producing the 4 formant values.  The flattened size 43*38*64
    # hard-codes the expected input spatial dimensions.
    # NOTE(review): input is presumably (batch, 1, H, W) spectrogram-like
    # images — confirm against the data-preparation code.
    def __init__(self):
        super(Net, self).__init__()
        self.Conv1 = nn.Conv2d(in_channels=1, out_channels=96, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv2 = nn.Conv2d(in_channels=96, out_channels=32, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(5, 5), stride=1, padding=0)
        self.Dense5 = nn.Linear(43*38*64, 512)
        self.out = nn.Linear(512, 4)

    def forward(self, x):
        in_size = x.size(0)  # unused; kept from the original
        x = F.relu(self.Conv1(x))
        x = F.relu(self.Conv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        x = F.relu(self.Conv3(x))
        x = F.relu(self.Conv4(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        #print(in_size)
        # Flatten all conv features per sample for the dense head.
        x = x.view(x.size(0), -1)
        x = F.relu(self.Dense5(x))
        return self.out(x)
def train(model, loss, optimizer, inputs, labels):
    # One optimization step on a mini-batch; returns the loss as a float.
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()
    logits = model.forward(inputs)
    output = loss.forward(logits, labels)
    output.backward()
    optimizer.step()
    return output.item()
def predict(model, inputs):
    # Inference-only forward pass (no_grad) returning a NumPy array.
    inputs = Variable(inputs)
    with torch.no_grad():
        logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()
Xtrain = torch.from_numpy(Xtrain).float().to(device)
Ytrain = torch.from_numpy(Ytrain).float().to(device)
Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)
model = Net().to(device)
loss = nn.L1Loss()
optimizer = optim.Adagrad(model.parameters())
epochs = 80
batchSize = 32
n_batches = int(np.floor(Xtrain.size()[0]/batchSize))
costs = []
test_accuracies = []
print("Starting training ")
for i in range(epochs):
cost = 0.0
for j in range(n_batches):
#print(j, '/', n_batches)
Xbatch = Xtrain[j*batchSize:(j+1)*batchSize]
Ybatch = Ytrain[j*batchSize:(j+1)*batchSize]
cost += train(model, loss, optimizer, Xbatch, Ybatch)
loss1 = 0.0
loss2 = 0.0
loss3 = 0.0
loss4 = 0.0
max_1 = 0.0
max_2 = 0.0
max_3 = 0.0
max_4 = 0.0
list_1 = []
list_2 = []
list_3 = []
list_4 = []
print('predicting...')
Ypred = predict(model, Xtest)
for k in range(0, len(Ytest)):
# print(y_hat[i])
l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
list_1.append(l1)
list_2.append(l2)
list_3.append(l3)
list_4.append(l4)
max_1 = max(max_1, l1)
max_2 = max(max_2, l2)
max_3 = max(max_3, l3)
max_4 = max(max_4, l4)
loss1 += l1
loss2 += l2
loss3 += l3
loss4 += l4
loss1 /= len(Ytest)
loss2 /= len(Ytest)
loss3 /= len(Ytest)
loss4 /= len(Ytest)
total_loss = loss1 + loss2 + loss3 + loss4
total_loss /= 4.0
print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
print("Epoch: %d, acc: %.3f" % (i, total_loss))
costs.append(cost/n_batches)
test_accuracies.append(round(total_loss, 3))
torch.save(model.state_dict(), "CNN_estimate.pt")
print(test_accuracies)
-121
View File
@@ -1,121 +0,0 @@
from __future__ import print_function, division
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
from torch import optim
import numpy as np
torch.manual_seed(1)
testY = np.load("norm_cnn_timit_test_Y.npy")
Xtest = np.load("norm_cnn_timit_test_X.npy").astype(np.float32)
Ytest = testY[:,1:5].astype(np.float32)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
D = Xtest.shape
print(D)
print(Xtest.shape[1], len(Ytest))
class Net(nn.Module):
    # Same CNN architecture as the training script; redefined here so the
    # saved state_dict ("CNN_estimate.pt") can be loaded for evaluation.
    def __init__(self):
        super(Net, self).__init__()
        self.Conv1 = nn.Conv2d(in_channels=1, out_channels=96, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv2 = nn.Conv2d(in_channels=96, out_channels=32, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), stride=1, padding=0)
        self.Conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(5, 5), stride=1, padding=0)
        self.Dense5 = nn.Linear(43*38*64, 512)
        self.out = nn.Linear(512, 4)

    def forward(self, x):
        in_size = x.size(0)  # unused; kept from the original
        x = F.relu(self.Conv1(x))
        x = F.relu(self.Conv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        x = F.relu(self.Conv3(x))
        x = F.relu(self.Conv4(x))
        x = F.max_pool2d(x, kernel_size=2, stride=1)
        #print(in_size)
        x = x.view(x.size(0), -1)
        x = F.relu(self.Dense5(x))
        return self.out(x)
def train(model, loss, optimizer, inputs, labels):
    # One optimization step; unused in this evaluation-only script but kept
    # for parity with the training script.
    inputs = Variable(inputs.to(device))
    labels = Variable(labels.to(device))
    optimizer.zero_grad()
    logits = model.forward(inputs)
    output = loss.forward(logits, labels)
    output.backward()
    optimizer.step()
    return output.item()
def predict(model, inputs):
    # Inference-only forward pass (no_grad) returning a NumPy array.
    inputs = Variable(inputs)
    with torch.no_grad():
        logits = model.forward(inputs.to(device))
    return logits.data.cpu().numpy()
Xtest = torch.from_numpy(Xtest).float().to(device)
Ytest = torch.from_numpy(Ytest).float().to(device)
model = Net().to(device)
loss = nn.L1Loss()
optimizer = optim.Adagrad(model.parameters())
model.load_state_dict(torch.load("CNN_estimate.pt"))
model.eval()
loss1 = 0.0
loss2 = 0.0
loss3 = 0.0
loss4 = 0.0
max_1 = 0.0
max_2 = 0.0
max_3 = 0.0
max_4 = 0.0
list_1 = []
list_2 = []
list_3 = []
list_4 = []
print('predicting...')
Ypred1 = predict(model, Xtest[:1000])
Ypred2 = predict(model, Xtest[1000:2000])
Ypred3 = predict(model, Xtest[2000:])
Ypred = np.concatenate((Ypred1, Ypred2, Ypred3))
for k in range(0, len(Ytest)):
# print(y_hat[i])
l1 = np.abs(float(Ytest[k, 0]) - Ypred[k, 0])
l2 = np.abs(float(Ytest[k, 1]) - Ypred[k, 1])
l3 = np.abs(float(Ytest[k, 2]) - Ypred[k, 2])
l4 = np.abs(float(Ytest[k, 3]) - Ypred[k, 3])
list_1.append(l1)
list_2.append(l2)
list_3.append(l3)
list_4.append(l4)
max_1 = max(max_1, l1)
max_2 = max(max_2, l2)
max_3 = max(max_3, l3)
max_4 = max(max_4, l4)
loss1 += l1
loss2 += l2
loss3 += l3
loss4 += l4
loss1 /= len(Ytest)
loss2 /= len(Ytest)
loss3 /= len(Ytest)
loss4 /= len(Ytest)
total_loss = loss1 + loss2 + loss3 + loss4
total_loss /= 4.0
print('median: %.3f %.3f %.3f %.3f' % (np.median(list_1), np.median(list_2), np.median(list_3), np.median(list_4)))
print('max loss: %.3f %.3f %.3f %.3f' % (max_1, max_2, max_3, max_4))
print('Real test score: %.3f %.3f %.3f %.3f' % (loss1, loss2, loss3, loss4))
print("acc: %.3f" % (total_loss))