Source code for polyglotdb.corpus.summarized

from polyglotdb.corpus.phonological import PhonologicalContext
from polyglotdb.exceptions import GraphQueryError
from polyglotdb.query.base.func import Average


class SummarizedContext(PhonologicalContext):
    """
    Class that contains methods for dealing specifically with summary measures for linguistic items
    """

    @staticmethod
    def _escape_cypher_string(value):
        """
        Escape a value so it can be placed inside a single-quoted Cypher string literal

        Parameters
        ----------
        value : object
            the value to embed (converted with ``str``)

        Returns
        -------
        str
            the text with backslashes and single quotes backslash-escaped
        """
        # Backslashes first, otherwise the ones added for quotes would be doubled
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def _speaker_measure_encoded(self, measure_name, property_name):
        """
        Check whether a by-speaker phone measure is already stored in the graph

        By-speaker measures are written by ``encode_measure`` onto the ``spoken_by``
        relationship between a phone type and a speaker, so the probe looks for a
        non-null ``<measure_name>_<property_name>`` on such a relationship.

        Parameters
        ----------
        measure_name : str
            prefix of the stored property (``mean`` or ``sd``)
        property_name : str
            name of the summarized property

        Returns
        -------
        bool
            True if at least one phone type/speaker relationship carries the value
        """
        statement = """MATCH (pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s:Speaker:{corpus_name})
        WHERE r.{measure_name}_{property_name} IS NOT NULL
        RETURN 1 LIMIT 1""".format(
            phone_name=self.phone_name,
            corpus_name=self.cypher_safe_name,
            measure_name=measure_name,
            property_name=property_name,
        )
        return len(list(self.execute_cypher(statement))) > 0

    def get_measure(self, data_name, statistic, annotation_type, by_speaker=False, speaker=None):
        """
        abstract function to get statistic for the data_name of an annotation_type

        Parameters
        ----------
        data_name : str
            the aspect to summarize (duration, pitch, formants, etc)
        statistic : str
            how to summarize (mean, stdev, median, or baseline)
        annotation_type : str
            the annotation to summarize
        by_speaker : boolean
            whether to summarize by speaker or not; takes precedence over ``speaker``
        speaker : str
            a specific speaker to restrict the summary to (also passed on to
            ``baseline_duration`` when ``statistic`` is ``baseline``)

        Returns
        -------
        list or dict
            the rows returned by the summary query, or the dictionary produced by
            ``baseline_duration`` when ``statistic`` is ``baseline``

        Raises
        ------
        AttributeError
            if ``statistic`` is not one of the supported options
        """
        column = statistic + "_" + data_name
        if data_name == "duration":
            num_prop = "p.end - p.begin"
        else:
            # Other measures are read from the token property of the same name,
            # mirroring how encode_measure addresses non-duration properties
            num_prop = "p.{}".format(data_name)

        baseline = statistic == "baseline"
        measure = ""
        percent = ""
        result = None
        if statistic == "mean":
            measure = "avg"
        elif statistic == "stdev":
            measure = statistic
        elif statistic == "median":
            # The median is the 50th percentile
            measure = "percentileDisc"
            percent = ", .5"
        elif baseline:
            result = self.baseline_duration(annotation_type, speaker)
        else:
            raise AttributeError(
                "The statistic {} is not a valid option. Options are mean, median, stdev, or baseline".format(
                    statistic
                )
            )

        if not self.hierarchy.has_type_property("utterance", "label"):
            self.encode_utterances()

        if baseline:
            # The baseline was computed above; there is no aggregate query to run
            return result

        fields = {
            "corpus_name": self.cypher_safe_name,
            "annotation_type": annotation_type,
            "measure": measure,
            "num_prop": num_prop,
            "percent": percent,
            "column": column,
        }
        if by_speaker:
            statement = (
                "MATCH (p:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name}) "
                "RETURN s.name as speaker, p.label as {annotation_type}, "
                "{measure}({num_prop}{percent}) as {column}"
            ).format(**fields)
        elif speaker is not None:
            # Previously this query was built and then unconditionally overwritten
            statement = (
                "MATCH (p:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name}) "
                "where s.name = '{speaker}' "
                "RETURN p.label as {annotation_type}, {measure}({num_prop}{percent}) as {column}"
            ).format(speaker=self._escape_cypher_string(speaker), **fields)
        else:
            statement = (
                "MATCH (p:{annotation_type}:{corpus_name}) "
                "RETURN p.label as {annotation_type}, {measure}({num_prop}{percent}) as {column}"
            ).format(**fields)

        return list(self.execute_cypher(statement))

    def baseline_duration(self, annotation, speaker=None):
        """
        Get the baseline duration of each annotation in corpus.

        Baseline duration is determined by summing the average durations of constituent phones
        for an annotation. The averages are stored on the phones as ``average_duration`` and
        the sums on the annotations as ``baseline_duration``.

        Parameters
        ----------
        annotation : str
            the annotation type to compute baselines for
        speaker : str
            a speaker name, if desired (defaults to None); restricts the phones used
            for the averages to those spoken by that speaker

        Returns
        -------
        word_totals : dict
            a dictionary mapping annotation keys to baseline durations

        Raises
        ------
        AttributeError
            if ``annotation`` is ``utterance`` or ``syllable`` and that type has no
            ``label`` property in the hierarchy
        """
        index = "label"
        if annotation == "utterance":
            ## TODO: find a good key for utterances (labels too long anyway and are None)
            index = "id"
            if not self.hierarchy.has_type_property("utterance", "label"):
                raise AttributeError(f"Annotation type '{annotation}' not found.")
        if annotation == "syllable":
            if not self.hierarchy.has_type_property("syllable", "label"):
                raise AttributeError(f"Annotation type '{annotation}' not found.")

        corpus = self.cypher_safe_name
        if speaker is not None:
            phone_match = (
                f"MATCH (m:phone:{corpus})-[:spoken_by]->(s:Speaker:{corpus}) "
                f"where s.name = '{self._escape_cypher_string(speaker)}'"
            )
        else:
            phone_match = f"MATCH (m:phone:{corpus})"

        # Both variants alias the returned columns; the speaker variant used to return
        # them un-aliased, which broke the lookups by "label" below
        statement = f"""
        {phone_match}
        with m.{index} as target, avg(m.end-m.begin) as dur
        with target,dur match (p:phone:{corpus})
        where p.{index} = target
        set p.average_duration = dur
        with p as phone
        match(n:{annotation}:{corpus})
        where phone.begin>=n.begin and phone.end<=n.end
        with n,phone
        with n, n.{index} as target, sum(phone.average_duration) as baseline
        set n.baseline_duration = baseline
        return n.{index} as label, n.baseline_duration as baseline_duration"""

        return {row["label"]: row["baseline_duration"] for row in self.execute_cypher(statement)}

    # SPEAKER

    def average_speech_rate(self):
        """
        Get the average speech rate for each speaker in a corpus

        Returns
        -------
        result: list
            the average speech rate by speaker

        Raises
        ------
        GraphQueryError
            if utterances or syllables are not among the corpus annotation types
        """
        if "utterance" not in self.annotation_types:
            raise GraphQueryError("Utterances must be encodes to calculate average speech rate.")
        if "syllable" not in self.annotation_types:
            raise GraphQueryError("Syllables must be encodes to calculate average speech rate.")
        q = self.query_graph(self.utterance)
        # Group utterances by speaker name and average their syllable rate
        res = q.group_by(self.utterance.speaker.name.column_name("name")).aggregate(
            Average(self.utterance.syllable.rate)
        )
        return res

    def make_dict(self, data, speaker=False, label=None):
        """
        turn data results into a dictionary for encoding

        Parameters
        ----------
        data : list or dict
            a list of rows returned by cypher, or a mapping of labels to baseline durations
        speaker : bool
            whether the rows are by-speaker results (speaker, label, value columns)
        label : str
            name of the column holding the annotation label (used when ``speaker`` is set)

        Returns
        -------
        finaldict : dict
            a dictionary in the format for enrichment
        """
        if isinstance(data, list) and not data:
            # Nothing to convert; avoids indexing into an empty result
            return {}

        if speaker:
            first_row = data[0]
            # list() so that both driver records and plain dict views can be indexed
            columns = list(first_row.keys())
            speaker_name = list(first_row.values())[0]
            prop = columns[2]
            by_label = {row[label]: row[prop] for row in data}
            return self.make_speaker_annotations_dict(by_label, speaker_name, prop)

        final_dict = {}
        if isinstance(data, list) and len(data[0]) == 2:
            # Two-column rows: (label, value); the value's column name becomes the property name.
            # Read it from the first row so single-row results work too.
            prop = str(list(data[0].keys())[1])
            for row in data:
                final_dict[row[0]] = {prop: row[1]}
        else:
            for key, value in data.items():
                final_dict[key] = {"baseline_duration": value}
        return final_dict

    def encode_measure(self, property_name, statistic, annotation_type, by_speaker=False):
        """
        Compute and save an aggregate measure for annotation types

        Available statistic names:

        * mean/average/avg
        * sd/stdev

        Parameters
        ----------
        property_name : str
            Name of the property
        statistic : str
            Name of the statistic to use for aggregation
        annotation_type : str
            Name of the annotation type
        by_speaker : bool
            Flag for whether to compute aggregation by speaker

        Raises
        ------
        ValueError
            if ``statistic`` is not one of the available names
        """
        if property_name == "duration":
            prop = "a.end - a.begin"
        else:
            prop = "a.{}".format(property_name)

        stat = statistic.lower()
        if stat in ("mean", "average", "avg"):
            func, name = "avg", "mean"
        elif stat in ("sd", "stdev"):
            func, name = "stdev", "sd"
        else:
            # Previously fell through and failed later with an unbound local
            raise ValueError(
                "The statistic {} is not a valid option. Options are mean/average/avg or sd/stdev".format(
                    statistic
                )
            )

        if by_speaker:
            # The value is stored on the type-to-speaker relationship, created if missing
            statement = """MATCH (a_type:{annotation_type}_type:{corpus_name})<-[:is_a]-(a:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name})
            with a_type, s, {func}({prop}) as value
            MERGE (a_type)-[r:spoken_by]->(s)
            with r, value
            set r.{func_name}_{property_name} = value"""
        else:
            statement = """MATCH (a_type:{annotation_type}_type:{corpus_name})<-[:is_a]-(a:{annotation_type}:{corpus_name})
            with a_type, {func}({prop}) as value
            set a_type.{func_name}_{property_name} = value
            """
        self.execute_cypher(
            statement.format(
                corpus_name=self.cypher_safe_name,
                annotation_type=annotation_type,
                prop=prop,
                func_name=name,
                func=func,
                property_name=property_name,
            )
        )
        # NOTE(review): the type property is registered even for by-speaker runs, where
        # the value lives on the relationship rather than the type node - confirm intended
        self.hierarchy.add_type_properties(
            self, annotation_type, [("_".join([name, property_name]), float)]
        )
        self.encode_hierarchy()

    def encode_baseline(self, annotation_type, property_name, by_speaker=False):
        """
        Encode a baseline measure of a property, that is, the expected value of a higher annotation given the average
        property value of the phones that make it up.  For instance, the expected duration of a word or syllable given
        its phonological content.

        Parameters
        ----------
        annotation_type : str
            Name of annotation type to compute for
        property_name : str
            Property of phones to compute based off of (i.e., ``duration``)
        by_speaker : bool
            Flag for whether to use by-speaker means
        """
        if by_speaker:
            # Phone means must exist per speaker before they can be summed
            if not self._speaker_measure_encoded("mean", property_name):
                self.encode_measure(property_name, "mean", self.phone_name, by_speaker)
            statement = """MATCH (a:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name})
            with a, s
            MATCH (a)<-[:contained_by*]-(p:{phone_name}:{corpus_name})-[:is_a]->(pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s)
            WITH a, sum(r.mean_{property_name}) as baseline
            SET a.baseline_{property_name}_by_speaker = baseline""".format(
                corpus_name=self.cypher_safe_name,
                phone_name=self.phone_name,
                property_name=property_name,
                annotation_type=annotation_type,
            )
            self.execute_cypher(statement)
            self.hierarchy.add_token_properties(
                self,
                annotation_type,
                [("baseline_{}_by_speaker".format(property_name), float)],
            )
        else:
            if not self.hierarchy.has_type_property(self.phone_name, "mean_" + property_name):
                self.encode_measure(property_name, "mean", self.phone_name, by_speaker)
            statement = """MATCH (a:{annotation_type}:{corpus_name})
            with a
            MATCH (a)<-[:contained_by*]-(p:{phone_name}:{corpus_name})-[:is_a]->(pt:{phone_name}_type:{corpus_name})
            WITH a, sum(pt.mean_{property_name}) as baseline
            SET a.baseline_{property_name} = baseline""".format(
                corpus_name=self.cypher_safe_name,
                phone_name=self.phone_name,
                property_name=property_name,
                annotation_type=annotation_type,
            )
            self.execute_cypher(statement)
            self.hierarchy.add_token_properties(
                self, annotation_type, [("baseline_{}".format(property_name), float)]
            )
        self.encode_hierarchy()

    def encode_relativized(self, annotation_type, property_name, by_speaker=False):
        """
        Compute and save to the database a relativized measure (i.e., the property value z-scored using a mean and
        standard deviation computed from the corpus).  The computation of means and standard deviations can be by-speaker.

        Parameters
        ----------
        annotation_type : str
            Name of the annotation type
        property_name : str
            Name of the property to relativize
        by_speaker : bool
            Flag to use by-speaker means and standard deviations
        """
        if property_name == "duration":
            property_descriptor = "(p.end - p.begin)"
        else:
            property_descriptor = "p.{}".format(property_name)

        fields = {
            "corpus_name": self.cypher_safe_name,
            "phone_name": self.phone_name,
            "annotation_type": annotation_type,
            "property_name": property_name,
            "property_descriptor": property_descriptor,
        }

        if by_speaker:
            # Make sure both by-speaker phone statistics are present before z-scoring
            for measure_name in ("mean", "sd"):
                if not self._speaker_measure_encoded(measure_name, property_name):
                    self.encode_measure(property_name, measure_name, self.phone_name, by_speaker)

            if annotation_type == self.phone_name:
                statement = """MATCH (p:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name})
                with p, s
                MATCH (p)-[:is_a]->(pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s)
                WITH p, avg(case when r.sd_{property_name} > 0 THEN ({property_descriptor} - r.mean_{property_name}) / r.sd_{property_name} ELSE 0 END) as relativized
                SET p.relativized_{property_name}_by_speaker = relativized""".format(
                    **fields
                )
            else:
                # Higher annotations average the z-scores of the phones they contain
                statement = """MATCH (a:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name})
                with a, s
                MATCH (a)<-[:contained_by*]-(p:{phone_name}:{corpus_name})-[:is_a]->(pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s)
                WITH a, avg(case when r.sd_{property_name} > 0 THEN ({property_descriptor} - r.mean_{property_name}) / r.sd_{property_name} ELSE 0 END) as relativized
                SET a.relativized_{property_name}_by_speaker = relativized""".format(
                    **fields
                )
            self.execute_cypher(statement)
            self.hierarchy.add_token_properties(
                self,
                annotation_type,
                [("relativized_{}_by_speaker".format(property_name), float)],
            )
        else:
            for measure_name in ("mean", "sd"):
                stored_name = "{}_{}".format(measure_name, property_name)
                if not self.hierarchy.has_type_property(self.phone_name, stored_name):
                    self.encode_measure(property_name, measure_name, self.phone_name, by_speaker)

            if annotation_type == self.phone_name:
                statement = """MATCH (p:{annotation_type}:{corpus_name})
                with p
                MATCH (p)-[:is_a]->(pt:{phone_name}_type:{corpus_name})
                WITH p, avg(case when pt.sd_{property_name} > 0 THEN ({property_descriptor} - pt.mean_{property_name}) / pt.sd_{property_name} ELSE 0 END) as relativized
                SET p.relativized_{property_name} = relativized""".format(
                    **fields
                )
            else:
                statement = """MATCH (a:{annotation_type}:{corpus_name})
                with a
                MATCH (a)<-[:contained_by*]-(p:{phone_name}:{corpus_name})-[:is_a]->(pt:{phone_name}_type:{corpus_name})
                WITH a, avg(case when pt.sd_{property_name} > 0 THEN ({property_descriptor} - pt.mean_{property_name}) / pt.sd_{property_name} ELSE 0 END) as relativized
                SET a.relativized_{property_name} = relativized""".format(
                    **fields
                )
            self.execute_cypher(statement)
            self.hierarchy.add_token_properties(
                self, annotation_type, [("relativized_{}".format(property_name), float)]
            )
        self.encode_hierarchy()