import os
import re
from polyglotdb.exceptions import BuckeyeParseError
from polyglotdb.io.helper import find_wav_path
from polyglotdb.io.parsers.base import BaseParser, DiscourseData
from polyglotdb.io.parsers.speaker import FilenameSpeakerParser
# Spellings whose listed category read_words overrides with "UH".
FILLERS = {
    # hesitations and interjections
    "uh",
    "um",
    "oh",
    "heh",
    # short responses and discourse markers
    "okay",
    "yes",
    "yeah",
    "yknow",
    # hyphenated backchannels
    "um-huh",
    "uh-uh",
    "uh-huh",
    "uh-hum",
    "mm-hmm",
}
def contained_by(word, phone):
    """
    Decide from time points whether a phone should be matched to a word

    Parameters
    ----------
    word : dict
        Word information; the ``"begin"`` and ``"end"`` entries are read
    phone : sequence
        Phone information; items 1 and 2 are read as begin and end times

    Returns
    -------
    bool
        True if the phone midpoint lies strictly inside the word, or the
        word midpoint lies strictly inside the phone
    """
    phone_start, phone_stop = phone[1], phone[2]
    word_start, word_stop = word["begin"], word["end"]

    phone_center = phone_start + (phone_stop - phone_start) / 2
    word_center = word_start + (word_stop - word_start) / 2

    # Both checks use strict bounds: a midpoint sitting exactly on an edge
    # does not count as contained.
    phone_in_word = word_start < phone_center < word_stop
    word_in_phone = phone_start < word_center < phone_stop
    return phone_in_word or word_in_phone
class BuckeyeParser(BaseParser):
    """
    Parser for the Buckeye corpus.

    Has annotation types for word labels, word transcription, word part of
    speech, and surface transcription labels.

    Parameters
    ----------
    annotation_tiers : list
        Annotation types of the files to parse
    hierarchy : :class:`~polyglotdb.structure.Hierarchy`
        Details of how linguistic types relate to one another
    stop_check : callable, optional
        Function to check whether to halt parsing
    call_back : callable, optional
        Function to output progress messages
    """

    _extensions = [".words"]

    def __init__(self, annotation_tiers, hierarchy, stop_check=None, call_back=None):
        super().__init__(
            annotation_tiers,
            hierarchy,
            make_transcription=False,
            make_label=False,
            stop_check=stop_check,
            call_back=call_back,
        )
        # NOTE(review): presumably takes the speaker from the first 3
        # characters of the file name -- confirm against FilenameSpeakerParser.
        self.speaker_parser = FilenameSpeakerParser(3)

    def parse_discourse(self, word_path, types_only=False):
        """
        Parse a Buckeye file for later importing.

        The matching phones file is looked up next to the words file, with
        ``.phones`` for a ``.words`` file and ``.PHONES`` for a ``.WORDS``
        file.  Tiers are filled by position: 0 gets the word spelling, 1 the
        transcription, 2 the surface transcription, 3 the category and 4 the
        phones matched to each word.

        Parameters
        ----------
        word_path : str
            Path to Buckeye .words file
        types_only : bool
            Flag for whether to only save type information, ignoring the token information

        Returns
        -------
        :class:`~polyglotdb.io.discoursedata.DiscourseData` or None
            Parsed data.  None is returned if reading the words file raised
            (the exception is printed) or if ``stop_check`` asked to halt.
        """
        self.make_transcription = False

        phone_ext = ""
        name, ext = os.path.splitext(os.path.split(word_path)[1])
        if ext == ".words":
            phone_ext = ".phones"
        elif ext == ".WORDS":
            phone_ext = ".PHONES"
        phone_path = os.path.splitext(word_path)[0] + phone_ext

        if self.speaker_parser is not None:
            speaker = self.speaker_parser.parse_path(word_path)
        else:
            speaker = None

        for a in self.annotation_tiers:
            a.reset()
            a.speaker = speaker

        try:
            words = read_words(word_path)
        except Exception as e:
            # Deliberate best effort: report the problem and skip this file.
            print(e)
            return
        phones = read_phones(phone_path)

        if self.call_back is not None:
            self.call_back("Parsing %s..." % name)
            self.call_back(0, len(words))

        # Phones are consumed strictly front to back, so a cursor into the
        # list replaces repeated ``pop(0)`` calls (each of which shifts the
        # whole remaining list).
        phone_idx = 0
        num_phones = len(phones)
        last_word_idx = len(words) - 1

        for i, w in enumerate(words):
            if self.stop_check is not None and self.stop_check():
                return
            # Progress is reported every 20 words, skipped ones included.
            if self.call_back is not None and (i + 1) % 20 == 0:
                self.call_back(i + 1)

            word = w["spelling"]
            if word[0] == "{":
                continue
            beg = w["begin"]
            end = w["end"]

            found = []
            while phone_idx < num_phones:
                phone = phones[phone_idx]
                if contained_by(w, phone):
                    found.append(phone)
                elif phone[0][0] == "{" or phone[1] < beg:
                    # Drop "{...}" phones and phones starting before this word.
                    pass
                else:
                    # First phone belonging to a later word: leave it in place.
                    break
                phone_idx += 1

            if not found:
                # No phone matched: use a "?" placeholder over the word span.
                found.append(("?", w["begin"], w["end"]))
            else:
                # Snap the word boundaries to the phones actually matched.
                beg = found[0][1]
                if end != found[-1][2]:
                    end = found[-1][2]
                    # Keep the next word contiguous with the adjusted end.
                    if i != last_word_idx:
                        words[i + 1]["begin"] = end

            self.annotation_tiers[0].add([(word, beg, end)])
            if w["transcription"] is None:
                w["transcription"] = "?"
            if w["surface_transcription"] is None:
                w["surface_transcription"] = "?"
            self.annotation_tiers[1].add([(w["transcription"], beg, end)])
            self.annotation_tiers[2].add([(w["surface_transcription"], beg, end)])
            self.annotation_tiers[3].add([(w["category"], beg, end)])
            self.annotation_tiers[4].add(found)

        pg_annotations = self._parse_annotations(types_only)
        data = DiscourseData(name, pg_annotations, self.hierarchy)
        for a in self.annotation_tiers:
            a.reset()
        data.wav_path = find_wav_path(word_path)
        return data
def read_phones(path):
    """
    Read the phone lines of a Buckeye file into (label, begin, end) tuples

    Only the text after the first ``#`` header line is parsed.  The number
    at the start of each line is used as the phone's end time; its begin
    time is the end time of the previously accepted phone (0.0 for the
    first).  Labels are cut at an optional space followed by ``;`` or ``+``.

    Parameters
    ----------
    path : str
        path to file

    Returns
    -------
    output : list of tuples
        each tuple is label, begin, end for a phone
    """
    header_re = re.compile(r"#\r{0,1}\n")
    field_re = re.compile(r"\s+\d{3}\s+")
    label_end_re = re.compile(r" {0,1};| {0,1}\+")

    with open(path, "r") as handle:
        body = header_re.split(handle.read())[1]

    phones = []
    start = 0.0
    for raw_line in body.splitlines():
        fields = field_re.split(raw_line.strip())
        try:
            stop = float(fields[0])
        except ValueError:  # Missing phone label
            # Without a label the line does not split, so the first field is
            # not a bare number; skip it without advancing the begin time.
            print(f"Warning: no label found in line: '{fields}'")
            continue
        label = label_end_re.split(fields[1])[0]
        phones.append((label, start, stop))
        start = stop
    return phones
def read_words(path):
    """
    Read the word lines of a Buckeye file

    Only the text after the first ``#`` header line is parsed.  The number
    at the start of each line is used as the word's end time; its begin
    time is the end time of the previously accepted word (0.0 for the
    first).  Spaces in the spelling are replaced by underscores.  Entries
    whose spelling starts with ``<`` or ``{`` carry no transcriptions or
    category.

    Parameters
    ----------
    path : str
        path to file

    Returns
    -------
    output : list of dicts
        each dict has spelling, begin, end, transcription, surface_transcription, category

    Raises
    ------
    BuckeyeParseError
        If any line is missing expected fields; raised after the whole file
        has been scanned, with all such lines attached
    """
    with open(path, "r") as handle:
        body = re.split(r"#\r{0,1}\n", handle.read())[1]
    field_re = re.compile(r"; | \d{3} ")

    entries = []
    rejected = []
    start = 0.0
    for raw_line in body.splitlines():
        fields = field_re.split(raw_line.strip())
        try:
            stop = float(fields[0])
            spelling = fields[1].replace(" ", "_")
            citation = phonetic = category = None
            if spelling[0] not in ("<", "{"):
                citation = fields[2]
                phonetic = fields[3]
                if len(fields) > 4:
                    # Fillers get a fixed category regardless of the file's.
                    category = "UH" if spelling in FILLERS else fields[4]
        except IndexError:
            rejected.append(fields)
            continue
        entries.append(
            {
                "spelling": spelling,
                "begin": start,
                "end": stop,
                "transcription": citation,
                "surface_transcription": phonetic,
                "category": category,
            }
        )
        start = stop

    if rejected:
        raise BuckeyeParseError(path, rejected)
    return entries