Skip to main content

NORD's Replay System: Simulation Data Recording and Playback

Overview​

NORD's Replay System is a comprehensive framework for recording, storing, analyzing, and replaying robot simulation data within the NVIDIA Omniverse ecosystem. This system enables developers and researchers to capture complete simulation sessions, analyze robot behavior, debug complex scenarios, and validate robot performance against real-world data. The Replay System serves as a critical component for the iterative development and validation of physical AI systems.

Architecture and Design​

Replay System Components​

The NORD Replay System consists of several interconnected components:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ NORD Replay System β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Data Logger β”‚ β”‚ Data Manager β”‚ β”‚ Codec β”‚ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β”‚ - Joint States β”‚ β”‚ - Buffer Mgmt β”‚ β”‚ - HDF5 β”‚ β”‚
β”‚ β”‚ - Sensor Data β”‚ β”‚ - Compression β”‚ β”‚ - Protobufβ”‚β”‚
β”‚ β”‚ - Env State β”‚ β”‚ - Serialization β”‚ β”‚ - Customβ”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Playback β”‚ β”‚ Analysis β”‚ β”‚
β”‚ β”‚ Engine β”‚ β”‚ Tools β”‚ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β”‚ - Frame Sync β”‚ β”‚ - Visualization β”‚ β”‚
β”‚ β”‚ - Interpolation β”‚ β”‚ - Statistics β”‚ β”‚
β”‚ β”‚ - Real-time β”‚ β”‚ - Comparison β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Data Recording Architecture​

Core Recording Engine​

The recording engine captures comprehensive robot simulation data:

import numpy as np
import h5py
from datetime import datetime
import json

class NORDReplayEngine:
def __init__(self, robot_config, output_path):
self.robot_config = robot_config
self.output_path = output_path
self.is_recording = False
self.start_time = None
self.frame_count = 0

# Initialize recording buffers
self.buffers = {
'timestamps': [],
'joint_states': {
'positions': [],
'velocities': [],
'efforts': []
},
'sensor_data': {},
'environment_state': [],
'control_commands': [],
'physics_state': [],
'user_annotations': []
}

# Recording parameters
self.record_frequency = 60 # Hz
self.max_buffer_size = 10000 # frames
self.compression_enabled = True

def start_recording(self):
"""Initialize recording session"""
self.is_recording = True
self.start_time = time.time()
self.frame_count = 0

# Initialize sensor data buffers
for sensor_name in self.robot_config.get('sensors', []):
self.buffers['sensor_data'][sensor_name] = []

print(f"Recording started at {datetime.now().isoformat()}")
return True

def record_frame(self, robot_state, sensor_data, environment_state, control_commands):
"""Record a single simulation frame"""
if not self.is_recording:
return False

current_time = time.time() - self.start_time
self.buffers['timestamps'].append(current_time)

# Record joint states
joint_positions = []
joint_velocities = []
joint_efforts = []

for joint_name in self.robot_config['joints']:
joint_state = robot_state.get_joint_state(joint_name)
joint_positions.append(joint_state.position)
joint_velocities.append(joint_state.velocity)
joint_efforts.append(joint_state.effort)

self.buffers['joint_states']['positions'].append(joint_positions)
self.buffers['joint_states']['velocities'].append(joint_velocities)
self.buffers['joint_states']['efforts'].append(joint_efforts)

# Record sensor data
for sensor_name, data in sensor_data.items():
self.buffers['sensor_data'][sensor_name].append(data)

# Record environment state
self.buffers['environment_state'].append(environment_state)

# Record control commands
self.buffers['control_commands'].append(control_commands)

# Record physics state
physics_state = {
'gravity': robot_state.get_gravity(),
'friction': robot_state.get_contact_friction(),
'damping': robot_state.get_damping()
}
self.buffers['physics_state'].append(physics_state)

self.frame_count += 1

# Check buffer size and flush if needed
if len(self.buffers['timestamps']) >= self.max_buffer_size:
self.flush_buffer()

return True

def stop_recording(self):
"""Finalize and save recording"""
self.is_recording = False

# Save all buffers to file
self.save_recording()

print(f"Recording stopped. Saved {self.frame_count} frames to {self.output_path}")
return True

def save_recording(self):
"""Save recording data to file using HDF5"""
with h5py.File(self.output_path, 'w') as f:
# Save metadata
f.attrs['created'] = datetime.now().isoformat()
f.attrs['robot_config'] = json.dumps(self.robot_config)
f.attrs['frame_count'] = self.frame_count
f.attrs['duration'] = self.buffers['timestamps'][-1] if self.buffers['timestamps'] else 0

# Save timestamps
f.create_dataset('timestamps', data=np.array(self.buffers['timestamps']))

# Save joint states
joint_group = f.create_group('joint_states')
joint_group.create_dataset('positions',
data=np.array(self.buffers['joint_states']['positions']),
compression='gzip' if self.compression_enabled else None)
joint_group.create_dataset('velocities',
data=np.array(self.buffers['joint_states']['velocities']),
compression='gzip' if self.compression_enabled else None)
joint_group.create_dataset('efforts',
data=np.array(self.buffers['joint_states']['efforts']),
compression='gzip' if self.compression_enabled else None)

# Save sensor data
sensor_group = f.create_group('sensor_data')
for sensor_name, data_list in self.buffers['sensor_data'].items():
if data_list: # Only save if there's data
sensor_group.create_dataset(sensor_name,
data=np.array(data_list),
compression='gzip' if self.compression_enabled else None)

# Save other data
f.create_dataset('environment_state',
data=np.array(self.buffers['environment_state']),
compression='gzip' if self.compression_enabled else None)
f.create_dataset('control_commands',
data=np.array(self.buffers['control_commands']),
compression='gzip' if self.compression_enabled else None)
f.create_dataset('physics_state',
data=np.array(self.buffers['physics_state']),
compression='gzip' if self.compression_enabled else None)

print(f"Recording saved to {self.output_path}")

Advanced Recording Features​

Selective Recording​

class SelectiveReplayRecorder(NORDReplayEngine):
def __init__(self, robot_config, output_path):
super().__init__(robot_config, output_path)
self.recording_mask = {
'joint_positions': True,
'joint_velocities': True,
'joint_efforts': False, # Only record when needed
'sensor_data': True,
'environment_state': True,
'control_commands': True
}

def record_frame(self, robot_state, sensor_data, environment_state, control_commands):
"""Record frame with selective data capture"""
if not self.is_recording:
return False

current_time = time.time() - self.start_time
self.buffers['timestamps'].append(current_time)

# Conditionally record joint states based on mask
if self.recording_mask['joint_positions']:
joint_positions = []
for joint_name in self.robot_config['joints']:
joint_state = robot_state.get_joint_state(joint_name)
joint_positions.append(joint_state.position)
self.buffers['joint_states']['positions'].append(joint_positions)

if self.recording_mask['joint_velocities']:
joint_velocities = []
for joint_name in self.robot_config['joints']:
joint_state = robot_state.get_joint_state(joint_name)
joint_velocities.append(joint_state.velocity)
self.buffers['joint_states']['velocities'].append(joint_velocities)

if self.recording_mask['joint_efforts']:
joint_efforts = []
for joint_name in self.robot_config['joints']:
joint_state = robot_state.get_joint_state(joint_name)
joint_efforts.append(joint_state.effort)
self.buffers['joint_states']['efforts'].append(joint_efforts)

# Record sensor data if enabled
if self.recording_mask['sensor_data']:
for sensor_name, data in sensor_data.items():
self.buffers['sensor_data'][sensor_name].append(data)

# Record other data based on mask
if self.recording_mask['environment_state']:
self.buffers['environment_state'].append(environment_state)

if self.recording_mask['control_commands']:
self.buffers['control_commands'].append(control_commands)

self.frame_count += 1
return True

Event-Based Recording​

class EventBasedReplayRecorder(NORDReplayEngine):
def __init__(self, robot_config, output_path):
super().__init__(robot_config, output_path)
self.event_triggers = []
self.event_buffer = []
self.event_recording = False

def add_event_trigger(self, condition_func, description):
"""Add a condition that triggers recording"""
self.event_triggers.append({
'condition': condition_func,
'description': description,
'active': False
})

def check_events(self, robot_state, sensor_data):
"""Check if any events are triggered"""
triggered_events = []

for i, trigger in enumerate(self.event_triggers):
if trigger['condition'](robot_state, sensor_data):
if not trigger['active']:
# Event just triggered
trigger['active'] = True
triggered_events.append({
'timestamp': time.time() - self.start_time,
'description': trigger['description'],
'type': 'start_recording'
})
else:
if trigger['active']:
# Event just ended
trigger['active'] = False
triggered_events.append({
'timestamp': time.time() - self.start_time,
'description': trigger['description'],
'type': 'stop_recording'
})

return triggered_events

def record_frame(self, robot_state, sensor_data, environment_state, control_commands):
"""Record frame with event-based logic"""
if not self.is_recording:
# Check if any events should start recording
events = self.check_events(robot_state, sensor_data)
for event in events:
if event['type'] == 'start_recording':
self.is_recording = True
print(f"Event triggered recording: {event['description']}")

if self.is_recording:
# Record the frame normally
super().record_frame(robot_state, sensor_data, environment_state, control_commands)

# Check if any events should stop recording
events = self.check_events(robot_state, sensor_data)
for event in events:
if event['type'] == 'stop_recording':
self.is_recording = False
print(f"Event stopped recording: {event['description']}")

Data Playback System​

Core Playback Engine​

The playback engine reproduces recorded simulation data:

class NORDPlaybackEngine:
def __init__(self, recording_path):
self.recording_path = recording_path
self.recording_data = None
self.current_frame = 0
self.is_playing = False
self.playback_speed = 1.0
self.loop_playback = False

# Load recording
self.load_recording()

def load_recording(self):
"""Load recording data from file"""
with h5py.File(self.recording_path, 'r') as f:
self.recording_data = {}

# Load metadata
self.recording_data['metadata'] = {
'created': f.attrs.get('created', ''),
'frame_count': f.attrs.get('frame_count', 0),
'duration': f.attrs.get('duration', 0.0)
}

# Load timestamps
self.recording_data['timestamps'] = f['timestamps'][:]

# Load joint states
joint_group = f['joint_states']
self.recording_data['joint_states'] = {
'positions': joint_group['positions'][:],
'velocities': joint_group['velocities'][:],
'efforts': joint_group['efforts'][:]
}

# Load sensor data
self.recording_data['sensor_data'] = {}
sensor_group = f['sensor_data']
for sensor_name in sensor_group.keys():
self.recording_data['sensor_data'][sensor_name] = sensor_group[sensor_name][:]

# Load other data
self.recording_data['environment_state'] = f['environment_state'][:]
self.recording_data['control_commands'] = f['control_commands'][:]
self.recording_data['physics_state'] = f['physics_state'][:]

print(f"Recording loaded: {self.recording_data['metadata']['frame_count']} frames")

def play_frame(self, target_robot, interpolate=True):
"""Play back a single frame of recording"""
if self.current_frame >= len(self.recording_data['timestamps']):
if self.loop_playback:
self.current_frame = 0 # Loop back to start
else:
return False # End of recording

# Get current frame data
frame_idx = self.current_frame

# Apply joint positions
if frame_idx < len(self.recording_data['joint_states']['positions']):
joint_positions = self.recording_data['joint_states']['positions'][frame_idx]
joint_names = self.get_joint_names(target_robot) # Implementation depends on robot interface

for i, joint_name in enumerate(joint_names):
if i < len(joint_positions):
target_robot.set_joint_position(joint_name, joint_positions[i])

# Apply joint velocities if available
if frame_idx < len(self.recording_data['joint_states']['velocities']):
joint_velocities = self.recording_data['joint_states']['velocities'][frame_idx]
for i, joint_name in enumerate(joint_names):
if i < len(joint_velocities):
target_robot.set_joint_velocity(joint_name, joint_velocities[i])

# Apply sensor data visualization (for debugging/analysis)
for sensor_name, sensor_data_list in self.recording_data['sensor_data'].items():
if frame_idx < len(sensor_data_list):
# This could trigger sensor data visualization
self.visualize_sensor_data(sensor_name, sensor_data_list[frame_idx])

# Apply environment state
if frame_idx < len(self.recording_data['environment_state']):
env_state = self.recording_data['environment_state'][frame_idx]
self.apply_environment_state(env_state)

self.current_frame += 1
return True

def play_sequence(self, target_robot, start_frame=0, end_frame=None, callback=None):
"""Play back entire sequence or specified range"""
self.current_frame = start_frame
if end_frame is None:
end_frame = len(self.recording_data['timestamps'])

while self.current_frame < end_frame:
success = self.play_frame(target_robot)
if not success:
break

if callback:
progress = (self.current_frame - start_frame) / (end_frame - start_frame)
callback(progress, self.current_frame)

# Control playback speed
if self.playback_speed > 0:
time.sleep(1.0 / (60.0 * self.playback_speed)) # Assuming 60Hz base rate

def interpolate_frame(self, frame_idx, target_robot):
"""Interpolate between frames for smooth playback"""
if frame_idx >= len(self.recording_data['timestamps']) - 1:
return self.play_frame(target_robot, interpolate=False)

# Get current and next frame data
current_pos = self.recording_data['joint_states']['positions'][frame_idx]
next_pos = self.recording_data['joint_states']['positions'][frame_idx + 1]

# Calculate interpolation factor (simplified)
current_time = self.recording_data['timestamps'][frame_idx]
next_time = self.recording_data['timestamps'][frame_idx + 1]
target_time = current_time + (next_time - current_time) * 0.5 # 50% interpolation

# Linear interpolation
interpolated_pos = []
for i in range(len(current_pos)):
interp_val = current_pos[i] + 0.5 * (next_pos[i] - current_pos[i])
interpolated_pos.append(interp_val)

# Apply interpolated positions
joint_names = self.get_joint_names(target_robot)
for i, joint_name in enumerate(joint_names):
if i < len(interpolated_pos):
target_robot.set_joint_position(joint_name, interpolated_pos[i])

Advanced Playback Features​

Synchronized Multi-Robot Playback​

class SynchronizedPlaybackEngine:
def __init__(self, recording_paths):
self.recording_engines = []
self.robots = []

for path in recording_paths:
self.recording_engines.append(NORDPlaybackEngine(path))

def add_robot(self, robot, recording_index):
"""Add robot to synchronized playback"""
self.robots.append({
'robot': robot,
'recording_index': recording_index,
'offset': 0.0 # Time offset for this robot
})

def play_synchronized(self, start_time=0.0, duration=None):
"""Play multiple recordings in sync"""
# Find common time range
min_duration = min(
engine.recording_data['metadata']['duration']
for engine in self.recording_engines
)

if duration is None:
duration = min_duration

current_time = start_time
base_timestamp = time.time()

while current_time <= duration:
# Calculate target frame for each recording
for i, robot_info in enumerate(self.robots):
recording_idx = robot_info['recording_index']
offset = robot_info['offset']

# Find closest frame to current_time + offset
frame_idx = self.find_frame_at_time(
self.recording_engines[recording_idx],
current_time + offset
)

# Play that frame for the robot
self.recording_engines[recording_idx].current_frame = frame_idx
self.recording_engines[recording_idx].play_frame(robot_info['robot'])

# Update time based on real elapsed time
elapsed = time.time() - base_timestamp
current_time = start_time + elapsed * self.playback_speed

def find_frame_at_time(self, engine, target_time):
"""Find the frame index closest to target time"""
timestamps = engine.recording_data['timestamps']

# Binary search for closest frame
left, right = 0, len(timestamps) - 1
while left <= right:
mid = (left + right) // 2
if timestamps[mid] == target_time:
return mid
elif timestamps[mid] < target_time:
left = mid + 1
else:
right = mid - 1

# Return closest frame
if right < 0:
return 0
if left >= len(timestamps):
return len(timestamps) - 1

# Return frame with closest timestamp
if abs(timestamps[left] - target_time) < abs(timestamps[right] - target_time):
return left
else:
return right

Analysis and Visualization Tools​

Data Analysis Engine​

class NORDAnalysisEngine:
def __init__(self, recording_path):
self.playback_engine = NORDPlaybackEngine(recording_path)
self.recording_data = self.playback_engine.recording_data

def analyze_joint_trajectories(self, joint_name):
"""Analyze joint trajectory data"""
joint_idx = self.get_joint_index(joint_name)

positions = []
velocities = []
accelerations = []

for i in range(len(self.recording_data['joint_states']['positions'])):
if joint_idx < len(self.recording_data['joint_states']['positions'][i]):
positions.append(self.recording_data['joint_states']['positions'][i][joint_idx])

# Calculate velocities and accelerations
for i in range(1, len(positions)):
dt = (self.recording_data['timestamps'][i] -
self.recording_data['timestamps'][i-1])
if dt > 0:
vel = (positions[i] - positions[i-1]) / dt
velocities.append(vel)

for i in range(1, len(velocities)):
dt = (self.recording_data['timestamps'][i+1] -
self.recording_data['timestamps'][i])
if dt > 0:
acc = (velocities[i] - velocities[i-1]) / dt
accelerations.append(acc)

return {
'positions': positions,
'velocities': velocities,
'accelerations': accelerations,
'timestamps': self.recording_data['timestamps'][:len(positions)]
}

def calculate_performance_metrics(self):
"""Calculate key performance metrics"""
metrics = {}

# Calculate tracking accuracy if reference trajectory exists
if 'reference_trajectory' in self.recording_data:
tracking_errors = []
for i in range(min(len(self.recording_data['joint_states']['positions']),
len(self.recording_data['reference_trajectory']))):
actual = self.recording_data['joint_states']['positions'][i]
reference = self.recording_data['reference_trajectory'][i]

error = np.linalg.norm(np.array(actual) - np.array(reference))
tracking_errors.append(error)

metrics['tracking_accuracy'] = {
'mean_error': np.mean(tracking_errors),
'max_error': np.max(tracking_errors),
'rmse': np.sqrt(np.mean(np.array(tracking_errors) ** 2))
}

# Calculate execution time statistics
timestamps = self.recording_data['timestamps']
metrics['execution_time'] = {
'total_duration': timestamps[-1] - timestamps[0] if timestamps else 0,
'average_step_time': np.mean(np.diff(timestamps)) if len(timestamps) > 1 else 0,
'min_step_time': np.min(np.diff(timestamps)) if len(timestamps) > 1 else 0,
'max_step_time': np.max(np.diff(timestamps)) if len(timestamps) > 1 else 0
}

# Calculate energy consumption (estimated from efforts)
total_energy = 0
for i in range(len(self.recording_data['joint_states']['efforts']) - 1):
efforts = self.recording_data['joint_states']['efforts'][i]
dt = (self.recording_data['timestamps'][i+1] -
self.recording_data['timestamps'][i])
power = sum(abs(effort) * 1.0 for effort in efforts) # Simplified power calculation
total_energy += power * dt

metrics['energy_consumption'] = total_energy

return metrics

def compare_recordings(self, other_recording_path):
"""Compare two recordings"""
other_engine = NORDAnalysisEngine(other_recording_path)

comparison = {
'duration_difference': (
self.recording_data['metadata']['duration'] -
other_engine.recording_data['metadata']['duration']
),
'frame_count_difference': (
self.recording_data['metadata']['frame_count'] -
other_engine.recording_data['metadata']['frame_count']
)
}

# Compare joint trajectories
comparison['joint_trajectory_similarity'] = {}
for joint_idx in range(min(
len(self.recording_data['joint_states']['positions'][0]),
len(other_engine.recording_data['joint_states']['positions'][0])
)):
self_joint_data = [frame[joint_idx] for frame in
self.recording_data['joint_states']['positions']]
other_joint_data = [frame[joint_idx] for frame in
other_engine.recording_data['joint_states']['positions']]

# Pad shorter sequence
min_len = min(len(self_joint_data), len(other_joint_data))
similarity = np.corrcoef(self_joint_data[:min_len],
other_joint_data[:min_len])[0, 1]

comparison['joint_trajectory_similarity'][f'joint_{joint_idx}'] = similarity

return comparison

Integration with Development Workflow​

Validation and Testing​

class NORDReplayValidator:
def __init__(self, robot_config):
self.robot_config = robot_config

def validate_recording_integrity(self, recording_path):
"""Validate that recording data is complete and consistent"""
try:
engine = NORDPlaybackEngine(recording_path)
data = engine.recording_data

validation_results = {
'timestamps_valid': self._validate_timestamps(data['timestamps']),
'joint_data_consistency': self._validate_joint_data_consistency(data['joint_states']),
'sensor_data_completeness': self._validate_sensor_data_completeness(
data['sensor_data'], data['timestamps']
),
'metadata_consistency': self._validate_metadata_consistency(data, recording_path)
}

return all(validation_results.values()), validation_results

except Exception as e:
return False, {'error': str(e)}

def _validate_timestamps(self, timestamps):
"""Validate timestamp sequence"""
if len(timestamps) < 2:
return len(timestamps) == 1 # Single frame is valid

# Check that timestamps are monotonically increasing
return all(timestamps[i] <= timestamps[i+1] for i in range(len(timestamps)-1))

def _validate_joint_data_consistency(self, joint_states):
"""Validate that joint state arrays have consistent dimensions"""
pos_len = len(joint_states['positions'])
vel_len = len(joint_states['velocities'])
eff_len = len(joint_states['efforts'])

# All should have same number of frames
return pos_len == vel_len == eff_len

def regression_test(self, test_recording_path, baseline_recording_path, threshold=0.01):
"""Perform regression testing by comparing recordings"""
test_analyzer = NORDAnalysisEngine(test_recording_path)
baseline_analyzer = NORDAnalysisEngine(baseline_recording_path)

# Compare key metrics
test_metrics = test_analyzer.calculate_performance_metrics()
baseline_metrics = baseline_analyzer.calculate_performance_metrics()

regression_results = {}

# Compare execution time
time_diff = abs(
test_metrics['execution_time']['total_duration'] -
baseline_metrics['execution_time']['total_duration']
)
regression_results['execution_time_regression'] = time_diff < threshold

# Compare energy consumption
if 'energy_consumption' in test_metrics and 'energy_consumption' in baseline_metrics:
energy_diff = abs(
test_metrics['energy_consumption'] -
baseline_metrics['energy_consumption']
) / baseline_metrics['energy_consumption']
regression_results['energy_regression'] = energy_diff < threshold

# Compare tracking accuracy if available
if ('tracking_accuracy' in test_metrics and
'tracking_accuracy' in baseline_metrics):
accuracy_diff = abs(
test_metrics['tracking_accuracy']['rmse'] -
baseline_metrics['tracking_accuracy']['rmse']
)
regression_results['tracking_accuracy_regression'] = accuracy_diff < threshold

overall_pass = all(regression_results.values())

return {
'pass': overall_pass,
'results': regression_results,
'details': {
'test_metrics': test_metrics,
'baseline_metrics': baseline_metrics
}
}

Best Practices and Guidelines​

Recording Best Practices​

  • Use appropriate recording frequency based on robot dynamics
  • Include sufficient pre- and post-event data for analysis
  • Record both low-level joint data and high-level commands
  • Include environmental context in recordings
  • Use descriptive file naming conventions

Playback Best Practices​

  • Validate robot state before starting playback
  • Monitor for interpolation errors in real-time playback
  • Implement safety checks during playback
  • Use appropriate playback speeds for analysis

Analysis Best Practices​

  • Establish baseline performance metrics
  • Use statistical methods for comparison
  • Document analysis methodology
  • Include uncertainty estimates in results

The NORD Replay System provides a comprehensive solution for recording, analyzing, and replaying robot simulation data, enabling rigorous validation and iterative improvement of physical AI systems.