"""Source code for ``polyglotdb.query.annotations.attributes.base``."""

from polyglotdb.exceptions import AnnotationAttributeError, SubsetError
from polyglotdb.query.annotations.elements import (
    EqualClauseElement,
    FollowsClauseElement,
    GtClauseElement,
    GteClauseElement,
    InClauseElement,
    LeftAlignedClauseElement,
    LtClauseElement,
    LteClauseElement,
    NotEqualClauseElement,
    NotInClauseElement,
    NotLeftAlignedClauseElement,
    NotNullClauseElement,
    NotRightAlignedClauseElement,
    NotSubsetClauseElement,
    NullClauseElement,
    PrecedesClauseElement,
    RegexClauseElement,
    RightAlignedClauseElement,
    SubsetClauseElement,
)
from polyglotdb.query.base import CollectionAttribute, CollectionNode, Node, NodeAttribute
from polyglotdb.query.base.helper import key_for_cypher

# Attribute labels that this module treats specially:
# ``AnnotationAttribute.requires_type`` never resolves them against the type
# node, and ``AnnotationNode.__getattr__`` skips the hierarchy property
# validation for them.
special_attributes = ["duration", "count", "rate", "position", "subset"]


class AnnotationAttribute(NodeAttribute):
    """
    A single attribute of an annotation node, usable in filters and in query output

    Comparison operators do not return booleans; they build clause elements
    that are later rendered into a query.

    Parameters
    ----------
    annotation : AnnotationNode
        Annotation that this attribute belongs to
    label : str
        Label of the attribute

    Attributes
    ----------
    collapsing : bool
        Class-level flag, always ``False`` for this class
    acoustic : bool
        Set to ``False`` on construction
    """

    collapsing = False

    def __init__(self, annotation, label):
        super(AnnotationAttribute, self).__init__(annotation, label)
        self.acoustic = False

    def __hash__(self):
        return hash((self.node, self.label))

    def __repr__(self):
        return "<AnnotationAttribute '{}'>".format(str(self))

    def requires_type(self):
        """
        Decide whether this attribute has to be read from the type node

        Returns
        -------
        bool
            ``False`` when the node has no hierarchy or the label is one of
            ``special_attributes``; otherwise ``True`` exactly when the
            hierarchy does not list the label as a token property
        """
        hierarchy = self.node.hierarchy
        if hierarchy is None or self.label in special_attributes:
            return False
        return not hierarchy.has_token_property(self.node.node_type, self.label)

    def for_cypher(self, type=False):
        """
        Render this attribute as a Cypher expression

        Parameters
        ----------
        type : bool
            Force the expression to be rooted at the type alias

        Returns
        -------
        str
            ``end - begin`` arithmetic for ``duration``, otherwise the
            (type or token) alias followed by the escaped label
        """
        if self.label == "duration":
            return "{a}.end - {a}.begin".format(a=self.node.alias)
        owner = self.node.type_alias if (type or self.requires_type()) else self.node.alias
        return "{}.{}".format(owner, key_for_cypher(self.label))

    @property
    def with_alias(self):
        """
        Alias of the node this attribute is read from: the type alias when
        the attribute requires the type node, the token alias otherwise
        """
        return self.node.type_alias if self.requires_type() else self.node.alias

    def __eq__(self, other):
        # Comparing two ``begin`` (or two ``end``) attributes is an alignment
        # test between the nodes. Anything that lacks the needed attributes
        # falls through to the generic handling below.
        try:
            for edge, element in (
                ("begin", LeftAlignedClauseElement),
                ("end", RightAlignedClauseElement),
            ):
                if self.label == edge and other.label == edge:
                    return element(self.node, other.node)
        except AttributeError:
            pass
        if self.label == "subset":
            return SubsetClauseElement(self, other)
        clause = NullClauseElement if other is None else EqualClauseElement
        return clause(self, other)

    def __ne__(self, other):
        # Mirror image of ``__eq__`` using the negated clause elements.
        try:
            for edge, element in (
                ("begin", NotLeftAlignedClauseElement),
                ("end", NotRightAlignedClauseElement),
            ):
                if self.label == edge and other.label == edge:
                    return element(self.node, other.node)
        except AttributeError:
            pass
        if self.label == "subset":
            return NotSubsetClauseElement(self, other)
        clause = NotNullClauseElement if other is None else NotEqualClauseElement
        return clause(self, other)

    def __gt__(self, other):
        return GtClauseElement(self, other)

    def __ge__(self, other):
        return GteClauseElement(self, other)

    def __lt__(self, other):
        return LtClauseElement(self, other)

    def __le__(self, other):
        return LteClauseElement(self, other)

    def in_(self, other):
        """
        Build a membership filter for this attribute

        Parameters
        ----------
        other : list or query
            Values to test against. An object exposing a ``cypher`` attribute
            is treated as a query: it is run with ``all()`` and this
            attribute's label is read off every result.

        Returns
        -------
        InClauseElement
            Clause asserting membership
        """
        values = (
            [getattr(row, self.label) for row in other.all()]
            if hasattr(other, "cypher")
            else other
        )
        return InClauseElement(self, values)

    def not_in_(self, other):
        """
        Build a non-membership filter for this attribute

        Parameters
        ----------
        other : list or query
            Values to test against. An object exposing a ``cypher`` attribute
            is treated as a query: it is run with ``all()`` and this
            attribute's label is read off every result.

        Returns
        -------
        NotInClauseElement
            Clause asserting non-membership
        """
        values = (
            [getattr(row, self.label) for row in other.all()]
            if hasattr(other, "cypher")
            else other
        )
        return NotInClauseElement(self, values)

    def regex(self, pattern):
        """Returns a clause for filtering based on regular expressions."""
        return RegexClauseElement(self, pattern)

    def aliased_for_output(self, type=False):
        """
        Render this attribute for a query's output section

        Parameters
        ----------
        type : bool
            Passed through to :meth:`for_cypher`

        Returns
        -------
        str
            ``<expression> AS <output alias>``
        """
        expression = self.for_cypher(type)
        return "{} AS {}".format(expression, self.output_alias_for_cypher)

    def for_type_filter(self):
        """Render this attribute rooted at the type alias."""
        return self.for_cypher(type=True)
class AnnotationNode(Node):
    """
    Class for annotations referenced in graph queries

    Attribute access that is not resolved normally is routed through
    :meth:`__getattr__`, which builds related query objects (neighbouring
    annotations, pauses, speaker/discourse, higher or lower annotation types,
    subannotations, acoustic attributes) or a plain
    :class:`AnnotationAttribute`.

    Parameters
    ----------
    node_type : str
        Annotation type
    corpus : str or None
        Corpus identifier; when given it is added as a label to the match
        patterns
    hierarchy : object or None
        Corpus hierarchy used to validate subsets and properties; when
        ``None`` no validation is performed

    Attributes
    ----------
    match_template : str
        Cypher pattern linking a token node to its type node
    begin_template : str
        Format template for begin aliases
    end_template : str
        Format template for end aliases
    alias_template : str
        Format template for the node alias
    """

    match_template = """({token_alias})-[:is_a]->({type_alias})"""
    begin_template = "{}_{}_begin"
    end_template = "{}_{}_end"
    alias_template = "node_{t}"

    def __init__(self, node_type, corpus=None, hierarchy=None):
        super(AnnotationNode, self).__init__(node_type, corpus=corpus, hierarchy=hierarchy)

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, AnnotationNode):
            return False
        if self.key != other.key:
            return False
        return True

    def __str__(self):
        return "{}_0".format(self.key)

    def __repr__(self):
        return "<AnnotationNode object with '{}' type>".format(self.node_type)

    def for_match(self):
        """Returns the match pattern filled in with the token and type alias definitions"""
        kwargs = {
            "token_alias": self.define_alias,
            "type_alias": self.define_type_alias,
        }
        return self.match_template.format(**kwargs)

    def filter_by_subset(self, *args):
        """
        Restrict this annotation to the given subsets

        Parameters
        ----------
        args : str
            Subset labels to add to ``subset_labels``

        Returns
        -------
        AnnotationNode
            This node, to allow chaining

        Raises
        ------
        SubsetError
            If a hierarchy is available and a label is neither a type subset
            nor a token subset of this annotation type
        """
        if self.hierarchy is not None:
            for a in args:
                if not self.hierarchy.has_type_subset(
                    self.node_type, a
                ) and not self.hierarchy.has_token_subset(self.node_type, a):
                    raise (
                        SubsetError(
                            "{} is not a subset of {} types or tokens.".format(a, self.node_type)
                        )
                    )
        self.subset_labels = sorted(set(self.subset_labels + list(args)))
        return self

    @property
    def define_type_alias(self):
        """Returns the type alias followed by its type, corpus and type-subset labels"""
        label_string = ":{}_type".format(self.node_type)
        if self.corpus is not None:
            label_string += ":{}".format(key_for_cypher(self.corpus))
        if self.subset_labels:
            # NOTE(review): this consults the hierarchy unconditionally, so
            # subset labels on a hierarchy-less node fail here -- confirm
            # whether that combination is meant to be supported.
            subset_type_labels = [
                x for x in self.subset_labels if self.hierarchy.has_type_subset(self.node_type, x)
            ]
            if subset_type_labels:
                label_string += ":" + ":".join(map(key_for_cypher, subset_type_labels))
        return "{}{}".format(self.type_alias, label_string)

    @property
    def define_alias(self):
        """Returns the token alias followed by its type, ``speech``, corpus and token-subset labels"""
        label_string = ":{}:speech".format(self.node_type)
        if self.corpus is not None:
            label_string += ":{}".format(key_for_cypher(self.corpus))
        if self.subset_labels:
            # NOTE(review): same hierarchy requirement as ``define_type_alias``.
            subset_token_labels = [
                x for x in self.subset_labels if self.hierarchy.has_token_subset(self.node_type, x)
            ]
            if subset_token_labels:
                label_string += ":" + ":".join(map(key_for_cypher, subset_token_labels))
        return "{}{}".format(self.alias, label_string)

    @property
    def type_alias(self):
        """Returns the escaped alias of the type node (``type_`` + token alias without backticks)"""
        return key_for_cypher("type_" + self.alias.replace("`", ""))

    @property
    def alias(self):
        """Returns the escaped alias of the token node, built from ``alias_template`` and the key"""
        return key_for_cypher(self.alias_template.format(t=self.key))

    @property
    def with_alias(self):
        """Returns alias"""
        return self.alias

    @property
    def labels_alias(self):
        """Returns a ``labels(<alias>) as <alias>_labels`` expression"""
        return "labels({}) as {}".format(self.alias, key_for_cypher(self.alias + "_labels"))

    @property
    def withs(self):
        """Returns a list of the alias, the type alias and the labels expression"""
        return [self.alias, self.type_alias, self.labels_alias]

    def precedes(self, other_annotation):
        """Returns a clause asserting that this annotation precedes ``other_annotation``"""
        return PrecedesClauseElement(self, other_annotation)

    def follows(self, other_annotation):
        """Returns a clause asserting that this annotation follows ``other_annotation``"""
        return FollowsClauseElement(self, other_annotation)

    def __getattr__(self, key):
        """
        Resolve query-building attributes that are not regular instance attributes

        Parameters
        ----------
        key : str
            Attribute name being looked up

        Returns
        -------
        object
            A related annotation/attribute object, or an
            :class:`AnnotationAttribute` for plain properties

        Raises
        ------
        AttributeError
            For dunder names, which are never query attributes
        AnnotationAttributeError
            If a hierarchy is available and ``key`` is neither a special
            attribute nor a known type/token property
        """
        # Protocol probes (``copy``/``pickle`` look up names such as
        # ``__deepcopy__`` or ``__setstate__`` on the instance) must fail
        # fast: they are never query attributes, and on an instance whose
        # attributes are not populated yet the ``self.hierarchy`` lookups
        # below would re-enter this method without bound.
        if key.startswith("__") and key.endswith("__"):
            raise AttributeError(key)

        if key == "current":
            return self

        if key in ["previous", "following"]:
            from .precedence import FollowingAnnotation, PreviousAnnotation

            if key == "previous":
                return PreviousAnnotation(self, -1)
            return FollowingAnnotation(self, 1)

        if key in ["previous_pause", "following_pause"]:
            from .pause import FollowingPauseAnnotation, PreviousPauseAnnotation

            # Pause annotations are anchored on the word level, so move up to
            # the word annotation first when this node is another type.
            node = self
            if self.node_type != self.hierarchy.word_name:
                node = getattr(self, self.hierarchy.word_name)
            if key == "previous_pause":
                return PreviousPauseAnnotation(node)
            return FollowingPauseAnnotation(node)

        # ``previous_<x>`` / ``following_<x>`` delegate ``<x>`` to the
        # neighbouring annotation. The underscore is required so that keys
        # merely starting with these words are treated as ordinary attributes
        # instead of failing while splitting.
        if key.startswith("previous_"):
            return getattr(self.previous, key.split("_", 1)[1])
        if key.startswith("following_"):
            return getattr(self.following, key.split("_", 1)[1])

        if key == "follows_pause":
            from .pause import FollowsPauseAttribute

            return FollowsPauseAttribute(self)

        if key == "precedes_pause":
            from .pause import PrecedesPauseAttribute

            return PrecedesPauseAttribute(self)

        if key == "speaker":
            from .speaker import SpeakerAnnotation

            return SpeakerAnnotation(self)

        if key == "discourse":
            from .discourse import DiscourseAnnotation

            return DiscourseAnnotation(self)

        # Guard against a missing hierarchy like every branch below does, so
        # that hierarchy-less nodes reach the plain-attribute fallback.
        if self.hierarchy is not None and key in self.hierarchy.acoustics:
            from .acoustic import AcousticAttribute

            return AcousticAttribute(self, key)

        if self.hierarchy is not None and key in self.hierarchy.get_higher_types(self.node_type):
            from .hierarchical import HierarchicalAnnotation

            return HierarchicalAnnotation(
                self,
                AnnotationNode(key, corpus=self.corpus, hierarchy=self.hierarchy),
            )

        if self.hierarchy is not None and key in self.hierarchy.get_lower_types(self.node_type):
            from .path import SubPathAnnotation

            return SubPathAnnotation(self, AnnotationNode(key, corpus=self.corpus))

        if (
            self.hierarchy is not None
            and self.node_type in self.hierarchy.subannotations
            and key in self.hierarchy.subannotations[self.node_type]
        ):
            from .subannotation import SubAnnotation

            return SubAnnotation(self, AnnotationNode(key, corpus=self.corpus))

        # Plain property: validate against the hierarchy when there is one.
        if (
            key not in special_attributes
            and self.hierarchy is not None
            and not self.hierarchy.has_token_property(self.node_type, key)
            and not self.hierarchy.has_type_property(self.node_type, key)
        ):
            # Sorted so the error message is stable between runs.
            properties = sorted(
                {
                    x[0]
                    for x in self.hierarchy.type_properties[self.node_type]
                    | self.hierarchy.token_properties[self.node_type]
                }
            )
            raise AnnotationAttributeError(
                "The '{}' annotation types do not have a '{}' property (available: {}).".format(
                    self.node_type, key, ", ".join(properties)
                )
            )
        return AnnotationAttribute(self, key)
class AnnotationCollectionNode(CollectionNode):
    """Collection node for annotations, collecting token (``n``) and type (``t``) nodes"""

    def with_statement(self):
        """
        Build the ``collect`` expressions for this collection

        Returns
        -------
        str
            ``collect(n)`` under the collection alias and ``collect(t)``
            under the collection type alias, comma separated
        """
        tokens = "collect(n) as {a}".format(a=self.collection_alias)
        types = "collect(t) as {a}".format(a=self.collection_type_alias)
        return ", ".join((tokens, types))

    @property
    def withs(self):
        """Returns a list of the collection alias and the collection type alias"""
        return [self.collection_alias, self.collection_type_alias]


class AnnotationCollectionAttribute(CollectionAttribute):
    """Collection attribute for annotations; adds nothing to ``CollectionAttribute``"""