Incremental audio production with duration alignment handling, per-stem verification, and adaptive extension strategies
This skill provides a resilient pattern for audio production that emphasizes incremental verification, fail-fast principles, and automatic duration alignment. Each major step produces verified outputs before proceeding, with explicit handling for stem duration mismatches using appropriate extension strategies.
Follow these steps in strict order. Each step must complete successfully and pass verification before proceeding to the next:
Calculate all timing parameters before generating any audio. This ensures consistent timing across all stems:
def calculate_section_transitions(bpm, total_duration_sec, sections):
    """Compute beat-aligned start/end times for each named song section.

    Args:
        bpm: Tempo in beats per minute.
        total_duration_sec: Overall song length in seconds (kept for
            interface compatibility; not consulted by the calculation).
        sections: Ordered mapping of section name -> beat count.

    Returns:
        Dict mapping each section name to a dict with 'start', 'end',
        'beats', and 'start_beat' keys.
    """
    bps = bpm / 60.0
    transitions = {}
    elapsed = 0
    for name, beats in sections.items():
        length = beats / bps
        transitions[name] = {
            'start': elapsed,
            'end': elapsed + length,
            'beats': beats,
            'start_beat': elapsed * bps,
        }
        elapsed += length
    return transitions
# Song-level configuration constants.
BPM = 120       # tempo in beats per minute
DURATION = 137  # requested song length in seconds
SECTIONS = {'intro': 16, 'verse': 32, 'chorus': 32, 'bridge': 16, 'outro': 16}

# Pre-compute section boundaries once so every stem shares the same timing.
timing = calculate_section_transitions(BPM, DURATION, SECTIONS)
print("Timing calculated:")
for name, info in timing.items():
    print(f" {name}: {info['start']:.2f}s - {info['end']:.2f}s ({info['beats']} beats)")
Validate that the reference file exists and has the expected properties:
import soundfile as sf
import os
def verify_reference_file(filepath, expected_sample_rate=None, min_duration=None):
    """Verify a reference audio file exists and matches expectations.

    Args:
        filepath: Path to the reference audio file.
        expected_sample_rate: Required sample rate in Hz, or None to skip the check.
        min_duration: Minimum acceptable duration in seconds, or None to skip.

    Returns:
        Dict with 'sample_rate', 'duration', 'channels', and 'subtype'.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If any requested property check fails (all failures
            are collected and reported together).
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Reference file not found: {filepath}")
    info = sf.info(filepath)
    errors = []
    # Explicit None checks: truthiness tests would silently skip validation
    # for falsy-but-meaningful values (e.g. min_duration=0).
    if expected_sample_rate is not None and info.samplerate != expected_sample_rate:
        errors.append(f"Sample rate mismatch: expected {expected_sample_rate}, got {info.samplerate}")
    if min_duration is not None and info.duration < min_duration:
        errors.append(f"Duration too short: expected >= {min_duration}s, got {info.duration}s")
    if errors:
        raise ValueError(f"Reference file validation failed: {'; '.join(errors)}")
    print(f"Reference verified: {info.duration:.2f}s @ {info.samplerate}Hz, {info.channels}ch, {info.subtype}")
    return {
        'sample_rate': info.samplerate,
        'duration': info.duration,
        'channels': info.channels,
        'subtype': info.subtype
    }
# Verify the reference track before generating anything; its measured
# duration becomes the alignment target for every stem.
ref_info = verify_reference_file('reference.wav', expected_sample_rate=48000, min_duration=130)
TARGET_DURATION = ref_info['duration']  # Use actual reference duration as target
Generate one stem at a time and verify each immediately before proceeding to the next:
import numpy as np
def generate_stem(name, duration_sec, sample_rate, subtype='FLOAT', section_timing=None):
    """Generate a single sine-wave stem and write it to '<name>_stem.wav'.

    Args:
        name: Stem identifier; 'bass', 'guitars', 'synths', and 'bridge'
            get dedicated frequency/amplitude profiles, anything else
            falls back to A4 at 0.5 amplitude.
        duration_sec: Stem length in seconds.
        sample_rate: Sample rate in Hz.
        subtype: On-disk sample type, 'FLOAT' or 'PCM_24'.
        section_timing: Reserved for section-aware generation; currently
            unused (kept for interface compatibility).

    Returns:
        Tuple of (filepath, audio_data ndarray).
    """
    frames = int(duration_sec * sample_rate)
    # endpoint=False yields exact 1/sample_rate spacing. The default
    # endpoint=True would spread frames over [0, duration] inclusive,
    # spacing samples by duration/(frames-1) and slightly detuning the tone.
    t = np.linspace(0, duration_sec, frames, endpoint=False)
    # (frequency Hz, amplitude) per stem type.
    profiles = {
        'bass': (110, 0.8),     # A2
        'guitars': (440, 0.6),  # A4
        'synths': (880, 0.5),   # A5
        'bridge': (220, 0.7),   # A3
    }
    freq, amplitude = profiles.get(name, (440, 0.5))
    audio_data = np.sin(2 * np.pi * freq * t) * amplitude
    # Convert to the requested on-disk sample type.
    if subtype == 'FLOAT':
        audio_data = audio_data.astype(np.float32)
    elif subtype == 'PCM_24':
        # Scale into 24-bit integer range; stored in an int32 container.
        audio_data = np.clip(audio_data, -1, 1) * (2**23 - 1)
        audio_data = audio_data.astype(np.int32)
    filepath = f'{name}_stem.wav'
    sf.write(filepath, audio_data, sample_rate, subtype=subtype, format='WAV')
    return filepath, audio_data
def verify_stem(filepath, expected_sample_rate, expected_subtype, expected_duration, tolerance_sec=1.0):
    """Check one stem file against the expected format and duration.

    Returns:
        On success: {'success': True, 'info': <soundfile info>, 'duration_diff': float}.
        On failure: {'success': False, 'error': <joined messages>, ...}.
    """
    if not os.path.exists(filepath):
        return {'success': False, 'error': f'File not found: {filepath}'}
    info = sf.info(filepath)
    problems = []
    if info.samplerate != expected_sample_rate:
        problems.append(f'sample_rate: expected {expected_sample_rate}, got {info.samplerate}')
    if info.subtype != expected_subtype:
        problems.append(f'subtype: expected {expected_subtype}, got {info.subtype}')
    if abs(info.duration - expected_duration) > tolerance_sec:
        problems.append(f'duration: expected ~{expected_duration}s, got {info.duration}s')
    # Always report how far off the duration is, even when within tolerance,
    # so callers can decide whether alignment is needed.
    duration_diff = info.duration - expected_duration
    if problems:
        return {'success': False, 'error': '; '.join(problems), 'duration_diff': duration_diff}
    return {'success': True, 'info': info, 'duration_diff': duration_diff}
# Generate stems one at a time, failing fast on the first bad stem.
SAMPLE_RATE = 48000
SUBTYPE = 'FLOAT'
STEM_NAMES = ['bass', 'guitars', 'synths', 'bridge']

generated_stems = []
stem_info = {}  # Per-stem verification results, including duration discrepancies.
for stem_name in STEM_NAMES:
    print(f"\n=== Generating {stem_name} stem ===")
    filepath, data = generate_stem(stem_name, DURATION, SAMPLE_RATE, subtype=SUBTYPE)
    # Verify immediately; abort the whole run on failure.
    result = verify_stem(filepath, SAMPLE_RATE, SUBTYPE, TARGET_DURATION)
    if not result['success']:
        print(f"✗ {stem_name} stem FAILED: {result['error']}")
        raise RuntimeError(f"Stem generation failed for {stem_name}: {result['error']}")
    print(f"✓ {stem_name} stem verified: {result['info'].duration:.2f}s @ {result['info'].samplerate}Hz")
    if abs(result['duration_diff']) > 0.1:
        print(f" ⚠ Duration discrepancy: {result['duration_diff']:+.2f}s")
    generated_stems.append(filepath)
    stem_info[stem_name] = result
print(f"\nAll {len(generated_stems)} stems generated and verified successfully")
When stems have different durations, apply the appropriate extension strategy:
| Strategy | Best For | Duration Gap | Sound Characteristic |
|---|---|---|---|
| Zero-padding | Short gaps (<0.5s), silence sections, endings | Small | Clean, abrupt |
| Looping | Repetitive patterns (drums, bass, rhythmic elements) | Medium to large | Seamless, rhythmic |
| Crossfade extension | Melodic content, sustained instruments, vocals | Any | Natural, smooth |
def align_stem_duration(input_filepath, output_filepath, target_duration, strategy='auto',
                        sample_rate=None, subtype='FLOAT', loop_seamless=True):
    """
    Align stem duration to target using appropriate strategy.

    Extends (zero-pad / loop / crossfade) or truncates the source so the
    output lasts `target_duration` seconds, then writes it to
    `output_filepath` in the requested subtype.

    Args:
        input_filepath: Path to source stem
        output_filepath: Path for aligned output
        target_duration: Target duration in seconds
        strategy: 'zero_pad', 'loop', 'crossfade', or 'auto'
        sample_rate: Sample rate (auto-detected if None)
        subtype: Audio subtype
        loop_seamless: Apply crossfade at loop boundaries if True
    Returns:
        dict with success status and alignment details

    NOTE(review): the 1-D fade ramps and np.zeros buffers below assume mono
    data; a stereo (n, 2) array from sf.read would not broadcast — confirm
    stems are always mono.
    """
    if not os.path.exists(input_filepath):
        return {'success': False, 'error': f'Input file not found: {input_filepath}'}
    # Load source
    data, sr = sf.read(input_filepath)
    if sample_rate is None:
        sample_rate = sr
    # NOTE(review): if a caller passes a sample_rate different from the
    # file's actual rate, durations computed here will be wrong — confirm
    # callers always pass the true rate (or None).
    source_duration = len(data) / sample_rate
    duration_diff = target_duration - source_duration
    # If already aligned (within tolerance), just copy
    if abs(duration_diff) < 0.01:
        sf.write(output_filepath, data, sample_rate, subtype=subtype, format='WAV')
        return {'success': True, 'strategy': 'none', 'duration_diff': 0}
    if duration_diff > 0:
        # Need to EXTEND
        extend_frames = int(duration_diff * sample_rate)
        if strategy == 'auto':
            # Auto-select based on duration gap and stem type
            # (substring match on the file path, e.g. 'bass_stem.wav').
            if duration_diff < 0.5:
                strategy = 'zero_pad'
            elif 'drum' in input_filepath or 'bass' in input_filepath:
                strategy = 'loop'
            else:
                strategy = 'crossfade'
        if strategy == 'zero_pad':
            # Append zeros
            padding = np.zeros(extend_frames, dtype=data.dtype)
            aligned_data = np.concatenate([data, padding])
        elif strategy == 'loop':
            # Loop the content
            loop_frames = len(data)
            loops_needed = int(np.ceil(extend_frames / loop_frames))
            if loop_seamless and loops_needed > 1:
                # Apply crossfade at loop boundaries for seamless looping.
                # Crossfade length: 50 ms, capped at a quarter of the loop.
                crossfade_frames = min(int(0.05 * sample_rate), loop_frames // 4)
                loop_extension = np.zeros(extend_frames, dtype=data.dtype)
                for i in range(loops_needed):
                    start = i * loop_frames
                    end = min(start + loop_frames, extend_frames)
                    actual_len = end - start
                    # Extract loop segment (copy so the fades don't mutate `data`)
                    loop_segment = data[:actual_len].copy()
                    # Apply crossfade at boundaries; skipped for segments too
                    # short to hold both ramps.
                    if i > 0 and actual_len >= crossfade_frames * 2:
                        # Fade in from previous loop
                        fade_in = np.linspace(0, 1, crossfade_frames)
                        loop_segment[:crossfade_frames] *= fade_in
                    if i < loops_needed - 1 and actual_len >= crossfade_frames * 2:
                        # Fade out for next loop
                        fade_out = np.linspace(1, 0, crossfade_frames)
                        loop_segment[-crossfade_frames:] *= fade_out
                    loop_extension[start:end] = loop_segment
                extend_frames_actual = len(loop_extension)
            else:
                # Simple tiling
                loop_extension = np.tile(data, loops_needed)[:extend_frames]
                extend_frames_actual = extend_frames
            aligned_data = np.concatenate([data, loop_extension[:extend_frames_actual]])
        elif strategy == 'crossfade':
            # Extend using crossfade from the end of the source
            # Take last portion and crossfade it onto itself
            fade_duration = min(duration_diff * 0.3, 2.0)  # 30% of gap, max 2s
            fade_frames = int(fade_duration * sample_rate)
            if fade_frames >= len(data) // 2:
                # Source too short for crossfade, fall back to loop
                # NOTE(review): despite the comment, this does not switch to
                # the 'loop' strategy — it only shortens the fade window to a
                # quarter of the source. Confirm intent.
                fade_frames = len(data) // 4
            # Extract tail segment for extension
            tail_segment = data[-fade_frames:].copy()
            # Create extended portion with crossfade
            extended_portion = np.zeros(extend_frames, dtype=data.dtype)
            if extend_frames <= fade_frames:
                # Short extension: just crossfade tail onto itself
                fade_in = np.linspace(0, 1, extend_frames)
                extended_portion = tail_segment[:extend_frames] * fade_in
            else:
                # Longer extension: loop tail with crossfades
                loops = int(np.ceil(extend_frames / fade_frames))
                for i in range(loops):
                    start = i * fade_frames
                    end = min(start + fade_frames, extend_frames)
                    seg_len = end - start
                    segment = tail_segment[:seg_len].copy()
                    # Crossfade boundaries (fade-in only; no matching fade-out
                    # is applied at segment ends).
                    if seg_len >= 100:
                        cf_len = min(50, seg_len // 4)
                        if i > 0:
                            fade_in = np.linspace(0, 1, cf_len)
                            segment[:cf_len] *= fade_in
                    extended_portion[start:end] = segment
            aligned_data = np.concatenate([data, extended_portion])
        else:
            return {'success': False, 'error': f'Unknown extension strategy: {strategy}'}
    else:
        # Need to TRUNCATE (source is longer than the target)
        truncate_frames = int(abs(duration_diff) * sample_rate)
        aligned_data = data[:len(data) - truncate_frames]
        strategy = 'truncate'
    # Ensure proper data type and clip
    if subtype == 'FLOAT':
        aligned_data = aligned_data.astype(np.float32)
    elif subtype == 'PCM_24':
        # Scale into 24-bit integer range; stored in an int32 container.
        aligned_data = np.clip(aligned_data, -1, 1) * (2**23 - 1)
        aligned_data = aligned_data.astype(np.int32)
    else:
        aligned_data = np.clip(aligned_data, -1, 1)
    # Export aligned stem
    sf.write(output_filepath, aligned_data, sample_rate, subtype=subtype, format='WAV')
    return {
        'success': True,
        'strategy': strategy,
        'source_duration': source_duration,
        'target_duration': target_duration,
        'duration_diff': duration_diff,
        'aligned_frames': len(aligned_data)
    }
# Apply duration alignment to every generated stem, failing fast on error.
print("\n=== Aligning stem durations ===")
aligned_stems = []
TARGET_DURATION = ref_info['duration']  # Use reference as target

for stem_name in STEM_NAMES:
    input_file = f'{stem_name}_stem.wav'
    output_file = f'{stem_name}_aligned.wav'
    # Pick an extension strategy from the stem's musical role.
    if stem_name in ('bass', 'drums'):
        strategy = 'loop'  # Rhythmic elements loop well
    elif stem_name in ('bridge', 'outro'):
        strategy = 'crossfade'  # Sustained content benefits from crossfade
    else:
        strategy = 'auto'  # Let the function decide
    print(f"Aligning {stem_name} (strategy: {strategy})...")
    result = align_stem_duration(input_file, output_file, TARGET_DURATION,
                                 strategy=strategy, sample_rate=SAMPLE_RATE, subtype=SUBTYPE)
    if not result['success']:
        print(f"✗ {stem_name} alignment FAILED: {result['error']}")
        raise RuntimeError(f"Duration alignment failed for {stem_name}: {result['error']}")
    if result['strategy'] != 'none':
        print(f"✓ {stem_name} aligned: {result['source_duration']:.2f}s -> {result['target_duration']:.2f}s via {result['strategy']}")
    else:
        print(f"✓ {stem_name} already aligned at {result['target_duration']:.2f}s")
    aligned_stems.append(output_file)
print(f"\nAll {len(aligned_stems)} stems duration-aligned successfully")
Drums require different processing (rhythm patterns, percussion sounds):
def generate_drum_stem(duration_sec, sample_rate, bpm, section_timing, subtype='FLOAT'):
    """Generate a kick/snare drum stem and write it to 'drums_stem.wav'.

    Args:
        duration_sec: Stem length in seconds.
        sample_rate: Sample rate in Hz.
        bpm: Tempo in beats per minute; determines hit placement.
        section_timing: Reserved for section-aware patterns; currently
            unused (kept for interface compatibility).
        subtype: Output subtype passed to soundfile.

    Returns:
        Tuple of (filepath, audio_data ndarray). Snare hits use random
        noise, so output is not deterministic across calls.
    """
    frames = int(duration_sec * sample_rate)
    audio_data = np.zeros(frames, dtype=np.float32)
    beats_per_second = bpm / 60.0
    # Kick drum: exponentially decaying 60 Hz sine on every beat.
    kick_freq = 60
    kick_duration = 0.1
    kick_frames = int(kick_duration * sample_rate)
    for beat_time in np.arange(0, duration_sec, 1.0 / beats_per_second):
        start_frame = int(beat_time * sample_rate)
        end_frame = min(start_frame + kick_frames, frames)
        if start_frame < frames:
            # Real elapsed time per sample, so a hit truncated at the buffer
            # end keeps its pitch and decay rate. (linspace over the full hit
            # duration would compress the whole envelope into fewer samples.)
            t = np.arange(end_frame - start_frame) / sample_rate
            kick = np.exp(-5 * t) * np.sin(2 * np.pi * kick_freq * t)
            audio_data[start_frame:end_frame] += kick * 0.9
    # Snare: decaying noise burst on the off-beat of every second beat.
    snare_freq = 200  # NOTE: unused by the noise-based synthesis below.
    snare_duration = 0.05
    snare_frames = int(snare_duration * sample_rate)
    for beat_time in np.arange(0, duration_sec, 2.0 / beats_per_second):
        start_frame = int((beat_time + 0.5 / beats_per_second) * sample_rate)
        end_frame = min(start_frame + snare_frames, frames)
        if start_frame < frames:
            t = np.arange(end_frame - start_frame) / sample_rate
            snare = np.exp(-10 * t) * np.random.uniform(-1, 1, len(t)) * 0.5
            audio_data[start_frame:end_frame] += snare * 0.7
    # Clip overlapping hits back into [-1, 1] before export.
    audio_data = np.clip(audio_data, -1, 1)
    filepath = 'drums_stem.wav'
    sf.write(filepath, audio_data, sample_rate, subtype=subtype, format='WAV')
    return filepath, audio_data
print("\n=== Generating drum stem ===")
drums_filepath, drums_data = generate_drum_stem(TARGET_DURATION, SAMPLE_RATE, BPM, timing, subtype=SUBTYPE)

# Verify the drum stem exactly like the melodic stems, then align it.
drums_result = verify_stem(drums_filepath, SAMPLE_RATE, SUBTYPE, TARGET_DURATION)
if drums_result['success']:
    print(f"✓ Drum stem verified: {drums_result['info'].duration:.2f}s @ {drums_result['info'].samplerate}Hz")
    # Drums are rhythmic, so the loop strategy extends them seamlessly.
    drums_aligned = 'drums_aligned.wav'
    drums_align_result = align_stem_duration(drums_filepath, drums_aligned, TARGET_DURATION,
                                             strategy='loop', sample_rate=SAMPLE_RATE, subtype=SUBTYPE)
    if not drums_align_result['success']:
        print(f"✗ Drum alignment FAILED: {drums_align_result['error']}")
        raise RuntimeError(f"Drum alignment failed: {drums_align_result['error']}")
    aligned_stems.append(drums_aligned)
    print(f"✓ Drums aligned via {drums_align_result['strategy']}")