from polyglotdb.corpus.phonological import PhonologicalContext
from polyglotdb.exceptions import GraphQueryError
from polyglotdb.query.base.func import Average
[docs]
class SummarizedContext(PhonologicalContext):
    """
    Corpus context with methods for summary measures over linguistic items.

    The query methods (``get_measure``, ``baseline_duration``,
    ``average_speech_rate``) return aggregate values computed in the graph
    database; the ``encode_*`` methods compute aggregates (means, standard
    deviations, baselines, z-scored values) and store them back on the graph
    and in the corpus hierarchy.
    """
def get_measure(self, data_name, statistic, annotation_type, by_speaker=False, speaker=None):
    """
    Get a summary statistic of ``data_name`` for every label of an annotation type.

    Parameters
    ----------
    data_name : str
        the aspect to summarize; ``"duration"`` is computed as ``end - begin``,
        any other name is read as a property of the annotation
    statistic : str
        how to summarize: ``"mean"``, ``"stdev"``, ``"median"`` or ``"baseline"``
    annotation_type : str
        the annotation to summarize
    by_speaker : boolean
        whether to return one row per speaker and label; takes precedence
        over ``speaker``
    speaker : str
        restrict the summary to this speaker (also passed on to
        ``baseline_duration`` for the ``"baseline"`` statistic)

    Returns
    -------
    list or dict
        the rows returned by the query, or for ``"baseline"`` the dictionary
        returned by ``baseline_duration``

    Raises
    ------
    AttributeError
        if ``statistic`` is not one of the supported options
    """
    # Cypher aggregate function and extra argument for each plain statistic.
    aggregates = {
        "mean": ("avg", ""),
        "stdev": ("stdev", ""),
        "median": ("percentileDisc", ", .5"),
    }
    column = statistic + "_" + data_name
    is_baseline = statistic == "baseline"
    result = None
    if is_baseline:
        result = self.baseline_duration(annotation_type, speaker)
    elif statistic not in aggregates:
        raise (
            AttributeError(
                "The statistic {} is not a valid option. Options are mean, median, stdev, or baseline".format(
                    statistic
                )
            )
        )
    # Kept from the original flow: utterances are encoded on demand for every
    # statistic, including the baseline.
    if not self.hierarchy.has_type_property("utterance", "label"):
        self.encode_utterances()
    if is_baseline:
        return result

    measure, percent = aggregates[statistic]
    if data_name == "duration":
        num_prop = "p.end - p.begin"
    else:
        # Previously left undefined (UnboundLocalError); read the property directly.
        num_prop = "p.{}".format(data_name)

    if by_speaker:
        statement = (
            "MATCH (p:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name}) "
            "RETURN s.name as speaker, p.label as {annotation_type}, {measure}({num_prop}{percent}) as {column}".format(
                corpus_name=self.cypher_safe_name,
                annotation_type=annotation_type,
                measure=measure,
                num_prop=num_prop,
                percent=percent,
                column=column,
            )
        )
    elif speaker is not None:
        # Escape backslashes and quotes so names such as O'Brien keep the
        # string literal well-formed.
        safe_speaker = str(speaker).replace("\\", "\\\\").replace("'", "\\'")
        statement = (
            "MATCH (p:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name}) "
            "where s.name = '{speaker}' "
            "RETURN p.label as {annotation_type}, {measure}({num_prop}{percent}) as {column}".format(
                corpus_name=self.cypher_safe_name,
                annotation_type=annotation_type,
                measure=measure,
                num_prop=num_prop,
                percent=percent,
                speaker=safe_speaker,
                column=column,
            )
        )
    else:
        statement = "MATCH (p:{annotation_type}:{corpus_name}) RETURN p.label as {annotation_type}, {measure}({num_prop}{percent}) as {column}".format(
            corpus_name=self.cypher_safe_name,
            annotation_type=annotation_type,
            measure=measure,
            num_prop=num_prop,
            percent=percent,
            column=column,
        )
    return list(self.execute_cypher(statement))
def baseline_duration(self, annotation, speaker=None):
    """
    Get the baseline duration of each item of an annotation type.

    The baseline is the sum of the average durations of the phones that fall
    within the item's time span. As a side effect the query stores
    ``average_duration`` on the phones and ``baseline_duration`` on the
    annotations it touches.

    Parameters
    ----------
    annotation : str
        the annotation type to compute baselines for
    speaker : str
        a speaker name, if desired (defaults to None); when given, phone
        averages are computed from that speaker's phones only

    Returns
    -------
    dict
        a dictionary mapping each item's key (its ``id`` for utterances, its
        ``label`` otherwise) to its baseline duration

    Raises
    ------
    AttributeError
        if ``annotation`` is ``"utterance"`` or ``"syllable"`` and that type
        has no ``label`` property in the hierarchy
    """
    index = "label"
    if annotation == "utterance":
        # TODO: find a good key for utterances (labels too long anyway and are None)
        index = "id"
        if not self.hierarchy.has_type_property("utterance", "label"):
            raise AttributeError(f"Annotation type '{annotation}' not found.")
    if annotation == "syllable":
        if not self.hierarchy.has_type_property("syllable", "label"):
            raise AttributeError(f"Annotation type '{annotation}' not found.")

    corpus = self.cypher_safe_name
    if speaker is None:
        phone_match = f"MATCH (m:phone:{corpus})"
    else:
        # Escape backslashes and quotes so the name stays a valid string literal.
        safe_speaker = str(speaker).replace("\\", "\\\\").replace("'", "\\'")
        phone_match = (
            f"MATCH (m:phone:{corpus})-[:spoken_by]->(s:Speaker:{corpus}) "
            f"where s.name = '{safe_speaker}'"
        )
    # Both variants share everything after the first MATCH. The RETURN aliases
    # are what the result loop reads; the speaker variant used to omit them,
    # which made the lookup below raise KeyError.
    statement = "\n".join(
        [
            phone_match,
            f"with m.{index} as target, avg(m.end-m.begin) as dur",
            "with target,dur",
            f"match (p:phone:{corpus})",
            f"where p.{index} = target set p.average_duration = dur",
            "with p as phone",
            f"match(n:{annotation}:{corpus}) where phone.begin>=n.begin and phone.end<=n.end",
            "with n,phone",
            f"with n, n.{index} as target, sum(phone.average_duration) as baseline",
            f"set n.baseline_duration = baseline return n.{index} as label, n.baseline_duration as baseline_duration",
        ]
    )
    rows = self.execute_cypher(statement)
    # Items sharing a key overwrite each other; the last row wins.
    return {row["label"]: row["baseline_duration"] for row in rows}
# SPEAKER
def average_speech_rate(self):
    """
    Compute the mean syllable rate of utterances, grouped by speaker name.

    Returns
    -------
    result
        whatever the grouped query's ``aggregate`` call returns: the average
        speech rate for each speaker

    Raises
    ------
    GraphQueryError
        if utterances or syllables are not among the corpus annotation types
    """
    prerequisites = (
        ("utterance", "Utterances must be encodes to calculate average speech rate."),
        ("syllable", "Syllables must be encodes to calculate average speech rate."),
    )
    for required_type, message in prerequisites:
        if required_type not in self.annotation_types:
            raise GraphQueryError(message)
    utterance_query = self.query_graph(self.utterance)
    per_speaker = utterance_query.group_by(self.utterance.speaker.name.column_name("name"))
    return per_speaker.aggregate(Average(self.utterance.syllable.rate))
def make_dict(self, data, speaker=False, label=None):
    """
    Turn query results into a dictionary for encoding.

    Parameters
    ----------
    data : list or dict
        either a list of result rows (mappings that expose ``keys()`` and
        ``values()``) or the label -> value dictionary of baseline durations
    speaker : bool
        when true, rows are expected to be (speaker, label, value) and the
        result is built by ``make_speaker_annotations_dict``
    label : str
        name of the column holding the annotation label; only used when
        ``speaker`` is true

    Returns
    -------
    dict
        a dictionary in the format for enrichment; empty when ``data`` is empty
    """
    # Empty results previously raised IndexError on data[0].
    if not data:
        return {}

    if speaker:
        first_row = data[0]
        # list() makes this work for dict views as well as list-returning rows.
        columns = list(first_row.keys())
        speaker_name = list(first_row.values())[0]
        prop = columns[2]
        by_label = {row[label]: row[prop] for row in data}
        return self.make_speaker_annotations_dict(by_label, speaker_name, prop)

    if isinstance(data, list) and len(data[0]) == 2:
        result = {}
        for row in data:
            # Column name comes from the row itself; the original read it from
            # data[1], which fails on a single-row result.
            key, value = list(row.values())
            column = str(list(row.keys())[1])
            result[key] = {column: value}
        return result

    return {key: {"baseline_duration": data[key]} for key in data.keys()}
def encode_measure(self, property_name, statistic, annotation_type, by_speaker=False):
    """
    Compute and save an aggregate measure for annotation types.

    Without ``by_speaker`` the value is stored on each annotation type node;
    with it, the value is stored on a ``spoken_by`` relationship between the
    type node and the speaker.

    Available statistic names (case-insensitive):

    * mean/average/avg
    * sd/stdev

    Parameters
    ----------
    property_name : str
        Name of the property (``duration`` is computed as ``end - begin``)
    statistic : str
        Name of the statistic to use for aggregation
    annotation_type : str
        Name of the annotation type
    by_speaker : bool
        Flag for whether to compute aggregation by speaker

    Raises
    ------
    ValueError
        If ``statistic`` is not one of the available names
    """
    requested = statistic.lower()
    if requested in ["mean", "average", "avg"]:
        func = "avg"
        name = "mean"
    elif requested in ["sd", "stdev"]:
        func = "stdev"
        name = "sd"
    else:
        # Previously fell through and failed later with UnboundLocalError.
        raise ValueError(
            "The statistic {} is not a valid option. Options are mean/average/avg or sd/stdev".format(
                statistic
            )
        )

    if property_name == "duration":
        expression = "a.end - a.begin"
    else:
        expression = "a.{}".format(property_name)

    if by_speaker:
        template = "\n".join(
            [
                "MATCH (a_type:{annotation_type}_type:{corpus_name})<-[:is_a]-(a:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name})",
                "with a_type, s, {func}({property}) as value",
                "MERGE (a_type)-[r:spoken_by]->(s)",
                "with r, value",
                "set r.{func_name}_{property_name} = value",
            ]
        )
    else:
        template = "\n".join(
            [
                "MATCH (a_type:{annotation_type}_type:{corpus_name})<-[:is_a]-(a:{annotation_type}:{corpus_name})",
                "with a_type, {func}({property}) as value",
                "set a_type.{func_name}_{property_name} = value",
            ]
        )
    self.execute_cypher(
        template.format(
            corpus_name=self.cypher_safe_name,
            annotation_type=annotation_type,
            property=expression,
            func_name=name,
            func=func,
            property_name=property_name,
        )
    )
    # NOTE(review): the type property is registered in the hierarchy even when
    # by_speaker is set and the value lives on the relationship -- confirm
    # this is intended.
    self.hierarchy.add_type_properties(
        self, annotation_type, [("_".join([name, property_name]), float)]
    )
    self.encode_hierarchy()
def encode_baseline(self, annotation_type, property_name, by_speaker=False):
    """
    Encode a baseline measure of a property, that is, the expected value of a higher annotation given the average
    property value of the phones that make it up. For instance, the expected duration of a word or syllable given
    its phonological content.

    The phone-level means are encoded first (via ``encode_measure``) when they
    are not available yet. The result is stored on each annotation as
    ``baseline_<property>`` or ``baseline_<property>_by_speaker``.

    Parameters
    ----------
    annotation_type : str
        Name of annotation type to compute for
    property_name : str
        Property of phones to compute based off of (i.e., ``duration``)
    by_speaker : bool
        Flag for whether to use by-speaker means
    """
    corpus = self.cypher_safe_name
    phone = self.phone_name
    mean_name = "mean_{}".format(property_name)

    if by_speaker:
        # By-speaker means live on the phone type's spoken_by relationships, so
        # look there (not at the target annotation's type) for existing values.
        exists_statement = "\n".join(
            [
                "MATCH (pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s:Speaker:{corpus_name})",
                "RETURN count(r.{mean_name}) AS n_mean",
            ]
        ).format(phone_name=phone, corpus_name=corpus, mean_name=mean_name)
        rows = list(self.execute_cypher(exists_statement))
        has_mean = bool(rows) and rows[0]["n_mean"] > 0
        if not has_mean:
            self.encode_measure(property_name, "mean", phone, by_speaker)
        statement = "\n".join(
            [
                "MATCH (a:{annotation_type}:{corpus_name})-[:spoken_by]->(s:Speaker:{corpus_name})",
                "with a, s",
                "MATCH (a)<-[:contained_by*]-(p:{phone_name}:{corpus_name})-[:is_a]->(pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s)",
                "WITH a, sum(r.mean_{property_name}) as baseline",
                "SET a.baseline_{property_name}_by_speaker = baseline",
            ]
        ).format(
            corpus_name=corpus,
            phone_name=phone,
            property_name=property_name,
            annotation_type=annotation_type,
        )
        encoded_name = "baseline_{}_by_speaker".format(property_name)
    else:
        if not self.hierarchy.has_type_property(phone, mean_name):
            self.encode_measure(property_name, "mean", phone, by_speaker)
        statement = "\n".join(
            [
                "MATCH (a:{annotation_type}:{corpus_name})",
                "with a",
                "MATCH (a)<-[:contained_by*]-(p:{phone_name}:{corpus_name})-[:is_a]->(pt:{phone_name}_type:{corpus_name})",
                "WITH a, sum(pt.mean_{property_name}) as baseline",
                "SET a.baseline_{property_name} = baseline",
            ]
        ).format(
            corpus_name=corpus,
            phone_name=phone,
            property_name=property_name,
            annotation_type=annotation_type,
        )
        encoded_name = "baseline_{}".format(property_name)

    self.execute_cypher(statement)
    self.hierarchy.add_token_properties(self, annotation_type, [(encoded_name, float)])
    self.encode_hierarchy()
def encode_relativized(self, annotation_type, property_name, by_speaker=False):
    """
    Compute and save to the database a relativized measure (i.e., the property value z-scored using a mean and
    standard deviation computed from the corpus). The computation of means and standard deviations can be by-speaker.

    Phone-level means and standard deviations are encoded first (via
    ``encode_measure``) when missing. For phones the z-score of the phone
    itself is stored; for any other annotation type the stored value is the
    average z-score of the phones it contains. A standard deviation of zero
    (or a missing one) yields a z-score of 0.

    Parameters
    ----------
    annotation_type : str
        Name of the annotation type
    property_name : str
        Name of the property to relativize
    by_speaker : bool
        Flag to use by-speaker means and standard deviations
    """
    corpus = self.cypher_safe_name
    phone = self.phone_name
    mean_name = "mean_{}".format(property_name)
    sd_name = "sd_{}".format(property_name)

    if property_name == "duration":
        property_descriptor = "(p.end - p.begin)"
    else:
        property_descriptor = "p.{}".format(property_name)

    if by_speaker:
        # By-speaker statistics live on the phone type's spoken_by
        # relationships. The previous check returned only the constant 1, so
        # the property lookups always failed and both statistics were
        # recomputed on every call.
        exists_statement = "\n".join(
            [
                "MATCH (pt:{phone_name}_type:{corpus_name})-[r:spoken_by]->(s:Speaker:{corpus_name})",
                "RETURN count(r.{mean_name}) AS n_mean, count(r.{sd_name}) AS n_sd",
            ]
        ).format(phone_name=phone, corpus_name=corpus, mean_name=mean_name, sd_name=sd_name)
        rows = list(self.execute_cypher(exists_statement))
        has_mean = bool(rows) and rows[0]["n_mean"] > 0
        has_sd = bool(rows) and rows[0]["n_sd"] > 0
    else:
        has_mean = self.hierarchy.has_type_property(phone, mean_name)
        has_sd = self.hierarchy.has_type_property(phone, sd_name)
    if not has_mean:
        self.encode_measure(property_name, "mean", phone, by_speaker)
    if not has_sd:
        self.encode_measure(property_name, "sd", phone, by_speaker)

    # The four query variants differ only in the pieces chosen below.
    is_phone = annotation_type == phone
    target = "p" if is_phone else "a"
    if by_speaker:
        speaker_hop = "-[:spoken_by]->(s:Speaker:{})".format(corpus)
        carried = target + ", s"
        type_tail = "-[r:spoken_by]->(s)"
        stats = "r"  # statistics are read from the type-to-speaker relationship
        suffix = "_by_speaker"
    else:
        speaker_hop = ""
        carried = target
        type_tail = ""
        stats = "pt"  # statistics are read from the phone type node
        suffix = ""
    if is_phone:
        path = "(p)"
    else:
        path = "(a)<-[:contained_by*]-(p:{}:{})".format(phone, corpus)

    statement = "\n".join(
        [
            "MATCH ({target}:{annotation_type}:{corpus_name}){speaker_hop}",
            "with {carried}",
            "MATCH {path}-[:is_a]->(pt:{phone_name}_type:{corpus_name}){type_tail}",
            "WITH {target}, avg(case when {stats}.{sd_name} > 0 THEN ({property_descriptor} - {stats}.{mean_name}) / {stats}.{sd_name} ELSE 0 END) as relativized",
            "SET {target}.relativized_{property_name}{suffix} = relativized",
        ]
    ).format(
        target=target,
        annotation_type=annotation_type,
        corpus_name=corpus,
        speaker_hop=speaker_hop,
        carried=carried,
        path=path,
        phone_name=phone,
        type_tail=type_tail,
        stats=stats,
        sd_name=sd_name,
        mean_name=mean_name,
        property_descriptor=property_descriptor,
        property_name=property_name,
        suffix=suffix,
    )
    self.execute_cypher(statement)
    self.hierarchy.add_token_properties(
        self,
        annotation_type,
        [("relativized_{}{}".format(property_name, suffix), float)],
    )
    self.encode_hierarchy()