Source code for polyglotdb.acoustics.formants.refined
import math
import os
import numpy as np
from conch import analyze_segments
from polyglotdb.acoustics.formants.helper import (
extract_and_save_formant_tracks,
generate_variable_formants_point_function,
get_mahalanobis,
get_mean_SD,
save_formant_point_data,
)
from polyglotdb.acoustics.segments import generate_vowel_segments
def read_prototypes(vowel_prototypes_path):
    """Read pre-measured means and covariance matrices from a CSV file.

    The first line of the file is a header. Every header field other than
    ``type`` and ``phone`` names a prototype parameter; only the part of the
    field before the first underscore is kept as the parameter name.

    Every following non-blank line has the form ``<type>,<phone>,<v1>,<v2>,...``
    where ``<type>`` is either ``means`` (a single row holding the mean vector
    for that phone) or ``matrix`` (one row of that phone's matrix; rows are
    appended in file order). Rows with any other ``<type>`` only register the
    phone and are otherwise ignored.

    Parameters
    ----------
    vowel_prototypes_path : str
        Path to the prototypes file.

    Returns
    -------
    means_covar_d : dict
        Maps each phone label to ``[means, matrix_rows]`` where ``means`` is a
        list of floats and ``matrix_rows`` is a list of lists of floats.
    prototype_parameters : list of str
        Parameter names taken from the header, in header order.

    Raises
    ------
    IndexError
        If the file is empty (no header line) or a non-blank data line has
        fewer than two comma-separated fields.
    ValueError
        If a value field cannot be converted to ``float``.
    """
    # Only the read needs the file handle; parsing happens after it is closed.
    with open(vowel_prototypes_path) as means_covar_file:
        means_covar_lines = means_covar_file.readlines()

    means_covar_header = means_covar_lines.pop(0)
    prototype_parameters = [
        p.split("_")[0]
        for p in means_covar_header.strip().split(",")
        if p not in ["type", "phone"]
    ]
    print(
        "READING PROTOTYPES FROM "
        + vowel_prototypes_path
        + " with parameters "
        + ", ".join(prototype_parameters)
    )

    means_covar_d = {}
    for line in means_covar_lines:
        stripped = line.strip()
        if not stripped:
            # Blank lines (typically a trailing newline at the end of the file)
            # carry no data; without this guard they crash on splitline[1].
            continue
        splitline = stripped.split(",")
        means_covar_info_type = splitline[0]
        means_covar_phone = splitline[1]
        means_covar_values = [float(v) for v in splitline[2:]]
        if means_covar_phone not in means_covar_d:
            means_covar_d[means_covar_phone] = [[], []]
        if means_covar_info_type == "means":
            means_covar_d[means_covar_phone][0] = means_covar_values
        elif means_covar_info_type == "matrix":
            means_covar_d[means_covar_phone][1].append(means_covar_values)
    return means_covar_d, prototype_parameters
[docs]
def _fit_amplitude_regression(measurements):
    """Fit formant amplitude against log2 formant frequency for one candidate.

    Tries the first four formants, then the first three. If neither works it
    falls back to the first two formants with a flat line (slope and intercept
    both 0) instead of a fitted one.

    Returns ``(amplitudes, log_frequencies, slope, intercept)`` for the first
    attempt that succeeds, or ``None`` when even the two-formant lookup fails.
    """
    for formant_count in (4, 3, 2):
        indices = range(1, formant_count + 1)
        try:
            amplitudes = [measurements["A" + str(k)] for k in indices]
            log_frequencies = [math.log2(measurements["F" + str(k)]) for k in indices]
            if formant_count == 2:
                slope, intercept = 0, 0
            else:
                design = np.array([log_frequencies, np.ones(len(log_frequencies))])
                slope, intercept = np.linalg.lstsq(design.T, amplitudes)[0]
        except Exception:
            # Deliberately broad: a missing key, a None/non-positive value or a
            # failed fit all mean "retry with fewer formants".
            continue
        return amplitudes, log_frequencies, slope, intercept
    return None


def analyze_formant_points_refinement(
    corpus_context,
    vowel_label="vowel",
    duration_threshold=0,
    num_iterations=1,
    call_back=None,
    stop_check=None,
    vowel_prototypes_path="",
    drop_formant=False,
    multiprocessing=True,
    output_tracks=False,
):
    """Extracts F1, F2, F3 and B1, B2, B3.

    Every vowel token is measured with several formant-count settings. For each
    speaker/vowel group, the candidate measurement with the smallest Mahalanobis
    distance to the group's prototype (means and covariance) is kept. The
    prototype is then re-estimated from the kept measurements and the selection
    is repeated, up to ``num_iterations`` times, stopping early once a pass
    selects the same candidate for every token as the previous pass.

    Parameters
    ----------
    corpus_context : :class:`~polyglot.corpus.context.CorpusContext`
        The CorpusContext object of the corpus.
    vowel_label : str
        The subset of phones to analyze.
    duration_threshold : float, optional
        Segments with length shorter than this value (in milliseconds) will not be analyzed.
    num_iterations : int, optional
        How many times the algorithm should iterate before returning values.
    call_back : callable, optional
        Accepted for interface compatibility; not used by this function.
    stop_check : callable, optional
        Passed through to ``analyze_segments`` and, when ``output_tracks`` is
        true, to ``extract_and_save_formant_tracks``.
    vowel_prototypes_path : str, optional
        Path to a prototypes file readable by ``read_prototypes``. When empty or
        not an existing path, initial prototypes are computed from the
        default-setting measurements with ``get_mean_SD``.
    drop_formant : bool, optional
        When true, the maximum formant setting is raised from 7 to 8 and extra
        leave-one-formant-out candidates (named ``"<setting>x<dropped>"``) are
        generated for every token.
    multiprocessing : bool, optional
        Passed through to ``analyze_segments`` and, when ``output_tracks`` is
        true, to ``extract_and_save_formant_tracks``.
    output_tracks : bool, optional
        Whether to save only the formant values as a point at 0.33 if false or have a track over the entire
        vowel duration if true.

    Returns
    -------
    prototype_metadata : dict
        Means of F1, F2, F3, B1, B2, B3 and covariance matrices per vowel class.

    Raises
    ------
    Exception
        If phones have neither a type subset nor a token subset named
        ``vowel_label``.
    """
    if not corpus_context.hierarchy.has_type_subset(
        "phone", vowel_label
    ) and not corpus_context.hierarchy.has_token_subset("phone", vowel_label):
        raise Exception('Phones do not have a "{}" subset.'.format(vowel_label))

    # Prototypes from file are only used when a path was given and it exists.
    use_vowel_prototypes = vowel_prototypes_path and os.path.exists(vowel_prototypes_path)
    base_formant_columns = ["F1", "F2", "F3", "B1", "B2", "B3"]
    if use_vowel_prototypes:
        vowel_prototype_metadata, prototype_parameters = read_prototypes(vowel_prototypes_path)
    else:
        prototype_parameters = base_formant_columns

    # Gets segment mapping of phones that are vowels
    segment_mapping = generate_vowel_segments(
        corpus_context,
        duration_threshold=duration_threshold,
        padding=0.1,
        vowel_label=vowel_label,
    )
    best_data = {}

    # Output columns and prototype columns are not the same thing:
    # prototype_parameters are the columns compared against the prototypes,
    # output_columns are the columns saved for the selected candidate.
    output_columns = [
        "F1",
        "F2",
        "F3",
        "B1",
        "B2",
        "B3",
        "A1",
        "A2",
        "A3",
        "Ax",
        "A1A2diff",
        "A2A3diff",
    ]
    log_output = []

    # Measure with varying levels of formants
    min_formants = 4  # Off by one error, due to how Praat measures it from F0
    # This really measures with 3 formants: F1, F2, F3. And so on.
    max_formants = 8 if drop_formant else 7
    default_formant = 5
    formant_function = generate_variable_formants_point_function(
        corpus_context, min_formants, max_formants
    )
    best_prototype_metadata = {}

    # For each vowel token, collect the formant measurements
    # Pick the best track that is closest to the averages gotten from prototypes
    grouped_segments = segment_mapping.grouped_mapping("speaker", "label")
    total_speaker_vowel_pairs = len(grouped_segments.items())
    for i, ((speaker, vowel), seg) in enumerate(grouped_segments.items()):
        if len(seg) == 0:
            continue
        print(
            speaker
            + " "
            + vowel
            + ": "
            + str(i + 1)
            + " of "
            + str(total_speaker_vowel_pairs)
            + ": "
            + str(len(seg))
            + " tokens"
        )
        output = analyze_segments(
            seg,
            formant_function,
            stop_check=stop_check,
            multiprocessing=multiprocessing,
        )  # Analyze the phone

        if len(seg) < 6:
            # Too few tokens to refine: keep the default-setting measurement.
            print(
                "Not enough observations of vowel {}, at least 6 are needed, only found {}.".format(
                    vowel, len(seg)
                )
            )
            for s, data in output.items():
                best_track = data[default_formant]
                best_data[s] = {k: best_track[k] for k in base_formant_columns}
            continue

        if drop_formant:
            # ADD ALL THE LEAVE-ONE-OUT CANDIDATES
            for s, data in output.items():
                new_data = {}
                ignored_candidates = []
                for candidate, measurements in data.items():
                    fit = _fit_amplitude_regression(measurements)
                    if fit is None:
                        # Lack of formants for these settings
                        ignored_candidates.append(candidate)
                        continue
                    As, Fs, slope, intercept = fit
                    for leave_out in range(1, 1 + min(3, candidate)):
                        new_measurements = {"Ax": measurements["A" + str(leave_out)]}
                        candidate_name = str(candidate) + "x" + str(leave_out)
                        # A formant may be dropped only when its amplitude lies
                        # below the fitted amplitude/frequency line.
                        this_is_droppable = (
                            leave_out < len(As)
                            and As[leave_out - 1] < intercept + slope * Fs[leave_out - 1]
                        )
                        if this_is_droppable:
                            # Copy everything below the dropped formant as-is and
                            # renumber everything above it down by one.
                            for parameter, value in measurements.items():
                                formant_index = int(parameter[-1])
                                if formant_index < leave_out:
                                    new_measurements[parameter] = value
                                elif formant_index > leave_out:
                                    new_measurements[
                                        parameter[0] + str(formant_index - 1)
                                    ] = value
                            new_data[candidate_name] = new_measurements
                    # Must come after the leave-out loop: an "Ax" key would break
                    # the int(parameter[-1]) parsing above.
                    measurements["Ax"] = measurements["A4"]
                data = {k: v for k, v in data.items() if k not in ignored_candidates}
                output[s] = {**data, **new_data}
        else:
            for s, data in output.items():
                for candidate, measurements in data.items():
                    measurements["Ax"] = measurements["A4"]

        # Drop tokens that ended up with no candidates at all.
        output = {k: v for k, v in output.items() if v}

        # Amplitude differences, falling back to the single available amplitude
        # (or 0) when a value is missing or unusable.
        for s, data in output.items():
            for candidate, measurements in data.items():
                try:
                    measurements["A1A2diff"] = measurements["A1"] - measurements["A2"]
                    try:
                        measurements["A2A3diff"] = measurements["A2"] - measurements["A3"]
                    except Exception:
                        try:
                            measurements["A2A3diff"] = measurements["A2"]
                        except Exception:
                            measurements["A2A3diff"] = 0
                except Exception:
                    try:
                        measurements["A1A2diff"] = measurements["A1"]
                    except Exception:
                        measurements["A1A2diff"] = 0
                    measurements["A2A3diff"] = 0

        # Start from the default-setting measurement of every token.
        selected_tracks = {}
        for s, data in output.items():
            try:
                selected_tracks[s] = data[default_formant]
            except Exception:
                print(s)
                print(data)
                raise

        if not use_vowel_prototypes:
            print("no prototypes, using get_mean_SD()")
            prev_prototype_metadata = get_mean_SD(selected_tracks, prototype_parameters)
        elif vowel not in vowel_prototype_metadata:
            print("no prototype for", vowel, "so using get_mean_SD()")
            prev_prototype_metadata = get_mean_SD(selected_tracks, prototype_parameters)
        else:
            prev_prototype_metadata = vowel_prototype_metadata

        # NOTE(review): groups with fewer than 6 tokens already hit `continue`
        # above, so the len(seg) checks below look redundant; kept as written.
        if num_iterations > 1 and len(seg) < 6:
            print(
                "Skipping iterations for vowel {}, at least 6 tokens are needed, only found {}.".format(
                    vowel, len(seg)
                )
            )
            my_iterations = [0]
        else:
            my_iterations = range(num_iterations)

        last_iteration_best_numbers = None
        for iteration in my_iterations:
            best_numbers = []
            selected_tracks = {}
            prototype_means = prev_prototype_metadata[vowel][0]
            # Get Mahalanobis distance between every new observation and the sample/means
            covariance = np.array(prev_prototype_metadata[vowel][1])
            inverse_covariance = np.linalg.pinv(covariance)
            best_number = 5
            for s, data in output.items():
                best_distance = math.inf
                best_track = 0
                for number, point in data.items():
                    point = [point[x] if point[x] else 0 for x in prototype_parameters]
                    distance = get_mahalanobis(prototype_means, point, inverse_covariance)
                    if distance < best_distance:
                        # Update "best" measures when new best distance is found
                        best_distance = distance
                        best_track = point
                        best_number = number
                selected_tracks[s] = {
                    k: best_track[idx] for idx, k in enumerate(prototype_parameters)
                }
                best_data[s] = {
                    output_column: output[s][best_number][output_column]
                    for output_column in output_columns
                }
                # Candidate names are either a formant setting (e.g. 5) or
                # "<setting>x<dropped formant>" (e.g. "6x2").
                best_name = str(best_number)
                best_data[s]["num_formants"] = float(best_name.split("x")[0])
                best_data[s]["Fx"] = int(best_name[0])
                if "x" in best_name:
                    best_data[s]["drop_formant"] = int(best_name.split("x")[-1])
                else:
                    best_data[s]["drop_formant"] = 0
                best_numbers.append(best_number)

            if len(seg) >= 6:
                prototype_metadata = get_mean_SD(selected_tracks, prototype_parameters)
                prev_prototype_metadata = prototype_metadata
                best_prototype_metadata.update(prototype_metadata)

            # Converged when this pass picked the same candidate for every token
            # as the previous pass. The previous selection must be recorded only
            # AFTER this comparison, otherwise the list is compared with itself
            # and the loop always stops on the second pass.
            if iteration > 0 and best_numbers == last_iteration_best_numbers:
                break
            last_iteration_best_numbers = best_numbers

        log_output.append([speaker, vowel, str(len(output)), str(iteration + 1)])

    for s, v, token_count, iteration_count in log_output:
        print(
            f"Speaker {s} for vowel {v} had {token_count} tokens and completed refinement in {iteration_count} iterations"
        )

    if output_tracks:
        extract_and_save_formant_tracks(
            corpus_context,
            best_data,
            num_formants=True,
            multiprocessing=multiprocessing,
            stop_check=stop_check,
        )
    else:
        save_formant_point_data(corpus_context, best_data, num_formants=True)
    return best_prototype_metadata