Source code for polyglotdb.structure

from polyglotdb.exceptions import GraphQueryError, HierarchyError
from polyglotdb.query.annotations.attributes import AnnotationNode, PauseAnnotation


class Hierarchy(object):
    """
    Class containing information about how a corpus is structured.

    Hierarchical data is stored in the form of a dictionary with keys
    for linguistic types, and values for the linguistic type that contains
    them.  If no other type contains a given type, its value is ``None``.

    Subannotation data is stored in the form of a dictionary with keys
    for linguistic types, and values of sets of types of subannotations.

    Property collections (token, type, speaker, discourse, acoustic and
    subannotation properties) are sets of ``(property_name, Type)`` tuples.

    Parameters
    ----------
    data : dict
        Information about the hierarchy of linguistic types
    corpus_name : str
        Name of the corpus
    """

    get_type_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
    MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
    RETURN n.subsets as subsets"""
    set_type_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
    MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
    SET n.subsets = $subsets"""

    get_token_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
    MATCH (c)<-[:contained_by*]-(n:{type})
    RETURN n.subsets as subsets"""
    set_token_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
    MATCH (c)<-[:contained_by*]-(n:{type})
    SET n.subsets = $subsets"""

    def __init__(self, data=None, corpus_name=None):
        if data is None:
            data = {}
        self._data = data
        self.corpus_name = corpus_name
        self.subannotations = {}
        self.subannotation_properties = {}
        self.subset_types = {}
        self.token_properties = {}
        self.subset_tokens = {}
        self.type_properties = {}
        self.acoustic_properties = {}
        self.speaker_properties = {("name", str)}
        self.discourse_properties = {
            ("name", str),
            ("file_path", str),
            ("low_freq_file_path", str),
            ("vowel_file_path", str),
            ("consonant_file_path", str),
            ("duration", float),
            ("sampling_rate", int),
            ("num_channels", int),
        }

    def __getattr__(self, key):
        """
        Resolve ``hierarchy.<annotation_type>`` to a query node.

        Only invoked when normal attribute lookup fails.

        Raises
        ------
        AttributeError
            For dunder names, or when the instance has no ``_data`` yet
        GraphQueryError
            If ``key`` does not name an annotation type
        """
        # Protocol probes such as ``getattr(obj, "__deepcopy__", None)`` must see
        # a plain AttributeError, otherwise the probing library crashes.
        if key.startswith("__") and key.endswith("__"):
            raise AttributeError(key)
        # Without ``_data`` (e.g. an instance created via ``__new__``), reading
        # ``annotation_types`` below would re-enter this method without end.
        if "_data" not in self.__dict__:
            raise AttributeError(key)
        if key == "pause":
            return PauseAnnotation(corpus=self.corpus_name, hierarchy=self)
        if key + "s" in self.annotation_types:
            key += "s"  # FIXME
        if key in self.annotation_types:
            return AnnotationNode(key, corpus=self.corpus_name, hierarchy=self)
        raise GraphQueryError(
            "The graph does not have any annotations of type '{}'.  Possible types are: {}".format(
                key, ", ".join(sorted(self.annotation_types))
            )
        )

    def __getstate__(self):
        return self.to_json()

    def __setstate__(self, state):
        self.from_json(state)

    def __str__(self):
        return str(self.to_json())

    @staticmethod
    def _set_clauses(alias, properties):
        """
        Build the Cypher ``SET`` fragments and parameters for new properties.

        Each property type is mapped to an "empty" default value (``0``, ``[]``,
        ``0.0``, ``""``, ``False`` or ``None``); anything that is not one of the
        recognised type classes is passed through unchanged.

        Parameters
        ----------
        alias : str
            Name of the node variable in the Cypher statement
        properties : iterable
            Iterable of tuples of the form (property_name, Type)

        Returns
        -------
        tuple
            ``(clauses, parameters)`` where ``clauses`` is a list of
            ``"<alias>.<name> = $<name>"`` strings and ``parameters`` maps each
            property name to its default value
        """
        clauses = []
        parameters = {}
        for name, prop_type in properties:
            value = prop_type
            for known in (int, list, float, str, bool):
                if prop_type == known:
                    # Calling the type yields a fresh empty value of that type
                    value = known()
                    break
            else:
                if prop_type is type(None):
                    value = None
            # NOTE: property names are interpolated into the statement text, so
            # they must be trusted identifiers.
            clauses.append("{0}.{1} = ${1}".format(alias, name))
            parameters[name] = value
        return clauses, parameters

    def _sync_subsets(
        self, corpus_context, annotation_type, subsets, get_template, set_template, registry, remove
    ):
        """
        Read the stored subsets, add or remove ``subsets``, and write them back.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type whose subsets are modified
        subsets : iterable
            Subset names to add or remove
        get_template : str
            Cypher template used to read the current subsets
        set_template : str
            Cypher template used to store the updated subsets
        registry : dict
            Local mapping (``subset_types`` or ``subset_tokens``) to update
        remove : bool
            Remove ``subsets`` instead of adding them
        """
        statement = get_template.format(type=annotation_type)
        res = list(
            corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        )
        try:
            cur_subsets = res[0]["subsets"]
        except (IndexError, AttributeError):
            cur_subsets = []
        # The stored value may be missing (None); treat that as "no subsets".
        current = set(cur_subsets or ())
        if remove:
            updated = current - set(subsets)
        else:
            updated = current | set(subsets)
        statement = set_template.format(type=annotation_type)
        corpus_context.execute_cypher(
            statement, subsets=sorted(updated), corpus_name=corpus_context.corpus_name
        )
        registry[annotation_type] = updated
        corpus_context.cache_hierarchy()

    @property
    def annotation_types(self):
        """
        Get a list of all the annotation types in the hierarchy

        Returns
        -------
        list
            All annotation types in the hierarchy
        """
        return list(self._data.keys())

    @property
    def acoustics(self):
        """
        Get all currently encoded acoustic measurements in the corpus

        Returns
        -------
        list
            All encoded acoustic measures
        """
        return sorted(self.acoustic_properties.keys())

    def to_json(self):
        """
        Convert the Hierarchy object to a dictionary for JSON serialization

        Property types are stored as an empty value of that type (``Type()``)
        so that :meth:`from_json` can recover the type with ``type(value)``.

        Returns
        -------
        dict
            All necessary information for the Hierarchy object
        """
        data = {"_data": self._data}
        data["corpus_name"] = self.corpus_name
        data["acoustic_properties"] = {
            k: sorted((name, t()) for name, t in v) for k, v in self.acoustic_properties.items()
        }
        data["subannotations"] = {k: sorted(v) for k, v in self.subannotations.items()}
        data["subannotation_properties"] = {
            k: sorted((name, t()) for name, t in v)
            for k, v in self.subannotation_properties.items()
        }
        data["subset_types"] = {k: sorted(v) for k, v in self.subset_types.items()}
        data["subset_tokens"] = {k: sorted(v) for k, v in self.subset_tokens.items()}
        data["token_properties"] = {
            k: sorted((name, t()) for name, t in v) for k, v in self.token_properties.items()
        }
        data["type_properties"] = {
            k: sorted((name, t()) for name, t in v) for k, v in self.type_properties.items()
        }
        data["speaker_properties"] = sorted((name, t()) for name, t in self.speaker_properties)
        data["discourse_properties"] = sorted(
            (name, t()) for name, t in self.discourse_properties
        )
        return data

    def from_json(self, json):
        """
        Set all properties from a dictionary deserialized from JSON

        Parameters
        ----------
        json : dict
            Object information, in the layout produced by :meth:`to_json`;
            ``acoustic_properties`` is optional, all other keys are required
        """
        self._data = json["_data"]
        self.corpus_name = json["corpus_name"]
        self.acoustic_properties = {
            k: set((name, type(t)) for name, t in v)
            for k, v in json.get("acoustic_properties", {}).items()
        }
        self.subannotations = {k: set(v) for k, v in json["subannotations"].items()}
        self.subannotation_properties = {
            k: set((name, type(t)) for name, t in v)
            for k, v in json["subannotation_properties"].items()
        }
        self.subset_types = {k: set(v) for k, v in json["subset_types"].items()}
        self.subset_tokens = {k: set(v) for k, v in json["subset_tokens"].items()}
        self.token_properties = {
            k: set((name, type(t)) for name, t in v) for k, v in json["token_properties"].items()
        }
        self.type_properties = {
            k: set((name, type(t)) for name, t in v) for k, v in json["type_properties"].items()
        }
        self.speaker_properties = set((name, type(t)) for name, t in json["speaker_properties"])
        self.discourse_properties = set(
            (name, type(t)) for name, t in json["discourse_properties"]
        )

    def add_type_subsets(self, corpus_context, annotation_type, subsets):
        """
        Adds type subsets to the Hierarchy object for a corpus, and syncs it to the
        hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to add subsets for
        subsets : iterable
            List of subsets to add for the annotation type
        """
        self._sync_subsets(
            corpus_context,
            annotation_type,
            subsets,
            self.get_type_subset_template,
            self.set_type_subset_template,
            self.subset_types,
            remove=False,
        )

    def remove_type_subsets(self, corpus_context, annotation_type, subsets):
        """
        Removes type subsets from the Hierarchy object for a corpus, and syncs it to the
        hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to remove subsets for
        subsets : iterable
            List of subsets to remove for the annotation type
        """
        self._sync_subsets(
            corpus_context,
            annotation_type,
            subsets,
            self.get_type_subset_template,
            self.set_type_subset_template,
            self.subset_types,
            remove=True,
        )

    def add_token_subsets(self, corpus_context, annotation_type, subsets):
        """
        Adds token subsets to the Hierarchy object for a corpus, and syncs it to the
        hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to add subsets for
        subsets : iterable
            List of subsets to add for the annotation tokens
        """
        self._sync_subsets(
            corpus_context,
            annotation_type,
            subsets,
            self.get_token_subset_template,
            self.set_token_subset_template,
            self.subset_tokens,
            remove=False,
        )

    def remove_token_subsets(self, corpus_context, annotation_type, subsets):
        """
        Removes token subsets from the Hierarchy object for a corpus, and syncs it to the
        hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to remove subsets for
        subsets : iterable
            List of subsets to remove for the annotation tokens
        """
        self._sync_subsets(
            corpus_context,
            annotation_type,
            subsets,
            self.get_token_subset_template,
            self.set_token_subset_template,
            self.subset_tokens,
            remove=True,
        )

    def add_annotation_type(self, annotation_type, above=None, below=None):
        """
        Adds an annotation type to the Hierarchy object along with default type and
        token properties for the new annotation type

        Parameters
        ----------
        annotation_type : str
            Annotation type to add
        above : str
            Annotation type that is contained by the new annotation type, leave out
            if new annotation type is at the bottom of the hierarchy
        below : str
            Annotation type that contains the new annotation type, leave out if new
            annotation type is at the top of the hierarchy
        """
        # Only re-parent an existing type when one was named; otherwise a
        # spurious ``None`` key would be added to the hierarchy.
        if above is not None:
            self._data[above] = annotation_type
        self._data[annotation_type] = below
        self.token_properties[annotation_type] = {
            ("id", str),
            ("label", str),
            ("begin", float),
            ("end", float),
            ("duration", float),
        }
        self.type_properties[annotation_type] = {("label", str)}

    def remove_annotation_type(self, annotation_type):
        """
        Removes an annotation type from the hierarchy

        Types directly contained by the removed type are re-attached to the type
        that contained it. All properties, subsets and subannotations recorded
        for the removed type are dropped.

        Parameters
        ----------
        annotation_type : str
            Annotation type to remove

        Raises
        ------
        KeyError
            If ``annotation_type`` is not in the hierarchy
        """
        parent = self._data[annotation_type]
        # May be empty when the removed type is the lowest level
        children = [k for k, v in self._data.items() if v == annotation_type]
        del self._data[annotation_type]
        for child in children:
            self._data[child] = parent
        for registry in (
            self.token_properties,
            self.type_properties,
            self.subset_types,
            self.subset_tokens,
        ):
            registry.pop(annotation_type, None)
        for s in self.subannotations.pop(annotation_type, ()):
            self.subannotation_properties.pop(s, None)

    def add_type_properties(self, corpus_context, annotation_type, properties):
        """
        Adds type properties for an annotation type and syncs it to a Neo4j database.

        The list of properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to add type properties for
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        # Materialize once: the iterable is traversed twice below
        properties = list(properties)
        ps, kwargs = self._set_clauses("n", properties)
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
        SET {sets}""".format(
            type=annotation_type, sets=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
        if annotation_type not in self.type_properties:
            self.type_properties[annotation_type] = {("id", str)}
        self.type_properties[annotation_type].update(properties)
        corpus_context.cache_hierarchy()

    def remove_type_properties(self, corpus_context, annotation_type, properties):
        """
        Removes type properties for an annotation type and syncs it to a Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to remove type properties for
        properties : iterable
            List of property names to remove
        """
        properties = list(properties)
        ps = ["n.{0}".format(k) for k in properties]
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
        REMOVE {removes}""".format(
            type=annotation_type, removes=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        if annotation_type not in self.type_properties:
            self.type_properties[annotation_type] = {("id", str)}
        to_remove = set(x for x in self.type_properties[annotation_type] if x[0] in properties)
        self.type_properties[annotation_type].difference_update(to_remove)
        corpus_context.cache_hierarchy()

    def add_acoustic_properties(self, corpus_context, acoustic_type, properties):
        """
        Add acoustic properties to an encoded acoustic measure.

        The list of properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        acoustic_type : str
            Acoustic measure to add properties for
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = list(properties)
        ps, kwargs = self._set_clauses("n", properties)
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)-[:has_acoustics]->(n:{type})
        SET {sets}""".format(
            type=acoustic_type, sets=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
        if acoustic_type not in self.acoustic_properties:
            self.acoustic_properties[acoustic_type] = set()
        self.acoustic_properties[acoustic_type].update(properties)
        corpus_context.cache_hierarchy()

    def remove_acoustic_properties(self, corpus_context, acoustic_type, properties):
        """
        Remove acoustic properties from an encoded acoustic measure.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        acoustic_type : str
            Acoustic measure to remove properties for
        properties : iterable
            List of property names
        """
        properties = list(properties)
        ps = ["n.{0}".format(k) for k in properties]
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)-[:has_acoustics]->(n:{type})
        REMOVE {removes}""".format(
            type=acoustic_type, removes=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        if acoustic_type not in self.acoustic_properties:
            # Must be a set (not a dict) so difference_update below works
            self.acoustic_properties[acoustic_type] = set()
        to_remove = set(x for x in self.acoustic_properties[acoustic_type] if x[0] in properties)
        self.acoustic_properties[acoustic_type].difference_update(to_remove)
        corpus_context.cache_hierarchy()

    def add_token_properties(self, corpus_context, annotation_type, properties):
        """
        Adds token properties for an annotation type and syncs it to a Neo4j database.

        The list of properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to add token properties for
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = list(properties)
        ps, kwargs = self._set_clauses("n", properties)
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(n:{type})
        SET {sets}""".format(
            type=annotation_type, sets=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
        if annotation_type not in self.token_properties:
            self.token_properties[annotation_type] = {("id", str)}
        self.token_properties[annotation_type].update(properties)
        corpus_context.cache_hierarchy()

    def remove_token_properties(self, corpus_context, annotation_type, properties):
        """
        Removes token properties for an annotation type and syncs it to a Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to remove token properties for
        properties : iterable
            List of property names to remove
        """
        properties = list(properties)
        ps = ["n.{0}".format(k) for k in properties]
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(n:{type})
        REMOVE {removes}""".format(
            type=annotation_type, removes=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        if annotation_type not in self.token_properties:
            self.token_properties[annotation_type] = {("id", str)}
        to_remove = set(x for x in self.token_properties[annotation_type] if x[0] in properties)
        self.token_properties[annotation_type].difference_update(to_remove)
        corpus_context.cache_hierarchy()

    def add_speaker_properties(self, corpus_context, properties):
        """
        Adds speaker properties to the Hierarchy object and syncs it to a Neo4j database.

        The list of properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = list(properties)
        ps, kwargs = self._set_clauses("s", properties)
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)-[:spoken_by]->(s:Speaker)
        SET {sets}""".format(
            sets=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
        to_add_names = [x[0] for x in properties]
        self.speaker_properties = {x for x in self.speaker_properties if x[0] not in to_add_names}
        self.speaker_properties.update(properties)
        corpus_context.cache_hierarchy()

    def remove_speaker_properties(self, corpus_context, properties):
        """
        Removes speaker properties and syncs it to a Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            List of property names to remove
        """
        properties = list(properties)
        ps = ["s.{0}".format(k) for k in properties]
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)-[:spoken_by]->(s:Speaker)
        REMOVE {removes}""".format(
            removes=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        to_remove = set(x for x in self.speaker_properties if x[0] in properties)
        self.speaker_properties.difference_update(to_remove)
        corpus_context.cache_hierarchy()

    def add_discourse_properties(self, corpus_context, properties):
        """
        Adds discourse properties to the Hierarchy object and syncs it to a Neo4j database.

        The list of properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = list(properties)
        ps, kwargs = self._set_clauses("d", properties)
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)-[:spoken_in]->(d:Discourse)
        SET {sets}""".format(
            sets=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
        to_add_names = [x[0] for x in properties]
        self.discourse_properties = {
            x for x in self.discourse_properties if x[0] not in to_add_names
        }
        self.discourse_properties.update(properties)
        corpus_context.cache_hierarchy()

    def remove_discourse_properties(self, corpus_context, properties):
        """
        Removes discourse properties and syncs it to a Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            List of property names to remove
        """
        properties = list(properties)
        ps = ["d.{0}".format(k) for k in properties]
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)-[:spoken_in]->(d:Discourse)
        REMOVE {removes}""".format(
            removes=", ".join(ps)
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        to_remove = set(x for x in self.discourse_properties if x[0] in properties)
        self.discourse_properties.difference_update(to_remove)
        corpus_context.cache_hierarchy()

    def keys(self):
        """
        Keys (linguistic types) of the hierarchy.

        Returns
        -------
        dict view
            Keys of the hierarchy
        """
        return self._data.keys()

    def values(self):
        """
        Values (containing types) of the hierarchy.

        Returns
        -------
        dict view
            Values of the hierarchy
        """
        return self._data.values()

    def items(self):
        """
        Key/value pairs for the hierarchy.

        Returns
        -------
        dict view
            Items of the hierarchy
        """
        return self._data.items()

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]
        # Types that were contained by the deleted type become top-level
        for k, v in self._data.items():
            if v == key:
                self._data[k] = None

    def __contains__(self, item):
        return item in self._data

    def update(self, other):
        """
        Merge Hierarchies together.  If other is a dictionary, then only
        the hierarchical data is updated.

        For subannotations and subannotation, type and token properties, an
        entry in ``other`` replaces the entry with the same key in this object;
        speaker and discourse properties are unioned.

        Parameters
        ----------
        other : Hierarchy or dict
            Data to be merged in
        """
        if isinstance(other, dict):
            self._data.update(other)
            return
        self._data.update(other._data)
        # NOTE(review): per-key entries are replaced wholesale by ``other``'s.
        # An earlier revision contained an intersection (``&``) branch guarded
        # by a membership test against ``.items()``, which never matched, so
        # replacement is the behavior callers have always had.  Confirm whether
        # a union or intersection was actually intended before changing it.
        # Sets are copied so later in-place edits do not leak into ``other``.
        for k, v in other.subannotations.items():
            self.subannotations[k] = set(v)
        for k, v in other.subannotation_properties.items():
            self.subannotation_properties[k] = set(v)
        for k, v in other.type_properties.items():
            self.type_properties[k] = set(v)
        for k, v in other.token_properties.items():
            self.token_properties[k] = set(v)
        self.speaker_properties.update(other.speaker_properties)
        self.discourse_properties.update(other.discourse_properties)

    @property
    def lowest(self):
        """
        Get the lowest annotation type of the Hierarchy

        Returns
        -------
        str
            Lowest annotation type, i.e. the first type that contains no other
            type (``None`` if there is no such type)
        """
        containers = set(self.values())
        for k in self.keys():
            if k not in containers:
                return k

    @property
    def highest(self):
        """
        Get the highest annotation type of the Hierarchy

        Returns
        -------
        str
            Highest annotation type, i.e. the first type that is not contained
            by any other type (``None`` if there is no such type)
        """
        for k, v in self.items():
            if v is None:
                return k

    @property
    def highest_to_lowest(self):
        """
        Get a list of annotation types sorted from highest to lowest

        Returns
        -------
        list
            Annotation types from highest to lowest
        """
        ats = [self.highest]
        while len(ats) < len(self.keys()):
            for k, v in self.items():
                if v == ats[-1]:
                    ats.append(k)
                    break
            else:
                # Nothing is contained by the last type; the chain cannot be
                # extended, so stop instead of looping forever.
                break
        return ats

    @property
    def lowest_to_highest(self):
        """
        Get a list of annotation types sorted from lowest to highest

        Returns
        -------
        list
            Annotation types from lowest to highest
        """
        ats = [self.lowest]
        while len(ats) < len(self.keys()):
            ats.append(self[ats[-1]])
        return ats

    def get_lower_types(self, annotation_type):
        """
        Get all annotation types that are lower than the specified annotation type

        Parameters
        ----------
        annotation_type : str
            Annotation type from which to get lower annotation types

        Returns
        -------
        list
            List of all annotation types that are lower the specified annotation type
        """
        lower = []
        found = False
        for t in self.highest_to_lowest:
            if t == annotation_type:
                found = True
                continue
            if found:
                lower.append(t)
        return lower

    def get_higher_types(self, annotation_type):
        """
        Get all annotation types that are higher than the specified annotation type

        Parameters
        ----------
        annotation_type : str
            Annotation type from which to get higher annotation types

        Returns
        -------
        list
            List of all annotation types that are higher the specified annotation type
        """
        higher = []
        found = False
        for t in self.lowest_to_highest:
            if t == annotation_type:
                found = True
                continue
            if found:
                higher.append(t)
        return higher

    def has_subannotation_type(self, subannotation_type):
        """
        Check whether the Hierarchy has a subannotation type

        Parameters
        ----------
        subannotation_type : str
            Name of subannotation to check for

        Returns
        -------
        bool
            True if subannotation type is present
        """
        return subannotation_type in self.subannotation_properties

    def has_subannotation_property(self, subannotation_type, property_name):
        """
        Check whether the Hierarchy has a property associated with a subannotation type

        Parameters
        ----------
        subannotation_type : str
            Name of subannotation to check
        property_name : str
            Name of the property to check for

        Returns
        -------
        bool
            True if subannotation type has the given property name
        """
        if not self.has_subannotation_type(subannotation_type):
            return False
        return any(
            name == property_name for name, _ in self.subannotation_properties[subannotation_type]
        )

    def add_subannotation_type(
        self, corpus_context, annotation_type, subannotation_type, properties=None
    ):
        """
        Adds subannotation type for a given annotation type to the Hierarchy object and
        syncs it to a Neo4j database.

        The list of optional properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to add a subannotation to
        subannotation_type : str
            Name of the subannotation type
        properties : iterable
            Optional iterable of tuples of the form (property_name, Type)

        Raises
        ------
        HierarchyError
            If ``subannotation_type`` is already registered
        """
        properties = [] if properties is None else list(properties)
        if subannotation_type in self.subannotation_properties:
            raise HierarchyError(
                "The subannotation_type {} is already specified for another linguistic type."
                "  Please use a different name.".format(subannotation_type)
            )
        if annotation_type not in self.subannotations:
            self.subannotations[annotation_type] = set()
        self.subannotations[annotation_type].add(subannotation_type)
        self.subannotation_properties[subannotation_type] = set(properties)
        if properties:
            ps, kwargs = self._set_clauses("s", properties)
            statement = """MATCH (c:Corpus), (c)<-[:contained_by*]-(a:{a_type})
            WHERE c.name = $corpus_name
            WITH a
            CREATE (a)<-[:annotates]-(s:{s_type})
            WITH s
            SET {sets}""".format(
                sets=", ".join(ps), a_type=annotation_type, s_type=subannotation_type
            )
            corpus_context.execute_cypher(
                statement, corpus_name=corpus_context.corpus_name, **kwargs
            )
        else:
            statement = """MATCH (c:Corpus), (c)<-[:contained_by*]-(a:{a_type})
            WHERE c.name = $corpus_name
            WITH a
            MERGE (a)<-[:annotates]-(s:{s_type})""".format(
                a_type=annotation_type, s_type=subannotation_type
            )
            corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        corpus_context.cache_hierarchy()

    def remove_subannotation_type(self, corpus_context, subannotation_type):
        """
        Remove a subannotation type from the Hierarchy object and sync it to a
        Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        subannotation_type : str
            Subannotation type to remove
        """
        self.subannotation_properties.pop(subannotation_type, None)
        for k, v in self.subannotations.items():
            if subannotation_type in v:
                self.subannotations[k] = v - {subannotation_type}
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})
        DETACH DELETE s""".format(
            s_type=subannotation_type
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        corpus_context.cache_hierarchy()

    def add_subannotation_properties(self, corpus_context, subannotation_type, properties):
        """
        Adds properties for a subannotation type to the Hierarchy object and syncs it
        to a Neo4j database.

        The list of properties are tuples of the form (property_name, Type),
        where ``property_name`` is a string and ``Type`` is a Python type class,
        like ``bool``, ``str``, ``list``, or ``float``.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        subannotation_type : str
            Name of the subannotation type
        properties : iterable
            Iterable of tuples of the form (property_name, Type)

        Raises
        ------
        KeyError
            If ``subannotation_type`` is not registered (raised after the
            database statement has been executed)
        """
        properties = list(properties)
        ps, kwargs = self._set_clauses("s", properties)
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})
        SET {sets}""".format(
            sets=", ".join(ps), s_type=subannotation_type
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
        self.subannotation_properties[subannotation_type].update(properties)
        corpus_context.cache_hierarchy()

    def remove_subannotation_properties(self, corpus_context, subannotation_type, properties):
        """
        Removes properties for a subannotation type from the Hierarchy object and syncs
        it to a Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        subannotation_type : str
            Name of the subannotation type
        properties : iterable
            List of property names to remove

        Raises
        ------
        KeyError
            If ``subannotation_type`` is not registered (raised after the
            database statement has been executed)
        """
        properties = list(properties)
        ps = ["s.{0}".format(k) for k in properties]
        statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
        MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})
        REMOVE {removes}""".format(
            removes=", ".join(ps), s_type=subannotation_type
        )
        corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
        to_remove = set(
            x for x in self.subannotation_properties[subannotation_type] if x[0] in properties
        )
        self.subannotation_properties[subannotation_type].difference_update(to_remove)
        corpus_context.cache_hierarchy()

    def has_speaker_property(self, key):
        """
        Check for whether speakers have a given property

        Parameters
        ----------
        key : str
            Property to check for

        Returns
        -------
        bool
            True if speakers have the given property
        """
        return any(name == key for name, _ in self.speaker_properties)

    def has_discourse_property(self, key):
        """
        Check for whether discourses have a given property

        Parameters
        ----------
        key : str
            Property to check for

        Returns
        -------
        bool
            True if discourses have the given property
        """
        return any(name == key for name, _ in self.discourse_properties)

    def has_token_property(self, annotation_type, key):
        """
        Check whether a given annotation type has a given token property.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given token property
        key : str
            Property to check for

        Returns
        -------
        bool
            True if the annotation type has the given token property
        """
        if annotation_type not in self.token_properties:
            return False
        return any(name == key for name, _ in self.token_properties[annotation_type])

    def has_type_property(self, annotation_type, key):
        """
        Check whether a given annotation type has a given type property.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given type property
        key : str
            Property to check for

        Returns
        -------
        bool
            True if the annotation type has the given type property
        """
        if annotation_type not in self.type_properties:
            return False
        return any(name == key for name, _ in self.type_properties[annotation_type])

    def has_type_subset(self, annotation_type, key):
        """
        Check whether a given annotation type has a given type subset.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given type subset
        key : str
            Subset to check for

        Returns
        -------
        bool
            True if the annotation type has the given type subset
        """
        if annotation_type not in self.subset_types:
            return False
        return key in self.subset_types[annotation_type]

    def has_token_subset(self, annotation_type, key):
        """
        Check whether a given annotation type has a given token subset.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given token subset
        key : str
            Subset to check for

        Returns
        -------
        bool
            True if the annotation type has the given token subset
        """
        if annotation_type not in self.subset_tokens:
            return False
        return key in self.subset_tokens[annotation_type]

    @property
    def word_name(self):
        """
        Shortcut for returning the annotation type matching "word"

        Returns
        -------
        str or None
            Annotation type that begins with "word"
        """
        for at in self.annotation_types:
            if at.startswith("word"):
                return at
        return None

    @property
    def phone_name(self):
        """
        Alias function for getting the lowest annotation type

        Returns
        -------
        str
            Name of the lowest annotation type
        """
        return self.lowest