from polyglotdb.corpus.base import BaseContext
from polyglotdb.query import value_for_cypher
from polyglotdb.query.annotations.query import SplitQuery
from polyglotdb.query.metadata.query import MetaDataQuery
from polyglotdb.structure import Hierarchy
def generate_cypher_property_list(property_set):
    """
    Build the inside of a Cypher property map with one placeholder per property

    Every property is rendered as ``name: <placeholder>``, where the
    placeholder is an empty value picked from the property's type and
    formatted by ``value_for_cypher``.  The ``id`` property is never emitted.

    Parameters
    ----------
    property_set : iterable
        Pairs of the form (`property_name`, `property_type`)

    Returns
    -------
    str
        Comma-separated ``name: value`` entries without surrounding braces;
        an empty string when nothing is emitted
    """
    collection_kinds = (list, tuple, set)

    def placeholder_for(kind):
        # Empty stand-in value for a property of the given type; anything
        # that is not numeric or a collection falls back to an empty string.
        if kind == int:
            return 0
        if kind == float:
            return 0.0
        if kind in collection_kinds:
            return []
        return ""

    return ", ".join(
        f"{prop_name}: {value_for_cypher(placeholder_for(kind))}"
        for prop_name, kind in property_set
        if prop_name != "id"
    )
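# NOTE: a stray "[docs]" link label from the rendered documentation page stood here; it is not part of the module.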
class StructuredContext(BaseContext):
"""
Class that contains methods for dealing specifically with metadata for the corpus
"""
def generate_hierarchy(self):
    """
    Get hierarchy schema information from the Neo4j database

    Runs a single query for the corpus named ``self.corpus_name`` that yields
    one row per annotation level, ordered by the number of nodes on the
    ``contained_by`` path from the corpus node, and folds the rows into a
    :class:`~polyglotdb.structure.Hierarchy`.

    Returns
    -------
    :class:`~polyglotdb.structure.Hierarchy`
        the structure of the corpus
    """
    hierarchy_statement = """MATCH
path = (c:Corpus)<-[:contained_by*]-(n)-[:is_a]->(nt),
(c)-[:spoken_by]->(s:Speaker),
(c)-[:spoken_in]->(d:Discourse)
where c.name = $corpus_name
WITH c, n, nt, path, s, d
OPTIONAL MATCH (n)<-[:annotates]-(subs)
return c, n, labels(n) as neo4j_labels, nt, path, collect(subs) as subs, s, d
order by size(nodes(path))"""
    results = self.execute_cypher(hierarchy_statement, corpus_name=self.corpus_name)
    sup = None
    data = {}
    subs = {}
    token_properties = {}
    type_properties = {}
    type_subsets = {}
    token_subsets = {}
    speaker_properties = set()
    discourse_properties = set()
    # NOTE(review): acoustics are not populated on the returned hierarchy by
    # this method - confirm whether they should be read back from the database.
    for r in results:
        # Speaker and discourse properties are only filled from the first row
        # that yields any; later rows leave them untouched.
        if not speaker_properties:
            speaker_properties.update((k, type(v)) for k, v in r["s"].items())
        if not discourse_properties:
            discourse_properties.update((k, type(v)) for k, v in r["d"].items())
        at = list(r["neo4j_labels"])[0]
        # Rows arrive shortest path first, so the level seen on the previous
        # row is recorded as the parent of the current one.
        data[at] = sup
        sup = at
        if r["subs"] is not None:
            subs[at] = {x["type"] for x in r["subs"]}
        token_subsets[at] = set()
        type_subsets[at] = set()
        # Tokens always carry a string "id" property, whatever the node holds.
        token_properties[at] = {("id", str)}
        type_properties[at] = set()
        # The token node and its type node are unpacked the same way: the
        # "subsets" key feeds the subset collection, every other key becomes
        # a (name, python type) property entry.
        for node, properties, subsets in (
            (r["n"], token_properties[at], token_subsets[at]),
            (r["nt"], type_properties[at], type_subsets[at]),
        ):
            for k, v in node.items():
                if k == "subsets":
                    subsets.update(v)
                else:
                    properties.add((k, type(v)))
    h = Hierarchy(data)
    h.subannotations = subs
    h.subset_types = type_subsets
    h.token_properties = token_properties
    h.subset_tokens = token_subsets
    h.type_properties = type_properties
    h.speaker_properties = speaker_properties
    h.discourse_properties = discourse_properties
    h.corpus_name = self.corpus_name
    return h
def query_metadata(self, annotation):
    """
    Begin a metadata query for the given annotation

    Parameters
    ----------
    annotation : :class:`~polyglotdb.query.base.attributes.Node`
        Handed unchanged to the query as its target

    Returns
    -------
    :class:`~polyglotdb.query.metadata.query.MetaDataQuery`
        Query object bound to this corpus context
    """
    metadata_query = MetaDataQuery(self, annotation)
    return metadata_query
def refresh_hierarchy(self):
    """
    Regenerate the hierarchy from the Neo4j database and cache it

    The freshly generated hierarchy is stamped with this corpus's name,
    stored on ``self.hierarchy`` and then passed on via ``cache_hierarchy``.
    """
    regenerated = self.generate_hierarchy()
    regenerated.corpus_name = self.corpus_name
    self.hierarchy = regenerated
    self.cache_hierarchy()
def reset_hierarchy(self):
    """
    Delete the Hierarchy schema in the Neo4j database

    For the corpus named ``self.corpus_name`` this removes the annotation
    nodes reachable over ``contained_by`` from the corpus node, their type
    nodes, the subannotation nodes attached to either of them, the speaker
    and discourse nodes, and any ``has_acoustics`` nodes.

    The initial ``MATCH`` is not optional, so nothing is deleted unless the
    corpus has at least one annotation level plus a speaker and a discourse
    node.
    """
    # Subannotation schema nodes are created on (and read back from) the
    # annotation node ``n``, so they must be collected from ``n`` here;
    # the lookup on the type node ``t`` is kept so anything attached there
    # is still cleaned up as before.  Unmatched optional variables are
    # null, which was already the case for ``a`` and ``ac``.
    self.execute_cypher(
        """MATCH (c:Corpus)<-[:contained_by*]-(n)-[:is_a]->(t),
(c)-[:spoken_by]->(s:Speaker),
(c)-[:spoken_in]->(d:Discourse)
WHERE c.name = $corpus
WITH n, t, c, s, d
OPTIONAL MATCH (n)<-[:annotates]-(a)
WITH n, t, c, s, d, a
OPTIONAL MATCH (t)<-[:annotates]-(ta)
WITH n, t, c, s, d, a, ta
OPTIONAL MATCH (c)-[:has_acoustics]->(ac)
DETACH DELETE a, ta, t, n, s, d, ac""",
        corpus=self.corpus_name,
    )
def encode_hierarchy(self):
    """
    Sync the current Hierarchy to the Neo4j database and cache it

    The existing schema is removed with ``reset_hierarchy`` first.  A single
    statement is then built that ``MERGE``s, under the corpus node, the
    acoustics, speaker and discourse nodes followed by one annotation node
    and type node per level (highest to lowest) and one node per
    subannotation.  All property values written are empty placeholders.

    Every level that has token properties is guaranteed a float
    ``duration`` property; it is added to ``self.hierarchy.token_properties``
    in place when missing.
    """
    self.reset_hierarchy()
    hierarchy_template = """({super})<-[:contained_by]-({sub})-[:is_a]->({sub_type})"""
    subannotation_template = """({super})<-[:annotates]-({sub})"""
    speaker_template = """(c)-[:spoken_by]->(s:Speaker {%s})"""
    discourse_template = """(c)-[:spoken_in]->(d:Discourse {%s})"""
    acoustic_template = """(c)-[:has_acoustics]->(%s:%s {%s})"""
    statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
with c
MERGE {merge_statement}"""

    def sorted_or_empty(mapping, key):
        # Sorted entries stored under ``key``, or an empty list when the
        # level has no entry at all.
        try:
            return sorted(mapping[key])
        except KeyError:
            return []

    def as_suffix(properties):
        # Render properties as a ", name: value, ..." continuation of an
        # already opened Cypher map; empty when there is nothing to add.
        rendered = generate_cypher_property_list(properties)
        return ", " + rendered if rendered else ""

    merge_statements = []
    for a in self.hierarchy.acoustics:
        acoustic_props = generate_cypher_property_list(self.hierarchy.acoustic_properties[a])
        merge_statements.append(acoustic_template % (a, a, acoustic_props))
    speaker_props = generate_cypher_property_list(self.hierarchy.speaker_properties)
    discourse_props = generate_cypher_property_list(self.hierarchy.discourse_properties)
    merge_statements.append(speaker_template % speaker_props)
    merge_statements.append(discourse_template % discourse_props)

    for at in self.hierarchy.highest_to_lowest:
        # The top level hangs directly off the corpus node, bound as ``c``.
        sup = self.hierarchy[at]
        if sup is None:
            sup = "c"

        try:
            level_token_properties = self.hierarchy.token_properties[at]
        except KeyError:
            token_props = ""
        else:
            # Adding to the set is a no-op when duration is already there.
            # The generated list then contains ``duration: 0.0`` itself, so
            # it must not be appended a second time (duplicate map key).
            level_token_properties.add(("duration", float))
            token_props = as_suffix(level_token_properties)

        try:
            level_type_properties = self.hierarchy.type_properties[at]
        except KeyError:
            type_props = ""
        else:
            type_props = as_suffix(level_type_properties)

        type_subsets = sorted_or_empty(self.hierarchy.subset_types, at)
        token_subsets = sorted_or_empty(self.hierarchy.subset_tokens, at)
        subannotations = sorted_or_empty(self.hierarchy.subannotations, at)

        sub = "{0}:{0} {{label: '', subsets: {2}, begin:0, end: 0{1}}}".format(
            at, token_props, token_subsets
        )
        sub_type = "{0}_type:{0}_type {{label: '', subsets: {2}{1}}}".format(
            at, type_props, type_subsets
        )
        merge_statements.append(
            hierarchy_template.format(super=sup, sub=sub, sub_type=sub_type)
        )
        # Subannotations attach to the annotation node of this level, which
        # the MERGE just above binds to the variable named ``at``.
        for sa in subannotations:
            sa_node = "{0}:{0} {{label: '', begin:0, type: '{0}', end: 0}}".format(sa)
            merge_statements.append(subannotation_template.format(super=at, sub=sa_node))

    statement = statement.format(merge_statement="\nMERGE ".join(merge_statements))
    self.execute_cypher(statement, corpus_name=self.corpus_name)
    self.cache_hierarchy()
def encode_position(self, higher_annotation_type, lower_annotation_type, name, subset=None):
    """
    Store each lower annotation's position within its higher annotation

    The value is cached on the lower annotations, one query split per
    discourse, and ``name`` is then registered as a float token property of
    the lower annotation type before the hierarchy is re-encoded.

    Parameters
    ----------
    higher_annotation_type : str
        what the higher annotation is (utterance, word)
    lower_annotation_type : str
        what the lower annotation is (word, phone, syllable)
    name : str
        name of the property the position is stored under
    subset : str
        optional subset; when given, both the annotations being updated and
        the annotations they are positioned among are filtered by it
    """
    use_subset = subset is not None

    # Annotations that receive the new property.
    target = getattr(self, lower_annotation_type)
    if use_subset:
        target = target.filter_by_subset(subset)

    # The lower annotations reached back through the enclosing higher
    # annotation; the position is taken from this attribute.
    enclosing = getattr(target, higher_annotation_type)
    siblings = getattr(enclosing, lower_annotation_type)
    if use_subset:
        siblings = siblings.filter_by_subset(subset)

    query = SplitQuery(self, target)
    query.splitter = "discourse"
    position_column = siblings.position.column_name(name)
    query.cache(position_column)

    new_property = (name, float)
    self.hierarchy.add_token_properties(self, lower_annotation_type, [new_property])
    self.encode_hierarchy()
def encode_rate(self, higher_annotation_type, lower_annotation_type, name, subset=None):
    """
    Store the rate of the lower type on each higher annotation

    The value is cached on the higher annotations, one query split per
    discourse, and ``name`` is then registered as a float token property of
    the higher annotation type before the hierarchy is re-encoded.

    Parameters
    ----------
    higher_annotation_type : str
        what the higher annotation is (utterance, word)
    lower_annotation_type : str
        what the lower annotation is (word, phone, syllable)
    name : str
        name of the property the rate is stored under
    subset : str
        optional subset; when given, the lower annotations are filtered by it
    """
    container = getattr(self, higher_annotation_type)
    contained = getattr(container, lower_annotation_type)
    if subset is not None:
        contained = contained.filter_by_subset(subset)

    query = SplitQuery(self, container)
    query.splitter = "discourse"
    rate_column = contained.rate.column_name(name)
    query.cache(rate_column)

    new_property = (name, float)
    self.hierarchy.add_token_properties(self, higher_annotation_type, [new_property])
    self.encode_hierarchy()
def encode_count(self, higher_annotation_type, lower_annotation_type, name, subset=None):
    """
    Store the count of the lower type on each higher annotation

    The value is cached on the higher annotations, one query split per
    discourse, and ``name`` is then registered as a float token property of
    the higher annotation type before the hierarchy is re-encoded.

    Parameters
    ----------
    higher_annotation_type : str
        what the higher annotation is (utterance, word)
    lower_annotation_type : str
        what the lower annotation is (word, phone, syllable)
    name : str
        name of the property the count is stored under
    subset : str
        optional subset; when given, the lower annotations are filtered by it
    """
    container = getattr(self, higher_annotation_type)
    contained = getattr(container, lower_annotation_type)
    if subset is not None:
        contained = contained.filter_by_subset(subset)

    query = SplitQuery(self, container)
    query.splitter = "discourse"
    count_column = contained.count.column_name(name)
    query.cache(count_column)

    new_property = (name, float)
    self.hierarchy.add_token_properties(self, higher_annotation_type, [new_property])
    self.encode_hierarchy()
def reset_property(self, annotation_type, name):
    """
    Clear a token property and drop it from the hierarchy

    The property is set to ``None`` on the annotations of the given type,
    removed from the hierarchy's token properties, and the hierarchy is
    re-encoded.

    Parameters
    ----------
    annotation_type : str
        annotation type the property belongs to
    name : str
        name of the property to remove
    """
    annotation = getattr(self, annotation_type)
    clearing_query = self.query_graph(annotation)
    cleared = {name: None}
    clearing_query.set_properties(**cleared)

    self.hierarchy.remove_token_properties(self, annotation_type, [name])
    self.encode_hierarchy()