import math
from datetime import datetime
from conch import analyze_segments
from conch.analysis.segments import SegmentMapping
from .helper import generate_pitch_function
from ..segments import generate_utterance_segments
from ...exceptions import SpeakerAttributeError
from ..classes import Track, TimePoint
from ..utils import PADDING


def analyze_utterance_pitch(corpus_context, utterance, source='praat', min_pitch=50, max_pitch=500,
                            **kwargs):
if isinstance(utterance, str):
utterance_id = utterance
else:
utterance_id = utterance.id
padding = kwargs.pop('padding', None)
if padding is None:
padding = PADDING
utt_type = corpus_context.hierarchy.highest
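    # Find the utterance along with its discourse (sound file) and audio channel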
statement = '''MATCH (s:Speaker:{corpus_name})-[r:speaks_in]->(d:Discourse:{corpus_name}),
(u:{utt_type}:{corpus_name})-[:spoken_by]->(s),
(u)-[:spoken_in]->(d)
WHERE u.id = $utterance_id
RETURN u, d, r.channel as channel'''.format(corpus_name=corpus_context.cypher_safe_name,
utt_type=utt_type)
results = corpus_context.execute_cypher(statement, utterance_id=utterance_id)
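    # Wrap the utterance's stretch of the sound file in a conch SegmentMapping, padded so
    # pitch estimation is stable at the utterance edges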
segment_mapping = SegmentMapping()
for r in results:
channel = r['channel']
file_path = r['d']['vowel_file_path']
u = r['u']
segment_mapping.add_file_segment(file_path, u['begin'], u['end'], channel, padding=padding)
path = None
if source == 'praat':
path = corpus_context.config.praat_path
elif source == 'reaper':
path = corpus_context.config.reaper_path
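    # Build a callable that runs the chosen pitch tracker (Praat or REAPER) on a single file segment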
pitch_function = generate_pitch_function(source, min_pitch, max_pitch, path=path)
track = Track()
for seg in segment_mapping:
output = pitch_function(seg)
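        # The returned track maps time points to measurement dicts; keep only voiced frames (positive F0)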
for k, v in output.items():
if v['F0'] is None or v['F0'] <= 0:
continue
p = TimePoint(k)
p.add_value('F0', v['F0'])
track.add(p)
if 'pitch' not in corpus_context.hierarchy.acoustics:
corpus_context.hierarchy.add_acoustic_properties(corpus_context, 'pitch', [('F0', float)])
corpus_context.encode_hierarchy()
return track


def update_utterance_pitch_track(corpus_context, utterance, new_track):
from ...corpus.audio import s_to_ms, s_to_nano
if isinstance(utterance, str):
utterance_id = utterance
else:
utterance_id = utterance.id
today = datetime.utcnow()
utt_type = corpus_context.hierarchy.highest
phone_type = corpus_context.hierarchy.lowest
time_stamp = today.timestamp()
statement = '''MATCH (s:Speaker:{corpus_name})-[r:speaks_in]->(d:Discourse:{corpus_name}),
(u:{utt_type}:{corpus_name})-[:spoken_by]->(s),
(u)-[:spoken_in]->(d),
(p:{phone_type}:{corpus_name})-[:contained_by*]->(u)
WHERE u.id = $utterance_id
SET u.pitch_last_edited = $date
RETURN u, d, r.channel as channel, s, collect(p) as p'''.format(
corpus_name=corpus_context.cypher_safe_name,
utt_type=utt_type, phone_type=phone_type)
results = corpus_context.execute_cypher(statement, utterance_id=utterance_id, date=time_stamp)
for r in results:
channel = r['channel']
discourse = r['d']['name']
speaker = r['s']['name']
u = r['u']
phones = r['p']
client = corpus_context.acoustic_client()
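    # Remove any existing pitch points for this utterance from the acoustic database (InfluxDB);
    # the time bounds in the delete query are in nanoseconds (s_to_nano)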
query = '''DELETE from "pitch"
where "discourse" = '{}'
and "speaker" = '{}'
and "time" >= {}
and "time" <= {};'''.format(discourse, speaker, s_to_nano(u['begin']), s_to_nano(u['end']))
result = client.query(query)
data = []
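    # Rebuild the points from the edited track, tagging each frame with the phone it falls within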
    for data_point in new_track:
        time_point, value = data_point['time'], data_point['F0']
        t_dict = {'speaker': speaker, 'discourse': discourse, 'channel': channel}
        # The frame belongs to the last phone that starts at or before its time point
        label = None
        for p in sorted(phones, key=lambda x: x['begin']):
            if p['begin'] > time_point:
                break
            label = p['label']
        if label is None:
            continue
fields = {'phone': label, 'utterance_id': u['id']}
        if value is None:
            continue
        try:
            value = float(value)
        except (TypeError, ValueError):
            continue
if value <= 0:
continue
fields['F0'] = value
d = {'measurement': 'pitch',
'tags': t_dict,
'time': s_to_ms(time_point),
'fields': fields
}
data.append(d)
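    # Timestamps were converted to milliseconds (s_to_ms), so write with matching precision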
client.write_points(data, batch_size=1000, time_precision='ms')
if 'pitch' not in corpus_context.hierarchy.acoustics:
corpus_context.hierarchy.acoustics.add('pitch')
corpus_context.encode_hierarchy()
return time_stamp


def analyze_pitch(corpus_context,
                  source='praat',
                  algorithm='base',
                  call_back=None,
                  absolute_min_pitch=50,
                  absolute_max_pitch=500,
                  adjusted_octaves=1,
                  stop_check=None,
                  multiprocessing=True):
"""
Parameters
----------
corpus_context : :class:`~polyglotdb.corpus.audio.AudioContext`
source : str
Program to use for analyzing pitch, either ``praat`` or ``reaper``
algorithm : str
Algorithm to use, ``base``, ``gendered``, or ``speaker_adjusted``
absolute_min_pitch : int
Absolute pitch floor
absolute_max_pitch : int
Absolute pitch ceiling
adjusted_octaves : int
How many octaves around the speaker's mean pitch to set the speaker adjusted pitch floor and ceiling
stop_check : callable
Function to check whether processing should stop early
call_back : callable
Function to report progress
multiprocessing : bool
Flag whether to use multiprocessing or threading
Returns
-------
"""
    if 'utterance' not in corpus_context.hierarchy:
        raise Exception('Must encode utterances before pitch can be analyzed')
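    # One padded segment per utterance, grouped by speaker so pitch bounds can be set per speaker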
segment_mapping = generate_utterance_segments(corpus_context, padding=PADDING).grouped_mapping('speaker')
num_speakers = len(segment_mapping)
path = None
if source == 'praat':
path = corpus_context.config.praat_path
# kwargs = {'silence_threshold': 0.03,
# 'voicing_threshold': 0.45, 'octave_cost': 0.01, 'octave_jump_cost': 0.35,
# 'voiced_unvoiced_cost': 0.14}
elif source == 'reaper':
path = corpus_context.config.reaper_path
# kwargs = None
pitch_function = generate_pitch_function(source, absolute_min_pitch, absolute_max_pitch,
path=path)
if 'pitch' not in corpus_context.hierarchy.acoustics:
corpus_context.hierarchy.add_acoustic_properties(corpus_context, 'pitch', [('F0', float)])
corpus_context.encode_hierarchy()
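    # For speaker_adjusted, a first pass estimates each speaker's mean F0 with the absolute bounds;
    # the per-speaker floor/ceiling are then mean / 2**adjusted_octaves and mean * 2**adjusted_octaves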
if algorithm == 'speaker_adjusted':
speaker_data = {}
        if call_back is not None:
            call_back('Getting original speaker means...')
        for i, ((k,), v) in enumerate(segment_mapping.items()):
            if call_back is not None:
                call_back('Analyzing speaker {} ({} of {})'.format(k, i + 1, num_speakers))
output = analyze_segments(v, pitch_function, stop_check=stop_check, multiprocessing=multiprocessing)
sum_pitch = 0
n = 0
            for track in output.values():
                for frame in track.values():
                    pitch = frame['F0']
                    if pitch is not None and pitch > 0:  # only voiced frames contribute
                        n += 1
                        sum_pitch += pitch
            if n == 0:
                # No voiced frames for this speaker; fall back to the absolute bounds
                speaker_data[k] = (absolute_min_pitch, absolute_max_pitch)
                continue
            mean_pitch = sum_pitch / n
            speaker_data[k] = (int(mean_pitch / math.pow(2, adjusted_octaves)),
                               int(mean_pitch * math.pow(2, adjusted_octaves)))
    for i, ((speaker,), v) in enumerate(segment_mapping.items()):
        if call_back is not None:
            call_back('Analyzing speaker {} ({} of {})'.format(speaker, i + 1, num_speakers))
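        # Heuristic bounds from speaker metadata: raise the floor to 100 Hz for female speakers,
        # otherwise lower the ceiling to 400 Hz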
if algorithm == 'gendered':
min_pitch = absolute_min_pitch
max_pitch = absolute_max_pitch
try:
q = corpus_context.query_speakers().filter(corpus_context.speaker.name == speaker)
q = q.columns(corpus_context.speaker.gender.column_name('Gender'))
gender = q.all()[0]['Gender']
if gender is not None:
if gender.lower()[0] == 'f':
min_pitch = 100
else:
max_pitch = 400
except SpeakerAttributeError:
pass
pitch_function = generate_pitch_function(source, min_pitch, max_pitch,
path=path)
elif algorithm == 'speaker_adjusted':
min_pitch, max_pitch = speaker_data[speaker]
if min_pitch < absolute_min_pitch:
min_pitch = absolute_min_pitch
if max_pitch > absolute_max_pitch:
max_pitch = absolute_max_pitch
pitch_function = generate_pitch_function(source, min_pitch, max_pitch,
path=path)
output = analyze_segments(v, pitch_function, stop_check=stop_check, multiprocessing=multiprocessing)
corpus_context.save_acoustic_tracks('pitch', output, speaker)
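    # Stamp every utterance with the analysis time so later manual edits can be tracked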
today = datetime.utcnow()
corpus_context.query_graph(corpus_context.utterance).set_properties(pitch_last_edited=today.timestamp())
corpus_context.encode_hierarchy()