Source code for polyglotdb.io.parsers.buckeye

import os
import re

from polyglotdb.exceptions import BuckeyeParseError
from .base import BaseParser, DiscourseData
from .speaker import FilenameSpeakerParser
from ..helper import find_wav_path

FILLERS = {'uh', 'um', 'okay', 'yes', 'yeah', 'oh', 'heh', 'yknow', 'um-huh',
           'uh-uh', 'uh-huh', 'uh-hum', 'mm-hmm'}


def contained_by(word, phone):
    """
    Check whether a word contains a phone based on time points

    Parameters
    ----------
    word : dict
        Word information
    phone : dict
        Phone information

    Returns
    -------
    bool
        True if the phone's midpoint falls within the word's bounds, or the
        word's midpoint falls within the phone's bounds
    """
    phone_midpoint = phone[1] + (phone[2] - phone[1]) / 2
    word_midpoint = word['begin'] + (word['end'] - word['begin']) / 2
    if (phone_midpoint > word['begin'] and phone_midpoint < word['end']) or (
            word_midpoint > phone[1] and word_midpoint < phone[2]):
        return True
    return False
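
# A quick illustration with made-up time points, in the shapes the parser
# uses: a word is a dict with 'begin'/'end' keys and a phone is a
# (label, begin, end) tuple. Containment holds when either midpoint falls
# inside the other unit's span.
_word = {'begin': 0.40, 'end': 0.60}
_phone = ('ah', 0.45, 0.90)
# The phone midpoint (0.675) lies outside the word, but the word midpoint
# (0.50) lies inside the phone, so the check still succeeds.
assert contained_by(_word, _phone)
# A phone entirely after the word fails both midpoint checks.
assert not contained_by(_word, ('t', 0.70, 0.90))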


class BuckeyeParser(BaseParser):
    """
    Parser for the Buckeye corpus.

    Has annotation types for word labels, word transcription, word part of
    speech, and surface transcription labels.

    Parameters
    ----------
    annotation_tiers : list
        Annotation types of the files to parse
    hierarchy : :class:`~polyglotdb.structure.Hierarchy`
        Details of how linguistic types relate to one another
    stop_check : callable, optional
        Function to check whether to halt parsing
    call_back : callable, optional
        Function to output progress messages
    """
    _extensions = ['.words']

    def __init__(self, annotation_tiers, hierarchy,
                 stop_check=None, call_back=None):
        super(BuckeyeParser, self).__init__(annotation_tiers, hierarchy,
                                            make_transcription=False, make_label=False,
                                            stop_check=stop_check, call_back=call_back)
        self.speaker_parser = FilenameSpeakerParser(3)

    def parse_discourse(self, word_path, types_only=False):
        """
        Parse a Buckeye file for later importing.

        Parameters
        ----------
        word_path : str
            Path to Buckeye .words file
        types_only : bool
            Flag for whether to only save type information, ignoring the
            token information

        Returns
        -------
        :class:`~polyglotdb.io.discoursedata.DiscourseData`
            Parsed data
        """
        self.make_transcription = False
        phone_ext = ''
        name, ext = os.path.splitext(os.path.split(word_path)[1])
        if ext == '.words':
            phone_ext = '.phones'
        elif ext == '.WORDS':
            phone_ext = '.PHONES'
        phone_path = os.path.splitext(word_path)[0] + phone_ext

        if self.speaker_parser is not None:
            speaker = self.speaker_parser.parse_path(word_path)
        else:
            speaker = None

        for a in self.annotation_tiers:
            a.reset()
            a.speaker = speaker

        try:
            words = read_words(word_path)
        except Exception as e:
            print(e)
            return
        phones = read_phones(phone_path)

        if self.call_back is not None:
            cur = 0
            self.call_back("Parsing %s..." % name)
            self.call_back(0, len(words))
        for i, w in enumerate(words):
            if self.stop_check is not None and self.stop_check():
                return
            if self.call_back is not None:
                cur += 1
                if cur % 20 == 0:
                    self.call_back(cur)
            annotations = {}
            word = w['spelling']
            if word[0] == '{':
                continue
            beg = w['begin']
            end = w['end']
            found = []
            while len(phones):
                if contained_by(w, phones[0]):
                    cur_phone = phones.pop(0)
                    found.append(cur_phone)
                elif phones[0][0][0] == '{' or phones[0][1] < beg:
                    phones.pop(0)
                else:
                    break
            if not found:
                ba = ('?', w['begin'], w['end'])
                found.append(ba)
            else:
                beg = found[0][1]
                if end != found[-1][2]:
                    end = found[-1][2]
                    if i != len(words) - 1:
                        words[i + 1]['begin'] = end
            self.annotation_tiers[0].add([(word, beg, end)])
            if w['transcription'] is None:
                w['transcription'] = '?'
            if w['surface_transcription'] is None:
                w['surface_transcription'] = '?'
            self.annotation_tiers[1].add([(w['transcription'], beg, end)])
            self.annotation_tiers[2].add([(w['surface_transcription'], beg, end)])
            self.annotation_tiers[3].add([(w['category'], beg, end)])
            self.annotation_tiers[4].add(found)

        pg_annotations = self._parse_annotations(types_only)
        data = DiscourseData(name, pg_annotations, self.hierarchy)
        for a in self.annotation_tiers:
            a.reset()
        data.wav_path = find_wav_path(word_path)
        return data
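
# A minimal usage sketch, assuming the inspect_buckeye helper exposed by
# polyglotdb.io, which builds a BuckeyeParser with its annotation tiers and
# hierarchy pre-configured. The function name and path argument here are
# hypothetical, for illustration only.
def _example_parse_file(word_path):
    from polyglotdb import io as pgio

    parser = pgio.inspect_buckeye(word_path)
    # parse_discourse returns DiscourseData, or None if the .words file
    # could not be read.
    data = parser.parse_discourse(word_path)
    if data is not None:
        print(data.name, data.wav_path)
    return data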

def read_phones(path):
    """
    From a Buckeye .phones file, read the phone lines, appending label,
    begin, and end to the output

    Parameters
    ----------
    path : str
        Path to the file

    Returns
    -------
    output : list of tuples
        Each tuple is (label, begin, end) for a phone
    """
    output = []
    with open(path, 'r') as file_handle:
        # Raw strings avoid invalid-escape warnings in these regexes
        header_pattern = re.compile(r"#\r{0,1}\n")
        line_pattern = re.compile(r"\s+\d{3}\s+")
        label_pattern = re.compile(r" {0,1};| {0,1}\+")
        f = header_pattern.split(file_handle.read())[1]
        flist = f.splitlines()
        begin = 0.0
        for l in flist:
            line = line_pattern.split(l.strip())
            try:
                end = float(line[0])
            except ValueError:  # Missing phone label
                print('Warning: no label found in line: \'{}\''.format(l))
                continue
            label = label_pattern.split(line[1])[0]
            output.append((label, begin, end))
            begin = end
    return output


def read_words(path):
    """
    From a Buckeye .words file, read the word information

    Parameters
    ----------
    path : str
        Path to the file

    Returns
    -------
    output : list of dicts
        Each dict has spelling, begin, end, transcription,
        surface_transcription, and category
    """
    output = []
    misparsed_lines = []
    with open(path, 'r') as file_handle:
        f = re.split(r"#\r{0,1}\n", file_handle.read())[1]
        line_pattern = re.compile(r"; | \d{3} ")
        begin = 0.0
        flist = f.splitlines()
        for l in flist:
            line = line_pattern.split(l.strip())
            try:
                end = float(line[0])
                word = line[1].replace(' ', '_')
                if word[0] != "<" and word[0] != "{":
                    citation = line[2]
                    phonetic = line[3]
                    if len(line) > 4:
                        category = line[4]
                        if word in FILLERS:
                            category = 'UH'
                    else:
                        category = None
                else:
                    citation = None
                    phonetic = None
                    category = None
            except IndexError:
                misparsed_lines.append(l)
                continue
            line = {'spelling': word, 'begin': begin, 'end': end,
                    'transcription': citation,
                    'surface_transcription': phonetic,
                    'category': category}
            output.append(line)
            begin = end
    if misparsed_lines:
        raise BuckeyeParseError(path, misparsed_lines)
    return output
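

# A self-contained sketch of the two readers on tiny, made-up inputs in the
# Buckeye layout: a free-form header terminated by a line holding '#', then
# one '<end time>  121 <content>' record per line. All values are invented
# for illustration.
if __name__ == '__main__':
    import tempfile

    words_text = ("header stuff\n"
                  "#\n"
                  "    0.25  121 <SIL>\n"
                  "    0.90  121 dog; d ao g; d ao g; NN\n")
    phones_text = ("header stuff\n"
                   "#\n"
                   "    0.25  121 SIL\n"
                   "    0.50  121 d\n"
                   "    0.75  121 ao\n"
                   "    0.90  121 g\n")

    with tempfile.NamedTemporaryFile('w', suffix='.words', delete=False) as f:
        f.write(words_text)
        words_path = f.name
    with tempfile.NamedTemporaryFile('w', suffix='.phones', delete=False) as f:
        f.write(phones_text)
        phones_path = f.name

    print(read_words(words_path))
    # [{'spelling': '<SIL>', 'begin': 0.0, 'end': 0.25, 'transcription': None,
    #   'surface_transcription': None, 'category': None},
    #  {'spelling': 'dog', 'begin': 0.25, 'end': 0.9, 'transcription': 'd ao g',
    #   'surface_transcription': 'd ao g', 'category': 'NN'}]
    print(read_phones(phones_path))
    # [('SIL', 0.0, 0.25), ('d', 0.25, 0.5), ('ao', 0.5, 0.75), ('g', 0.75, 0.9)]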