# Source code for polyglotdb.corpus.utterance

from uuid import uuid1

from ..query.annotations import SplitQuery
from ..query.base.func import Max, Min
from ..exceptions import GraphQueryError
from ..io.importer import utterance_data_to_csvs, import_utterance_csv, create_utterance_csvs, \
    utterance_enriched_data_to_csvs, import_utterance_enrichment_csvs
from .pause import PauseContext


class UtteranceContext(PauseContext):
    """
    Class that contains methods for dealing specifically with utterances
    """

    def reset_utterances(self):
        """
        Remove all utterance annotations and update the corpus hierarchy.

        If utterances have not been encoded, this is a no-op.
        """
        try:
            q = SplitQuery(self, self.utterance)
            q.delete()
            self.hierarchy.remove_annotation_type('utterance')
            self.encode_hierarchy()
        except GraphQueryError:
            # Utterances were never encoded; nothing to reset
            pass

    @property
    def has_utterances(self):
        """Return True if utterances have been encoded for this corpus."""
        return 'utterance' in self.hierarchy.annotation_types

    def encode_utterances(self, min_pause_length=0.5, min_utterance_length=0, call_back=None, stop_check=None):
        """
        Encode utterance annotations based on minimum pause length and
        minimum utterance length.  See `get_pauses` for more information
        about the algorithm.

        Once this function is run, utterances will be queryable like other
        annotation types.

        Parameters
        ----------
        min_pause_length : float, defaults to 0.5
            Time in seconds that is the minimum duration of a pause to count
            as an utterance boundary
        min_utterance_length : float, defaults to 0.0
            Time in seconds that is the minimum duration of a stretch of
            speech to count as an utterance
        call_back : callable, optional
            Progress-reporting function
        stop_check : callable, optional
            Function returning True when processing should be aborted
        """
        self.reset_utterances()
        self.hierarchy.add_annotation_type('utterance', above=self.word_name, below=None)
        self.encode_hierarchy()
        discourses = self.discourses
        if call_back is not None:
            call_back(0, len(discourses))
        create_utterance_csvs(self)
        for i, d in enumerate(discourses):
            if stop_check is not None and stop_check():
                return
            if call_back is not None:
                call_back(i)
                call_back('Parsing utterances for discourse {} of {} ({})...'.format(i, len(discourses), d))
            utt_data = self.get_utterance_ids(d, min_pause_length, min_utterance_length)
            for s, utterances in utt_data.items():
                # Chain utterances together via prev_id so ordering survives import
                speaker_data = []
                prev_id = None
                for u in utterances:
                    cur_id = uuid1()
                    row = {'id': cur_id, 'prev_id': prev_id,
                           'begin_word_id': u[0], 'end_word_id': u[1]}
                    speaker_data.append(row)
                    prev_id = cur_id
                utterance_data_to_csvs(self, s, d, speaker_data)
        import_utterance_csv(self, call_back, stop_check)
        for m in self.hierarchy.acoustics:
            self.reassess_utterances(m)
            if m == 'pitch':
                self.hierarchy.add_token_properties(self, 'utterance', [('pitch_last_edited', int)])
        self.encode_hierarchy()
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            # Use len(discourses) rather than i + 1: the loop variable would be
            # unbound (NameError) if there were no discourses to process
            call_back(len(discourses))
            call_back('Finished!')

    def get_utterance_ids(self, discourse, min_pause_length=0.5, min_utterance_length=0):
        """
        Algorithm to find utterance boundaries in a discourse.

        Pauses with duration less than the minimum will not count as
        utterance boundaries.  Utterances that are shorter than the minimum
        utterance length (such as 'okay' surrounded by silence) will be
        merged with the closest utterance.

        Parameters
        ----------
        discourse : str
            String identifier for a discourse
        min_pause_length : float, defaults to 0.5
            Time in seconds that is the minimum duration of a pause to count
            as an utterance boundary
        min_utterance_length : float, defaults to 0.0
            Time in seconds that is the minimum duration of a stretch of
            speech to count as an utterance

        Returns
        -------
        dict
            Maps each speaker in the discourse to a list of
            (begin_word_id, end_word_id) tuples, one tuple per utterance
        """
        speakers = self.get_speakers_in_discourse(discourse)
        word_type = self.word_name
        speaker_utts = {}
        for s in speakers:
            utterances = []
            # All pauses of at least min_pause_length between two speech words
            # of this speaker, ordered by time
            statement = '''MATCH p = (prev_node_word:{word_type}:speech:{corpus})-[:precedes_pause*1..]->(foll_node_word:{word_type}:speech:{corpus}),
            (prev_node_word)-[:spoken_in]->(d:Discourse:{corpus}),
            (prev_node_word)-[:spoken_by]->(s:Speaker:{corpus})
            WHERE d.name = $discourse
            AND s.name = $speaker
            WITH nodes(p)[1..-1] as ns,foll_node_word, prev_node_word
            WHERE foll_node_word.begin - prev_node_word.end >= $node_pause_duration
            AND NONE (x in ns where x:speech)
            WITH foll_node_word, prev_node_word
            RETURN prev_node_word.end AS begin, prev_node_word.id AS begin_id,
            foll_node_word.begin AS end, foll_node_word.id AS end_id,
            foll_node_word.begin - prev_node_word.end AS duration
            ORDER BY begin'''.format(corpus=self.cypher_safe_name, word_type=word_type)
            results = list(self.execute_cypher(statement, node_pause_duration=min_pause_length,
                                               discourse=discourse, speaker=s))
            # Merge adjacent pauses that share a boundary into single pauses
            collapsed_results = []
            for r in results:
                if len(collapsed_results) == 0:
                    collapsed_results.append(r)
                    continue
                if r['begin'] == collapsed_results[-1]['end']:
                    # NOTE(review): assumes the record objects returned by
                    # execute_cypher support item assignment — confirm for the
                    # installed neo4j driver version
                    collapsed_results[-1]['end'] = r['end']
                else:
                    collapsed_results.append(r)
            # First and last speech word of this speaker in the discourse
            statement = '''MATCH (s:Speaker:{corpus})<-[:spoken_by]-(w:{word_type}:{corpus}:speech)-[:spoken_in]->(d:Discourse:{corpus})
            where d.name = $discourse AND s.name = $speaker
            with max(w.end) as max_end, min(w.begin) as min_begin, collect(w) as words
            with [x in words where x.begin = min_begin or x.end = max_end | x] as c
            UNWIND c as w
            return w.id as id, w.begin as begin, w.end as end
            order by w.begin'''.format(corpus=self.cypher_safe_name, word_type=word_type)
            end_words = list(self.execute_cypher(statement, discourse=discourse, speaker=s))
            if len(end_words) == 0:
                # Speaker has no speech words in this discourse
                speaker_utts[s] = []
                continue
            if len(results) < 2:
                begin = end_words[0]['begin']
                begin_id = end_words[0]['id']
                if len(results) == 0:
                    # No qualifying pauses: everything is one utterance
                    if len(end_words) == 1:
                        ind = 0
                    else:
                        ind = 1
                    speaker_utts[s] = [(begin_id, end_words[ind]['id'])]
                    continue
                if results[0]['begin'] == 0:
                    # Single pause at the very start of the discourse
                    speaker_utts[s] = [(results[0]['end_id'], end_words[1]['id'])]
                    continue
                if results[0]['end'] == end_words[1]['end']:
                    # NOTE(review): end_words records only return 'id',
                    # 'begin' and 'end'; 'end_id' looks like it should be
                    # 'id' — confirm against upstream before changing
                    speaker_utts[s] = [(begin_id, end_words[1]['end_id'])]
                    continue
            if results[0]['begin'] != 0:
                # Speech starts the discourse; open the first utterance at 0
                current = 0
                current_id = end_words[0]['id']
            else:
                current = None
                current_id = None
            prev = None
            for i, r in enumerate(collapsed_results):
                if current is not None:
                    if r['begin'] - current > min_utterance_length:
                        # Long enough stretch of speech: close the utterance here
                        utterances.append((current_id, r['begin_id']))
                    elif i == len(results) - 1:
                        # Too-short final stretch: merge into previous utterance
                        utterances[-1] = (utterances[-1][0], r['begin_id'])
                    elif len(utterances) != 0:
                        # Too-short stretch in the middle: merge with whichever
                        # neighboring pause is closer
                        dist_to_prev = current - prev
                        dist_to_foll = r['end'] - r['begin']
                        if dist_to_prev <= dist_to_foll:
                            utterances[-1] = (utterances[-1][0], r['begin_id'])
                prev = current
                current = r['end']
                current_id = r['end_id']
            if current < end_words[1]['end']:
                # Speech remaining after the final pause
                if end_words[1]['end'] - current > min_utterance_length:
                    utterances.append((current_id, end_words[1]['id']))
                else:
                    utterances[-1] = (utterances[-1][0], end_words[1]['id'])
            speaker_utts[s] = utterances
        return speaker_utts

    def get_utterances(self, discourse, min_pause_length=0.5, min_utterance_length=0):
        """
        Algorithm to find utterance boundaries in a discourse.

        Pauses with duration less than the minimum will not count as
        utterance boundaries.  Utterances that are shorter than the minimum
        utterance length (such as 'okay' surrounded by silence) will be
        merged with the closest utterance.

        Parameters
        ----------
        discourse : str
            String identifier for a discourse
        min_pause_length : float, defaults to 0.5
            Time in seconds that is the minimum duration of a pause to count
            as an utterance boundary
        min_utterance_length : float, defaults to 0.0
            Time in seconds that is the minimum duration of a stretch of
            speech to count as an utterance

        Returns
        -------
        list
            List of (begin, end) time tuples, one per utterance
        """
        word_type = self.word_name
        # All pauses of at least min_pause_length in the discourse
        # (across speakers), ordered by time
        statement = '''MATCH p = (prev_node_word:{word_type}:speech:{corpus})-[:precedes_pause*1..]->(foll_node_word:{word_type}:speech:{corpus}),
        (prev_node_word)-[:spoken_in]->(d:Discourse:{corpus})
        WHERE d.name = $discourse
        WITH nodes(p)[1..-1] as ns,foll_node_word, prev_node_word
        WHERE foll_node_word.begin - prev_node_word.end >= $node_pause_duration
        AND NONE (x in ns where x:speech)
        WITH foll_node_word, prev_node_word
        RETURN prev_node_word.end AS begin, foll_node_word.begin AS end,
        foll_node_word.begin - prev_node_word.end AS duration
        ORDER BY begin'''.format(corpus=self.cypher_safe_name, word_type=word_type)
        results = list(self.execute_cypher(statement, node_pause_duration=min_pause_length,
                                           discourse=discourse))
        # Merge adjacent pauses that share a boundary into single pauses
        collapsed_results = []
        for r in results:
            if len(collapsed_results) == 0:
                collapsed_results.append(r)
                continue
            if r['begin'] == collapsed_results[-1]['end']:
                # NOTE(review): assumes the record objects support item
                # assignment — confirm for the installed neo4j driver version
                collapsed_results[-1]['end'] = r['end']
            else:
                collapsed_results.append(r)
        utterances = []
        word = getattr(self, word_type)
        q = self.query_graph(word).filter(word.discourse.name == discourse)
        times = q.aggregate(Min(word.begin), Max(word.end))
        if len(results) < 2:
            begin = times['min_begin']
            if len(results) == 0:
                # No qualifying pauses: everything is one utterance
                return [(begin, times['max_end'])]
            if results[0]['begin'] == 0:
                # Single pause at the very start of the discourse
                return [(results[0]['end'], times['max_end'])]
            if results[0]['end'] == times['max_end']:
                # Single pause at the very end of the discourse
                return [(begin, results[0]['end'])]
        if results[0]['begin'] != 0:
            # Speech starts the discourse; open the first utterance at 0
            current = 0
        else:
            current = None
        for i, r in enumerate(collapsed_results):
            if current is not None:
                if r['begin'] - current > min_utterance_length:
                    # Long enough stretch of speech: close the utterance here
                    utterances.append((current, r['begin']))
                elif i == len(results) - 1:
                    # Too-short final stretch: merge into previous utterance
                    utterances[-1] = (utterances[-1][0], r['begin'])
                elif len(utterances) != 0:
                    # Too-short stretch in the middle: merge with whichever
                    # neighboring pause is closer
                    dist_to_prev = current - utterances[-1][1]
                    dist_to_foll = r['end'] - r['begin']
                    if dist_to_prev <= dist_to_foll:
                        utterances[-1] = (utterances[-1][0], r['begin'])
            current = r['end']
        if current < times['max_end']:
            # Speech remaining after the final pause
            if times['max_end'] - current > min_utterance_length:
                utterances.append((current, times['max_end']))
            else:
                utterances[-1] = (utterances[-1][0], times['max_end'])
        # Clamp the outermost utterances to the actual word times
        if utterances[-1][1] > times['max_end']:
            utterances[-1] = (utterances[-1][0], times['max_end'])
        if utterances[0][0] < times['min_begin']:
            utterances[0] = (times['min_begin'], utterances[0][1])
        return utterances

    def encode_utterance_position(self, call_back=None, stop_check=None):
        """
        Encode ``position_in_utterance`` (1-based) for every word token.

        The update is split per speaker or per discourse depending on
        ``self.config.query_behavior``; otherwise it runs as one statement.

        Parameters
        ----------
        call_back : callable, optional
            Progress-reporting function
        stop_check : callable, optional
            Function returning True when processing should be aborted
        """
        w_type = self.word_name
        # NOTE(review): Cypher range() is inclusive of its upper bound, so
        # pos contains one index past the last node (nodes[p] is null there);
        # confirm the SET on the trailing null is harmless for this server
        # version, or whether range(0, size(nodes) - 1) was intended
        if self.config.query_behavior == 'speaker':
            statement = '''MATCH (node_utterance:utterance:speech:{corpus_name})-[:spoken_by]->(speaker:Speaker:{corpus_name}),
            (node_word_in_node_utterance:{w_type}:{corpus_name})-[:contained_by]->(node_utterance)
            WHERE speaker.name = $split_name
            WITH node_utterance, node_word_in_node_utterance
            ORDER BY node_word_in_node_utterance.begin
            WITH node_utterance,collect(node_word_in_node_utterance) as nodes
            WITH node_utterance,nodes, range(0, size(nodes)) as pos
            UNWIND pos as p
            WITH node_utterance, p, nodes[p] as n
            SET n.position_in_utterance = p + 1'''.format(w_type=w_type, corpus_name=self.cypher_safe_name)
            split_names = self.speakers
        elif self.config.query_behavior == 'discourse':
            statement = '''MATCH (node_utterance:utterance:speech:{corpus_name})-[:spoken_in]->(discourse:Discourse:{corpus_name}),
            (node_word_in_node_utterance:{w_type}:{corpus_name})-[:contained_by]->(node_utterance)
            WHERE discourse.name = $split_name
            WITH node_utterance, node_word_in_node_utterance
            ORDER BY node_word_in_node_utterance.begin
            WITH node_utterance, collect(node_word_in_node_utterance) as nodes
            WITH node_utterance, nodes, range(0, size(nodes)) as pos
            UNWIND pos as p
            WITH node_utterance, p, nodes[p] as n
            SET n.position_in_utterance = p + 1'''.format(w_type=w_type, corpus_name=self.cypher_safe_name)
            split_names = self.discourses
        else:
            statement = '''MATCH (node_utterance:utterance:speech:{corpus_name}),
            (node_word_in_node_utterance:{w_type}:{corpus_name})-[:contained_by]->(node_utterance)
            WITH node_utterance, node_word_in_node_utterance
            ORDER BY node_word_in_node_utterance.begin
            WITH node_utterance, collect(node_word_in_node_utterance) as nodes
            WITH node_utterance, nodes, range(0, size(nodes)) as pos
            UNWIND pos as p
            WITH node_utterance, p, nodes[p] as n
            SET n.position_in_utterance = p + 1'''.format(w_type=w_type, corpus_name=self.cypher_safe_name)
            split_names = None
        if split_names is None:
            if call_back is not None:
                call_back('Encoding utterance position...')
                call_back(0, 0)
            self.execute_cypher(statement)
        else:
            if call_back is not None:
                call_back(0, len(split_names))
            for i, s in enumerate(split_names):
                if stop_check is not None and stop_check():
                    return
                if call_back is not None:
                    call_back(i)
                    call_back('Encoding utterance positions for {} {} of {} ({})...'.format(
                        self.config.query_behavior, i, len(split_names), s))
                self.execute_cypher(statement, split_name=s)
        self.hierarchy.add_token_properties(self, w_type, [('position_in_utterance', float)])

    def reset_utterance_position(self):
        """Reset the ``position_in_utterance`` property on word tokens."""
        self.reset_property(self.word_name, 'position_in_utterance')

    def encode_speech_rate(self, subset_label, call_back=None, stop_check=None):
        """
        Encode speech rate (phones per second) on utterances.

        Parameters
        ----------
        subset_label : str
            the name of the phone subset to count when computing the rate
        call_back : callable, optional
            Progress-reporting function (currently unused)
        stop_check : callable, optional
            Abort-check function (currently unused)
        """
        self.encode_rate('utterance', self.phone_name, 'speech_rate', subset=subset_label)

    def reset_speech_rate(self):
        """Reset the ``speech_rate`` property on utterances."""
        self.reset_property('utterance', 'speech_rate')

    def enrich_utterances(self, utterance_data, type_data=None):
        """
        Add properties from ``utterance_data`` to utterances and register
        them on the corpus hierarchy.

        Parameters
        ----------
        utterance_data : dict
            the data to enrich with, keyed by utterance
        type_data : dict, optional
            Maps property names to types; inferred from the first entry of
            ``utterance_data`` when omitted
        """
        if type_data is None:
            # Infer property types from an arbitrary entry's values
            type_data = {k: type(v) for k, v in next(iter(utterance_data.values())).items()}
        utterance_enriched_data_to_csvs(self, utterance_data)
        import_utterance_enrichment_csvs(self, type_data)
        self.hierarchy.add_type_properties(self, 'utterance', type_data.items())
        self.encode_hierarchy()