Source code for polyglotdb.corpus.structured


from ..query import value_for_cypher
from ..query.annotations.query import SplitQuery
from ..query.metadata.query import MetaDataQuery
from ..structure import Hierarchy
from .base import BaseContext


def generate_cypher_property_list(property_set):
    """
    Generates a Cypher clause for setting properties

    Parameters
    ----------
    property_set : list
        List of tuples of the form (`property_name`, `property_type`)

    Returns
    -------
    str
        Cypher string for setting properties
    """
    props = []
    for name, t in property_set:
        if name == 'id':
            continue
        v = ''
        if t == int:
            v = 0
        elif t == float:
            v = 0.0
        elif t in (list, tuple, set):
            v = []
        props.append('{}: {}'.format(name, value_for_cypher(v)))
    return ', '.join(props)
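
# Illustrative note (not part of the original module): given a property set such as
# {('label', str), ('begin', float), ('num_tokens', int)}, the helper above yields a
# Cypher property-map fragment along the lines of
#
#     label: '', begin: 0.0, num_tokens: 0
#
# (ordering follows set iteration order). encode_hierarchy() below splices these
# fragments into the property maps of its MERGE patterns.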


class StructuredContext(BaseContext):
    """
    Class that contains methods for dealing specifically with metadata for the corpus
    """

    def generate_hierarchy(self):
        """
        Get hierarchy schema information from the Neo4j database

        Returns
        -------
        :class:`~polyglotdb.structure.Hierarchy`
            the structure of the corpus
        """
        hierarchy_statement = '''MATCH path = (c:Corpus)<-[:contained_by*]-(n)-[:is_a]->(nt),
        (c)-[:spoken_by]->(s:Speaker),
        (c)-[:spoken_in]->(d:Discourse)
        where c.name = $corpus_name
        WITH c, n, nt, path, s, d
        OPTIONAL MATCH (n)<-[:annotates]-(subs)
        return c, n, labels(n) as neo4j_labels, nt, path, collect(subs) as subs, s, d
        order by size(nodes(path))'''
        results = self.execute_cypher(hierarchy_statement, corpus_name=self.corpus_name)
        sup = None
        data = {}
        subs = {}
        token_properties = {}
        type_properties = {}
        type_subsets = {}
        token_subsets = {}
        speaker_properties = set()
        discourse_properties = set()
        acoustics = set()
        for r in results:
            if not acoustics:
                if r['c'].get('pitch', False):
                    acoustics.add('pitch')
                if r['c'].get('formants', False):
                    acoustics.add('formants')
                if r['c'].get('intensity', False):
                    acoustics.add('intensity')
            if not speaker_properties:
                for k, v in r['s'].items():
                    speaker_properties.add((k, type(v)))
            if not discourse_properties:
                for k, v in r['d'].items():
                    discourse_properties.add((k, type(v)))
            at = list(r['neo4j_labels'])[0]
            data[at] = sup
            sup = at
            if r['subs'] is not None:
                subs[at] = set([x['type'] for x in r['subs']])
            token_subsets[at] = set()
            type_subsets[at] = set()
            token_properties[at] = set([('id', type(''))])
            type_properties[at] = set()
            for k, v in r['n'].items():
                if k == 'subsets':
                    token_subsets[at].update(v)
                else:
                    token_properties[at].add((k, type(v)))
            for k, v in r['nt'].items():
                if k == 'subsets':
                    type_subsets[at].update(v)
                else:
                    type_properties[at].add((k, type(v)))
        h = Hierarchy(data)
        h.subannotations = subs
        h.subset_types = type_subsets
        h.token_properties = token_properties
        h.subset_tokens = token_subsets
        h.type_properties = type_properties
        h.speaker_properties = speaker_properties
        h.discourse_properties = discourse_properties
        h.corpus_name = self.corpus_name
        return h

    def query_metadata(self, annotation):
        """
        Start a query over metadata

        Parameters
        ----------
        annotation : :class:`~polyglotdb.query.base.attributes.Node`

        Returns
        -------
        :class:`~polyglotdb.query.metadata.query.MetaDataQuery`
            MetaDataQuery object
        """
        return MetaDataQuery(self, annotation)

    def refresh_hierarchy(self):
        """
        Save the Neo4j database schema to the disk
        """
        h = self.generate_hierarchy()
        h.corpus_name = self.corpus_name
        self.hierarchy = h
        self.cache_hierarchy()

    def reset_hierarchy(self):
        """
        Delete the Hierarchy schema in the Neo4j database
        """
        self.execute_cypher('''MATCH (c:Corpus)<-[:contained_by*]-(n)-[:is_a]->(t),
        (c)-[:spoken_by]->(s:Speaker),
        (c)-[:spoken_in]->(d:Discourse)
        WHERE c.name = $corpus
        WITH n, t, c, s, d
        OPTIONAL MATCH (t)<-[:annotates]-(a)
        WITH n, t, c, s, d, a
        OPTIONAL MATCH (c)-[:has_acoustics]->(ac)
        DETACH DELETE a, t, n, s, d, ac''', corpus=self.corpus_name)

    def encode_hierarchy(self):
        """
        Sync the current Hierarchy to the Neo4j database and to the disk
        """
        self.reset_hierarchy()
        hierarchy_template = '''({super})<-[:contained_by]-({sub})-[:is_a]->({sub_type})'''
        subannotation_template = '''({super})<-[:annotates]-({sub})'''
        speaker_template = '''(c)-[:spoken_by]->(s:Speaker {%s})'''
        discourse_template = '''(c)-[:spoken_in]->(d:Discourse {%s})'''
        acoustic_template = '''(c)-[:has_acoustics]->(%s:%s {%s})'''
        statement = '''MATCH (c:Corpus) WHERE c.name = $corpus_name
        with c MERGE {merge_statement}'''
        merge_statements = []
        speaker_props = generate_cypher_property_list(self.hierarchy.speaker_properties)
        discourse_props = generate_cypher_property_list(self.hierarchy.discourse_properties)
        # One MERGE pattern per acoustic measure, plus the Speaker and Discourse schema nodes
        for a in self.hierarchy.acoustics:
            acoustic_props = generate_cypher_property_list(self.hierarchy.acoustic_properties[a])
            merge_statements.append(acoustic_template % (a, a, acoustic_props))
        merge_statements.append(speaker_template % speaker_props)
        merge_statements.append(discourse_template % discourse_props)
        # One MERGE pattern per annotation type, from the top of the hierarchy down
        for at in self.hierarchy.highest_to_lowest:
            sup = self.hierarchy[at]
            if sup is None:
                sup = 'c'
            else:
                sup = '{}'.format(sup)
            try:
                if ('duration', float) not in self.hierarchy.token_properties[at]:
                    self.hierarchy.token_properties[at].add(('duration', float))
                token_props = generate_cypher_property_list(self.hierarchy.token_properties[at])
                if token_props:
                    token_props = ', ' + token_props
                token_props += ', duration: 0.0'
            except KeyError:
                token_props = ''
            try:
                type_props = generate_cypher_property_list(self.hierarchy.type_properties[at])
                if type_props:
                    type_props = ', ' + type_props
                else:
                    type_props = ''
            except KeyError:
                type_props = ''
            try:
                type_subsets = sorted(self.hierarchy.subset_types[at])
            except KeyError:
                type_subsets = []
            try:
                token_subsets = sorted(self.hierarchy.subset_tokens[at])
            except KeyError:
                token_subsets = []
            try:
                subannotations = sorted(self.hierarchy.subannotations[at])
            except KeyError:
                subannotations = []
            sub = "{0}:{0} {{label: '', subsets: {2}, begin:0, end: 0{1}}}".format(at, token_props, token_subsets)
            sub_type = "{0}_type:{0}_type {{label: '', subsets: {2}{1}}}".format(at, type_props, type_subsets)
            merge_statements.append(hierarchy_template.format(super=sup, sub=sub, sub_type=sub_type))
            for sa in subannotations:
                sa = "{0}:{0} {{label: '', begin:0, type: '{0}', end: 0}}".format(sa)
                merge_statements.append(subannotation_template.format(super=at, sub=sa))

        statement = statement.format(merge_statement='\nMERGE '.join(merge_statements))
        self.execute_cypher(statement, corpus_name=self.corpus_name)
        self.cache_hierarchy()

    def encode_position(self, higher_annotation_type, lower_annotation_type, name, subset=None):
        """
        Encodes position of lower type in higher type

        Parameters
        ----------
        higher_annotation_type : str
            what the higher annotation is (utterance, word)
        lower_annotation_type : str
            what the lower annotation is (word, phone, syllable)
        name : str
            the column name
        subset : str
            the annotation subset
        """
        lower = getattr(self, lower_annotation_type)
        if subset is not None:
            lower = lower.filter_by_subset(subset)
        higher = getattr(getattr(lower, higher_annotation_type), lower_annotation_type)
        if subset is not None:
            higher = higher.filter_by_subset(subset)

        q = SplitQuery(self, lower)
        q.splitter = 'discourse'
        q.cache(higher.position.column_name(name))
        self.hierarchy.add_token_properties(self, lower_annotation_type, [(name, float)])
        self.encode_hierarchy()

    def encode_rate(self, higher_annotation_type, lower_annotation_type, name, subset=None):
        """
        Encodes the rate of the lower type in the higher type

        Parameters
        ----------
        higher_annotation_type : str
            what the higher annotation is (utterance, word)
        lower_annotation_type : str
            what the lower annotation is (word, phone, syllable)
        name : str
            the column name
        subset : str
            the annotation subset
        """
        higher = getattr(self, higher_annotation_type)
        lower = getattr(higher, lower_annotation_type)
        if subset is not None:
            lower = lower.filter_by_subset(subset)
        q = SplitQuery(self, higher)
        q.splitter = 'discourse'
        q.cache(lower.rate.column_name(name))
        self.hierarchy.add_token_properties(self, higher_annotation_type, [(name, float)])
        self.encode_hierarchy()

    def encode_count(self, higher_annotation_type, lower_annotation_type, name, subset=None):
        """
        Encodes the count of the lower type in the higher type

        Parameters
        ----------
        higher_annotation_type : str
            what the higher annotation is (utterance, word)
        lower_annotation_type : str
            what the lower annotation is (word, phone, syllable)
        name : str
            the column name
        subset : str
            the annotation subset
        """
        higher = getattr(self, higher_annotation_type)
        lower = getattr(higher, lower_annotation_type)
        if subset is not None:
            lower = lower.filter_by_subset(subset)
        q = SplitQuery(self, higher)
        q.splitter = 'discourse'
        q.cache(lower.count.column_name(name))
        self.hierarchy.add_token_properties(self, higher_annotation_type, [(name, float)])
        self.encode_hierarchy()

    def reset_property(self, annotation_type, name):
        """
        Removes a property from the hierarchy and unsets it on all annotations of that type

        Parameters
        ----------
        annotation_type : str
            the annotation type the property is removed from
        name : str
            the column name
        """
        q = self.query_graph(getattr(self, annotation_type))
        q.set_properties(**{name: None})
        self.hierarchy.remove_token_properties(self, annotation_type, [name])
        self.encode_hierarchy()
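
# --- Illustrative usage (not part of the original module) -------------------
# A minimal sketch of how the enrichment methods above are typically called.
# It assumes a locally running Neo4j/InfluxDB backend with default connection
# settings and an already-imported corpus named 'my_corpus' in which utterances
# and syllables have been encoded; the corpus name and column names are
# placeholders.
if __name__ == '__main__':
    from polyglotdb import CorpusContext

    with CorpusContext('my_corpus') as c:
        # syllables per second within each utterance
        c.encode_rate('utterance', 'syllable', 'speech_rate')
        # number of phones in each word
        c.encode_count('word', 'phone', 'num_phones')
        # position of each word within its utterance
        c.encode_position('utterance', 'word', 'position_in_utterance')
        # the new columns are now registered on the cached hierarchy
        print(c.hierarchy.token_properties['word'])
        print(c.hierarchy.token_properties['utterance'])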