API Reference - Utils Module

The utils module provides utility classes and functions that support the core functionality of the OpenScope Experimental Launcher.

Overview

The utils module contains helper classes for:

  • ConfigLoader: Loading and parsing CamStim-style configuration files

  • GitManager: Git repository operations and version control

  • ProcessMonitor: Process health monitoring and resource tracking

  • FileUtils: File system operations and path management

  • ValidationUtils: Parameter validation and schema checking

ConfigLoader Class

class openscope_experimental_launcher.utils.config_loader.ConfigLoader[source]

Bases: object

Handles loading and parsing of CamStim-compatible configuration files.

Handles loading and parsing of CamStim-style configuration files.

Features: - Parse .cfg files with section-based configuration - Environment variable substitution - Type conversion and validation - Default value handling

__init__()[source]

Initialize the configuration loader.

load_config(params)[source]

Load configuration from CamStim config files.

Parameters:

params (Dict[str, Any]) – Experiment parameters that may contain config path overrides

Return type:

Dict[str, Dict[str, Any]]

Returns:

Dictionary containing all configuration sections

GitManager Class

class openscope_experimental_launcher.utils.git_manager.GitManager[source]

Bases: object

Handles Git repository operations for workflow management.

Manages Git repository operations including cloning, checkout, and version tracking.

Features: - Repository cloning with progress tracking - Branch and commit checkout - Version tracking and metadata - Repository validation

__init__()[source]

Initialize the Git manager.

setup_repository(params)[source]

Set up the repository based on parameters.

Parameters:

params (Dict[str, Any]) – Dictionary containing repository configuration

Return type:

bool

Returns:

True if successful, False otherwise

get_repository_path(params)[source]

Get the full path to the cloned repository.

Parameters:

params (Dict[str, Any]) – Dictionary containing repository configuration

Return type:

Optional[str]

Returns:

Path to repository or None if not configured

ProcessMonitor Class

class openscope_experimental_launcher.utils.process_monitor.ProcessMonitor(kill_threshold=90.0)[source]

Bases: object

Handles process monitoring with memory usage tracking and runaway detection.

Monitors process health, memory usage, and handles cleanup operations.

Features: - Real-time memory usage monitoring - Process termination handling - Windows job object management - Resource cleanup

__init__(kill_threshold=90.0)[source]

Initialize the process monitor.

Parameters:

kill_threshold (float) – Memory usage threshold (percentage) above initial usage that triggers process termination

monitor_process(process, initial_memory_percent, kill_callback=None)[source]

Monitor a process until it completes or exceeds memory threshold.

Parameters:
  • process (Popen) – The subprocess to monitor

  • initial_memory_percent (float) – Initial memory usage percentage

  • kill_callback (Optional[Callable]) – Function to call if process needs to be killed

get_process_memory_info(process)[source]

Get detailed memory information for a process.

Parameters:

process (Popen) – The subprocess to inspect

Return type:

dict

Returns:

Dictionary containing memory information

is_process_responsive(process, timeout=5.0)[source]

Check if a process is responsive by testing if it responds within timeout.

Parameters:
  • process (Popen) – The subprocess to check

  • timeout (float) – Timeout in seconds

Return type:

bool

Returns:

True if process is responsive, False otherwise

Example Usage

Configuration Loading

from openscope_experimental_launcher.utils.config_loader import ConfigLoader

# Load CamStim configuration file
config_loader = ConfigLoader()
config = config_loader.load("C:/ProgramData/AIBS_MPE/camstim/config/stim.cfg")

# Access configuration values
display_config = config.get('display', {})
print(f"Monitor refresh rate: {display_config.get('refresh_rate', 60)}")

# Load with environment variable substitution
config_with_env = config_loader.load_with_env_substitution("config.cfg")

Git Repository Management

from openscope_experimental_launcher.utils.git_manager import GitManager

# Initialize Git manager
git_manager = GitManager()

# Clone repository
repo_path = git_manager.clone_repository(
    url="https://github.com/AllenNeuralDynamics/openscope-community-predictive-processing.git",
    local_path="C:/BonsaiWorkflows/PredictiveProcessing",
    commit_hash="main"
)

# Get repository information
repo_info = git_manager.get_repository_info(repo_path)
print(f"Current commit: {repo_info['commit_hash']}")
print(f"Branch: {repo_info['branch']}")
print(f"Last modified: {repo_info['last_modified']}")

Process Monitoring

from openscope_experimental_launcher.utils.process_monitor import ProcessMonitor
import subprocess

# Start a process
process = subprocess.Popen(["python", "long_running_script.py"])

# Monitor the process
monitor = ProcessMonitor(process)

# Check memory usage periodically
while process.poll() is None:
    memory_info = monitor.get_memory_usage()
    print(f"Memory usage: {memory_info['percent']}%")

    # Terminate if memory usage is too high
    if memory_info['percent'] > 80:
        monitor.terminate_process()
        break

    time.sleep(1)

Utility Functions

File System Utilities

from openscope_experimental_launcher.utils.file_utils import (
    ensure_directory_exists,
    get_file_checksum,
    safe_file_copy,
    cleanup_temp_files
)

# Ensure output directory exists
output_dir = "C:/ExperimentData/2025-06-13"
ensure_directory_exists(output_dir)

# Calculate file checksum for integrity checking
checksum = get_file_checksum("experiment_params.json")
print(f"Parameter file checksum: {checksum}")

# Safe file copying with error handling
success = safe_file_copy(
    source="temp_results.pkl",
    destination="final_results.pkl",
    create_backup=True
)

Validation Utilities

from openscope_experimental_launcher.utils.validation_utils import (
    validate_parameter_schema,
    validate_file_paths,
    validate_git_url,
    sanitize_filename
)

# Validate parameter structure
params = {
    "subject_id": "test_mouse",
    "user_id": "researcher",
    "repository_url": "https://github.com/user/repo.git"
}

validation_result = validate_parameter_schema(params)
if not validation_result.is_valid:
    print(f"Validation errors: {validation_result.errors}")

# Validate file paths exist
paths_to_check = [
    "C:/Bonsai/Bonsai.exe",
    "workflow.bonsai",
    "output_directory"
]

path_validation = validate_file_paths(paths_to_check)
missing_paths = [path for path, exists in path_validation.items() if not exists]

if missing_paths:
    print(f"Missing files/directories: {missing_paths}")

Advanced Usage

Custom Configuration Parser

from openscope_experimental_launcher.utils.config_loader import ConfigLoader

class CustomConfigLoader(ConfigLoader):
    """Extended config loader with custom parsing logic."""

    def load_experimental_config(self, config_path):
        """Load config with experimental parameter validation."""
        config = self.load(config_path)

        # Apply custom validation
        self._validate_experimental_parameters(config)

        # Add computed values
        config['computed'] = self._compute_derived_values(config)

        return config

    def _validate_experimental_parameters(self, config):
        """Validate experiment-specific configuration."""
        required_sections = ['stimulus', 'recording', 'output']

        for section in required_sections:
            if section not in config:
                raise ValueError(f"Missing required config section: {section}")

    def _compute_derived_values(self, config):
        """Compute derived configuration values."""
        stimulus_config = config.get('stimulus', {})
        recording_config = config.get('recording', {})

        return {
            'estimated_duration_seconds': stimulus_config.get('trial_count', 100) *
                                        stimulus_config.get('trial_duration', 5),
            'estimated_file_size_mb': recording_config.get('sampling_rate', 1000) *
                                    recording_config.get('channel_count', 64) * 0.001
        }

Repository Caching System

from openscope_experimental_launcher.utils.git_manager import GitManager
import os
import json

class CachedGitManager(GitManager):
    """Git manager with local repository caching."""

    def __init__(self, cache_dir="C:/BonsaiCache"):
        super().__init__()
        self.cache_dir = cache_dir
        self.cache_index_file = os.path.join(cache_dir, "cache_index.json")
        self._load_cache_index()

    def clone_or_update_repository(self, url, commit_hash="main"):
        """Clone repository or update existing cached copy."""
        cache_key = self._get_cache_key(url, commit_hash)
        cached_path = self._get_cached_path(cache_key)

        if self._is_cache_valid(cached_path, commit_hash):
            print(f"Using cached repository: {cached_path}")
            return cached_path

        # Clone to cache
        repo_path = self.clone_repository(url, cached_path, commit_hash)
        self._update_cache_index(cache_key, url, commit_hash, repo_path)

        return repo_path

    def _get_cache_key(self, url, commit_hash):
        """Generate cache key for repository."""
        import hashlib
        key_string = f"{url}#{commit_hash}"
        return hashlib.md5(key_string.encode()).hexdigest()

    def _is_cache_valid(self, cached_path, expected_commit):
        """Check if cached repository is valid and up-to-date."""
        if not os.path.exists(cached_path):
            return False

        try:
            current_commit = self.get_repository_info(cached_path)['commit_hash']
            return current_commit.startswith(expected_commit[:8])  # Short hash match
        except:
            return False

Process Resource Manager

from openscope_experimental_launcher.utils.process_monitor import ProcessMonitor
import psutil
import threading
import time

class ResourceManager:
    """Advanced process resource management."""

    def __init__(self, memory_limit_percent=80, cpu_limit_percent=90):
        self.memory_limit = memory_limit_percent
        self.cpu_limit = cpu_limit_percent
        self.monitors = {}
        self.monitoring_active = False
        self.monitor_thread = None

    def add_process(self, process, name):
        """Add process to resource monitoring."""
        monitor = ProcessMonitor(process)
        self.monitors[name] = {
            'monitor': monitor,
            'process': process,
            'start_time': time.time(),
            'peak_memory': 0,
            'peak_cpu': 0
        }

    def start_monitoring(self, interval=1.0):
        """Start resource monitoring thread."""
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop resource monitoring."""
        self.monitoring_active = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5.0)

    def get_resource_summary(self):
        """Get summary of resource usage for all monitored processes."""
        summary = {}

        for name, info in self.monitors.items():
            if info['process'].poll() is None:  # Process still running
                current_memory = info['monitor'].get_memory_usage()
                summary[name] = {
                    'status': 'running',
                    'duration_seconds': time.time() - info['start_time'],
                    'current_memory_percent': current_memory['percent'],
                    'peak_memory_percent': info['peak_memory'],
                    'peak_cpu_percent': info['peak_cpu']
                }
            else:
                summary[name] = {
                    'status': 'completed',
                    'duration_seconds': time.time() - info['start_time'],
                    'exit_code': info['process'].returncode
                }

        return summary

    def _monitor_loop(self, interval):
        """Main monitoring loop."""
        while self.monitoring_active:
            for name, info in self.monitors.items():
                if info['process'].poll() is None:  # Still running
                    try:
                        # Check memory usage
                        memory_info = info['monitor'].get_memory_usage()
                        info['peak_memory'] = max(info['peak_memory'],
                                                memory_info['percent'])

                        # Check CPU usage
                        cpu_percent = psutil.Process(info['process'].pid).cpu_percent()
                        info['peak_cpu'] = max(info['peak_cpu'], cpu_percent)

                        # Check limits
                        if memory_info['percent'] > self.memory_limit:
                            print(f"Process {name} exceeded memory limit, terminating")
                            info['monitor'].terminate_process()

                        if cpu_percent > self.cpu_limit:
                            print(f"Process {name} exceeded CPU limit, terminating")
                            info['monitor'].terminate_process()

                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        # Process may have terminated
                        pass

            time.sleep(interval)

Error Handling and Logging

Utility Error Classes

# Custom exception classes for utility modules
class ConfigLoaderError(Exception):
    """Raised when configuration loading fails."""
    pass

class GitManagerError(Exception):
    """Raised when Git operations fail."""
    pass

class ProcessMonitorError(Exception):
    """Raised when process monitoring fails."""
    pass

# Usage example with proper error handling
from openscope_experimental_launcher.utils.config_loader import ConfigLoader, ConfigLoaderError

try:
    config_loader = ConfigLoader()
    config = config_loader.load("nonexistent_config.cfg")
except ConfigLoaderError as e:
    print(f"Configuration loading failed: {e}")
    # Use default configuration
    config = load_default_config()
except Exception as e:
    print(f"Unexpected error: {e}")

Logging Configuration

import logging
from openscope_experimental_launcher.utils.git_manager import GitManager

# Configure logging for utility modules
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use utilities with logging
git_manager = GitManager()

# Git operations will log progress and errors
try:
    repo_path = git_manager.clone_repository(
        "https://github.com/user/repo.git",
        "local_path"
    )
except Exception as e:
    logging.error(f"Repository cloning failed: {e}")

Testing Utilities

Mock Utilities for Testing

from unittest.mock import Mock, patch
from openscope_experimental_launcher.utils.git_manager import GitManager

def test_git_manager_with_mock():
    """Test GitManager with mocked Git operations."""

    with patch('subprocess.run') as mock_run:
        # Mock successful git clone
        mock_run.return_value.returncode = 0

        git_manager = GitManager()
        result = git_manager.clone_repository(
            "https://github.com/test/repo.git",
            "test_path"
        )

        # Verify git command was called
        mock_run.assert_called()
        assert result == "test_path"

def test_process_monitor_with_mock():
    """Test ProcessMonitor with mocked process."""

    mock_process = Mock()
    mock_process.pid = 12345
    mock_process.poll.return_value = None  # Still running

    from openscope_experimental_launcher.utils.process_monitor import ProcessMonitor

    monitor = ProcessMonitor(mock_process)

    # Mock memory usage
    with patch('psutil.Process') as mock_psutil:
        mock_psutil.return_value.memory_info.return_value.rss = 100 * 1024 * 1024  # 100MB
        mock_psutil.return_value.memory_percent.return_value = 25.0

        memory_info = monitor.get_memory_usage()
        assert memory_info['percent'] == 25.0