Source code for polyglotdb.acoustics.formants.refined

import math
import os
import numpy as np

from conch import analyze_segments

from ..segments import generate_vowel_segments
from .helper import generate_variable_formants_point_function, get_mahalanobis, get_mean_SD, \
    save_formant_point_data, extract_and_save_formant_tracks


def read_prototypes(vowel_prototypes_path):
    """Reads pre-measured means and covariance matrices from a file.
    """
    means_covar_d = {}

    with open(vowel_prototypes_path) as means_covar_file:
        means_covar_lines = means_covar_file.readlines()
        means_covar_header = means_covar_lines.pop(0)
        prototype_parameters = means_covar_header.strip().split(',')
        prototype_parameters = [p.split('_')[0] for p in prototype_parameters if p not in ('type', 'phone')]
        print('READING PROTOTYPES FROM {} with parameters {}'.format(vowel_prototypes_path,
                                                                     ', '.join(prototype_parameters)))
        for line in means_covar_lines:
            splitline = line.strip().split(',')
            means_covar_info_type = splitline[0]
            means_covar_phone = splitline[1]
            means_covar_values = [float(v) for v in splitline[2:]]

            if means_covar_phone not in means_covar_d:
                means_covar_d[means_covar_phone] = [[], []]

            if means_covar_info_type == 'means':
                means_covar_d[means_covar_phone][0] = means_covar_values
            elif means_covar_info_type == 'matrix':
                means_covar_d[means_covar_phone][1].append(means_covar_values)

    return means_covar_d, prototype_parameters
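
# A minimal sketch of the CSV layout that read_prototypes() expects (hypothetical
# numbers; header columns other than 'type' and 'phone' are truncated at the first
# underscore, so 'F1_mean' and 'F1' both yield the parameter name 'F1'):
#
#     type,phone,F1_mean,F2_mean,F3_mean
#     means,AA,642.3,1184.7,2512.0
#     matrix,AA,1500.2,300.1,12.9
#     matrix,AA,300.1,2000.5,80.3
#     matrix,AA,12.9,80.3,3000.8
#
# For that file, read_prototypes(path) returns:
#     ({'AA': [[642.3, 1184.7, 2512.0],      # means vector
#              [[1500.2, 300.1, 12.9],       # covariance matrix, one 'matrix' row each
#               [300.1, 2000.5, 80.3],
#               [12.9, 80.3, 3000.8]]]},
#      ['F1', 'F2', 'F3'])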


def analyze_formant_points_refinement(corpus_context, vowel_label='vowel', duration_threshold=0, num_iterations=1,
                                      call_back=None, stop_check=None, vowel_prototypes_path='', drop_formant=False,
                                      multiprocessing=True, output_tracks=False):
    """Extracts F1, F2, F3 and B1, B2, B3.

    Parameters
    ----------
    corpus_context : :class:`~polyglotdb.corpus.context.CorpusContext`
        The CorpusContext object of the corpus.
    vowel_label : str
        The subset of phones to analyze.
    duration_threshold : float, optional
        Segments with length shorter than this value (in milliseconds) will not be analyzed.
    num_iterations : int, optional
        How many times the algorithm should iterate before returning values.
    call_back : callable, optional
        Not used by this function.
    stop_check : callable, optional
        Function to check whether analysis should stop early.
    vowel_prototypes_path : str, optional
        Path to a CSV file of prototype means and covariance matrices. If empty or
        nonexistent, prototypes are instead estimated from the data.
    drop_formant : bool, optional
        Whether to also generate leave-one-out candidates that drop a likely spurious formant.
    multiprocessing : bool, optional
        Whether to analyze segments across multiple processes.
    output_tracks : bool, optional
        Whether to save a single point measurement at 0.33 of the vowel (if False) or a track
        over the entire vowel duration (if True).

    Returns
    -------
    prototype_metadata : dict
        Means of F1, F2, F3, B1, B2, B3 and covariance matrices per vowel class.
    """
    if not corpus_context.hierarchy.has_type_subset('phone', vowel_label) \
            and not corpus_context.hierarchy.has_token_subset('phone', vowel_label):
        raise Exception('Phones do not have a "{}" subset.'.format(vowel_label))

    # ------------- Step 2: Varying formants -------------
    # Encodes vowel inventory into a phone class if it's specified
    use_vowel_prototypes = vowel_prototypes_path and os.path.exists(vowel_prototypes_path)

    base_formant_columns = ['F1', 'F2', 'F3', 'B1', 'B2', 'B3']
    if use_vowel_prototypes:
        vowel_prototype_metadata, prototype_parameters = read_prototypes(vowel_prototypes_path)
    else:
        prototype_parameters = base_formant_columns

    # Gets segment mapping of phones that are vowels
    segment_mapping = generate_vowel_segments(corpus_context, duration_threshold=duration_threshold, padding=0.1,
                                              vowel_label=vowel_label)

    best_data = {}

    # There used to be a single list of columns serving as both output columns and prototype
    # columns. These are no longer the same thing: prototype_parameters lists the columns in
    # the prototypes, while output_columns also includes amplitude measures that appear only
    # in the output.
    output_columns = ['F1', 'F2', 'F3', 'B1', 'B2', 'B3', 'A1', 'A2', 'A3', 'Ax', 'A1A2diff', 'A2A3diff']

    log_output = []

    # Measure with varying numbers of formants. The settings are off by one, due to how Praat
    # counts from F0: min_formants = 4 really measures three formants (F1, F2, F3), and so on.
    min_formants = 4
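    # Shape of the per-token candidate data gathered below (an illustrative sketch,
    # following from the helpers above): analyze_segments() returns, for each segment s,
    # one measurement dict per candidate formant setting, e.g.
    #     output[s] = {4: {'F1': ..., 'B1': ..., 'A1': ..., ...},
    #                  5: {...}, 6: {...}, 7: {...}}
    # where candidate n was measured asking Praat for n formants, so it effectively
    # carries F1..F(n-1) (see the off-by-one note above).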
    if drop_formant:
        max_formants = 8
    else:
        max_formants = 7
    default_formant = 5
    formant_function = generate_variable_formants_point_function(corpus_context, min_formants, max_formants)

    best_prototype_metadata = {}

    # For each vowel token, collect the candidate formant measurements,
    # then pick the best track, i.e. the one closest to the prototype averages
    total_speaker_vowel_pairs = len(segment_mapping.grouped_mapping('speaker', 'label').items())
    for i, ((speaker, vowel), seg) in enumerate(segment_mapping.grouped_mapping('speaker', 'label').items()):
        if len(seg) == 0:
            continue
        print('{} {}: {} of {}: {} tokens'.format(speaker, vowel, i + 1, total_speaker_vowel_pairs, len(seg)))

        # Analyze the phone
        output = analyze_segments(seg, formant_function, stop_check=stop_check, multiprocessing=multiprocessing)

        if len(seg) < 6:
            print('Not enough observations of vowel {}, at least 6 are needed, only found {}.'.format(vowel, len(seg)))
            for s, data in output.items():
                best_track = data[default_formant]
                best_data[s] = {k: best_track[k] for k in base_formant_columns}
            continue

        if drop_formant:
            # Add all the leave-one-out candidates
            for s, data in output.items():
                new_data = {}
                ignored_candidates = []
                for candidate, measurements in data.items():
                    # Fit amplitude as a linear function of log2(frequency), falling back to
                    # fewer formants (or a flat line) when higher formants are unavailable
                    try:
                        As = [measurements['A1'], measurements['A2'], measurements['A3'], measurements['A4']]
                        Fs = [math.log2(measurements['F1']), math.log2(measurements['F2']),
                              math.log2(measurements['F3']), math.log2(measurements['F4'])]
                        Farray = np.array([Fs, np.ones(len(Fs))])
                        [slope, intercept] = np.linalg.lstsq(Farray.T, As, rcond=None)[0]
                    except Exception:
                        try:
                            As = [measurements['A1'], measurements['A2'], measurements['A3']]
                            Fs = [math.log2(measurements['F1']), math.log2(measurements['F2']),
                                  math.log2(measurements['F3'])]
                            Farray = np.array([Fs, np.ones(len(Fs))])
                            [slope, intercept] = np.linalg.lstsq(Farray.T, As, rcond=None)[0]
                        except Exception:
                            try:
                                As = [measurements['A1'], measurements['A2']]
                                Fs = [math.log2(measurements['F1']), math.log2(measurements['F2'])]
                                [slope, intercept] = [0, 0]
                            except Exception:
                                # Lack of formants for these settings
                                ignored_candidates.append(candidate)
                                continue

                    for leave_out in range(1, 1 + min(3, candidate)):
                        new_measurements = {'Ax': measurements['A' + str(leave_out)]}
                        candidate_name = str(candidate) + 'x' + str(leave_out)
                        # A formant is droppable if its amplitude falls below the fitted line
                        this_is_droppable = (leave_out < len(As)
                                             and As[leave_out - 1] < intercept + slope * Fs[leave_out - 1])
                        if this_is_droppable:
                            for parameter in measurements.keys():
                                if int(parameter[-1]) < leave_out:
                                    new_measurements[parameter] = measurements[parameter]
                                elif int(parameter[-1]) > leave_out:
                                    new_measurements[parameter[0] + str(int(parameter[-1]) - 1)] = measurements[parameter]
                            new_data[candidate_name] = new_measurements

                    data[candidate]['Ax'] = data[candidate]['A4']

                data = {k: v for k, v in data.items() if k not in ignored_candidates}
                output[s] = {**data, **new_data}
        else:
            for s, data in output.items():
                for candidate, measurements in data.items():
                    output[s][candidate]['Ax'] = output[s][candidate]['A4']
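        # Candidate naming after the leave-one-out step (illustration): a key like '6x1'
        # means "measured with a 6-formant setting, then F1 was judged spuriously weak
        # (its amplitude fell below the fitted line) and dropped, renumbering the remaining
        # formants down", so its F1, F2, ... are the original F2, F3, ... and 'Ax' holds
        # the dropped formant's amplitude. Plain integer candidates keep all formants and
        # store A4 in 'Ax'.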
        output = {k: v for k, v in output.items() if v}

        # Amplitude-difference measures, with fallbacks when higher formants are unavailable
        for s, data in output.items():
            for candidate, measurements in data.items():
                try:
                    output[s][candidate]['A1A2diff'] = data[candidate]['A1'] - data[candidate]['A2']
                    try:
                        output[s][candidate]['A2A3diff'] = data[candidate]['A2'] - data[candidate]['A3']
                    except Exception:
                        try:
                            output[s][candidate]['A2A3diff'] = data[candidate]['A2']
                        except Exception:
                            output[s][candidate]['A2A3diff'] = 0
                except Exception:
                    try:
                        output[s][candidate]['A1A2diff'] = data[candidate]['A1']
                    except Exception:
                        output[s][candidate]['A1A2diff'] = 0
                        output[s][candidate]['A2A3diff'] = 0

        selected_tracks = {}
        for s, data in output.items():
            try:
                selected_tracks[s] = data[default_formant]
            except Exception:
                print(s)
                print(data)
                raise

        if not use_vowel_prototypes:
            print('no prototypes, using get_mean_SD()')
            prev_prototype_metadata = get_mean_SD(selected_tracks, prototype_parameters)
        elif vowel not in vowel_prototype_metadata:
            print('no prototype for', vowel, 'so using get_mean_SD()')
            prev_prototype_metadata = get_mean_SD(selected_tracks, prototype_parameters)
        else:
            prev_prototype_metadata = vowel_prototype_metadata

        if num_iterations > 1 and len(seg) < 6:
            print('Skipping iterations for vowel {}, at least 6 tokens are needed, only found {}.'.format(
                vowel, len(seg)))
            my_iterations = [0]
        else:
            my_iterations = range(num_iterations)

        for iteration in my_iterations:
            best_numbers = []
            selected_tracks = {}
            prototype_means = prev_prototype_metadata[vowel][0]
            # Get the Mahalanobis distance between every new observation and the prototype means
            covariance = np.array(prev_prototype_metadata[vowel][1])
            inverse_covariance = np.linalg.pinv(covariance)
            best_number = 5
            for s, data in output.items():
                best_distance = math.inf
                best_track = 0
                for number, point in data.items():
                    point = [point[x] if point[x] else 0 for x in prototype_parameters]
                    distance = get_mahalanobis(prototype_means, point, inverse_covariance)
                    if distance < best_distance:
                        # Update "best" measures when a new best distance is found
                        best_distance = distance
                        best_track = point
                        best_number = number
                selected_tracks[s] = {k: best_track[i] for i, k in enumerate(prototype_parameters)}
                best_data[s] = {}
                for output_column in output_columns:
                    best_data[s][output_column] = output[s][best_number][output_column]
                best_data[s]['num_formants'] = float(str(best_number).split('x')[0])
                best_data[s]['Fx'] = int(str(best_number)[0])
                if 'x' in str(best_number):
                    best_data[s]['drop_formant'] = int(str(best_number).split('x')[-1])
                else:
                    best_data[s]['drop_formant'] = 0
                best_numbers.append(best_number)

            if len(seg) >= 6:
                prototype_metadata = get_mean_SD(selected_tracks, prototype_parameters)
                prev_prototype_metadata = prototype_metadata
                best_prototype_metadata.update(prototype_metadata)

            # Stop iterating early once no token changes its best candidate
            if iteration > 0:
                changed_numbers = 0
                for i, bn in enumerate(best_numbers):
                    if bn != last_iteration_best_numbers[i]:
                        changed_numbers += 1
                if changed_numbers == 0:
                    break
            last_iteration_best_numbers = best_numbers

        log_output.append([speaker, vowel, str(len(output)), str(iteration + 1)])

    for i in log_output:
        print('Speaker {} for vowel {} had {} tokens and completed refinement in {} iterations'.format(*i))

    if output_tracks:
        extract_and_save_formant_tracks(corpus_context, best_data, num_formants=True,
                                        multiprocessing=multiprocessing, stop_check=stop_check)
    else:
        save_formant_point_data(corpus_context, best_data, num_formants=True)

    return best_prototype_metadata
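
if __name__ == '__main__':
    # A minimal usage sketch, assuming a corpus already imported into PolyglotDB
    # under the (hypothetical) name 'my_corpus', with a 'vowel' phone subset encoded.
    from polyglotdb import CorpusContext

    with CorpusContext('my_corpus') as c:
        # Refine per-speaker, per-vowel formant points over up to three iterations,
        # estimating prototypes from the data (no prototype CSV supplied).
        prototypes = analyze_formant_points_refinement(c, vowel_label='vowel', num_iterations=3)
        print(sorted(prototypes.keys()))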