import os
from .textgrid import TextgridParser
from ..types.parsing import OrthographyTier
from polyglotdb.exceptions import TextGridError
from ..helper import get_n_channels
from polyglotdb.io.helper import find_wav_path
from polyglotdb.io.parsers.base import DiscourseData
from polyglotdb.io.parsers.speaker import DirectorySpeakerParser
[docs]
class AlignerParser(TextgridParser):
"""
Base class for parsing TextGrid output from forced aligners.
Parameters
----------
annotation_tiers : list
List of the annotation tiers to store data from the TextGrid
hierarchy : Hierarchy
Basic hierarchy of the TextGrid
make_transcription : bool
Flag for whether to add a transcription property to words based on phones they contain
stop_check : callable
Function to check for whether parsing should stop
call_back : callable
Function to report progress in parsing
Attributes
----------
word_label : str
Label identifying word tiers
phone_label : str
Label identifying phone tiers
name : str
Name of the aligner the TextGrids are from
speaker_first : bool
Whether speaker names precede tier types in the TextGrid when multiple speakers are present
"""
word_label = 'word'
phone_label = 'phone'
name = 'aligner'
speaker_first = True
def __init__(self, annotation_tiers, hierarchy, make_transcription=True,
stop_check=None, call_back=None):
super(AlignerParser, self).__init__(annotation_tiers, hierarchy, make_transcription,
False, stop_check, call_back)
self.speaker_parser = DirectorySpeakerParser()
def _is_valid(self, tg):
found_word = False
found_phone = False
invalid = True
multiple_speakers = False
for i, tier_name in enumerate(tg.tierNameList):
if ' - ' in tier_name:
multiple_speakers = True
break
if multiple_speakers:
if self.speaker_first:
speakers = {tier_name.split(' - ')[0].strip().replace('/', '_').replace('\\', '_') for tier_name in tg.tierNameList if
' - ' in tier_name}
else:
speakers = {tier_name.split(' - ')[1].strip().replace('/', '_').replace('\\', '_') for tier_name in tg.tierNameList if
' - ' in tier_name}
found_words = {x: False for x in speakers}
found_phones = {x: False for x in speakers}
for i, tier_name in enumerate(tg.tierNameList):
if ' - ' not in tier_name:
continue
if self.speaker_first:
speaker, name = tier_name.split(' - ')
else:
name, speaker = tier_name.split(' - ')
speaker = speaker.strip().replace('/', '_').replace('\\', '_')
name = name.strip()
if name.lower().startswith(self.word_label):
found_words[speaker] = True
elif name.lower().startswith(self.phone_label):
found_phones[speaker] = True
found_word = all(found_words.values())
found_phone = all(found_words.values())
else:
for i, tier_name in enumerate(tg.tierNameList):
if tier_name.lower().startswith(self.word_label):
found_word = True
elif tier_name.lower().startswith(self.phone_label):
found_phone = True
return multiple_speakers, found_word and found_phone
[docs]
def parse_discourse(self, path, types_only=False):
"""
Parse a forced aligned TextGrid file for later importing.
Parameters
----------
path : str
Path to TextGrid file
types_only : bool
Flag for whether to only save type information, ignoring the token information
Returns
-------
:class:`~polyglotdb.io.discoursedata.DiscourseData`
Parsed data from the file
"""
tg = self.load_textgrid(path)
multiple_speakers, is_valid = self._is_valid(tg)
if not is_valid:
raise (TextGridError('This file ({}) cannot be parsed by the {} parser.'.format(path, self.name)))
name = os.path.splitext(os.path.split(path)[1])[0]
# Format 1
if not multiple_speakers:
if self.speaker_parser is not None:
speaker = self.speaker_parser.parse_path(path)
else:
speaker = None
for a in self.annotation_tiers:
a.reset()
a.speaker = speaker
# Parse the tiers
for i, tier_name in enumerate(tg.tierNameList):
ti = tg.tierDict[tier_name]
if tier_name.lower().startswith(self.word_label):
self.annotation_tiers[0].add(( (text.strip(), begin, end) for (begin, end, text) in ti.entryList))
elif tier_name.lower().startswith(self.phone_label):
self.annotation_tiers[1].add(( (text.strip(), begin, end) for (begin, end, text) in ti.entryList))
pg_annotations = self._parse_annotations(types_only)
data = DiscourseData(name, pg_annotations, self.hierarchy)
for a in self.annotation_tiers:
a.reset()
# Format 2
else:
dummy = self.annotation_tiers
self.annotation_tiers = []
wav_path = find_wav_path(path)
speaker_channel_mapping = {}
if wav_path is not None:
n_channels = get_n_channels(wav_path)
if n_channels > 1:
# Figure speaker-channel mapping
n_tiers = 0
for i, tier_name in enumerate(tg.tierNameList):
try:
speaker, type = tier_name.split(' - ')
except ValueError:
continue
n_tiers += 1
ind = 0
cutoffs = [x / n_channels for x in range(1, n_channels)]
for i, tier_name in enumerate(tg.tierNameList):
try:
if self.speaker_first:
speaker, type = tier_name.split(' - ')
else:
type, speaker = tier_name.split(' - ')
speaker = speaker.strip().replace('/', '_').replace('\\', '_')
except ValueError:
continue
if speaker in speaker_channel_mapping:
continue
for i, c in enumerate(cutoffs):
if ind / n_channels < c:
speaker_channel_mapping[speaker] = i
break
else:
speaker_channel_mapping[speaker] = i + 1
ind += 1
# Parse the tiers
for i, tier_name in enumerate(tg.tierNameList):
ti = tg.tierDict[tier_name]
try:
if self.speaker_first:
speaker, type = tier_name.split(' - ')
else:
type, speaker = tier_name.split(' - ')
speaker = speaker.strip().replace('/', '_').replace('\\', '_')
except ValueError:
continue
if type.lower().startswith(self.word_label):
type = 'word'
elif type.lower().startswith(self.phone_label):
type = 'phone'
if len(ti.entryList) == 1 and ti.entryList[0][2].strip() == '':
continue
at = OrthographyTier(type, type)
at.speaker = speaker
at.add(( (text.strip(), begin, end) for (begin, end, text) in ti.entryList))
self.annotation_tiers.append(at)
pg_annotations = self._parse_annotations(types_only)
data = DiscourseData(name, pg_annotations, self.hierarchy)
data.speaker_channel_mapping = speaker_channel_mapping
self.annotation_tiers = dummy
data.wav_path = find_wav_path(path)
return data