from .exceptions import HierarchyError, GraphQueryError
from .query.annotations.attributes import PauseAnnotation, AnnotationNode
from datetime import datetime
# NOTE: a stray "[docs]" Sphinx source-link marker was removed from this line.
class Hierarchy(object):
    """
    Class containing information about how a corpus is structured.

    Hierarchical data is stored in the form of a dictionary with keys
    for linguistic types, and values for the linguistic type that contains
    them.  If no other type contains a given type, its value is ``None``.

    Subannotation data is stored in the form of a dictionary with keys
    for linguistic types, and values of sets of types of subannotations.

    Property collections (token, type, speaker, discourse, acoustic and
    subannotation properties) are sets of ``(property_name, Type)`` tuples.

    Parameters
    ----------
    data : dict
        Information about the hierarchy of linguistic types
    corpus_name : str
        Name of the corpus
    """

    # Cypher templates for reading/writing the ``subsets`` property on the
    # type nodes (``{type}_type``) and on the token nodes (``{type}``).
    get_type_subset_template = (
        "MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
        "MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)\n"
        "RETURN n.subsets as subsets"
    )
    set_type_subset_template = (
        "MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
        "MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)\n"
        "SET n.subsets = $subsets"
    )
    get_token_subset_template = (
        "MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
        "MATCH (c)<-[:contained_by*]-(n:{type})\n"
        "RETURN n.subsets as subsets"
    )
    set_token_subset_template = (
        "MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
        "MATCH (c)<-[:contained_by*]-(n:{type})\n"
        "SET n.subsets = $subsets"
    )

    def __init__(self, data=None, corpus_name=None):
        if data is None:
            data = {}
        self._data = data
        self.corpus_name = corpus_name
        self.subannotations = {}
        self.subannotation_properties = {}
        self.subset_types = {}
        self.token_properties = {}
        self.subset_tokens = {}
        self.type_properties = {}
        self.acoustic_properties = {}
        self.speaker_properties = {('name', str)}
        self.discourse_properties = {('name', str), ('file_path', str), ('low_freq_file_path', str),
                                     ('vowel_file_path', str), ('consonant_file_path', str),
                                     ('duration', float), ('sampling_rate', int), ('num_channels', int)}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _placeholder_for(value_type):
        """Return the placeholder value written to the database for a Python type class.

        Anything that is not one of the recognised type classes is returned unchanged.
        """
        if value_type == int:
            return 0
        if value_type == list:
            return []
        if value_type == float:
            return 0.0
        if value_type == str:
            return ''
        if value_type == bool:
            return False
        if value_type == type(None):
            return None
        return value_type

    @classmethod
    def _set_clause(cls, alias, properties):
        """Build the body of a Cypher ``SET`` clause and its query parameters.

        ``properties`` is a list of ``(property_name, Type)`` tuples; ``alias`` is the
        node variable used in the query.  Returns ``(clause_text, parameters)``.
        """
        assignments = []
        parameters = {}
        for name, value_type in properties:
            assignments.append('{0}.{1} = ${1}'.format(alias, name))
            parameters[name] = cls._placeholder_for(value_type)
        return ', '.join(assignments), parameters

    @staticmethod
    def _remove_clause(alias, names):
        """Build the body of a Cypher ``REMOVE`` clause for the given property names."""
        return ', '.join('{}.{}'.format(alias, name) for name in names)

    @staticmethod
    def _merge_properties(existing, new):
        """Return ``existing`` with same-named entries replaced by those in ``new``.

        Keeping a single entry per property name avoids ending up with e.g. both
        ``('x', int)`` and ``('x', str)``, which cannot be sorted by ``to_json``.
        """
        new_names = {prop[0] for prop in new}
        merged = {prop for prop in existing if prop[0] not in new_names}
        merged.update(new)
        return merged

    @staticmethod
    def _dump_properties(properties):
        """Serialize ``(name, Type)`` tuples as sorted ``(name, Type())`` pairs."""
        return sorted((name, t()) for name, t in properties)

    @staticmethod
    def _load_properties(pairs):
        """Inverse of ``_dump_properties``: rebuild the set of ``(name, Type)`` tuples."""
        return set((name, type(value)) for name, value in pairs)

    def _change_subsets(self, corpus_context, annotation_type, subsets,
                        get_template, set_template, registry, remove):
        """Shared implementation of the four add/remove subset methods.

        Reads the currently stored subsets with ``get_template``, adds or removes
        ``subsets``, writes the result back with ``set_template``, records it in
        ``registry`` (``self.subset_types`` or ``self.subset_tokens``) and asks the
        corpus context to cache the hierarchy.
        """
        query = get_template.format(type=annotation_type)
        rows = list(corpus_context.execute_cypher(query, corpus_name=corpus_context.corpus_name))
        try:
            stored = rows[0]['subsets']
        except (IndexError, AttributeError):
            stored = None
        # The stored value may be missing/None when no subsets were ever set.
        current = set(stored or ())
        requested = set(subsets)
        updated = current - requested if remove else current | requested
        corpus_context.execute_cypher(set_template.format(type=annotation_type),
                                      subsets=sorted(updated),
                                      corpus_name=corpus_context.corpus_name)
        registry[annotation_type] = updated
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Attribute access / pickling
    # ------------------------------------------------------------------
    def __getattr__(self, key):
        """
        Resolve ``hierarchy.<annotation_type>`` to a query node.

        ``pause`` yields a ``PauseAnnotation``; an annotation type name (or its
        singular form, i.e. the name without a trailing ``s``) yields an
        ``AnnotationNode``.

        Raises
        ------
        AttributeError
            For private/dunder names, or when the instance has no hierarchy data
            yet (e.g. while being unpickled or copied)
        GraphQueryError
            If the name does not match any annotation type
        """
        # Protocol probes (``__deepcopy__``, ``_repr_html_``, ...) and lookups on a
        # half-constructed instance must fail with AttributeError; going through
        # ``self.annotation_types`` here would recurse back into ``__getattr__``.
        if key.startswith('_'):
            raise AttributeError(key)
        data = self.__dict__.get('_data')
        if data is None:
            raise AttributeError(key)
        if key == 'pause':
            return PauseAnnotation(corpus=self.corpus_name, hierarchy=self)
        known_types = list(data.keys())
        if key + 's' in known_types:
            key += 's'  # FIXME
        if key in known_types:
            return AnnotationNode(key, corpus=self.corpus_name, hierarchy=self)
        raise GraphQueryError(
            'The graph does not have any annotations of type \'{}\'. Possible types are: {}'.format(
                key, ', '.join(sorted(known_types))))

    def __getstate__(self):
        return self.to_json()

    def __setstate__(self, state):
        self.from_json(state)

    def __str__(self):
        return str(self.to_json())

    def get_depth(self, lower_type, higher_type):
        """
        Get the distance between two annotation types in the hierarchy

        The type directly above ``lower_type`` is at distance 1.  If ``higher_type``
        is not found among the types above ``lower_type``, the result is one more
        than the number of higher types.

        Parameters
        ----------
        lower_type : str
            Name of the lower type
        higher_type : str
            Name of the higher type

        Returns
        -------
        int
            Distance between the two types
        """
        depth = 1
        for candidate in self.get_higher_types(lower_type):
            if candidate == higher_type:
                break
            depth += 1
        return depth

    @property
    def annotation_types(self):
        """
        Get a list of all the annotation types in the hierarchy

        Returns
        -------
        list
            All annotation types in the hierarchy
        """
        return list(self._data.keys())

    @property
    def acoustics(self):
        """
        Get all currently encoded acoustic measurements in the corpus

        Returns
        -------
        list
            All encoded acoustic measures, sorted
        """
        return sorted(self.acoustic_properties.keys())

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------
    def to_json(self):
        """
        Convert the Hierarchy object to a dictionary for JSON serialization

        Property types are stored as an instance of the type (``Type()``) so that
        ``from_json`` can recover the type with ``type(value)``.

        Returns
        -------
        dict
            All necessary information for the Hierarchy object
        """
        dump = self._dump_properties
        data = {'_data': self._data}
        data['corpus_name'] = self.corpus_name
        data['acoustic_properties'] = {k: dump(v) for k, v in self.acoustic_properties.items()}
        data['subannotations'] = {k: sorted(v) for k, v in self.subannotations.items()}
        data['subannotation_properties'] = {k: dump(v) for k, v in self.subannotation_properties.items()}
        data['subset_types'] = {k: sorted(v) for k, v in self.subset_types.items()}
        data['subset_tokens'] = {k: sorted(v) for k, v in self.subset_tokens.items()}
        data['token_properties'] = {k: dump(v) for k, v in self.token_properties.items()}
        data['type_properties'] = {k: dump(v) for k, v in self.type_properties.items()}
        data['speaker_properties'] = dump(self.speaker_properties)
        data['discourse_properties'] = dump(self.discourse_properties)
        return data

    def from_json(self, json):
        """
        Set all properties from a dictionary deserialized from JSON

        Parameters
        ----------
        json : dict
            Object information, as produced by ``to_json``; only
            ``acoustic_properties`` is optional

        Raises
        ------
        KeyError
            If a required key is missing from ``json``
        """
        load = self._load_properties
        self._data = json['_data']
        self.corpus_name = json['corpus_name']
        self.acoustic_properties = {k: load(v) for k, v in json.get('acoustic_properties', {}).items()}
        self.subannotations = {k: set(v) for k, v in json['subannotations'].items()}
        self.subannotation_properties = {k: load(v) for k, v in json['subannotation_properties'].items()}
        self.subset_types = {k: set(v) for k, v in json['subset_types'].items()}
        self.subset_tokens = {k: set(v) for k, v in json['subset_tokens'].items()}
        self.token_properties = {k: load(v) for k, v in json['token_properties'].items()}
        self.type_properties = {k: load(v) for k, v in json['type_properties'].items()}
        self.speaker_properties = load(json['speaker_properties'])
        self.discourse_properties = load(json['discourse_properties'])

    # ------------------------------------------------------------------
    # Subsets
    # ------------------------------------------------------------------
    def add_type_subsets(self, corpus_context, annotation_type, subsets):
        """
        Adds type subsets to the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to add subsets for
        subsets : iterable
            Subsets to add for the annotation type
        """
        self._change_subsets(corpus_context, annotation_type, subsets,
                             self.get_type_subset_template, self.set_type_subset_template,
                             self.subset_types, remove=False)

    def remove_type_subsets(self, corpus_context, annotation_type, subsets):
        """
        Removes type subsets from the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to remove subsets for
        subsets : iterable
            Subsets to remove for the annotation type
        """
        self._change_subsets(corpus_context, annotation_type, subsets,
                             self.get_type_subset_template, self.set_type_subset_template,
                             self.subset_types, remove=True)

    def add_token_subsets(self, corpus_context, annotation_type, subsets):
        """
        Adds token subsets to the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to add subsets for
        subsets : iterable
            Subsets to add for the annotation tokens
        """
        self._change_subsets(corpus_context, annotation_type, subsets,
                             self.get_token_subset_template, self.set_token_subset_template,
                             self.subset_tokens, remove=False)

    def remove_token_subsets(self, corpus_context, annotation_type, subsets):
        """
        Removes token subsets from the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type: str
            Annotation type to remove subsets for
        subsets : iterable
            Subsets to remove for the annotation tokens
        """
        self._change_subsets(corpus_context, annotation_type, subsets,
                             self.get_token_subset_template, self.set_token_subset_template,
                             self.subset_tokens, remove=True)

    # ------------------------------------------------------------------
    # Annotation types
    # ------------------------------------------------------------------
    def add_annotation_type(self, annotation_type, above=None, below=None):
        """
        Adds an annotation type to the Hierarchy object along with default type and token properties for the new
        annotation type

        Parameters
        ----------
        annotation_type : str
            Annotation type to add
        above : str
            Annotation type that is contained by the new annotation type, leave out if new annotation type is at the bottom
            of the hierarchy
        below : str
            Annotation type that contains the new annotation type, leave out if new annotation type is at the top
            of the hierarchy
        """
        # Only re-parent an existing type when one was actually given; otherwise a
        # spurious ``None`` key would be inserted into the hierarchy.
        if above is not None:
            self._data[above] = annotation_type
        self._data[annotation_type] = below
        self.token_properties[annotation_type] = {('id', str), ('label', str),
                                                  ('begin', float), ('end', float), ('duration', float)}
        self.type_properties[annotation_type] = {('label', str)}

    def remove_annotation_type(self, annotation_type):
        """
        Removes an annotation type from the hierarchy

        Types that were contained by the removed type become contained by the type
        that contained the removed type.  Properties, subsets and subannotations
        recorded for the removed type are dropped as well.

        Parameters
        ----------
        annotation_type : str
            Annotation type to remove

        Raises
        ------
        KeyError
            If ``annotation_type`` is not in the hierarchy
        """
        container = self._data[annotation_type]
        # May be empty when the lowest type is being removed.
        contained = [k for k, v in self._data.items() if v == annotation_type]
        del self._data[annotation_type]
        for child in contained:
            self._data[child] = container
        for registry in (self.token_properties, self.type_properties,
                         self.subset_types, self.subset_tokens):
            registry.pop(annotation_type, None)
        for subannotation in self.subannotations.pop(annotation_type, ()):
            self.subannotation_properties.pop(subannotation, None)

    # ------------------------------------------------------------------
    # Type properties
    # ------------------------------------------------------------------
    def add_type_properties(self, corpus_context, annotation_type, properties):
        """
        Adds type properties for an annotation type and syncs it to a Neo4j database.  The list of properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.  An empty
        ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to add type properties for
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = [tuple(p) for p in properties]  # materialize: iterated more than once
        if not properties:
            return
        sets, parameters = self._set_clause('n', properties)
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)\n"
                     "SET {sets}").format(type=annotation_type, sets=sets)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name, **parameters)
        existing = self.type_properties.get(annotation_type, {('id', str)})
        self.type_properties[annotation_type] = self._merge_properties(existing, properties)
        corpus_context.cache_hierarchy()

    def remove_type_properties(self, corpus_context, annotation_type, properties):
        """
        Removes type properties for an annotation type and syncs it to a Neo4j database.

        An empty ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to remove type properties for
        properties : iterable
            Property names to remove
        """
        names = list(properties)  # materialize: iterated more than once
        if not names:
            return
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)\n"
                     "REMOVE {removes}").format(type=annotation_type,
                                                removes=self._remove_clause('n', names))
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        current = self.type_properties.setdefault(annotation_type, {('id', str)})
        current.difference_update({p for p in current if p[0] in names})
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Acoustic properties
    # ------------------------------------------------------------------
    def add_acoustic_properties(self, corpus_context, acoustic_type, properties):
        """
        Add acoustic properties to an encoded acoustic measure.  The list of properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.  An empty
        ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        acoustic_type : str
            Acoustic measure to add properties for
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = [tuple(p) for p in properties]
        if not properties:
            return
        sets, parameters = self._set_clause('n', properties)
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)-[:has_acoustics]->(n:{type})\n"
                     "SET {sets}").format(type=acoustic_type, sets=sets)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name, **parameters)
        existing = self.acoustic_properties.get(acoustic_type, set())
        self.acoustic_properties[acoustic_type] = self._merge_properties(existing, properties)
        corpus_context.cache_hierarchy()

    def remove_acoustic_properties(self, corpus_context, acoustic_type, properties):
        """
        Remove acoustic properties from an encoded acoustic measure.

        An empty ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        acoustic_type : str
            Acoustic measure to remove properties for
        properties : iterable
            Property names to remove
        """
        names = list(properties)
        if not names:
            return
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)-[:has_acoustics]->(n:{type})\n"
                     "REMOVE {removes}").format(type=acoustic_type,
                                                removes=self._remove_clause('n', names))
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        # Must default to a set (not a dict) so difference_update is available.
        current = self.acoustic_properties.setdefault(acoustic_type, set())
        current.difference_update({p for p in current if p[0] in names})
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Token properties
    # ------------------------------------------------------------------
    def add_token_properties(self, corpus_context, annotation_type, properties):
        """
        Adds token properties for an annotation type and syncs it to a Neo4j database.  The list of properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.  An empty
        ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to add token properties for
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = [tuple(p) for p in properties]
        if not properties:
            return
        sets, parameters = self._set_clause('n', properties)
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(n:{type})\n"
                     "SET {sets}").format(type=annotation_type, sets=sets)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name, **parameters)
        existing = self.token_properties.get(annotation_type, {('id', str)})
        self.token_properties[annotation_type] = self._merge_properties(existing, properties)
        corpus_context.cache_hierarchy()

    def remove_token_properties(self, corpus_context, annotation_type, properties):
        """
        Removes token properties for an annotation type and syncs it to a Neo4j database.

        An empty ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to remove token properties for
        properties : iterable
            Property names to remove
        """
        names = list(properties)
        if not names:
            return
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(n:{type})\n"
                     "REMOVE {removes}").format(type=annotation_type,
                                                removes=self._remove_clause('n', names))
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        current = self.token_properties.setdefault(annotation_type, {('id', str)})
        current.difference_update({p for p in current if p[0] in names})
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Speaker properties
    # ------------------------------------------------------------------
    def add_speaker_properties(self, corpus_context, properties):
        """
        Adds speaker properties to the Hierarchy object and syncs it to a Neo4j database.  The list of properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.  An empty
        ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = [tuple(p) for p in properties]
        if not properties:
            return
        sets, parameters = self._set_clause('s', properties)
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)-[:spoken_by]->(s:Speaker)\n"
                     "SET {sets}").format(sets=sets)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name, **parameters)
        self.speaker_properties = self._merge_properties(self.speaker_properties, properties)
        corpus_context.cache_hierarchy()

    def remove_speaker_properties(self, corpus_context, properties):
        """
        Removes speaker properties and syncs it to a Neo4j database.

        An empty ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            Property names to remove
        """
        names = list(properties)
        if not names:
            return
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)-[:spoken_by]->(s:Speaker)\n"
                     "REMOVE {removes}").format(removes=self._remove_clause('s', names))
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        self.speaker_properties.difference_update(
            {p for p in self.speaker_properties if p[0] in names})
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Discourse properties
    # ------------------------------------------------------------------
    def add_discourse_properties(self, corpus_context, properties):
        """
        Adds discourse properties to the Hierarchy object and syncs it to a Neo4j database.  The list of properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.  An empty
        ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            Iterable of tuples of the form (property_name, Type)
        """
        properties = [tuple(p) for p in properties]
        if not properties:
            return
        sets, parameters = self._set_clause('d', properties)
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)-[:spoken_in]->(d:Discourse)\n"
                     "SET {sets}").format(sets=sets)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name, **parameters)
        self.discourse_properties = self._merge_properties(self.discourse_properties, properties)
        corpus_context.cache_hierarchy()

    def remove_discourse_properties(self, corpus_context, properties):
        """
        Removes discourse properties and syncs it to a Neo4j database.

        An empty ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        properties : iterable
            Property names to remove
        """
        names = list(properties)
        if not names:
            return
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)-[:spoken_in]->(d:Discourse)\n"
                     "REMOVE {removes}").format(removes=self._remove_clause('d', names))
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        self.discourse_properties.difference_update(
            {p for p in self.discourse_properties if p[0] in names})
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Mapping-style access to the hierarchy data
    # ------------------------------------------------------------------
    def keys(self):
        """
        Keys (linguistic types) of the hierarchy.

        Returns
        -------
        dict view
            Keys of the hierarchy
        """
        return self._data.keys()

    def values(self):
        """
        Values (containing types) of the hierarchy.

        Returns
        -------
        dict view
            Values of the hierarchy
        """
        return self._data.values()

    def items(self):
        """
        Key/value pairs for the hierarchy.

        Returns
        -------
        dict view
            Items of the hierarchy
        """
        return self._data.items()

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]
        # Types that were contained by the deleted type no longer have a container.
        for k, v in self._data.items():
            if v == key:
                self._data[k] = None

    def __contains__(self, item):
        return item in self._data

    def update(self, other):
        """
        Merge Hierarchies together.  If other is a dictionary, then only
        the hierarchical data is updated.

        For subannotations and for subannotation, type and token properties, the
        entry from ``other`` replaces any entry with the same key; speaker and
        discourse properties are unioned.

        Parameters
        ----------
        other : Hierarchy or dict
            Data to be merged in
        """
        if isinstance(other, dict):
            self._data.update(other)
            return
        self._data.update(other._data)
        # NOTE(review): earlier code contained set-intersection branches for keys
        # present on both sides, but they were unreachable, so the effective
        # behaviour has always been "other wins".  That behaviour is kept here;
        # confirm whether an intersection/union merge was intended.  Sets are
        # copied so the two hierarchies do not share mutable state.
        for mine, theirs in ((self.subannotations, other.subannotations),
                             (self.subannotation_properties, other.subannotation_properties),
                             (self.type_properties, other.type_properties),
                             (self.token_properties, other.token_properties)):
            for k, v in theirs.items():
                mine[k] = set(v)
        self.speaker_properties.update(other.speaker_properties)
        self.discourse_properties.update(other.discourse_properties)

    # ------------------------------------------------------------------
    # Ordering
    # ------------------------------------------------------------------
    @property
    def lowest(self):
        """
        Get the lowest annotation type of the Hierarchy

        Returns
        -------
        str
            Lowest annotation type, i.e. the first type that contains no other
            type (``None`` if there is no such type)
        """
        containers = set(self.values())
        for k in self.keys():
            if k not in containers:
                return k
        return None

    @property
    def highest(self):
        """
        Get the highest annotation type of the Hierarchy

        Returns
        -------
        str
            Highest annotation type, i.e. the first type that is not contained
            by any other type (``None`` if there is no such type)
        """
        for k, v in self.items():
            if v is None:
                return k
        return None

    @property
    def highest_to_lowest(self):
        """
        Get a list of annotation types sorted from highest to lowest

        Returns
        -------
        list
            Annotation types from highest to lowest
        """
        ordered = [self.highest]
        while len(ordered) < len(self.keys()):
            for k, v in self.items():
                if v == ordered[-1]:
                    ordered.append(k)
                    break
            else:
                # Nothing is contained by the last type: the chain has ended
                # (stop instead of looping forever on a non-linear hierarchy).
                break
        return ordered

    @property
    def lowest_to_highest(self):
        """
        Get a list of annotation types sorted from lowest to highest

        Returns
        -------
        list
            Annotation types from lowest to highest
        """
        ordered = [self.lowest]
        while len(ordered) < len(self.keys()):
            container = self[ordered[-1]]
            if container is None:
                # Reached the top of the chain before visiting every key.
                break
            ordered.append(container)
        return ordered

    def get_lower_types(self, annotation_type):
        """
        Get all annotation types that are lower than the specified annotation type

        Parameters
        ----------
        annotation_type : str
            Annotation type from which to get lower annotation types

        Returns
        -------
        list
            List of all annotation types that are lower than the specified annotation type
            (empty if the type is not in the hierarchy)
        """
        ordered = self.highest_to_lowest
        if annotation_type not in ordered:
            return []
        return ordered[ordered.index(annotation_type) + 1:]

    def get_higher_types(self, annotation_type):
        """
        Get all annotation types that are higher than the specified annotation type

        Parameters
        ----------
        annotation_type : str
            Annotation type from which to get higher annotation types

        Returns
        -------
        list
            List of all annotation types that are higher than the specified annotation type
            (empty if the type is not in the hierarchy)
        """
        ordered = self.lowest_to_highest
        if annotation_type not in ordered:
            return []
        return ordered[ordered.index(annotation_type) + 1:]

    # ------------------------------------------------------------------
    # Subannotations
    # ------------------------------------------------------------------
    def has_subannotation_type(self, subannotation_type):
        """
        Check whether the Hierarchy has a subannotation type

        Parameters
        ----------
        subannotation_type : str
            Name of subannotation to check for

        Returns
        -------
        bool
            True if subannotation type is present
        """
        return subannotation_type in self.subannotation_properties

    def has_subannotation_property(self, subannotation_type, property_name):
        """
        Check whether the Hierarchy has a property associated with a subannotation type

        Parameters
        ----------
        subannotation_type : str
            Name of subannotation to check
        property_name : str
            Name of the property to check for

        Returns
        -------
        bool
            True if subannotation type has the given property name
        """
        if not self.has_subannotation_type(subannotation_type):
            return False
        return any(prop[0] == property_name
                   for prop in self.subannotation_properties[subannotation_type])

    def add_subannotation_type(self, corpus_context, annotation_type, subannotation_type, properties=None):
        """
        Adds subannotation type for a given annotation type to the Hierarchy object and syncs it to a Neo4j database.
        The list of optional properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        annotation_type : str
            Annotation type to add a subannotation to
        subannotation_type : str
            Name of the subannotation type
        properties : iterable
            Optional iterable of tuples of the form (property_name, Type)

        Raises
        ------
        HierarchyError
            If the subannotation type name is already in use
        """
        properties = [] if properties is None else [tuple(p) for p in properties]
        if subannotation_type in self.subannotation_properties:
            raise HierarchyError('The subannotation_type {} is already specified for another linguistic type.'
                                 ' Please use a different name.'.format(subannotation_type))
        if properties:
            sets, parameters = self._set_clause('s', properties)
            statement = ("MATCH (c:Corpus), (c)<-[:contained_by*]-(a:{a_type}) WHERE c.name = $corpus_name\n"
                         "WITH a\n"
                         "CREATE (a)<-[:annotates]-(s:{s_type})\n"
                         "WITH s\n"
                         "SET {sets}").format(sets=sets, a_type=annotation_type, s_type=subannotation_type)
            corpus_context.execute_cypher(statement,
                                          corpus_name=corpus_context.corpus_name, **parameters)
        else:
            statement = ("MATCH (c:Corpus), (c)<-[:contained_by*]-(a:{a_type}) WHERE c.name = $corpus_name\n"
                         "WITH a\n"
                         "MERGE (a)<-[:annotates]-(s:{s_type})").format(a_type=annotation_type,
                                                                       s_type=subannotation_type)
            corpus_context.execute_cypher(statement,
                                          corpus_name=corpus_context.corpus_name)
        # Record the new type only once the database call has gone through, so a
        # failed query does not leave the in-memory hierarchy out of sync.
        self.subannotations.setdefault(annotation_type, set()).add(subannotation_type)
        self.subannotation_properties[subannotation_type] = set(properties)
        corpus_context.cache_hierarchy()

    def remove_subannotation_type(self, corpus_context, subannotation_type):
        """
        Remove a subannotation type from the Hierarchy object and sync it to a Neo4j database.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        subannotation_type : str
            Subannotation type to remove
        """
        self.subannotation_properties.pop(subannotation_type, None)
        for annotation_type, registered in self.subannotations.items():
            if subannotation_type in registered:
                self.subannotations[annotation_type] = registered - {subannotation_type}
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})\n"
                     "DETACH DELETE s").format(s_type=subannotation_type)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        corpus_context.cache_hierarchy()

    def add_subannotation_properties(self, corpus_context, subannotation_type, properties):
        """
        Adds properties for a subannotation type to the Hierarchy object and syncs it to a Neo4j database.
        The list of properties are tuples
        of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
        ``bool``, ``str``, ``list``, or ``float``.

        An existing property with the same name is replaced.  An empty
        ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        subannotation_type : str
            Name of the subannotation type
        properties : iterable
            Iterable of tuples of the form (property_name, Type)

        Raises
        ------
        KeyError
            If the subannotation type is not known to the hierarchy
        """
        properties = [tuple(p) for p in properties]
        if not properties:
            return
        # Looked up before touching the database so an unknown type fails early.
        existing = self.subannotation_properties[subannotation_type]
        sets, parameters = self._set_clause('s', properties)
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})\n"
                     "SET {sets}").format(sets=sets, s_type=subannotation_type)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name, **parameters)
        self.subannotation_properties[subannotation_type] = self._merge_properties(existing, properties)
        corpus_context.cache_hierarchy()

    def remove_subannotation_properties(self, corpus_context, subannotation_type, properties):
        """
        Removes properties for a subannotation type from the Hierarchy object and syncs it to a Neo4j database.

        An empty ``properties`` iterable is a no-op.

        Parameters
        ----------
        corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
            CorpusContext to use for updating Neo4j database
        subannotation_type : str
            Name of the subannotation type
        properties : iterable
            Property names to remove

        Raises
        ------
        KeyError
            If the subannotation type is not known to the hierarchy
        """
        names = list(properties)
        if not names:
            return
        current = self.subannotation_properties[subannotation_type]
        statement = ("MATCH (c:Corpus) WHERE c.name = $corpus_name\n"
                     "MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})\n"
                     "REMOVE {removes}").format(removes=self._remove_clause('s', names),
                                                s_type=subannotation_type)
        corpus_context.execute_cypher(statement,
                                      corpus_name=corpus_context.corpus_name)
        current.difference_update({p for p in current if p[0] in names})
        corpus_context.cache_hierarchy()

    # ------------------------------------------------------------------
    # Membership checks
    # ------------------------------------------------------------------
    def has_speaker_property(self, key):
        """
        Check for whether speakers have a given property

        Parameters
        ----------
        key : str
            Property to check for

        Returns
        -------
        bool
            True if speakers have the given property
        """
        return any(name == key for name, _ in self.speaker_properties)

    def has_discourse_property(self, key):
        """
        Check for whether discourses have a given property

        Parameters
        ----------
        key : str
            Property to check for

        Returns
        -------
        bool
            True if discourses have the given property
        """
        return any(name == key for name, _ in self.discourse_properties)

    def has_token_property(self, annotation_type, key):
        """
        Check whether a given annotation type has a given token property.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given token property
        key : str
            Property to check for

        Returns
        -------
        bool
            True if the annotation type has the given token property
        """
        if annotation_type not in self.token_properties:
            return False
        return any(name == key for name, _ in self.token_properties[annotation_type])

    def has_type_property(self, annotation_type, key):
        """
        Check whether a given annotation type has a given type property.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given type property
        key : str
            Property to check for

        Returns
        -------
        bool
            True if the annotation type has the given type property
        """
        if annotation_type not in self.type_properties:
            return False
        return any(name == key for name, _ in self.type_properties[annotation_type])

    def has_type_subset(self, annotation_type, key):
        """
        Check whether a given annotation type has a given type subset.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given type subset
        key : str
            Subset to check for

        Returns
        -------
        bool
            True if the annotation type has the given type subset
        """
        if annotation_type not in self.subset_types:
            return False
        return key in self.subset_types[annotation_type]

    def has_token_subset(self, annotation_type, key):
        """
        Check whether a given annotation type has a given token subset.

        Parameters
        ----------
        annotation_type : str
            Annotation type to check for the given token subset
        key : str
            Subset to check for

        Returns
        -------
        bool
            True if the annotation type has the given token subset
        """
        if annotation_type not in self.subset_tokens:
            return False
        return key in self.subset_tokens[annotation_type]

    @property
    def word_name(self):
        """
        Shortcut for returning the annotation type matching "word"

        Returns
        -------
        str or None
            First annotation type that begins with "word", if any
        """
        for annotation_type in self.annotation_types:
            if annotation_type.startswith('word'):
                return annotation_type
        return None

    @property
    def phone_name(self):
        """
        Alias for getting the lowest annotation type

        Returns
        -------
        str
            Name of the lowest annotation type
        """
        return self.lowest