import os
from polyglotdb.exceptions import TextGridError
from polyglotdb.io.helper import find_wav_path, get_n_channels
from polyglotdb.io.parsers.base import DiscourseData
from polyglotdb.io.parsers.speaker import DirectorySpeakerParser
from polyglotdb.io.parsers.textgrid import TextgridParser
from polyglotdb.io.types.parsing import OrthographyTier
class AlignerParser(TextgridParser):
    """
    Base class for parsing TextGrid output from forced aligners.

    Two TextGrid layouts are handled:

    * single speaker: tier names start with ``word_label`` / ``phone_label``;
    * multiple speakers: tier names have two parts separated by ``" - "``,
      one part being the speaker and the other the tier type (the order is
      controlled by ``speaker_first``).

    Parameters
    ----------
    annotation_tiers : list
        List of the annotation tiers to store data from the TextGrid
    hierarchy : Hierarchy
        Basic hierarchy of the TextGrid
    make_transcription : bool
        Flag for whether to add a transcription property to words based on phones they contain
    stop_check : callable
        Function to check for whether parsing should stop
    call_back : callable
        Function to report progress in parsing

    Attributes
    ----------
    word_label : str
        Label identifying word tiers
    phone_label : str
        Label identifying phone tiers
    name : str
        Name of the aligner the TextGrids are from
    speaker_first : bool
        Whether speaker names precede tier types in the TextGrid when multiple speakers are present
    """

    word_label = "word"
    phone_label = "phone"
    name = "aligner"
    speaker_first = True

    def __init__(
        self,
        annotation_tiers,
        hierarchy,
        make_transcription=True,
        stop_check=None,
        call_back=None,
    ):
        # The fourth positional argument of TextgridParser.__init__ is always
        # False for aligner output.
        # NOTE(review): presumably this disables label generation - confirm
        # against TextgridParser.
        super(AlignerParser, self).__init__(
            annotation_tiers,
            hierarchy,
            make_transcription,
            False,
            stop_check,
            call_back,
        )
        # Used by parse_discourse to derive the speaker from the file path
        # when the TextGrid holds a single speaker.
        self.speaker_parser = DirectorySpeakerParser()

    def _split_tier_name(self, tier_name):
        """
        Split a multi-speaker tier name into its speaker and tier type.

        Parameters
        ----------
        tier_name : str
            Tier name of the form ``"<part> - <part>"``

        Returns
        -------
        tuple
            ``(speaker, tier_type)``; path separators in the speaker name are
            replaced with underscores and both parts are stripped of
            surrounding whitespace

        Raises
        ------
        ValueError
            If the name does not consist of exactly two ``" - "``-separated parts
        """
        # Tuple unpacking raises ValueError unless there are exactly two parts.
        first, second = tier_name.split(" - ")
        if self.speaker_first:
            speaker, tier_type = first, second
        else:
            tier_type, speaker = first, second
        speaker = speaker.strip().replace("/", "_").replace("\\", "_")
        return speaker, tier_type.strip()

    @staticmethod
    def _tier_intervals(tier):
        """Yield ``(label, begin, end)`` for each ``(begin, end, label)`` entry of a tier."""
        return ((text.strip(), begin, end) for (begin, end, text) in tier.entries)

    def _is_valid(self, tg):
        """
        Check whether a TextGrid has the tiers this parser needs.

        Parameters
        ----------
        tg : TextGrid
            Loaded TextGrid exposing ``tierNames``

        Returns
        -------
        tuple
            ``(multiple_speakers, is_valid)``. ``multiple_speakers`` is True
            when any tier name contains ``" - "``. ``is_valid`` is True when
            word and phone tiers were found; for multiple speakers every
            speaker needs both.

        Raises
        ------
        ValueError
            If a tier name contains ``" - "`` but does not split into exactly
            two parts
        """
        multiple_speakers = any(" - " in tier_name for tier_name in tg.tierNames)

        if not multiple_speakers:
            found_word = False
            found_phone = False
            for tier_name in tg.tierNames:
                lowered = tier_name.lower()
                if lowered.startswith(self.word_label):
                    found_word = True
                elif lowered.startswith(self.phone_label):
                    found_phone = True
            return multiple_speakers, found_word and found_phone

        # Per-speaker flags; tiers without the separator are ignored.
        found_words = {}
        found_phones = {}
        for tier_name in tg.tierNames:
            if " - " not in tier_name:
                continue
            speaker, tier_type = self._split_tier_name(tier_name)
            found_words.setdefault(speaker, False)
            found_phones.setdefault(speaker, False)
            lowered = tier_type.lower()
            if lowered.startswith(self.word_label):
                found_words[speaker] = True
            elif lowered.startswith(self.phone_label):
                found_phones[speaker] = True

        found_word = all(found_words.values())
        # Phone tiers must be checked against the phone flags, not the word flags.
        found_phone = all(found_phones.values())
        return multiple_speakers, found_word and found_phone

    def _map_speakers_to_channels(self, tg, n_channels):
        """
        Assign each speaker of a multi-speaker TextGrid to an audio channel.

        Speakers are numbered in order of first appearance in ``tg.tierNames``;
        the n-th speaker (0-based) gets channel ``n``, and once the channels
        run out every remaining speaker shares the last channel.

        Parameters
        ----------
        tg : TextGrid
            Loaded TextGrid exposing ``tierNames``
        n_channels : int
            Number of channels in the sound file

        Returns
        -------
        dict
            Mapping of speaker name to 0-based channel index
        """
        mapping = {}
        last_channel = n_channels - 1
        for tier_name in tg.tierNames:
            try:
                speaker, _ = self._split_tier_name(tier_name)
            except ValueError:
                # Not a "<speaker> - <type>" tier
                continue
            if speaker not in mapping:
                mapping[speaker] = min(len(mapping), last_channel)
        return mapping

    def _parse_single_speaker(self, tg, name, path, types_only):
        """
        Parse a TextGrid whose tiers all belong to one speaker.

        Word tiers are added to ``self.annotation_tiers[0]`` and phone tiers
        to ``self.annotation_tiers[1]``; the annotation tiers are reset before
        and after parsing.
        """
        if self.speaker_parser is not None:
            speaker = self.speaker_parser.parse_path(path)
        else:
            speaker = None

        for annotation_tier in self.annotation_tiers:
            annotation_tier.reset()
            annotation_tier.speaker = speaker

        for tier_name in tg.tierNames:
            lowered = tier_name.lower()
            if lowered.startswith(self.word_label):
                target = self.annotation_tiers[0]
            elif lowered.startswith(self.phone_label):
                target = self.annotation_tiers[1]
            else:
                continue
            target.add(self._tier_intervals(tg.getTier(tier_name)))

        pg_annotations = self._parse_annotations(types_only)
        data = DiscourseData(name, pg_annotations, self.hierarchy)

        for annotation_tier in self.annotation_tiers:
            annotation_tier.reset()
        return data

    def _parse_multiple_speakers(self, tg, name, wav_path, types_only):
        """
        Parse a TextGrid with ``"<speaker> - <type>"`` style tier names.

        A fresh ``OrthographyTier`` is built for every non-empty speaker tier
        and temporarily installed as ``self.annotation_tiers``; the original
        tiers are restored afterwards, even if parsing fails.
        """
        speaker_channel_mapping = {}
        if wav_path is not None:
            n_channels = get_n_channels(wav_path)
            if n_channels > 1:
                speaker_channel_mapping = self._map_speakers_to_channels(tg, n_channels)

        original_tiers = self.annotation_tiers
        self.annotation_tiers = []
        try:
            for tier_name in tg.tierNames:
                try:
                    speaker, tier_type = self._split_tier_name(tier_name)
                except ValueError:
                    # Not a "<speaker> - <type>" tier
                    continue

                # Normalize aligner-specific tier names (e.g. plural forms)
                lowered = tier_type.lower()
                if lowered.startswith(self.word_label):
                    tier_type = "word"
                elif lowered.startswith(self.phone_label):
                    tier_type = "phone"

                tier = tg.getTier(tier_name)
                # Skip tiers consisting of a single unlabelled interval
                if len(tier.entries) == 1 and tier.entries[0][2].strip() == "":
                    continue

                annotation_tier = OrthographyTier(tier_type, tier_type)
                annotation_tier.speaker = speaker
                annotation_tier.add(self._tier_intervals(tier))
                self.annotation_tiers.append(annotation_tier)

            pg_annotations = self._parse_annotations(types_only)
            data = DiscourseData(name, pg_annotations, self.hierarchy)
            data.speaker_channel_mapping = speaker_channel_mapping
        finally:
            self.annotation_tiers = original_tiers
        return data

    def parse_discourse(self, path, types_only=False):
        """
        Parse a forced aligned TextGrid file for later importing.

        Parameters
        ----------
        path : str
            Path to TextGrid file
        types_only : bool
            Flag for whether to only save type information, ignoring the token information

        Returns
        -------
        :class:`~polyglotdb.io.parsers.base.DiscourseData`
            Parsed data from the file

        Raises
        ------
        TextGridError
            If the TextGrid lacks the word and phone tiers this parser requires
        ValueError
            If a tier name contains ``" - "`` but does not split into exactly
            two parts
        """
        tg = self.load_textgrid(path)
        multiple_speakers, is_valid = self._is_valid(tg)
        if not is_valid:
            raise TextGridError(
                "This file ({}) cannot be parsed by the {} parser.".format(path, self.name)
            )

        # Discourse name is the file name without directory or extension
        name = os.path.splitext(os.path.split(path)[1])[0]
        wav_path = find_wav_path(path)

        if multiple_speakers:
            data = self._parse_multiple_speakers(tg, name, wav_path, types_only)
        else:
            data = self._parse_single_speaker(tg, name, path, types_only)

        data.wav_path = wav_path
        return data