from polyglotdb.exceptions import AnnotationAttributeError, SubsetError
from polyglotdb.query.annotations.elements import (
EqualClauseElement,
FollowsClauseElement,
GtClauseElement,
GteClauseElement,
InClauseElement,
LeftAlignedClauseElement,
LtClauseElement,
LteClauseElement,
NotEqualClauseElement,
NotInClauseElement,
NotLeftAlignedClauseElement,
NotNullClauseElement,
NotRightAlignedClauseElement,
NotSubsetClauseElement,
NullClauseElement,
PrecedesClauseElement,
RegexClauseElement,
RightAlignedClauseElement,
SubsetClauseElement,
)
from polyglotdb.query.base import CollectionAttribute, CollectionNode, Node, NodeAttribute
from polyglotdb.query.base.helper import key_for_cypher
special_attributes = ["duration", "count", "rate", "position", "subset"]
[docs]
class AnnotationAttribute(NodeAttribute):
"""
Class for information about the attributes of annotations in a graph
query
Parameters
----------
annotation : AnnotationAttribute
Annotation that this attribute refers to
label : str
Label of the attribute
Attributes
----------
annotation : AnnotationAttribute
Annotation that this attribute refers to
label : str
Label of the attribute
output_label : str or None
User-specified label to use in query results
"""
collapsing = False
def __init__(self, annotation, label):
super(AnnotationAttribute, self).__init__(annotation, label)
self.acoustic = False
def __hash__(self):
return hash((self.node, self.label))
def __repr__(self):
return "<AnnotationAttribute '{}'>".format(str(self))
def requires_type(self):
if self.node.hierarchy is None or self.label in special_attributes:
return False
return not self.node.hierarchy.has_token_property(self.node.node_type, self.label)
def for_cypher(self, type=False):
"""Returns annotation duration or annotation type if applicable, otherwise annotation name and label"""
if self.label == "duration":
return "{a}.end - {a}.begin".format(a=self.node.alias)
if type or self.requires_type():
return "{}.{}".format(self.node.type_alias, key_for_cypher(self.label))
return "{}.{}".format(self.node.alias, key_for_cypher(self.label))
@property
def with_alias(self):
"""
returns type_alias if there is one
alias otherwise
"""
if self.requires_type():
return self.node.type_alias
else:
return self.node.alias
def __eq__(self, other):
try:
if self.label == "begin" and other.label == "begin":
return LeftAlignedClauseElement(self.node, other.node)
elif self.label == "end" and other.label == "end":
return RightAlignedClauseElement(self.node, other.node)
except AttributeError:
pass
if self.label == "subset":
return SubsetClauseElement(self, other)
if other is None:
return NullClauseElement(self, other)
return EqualClauseElement(self, other)
def __ne__(self, other):
try:
if self.label == "begin" and other.label == "begin":
return NotLeftAlignedClauseElement(self.node, other.node)
elif self.label == "end" and other.label == "end":
return NotRightAlignedClauseElement(self.node, other.node)
except AttributeError:
pass
if self.label == "subset":
return NotSubsetClauseElement(self, other)
if other is None:
return NotNullClauseElement(self, other)
return NotEqualClauseElement(self, other)
def __gt__(self, other):
return GtClauseElement(self, other)
def __ge__(self, other):
return GteClauseElement(self, other)
def __lt__(self, other):
return LtClauseElement(self, other)
def __le__(self, other):
return LteClauseElement(self, other)
def in_(self, other):
"""
Checks if the parameter other has a 'cypher' element
executes the query if it does and appends the relevant results
or appends parameter other
Parameters
----------
other : list
attribute will be checked against elements in this list
Returns
-------
string
clause for asserting membership in a filter
"""
if hasattr(other, "cypher"):
results = other.all()
t = []
for x in results:
t.append(getattr(x, self.label))
else:
t = other
return InClauseElement(self, t)
def not_in_(self, other):
"""
Checks if the parameter other has a 'cypher' element
executes the query if it does and appends the relevant results
or appends parameter other
Parameters
----------
other : list
attribute will be checked against elements in this list
Returns
-------
string
clause for asserting non-membership in a filter
"""
if hasattr(other, "cypher"):
results = other.all()
t = []
for x in results:
t.append(getattr(x, self.label))
else:
t = other
return NotInClauseElement(self, t)
def regex(self, pattern):
"""Returns a clause for filtering based on regular expressions."""
return RegexClauseElement(self, pattern)
def aliased_for_output(self, type=False):
"""
creates cypher string for output
Returns
-------
string
string for output
"""
return "{} AS {}".format(self.for_cypher(type), self.output_alias_for_cypher)
def for_type_filter(self):
return self.for_cypher(type=True)
[docs]
class AnnotationNode(Node):
"""
Class for annotations referenced in graph queries
Parameters
----------
type : str
Annotation type
pos : int
Position in the query, defaults to 0
Attributes
----------
type : str
Annotation type
pos : int
Position in the query
previous : :class:`~polyglotdb.graph.attributes.AnnotationAttribute`
Returns the Annotation of the same type with the previous position
following : :class:`~polyglotdb.graph.attributes.AnnotationAttribute`
Returns the Annotation of the same type with the following position
"""
match_template = """({token_alias})-[:is_a]->({type_alias})"""
# template = '''({token_alias})'''
begin_template = "{}_{}_begin"
end_template = "{}_{}_end"
alias_template = "node_{t}"
def __init__(self, node_type, corpus=None, hierarchy=None):
super(AnnotationNode, self).__init__(node_type, corpus=corpus, hierarchy=hierarchy)
def __hash__(self):
return hash(self.key)
def __eq__(self, other):
if not isinstance(other, AnnotationNode):
return False
if self.key != other.key:
return False
return True
def __str__(self):
return "{}_0".format(self.key)
def __repr__(self):
return "<AnnotationNode object with '{}' type>".format(self.node_type)
def for_match(self):
"""sets 'token_alias' and 'type_alias' keyword arguments for an annotation"""
kwargs = {
"token_alias": self.define_alias,
"type_alias": self.define_type_alias,
}
return self.match_template.format(**kwargs)
def filter_by_subset(self, *args):
"""adds each item in args to the hierarchy type_labels"""
if self.hierarchy is not None:
for a in args:
if not self.hierarchy.has_type_subset(
self.node_type, a
) and not self.hierarchy.has_token_subset(self.node_type, a):
raise (
SubsetError(
"{} is not a subset of {} types or tokens.".format(a, self.node_type)
)
)
self.subset_labels = sorted(set(self.subset_labels + list(args)))
return self
@property
def define_type_alias(self):
"""Returns a cypher string for getting all type_labels"""
label_string = ":{}_type".format(self.node_type)
if self.corpus is not None:
label_string += ":{}".format(key_for_cypher(self.corpus))
if self.subset_labels:
subset_type_labels = [
x for x in self.subset_labels if self.hierarchy.has_type_subset(self.node_type, x)
]
if subset_type_labels:
label_string += ":" + ":".join(map(key_for_cypher, subset_type_labels))
return "{}{}".format(self.type_alias, label_string)
@property
def define_alias(self):
"""Returns a cypher string for getting all token_labels"""
label_string = ":{}:speech".format(self.node_type)
if self.corpus is not None:
label_string += ":{}".format(key_for_cypher(self.corpus))
if self.subset_labels:
subset_token_labels = [
x for x in self.subset_labels if self.hierarchy.has_token_subset(self.node_type, x)
]
if subset_token_labels:
label_string += ":" + ":".join(map(key_for_cypher, subset_token_labels))
return "{}{}".format(self.alias, label_string)
@property
def type_alias(self):
"""Returns a cypher formatted string of type alias"""
return key_for_cypher("type_" + self.alias.replace("`", ""))
@property
def alias(self):
"""Returns a cypher formatted string of keys and prefixes"""
return key_for_cypher(self.alias_template.format(t=self.key))
@property
def with_alias(self):
"""Returns alias"""
return self.alias
@property
def labels_alias(self):
"""Returns alias"""
return "labels({}) as {}".format(self.alias, key_for_cypher(self.alias + "_labels"))
@property
def withs(self):
"""Returns a list of alias and type_alias"""
return [self.alias, self.type_alias, self.labels_alias]
def precedes(self, other_annotation):
return PrecedesClauseElement(self, other_annotation)
def follows(self, other_annotation):
return FollowsClauseElement(self, other_annotation)
def __getattr__(self, key):
if key == "current":
return self
elif key in ["previous", "following"]:
from .precedence import FollowingAnnotation, PreviousAnnotation
if key == "previous":
return PreviousAnnotation(self, -1)
else:
return FollowingAnnotation(self, 1)
elif key in ["previous_pause", "following_pause"]:
from .pause import FollowingPauseAnnotation, PreviousPauseAnnotation
node = self
if self.node_type != self.hierarchy.word_name:
node = getattr(self, self.hierarchy.word_name)
if key == "previous_pause":
return PreviousPauseAnnotation(node)
else:
return FollowingPauseAnnotation(node)
elif key.startswith("previous"):
p, key = key.split("_", 1)
p = self.previous
return getattr(p, key)
elif key.startswith("following"):
p, key = key.split("_", 1)
f = self.following
return getattr(f, key)
elif key == "follows_pause":
from .pause import FollowsPauseAttribute
return FollowsPauseAttribute(self)
elif key == "precedes_pause":
from .pause import PrecedesPauseAttribute
return PrecedesPauseAttribute(self)
elif key == "speaker":
from .speaker import SpeakerAnnotation
return SpeakerAnnotation(self)
elif key == "discourse":
from .discourse import DiscourseAnnotation
return DiscourseAnnotation(self)
elif key in self.hierarchy.acoustics:
from .acoustic import AcousticAttribute
return AcousticAttribute(self, key)
elif self.hierarchy is not None and key in self.hierarchy.get_higher_types(self.node_type):
from .hierarchical import HierarchicalAnnotation
prev_node = self
cur_node = HierarchicalAnnotation(
prev_node,
AnnotationNode(key, corpus=self.corpus, hierarchy=self.hierarchy),
)
return cur_node
elif self.hierarchy is not None and key in self.hierarchy.get_lower_types(self.node_type):
from .path import SubPathAnnotation
return SubPathAnnotation(self, AnnotationNode(key, corpus=self.corpus))
elif (
self.hierarchy is not None
and self.node_type in self.hierarchy.subannotations
and key in self.hierarchy.subannotations[self.node_type]
):
from .subannotation import SubAnnotation
return SubAnnotation(self, AnnotationNode(key, corpus=self.corpus))
else:
if (
key not in special_attributes
and self.hierarchy is not None
and not self.hierarchy.has_token_property(self.node_type, key)
and not self.hierarchy.has_type_property(self.node_type, key)
):
properties = [
x[0]
for x in self.hierarchy.type_properties[self.node_type]
| self.hierarchy.token_properties[self.node_type]
]
raise AnnotationAttributeError(
"The '{}' annotation types do not have a '{}' property (available: {}).".format(
self.node_type, key, ", ".join(properties)
)
)
return AnnotationAttribute(self, key)
class AnnotationCollectionNode(CollectionNode):
def with_statement(self):
""" """
return ", ".join(
[
"collect(n) as {a}".format(a=self.collection_alias),
"collect(t) as {a}".format(a=self.collection_type_alias),
]
)
@property
def withs(self):
withs = [self.collection_alias, self.collection_type_alias]
return withs
class AnnotationCollectionAttribute(CollectionAttribute):
pass