# Source code for polyglotdb.corpus.syllabic

import re
from uuid import uuid1

from polyglotdb.corpus.utterance import UtteranceContext
from polyglotdb.io.helper import make_type_id
from polyglotdb.io.importer import (
    create_nonsyllabic_csvs,
    create_syllabic_csvs,
    import_nonsyl_csv,
    import_syllable_csv,
    import_syllable_enrichment_csvs,
    nonsyls_data_to_csvs,
    syllables_data_to_csvs,
    syllables_enrichment_data_to_csvs,
)
from polyglotdb.syllabification.maxonset import split_nonsyllabic_maxonset, split_ons_coda_maxonset
from polyglotdb.syllabification.probabilistic import (
    norm_count_dict,
    split_nonsyllabic_prob,
    split_ons_coda_prob,
)


def make_label_safe_for_cypher(label):
    """
    Wrap a subset name in backticks so it can be used as a Cypher label

    A backtick is only added on a side that does not already have one, so
    passing an already-quoted name returns it unchanged.

    Parameters
    ----------
    label : str
        Subset name

    Returns
    -------
    str
        Backtick-delimited name
    """
    opening = "" if label.startswith("`") else "`"
    quoted = opening + label
    # The closing check runs on the already-prefixed string, exactly like the
    # sequential checks it replaces.
    closing = "" if quoted.endswith("`") else "`"
    return quoted + closing


class SyllabicContext(UtteranceContext):
    """
    Class that contains methods for dealing specifically with syllables
    """

    def find_onsets(self, syllabic_label="syllabic"):
        """
        Gets syllable onsets across the corpus

        For every word that contains a syllabic segment, the labels of the
        phones from the word-initial phone up to (not including) the first
        syllabic segment are collected and counted.

        Parameters
        ----------
        syllabic_label : str
            Subset to use for syllabic segments (i.e., nuclei)

        Returns
        -------
        data : dict
            A ``collections.Counter`` with onsets (tuples of phone labels) as keys
            and frequency values as values
        """
        from collections import Counter

        data = Counter()
        syllabic_name = make_label_safe_for_cypher(syllabic_label)
        # The statement only depends on corpus-level names; speaker and discourse
        # are passed as query parameters, so it is built once.
        statement = f"""match (w:{self.word_name}:{self.cypher_safe_name})-[:spoken_by]->(s:Speaker:{self.cypher_safe_name}),
            (w)-[:spoken_in]->(d:Discourse:{self.cypher_safe_name})
            where (w)<-[:contained_by]-()-[:is_a]->(:{syllabic_name})
            AND s.name = $speaker
            AND d.name = $discourse
            with w
            match (n:{self.phone_name}:{self.cypher_safe_name})-[:is_a]->(t:{syllabic_name}:{self.cypher_safe_name}),
            (n)-[:contained_by]->(w)
            with w, n
            order by n.begin
            with w,collect(n)[0..1] as coll unwind coll as n
            MATCH (pn:{self.phone_name}:{self.cypher_safe_name})-[:contained_by]->(w)
            where not (pn)<-[:precedes]-()-[:contained_by]->(w)
            with w, n,pn
            match p = shortestPath((pn)-[:precedes*0..10]->(n))
            with [x in nodes(p)[0..-1]|x.label] as onset
            return onset, count(onset) as freq"""
        for s in self.speakers:
            for d in self.get_discourses_of_speaker(s):
                res = self.execute_cypher(statement, speaker=s, discourse=d)
                for r in res:
                    data[tuple(r["onset"])] += r["freq"]
        return data

    def find_codas(self, syllabic_label="syllabic"):
        """
        Gets syllable codas across the corpus

        For every word that contains a syllabic segment, the labels of the
        phones following the last syllabic segment up to the word-final phone
        are collected and counted.

        Parameters
        ----------
        syllabic_label : str
            Subset to use for syllabic segments (i.e., nuclei)

        Returns
        -------
        data : dict
            A ``collections.Counter`` with codas (tuples of phone labels) as keys
            and frequency values as values
        """
        from collections import Counter

        data = Counter()
        syllabic_name = make_label_safe_for_cypher(syllabic_label)
        # Built once: speaker and discourse are query parameters.
        statement = f"""match (w:{self.word_name}:{self.cypher_safe_name})-[:spoken_by]->(s:Speaker:{self.cypher_safe_name}),
            (w)-[:spoken_in]->(d:Discourse:{self.cypher_safe_name})
            where (w)<-[:contained_by]-()-[:is_a]->(:{syllabic_name})
            AND s.name = $speaker
            AND d.name = $discourse
            with w
            match (n:{self.phone_name}:{self.cypher_safe_name})-[:is_a]->(t:{syllabic_name}:{self.cypher_safe_name}),
            (n)-[:contained_by]->(w)
            with w, n
            order by n.begin DESC
            with w,collect(n)[0..1] as coll unwind coll as n
            MATCH (pn:{self.phone_name}:{self.cypher_safe_name})-[:contained_by]->(w)
            where not (pn)-[:precedes]->()-[:contained_by]->(w)
            with w, n,pn
            match p = shortestPath((n)-[:precedes*0..10]->(pn))
            with [x in nodes(p)[1..]|x.label] as coda
            return coda, count(coda) as freq"""
        for s in self.speakers:
            for d in self.get_discourses_of_speaker(s):
                res = self.execute_cypher(statement, speaker=s, discourse=d)
                for r in res:
                    data[tuple(r["coda"])] += r["freq"]
        return data

    def encode_syllabic_segments(self, phones):
        """
        Encode a list of phones as 'syllabic'

        Parameters
        ----------
        phones : list
            A list of vowels and syllabic consonants
        """
        self.encode_class(phones, "syllabic")

    def reset_syllables(self, call_back=None, stop_check=None):
        """
        Resets syllables, removes syllable annotation, removes onset, coda, and nucleus labels

        Parameters
        ----------
        call_back : callable
            Function to monitor progress
        stop_check : callable
            Function the check whether the process should terminate early
        """
        if call_back is not None:
            call_back("Resetting syllables...")
            number = self.execute_cypher(
                f"""MATCH (n:syllable:{self.cypher_safe_name}) return count(*) as number """
            )[0]["number"]
            call_back(0, number)

        # All statements are parameterised on speaker/discourse, so build them once.
        # Re-attach phones directly to their words before the syllables go away.
        phone_rel_statement = f"""
            MATCH (p:{self.phone_name}:{self.cypher_safe_name})-[:contained_by]->(s:syllable:{self.cypher_safe_name}),
            (s)-[:contained_by]->(w:{self.word_name}:{self.cypher_safe_name}),
            (s)-[:spoken_by]->(sp:Speaker:{self.cypher_safe_name}),
            (s)-[:spoken_in]->(d:Discourse:{self.cypher_safe_name})
            WHERE sp.name = $speaker_name
            AND d.name = $discourse_name
            with p,w
            CREATE (p)-[:contained_by]->(w)
            """
        phone_label_statement = f"""
            MATCH (p:{self.phone_name}:{self.cypher_safe_name})-[:spoken_by]->(sp:Speaker:{self.cypher_safe_name}),
            (p)-[:spoken_in]->(d:Discourse:{self.cypher_safe_name})
            WHERE sp.name = $speaker_name
            AND d.name = $discourse_name
            with p
            REMOVE p:onset, p:nucleus, p:coda, p.syllable_position
            """
        # Syllables are deleted in batches of 1000 per query.
        delete_statement = f"""
            MATCH (s:syllable:{self.cypher_safe_name})-[:spoken_by]->(sp:Speaker:{self.cypher_safe_name}),
            (s)-[:spoken_in]->(d:Discourse:{self.cypher_safe_name})
            WHERE sp.name = $speaker_name
            AND d.name = $discourse_name
            WITH s
            LIMIT 1000
            DETACH DELETE s
            RETURN count(s) as deleted_count
            """
        for s in self.speakers:
            for d in self.get_discourses_of_speaker(s):
                self.execute_cypher(phone_rel_statement, speaker_name=s, discourse_name=d)
                self.execute_cypher(phone_label_statement, speaker_name=s, discourse_name=d)
                num_deleted = 0
                deleted = 1000  # non-zero sentinel so the loop runs at least once
                while deleted > 0:
                    if stop_check is not None and stop_check():
                        break
                    deleted = self.execute_cypher(
                        delete_statement, speaker_name=s, discourse_name=d
                    )[0]["deleted_count"]
                    num_deleted += deleted
                    if call_back is not None:
                        call_back(num_deleted)

        statement = f"""MATCH (st:syllable_type:{self.cypher_safe_name})
            WITH st
            DETACH DELETE st"""
        self.execute_cypher(statement)
        try:
            self.hierarchy.remove_annotation_type("syllable")
            self.hierarchy.remove_token_subsets(
                self, self.phone_name, ["onset", "coda", "nucleus"]
            )
            self.hierarchy.remove_token_properties(self, self.phone_name, ["syllable_position"])
            self.encode_hierarchy()
        except KeyError:
            # Deliberate best effort: presumably raised by the hierarchy when
            # syllables were never encoded -- TODO confirm against Hierarchy.
            pass

    @property
    def has_syllabics(self):
        """
        Check whether there is a phone subset named ``syllabic``

        Returns
        -------
        bool
            True if ``syllabic`` is found as a phone subset
        """
        return "syllabic" in self.hierarchy.subset_types[self.phone_name]

    @property
    def has_syllables(self):
        """
        Check whether the corpus has syllables encoded

        Returns
        -------
        bool
            True if the syllables are in the Hierarchy
        """
        return "syllable" in self.hierarchy.annotation_types

    def _validated_custom_onsets(self, custom_onsets):
        """
        Check user-supplied onsets and keep those made only of phones in the corpus

        Parameters
        ----------
        custom_onsets : iterable of tuple
            Candidate onsets, each a tuple of phone labels

        Returns
        -------
        set
            The onsets whose segments are all present in ``self.phones``; the
            empty onset ``()`` is always kept

        Raises
        ------
        ValueError
            If any onset is not a tuple
        """
        known_phones = set(self.phones)
        onsets = set()
        for onset in custom_onsets:
            if not isinstance(onset, tuple):
                raise ValueError(
                    f"Each onset must be a tuple, got: {repr(onset)} ({type(onset)})."
                )
            for seg in onset:
                if seg not in known_phones:
                    print(
                        f"Skipping onset '{onset}' since phone segment '{seg}' is not present in the corpus."
                    )
                    break
            else:
                # Only reached when no segment was rejected (including the empty onset).
                onsets.add(onset)
        return onsets

    def encode_syllables(
        self,
        algorithm="maxonset",
        syllabic_label="syllabic",
        call_back=None,
        stop_check=None,
        custom_onsets=None,
    ):
        """
        Encodes syllables to a corpus

        Any existing syllable encoding is reset first. The arguments are
        validated before that reset, so an invalid call leaves existing
        syllables untouched.

        Parameters
        ----------
        algorithm : str, defaults to 'maxonset'
            determines which algorithm will be used to encode syllables
            (``'maxonset'`` or ``'probabilistic'``)
        syllabic_label : str
            Subset to use for syllabic segments (i.e., nuclei)
        call_back : callable
            Function to monitor progress
        stop_check : callable
            Function the check whether the process should terminate early
        custom_onsets: set, defaults to None
            A set of custom onsets (tuples of phone labels) to use instead of finding
            them from the corpus. Only used by the ``'maxonset'`` algorithm.
            If None, the onsets will be found from the corpus.

        Raises
        ------
        NotImplementedError
            If ``algorithm`` is not a supported algorithm
        ValueError
            If a custom onset is not a tuple
        """
        if algorithm not in ("maxonset", "probabilistic"):
            raise NotImplementedError(f"Unknown syllabification algorithm: {algorithm!r}")
        use_custom_onsets = algorithm == "maxonset" and custom_onsets is not None
        if use_custom_onsets:
            onsets = self._validated_custom_onsets(custom_onsets)

        self.reset_syllables(call_back, stop_check)

        # Corpus onsets are gathered after the reset, which re-attaches phones
        # directly to words.
        if not use_custom_onsets:
            onsets = self.find_onsets(syllabic_label=syllabic_label)
        codas = None
        if algorithm == "probabilistic":
            onsets = norm_count_dict(onsets, onset=True)
            codas = self.find_codas(syllabic_label=syllabic_label)
            codas = norm_count_dict(codas, onset=False)
        elif custom_onsets is None:
            onsets = sorted(onsets)
            print(f"Onsets found by max onset: {onsets}")

        statement = """MATCH (n:{}:{}) return n.label as label""".format(
            self.cypher_safe_name, make_label_safe_for_cypher(syllabic_label)
        )
        res = self.execute_cypher(statement)
        syllabics = {x["label"] for x in res}

        word_type = getattr(self, self.word_name)
        phone_type = getattr(word_type, self.phone_name)

        create_syllabic_csvs(self)
        create_nonsyllabic_csvs(self)
        process_string = "Processing speaker {} of {} ({})..."
        if call_back is not None:
            call_back(0, len(self.speakers))
        for speaker_ind, s in enumerate(self.speakers):
            if stop_check is not None and stop_check():
                break
            if call_back is not None:
                call_back(speaker_ind)
                call_back(process_string.format(speaker_ind, len(self.speakers), s))
            discourses = self.get_discourses_of_speaker(s)
            for d in discourses:
                syllables = []
                non_syllables = []
                q = self.query_graph(word_type)
                q = q.filter(word_type.speaker.name == s)
                q = q.filter(word_type.discourse.name == d)
                q = q.order_by(word_type.begin)
                q = q.columns(
                    word_type.id.column_name("id"),
                    phone_type.id.column_name("phone_id"),
                    word_type.begin.column_name("begin"),
                    word_type.label.column_name("label"),
                    word_type.end.column_name("end"),
                    phone_type.label.column_name("phones"),
                    phone_type.begin.column_name("begins"),
                    phone_type.end.column_name("ends"),
                )
                results = q.all()
                # Id of the previously emitted (non-)syllable row in this discourse.
                prev_id = None
                for w in results:
                    phones = w["phones"]
                    phone_ids = w["phone_id"]
                    if not phone_ids:
                        print(
                            "The word {} in file {} ({} to {}) did not have any phones.".format(
                                w["label"], d, w["begin"], w["end"]
                            )
                        )
                        continue
                    phone_begins = w["begins"]
                    phone_ends = w["ends"]
                    vow_inds = [i for i, x in enumerate(phones) if x in syllabics]
                    if not vow_inds:
                        # Word without any syllabic segment: recorded separately.
                        cur_id = uuid1()
                        if algorithm == "probabilistic":
                            split = split_nonsyllabic_prob(phones, onsets, codas)
                        else:
                            split = split_nonsyllabic_maxonset(phones, onsets)
                        label = ".".join(phones)
                        row = {
                            "id": cur_id,
                            "prev_id": prev_id,
                            "onset_id": phone_ids[0],
                            "break": split,
                            "coda_id": phone_ids[-1],
                            "begin": phone_begins[0],
                            "label": label,
                            "type_id": make_type_id([label], self.corpus_name),
                            "end": phone_ends[-1],
                        }
                        non_syllables.append(row)
                        prev_id = cur_id
                        continue
                    for j, i in enumerate(vow_inds):
                        cur_id = uuid1()
                        cur_vow_id = phone_ids[i]
                        # Left edge: word start for the first nucleus, otherwise the
                        # split point in the cluster shared with the previous nucleus.
                        if j == 0:
                            begin_ind = 0
                            if i != 0:
                                cur_ons_id = phone_ids[begin_ind]
                            else:
                                cur_ons_id = None
                        else:
                            prev_vowel_ind = vow_inds[j - 1]
                            cons_string = phones[prev_vowel_ind + 1 : i]
                            if algorithm == "probabilistic":
                                split = split_ons_coda_prob(cons_string, onsets, codas)
                            else:
                                split = split_ons_coda_maxonset(cons_string, onsets)
                            if split is None:
                                cur_ons_id = None
                                begin_ind = i
                            else:
                                begin_ind = prev_vowel_ind + 1 + split
                                cur_ons_id = phone_ids[begin_ind]

                        # Right edge: word end for the last nucleus, otherwise the
                        # split point in the cluster shared with the next nucleus.
                        if j == len(vow_inds) - 1:
                            end_ind = len(phones) - 1
                            if i != len(phones) - 1:
                                cur_coda_id = phone_ids[end_ind]
                            else:
                                cur_coda_id = None
                        else:
                            foll_vowel_ind = vow_inds[j + 1]
                            cons_string = phones[i + 1 : foll_vowel_ind]
                            if algorithm == "probabilistic":
                                split = split_ons_coda_prob(cons_string, onsets, codas)
                            else:
                                split = split_ons_coda_maxonset(cons_string, onsets)
                            if split is None:
                                cur_coda_id = None
                                end_ind = i
                            else:
                                end_ind = i + split
                                cur_coda_id = phone_ids[end_ind]
                        begin = phone_begins[begin_ind]
                        end = phone_ends[end_ind]
                        label = ".".join(phones[begin_ind : end_ind + 1])
                        row = {
                            "id": cur_id,
                            "prev_id": prev_id,
                            "vowel_id": cur_vow_id,
                            "onset_id": cur_ons_id,
                            "label": label,
                            "type_id": make_type_id([label], self.corpus_name),
                            "coda_id": cur_coda_id,
                            "begin": begin,
                            "end": end,
                        }
                        syllables.append(row)
                        prev_id = cur_id
                syllables_data_to_csvs(self, s, d, syllables)
                nonsyls_data_to_csvs(self, s, d, non_syllables)
        import_syllable_csv(self, call_back, stop_check)
        import_nonsyl_csv(self, call_back, stop_check)
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            call_back("Cleaning up...")
        # prev_id is only needed while importing; strip it from the stored syllables.
        cleanup_statement = """MATCH (s:{corpus_name}:Speaker)<-[:spoken_by]-(n:{corpus_name}:syllable)-[:spoken_in]->(d:{corpus_name}:Discourse)
            where s.name = $speaker_name
            AND d.name = $discourse_name
            and n.prev_id is not Null
            REMOVE n.prev_id""".format(
            corpus_name=self.cypher_safe_name
        )
        for s in self.speakers:
            for d in self.get_discourses_of_speaker(s):
                self.execute_cypher(cleanup_statement, speaker_name=s, discourse_name=d)

        self.hierarchy.add_annotation_type("syllable", above=self.phone_name, below=self.word_name)
        self.hierarchy.add_token_subsets(self, self.phone_name, ["onset", "coda", "nucleus"])
        self.hierarchy.add_token_properties(self, self.phone_name, [("syllable_position", str)])
        self.encode_hierarchy()
        if call_back is not None:
            call_back("Finished!")
            call_back(1, 1)

    def enrich_syllables(self, syllable_data, type_data=None):
        """
        Sets the data type and syllable data, initializes importers for syllable data,
        adds features to hierarchy for a phone

        Parameters
        ----------
        syllable_data : dict
            the enrichment data, mapping syllable labels to dictionaries of properties
        type_data : dict
            By default None, in which case the property types are inferred from
            the first entry of ``syllable_data``
        """
        if type_data is None:
            if not syllable_data:
                # Nothing to infer types from and nothing to import; previously this
                # surfaced as a bare StopIteration.
                return
            first_entry = next(iter(syllable_data.values()))
            type_data = {k: type(v) for k, v in first_entry.items()}
        syllables_enrichment_data_to_csvs(self, syllable_data)
        import_syllable_enrichment_csvs(self, type_data)
        self.hierarchy.add_type_properties(self, "syllable", type_data.items())
        self.encode_hierarchy()

    @staticmethod
    def _extract_nucleus_marker(syl, pattern, clean_phone_label):
        """
        Pull the text matching ``pattern`` out of a dot-separated syllable label

        Parameters
        ----------
        syl : str
            Syllable label with segments joined by ``.``
        pattern : str
            Regular expression that marks stress/tone in a segment
        clean_phone_label : bool
            If True, the returned label has the marker removed from the matching segment

        Returns
        -------
        tuple or None
            ``(label, marker)`` where ``marker`` is the matched text with
            underscores removed, or None if no segment matches
        """
        segments = syl.split(".")
        nucleus = segments[0]
        # No early exit: when several segments match, the last one is used.
        for seg in segments:
            if re.search(pattern, seg) is not None:
                nucleus = seg
        r = re.search(pattern, nucleus)
        if r is None:
            return None
        marker = r.group(0).replace("_", "")
        nucleus = re.sub(pattern, "", nucleus)
        fullpatt = str(nucleus) + str(pattern).replace("$", "")
        if clean_phone_label:
            syl = re.sub(fullpatt, nucleus, syl)
        return syl, marker

    def _generate_stress_enrichment(self, pattern, clean_phone_label=True):
        """
        Build the ``{syllable label: {"stress": value}}`` mapping for all syllables
        whose label contains a segment matching ``pattern``
        """
        syllable = self.syllable
        all_syls = self.query_graph(syllable).all()
        enrich_dict = {}
        for item in all_syls:
            extracted = self._extract_nucleus_marker(item["label"], pattern, clean_phone_label)
            if extracted is not None:
                syl, end = extracted
                enrich_dict[syl] = {"stress": end}
        return enrich_dict

    def _generate_tone_enrichment(self, pattern, clean_phone_label=True):
        """
        Build the ``{syllable label: {"tone": value}}`` mapping for all syllables
        whose label contains a segment matching ``pattern``
        """
        syllable = self.syllable
        all_syls = self.query_graph(syllable).all()
        enrich_dict = {}
        # NOTE(review): unlike the stress variant this walks ``all_syls.cursors``
        # and indexes ``item[0]``; kept as found -- confirm the results API.
        for x in all_syls.cursors:
            for item in x:
                extracted = self._extract_nucleus_marker(
                    item[0]["label"], pattern, clean_phone_label
                )
                if extracted is not None:
                    syl, end = extracted
                    enrich_dict[syl] = {"tone": end}
        return enrich_dict

    def encode_stress_to_syllables(self, regex=None, clean_phone_label=True):
        """
        Use numbers (0-9) in phone labels as stress property for syllables. If ``clean_phone_label`` is True,
        the numbers will be removed from the phone labels.

        Parameters
        ----------
        regex : str
            Regular expression character set for finding stress in the phone label
            (defaults to ``"[0-9]"``)
        clean_phone_label : bool
            Flag for removing regular expression from the phone labels
        """
        if regex is None:
            regex = "[0-9]"

        enrich_dict = self._generate_stress_enrichment(regex, clean_phone_label)

        if clean_phone_label:
            self.remove_pattern(regex)
        self.enrich_syllables(enrich_dict)
        self.encode_hierarchy()

    def encode_tone_to_syllables(self, regex=None, clean_phone_label=True):
        """
        Use numbers (0-9) in phone labels as tone property for syllables. If ``clean_phone_label`` is True,
        the numbers will be removed from the phone labels.

        Parameters
        ----------
        regex : str
            Regular expression character set for finding tone in the phone label
            (defaults to ``"[0-9]"``)
        clean_phone_label : bool
            Flag for removing regular expression from the phone labels
        """
        if regex is None:
            regex = "[0-9]"

        enrich_dict = self._generate_tone_enrichment(regex, clean_phone_label)

        if clean_phone_label:
            self.remove_pattern(regex)
        self.enrich_syllables(enrich_dict)
        self.encode_hierarchy()

    def encode_stress_from_word_property(self, word_property_name):
        """
        Use a property on words formatted like "0-1-0" to encode stress on syllables.

        The number of syllables and the position of syllables within a word will also be encoded as a result of
        this function.

        Parameters
        ----------
        word_property_name : str
            Property name of words that contains the stress pattern

        Raises
        ------
        Exception
            If syllables have not been encoded or word types lack the given property
        """
        if "syllable" not in self.annotation_types:
            raise Exception("Syllables have not been encoded.")
        if not self.hierarchy.has_type_property(self.word_name, word_property_name):
            raise Exception("Word types do not have a property {}.".format(word_property_name))
        if not self.hierarchy.has_type_property(self.word_name, "num_syllables"):
            self.encode_count("word", "syllable", "num_syllables")
        if not self.hierarchy.has_type_property("syllable", "position_in_word"):
            self.encode_position("word", "syllable", "position_in_word")

        # Stress is only set on words whose pattern has one entry per syllable.
        statement = """MATCH (s:syllable:{corpus_name})-[:spoken_by]->(speaker:Speaker:{corpus_name}),
            (s)-[:spoken_in]->(discourse:Discourse:{corpus_name}),
            (s)-[:contained_by]->(w:word:{corpus_name})-[:is_a]->(wt:word_type:{corpus_name})
            WHERE speaker.name = $speaker_name
            AND discourse.name = $discourse_name
            AND wt.{word_property_name} is not null
            WITH s, w, split(wt.{word_property_name}, '-') as stresses
            WHERE size(stresses) = w.num_syllables
            SET s.stress = stresses[s.position_in_word-1]""".format(
            corpus_name=self.cypher_safe_name,
            word_property_name=word_property_name,
        )
        for s in self.speakers:
            for d in self.get_discourses_of_speaker(s):
                self.execute_cypher(statement, speaker_name=s, discourse_name=d)
        self.hierarchy.add_token_properties(self, "syllable", [("stress", str)])
        self.encode_hierarchy()