Source code for polyglotdb.io.parsers.buckeye

import os
import re

from polyglotdb.exceptions import BuckeyeParseError
from polyglotdb.io.helper import find_wav_path
from polyglotdb.io.parsers.base import BaseParser, DiscourseData
from polyglotdb.io.parsers.speaker import FilenameSpeakerParser

# Word spellings that read_words labels with the category "UH" whenever the
# line carries a category field, regardless of what that field says.
FILLERS = {
    "heh",
    "mm-hmm",
    "oh",
    "okay",
    "uh",
    "uh-hum",
    "uh-huh",
    "uh-uh",
    "um",
    "um-huh",
    "yeah",
    "yes",
    "yknow",
}


def contained_by(word, phone):
    """
    Decide whether a phone belongs to a word by comparing midpoints

    Parameters
    ----------
    word : dict
        Word information; only the "begin" and "end" entries are read
    phone : sequence
        Phone information; index 1 is read as the begin time and index 2 as
        the end time

    Returns
    -------
    bool
        True if the phone's midpoint lies strictly inside the word, or the
        word's midpoint lies strictly inside the phone
    """
    phone_begin, phone_end = phone[1], phone[2]
    word_begin, word_end = word["begin"], word["end"]

    phone_mid = phone_begin + (phone_end - phone_begin) / 2
    word_mid = word_begin + (word_end - word_begin) / 2

    # Boundaries are exclusive on both sides for both checks.
    phone_inside_word = word_begin < phone_mid < word_end
    word_inside_phone = phone_begin < word_mid < phone_end
    return phone_inside_word or word_inside_phone


class BuckeyeParser(BaseParser):
    """
    Parser for the Buckeye corpus.

    Has annotation types for word labels, word transcription, word part of
    speech, and surface transcription labels.

    Parameters
    ----------
    annotation_tiers: list
        Annotation types of the files to parse
    hierarchy : :class:`~polyglotdb.structure.Hierarchy`
        Details of how linguistic types relate to one another
    stop_check : callable, optional
        Function to check whether to halt parsing
    call_back : callable, optional
        Function to output progress messages
    """

    _extensions = [".words"]

    def __init__(self, annotation_tiers, hierarchy, stop_check=None, call_back=None):
        super().__init__(
            annotation_tiers,
            hierarchy,
            make_transcription=False,
            make_label=False,
            stop_check=stop_check,
            call_back=call_back,
        )
        self.speaker_parser = FilenameSpeakerParser(3)

    def parse_discourse(self, word_path, types_only=False):
        """
        Parse a Buckeye file for later importing.

        Parameters
        ----------
        word_path : str
            Path to Buckeye .words file
        types_only : bool
            Flag for whether to only save type information, ignoring the
            token information

        Returns
        -------
        :class:`~polyglotdb.io.parsers.base.DiscourseData` or None
            Parsed data; None if the words file could not be read or if
            ``stop_check`` requested a halt
        """
        self.make_transcription = False

        # The phones file sits next to the words file and mirrors the case
        # of its extension.
        name, ext = os.path.splitext(os.path.split(word_path)[1])
        phone_ext = ""
        if ext == ".words":
            phone_ext = ".phones"
        elif ext == ".WORDS":
            phone_ext = ".PHONES"
        phone_path = os.path.splitext(word_path)[0] + phone_ext

        if self.speaker_parser is not None:
            speaker = self.speaker_parser.parse_path(word_path)
        else:
            speaker = None

        for a in self.annotation_tiers:
            a.reset()
            a.speaker = speaker

        # Best effort: a words file that cannot be read is reported and
        # skipped rather than aborting the caller.
        try:
            words = read_words(word_path)
        except Exception as e:
            print(e)
            return
        phones = read_phones(phone_path)

        if self.call_back is not None:
            cur = 0
            self.call_back("Parsing %s..." % name)
            self.call_back(0, len(words))

        # Phones are consumed strictly left to right, so a cursor replaces
        # repeated list.pop(0) calls (each of which shifts the whole list).
        next_phone = 0
        num_phones = len(phones)
        last_index = len(words) - 1

        for i, w in enumerate(words):
            if self.stop_check is not None and self.stop_check():
                return
            if self.call_back is not None:
                cur += 1
                if cur % 20 == 0:
                    self.call_back(cur)

            word = w["spelling"]
            if word.startswith("{"):
                continue
            beg = w["begin"]
            end = w["end"]

            found = []
            while next_phone < num_phones:
                phone = phones[next_phone]
                if contained_by(w, phone):
                    found.append(phone)
                elif phone[0].startswith("{") or phone[1] < beg:
                    # Brace-marked phones and phones starting before this
                    # word are dropped. startswith() also tolerates an empty
                    # label, which indexing label[0] would not.
                    pass
                else:
                    break
                next_phone += 1

            if not found:
                # No phone matched: use a single "?" phone spanning the word.
                found.append(("?", w["begin"], w["end"]))
            else:
                # Snap the word boundaries to its phones and carry the new
                # end over as the next word's begin.
                beg = found[0][1]
                if end != found[-1][2]:
                    end = found[-1][2]
                    if i != last_index:
                        words[i + 1]["begin"] = end

            if w["transcription"] is None:
                w["transcription"] = "?"
            if w["surface_transcription"] is None:
                w["surface_transcription"] = "?"

            # Tier order: word, transcription, surface transcription,
            # category, phones.
            self.annotation_tiers[0].add([(word, beg, end)])
            self.annotation_tiers[1].add([(w["transcription"], beg, end)])
            self.annotation_tiers[2].add([(w["surface_transcription"], beg, end)])
            self.annotation_tiers[3].add([(w["category"], beg, end)])
            self.annotation_tiers[4].add(found)

        pg_annotations = self._parse_annotations(types_only)
        data = DiscourseData(name, pg_annotations, self.hierarchy)
        for a in self.annotation_tiers:
            a.reset()

        data.wav_path = find_wav_path(word_path)
        return data
def read_phones(path):
    """
    From a buckeye file, reads the phone lines, appends label, begin, and end to output

    Lines without a usable timestamp or label are reported with a printed
    warning and skipped; the next phone then begins where the last kept
    phone ended.

    Parameters
    ----------
    path : str
        path to file

    Returns
    -------
    output : list of tuples
        each tuple is label, begin, end for a phone

    Raises
    ------
    IndexError
        If the file contains no ``#`` header terminator line
    """
    header_pattern = re.compile(r"#\r{0,1}\n")
    line_pattern = re.compile(r"\s+\d{3}\s+")
    label_pattern = re.compile(r" {0,1};| {0,1}\+")

    with open(path, "r") as file_handle:
        contents = file_handle.read()

    # Phone lines are the chunk that follows the first "#" separator line.
    body = header_pattern.split(contents)[1]

    output = []
    begin = 0.0
    for raw_line in body.splitlines():
        fields = line_pattern.split(raw_line.strip())
        try:
            end = float(fields[0])
            # Keep only the text before the first ";" or "+" in the label.
            label = label_pattern.split(fields[1])[0]
        except (ValueError, IndexError):
            # Missing phone label
            print(f"Warning: no label found in line: '{fields}'")
            continue
        output.append((label, begin, end))
        begin = end
    return output


def read_words(path):
    """
    From a buckeye file, reads the word info

    Blank lines are ignored. Each word begins where the previous kept word
    ended (the first begins at 0.0).

    Parameters
    ----------
    path : str
        path to file

    Returns
    -------
    output : list of dicts
        each dict has spelling, begin, end, transcription, surface_transcription, category

    Raises
    ------
    BuckeyeParseError
        If any non-blank line has too few fields or an unparsable timestamp;
        raised once, after the whole file has been scanned
    IndexError
        If the file contains no ``#`` header terminator line
    """
    line_pattern = re.compile(r"; | \d{3} ")

    with open(path, "r") as file_handle:
        contents = file_handle.read()

    # Word lines are the chunk that follows the first "#" separator line.
    body = re.split(r"#\r{0,1}\n", contents)[1]

    output = []
    misparsed_lines = []
    begin = 0.0
    for raw_line in body.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            # A blank line carries no word; skip it instead of failing in float().
            continue
        fields = line_pattern.split(stripped)
        try:
            end = float(fields[0])
            word = fields[1].replace(" ", "_")
            if word[0] != "<" and word[0] != "{":
                citation = fields[2]
                phonetic = fields[3]
                if len(fields) > 4:
                    # Known fillers override whatever category the file gives.
                    category = "UH" if word in FILLERS else fields[4]
                else:
                    category = None
            else:
                # Words starting with "<" or "{" get no transcriptions or category.
                citation = None
                phonetic = None
                category = None
        except (IndexError, ValueError):
            misparsed_lines.append(fields)
            continue
        output.append(
            {
                "spelling": word,
                "begin": begin,
                "end": end,
                "transcription": citation,
                "surface_transcription": phonetic,
                "category": category,
            }
        )
        begin = end

    if misparsed_lines:
        raise BuckeyeParseError(path, misparsed_lines)
    return output