diff --git a/README.md b/README.md index a1a0d3d..6864ffb 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,7 @@ -DeepFormants -============ +# DeepFormants - PyTorch Shua Dissen (shua.dissen@gmail.com) -Joseph Keshet (joseph.keshet@biu.ac.il) - +Joseph Keshet (joseph.keshet@biu.ac.il) DeepFormants is a software package for formant tracking and estimation, using two algorithms based on deep networks. It works as follows: * The user provides a wav file with an initial stop consonant. @@ -14,8 +12,6 @@ DeepFormants is a software package for formant tracking and estimation, using tw This is a beta version of DeepFormants. Any reports of bugs, comments on how to improve the software or documentation, or questions are greatly appreciated, and should be sent to the authors at the addresses given above. ---- - ## Installation instructions diff --git a/extract_features.py b/extract_features.py index b9dc120..2b074a1 100644 --- a/extract_features.py +++ b/extract_features.py @@ -5,6 +5,8 @@ import numpy as np import wave import os import math + +from inaSpeechSegmenter.features import to_wav from scipy.fftpack.realtransforms import dct from scipy.signal import lfilter, hamming from scipy.fftpack import fft, ifft @@ -16,8 +18,8 @@ epsilon = 0.0000000001 prefac = .97 -def build_data(wav,begin=None,end=None): - wav_in_file = wave.Wave_read(wav) +def build_data(wav, begin=None,end=None): + wav_in_file = wave.Wave_read(str(wav)) wav_in_num_samples = wav_in_file.getnframes() N = wav_in_file.getnframes() dstr = wav_in_file.readframes(N) @@ -264,14 +266,13 @@ def build_single_feature_row(data, Atal): def create_features(input_wav_filename, feature_filename, begin=None, end=None, Atal=False): - tmp_wav16_filename = generate_tmp_filename("wav") - easy_call("sox " + input_wav_filename + " -c 1 -r 16000 " + tmp_wav16_filename) - X = build_data(tmp_wav16_filename, begin, end) + wav = to_wav(input_wav_filename) + X = build_data(wav, begin, end) if begin is not None and end is not None: arr = [input_wav_filename] arr.extend(build_single_feature_row(X, Atal)) np.savetxt(feature_filename, np.asarray([arr]), delimiter=",", fmt="%s") - os.remove(tmp_wav16_filename) + os.remove(wav) return arr arcep_mat = [] for i in range(len(X)): @@ -280,7 +281,7 @@ def create_features(input_wav_filename, feature_filename, begin=None, end=None, arcep_mat.append(arr) np.savetxt(feature_filename, np.asarray(arcep_mat), delimiter=",", fmt="%s") - os.remove(tmp_wav16_filename) + os.remove(wav) return arcep_mat diff --git a/formants.py b/formants.py index bd692f1..62eca27 100644 --- a/formants.py +++ b/formants.py @@ -11,18 +11,19 @@ def predict_from_times(wav_filename, preds_filename, begin, end, csv_export=True print("Input Array Path: " + tmp_features_filename) predictions = None - if begin > 0.0 or end > 0.0: - print(wav_filename + " interval " + str(begin) + "-" + str(end) + ":") - features.create_features(wav_filename, tmp_features_filename, begin, end) - predictions = load_estimation_model(tmp_features_filename, preds_filename, begin, end, csv_export=csv_export) + # if begin > 0.0 or end > 0.0: + print(wav_filename + " interval " + str(begin) + "-" + str(end) + ":") + features.create_features(wav_filename, tmp_features_filename, begin, end) + predictions = load_estimation_model(tmp_features_filename, preds_filename, begin, end, csv_export=csv_export) #easy_call("luajit load_estimation_model.lua " + tmp_features_filename + ' ' + preds_filename) - else: - features.create_features(wav_filename, tmp_features_filename) - easy_call("luajit load_tracking_model.lua " + tmp_features_filename + ' ' + preds_filename) + # else: + # features.create_features(wav_filename, tmp_features_filename) + # easy_call("luajit load_tracking_model.lua " + tmp_features_filename + ' ' + preds_filename) delete_temp_files() return predictions + def predict_from_textgrid(wav_filename, preds_filename, textgrid_filename, textgrid_tier): print(wav_filename) @@ -37,13 +38,13 @@ def predict_from_textgrid(wav_filename, preds_filename, textgrid_filename, textg # extract tier names tier_names = textgrid.tierNames() - + if textgrid_tier in tier_names: # run over all intervals in the tier tier_index = tier_names.index(textgrid_tier) textgrid_tier = textgrid[tier_index] else: # process first tier textgrid_tier = textgrid[0] - + for interval in textgrid_tier: if re.search(r'\S', interval.mark()): tmp_features_filename = generate_tmp_filename("features") diff --git a/helpers/conch_lpc.py b/helpers/conch_lpc.py index 1e02cf4..ff1d3f7 100644 --- a/helpers/conch_lpc.py +++ b/helpers/conch_lpc.py @@ -27,21 +27,22 @@ # THE SOFTWARE. #import librosa +import librosa import numpy as np import scipy as sp +from numba import njit from scipy.signal import lfilter from scipy.fftpack import fft, ifft -from scipy.signal import gaussian +from scipy.signal.windows import gaussian -#from ..helper import nextpow2 -#from ..functions import BaseAnalysisFunction -# Source: https://github.com/mmcauliffe/Conch-sounds/blob/master/conch/analysis/helper.py -def nextpow2(x): +@njit +def next_pow_2(x: float) -> int: """Return the first integer N such that 2**N >= abs(x)""" return np.ceil(np.log2(np.abs(x))) + def lpc_ref(signal, order): """Compute the Linear Prediction Coefficients. @@ -175,7 +176,7 @@ def acorr_lpc(x, axis=-1): raise ValueError("Complex input not supported yet") maxlag = x.shape[axis] - nfft = int(2 ** nextpow2(2 * maxlag - 1)) + nfft = int(2 ** next_pow_2(2 * maxlag - 1)) if axis != -1: x = np.swapaxes(x, -1, axis) diff --git a/load_estimation_model.py b/load_estimation_model.py index 9891158..b122174 100644 --- a/load_estimation_model.py +++ b/load_estimation_model.py @@ -2,6 +2,7 @@ import torch import torch.nn as nn from functools import reduce + class LambdaBase(nn.Sequential): def __init__(self, fn, *args): super(LambdaBase, self).__init__(*args) @@ -13,57 +14,60 @@ class LambdaBase(nn.Sequential): output.append(module(input)) return output if output else input + class Lambda(LambdaBase): def forward(self, input): return self.lambda_func(self.forward_prepare(input)) + class LambdaMap(LambdaBase): def forward(self, input): - return list(map(self.lambda_func,self.forward_prepare(input))) + return list(map(self.lambda_func, self.forward_prepare(input))) + class LambdaReduce(LambdaBase): def forward(self, input): - return reduce(self.lambda_func,self.forward_prepare(input)) + return reduce(self.lambda_func, self.forward_prepare(input)) def load_estimation_model(inputfilename, outputfilename, begin, end, csv_export=True): - with open(inputfilename, "r") as rf: - contents = rf.read() - contents = contents.split(",") + with open(inputfilename, "r") as rf: + contents = rf.read() + contents = contents.split(",") - data = torch.Tensor(1,350) - name = "" - for i in range(len(contents)): - if i == 0: - name = contents[i].strip() - else: - val = float(contents[i].strip()) - data[0][i-1] = val + data = torch.Tensor(1, 350) + name = "" + for i in range(len(contents)): + if i == 0: + name = contents[i].strip() + else: + val = float(contents[i].strip()) + data[0][i - 1] = val - model = nn.Sequential( # Sequential, - nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(350,1024)), # Linear, - nn.Sigmoid(), - nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(1024,512)), # Linear, - nn.Sigmoid(), - nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(512,256)), # Linear, - nn.Sigmoid(), - nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(256,4)), # Linear, - ) + model = nn.Sequential( + nn.Sequential(Lambda(lambda x: x.view(1, -1) if 1 == len(x.size()) else x), nn.Linear(350, 1024)), + nn.Sigmoid(), + nn.Sequential(Lambda(lambda x: x.view(1, -1) if 1 == len(x.size()) else x), nn.Linear(1024, 512)), + nn.Sigmoid(), + nn.Sequential(Lambda(lambda x: x.view(1, -1) if 1 == len(x.size()) else x), nn.Linear(512, 256)), + nn.Sigmoid(), + nn.Sequential(Lambda(lambda x: x.view(1, -1) if 1 == len(x.size()) else x), nn.Linear(256, 4)), + ) - model.load_state_dict(torch.load("em.pth")) - my_prediction = model.forward(data) + model.load_state_dict(torch.load("em.pth")) + my_prediction = model.forward(data) - prediction_dict = {} - prediction_dict["F1"] = 1000 * float(my_prediction[0][0]) - prediction_dict["F2"] = 1000 * float(my_prediction[0][1]) - prediction_dict["F3"] = 1000 * float(my_prediction[0][2]) - prediction_dict["F4"] = 1000 * float(my_prediction[0][3]) + prediction_dict = {} + prediction_dict["F1"] = 1000 * float(my_prediction[0][0]) + prediction_dict["F2"] = 1000 * float(my_prediction[0][1]) + prediction_dict["F3"] = 1000 * float(my_prediction[0][2]) + prediction_dict["F4"] = 1000 * float(my_prediction[0][3]) - if csv_export: - with open(outputfilename, "w") as wf: - wf.write("NAME,begin,end,F1,F2,F3,F4\n") - wf.write(name + "," + str(begin) + "," + str(end) + "," + \ - str(prediction_dict["F1"]) + "," + str(prediction_dict["F2"]) + "," + \ - str(prediction_dict["F3"]) + "," + str(prediction_dict["F4"]) + "\n") - - return prediction_dict + if csv_export: + with open(outputfilename, "w") as wf: + wf.write("NAME,begin,end,F1,F2,F3,F4\n") + wf.write(name + "," + str(begin) + "," + str(end) + "," + \ + str(prediction_dict["F1"]) + "," + str(prediction_dict["F2"]) + "," + \ + str(prediction_dict["F3"]) + "," + str(prediction_dict["F4"]) + "\n") + + return prediction_dict