Source code for polyglotdb.query.base.attributes

from .helper import key_for_cypher
from .elements import (EqualClauseElement, NotEqualClauseElement, SubsetClauseElement,
                       NotSubsetClauseElement, NullClauseElement, NotNullClauseElement,
                       InClauseElement, NotInClauseElement, GtClauseElement, GteClauseElement,
                       LtClauseElement, LteClauseElement, RegexClauseElement)


[docs]class NodeAttribute(object): has_subquery = False acoustic = False def __init__(self, node, label): self.node = node self.label = label self.output_label = None def __hash__(self): return hash((self.node, self.label)) def __str__(self): return '{}.{}'.format(self.node, self.label) def __repr__(self): return '<NodeAttribute \'{}\'>'.format(str(self)) def for_cypher(self): return '{}.{}'.format(self.node.alias, key_for_cypher(self.label)) def for_json(self): return [[x for x in self.node.for_json()] + [self.label], self.output_label] def for_filter(self): return self.for_cypher() def for_column(self): return self.for_cypher() def value_type(self): a_type = self.node.node_type if a_type == 'Speaker': for name, t in self.node.hierarchy.speaker_properties: if name == self.label: if t == type(None) or t is None: return None return t elif a_type == 'Discourse': for name, t in self.node.hierarchy.discourse_properties: if name == self.label: if t == type(None) or t is None: return None return t elif self.node.hierarchy.has_token_property(a_type, self.label): for name, t in self.node.hierarchy.token_properties[a_type]: if name == self.label: if t == type(None) or t is None: return None return t elif self.node.hierarchy.has_type_property(a_type, self.label): for name, t in self.node.hierarchy.type_properties[a_type]: if name == self.label: if t == type(None) or t is None: return None return t elif self.node.hierarchy.has_subannotation_property(a_type, self.label): for name, t in self.node.hierarchy.subannotation_properties[a_type]: if name == self.label: if t == type(None) or t is None: return None return t raise ValueError('Property type "{}" not found for "{}".'.format(self.label, a_type)) def coerce_value(self, value): if value is None: return value t = self.value_type() if t is None: return None if isinstance(value, list): return [t(x) for x in value] return t(value) @property def alias(self): """ Removes '`' from annotation, concatenates annotation alias and label""" return '{}_{}'.format(self.node.alias.replace('`', ''), self.label) @property def alias_for_cypher(self): return '`{}_{}`'.format(self.node.alias.replace('`', ''), self.label)
[docs] def aliased_for_cypher(self): """ creates cypher string to use in db Returns ------- string string for db """ return '{} AS {}'.format(self.for_cypher(), self.alias_for_cypher)
def for_return(self): return self.for_cypher()
[docs] def aliased_for_output(self): """ creates cypher string for output Returns ------- string string for output """ return '{} AS {}'.format(self.for_return(), self.output_alias_for_cypher)
@property def output_alias(self): """ returns output_label if there is one return alias otherwise """ if self.output_label is not None: return self.output_label return self.alias @property def output_alias_for_cypher(self): """ returns output_label if there is one return alias otherwise """ if self.output_label is not None: return self.output_label return self.alias_for_cypher @property def with_alias(self): """ returns type_alias if there is one alias otherwise """ return self.node.alias
[docs] def column_name(self, label): """ sets a column name to label """ self.output_label = label return self
def __eq__(self, other): if self.label == 'subset': return SubsetClauseElement(self, other) if other is None: return NullClauseElement(self, other) return EqualClauseElement(self, other) def __ne__(self, other): if self.label == 'subset': return NotSubsetClauseElement(self, other) if other is None: return NotNullClauseElement(self, other) return NotEqualClauseElement(self, other) def __gt__(self, other): return GtClauseElement(self, other) def __ge__(self, other): return GteClauseElement(self, other) def __lt__(self, other): return LtClauseElement(self, other) def __le__(self, other): return LteClauseElement(self, other)
[docs] def in_(self, other): """ Checks if the parameter other has a 'cypher' element executes the query if it does and appends the relevant results or appends parameter other Parameters ---------- other : list attribute will be checked against elements in this list Returns ------- string clause for asserting membership in a filter """ if hasattr(other, 'cypher'): results = other.all() t = [] for x in results: t.append(getattr(x, self.label)) else: t = other return InClauseElement(self, t)
[docs] def not_in_(self, other): """ Checks if the parameter other has a 'cypher' element executes the query if it does and appends the relevant results or appends parameter other Parameters ---------- other : list attribute will be checked against elements in this list Returns ------- string clause for asserting non-membership in a filter """ if hasattr(other, 'cypher'): results = other.all() t = [] for x in results: t.append(getattr(x, self.label)) else: t = other return NotInClauseElement(self, t)
[docs] def regex(self, pattern): """ Returns a clause for filtering based on regular expressions.""" return RegexClauseElement(self, pattern)
@property def nodes(self): return self.node.nodes cache_alias = alias
[docs]class CollectionAttribute(NodeAttribute): collapsing = True acoustic = False filter_template = '{alias}.{property}' return_template = '[n in {alias}|n.{property}]' def __repr__(self): return '<CollectionAttribute \'{}\'>'.format(str(self)) def value_type(self): n = self.node.collected_node a_type = n.node_type if a_type == 'Speaker': for name, t in self.node.hierarchy.speaker_properties: if name == self.label: if t == type(None) or t is None: return None return t elif a_type == 'Discourse': for name, t in self.node.hierarchy.discourse_properties: if name == self.label: if t == type(None) or t is None: return None return t elif self.node.hierarchy.has_token_property(a_type, self.label): for name, t in self.node.hierarchy.token_properties[a_type]: if name == self.label: if t == type(None) or t is None: return None return t elif self.node.hierarchy.has_type_property(a_type, self.label): for name, t in self.node.hierarchy.type_properties[a_type]: if name == self.label: if t == type(None) or t is None: return None return t elif self.node.hierarchy.has_subannotation_property(a_type, self.label): for name, t in self.node.hierarchy.subannotation_properties[a_type]: if name == self.label: if t == type(None) or t is None: return None return t raise ValueError('Property type "{}" not found for "{}".'.format(self.label, a_type)) def for_cypher(self): return self.for_return() def for_filter(self): return self.filter_template.format(alias=self.node.collection_alias, property=self.label) def for_return(self): return self.return_template.format(alias=self.node.collection_alias, property=self.label) @property def with_aliases(self): """Returns annotation withs list """ return self.node.withs @property def with_alias(self): """returns annotation path_alias """ return self.node.collection_alias @property def cache_alias(self): return self.node.anchor_node.alias
[docs]class Node(object): non_optional = True has_subquery = False alias_template = 'node_{t}' match_template = '({alias})' def __init__(self, node_type, corpus=None, hierarchy=None): self.node_type = node_type self.corpus = corpus self.hierarchy = hierarchy self.subset_labels = [] def __eq__(self, other): if not isinstance(other, Node): return False if self.node_type != other.node_type: return False if self.corpus != other.corpus: return False if self.subset_labels != other.subset_labels: return False return True def __hash__(self): return hash(self.key) def __str__(self): return self.key def __repr__(self): return '<Node of {} in {} corpus'.format(self.node_type, self.corpus) def __getattr__(self, key): return NodeAttribute(self, key) @property def key(self): key = self.node_type if self.subset_labels: key += '_' + '_'.join(self.subset_labels) return key def for_json(self): return [self.node_type] def for_match(self): return self.match_template.format(alias=self.define_alias) @property def alias(self): return key_for_cypher(self.alias_template.format(t=self.key)) @property def define_alias(self): label_string = ':{}'.format(self.node_type) if self.corpus is not None: label_string += ':{}'.format(key_for_cypher(self.corpus)) if self.subset_labels: label_string += ':' + ':'.join(map(key_for_cypher, self.subset_labels)) return '{}{}'.format(self.alias, label_string)
[docs] def filter_by_subset(self, *args): """ adds each item in args to the hierarchy type_labels""" self.subset_labels = sorted(set(self.subset_labels + list(args))) return self
@property def with_alias(self): return self.alias @property def withs(self): return [self.alias] @property def nodes(self): return [self]
[docs]class CollectionNode(object): has_subquery = True non_optional = False subquery_match_template = '({anchor_node_alias})-->({def_collection_alias})' subquery_order_by_template = '' subquery_template = '''{optional}MATCH {for_match} {where_string} WITH {input_with_string}, {with_pre_collection} {sub_query} {order_by} WITH {output_with_string}''' collect_template = 'collect({a}) as {a}' def __init__(self, anchor_node, collected_node): self.anchor_node = anchor_node self.collected_node = collected_node def subquery(self, withs, filters=None, optional=False): input_with = ', '.join(withs) new_withs = withs - {self.collection_alias} output_with = ', '.join(new_withs) + ', ' + self.with_statement() where_string = '' if filters is not None: relevant = [] for c in filters: if c.involves(self): relevant.append(c.for_cypher()) if relevant: where_string = 'WHERE ' + '\nAND '.join(relevant) for_match = self.subquery_match_template.format(anchor_node_alias=self.anchor_node.alias, def_collection_alias=self.def_collection_alias) order_by = self.subquery_order_by_template kwargs = {'for_match': for_match, 'where_string': where_string, 'input_with_string': input_with, 'order_by': order_by, 'sub_query': '', 'optional': '', 'with_pre_collection': self.with_pre_collection, 'output_with_string': output_with} if optional: kwargs['optional']= 'OPTIONAL ' return self.subquery_template.format(**kwargs) @property def with_pre_collection(self): return self.collection_alias def __eq__(self, other): if not isinstance(other, CollectionNode): return False if self.anchor_node != other.anchor_node: return False if self.collected_node != other.collected_node: return False return True @property def nodes(self): return [self] + self.anchor_node.nodes + self.collected_node.nodes @property def hierarchy(self): return self.anchor_node.hierarchy @property def corpus(self): return self.anchor_node.corpus @property def node_type(self): return self.anchor_node.node_type def __str__(self): return '{}.{}'.format(self.anchor_node, self.collected_node) def __repr__(self): return '<CollectionNode of {} under {}'.format(str(self.collected_node), str(self.anchor_node)) def __hash__(self): return hash((self.anchor_node, self.collected_node)) @property def withs(self): withs = [self.collection_alias] return withs def with_statement(self): withs = [self.collect_template.format(a=self.collection_alias) ] return ', '.join(withs) @property def def_collection_alias(self): label_string = ':{}'.format(self.collected_node.node_type) if self.corpus is not None: label_string += ':{}'.format(key_for_cypher(self.collected_node.corpus)) if self.collected_node.subset_labels: label_string += ':' + ':'.join(map(key_for_cypher, self.collected_node.subset_labels)) return '{}{}'.format(self.collection_alias, label_string) @property def collection_alias(self): return key_for_cypher('{}_in_{}'.format(self.collected_node.alias, self.anchor_node.alias)) alias = collection_alias def filter_by_subset(self, *args): self.collected_node = self.collected_node.filter_by_subset(*args) return self def __getattr__(self, key): return CollectionAttribute(self, key)