Source code for openscope_experimental_launcher.utils.git_manager

"""
Git repository management for OpenScope experimental launchers.

Handles cloning, updating, and version management of workflow repositories.
"""

import os
import logging
import subprocess
import shutil
import stat
from typing import Dict, Any, Optional


[docs] class GitManager: """ Handles Git repository operations for workflow management. """
[docs] def __init__(self): """Initialize the Git manager.""" self.git_available = self._check_git_available()
def _check_git_available(self) -> bool: """Check if Git is available on the system.""" try: subprocess.check_output(['git', '--version'], stderr=subprocess.STDOUT) return True except (subprocess.CalledProcessError, OSError): logging.error("Git is not available on this system. Please install Git to use repository management features.") return False
[docs] def setup_repository(self, params: Dict[str, Any]) -> bool: """ Set up the repository based on parameters. Args: params: Dictionary containing repository configuration Returns: True if successful, False otherwise """ repo_url = params.get('repository_url') commit_hash = params.get('repository_commit_hash', 'main') local_repo_path = params.get('local_repository_path') if not repo_url or not local_repo_path: logging.info("No repository configuration found, skipping repository setup") return True if not self.git_available: return False logging.info(f"Setting up repository: {repo_url}") logging.info(f"Target commit: {commit_hash}") logging.info(f"Local path: {local_repo_path}") # Determine repository name from URL repo_name = self._get_repo_name_from_url(repo_url) repo_full_path = os.path.join(local_repo_path, repo_name) # Check if repository already exists if os.path.exists(repo_full_path): if os.path.exists(os.path.join(repo_full_path, '.git')): logging.info("Repository already exists, checking commit hash") if self._is_on_target_commit(repo_full_path, commit_hash): logging.info("Repository is already at the correct commit") return True else: logging.info("Repository needs to be updated") if self._update_repository(repo_full_path, commit_hash): logging.info("Repository updated successfully") return True else: logging.warning("Failed to update repository, will try fresh clone") if not self._force_remove_directory(repo_full_path): logging.error("Failed to remove existing repository for fresh clone") return False else: logging.info("Directory exists but is not a Git repository, removing it") if not self._force_remove_directory(repo_full_path): logging.error("Failed to remove existing directory") return False # Clone the repository if not self._clone_repository(repo_url, repo_full_path): return False # Checkout specific commit if not 'main' if commit_hash != 'main': if not self._checkout_commit(repo_full_path, commit_hash): return False logging.info("Repository setup completed successfully") return True
[docs] def get_repository_path(self, params: Dict[str, Any]) -> Optional[str]: """ Get the full path to the cloned repository. Args: params: Dictionary containing repository configuration Returns: Path to repository or None if not configured """ local_repo_path = params.get('local_repository_path') repo_url = params.get('repository_url') if not local_repo_path or not repo_url: return None repo_name = self._get_repo_name_from_url(repo_url) return os.path.join(local_repo_path, repo_name)
def _get_repo_name_from_url(self, repo_url: str) -> str: """Extract repository name from Git URL.""" # Remove trailing slash if present repo_url = repo_url.rstrip('/') # Handle both HTTPS and SSH URLs if repo_url.endswith('.git'): repo_name = os.path.basename(repo_url)[:-4] else: repo_name = os.path.basename(repo_url) return repo_name def _get_current_commit_hash(self, repo_path: str) -> Optional[str]: """Get the current commit hash of a Git repository.""" try: original_dir = os.getcwd() os.chdir(repo_path) commit_hash = subprocess.check_output( ['git', 'rev-parse', 'HEAD'], stderr=subprocess.STDOUT ).decode().strip() return commit_hash except (subprocess.CalledProcessError, OSError) as e: logging.warning(f"Failed to get current commit hash: {e}") return None finally: os.chdir(original_dir) def _get_remote_commit_hash(self, repo_path: str, branch: str = 'main') -> Optional[str]: """Get the latest commit hash from the remote repository for a specific branch.""" try: original_dir = os.getcwd() os.chdir(repo_path) # Fetch latest changes from remote subprocess.check_call(['git', 'fetch', 'origin'], stderr=subprocess.STDOUT) # Get the commit hash of the remote branch remote_commit = subprocess.check_output( ['git', 'rev-parse', f'origin/{branch}'], stderr=subprocess.STDOUT ).decode().strip() return remote_commit except (subprocess.CalledProcessError, OSError) as e: logging.warning(f"Failed to get remote commit hash for {branch}: {e}") return None finally: os.chdir(original_dir) def _is_on_target_commit(self, repo_path: str, target_commit: str) -> bool: """Check if the repository is on the target commit.""" if target_commit == 'main': # For main branch, check against remote origin/main current_hash = self._get_current_commit_hash(repo_path) remote_hash = self._get_remote_commit_hash(repo_path, 'main') if current_hash and remote_hash: if current_hash == remote_hash: logging.info("Repository is on the latest commit of main branch") return True else: logging.info("Repository is not on the latest commit") logging.info(f"Current: {current_hash[:8]}, Latest remote: {remote_hash[:8]}") return False else: logging.warning("Could not compare commit hashes") return False else: # For specific commit hash, check if current commit matches current_hash = self._get_current_commit_hash(repo_path) if current_hash and current_hash.startswith(target_commit): logging.info(f"Repository is at the specified commit: {target_commit}") return True else: logging.info("Repository is not at the specified commit") logging.info(f"Current: {current_hash[:8] if current_hash else 'unknown'}, Required: {target_commit}") return False def _clone_repository(self, repo_url: str, local_path: str) -> bool: """Clone a Git repository to the specified local path.""" try: logging.info(f"Cloning repository {repo_url} to {local_path}") # Create parent directory if it doesn't exist parent_dir = os.path.dirname(local_path) if not os.path.exists(parent_dir): os.makedirs(parent_dir) # Clone the repository subprocess.check_call( ['git', 'clone', repo_url, local_path], stderr=subprocess.STDOUT ) logging.info("Repository cloned successfully") return True except subprocess.CalledProcessError as e: logging.error(f"Failed to clone repository: {e}") return False except OSError as e: logging.error(f"Git command failed: {e}") return False def _checkout_commit(self, repo_path: str, commit_hash: str) -> bool: """Checkout a specific commit in the repository.""" try: original_dir = os.getcwd() os.chdir(repo_path) logging.info(f"Checking out commit {commit_hash}") # Fetch latest changes first subprocess.check_call(['git', 'fetch'], stderr=subprocess.STDOUT) # Checkout the specific commit subprocess.check_call(['git', 'checkout', commit_hash], stderr=subprocess.STDOUT) logging.info(f"Successfully checked out commit {commit_hash}") return True except subprocess.CalledProcessError as e: logging.error(f"Failed to checkout commit {commit_hash}: {e}") return False except OSError as e: logging.error(f"Git command failed: {e}") return False finally: os.chdir(original_dir) def _update_repository(self, repo_path: str, commit_hash: str) -> bool: """Update an existing repository to the specified commit using Git operations.""" try: original_dir = os.getcwd() os.chdir(repo_path) logging.info(f"Updating existing repository to commit {commit_hash}") # Reset any local changes subprocess.check_call(['git', 'reset', '--hard'], stderr=subprocess.STDOUT) # Fetch latest changes subprocess.check_call(['git', 'fetch', 'origin'], stderr=subprocess.STDOUT) # Checkout the target commit/branch if commit_hash == 'main': subprocess.check_call(['git', 'checkout', 'main'], stderr=subprocess.STDOUT) subprocess.check_call(['git', 'pull', 'origin', 'main'], stderr=subprocess.STDOUT) else: subprocess.check_call(['git', 'checkout', commit_hash], stderr=subprocess.STDOUT) logging.info("Repository updated successfully") return True except subprocess.CalledProcessError as e: logging.error(f"Failed to update repository: {e}") return False except OSError as e: logging.error(f"Git command failed: {e}") return False finally: os.chdir(original_dir) def _force_remove_directory(self, path: str) -> bool: """Force remove a directory, handling Windows file locks.""" def handle_remove_readonly(func, path, exc): """Error handler for Windows readonly files.""" if os.path.exists(path): os.chmod(path, stat.S_IWRITE) func(path) try: logging.info(f"Removing directory: {path}") shutil.rmtree(path, onerror=handle_remove_readonly) return True except Exception as e: logging.error(f"Failed to remove directory {path}: {e}") return False