from polyglotdb.exceptions import GraphQueryError, HierarchyError
from polyglotdb.query.annotations.attributes import AnnotationNode, PauseAnnotation
[docs]
class Hierarchy(object):
"""
Class containing information about how a corpus is structured.
Hierarchical data is stored in the form of a dictionary with keys
for linguistic types, and values for the linguistic type that contains
them. If no other type contains a given type, its value is ``None``.
Subannotation data is stored in the form of a dictionary with keys
for linguistic types, and values of sets of types of subannotations.
Parameters
----------
data : dict
Information about the hierarchy of linguistic types
corpus_name : str
Name of the corpus
"""
get_type_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
RETURN n.subsets as subsets"""
set_type_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
SET n.subsets = $subsets"""
get_token_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(n:{type})
RETURN n.subsets as subsets"""
set_token_subset_template = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(n:{type})
SET n.subsets = $subsets"""
def __init__(self, data=None, corpus_name=None):
if data is None:
data = {}
self._data = data
self.corpus_name = corpus_name
self.subannotations = {}
self.subannotation_properties = {}
self.subset_types = {}
self.token_properties = {}
self.subset_tokens = {}
self.type_properties = {}
self.acoustic_properties = {}
self.speaker_properties = {("name", str)}
self.discourse_properties = {
("name", str),
("file_path", str),
("low_freq_file_path", str),
("vowel_file_path", str),
("consonant_file_path", str),
("duration", float),
("sampling_rate", int),
("num_channels", int),
}
def __getattr__(self, key):
if key == "pause":
return PauseAnnotation(corpus=self.corpus_name, hierarchy=self)
if key + "s" in self.annotation_types:
key += "s" # FIXME
if key in self.annotation_types:
return AnnotationNode(key, corpus=self.corpus_name, hierarchy=self)
raise (
GraphQueryError(
"The graph does not have any annotations of type '{}'. Possible types are: {}".format(
key, ", ".join(sorted(self.annotation_types))
)
)
)
def __getstate__(self):
return self.to_json()
def __setstate__(self, state):
self.from_json(state)
def __str__(self):
return str(self.to_json())
@property
def annotation_types(self):
"""
Get a list of all the annotation types in the hierarchy
Returns
-------
list
All annotation types in the hierarchy
"""
return list(self._data.keys())
@property
def acoustics(self):
"""
Get all currently encoded acoustic measurements in the corpus
Returns
-------
list
All encoded acoustic measures
"""
return sorted(self.acoustic_properties.keys())
def to_json(self):
"""
Convert the Hierarchy object to a dictionary for JSON serialization
Returns
-------
dict
All necessary information for the Hierarchy object
"""
data = {"_data": self._data}
data["corpus_name"] = self.corpus_name
data["acoustic_properties"] = {
k: sorted((name, t()) for name, t in v) for k, v in self.acoustic_properties.items()
}
data["subannotations"] = {k: sorted(v) for k, v in self.subannotations.items()}
data["subannotation_properties"] = {
k: sorted((name, t()) for name, t in v)
for k, v in self.subannotation_properties.items()
}
data["subset_types"] = {k: sorted(v) for k, v in self.subset_types.items()}
data["subset_tokens"] = {k: sorted(v) for k, v in self.subset_tokens.items()}
data["token_properties"] = {
k: sorted((name, t()) for name, t in v) for k, v in self.token_properties.items()
}
data["type_properties"] = {
k: sorted((name, t()) for name, t in v) for k, v in self.type_properties.items()
}
data["speaker_properties"] = sorted((name, t()) for name, t in self.speaker_properties)
data["discourse_properties"] = sorted((name, t()) for name, t in self.discourse_properties)
return data
def from_json(self, json):
"""
Set all properties from a dictionary deserialized from JSON
Parameters
----------
json : dict
Object information
"""
self._data = json["_data"]
self.corpus_name = json["corpus_name"]
self.acoustic_properties = {
k: set((name, type(t)) for name, t in v)
for k, v in json.get("acoustic_properties", {}).items()
}
self.subannotations = {k: set(v) for k, v in json["subannotations"].items()}
self.subannotation_properties = {
k: set((name, type(t)) for name, t in v)
for k, v in json["subannotation_properties"].items()
}
self.subset_types = {k: set(v) for k, v in json["subset_types"].items()}
self.subset_tokens = {k: set(v) for k, v in json["subset_tokens"].items()}
self.token_properties = {
k: set((name, type(t)) for name, t in v) for k, v in json["token_properties"].items()
}
self.type_properties = {
k: set((name, type(t)) for name, t in v) for k, v in json["type_properties"].items()
}
self.speaker_properties = set((name, type(t)) for name, t in json["speaker_properties"])
self.discourse_properties = set(
(name, type(t)) for name, t in json["discourse_properties"]
)
def add_type_subsets(self, corpus_context, annotation_type, subsets):
"""
Adds type subsets to the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type: str
Annotation type to add subsets for
subsets : iterable
List of subsets to add for the annotation type
"""
statement = self.get_type_subset_template.format(type=annotation_type)
res = list(
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
)
try:
cur_subsets = res[0]["subsets"]
except (IndexError, AttributeError):
cur_subsets = []
updated = set(cur_subsets + subsets)
statement = self.set_type_subset_template.format(type=annotation_type)
corpus_context.execute_cypher(
statement, subsets=sorted(updated), corpus_name=corpus_context.corpus_name
)
self.subset_types[annotation_type] = updated
corpus_context.cache_hierarchy()
def remove_type_subsets(self, corpus_context, annotation_type, subsets):
"""
Removes type subsets to the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type: str
Annotation type to remove subsets for
subsets : iterable
List of subsets to remove for the annotation type
"""
statement = self.get_type_subset_template.format(type=annotation_type)
res = list(
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
)
try:
cur_subsets = res[0]["subsets"]
except (IndexError, AttributeError):
cur_subsets = []
updated = set(cur_subsets) - set(subsets)
statement = self.set_type_subset_template.format(type=annotation_type)
corpus_context.execute_cypher(
statement, subsets=sorted(updated), corpus_name=corpus_context.corpus_name
)
self.subset_types[annotation_type] = updated
corpus_context.cache_hierarchy()
def add_token_subsets(self, corpus_context, annotation_type, subsets):
"""
Adds token subsets to the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type: str
Annotation type to add subsets for
subsets : iterable
List of subsets to add for the annotation tokens
"""
statement = self.get_token_subset_template.format(type=annotation_type)
res = list(
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
)
try:
cur_subsets = res[0]["subsets"]
except (IndexError, AttributeError):
cur_subsets = []
updated = set(cur_subsets + subsets)
statement = self.set_token_subset_template.format(type=annotation_type)
corpus_context.execute_cypher(
statement, subsets=sorted(updated), corpus_name=corpus_context.corpus_name
)
self.subset_tokens[annotation_type] = updated
corpus_context.cache_hierarchy()
def remove_token_subsets(self, corpus_context, annotation_type, subsets):
"""
Removes token subsets to the Hierarchy object for a corpus, and syncs it to the hierarchy schema in a Neo4j database
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type: str
Annotation type to remove subsets for
subsets : iterable
List of subsets to remove for the annotation tokens
"""
statement = self.get_token_subset_template.format(type=annotation_type)
res = list(
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
)
try:
cur_subsets = res[0]["subsets"]
except (IndexError, AttributeError):
cur_subsets = []
updated = set(cur_subsets) - set(subsets)
statement = self.set_token_subset_template.format(type=annotation_type)
corpus_context.execute_cypher(
statement, subsets=sorted(updated), corpus_name=corpus_context.corpus_name
)
self.subset_tokens[annotation_type] = updated
corpus_context.cache_hierarchy()
def add_annotation_type(self, annotation_type, above=None, below=None):
"""
Adds an annotation type to the Hierarchy object along with default type and token properties for the new
annotation type
Parameters
----------
annotation_type : str
Annotation type to add
above : str
Annotation type that is contained by the new annotation type, leave out if new annotation type is at the bottom
of the hierarchy
below : str
Annotation type that contains the new annotation type, leave out if new annotation type is at the top
of the hierarchy
"""
self._data[above] = annotation_type
self._data[annotation_type] = below
self.token_properties[annotation_type] = {
("id", str),
("label", str),
("begin", float),
("end", float),
("duration", float),
}
self.type_properties[annotation_type] = {("label", str)}
def remove_annotation_type(self, annotation_type):
"""
Removes an annotation type from the hierarchy
Parameters
----------
annotation_type : str
Annotation type to remove
"""
cur_above = self._data[annotation_type]
cur_below = [k for k, v in self._data.items() if v == annotation_type][0]
del self._data[annotation_type]
self._data[cur_below] = cur_above
try:
del self.token_properties[annotation_type]
except KeyError:
pass
try:
del self.type_properties[annotation_type]
except KeyError:
pass
try:
del self.subset_types[annotation_type]
except KeyError:
pass
try:
del self.subset_tokens[annotation_type]
except KeyError:
pass
if annotation_type in self.subannotations:
for s in self.subannotations[annotation_type]:
del self.subannotation_properties[s]
del self.subannotations[annotation_type]
def add_type_properties(self, corpus_context, annotation_type, properties):
"""
Adds type properties for an annotation type and syncs it to a Neo4j database. The list of properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type : str
Annotation type to add type properties for
properties : iterable
Iterable of tuples of the form (property_name, Type)
"""
set_template = "n.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
SET {sets}""".format(
type=annotation_type, sets=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
if annotation_type not in self.type_properties:
self.type_properties[annotation_type] = {("id", str)}
self.type_properties[annotation_type].update(k for k in properties)
corpus_context.cache_hierarchy()
def remove_type_properties(self, corpus_context, annotation_type, properties):
"""
Removes type properties for an annotation type and syncs it to a Neo4j database.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type : str
Annotation type to remove type properties for
properties : iterable
List of property names to remove
"""
remove_template = "n.{0}"
ps = []
for k in properties:
ps.append(remove_template.format(k))
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a:{type})-[:is_a]->(n:{type}_type)
REMOVE {removes}""".format(
type=annotation_type, removes=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
if annotation_type not in self.type_properties:
self.type_properties[annotation_type] = {("id", str)}
to_remove = set(x for x in self.type_properties[annotation_type] if x[0] in properties)
self.type_properties[annotation_type].difference_update(to_remove)
corpus_context.cache_hierarchy()
def add_acoustic_properties(self, corpus_context, acoustic_type, properties):
"""
Add acoustic properties to an encoded acoustic measure. The list of properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
acoustic_type : str
Acoustic measure to add properties for
properties : iterable
Iterable of tuples of the form (property_name, Type)
"""
set_template = "n.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)-[:has_acoustics]->(n:{type})
SET {sets}""".format(
type=acoustic_type, sets=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
if acoustic_type not in self.acoustic_properties:
self.acoustic_properties[acoustic_type] = set()
self.acoustic_properties[acoustic_type].update(k for k in properties)
corpus_context.cache_hierarchy()
def remove_acoustic_properties(self, corpus_context, acoustic_type, properties):
"""
Remove acoustic properties to an encoded acoustic measure.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
acoustic_type : str
Acoustic measure to remove properties for
properties : iterable
List of property names
"""
remove_template = "n.{0}"
ps = []
for k in properties:
ps.append(remove_template.format(k))
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)-[:has_acoustics]->(n:{type})
REMOVE {removes}""".format(
type=acoustic_type, removes=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
if acoustic_type not in self.acoustic_properties:
self.acoustic_properties[acoustic_type] = {}
to_remove = set(x for x in self.acoustic_properties[acoustic_type] if x[0] in properties)
self.acoustic_properties[acoustic_type].difference_update(to_remove)
corpus_context.cache_hierarchy()
def add_token_properties(self, corpus_context, annotation_type, properties):
"""
Adds token properties for an annotation type and syncs it to a Neo4j database. The list of properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type : str
Annotation type to add token properties for
properties : iterable
Iterable of tuples of the form (property_name, Type)
"""
set_template = "n.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(n:{type})
SET {sets}""".format(
type=annotation_type, sets=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
if annotation_type not in self.token_properties:
self.token_properties[annotation_type] = {("id", str)}
self.token_properties[annotation_type].update(k for k in properties)
corpus_context.cache_hierarchy()
def remove_token_properties(self, corpus_context, annotation_type, properties):
"""
Removes token properties for an annotation type and syncs it to a Neo4j database.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type : str
Annotation type to remove token properties for
properties : iterable
List of property names to remove
"""
remove_template = "n.{0}"
ps = []
for k in properties:
ps.append(remove_template.format(k))
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(n:{type})
REMOVE {removes}""".format(
type=annotation_type, removes=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
if annotation_type not in self.token_properties:
self.token_properties[annotation_type] = {("id", str)}
to_remove = set(x for x in self.token_properties[annotation_type] if x[0] in properties)
self.token_properties[annotation_type].difference_update(to_remove)
corpus_context.cache_hierarchy()
def add_speaker_properties(self, corpus_context, properties):
"""
Adds speaker properties to the Hierarchy object and syncs it to a Neo4j database. The list of properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
properties : iterable
Iterable of tuples of the form (property_name, Type)
"""
set_template = "s.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)-[:spoken_by]->(s:Speaker)
SET {sets}""".format(
sets=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
to_add_names = [x[0] for x in properties]
self.speaker_properties = {x for x in self.speaker_properties if x[0] not in to_add_names}
self.speaker_properties.update(k for k in properties)
corpus_context.cache_hierarchy()
def remove_speaker_properties(self, corpus_context, properties):
"""
Removes speaker properties and syncs it to a Neo4j database.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
properties : iterable
List of property names to remove
"""
remove_template = "s.{0}"
ps = []
for k in properties:
ps.append(remove_template.format(k))
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)-[:spoken_by]->(s:Speaker)
REMOVE {removes}""".format(
removes=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
to_remove = set(x for x in self.speaker_properties if x[0] in properties)
self.speaker_properties.difference_update(to_remove)
corpus_context.cache_hierarchy()
def add_discourse_properties(self, corpus_context, properties):
"""
Adds discourse properties to the Hierarchy object and syncs it to a Neo4j database. The list of properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
properties : iterable
Iterable of tuples of the form (property_name, Type)
"""
set_template = "d.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)-[:spoken_in]->(d:Discourse)
SET {sets}""".format(
sets=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
to_add_names = [x[0] for x in properties]
self.discourse_properties = {
x for x in self.discourse_properties if x[0] not in to_add_names
}
self.discourse_properties.update(k for k in properties)
corpus_context.cache_hierarchy()
def remove_discourse_properties(self, corpus_context, properties):
"""
Removes discourse properties and syncs it to a Neo4j database.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
properties : iterable
List of property names to remove
"""
remove_template = "d.{0}"
ps = []
for k in properties:
ps.append(remove_template.format(k))
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)-[:spoken_in]->(d:Discourse)
REMOVE {removes}""".format(
removes=", ".join(ps)
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
to_remove = set(x for x in self.discourse_properties if x[0] in properties)
self.discourse_properties.difference_update(to_remove)
corpus_context.cache_hierarchy()
def keys(self):
"""
Keys (linguistic types) of the hierarchy.
Returns
-------
generator
Keys of the hierarchy
"""
return self._data.keys()
def values(self):
"""
Values (containing types) of the hierarchy.
Returns
-------
generator
Values of the hierarchy
"""
return self._data.values()
def items(self):
"""
Key/value pairs for the hierarchy.
Returns
-------
generator
Items of the hierarchy
"""
return self._data.items()
def __getitem__(self, key):
return self._data[key]
def __setitem__(self, key, value):
self._data[key] = value
def __delitem__(self, key):
del self._data[key]
for k, v in self._data.items():
if v == key:
self._data[k] = None
def __contains__(self, item):
return item in self._data
def update(self, other):
"""
Merge Hierarchies together. If other is a dictionary, then only
the hierarchical data is updated.
Parameters
----------
other : Hierarchy or dict
Data to be merged in
"""
if isinstance(other, dict):
self._data.update(other)
else:
self._data.update(other._data)
self.subannotations.update(other.subannotations)
self.subannotation_properties.update(other.subannotation_properties)
for k, v in other.subannotation_properties.items():
if k not in self.subannotation_properties:
self.subannotation_properties[k] = v
else:
self.subannotation_properties[k] = self.subannotation_properties[k] & v
for k, v in other.type_properties.items():
if k not in self.type_properties.items():
self.type_properties[k] = v
else:
self.type_properties[k] = self.type_properties[k] & v
for k, v in other.token_properties.items():
if k not in self.token_properties.items():
self.token_properties[k] = other.token_properties[k]
else:
self.token_properties[k] = self.token_properties[k] & other.token_properties[k]
self.speaker_properties.update(other.speaker_properties)
self.discourse_properties.update(other.discourse_properties)
@property
def lowest(self):
"""
Get the lowest annotation type of the Hierarchy
Returns
-------
str
Lowest annotation type
"""
for k in self.keys():
if k not in self.values():
return k
@property
def highest(self):
"""
Get the highest annotation type of the Hierarchy
Returns
-------
str
Highest annotation type
"""
for k, v in self.items():
if v is None:
return k
@property
def highest_to_lowest(self):
"""
Get a list of annotation types sorted from highest to lowest
Returns
-------
list
Annotation types from highest to lowest
"""
ats = [self.highest]
while len(ats) < len(self.keys()):
for k, v in self.items():
if v == ats[-1]:
ats.append(k)
break
return ats
@property
def lowest_to_highest(self):
"""
Get a list of annotation types sorted from lowest to highest
Returns
-------
list
Annotation types from lowest to highest
"""
ats = [self.lowest]
while len(ats) < len(self.keys()):
ats.append(self[ats[-1]])
return ats
def get_lower_types(self, annotation_type):
"""
Get all annotation types that are lower than the specified annotation type
Parameters
----------
annotation_type : str
Annotation type from which to get lower annotation types
Returns
-------
list
List of all annotation types that are lower the specified annotation type
"""
lower = []
found = False
for t in self.highest_to_lowest:
if t == annotation_type:
found = True
continue
if found:
lower.append(t)
return lower
def get_higher_types(self, annotation_type):
"""
Get all annotation types that are higher than the specified annotation type
Parameters
----------
annotation_type : str
Annotation type from which to get higher annotation types
Returns
-------
list
List of all annotation types that are higher the specified annotation type
"""
higher = []
found = False
for t in self.lowest_to_highest:
if t == annotation_type:
found = True
continue
if found:
higher.append(t)
return higher
def has_subannotation_type(self, subannotation_type):
"""
Check whether the Hierarchy has a subannotation type
Parameters
----------
subannotation_type : str
Name of subannotation to check for
Returns
-------
bool
True if subannotation type is present
"""
return subannotation_type in self.subannotation_properties
def has_subannotation_property(self, subannotation_type, property_name):
"""
Check whether the Hierarchy has a property associated with a subannotation type
Parameters
----------
subannotation_type : str
Name of subannotation to check
property_name : str
Name of the property to check for
Returns
-------
bool
True if subannotation type has the given property name
"""
if not self.has_subannotation_type(subannotation_type):
return False
return property_name in [x[0] for x in self.subannotation_properties[subannotation_type]]
def add_subannotation_type(
self, corpus_context, annotation_type, subannotation_type, properties=None
):
"""
Adds subannotation type for a given annotation type to the Hierarchy object and syncs it to a Neo4j database.
The list of optional properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
annotation_type : str
Annotation type to add a subannotation to
subannotation_type : str
Name of the subannotation type
properties : iterable
Optional iterable of tuples of the form (property_name, Type)
"""
if properties is None:
properties = []
if subannotation_type in self.subannotation_properties:
raise (
HierarchyError(
"The subannotation_type {} is already specified for another linguistic type."
" Please use a different name.".format(subannotation_type)
)
)
if annotation_type not in self.subannotations:
self.subannotations[annotation_type] = set()
self.subannotations[annotation_type].add(subannotation_type)
self.subannotation_properties[subannotation_type] = set(k for k in properties)
if properties:
set_template = "s.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus), (c)<-[:contained_by*]-(a:{a_type}) WHERE c.name = $corpus_name
WITH a
CREATE (a)<-[:annotates]-(s:{s_type})
WITH s
SET {sets}""".format(
sets=", ".join(ps), a_type=annotation_type, s_type=subannotation_type
)
corpus_context.execute_cypher(
statement, corpus_name=corpus_context.corpus_name, **kwargs
)
else:
statement = """MATCH (c:Corpus), (c)<-[:contained_by*]-(a:{a_type}) WHERE c.name = $corpus_name
WITH a
MERGE (a)<-[:annotates]-(s:{s_type})""".format(
a_type=annotation_type, s_type=subannotation_type
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
corpus_context.cache_hierarchy()
def remove_subannotation_type(self, corpus_context, subannotation_type):
"""
Remove a subannotation type from the Hierarchy object and sync it to a Neo4j database.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
subannotation_type : str
Subannotation type to remove
"""
try:
del self.subannotation_properties[subannotation_type]
except KeyError:
pass
for k, v in self.subannotations.items():
if subannotation_type in v:
self.subannotations[k] = v - {subannotation_type}
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})
DETACH DELETE s""".format(
s_type=subannotation_type
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
corpus_context.cache_hierarchy()
def add_subannotation_properties(self, corpus_context, subannotation_type, properties):
"""
Adds properties for a subannotation type to the Hierarchy object and syncs it to a Neo4j database.
The list of properties are tuples
of the form (property_name, Type), where ``property_name`` is a string and ``Type`` is a Python type class, like
``bool``, ``str``, ``list``, or ``float``.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
subannotation_type : str
Name of the subannotation type
properties : iterable
Iterable of tuples of the form (property_name, Type)
"""
set_template = "s.{0} = ${0}"
ps = []
kwargs = {}
for k, v in properties:
if v == int:
v = 0
elif v == list:
v = []
elif v == float:
v = 0.0
elif v == str:
v = ""
elif v == bool:
v = False
elif v is type(None):
v = None
ps.append(set_template.format(k))
kwargs[k] = v
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})
SET {sets}""".format(
sets=", ".join(ps), s_type=subannotation_type
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name, **kwargs)
self.subannotation_properties[subannotation_type].update(k for k in properties)
corpus_context.cache_hierarchy()
def remove_subannotation_properties(self, corpus_context, subannotation_type, properties):
"""
Removes properties for a subannotation type to the Hierarchy object and syncs it to a Neo4j database.
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.CorpusContext`
CorpusContext to use for updating Neo4j database
subannotation_type : str
Name of the subannotation type
properties : iterable
List of property names to remove
"""
remove_template = "s.{0}"
ps = []
for k in properties:
ps.append(remove_template.format(k))
statement = """MATCH (c:Corpus) WHERE c.name = $corpus_name
MATCH (c)<-[:contained_by*]-(a)<-[:annotates]-(s:{s_type})
REMOVE {removes}""".format(
removes=", ".join(ps), s_type=subannotation_type
)
corpus_context.execute_cypher(statement, corpus_name=corpus_context.corpus_name)
to_remove = set(
x for x in self.subannotation_properties[subannotation_type] if x[0] in properties
)
self.subannotation_properties[subannotation_type].difference_update(to_remove)
corpus_context.cache_hierarchy()
def has_speaker_property(self, key):
"""
Check for whether speakers have a given property
Parameters
----------
key : str
Property to check for
Returns
-------
bool
True if speakers have the given property
"""
for name, t in self.speaker_properties:
if name == key:
return True
return False
def has_discourse_property(self, key):
"""
Check for whether discourses have a given property
Parameters
----------
key : str
Property to check for
Returns
-------
bool
True if discourses have the given property
"""
for name, t in self.discourse_properties:
if name == key:
return True
return False
def has_token_property(self, annotation_type, key):
"""
Check whether a given annotation type has a given token property.
Parameters
----------
annotation_type : str
Annotation type to check for the given token property
key : str
Property to check for
Returns
-------
bool
True if the annotation type has the given token property
"""
if annotation_type not in self.token_properties:
return False
for name, t in self.token_properties[annotation_type]:
if name == key:
return True
return False
def has_type_property(self, annotation_type, key):
"""
Check whether a given annotation type has a given type property.
Parameters
----------
annotation_type : str
Annotation type to check for the given type property
key : str
Property to check for
Returns
-------
bool
True if the annotation type has the given type property
"""
if annotation_type not in self.type_properties:
return False
for name, t in self.type_properties[annotation_type]:
if name == key:
return True
return False
def has_type_subset(self, annotation_type, key):
"""
Check whether a given annotation type has a given type subset.
Parameters
----------
annotation_type : str
Annotation type to check for the given type subset
key : str
Subset to check for
Returns
-------
bool
True if the annotation type has the given type subset
"""
if annotation_type not in self.subset_types:
return False
return key in self.subset_types[annotation_type]
def has_token_subset(self, annotation_type, key):
"""
Check whether a given annotation type has a given token subset.
Parameters
----------
annotation_type : str
Annotation type to check for the given token subset
key : str
Subset to check for
Returns
-------
bool
True if the annotation type has the given token subset
"""
if annotation_type not in self.subset_tokens:
return False
return key in self.subset_tokens[annotation_type]
@property
def word_name(self):
"""
Shortcut for returning the annotation type matching "word"
Returns
-------
str or None
Annotation type that begins with "word"
"""
for at in self.annotation_types:
if at.startswith("word"):
return at
return None
@property
def phone_name(self):
"""
Alias function for getting the lowest annotation type
Returns
-------
str
Name of the lowest annotation type
"""
return self.lowest