Can run from external directory. Outputs CSV. Works with TextGrids.

This commit is contained in:
Joseph Keshet
2016-06-29 21:20:34 -04:00
parent 4fb9cb39d5
commit 5940e013c5
7 changed files with 693 additions and 21 deletions
+1
View File
@@ -0,0 +1 @@
__author__ = 'jkeshet'
+16
View File
@@ -277,3 +277,19 @@ def create_features(input_wav_filename, feature_filename, begin=None, end=None,
np.savetxt(feature_filename, np.asarray(arcep_mat), delimiter=",", fmt="%s") np.savetxt(feature_filename, np.asarray(arcep_mat), delimiter=",", fmt="%s")
return arcep_mat return arcep_mat
if __name__ == "__main__":
# parse arguments
parser = argparse.ArgumentParser(description='Extract features for formants estimation.')
parser.add_argument('wav_file', default='', help="WAV audio filename (single vowel or an whole utternace)")
parser.add_argument('feature_file', default='', help="output feature text file")
parser.add_argument('--begin', help="beginning time in the WAV file", default=0.0, type=float)
parser.add_argument('--end', help="end time in the WAV file", default=-1.0, type=float)
args = parser.parse_args()
if args.begin > 0.0 or args.end > 0.0:
create_features(args.wav_file, args.feature_file, args.begin, args.end)
else:
create_features(args.wav_file, args.feature_file)
+46 -21
View File
@@ -1,25 +1,11 @@
import extract_features as features import extract_features as features
from subprocess import call
import sys
import argparse import argparse
import tempfile from helpers.textgrid import *
from helpers.utilities import *
import shutil
def predict_from_times(wav_filename, preds_filename, begin, end):
def easy_call(command, debug_mode=True):
try:
if debug_mode:
print >>sys.stderr, command
call(command, shell=True)
except Exception as exception:
print "Error: could not execute the following"
print ">>", command
print type(exception) # the exception instance
print exception.args # arguments stored in .args
exit(-1)
def main(wav_filename, preds_filename, begin, end):
tmp_features_filename = tempfile._get_default_tempdir() + "/" + next(tempfile._get_candidate_names()) + ".txt" tmp_features_filename = tempfile._get_default_tempdir() + "/" + next(tempfile._get_candidate_names()) + ".txt"
print tmp_features_filename print tmp_features_filename
@@ -31,14 +17,53 @@ def main(wav_filename, preds_filename, begin, end):
easy_call("th load_tracking_model.lua " + tmp_features_filename + ' ' + preds_filename) easy_call("th load_tracking_model.lua " + tmp_features_filename + ' ' + preds_filename)
def predict_from_textgrid(wav_filename, preds_filename, textgrid_filename, textgrid_tier):
print wav_filename
if os.path.exists(preds_filename):
os.remove(preds_filename)
textgrid = TextGrid()
# read TextGrid
textgrid.read(textgrid_filename)
# extract tier names
tier_names = textgrid.tierNames()
if textgrid_tier in tier_names:
tier_index = tier_names.index(textgrid_tier)
# run over all intervals in the tier
for interval in textgrid[tier_index]:
if re.search(r'\S', interval.mark()):
tmp_features_filename = generate_tmp_filename()
tmp_preds = generate_tmp_filename()
features.create_features(wav_filename, tmp_features_filename, interval.xmin(), interval.xmax())
easy_call("th load_estimation_model.lua " + tmp_features_filename + ' ' + tmp_preds)
csv_append_row(tmp_preds, preds_filename)
else: # process first tier
for interval in textgrid[0]:
if re.search(r'\S', interval.mark()):
tmp_features_filename = generate_tmp_filename()
tmp_preds = generate_tmp_filename()
features.create_features(wav_filename, tmp_features_filename, interval.xmin(), interval.xmax())
easy_call("th load_estimation_model.lua " + tmp_features_filename + ' ' + tmp_preds)
csv_append_row(tmp_preds, preds_filename)
if __name__ == "__main__": if __name__ == "__main__":
# parse arguments # parse arguments
parser = argparse.ArgumentParser(description='Extract features for formants estimation.') parser = argparse.ArgumentParser(description='Estimation and tracking of formants.')
parser.add_argument('wav_file', default='', help="WAV audio filename (single vowel or an whole utternace)") parser.add_argument('wav_file', default='', help="WAV audio filename (single vowel or an whole utternace)")
parser.add_argument('formants_file', default='', help="output formant text file") parser.add_argument('formants_file', default='', help="output formant CSV file")
parser.add_argument('--textgrid_filename', default='', help="get beginning and end times from a TextGrid file")
parser.add_argument('--textgrid_tier', default='', help="a tier name with portion to process (default first tier)")
parser.add_argument('--begin', help="beginning time in the WAV file", default=0.0, type=float) parser.add_argument('--begin', help="beginning time in the WAV file", default=0.0, type=float)
parser.add_argument('--end', help="end time in the WAV file", default=-1.0, type=float) parser.add_argument('--end', help="end time in the WAV file", default=-1.0, type=float)
args = parser.parse_args() args = parser.parse_args()
main(args.wav_file, args.formants_file, args.begin, args.end) if args.textgrid_filename:
predict_from_textgrid(args.wav_file, args.formants_file, args.textgrid_filename, args.textgrid_tier)
else:
predict_from_times(args.wav_file, args.formants_file, args.begin, args.end)
+1
View File
@@ -0,0 +1 @@
__author__ = 'jkeshet'
+413
View File
@@ -0,0 +1,413 @@
# This file is a slightly modified version of the textgrid.py module
# (https://github.com/kylebgorman/textgrid/), which was released under the following license:
# (see https://github.com/kylebgorman/textgrid/blob/master/LICENSE)
#
# Copyright (c) 2011-2013 Kyle Gorman, Max Bane, Morgan Sonderegger
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
import logging
import re
class mlf:
"""
read in a HTK .mlf file. iterating over it gives you a list of
TextGrids
"""
def __init__(self, file):
self.__items = []
self.__n = 0
text = open(file, 'r')
text.readline() # get rid of header
while 1: # loop over text
name = text.readline()[1:-1]
if name:
grid = TextGrid()
phon = IntervalTier('phones')
word = IntervalTier('words')
wmrk = ''
wsrt = 0.
wend = 0.
while 1: # loop over the lines in each grid
line = text.readline().rstrip().split()
if len(line) == 4: # word on this baby
pmin = float(line[0]) / 10e6
pmax = float(line[1]) / 10e6
phon.append(Interval(pmin, pmax, line[2]))
if wmrk:
word.append(Interval(wsrt, wend, wmrk))
wmrk = line[3]
wsrt = pmin
wend = pmax
elif len(line) == 3: # just phone
pmin = float(line[0]) / 10e6
pmax = float(line[1]) / 10e6
phon.append(Interval(pmin, pmax, line[2]))
wend = pmax
else: # it's a period
word.append(Interval(wsrt, wend, wmrk))
self.__items.append(grid)
break
grid.append(phon)
grid.append(word)
self.__n += 1
else:
text.close()
break
def __iter__(self):
return iter(self.__items)
def __len__(self):
return self.__n
def __str__(self):
return '<MLF instance with %d TextGrids>' % self.__n
class TextGrid:
""" represents Praat TextGrids as list of different types of tiers """
def __init__(self, name = None):
self.__tiers = []
self.__n = 0
self.__xmin = None
self.__xmax = None
self.__name = name # this is just for the MLF case
def __str__(self):
return '<TextGrid with %d tiers>' % self.__n
def __iter__(self):
return iter(self.__tiers)
def __len__(self):
return self.__n
def __getitem__(self, i):
""" return the (i-1)th tier """
return self.__tiers[i]
# Morgan Sonderegger
def tierNames(self, case=None):
names = [t.name() for t in self.__tiers]
if(case=="lower"):
names = [n.lower() for n in names]
return names
def xmin(self):
return self.__xmin
def xmax(self):
return self.__xmax
def append(self, tier):
self.__tiers.append(tier)
## JosephKeshet
if self.__xmin is None:
self.__xmin = tier.xmin()
else:
self.__xmin = min(tier.xmin(), self.__xmin)
## JosephKeshet
self.__xmax = max(tier.xmax(), self.__xmax)
## JosephKeshet / MS
if self.__xmax is None:
self.__xmax = tier.xmax()
else:
self.__xmax = max(tier.xmax(), self.__xmax)
self.__n += 1
def read(self, file):
""" read TextGrid from Praat .TextGrid file """
text = open(file, 'r')
text.readline() # header crap
text.readline()
text.readline()
self.__xmin = float(text.readline().rstrip().split()[2])
self.__xmax = float(text.readline().rstrip().split()[2])
text.readline()
m = int(text.readline().rstrip().split()[2]) # will be self.__n soon
text.readline()
for i in range(m): # loop over grids
text.readline()
if text.readline().rstrip().split()[2] == '"IntervalTier"':
# inam = text.readline().rstrip().split()[2][1:-1]
inam = text.readline().split('=')[1].strip().strip('"') # Joseph Keshet: handle space in the tier name
imin = float(text.readline().rstrip().split()[2])
imax = float(text.readline().rstrip().split()[2])
itie = IntervalTier(inam, imin, imax) # redundant FIXME
n = int(text.readline().rstrip().split()[3])
for j in range(n):
try:
text.readline().rstrip().split() # header junk
jmin = float(text.readline().rstrip().split()[2])
jmax = float(text.readline().rstrip().split()[2])
# Morgan Sonderegger changed, to account for intervals where label
# begins with spacing
#jmrk = text.readline().rstrip().split()[2][1:-1]
#jmrk = text.readline().split('=')[1].strip().strip('"') # Joseph Keshet: handle space in the
# tier
# name
jmrk = getMark(text)
#
itie.append(Interval(jmin, jmax, jmrk))
except:
logging.error("Unable to parse TextGrid %s." % text.name)
self.append(itie)
else: # pointTier
# inam = text.readline().rstrip().split()[2][1:-1]
inam = text.readline().split('=')[1].strip().strip('"') # Joseph Keshet: handle space in the tier name
imin = float(text.readline().rstrip().split()[2])
imax = float(text.readline().rstrip().split()[2])
itie = PointTier(inam, imin, imax) # redundant FIXME
n = int(text.readline().rstrip().split()[3])
for j in range(n):
text.readline().rstrip() # header junk
jtim = float( text.readline().rstrip().split()[2])
jmrk = text.readline().rstrip().split()[2][1:-1]
itie.append(Point(jtim, jmrk))
self.append(itie)
text.close()
def write(self, text):
""" write it into a text file that Praat can read """
text = open(text, 'w')
text.write('File type = "ooTextFile"\n')
text.write('Object class = "TextGrid"\n\n')
text.write('xmin = %f\n' % self.__xmin)
text.write('xmax = %f\n' % self.__xmax)
text.write('tiers? <exists>\n')
text.write('size = %d\n' % self.__n)
text.write('item []:\n')
for (tier, n) in zip(self.__tiers, range(1, self.__n + 1)):
text.write('\titem [%d]:\n' % n)
if tier.__class__ == IntervalTier:
text.write('\t\tclass = "IntervalTier"\n')
text.write('\t\tname = "%s"\n' % tier.name())
text.write('\t\txmin = %f\n' % tier.xmin())
text.write('\t\txmax = %f\n' % tier.xmax())
text.write('\t\tintervals: size = %d\n' % len(tier))
for (interval, o) in zip(tier, range(1, len(tier) + 1)):
text.write('\t\t\tintervals [%d]:\n' % o)
text.write('\t\t\t\txmin = %f\n' % interval.xmin())
text.write('\t\t\t\txmax = %f\n' % interval.xmax())
text.write('\t\t\t\ttext = "%s"\n' % interval.mark())
else: # PointTier
text.write('\t\tclass = "TextTier"\n')
text.write('\t\tname = "%s"\n' % tier.name())
text.write('\t\txmin = %f\n' % tier.xmin())
text.write('\t\txmax = %f\n' % tier.xmax())
text.write('\t\tpoints: size = %d\n' % len(tier))
for (point, o) in zip(tier, range(1, len(tier) + 1)):
text.write('\t\t\tpoints [%d]:\n' % o)
text.write('\t\t\t\ttime = %f\n' % point.time())
text.write('\t\t\t\tmark = "%s"\n' % point.mark())
text.close()
class IntervalTier:
""" represents IntervalTier as a list plus some features: min/max time,
size, and tier name """
def __init__(self, name = None, xmin = None, xmax = None):
self.__n = 0
self.__name = name
self.__xmin = xmin
self.__xmax = xmax
self.__intervals = []
def __str__(self):
return '<IntervalTier "%s" with %d points>' % (self.__name, self.__n)
def __iter__(self):
return iter(self.__intervals)
def __len__(self):
return self.__n
def __getitem__(self, i):
""" return the (i-1)th interval """
return self.__intervals[i]
def xmin(self):
return self.__xmin
def xmax(self):
return self.__xmax
def name(self):
return self.__name
def append(self, interval):
self.__intervals.append(interval)
self.__xmax = interval.xmax()
self.__n += 1
# Morgan Sonderegger added
def remove(self, interval):
logging.debug("removing %d" % interval.xmin())
self.__intervals.remove(interval)
self.__n -= 1
def read(self, file):
text = open(file, 'r')
text.readline() # header junk
text.readline()
text.readline()
self.__xmin = float(text.readline().rstrip().split()[2])
self.__xmax = float(text.readline().rstrip().split()[2])
self.__n = int(text.readline().rstrip().split()[3])
for i in range(self.__n):
text.readline().rstrip() # header
imin = float(text.readline().rstrip().split()[2])
imax = float(text.readline().rstrip().split()[2])
# imrk = text.readline().rstrip().split()[2].replace('"', '') # txt
imrk = text.readline().split('=')[1].strip().strip('"') # Joseph Keshet: handle space in the mark
self.__intervals.append(Interval(imin, imax, imrk))
text.close()
def write(self, file):
text = open(file, 'w')
text.write('File type = "ooTextFile"\n')
text.write('Object class = "IntervalTier"\n\n')
text.write('xmin = %f\n' % self.__xmin)
text.write('xmax = %f\n' % self.__xmax)
text.write('intervals: size = %d\n' % self.__n)
for (interval, n) in zip(self.__intervals, range(1, self.__n + 1)):
text.write('intervals [%d]:\n' % n)
text.write('\txmin = %f\n' % interval.xmin())
text.write('\txmax = %f\n' % interval.xmax())
text.write('\ttext = "%s"\n' % interval.mark())
text.close()
class PointTier:
""" represents PointTier (also called TextTier for some reason) as a list
plus some features: min/max time, size, and tier name """
def __init__(self, name = None, xmin = None, xmax = None):
self.__n = 0
self.__name = name
self.__xmin = xmin
self.__xmax = xmax
self.__points = []
def __str__(self):
return '<PointTier "%s" with %d points>' % (self.__name, self.__n)
def __iter__(self):
return iter(self.__points)
def __len__(self):
return self.__n
def __getitem__(self, i):
""" return the (i-1)th tier """
return self.__points[i]
def name(self):
return self.__name
def xmin(self):
return self.__xmin
def xmax(self):
return self.__xmax
def append(self, point):
self.__points.append(point)
## MS: points don't have xmax, right?
# self.__xmax = point.xmax()
if self.__xmax is None:
self.__xmax = point.time()
else:
self.__max = max(point.time(), self.__xmax)
## MS: do we then need to do this for xmin as well?
self.__n += 1
def read(self, file):
text = open(file, 'r')
text.readline() # header junk
text.readline()
text.readline()
self.__xmin = float(text.readline().rstrip().split()[2])
self.__xmax = float(text.readline().rstrip().split()[2])
self.__n = int(text.readline().rstrip().split()[3])
for i in range(self.__n):
text.readline().rstrip() # header
itim = float(text.readline().rstrip().split()[2])
imrk = text.readline().rstrip().split()[2].replace('"', '') # txt
self.__points.append(Point(imrk, itim))
text.close()
def write(self, file):
text = open(file, 'w')
text.write('File type = "ooTextFile"\n')
text.write('Object class = "TextTier"\n\n')
text.write('xmin = %f\n' % self.__xmin)
text.write('xmax = %f\n' % self.__xmax)
text.write('points: size = %d\n' % self.__n)
for (point, n) in zip(self.__points, range(1, self.__n + 1)):
text.write('points [%d]:\n' % n)
text.write('\ttime = %f\n' % point.time())
text.write('\tmark = "%s"\n' % point.mark())
text.close()
class Interval:
""" represent an Interval """
def __init__(self, xmin, xmax, mark):
self.__xmin = xmin
self.__xmax = xmax
self.__mark = mark
def __str__(self):
return '<Interval "%s" %f:%f>' % (self.__mark, self.__xmin, self.__xmax)
def xmin(self):
return self.__xmin
def xmax(self):
return self.__xmax
# Morgan Sonderegger added
def bounds(self):
return (self.__xmin, self.__xmax)
def mark(self):
return self.__mark
class Point:
""" represent a Point """
def __init__(self, time, mark):
self.__time = time
self.__mark = mark
def __str__(self):
return '<Point "%s" at %f>' % (self.__mark, self.__time)
def time(self):
return self.__time
def mark(self):
return self.__mark
# Morgan Sonderegger added: account for intervals with writing beginning with whitespace
#def correctLine(line):
def getMark(text):
line = text.readline().rstrip()
a = re.search('(\S+) (=) (".*")', line)
assert(a)
assert(len(a.groups())==3)
return a.groups()[2][1:-1]
+45
View File
@@ -0,0 +1,45 @@
import argparse
import csv
import os
from textgrid import *
if __name__ == "__main__":
# parse arguments
parser = argparse.ArgumentParser(description='Convert a VOT tier fo a TextGrid to a CSV file. The CSV file will '
'contain the filename, the duration of the mark, and the mark name.')
parser.add_argument('textgrid_filename', help="name of an input TextGrid file")
parser.add_argument('csv_filename', help="name of an output CSV file.")
parser.add_argument('tier', help='the tier name of the TextGrid that should be converted to CSV.')
args = parser.parse_args()
out_file = open(args.csv_filename, 'wb')
csv_file = csv.writer(out_file)
csv_file.writerow(['textgrid_file','time','vot','mark'])
# read TextGrid
textgrid = TextGrid()
textgrid.read(args.textgrid_filename)
# extract tier names
tier_names = textgrid.tierNames()
basename = os.path.splitext(os.path.basename(args.textgrid_filename))[0]
# check if the VOT tier is one of the tiers in the TextGrid
if args.tier in tier_names:
tier_index = tier_names.index(args.tier)
# run over all intervals in the tier
for interval in textgrid[tier_index]:
if re.search(r'\S', interval.mark()):
intervals = list()
intervals.append(basename)
intervals.append("{:.3f}".format(interval.xmin()))
intervals.append("{:.3f}".format(interval.xmax()-interval.xmin()))
intervals.append(interval.mark())
csv_file.writerow(intervals)
#print intervals
# close CSV file
out_file.close()
+171
View File
@@ -0,0 +1,171 @@
# Copyright (c) 2014 Joseph Keshet, Morgan Sonderegger, Thea Knowles
#
# This file is part of Autovot, a package for automatic extraction of
# voice onset time (VOT) from audio files.
#
# Autovot is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# Autovot is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with Autovot. If not, see
# <http://www.gnu.org/licenses/>.
#
import subprocess
import random
import logging
import wave
import tempfile
import os
def csv_append_row(tmp_preds, preds_filename, with_headers=True):
if with_headers:
skip_header = True
all_lines = list()
# check if the CSV file exists
if os.path.isfile(preds_filename):
# read it lines
for line in open(preds_filename, 'r'):
all_lines.append(line)
else:
# if the file does not exist it does not have headers and they should be copied
skip_header = False
# check if there is a header
for line in open(tmp_preds, 'r'):
if skip_header:
skip_header = False
else:
all_lines.append(line)
# now dump everything back
with open(preds_filename, 'w') as f:
for line in all_lines:
f.write(line)
def generate_tmp_filename():
return tempfile._get_default_tempdir() + "/" + next(tempfile._get_candidate_names()) + ".txt"
def logging_defaults(logging_level="INFO"):
logging.basicConfig(level=logging_level, format='%(asctime)s.%(msecs)d [%(filename)s] %(levelname)s: %(message)s',
datefmt='%H:%M:%S')
def num_lines(filename):
lines = 0
for _ in open(filename, 'rU'):
lines += 1
return lines
def easy_call(command):
try:
logging.debug(command)
return_code = subprocess.call(command, shell=True)
if return_code == 127 or return_code < 0:
logging.debug('Return code: %d' % return_code)
exit(-1)
except Exception as exception:
logging.error('Could not execute the following:')
logging.error(command)
logging.error('%s - %s' % (type(exception), exception.args))
exit(-1)
def random_shuffle_data(in_features_filename, in_labels_filename, out_features_filename, out_labels_filename):
# open files
in_features = open(in_features_filename, 'rU')
in_labels = open(in_labels_filename, 'rU')
# read infra text header
header = in_labels.readline()
dims = header.split()
# read file lines
lines = list()
for x, y in zip(in_features, in_labels):
lines.append((x, y))
if len(lines) != int(dims[0]):
logging.error("Either the feature file and the label file are not the same length of label file missing a "
"header")
exit(-1)
# close files
in_features.close()
in_labels.close()
# random shuffle the instances
random.shuffle(lines)
# write back the result
out_features = open(out_features_filename, 'w')
out_labels = open(out_labels_filename, 'w')
# write labels header
header = "%s %s\n" % (dims[0], dims[1])
out_labels.write(header)
# write data
for x, y in lines:
out_features.write(x)
out_labels.write(y)
# close files
out_features.close()
out_labels.close()
return len(lines)
def extract_lines(input_filename, output_filename, lines_range, has_header=False):
if lines_range[0] >= lines_range[1]:
logging.error("Range should be causal.")
exit(-1)
input_file = open(input_filename, 'rU')
output_file = open(output_filename, 'w')
if has_header:
header = input_file.readline().strip().split()
new_header = "%d 2\n" % (lines_range[1]-lines_range[0]+1)
output_file.write(new_header)
for line_num, line in enumerate(input_file):
if lines_range[0] <= line_num <= lines_range[1]:
output_file.write(line)
input_file.close()
output_file.close()
def is_textgrid(filename):
try:
file = open(filename, 'rU')
first_line = file.readline()
except:
return False
if "ooTextFile" in first_line:
return True
return False
def is_valid_wav(filename):
# check the sampling rate and number bits of the WAV
try:
wav_file = wave.Wave_read(filename)
except:
return False
if wav_file.getframerate() != 16000 or wav_file.getsampwidth() != 2 or wav_file.getnchannels() != 1 \
or wav_file.getcomptype() != 'NONE':
return False
return True