Source code for polyglotdb.corpus.structured

from polyglotdb.corpus.base import BaseContext
from polyglotdb.query import value_for_cypher
from polyglotdb.query.annotations.query import SplitQuery
from polyglotdb.query.metadata.query import MetaDataQuery
from polyglotdb.structure import Hierarchy


def generate_cypher_property_list(property_set):
    """
    Generates a Cypher claus for setting properties

    Parameters
    ----------
    property_set : list
        List of tuples of form (`property_name`, `property_value`)

    Returns
    -------
    str
        Cypher string for setting properties
    """
    props = []
    for name, t in property_set:
        if name == "id":
            continue
        v = ""
        if t == int:
            v = 0
        elif t == float:
            v = 0.0
        elif t in (list, tuple, set):
            v = []
        props.append(f"{name}: {value_for_cypher(v)}")
    return ", ".join(props)


[docs] class StructuredContext(BaseContext): """ Class that contains methods for dealing specifically with metadata for the corpus """ def generate_hierarchy(self): """ Get hierarchy schema information from the Neo4j database Returns ------- :class:`~polyglotdb.structure.Hierarchy` the structure of the corpus """ hierarchy_statement = """MATCH path = (c:Corpus)<-[:contained_by*]-(n)-[:is_a]->(nt), (c)-[:spoken_by]->(s:Speaker), (c)-[:spoken_in]->(d:Discourse) where c.name = $corpus_name WITH c, n, nt, path, s, d OPTIONAL MATCH (n)<-[:annotates]-(subs) return c, n, labels(n) as neo4j_labels, nt, path, collect(subs) as subs, s, d order by size(nodes(path))""" results = self.execute_cypher(hierarchy_statement, corpus_name=self.corpus_name) sup = None data = {} subs = {} token_properties = {} type_properties = {} type_subsets = {} token_subsets = {} speaker_properties = set() discourse_properties = set() acoustics = set() for r in results: if not acoustics: if r["c"].get("pitch", False): acoustics.add("pitch") if r["c"].get("formants", False): acoustics.add("formants") if r["c"].get("intensity", False): acoustics.add("intensity") if not speaker_properties: for k, v in r["s"].items(): speaker_properties.add((k, type(v))) if not discourse_properties: for k, v in r["d"].items(): discourse_properties.add((k, type(v))) at = list(r["neo4j_labels"])[0] data[at] = sup sup = at if r["subs"] is not None: subs[at] = {x["type"] for x in r["subs"]} token_subsets[at] = set() type_subsets[at] = set() token_properties[at] = {("id", type(""))} type_properties[at] = set() for k, v in r["n"].items(): if k == "subsets": token_subsets[at].update(v) else: token_properties[at].add((k, type(v))) for k, v in r["nt"].items(): if k == "subsets": type_subsets[at].update(v) else: type_properties[at].add((k, type(v))) h = Hierarchy(data) h.subannotations = subs h.subset_types = type_subsets h.token_properties = token_properties h.subset_tokens = token_subsets h.type_properties = type_properties h.speaker_properties = speaker_properties h.discourse_properties = discourse_properties h.corpus_name = self.corpus_name return h def query_metadata(self, annotation): """ Start a query over metadata Parameters ---------- annotation : :class:`~polyglotdb.query.base.attributes.Node` Returns ------- :class:`~polyglotdb.query.metadata.query.MetaDataQuery` MetaDataQuery object """ return MetaDataQuery(self, annotation) def refresh_hierarchy(self): """ Save the Neo4j database schema to the disk """ h = self.generate_hierarchy() h.corpus_name = self.corpus_name self.hierarchy = h self.cache_hierarchy() def reset_hierarchy(self): """ Delete the Hierarchy schema in the Neo4j database """ self.execute_cypher( """MATCH (c:Corpus)<-[:contained_by*]-(n)-[:is_a]->(t), (c)-[:spoken_by]->(s:Speaker), (c)-[:spoken_in]->(d:Discourse) WHERE c.name = $corpus WITH n, t, c, s, d OPTIONAL MATCH (t)<-[:annotates]-(a) WITH n, t, c, s, d, a OPTIONAL MATCH (c)-[:has_acoustics]->(ac) DETACH DELETE a, t, n, s, d, ac""", corpus=self.corpus_name, ) def encode_hierarchy(self): """ Sync the current Hierarchy to the Neo4j database and to the disk """ self.reset_hierarchy() hierarchy_template = """({super})<-[:contained_by]-({sub})-[:is_a]->({sub_type})""" subannotation_template = """({super})<-[:annotates]-({sub})""" speaker_template = """(c)-[:spoken_by]->(s:Speaker {%s})""" discourse_template = """(c)-[:spoken_in]->(d:Discourse {%s})""" acoustic_template = """(c)-[:has_acoustics]->(%s:%s {%s})""" statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name with c MERGE {merge_statement}""" merge_statements = [] speaker_props = generate_cypher_property_list(self.hierarchy.speaker_properties) discourse_props = generate_cypher_property_list(self.hierarchy.discourse_properties) for a in self.hierarchy.acoustics: acoustic_props = generate_cypher_property_list(self.hierarchy.acoustic_properties[a]) merge_statements.append(acoustic_template % (a, a, acoustic_props)) merge_statements.append(speaker_template % speaker_props) merge_statements.append(discourse_template % discourse_props) for at in self.hierarchy.highest_to_lowest: sup = self.hierarchy[at] if sup is None: sup = "c" else: sup = "{}".format(sup) try: if ("duration", float) not in self.hierarchy.token_properties[at]: self.hierarchy.token_properties[at].add(("duration", float)) token_props = generate_cypher_property_list(self.hierarchy.token_properties[at]) if token_props: token_props = ", " + token_props token_props += ", duration: 0.0" except KeyError: token_props = "" try: type_props = generate_cypher_property_list(self.hierarchy.type_properties[at]) if type_props: type_props = ", " + type_props else: type_props = "" except KeyError: type_props = "" try: type_subsets = sorted(self.hierarchy.subset_types[at]) except KeyError: type_subsets = [] try: token_subsets = sorted(self.hierarchy.subset_tokens[at]) except KeyError: token_subsets = [] try: subannotations = sorted(self.hierarchy.subannotations[at]) except KeyError: subannotations = [] sub = "{0}:{0} {{label: '', subsets: {2}, begin:0, end: 0{1}}}".format( at, token_props, token_subsets ) sub_type = "{0}_type:{0}_type {{label: '', subsets: {2}{1}}}".format( at, type_props, type_subsets ) merge_statements.append( hierarchy_template.format(super=sup, sub=sub, sub_type=sub_type) ) for sa in subannotations: sa = "{0}:{0} {{label: '', begin:0, type: '{0}', end: 0}}".format(sa) merge_statements.append(subannotation_template.format(super=at, sub=sa)) statement = statement.format(merge_statement="\nMERGE ".join(merge_statements)) self.execute_cypher(statement, corpus_name=self.corpus_name) self.cache_hierarchy() def encode_position(self, higher_annotation_type, lower_annotation_type, name, subset=None): """ Encodes position of lower type in higher type Parameters ---------- higher_annotation_type : str what the higher annotation is (utterance, word) lower_annotation_type : str what the lower annotation is (word, phone, syllable) name : str the column name subset : str the annotation subset """ lower = getattr(self, lower_annotation_type) if subset is not None: lower = lower.filter_by_subset(subset) higher = getattr(getattr(lower, higher_annotation_type), lower_annotation_type) if subset is not None: higher = higher.filter_by_subset(subset) q = SplitQuery(self, lower) q.splitter = "discourse" q.cache(higher.position.column_name(name)) self.hierarchy.add_token_properties(self, lower_annotation_type, [(name, float)]) self.encode_hierarchy() def encode_rate(self, higher_annotation_type, lower_annotation_type, name, subset=None): """ Encodes the rate of the lower type in the higher type Parameters ---------- higher_annotation_type : str what the higher annotation is (utterance, word) lower_annotation_type : str what the lower annotation is (word, phone, syllable) name : str the column name subset : str the annotation subset """ higher = getattr(self, higher_annotation_type) lower = getattr(higher, lower_annotation_type) if subset is not None: lower = lower.filter_by_subset(subset) q = SplitQuery(self, higher) q.splitter = "discourse" q.cache(lower.rate.column_name(name)) self.hierarchy.add_token_properties(self, higher_annotation_type, [(name, float)]) self.encode_hierarchy() def encode_count(self, higher_annotation_type, lower_annotation_type, name, subset=None): """ Encodes the rate of the lower type in the higher type Parameters ---------- higher_annotation_type : str what the higher annotation is (utterance, word) lower_annotation_type : str what the lower annotation is (word, phone, syllable) name : str the column name subset : str the annotation subset """ higher = getattr(self, higher_annotation_type) lower = getattr(higher, lower_annotation_type) if subset is not None: lower = lower.filter_by_subset(subset) q = SplitQuery(self, higher) q.splitter = "discourse" q.cache(lower.count.column_name(name)) self.hierarchy.add_token_properties(self, higher_annotation_type, [(name, float)]) self.encode_hierarchy() def reset_property(self, annotation_type, name): """ Removes property from hierarchy Parameters ---------- annotation_type : str what is being removed name : str the column name """ q = self.query_graph(getattr(self, annotation_type)) q.set_properties(**{name: None}) self.hierarchy.remove_token_properties(self, annotation_type, [name]) self.encode_hierarchy()