import os
import shutil
import sys
from decimal import Decimal
from neo4j import GraphDatabase
from ..query.annotations.attributes import AnnotationNode, PauseAnnotation
from ..query.annotations import SplitQuery
from ..query.lexicon import LexiconQuery, LexiconNode
from ..query.speaker import SpeakerQuery, SpeakerNode
from ..query.discourse import DiscourseQuery, DiscourseNode
from ..config import CorpusConfig
from ..exceptions import (CorpusConfigError, GraphQueryError)
from ..structure import Hierarchy
class BaseContext(object):
    """
    Base CorpusContext class.  Inherit from this and extend to create
    more functionality.

    Parameters
    ----------
    *args
        If the first argument is not a :class:`~polyglotdb.config.CorpusConfig` object, it is
        the name of the corpus
    **kwargs
        If a :class:`~polyglotdb.config.CorpusConfig` object is not specified, all arguments and
        keyword arguments are passed to a CorpusConfig object

    Raises
    ------
    CorpusConfigError
        If no positional argument is supplied
    """

    def __init__(self, *args, **kwargs):
        if not args:
            raise CorpusConfigError('Need to specify a corpus name or CorpusConfig.')
        if isinstance(args[0], CorpusConfig):
            self.config = args[0]
        else:
            self.config = CorpusConfig(*args, **kwargs)
        self.graph_driver = GraphDatabase.driver(self.config.graph_connection_string)
        self.corpus_name = self.config.corpus_name
        self.hierarchy = Hierarchy({}, corpus_name=self.corpus_name)

        self._has_sound_files = None
        self._has_all_sound_files = None

        # Locate the external executables.  When ``sys.frozen`` is set the
        # executables are looked up in the last ``sys.path`` entry, otherwise
        # they are searched for on PATH (``shutil.which`` returns None when
        # the executable cannot be found).
        praat_exe = 'praatcon.exe' if sys.platform == 'win32' else 'praat'
        if getattr(sys, 'frozen', False):
            bundle_dir = sys.path[-1]
            self.config.reaper_path = os.path.join(bundle_dir, 'reaper')
            self.config.praat_path = os.path.join(bundle_dir, praat_exe)
        else:
            self.config.reaper_path = shutil.which('reaper')
            self.config.praat_path = shutil.which(praat_exe)

    def exists(self):
        """
        Check whether a ``Corpus`` node with this corpus's name is present in
        the Neo4j database

        Returns
        -------
        bool
            True if a matching ``Corpus`` node was found
        """
        # The name is passed as a query parameter rather than formatted into
        # the statement so that names containing quotes cannot break the query.
        statement = '''MATCH (c:Corpus) where c.name = $corpus_name return c '''
        res = list(self.execute_cypher(statement, corpus_name=self.corpus_name))
        return len(res) > 0

    def execute_cypher(self, statement, **parameters):
        """
        Executes a cypher query

        Parameters
        ----------
        statement : str
            the cypher statement
        parameters : kwargs
            keyword arguments to execute a cypher statement; the special
            keyword ``return_graph`` is consumed here and not sent to the
            database

        Returns
        -------
        list or graph
            ``results.data()`` of the executed statement, or
            ``results.graph()`` if ``return_graph`` is true
        """
        return_graph = parameters.pop('return_graph', False)
        # Decimal values are converted to float before being handed to the driver
        parameters = {k: float(v) if isinstance(v, Decimal) else v
                      for k, v in parameters.items()}
        with self.graph_driver.session() as session:
            if self.config.debug:
                print('Statement:', statement)
                print('Parameters:', parameters)
            results = session.run(statement, **parameters)
            # Consume the result while the session is still open
            if return_graph:
                return results.graph()
            return results.data()

    @property
    def cypher_safe_name(self):
        """
        Escape the corpus name for use in Cypher queries

        Returns
        -------
        str
            Corpus name wrapped in backticks, with any backticks inside the
            name doubled
        """
        return '`{}`'.format(str(self.corpus_name).replace('`', '``'))

    @property
    def discourses(self):
        """
        Gets a list of discourses in the corpus

        Returns
        -------
        list
            Discourse names in the corpus
        """
        statement = '''MATCH (d:Discourse:{corpus_name}) RETURN d.name as discourse'''.format(
            corpus_name=self.cypher_safe_name)
        return [row['discourse'] for row in self.execute_cypher(statement)]

    @property
    def speakers(self):
        """
        Gets a list of speakers in the corpus

        Returns
        -------
        list
            Speaker names in the corpus
        """
        statement = '''MATCH (s:Speaker:{corpus_name}) RETURN s.name as speaker'''.format(
            corpus_name=self.cypher_safe_name)
        return [row['speaker'] for row in self.execute_cypher(statement)]

    def __enter__(self):
        """
        Enter the context: if a corpus name is set, load the cached hierarchy
        from disk, or generate and cache it when no cache file exists.
        """
        if self.corpus_name:
            if os.path.exists(self.hierarchy_path):
                self.load_hierarchy()
            else:
                self.hierarchy = self.generate_hierarchy()
                self.cache_hierarchy()
        return self

    @property
    def hierarchy_path(self):
        """
        Get the path to cached hierarchy information

        Returns
        -------
        str
            Path to the cached hierarchy data on disk
        """
        return os.path.join(self.config.base_dir, 'hierarchy')

    def cache_hierarchy(self):
        """
        Save corpus Hierarchy to the disk
        """
        import json
        path = self.hierarchy_path
        directory = os.path.dirname(path)
        # The containing directory may be missing (e.g. ``reset`` removes
        # ``config.base_dir``), so create it before writing.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w', encoding='utf8') as f:
            json.dump(self.hierarchy.to_json(), f)

    def load_hierarchy(self):
        """
        Load Hierarchy object from the cached version
        """
        import json
        with open(self.hierarchy_path, 'r', encoding='utf8') as f:
            self.hierarchy = Hierarchy(corpus_name=self.corpus_name)
            self.hierarchy.from_json(json.load(f))

    def __exit__(self, exc_type, exc, exc_tb):
        """
        Leave the context: close the graph driver.

        Returns
        -------
        bool
            True when no exception occurred, False otherwise (so exceptions
            raised inside the ``with`` block propagate)
        """
        self.graph_driver.close()
        return exc_type is None

    def __getattr__(self, key):
        """
        Resolve otherwise-missing attributes to query nodes.

        ``speaker``, ``discourse`` and ``pause`` map to their dedicated node
        classes; an annotation type name (optionally without a trailing
        ``s``) maps to an :class:`AnnotationNode`; ``lexicon_<type>`` maps to
        a :class:`LexiconNode`.

        Raises
        ------
        AttributeError
            For dunder names and for the attributes this method itself reads
            (``hierarchy``, ``corpus_name``) when they are not set
        GraphQueryError
            If the name does not correspond to any annotation type
        """
        # __getattr__ is only called after normal lookup failed.  If the
        # attributes used below are themselves missing, looking them up would
        # re-enter this method forever; dunder probes (copy, hasattr, ...)
        # expect a plain AttributeError.
        if key in ('hierarchy', 'corpus_name') or (key.startswith('__') and key.endswith('__')):
            raise AttributeError(key)
        if key == 'speaker':
            return SpeakerNode(corpus=self.corpus_name, hierarchy=self.hierarchy)
        if key == 'discourse':
            return DiscourseNode(corpus=self.corpus_name, hierarchy=self.hierarchy)
        if key == 'pause':
            return PauseAnnotation(corpus=self.corpus_name, hierarchy=self.hierarchy)
        annotation_types = self.hierarchy.annotation_types
        if key + 's' in annotation_types:
            key += 's'  # FIXME
        if key in annotation_types:
            return AnnotationNode(key, corpus=self.corpus_name, hierarchy=self.hierarchy)
        if key.startswith('lexicon_'):
            # Strip only the prefix so annotation types that themselves
            # contain underscores are kept intact
            key = key[len('lexicon_'):]
            if key in annotation_types:
                return LexiconNode(key, corpus=self.corpus_name, hierarchy=self.hierarchy)
        raise GraphQueryError(
            'The graph does not have any annotations of type \'{}\'. Possible types are: {}'.format(
                key, ', '.join(sorted(annotation_types))))

    def encode_type_subset(self, annotation_type, annotation_labels, subset_label):
        """
        Encode a type subset from labels of annotations

        Parameters
        ----------
        annotation_type : str
            Annotation type of labels
        annotation_labels : list
            a list of labels of annotations to subset together
        subset_label : str
            the label for the subset
        """
        ann = getattr(self, 'lexicon_' + annotation_type)
        q = self.query_lexicon(ann).filter(ann.label.in_(annotation_labels))
        q.create_subset(subset_label)
        self.encode_hierarchy()

    def reset_type_subset(self, annotation_type, subset_label):
        """
        Reset and remove a type subset

        Parameters
        ----------
        annotation_type : str
            Annotation type of the subset
        subset_label : str
            the label for the subset
        """
        from ..exceptions import SubsetError
        ann = getattr(self, 'lexicon_' + annotation_type)
        try:
            q = self.query_lexicon(ann.filter_by_subset(subset_label))
            q.remove_subset(subset_label)
            self.encode_hierarchy()
        except SubsetError:
            # A SubsetError is deliberately ignored: resetting is best-effort
            pass

    @property
    def word_name(self):
        """
        Gets the word label

        Returns
        -------
        str
            The first annotation type starting with ``word``, or ``'word'``
            if there is none
        """
        for at in self.hierarchy.annotation_types:
            if at.startswith('word'):  # FIXME need a better way for storing word name
                return at
        return 'word'

    @property
    def phone_name(self):
        """
        Gets the phone label

        Returns
        -------
        str
            The lowest annotation type of the hierarchy, or ``'phone'`` if
            the hierarchy has no lowest type
        """
        name = self.hierarchy.lowest
        if name is None:
            name = 'phone'
        return name

    def _delete_in_batches(self, statement, num_deleted, call_back=None, stop_check=None, **parameters):
        """
        Repeatedly run a limited delete statement until it deletes nothing or
        ``stop_check`` returns true; returns the updated running total.
        """
        deleted = 1
        while deleted > 0:
            if stop_check is not None and stop_check():
                break
            deleted = self.execute_cypher(statement, **parameters)[0]['deleted_count']
            num_deleted += deleted
            if call_back is not None:
                call_back(num_deleted)
        return num_deleted

    def reset_graph(self, call_back=None, stop_check=None):
        """
        Remove all nodes and relationships in the corpus.

        Parameters
        ----------
        call_back : callable, optional
            Function to monitor progress; called with a message, then with
            ``(0, total)`` and afterwards with the running number of deleted nodes
        stop_check : callable, optional
            Function to check whether the batched deletion should terminate early
        """
        delete_statement = '''MATCH (n:{corpus}:{anno})-[:spoken_by]->(s:{corpus}:Speaker)
        where s.name = $speaker
        with n LIMIT 1000 DETACH DELETE n return count(n) as deleted_count'''

        delete_type_statement = '''MATCH (n:{corpus}:{anno}_type)
        with n LIMIT 1000 DETACH DELETE n return count(n) as deleted_count'''

        corpus = self.cypher_safe_name
        if call_back is not None:
            call_back('Resetting database...')
            # execute_cypher returns a list of records; the count is in the first one
            number = self.execute_cypher(
                '''MATCH (n:{}) return count(*) as number '''.format(corpus))[0]['number']
            call_back(0, number)
        num_deleted = 0
        # Speaker nodes are only removed after the loop, so query them once
        speakers = self.speakers
        for a in self.hierarchy.annotation_types:
            if stop_check is not None and stop_check():
                break
            # Token nodes, speaker by speaker
            for s in speakers:
                if stop_check is not None and stop_check():
                    break
                num_deleted = self._delete_in_batches(
                    delete_statement.format(corpus=corpus, anno=a), num_deleted,
                    call_back, stop_check, speaker=s)
            # Type nodes of this annotation type
            num_deleted = self._delete_in_batches(
                delete_type_statement.format(corpus=corpus, anno=a), num_deleted,
                call_back, stop_check)
        # NOTE(review): a stop request only interrupts the batched deletes
        # above; the cleanup below still runs -- confirm this is intended.
        self.execute_cypher('''MATCH (n:{}:Speaker) DETACH DELETE n '''.format(corpus))
        self.execute_cypher('''MATCH (n:{}:Discourse) DETACH DELETE n '''.format(corpus))
        self.reset_hierarchy()
        self.execute_cypher('''MATCH (n:Corpus) where n.name = $corpus_name DELETE n ''',
                            corpus_name=self.corpus_name)
        self.hierarchy = Hierarchy(corpus_name=self.corpus_name)
        self.cache_hierarchy()

    def reset(self, call_back=None, stop_check=None):
        """
        Reset the Neo4j and InfluxDB databases for a corpus

        Parameters
        ----------
        call_back : callable
            Function to monitor progress
        stop_check : callable
            Function the check whether the process should terminate early
        """
        self.reset_acoustics()
        self.reset_graph(call_back, stop_check)
        shutil.rmtree(self.config.base_dir, ignore_errors=True)

    def _validate_annotation_type(self, annotation_node):
        """
        Raise GraphQueryError unless the node's type is a known annotation type or ``pause``.
        """
        node_type = annotation_node.node_type
        if node_type not in self.hierarchy.annotation_types \
                and node_type != 'pause':  # FIXME make more general
            raise GraphQueryError(
                'The graph does not have any annotations of type \'{}\'. Possible types are: {}'.format(
                    node_type, ', '.join(sorted(self.hierarchy.annotation_types))))

    def query_graph(self, annotation_node):
        """
        Start a query over the tokens of a specified annotation type (i.e. ``corpus.word``)

        Parameters
        ----------
        annotation_node : :class:`polyglotdb.query.attributes.AnnotationNode`
            The type of annotation to look for in the corpus

        Returns
        -------
        :class:`~polyglotdb.query.annotations.query.SplitQuery`
            SplitQuery object

        Raises
        ------
        GraphQueryError
            If the node's type is neither a known annotation type nor ``pause``
        """
        self._validate_annotation_type(annotation_node)
        return SplitQuery(self, annotation_node)

    def query_lexicon(self, annotation_node):
        """
        Start a query over types of a specified annotation type (i.e. ``corpus.lexicon_word``)

        Parameters
        ----------
        annotation_node : :class:`polyglotdb.query.attributes.AnnotationNode`
            The type of annotation to look for in the corpus's lexicon

        Returns
        -------
        :class:`~polyglotdb.query.lexicon.query.LexiconQuery`
            LexiconQuery object

        Raises
        ------
        GraphQueryError
            If the node's type is neither a known annotation type nor ``pause``
        """
        self._validate_annotation_type(annotation_node)
        return LexiconQuery(self, annotation_node)

    def query_discourses(self):
        """
        Start a query over discourses in the corpus

        Returns
        -------
        :class:`~polyglotdb.query.discourse.query.DiscourseQuery`
            DiscourseQuery object
        """
        return DiscourseQuery(self)

    def query_speakers(self):
        """
        Start a query over speakers in the corpus

        Returns
        -------
        :class:`~polyglotdb.query.speaker.query.SpeakerQuery`
            SpeakerQuery object
        """
        return SpeakerQuery(self)

    @property
    def annotation_types(self):
        """
        Get a list of all the annotation types in the corpus's Hierarchy

        Returns
        -------
        list
            Annotation types
        """
        return self.hierarchy.annotation_types

    @property
    def lowest_annotation(self):
        """
        Returns the annotation type that is the lowest in the Hierarchy.

        Returns
        -------
        str
            Lowest annotation type in the Hierarchy
        """
        return self.hierarchy.lowest

    def _execute_with_debug(self, statement, **parameters):
        """
        Execute a statement, printing it and its result rows when ``config.debug`` is set.
        """
        if self.config.debug:
            print(statement)
        result = self.execute_cypher(statement, **parameters)
        if self.config.debug:
            for r in result:
                print('RESULT', r)
        return result

    def remove_discourse(self, name):
        """
        Remove the nodes and relationships associated with a single
        discourse in the corpus.

        Parameters
        ----------
        name : str
            Name of the discourse to remove

        Raises
        ------
        GraphQueryError
            If ``name`` is not a discourse in this corpus
        """
        if name not in self.discourses:
            raise GraphQueryError('{} is not a discourse in this corpus.'.format(name))
        corpus = self.cypher_safe_name

        # Remove the discourse's audio directory if its sound file is on disk
        d = self.discourse_sound_file(name)
        if d is not None and 'consonant_file_path' in d and d['consonant_file_path'] is not None \
                and os.path.exists(d['consonant_file_path']):
            directory = self.discourse_audio_directory(name)
            if self.config.debug:
                print('Removing', directory)
            shutil.rmtree(directory, ignore_errors=True)

        # Remove tokens in discourse
        for a in self.hierarchy.annotation_types:
            statement = '''MATCH (d:{corpus_name}:Discourse)<-[:spoken_in]-(n:{corpus_name}:{atype})
            WHERE d.name = $discourse
            DETACH DELETE n'''.format(corpus_name=corpus, atype=a)
            self._execute_with_debug(statement, discourse=name)

        # Remove discourse node
        statement = '''MATCH (d:{corpus_name}:Discourse)
        WHERE d.name = $discourse
        DETACH DELETE d'''.format(corpus_name=corpus)
        self._execute_with_debug(statement, discourse=name)

        # Remove orphaned type nodes
        for a in self.hierarchy.annotation_types:
            statement = '''MATCH (t:{type}_type:{corpus_name})
            WHERE NOT (t)<-[:is_a]-()
            DETACH DELETE t'''.format(type=a, corpus_name=corpus)
            self._execute_with_debug(statement)

        # Remove orphaned speaker nodes
        statement = '''MATCH (s:Speaker:{corpus_name})
        WHERE NOT (s)<-[:spoken_by]-()
        DETACH DELETE s'''.format(corpus_name=corpus)
        self._execute_with_debug(statement)

    @property
    def phones(self):
        """
        Get a list of all phone labels in the corpus.

        Returns
        -------
        list
            All phone labels in the corpus
        """
        statement = '''MATCH (p:{phone_name}_type:{corpus_name}) return p.label as label'''.format(
            phone_name=self.phone_name, corpus_name=self.cypher_safe_name)
        results = self.execute_cypher(statement)
        return [r['label'] for r in results]

    @property
    def words(self):
        """
        Get a list of all word labels in the corpus.

        Returns
        -------
        list
            All word labels in the corpus
        """
        statement = '''MATCH (p:{word_name}_type:{corpus_name}) return p.label as label'''.format(
            word_name=self.word_name, corpus_name=self.cypher_safe_name)
        results = self.execute_cypher(statement)
        return [r['label'] for r in results]