Key Features

  • Standardized Interface: Consistent API across all optimization algorithms
  • Comprehensive Metadata: Rich algorithm descriptions and capabilities
  • Progress Tracking: Built-in support for real-time monitoring and early termination
  • Parameter Management: Flexible parameter system with validation
  • Integration Ready: Seamless integration with problems, benchmarking, and platform features

File Components

The base_optimizer.py file defines:

  • BaseOptimizer: Abstract base class for all optimization algorithms
  • OptimizerMetadata: Comprehensive metadata structure for optimizers
  • OptimizationResult: Standardized result format for optimization runs
  • Enumerations: Optimizer types, families, and algorithm categories

Core Enumerations

OptimizerType

Categorizes optimization algorithms by their approach:

class OptimizerType(Enum):
    EXACT = "exact"                    # Guaranteed optimal solutions
    HEURISTIC = "heuristic"           # Good solutions, no optimality guarantee
    METAHEURISTIC = "metaheuristic"   # High-level heuristic frameworks
    HYBRID = "hybrid"                 # Combination of multiple approaches
    MACHINE_LEARNING = "machine_learning"  # ML-based optimization
    QUANTUM = "quantum"               # Quantum optimization algorithms

OptimizerFamily

Groups algorithms by their underlying methodology:

class OptimizerFamily(Enum):
    EVOLUTIONARY = "evolutionary"      # Genetic algorithms, evolution strategies
    SWARM_INTELLIGENCE = "swarm_intelligence"  # PSO, ACO, etc.
    LOCAL_SEARCH = "local_search"     # Hill climbing, simulated annealing
    GRADIENT_BASED = "gradient_based" # Gradient descent, Newton methods
    TREE_SEARCH = "tree_search"       # Branch and bound, A*
    CONSTRAINT_PROGRAMMING = "constraint_programming"  # CP solvers
    LINEAR_PROGRAMMING = "linear_programming"  # LP/MIP solvers
    REINFORCEMENT_LEARNING = "reinforcement_learning"  # RL-based optimization
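
As a small illustration of how these enumerations can be used, the sketch below redefines a trimmed copy of both enums so that it runs without qubots installed. The catalog list and its entries are hypothetical and are not part of the framework.

```python
from enum import Enum


# Trimmed stand-ins for the enums above, so the sketch runs on its own.
class OptimizerType(Enum):
    EXACT = "exact"
    HEURISTIC = "heuristic"
    METAHEURISTIC = "metaheuristic"


class OptimizerFamily(Enum):
    EVOLUTIONARY = "evolutionary"
    LOCAL_SEARCH = "local_search"
    TREE_SEARCH = "tree_search"


# Members can be recovered from their string values, e.g. when reading a config file.
parsed_type = OptimizerType("heuristic")

# Hypothetical catalog of (name, type, family) entries.
catalog = [
    ("hill_climbing", OptimizerType.HEURISTIC, OptimizerFamily.LOCAL_SEARCH),
    ("genetic_algorithm", OptimizerType.METAHEURISTIC, OptimizerFamily.EVOLUTIONARY),
    ("branch_and_bound", OptimizerType.EXACT, OptimizerFamily.TREE_SEARCH),
]

# Filter the catalog by family using identity comparison on enum members.
local_search_names = [name for name, _, family in catalog
                      if family is OptimizerFamily.LOCAL_SEARCH]
print(parsed_type, local_search_names)
```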

OptimizerMetadata Class

Comprehensive metadata structure for optimization algorithms:

@dataclass
class OptimizerMetadata:
    # Basic information (required)
    name: str                          # Human-readable name
    description: str                   # Detailed description
    optimizer_type: OptimizerType     # Algorithm category
    optimizer_family: OptimizerFamily # Algorithm family

    # Authorship and versioning
    author: str = ""                  # Algorithm author/implementer
    version: str = "1.0.0"           # Version number
    license: str = "MIT"             # License type
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    # Algorithm characteristics
    is_deterministic: bool = True     # Whether algorithm is deterministic
    supports_constraints: bool = False # Can handle constrained problems
    supports_multi_objective: bool = False # Multi-objective optimization
    supports_continuous: bool = True  # Continuous variable problems
    supports_discrete: bool = True    # Discrete variable problems
    supports_mixed_integer: bool = False # Mixed-integer problems

    # Performance characteristics
    time_complexity: str = "O(n)"     # Time complexity in Big-O notation
    space_complexity: str = "O(n)"    # Space complexity in Big-O notation
    convergence_guaranteed: bool = False # Convergence guarantee
    parallel_capable: bool = False    # Can utilize multiple cores

    # Parameter information
    required_parameters: List[str] = field(default_factory=list)
    optional_parameters: List[str] = field(default_factory=list)
    parameter_ranges: Dict[str, Tuple[float, float]] = field(default_factory=dict)

    # Benchmarking and references
    typical_problems: List[str] = field(default_factory=list)
    benchmark_results: Dict[str, float] = field(default_factory=dict)
    reference_papers: List[str] = field(default_factory=list)
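
One way the capability flags might be used is to check whether an optimizer fits a problem before running it. The sketch below works on a trimmed stand-in for OptimizerMetadata; the capability_gaps helper is hypothetical and only illustrates the idea.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class OptimizerMetadata:  # trimmed stand-in with a subset of the fields above
    name: str
    supports_constraints: bool = False
    supports_continuous: bool = True
    supports_discrete: bool = True
    required_parameters: List[str] = field(default_factory=list)
    parameter_ranges: Dict[str, Tuple[float, float]] = field(default_factory=dict)


def capability_gaps(meta: OptimizerMetadata, *, continuous: bool, constrained: bool) -> List[str]:
    """Hypothetical helper: list the reasons an optimizer does not fit a problem."""
    gaps = []
    if continuous and not meta.supports_continuous:
        gaps.append("continuous variables not supported")
    if not continuous and not meta.supports_discrete:
        gaps.append("discrete variables not supported")
    if constrained and not meta.supports_constraints:
        gaps.append("constraints not supported")
    return gaps


meta = OptimizerMetadata(
    name="Example",
    supports_discrete=False,
    required_parameters=["max_iterations"],
    parameter_ranges={"max_iterations": (1, 100000)},
)
gaps_constrained = capability_gaps(meta, continuous=True, constrained=True)
gaps_discrete = capability_gaps(meta, continuous=False, constrained=False)
print(gaps_constrained)
print(gaps_discrete)
```

The list and dict fields use field(default_factory=...), as in the class above, so each instance gets its own container instead of sharing one mutable default.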

OptimizationResult Class

Standardized result format for optimization runs:

@dataclass
class OptimizationResult:
    # Core results (required)
    best_solution: Any                # Best solution found
    best_value: float                # Best objective value
    is_feasible: bool = True         # Whether best solution is feasible

    # Optimization statistics
    iterations: int = 0              # Number of iterations performed
    evaluations: int = 0             # Number of function evaluations
    runtime_seconds: float = 0.0     # Total runtime in seconds
    convergence_achieved: bool = False # Whether convergence was achieved
    termination_reason: str = "max_iterations" # Reason for termination

    # Detailed tracking
    optimization_history: List[Dict[str, Any]] = field(default_factory=list)
    parameter_values: Dict[str, Any] = field(default_factory=dict)
    additional_metrics: Dict[str, float] = field(default_factory=dict)

    # Problem and optimizer metadata
    problem_metadata: Optional[Dict[str, Any]] = None
    optimizer_metadata: Optional[Dict[str, Any]] = None
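
To show how a result might be consumed, the following sketch builds a trimmed stand-in for OptimizationResult and derives a one-line summary and a best-value curve from it. The helpers summarize and best_value_curve are hypothetical, and the history entries are assumed to carry a "best_value" key, as in the usage example later in this document.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class OptimizationResult:  # trimmed stand-in with a subset of the fields above
    best_solution: Any
    best_value: float
    iterations: int = 0
    evaluations: int = 0
    runtime_seconds: float = 0.0
    termination_reason: str = "max_iterations"
    optimization_history: List[Dict[str, Any]] = field(default_factory=list)


def summarize(result: OptimizationResult) -> str:
    """Hypothetical helper: one-line summary including evaluation throughput."""
    rate = result.evaluations / result.runtime_seconds if result.runtime_seconds > 0 else 0.0
    return (f"best={result.best_value:.4f} after {result.iterations} iterations "
            f"({rate:.0f} evals/s, stopped: {result.termination_reason})")


def best_value_curve(result: OptimizationResult) -> List[float]:
    """Hypothetical helper: extract the best-so-far values for plotting."""
    return [entry["best_value"] for entry in result.optimization_history]


result = OptimizationResult(
    best_solution=[0.1, -0.2],
    best_value=0.05,
    iterations=3,
    evaluations=15,
    runtime_seconds=0.5,
    optimization_history=[{"iteration": i, "best_value": v}
                          for i, v in enumerate([4.0, 1.0, 0.05])],
)
print(summarize(result))
print(best_value_curve(result))
```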

BaseOptimizer Class

The BaseOptimizer class provides the foundation for all optimization algorithms in the qubots framework.

Constructor

def __init__(self, metadata: Optional[OptimizerMetadata] = None, **parameters):
    """
    Initialize the optimizer with metadata and parameters.

    Args:
        metadata: Optimizer metadata. If None, subclasses should provide default metadata.
        **parameters: Algorithm-specific parameters
    """

Internal State:

  • _metadata: Optimizer metadata containing algorithm information
  • _parameters: Algorithm-specific parameters dictionary
  • _run_count: Number of optimization runs performed
  • _total_runtime: Total runtime across all runs
  • _instance_id: Unique identifier for this optimizer instance
  • _is_running: Whether optimization is currently running
  • _should_stop: Flag for early termination requests
  • _current_result: Most recent optimization result
  • _progress_callback: Optional callback for progress updates
  • _log_callback: Optional callback for real-time logging

Abstract Methods

Subclasses must implement these two abstract methods to create a functional optimizer:

_get_default_metadata()

@abstractmethod
def _get_default_metadata(self) -> OptimizerMetadata:
    """
    Return default metadata for this optimizer.
    Subclasses must implement this method to provide algorithm information.

    Returns:
        OptimizerMetadata: Complete metadata describing the algorithm

    Example:
        return OptimizerMetadata(
            name="My Algorithm",
            description="Description of the algorithm",
            optimizer_type=OptimizerType.HEURISTIC,
            optimizer_family=OptimizerFamily.LOCAL_SEARCH,
            is_deterministic=False,
            supports_continuous=True,
            time_complexity="O(n²)",
            required_parameters=["max_iterations"],
            optional_parameters=["tolerance"]
        )
    """
    pass

_optimize_implementation()

@abstractmethod
def _optimize_implementation(self, problem: BaseProblem,
                           initial_solution: Optional[Any] = None) -> OptimizationResult:
    """
    Core optimization implementation that performs the actual optimization.

    This method should:
    1. Extract parameters from self._parameters
    2. Implement the optimization algorithm
    3. Use problem.evaluate_solution() to evaluate candidates
    4. Check self.should_stop() for early termination
    5. Call self.report_progress() for progress updates
    6. Return comprehensive OptimizationResult

    Args:
        problem: The optimization problem to solve (BaseProblem instance)
        initial_solution: Optional initial solution to start from

    Returns:
        OptimizationResult: Comprehensive result with solution, metrics, and metadata

    Raises:
        ValueError: If problem is incompatible with this optimizer
        RuntimeError: If optimization fails due to algorithm-specific issues
    """
    pass

Public Methods

optimize()

Main optimization interface with comprehensive tracking and control:

def optimize(self, problem: BaseProblem,
           initial_solution: Optional[Any] = None,
           progress_callback: Optional[Callable] = None,
           log_callback: Optional[Callable] = None,
           **kwargs) -> OptimizationResult:
    """
    Main optimization interface with comprehensive tracking and control.

    This method handles:
    - Parameter validation and merging
    - Problem compatibility checking
    - Progress and logging callbacks
    - Runtime measurement
    - Result finalization with metadata

    Args:
        problem: The optimization problem to solve (BaseProblem instance)
        initial_solution: Optional initial solution to start optimization from
        progress_callback: Optional callback function for progress updates
                          Signature: callback(progress_data: Dict[str, Any])
        log_callback: Optional callback for real-time logging
                     Signature: callback(level: str, message: str, source: str)
        **kwargs: Additional parameters that override instance parameters

    Returns:
        OptimizationResult: Comprehensive result with solution, metrics, and metadata

    Raises:
        ValueError: If problem is incompatible or parameters are invalid
    """

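The responsibilities listed for optimize() follow a template-method pattern: the public method wraps the subclass hook with bookkeeping. The sketch below is a stand-in written for illustration, not the qubots implementation; the class names SketchOptimizer and GridScan, the plain-dict result, and the choice to restore instance parameters after each call are all assumptions.

```python
import time
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Optional


class SketchOptimizer(ABC):
    """Stand-in base class; NOT the qubots implementation."""

    def __init__(self, **parameters):
        self._parameters: Dict[str, Any] = dict(parameters)
        self._run_count = 0
        self._total_runtime = 0.0
        self._progress_callback: Optional[Callable] = None

    @abstractmethod
    def _optimize_implementation(self, objective, initial_solution=None) -> Dict[str, Any]:
        ...

    def report_progress(self, iteration, best_value, **metrics):
        # Forward progress only when a callback was supplied.
        if self._progress_callback is not None:
            self._progress_callback({"iteration": iteration, "best_value": best_value, **metrics})

    def optimize(self, objective, initial_solution=None, progress_callback=None, **kwargs):
        saved = dict(self._parameters)
        self._parameters.update(kwargs)           # per-call overrides
        self._progress_callback = progress_callback
        start = time.perf_counter()
        try:
            result = self._optimize_implementation(objective, initial_solution)
        finally:
            elapsed = time.perf_counter() - start
            self._parameters = saved              # restore instance parameters (sketch choice)
            self._progress_callback = None
        result["runtime_seconds"] = elapsed       # result finalization
        self._run_count += 1
        self._total_runtime += elapsed
        return result


class GridScan(SketchOptimizer):
    """Toy subclass: scans the integer points in [-n, n]."""

    def _optimize_implementation(self, objective, initial_solution=None):
        n = self._parameters.get("n", 5)
        best_x, best_value = None, float("inf")
        for i, x in enumerate(range(-n, n + 1)):
            value = objective(x)
            if value < best_value:
                best_x, best_value = x, value
            self.report_progress(i, best_value)
        return {"best_solution": best_x, "best_value": best_value}


updates = []
opt = GridScan(n=3)
res = opt.optimize(lambda x: (x - 2) ** 2, progress_callback=updates.append, n=4)
print(res["best_solution"], res["best_value"], len(updates))
```

Keeping the bookkeeping in the base class means a subclass only has to implement the search loop.
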
optimize_legacy()

Legacy interface for backward compatibility:

def optimize_legacy(self, problem: BaseProblem,
                   initial_solution: Optional[Any] = None,
                   **kwargs) -> Tuple[Any, float]:
    """
    Legacy optimization interface for backward compatibility.

    Args:
        problem: The optimization problem to solve
        initial_solution: Optional initial solution
        **kwargs: Additional parameters

    Returns:
        Tuple of (best_solution, best_value)
    """

Control Methods

def stop_optimization(self) -> None:
    """Request early termination of current optimization."""

def is_running(self) -> bool:
    """Check if optimization is currently running."""

def should_stop(self) -> bool:
    """Check if optimization should stop (for use in _optimize_implementation)."""

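These control methods imply cooperative cancellation: another thread requests a stop, and the optimization loop polls for it. The sketch below mimics that behavior with a stand-in class built on threading.Event; StoppableLoop and its internals are assumptions made for illustration and say nothing about how qubots stores the flag.

```python
import threading
import time


class StoppableLoop:
    """Stand-in that mimics the stop_optimization / should_stop / is_running trio."""

    def __init__(self):
        self._stop_event = threading.Event()
        self._is_running = False
        self.iterations_done = 0

    def stop_optimization(self) -> None:
        self._stop_event.set()

    def should_stop(self) -> bool:
        return self._stop_event.is_set()

    def is_running(self) -> bool:
        return self._is_running

    def run(self, max_iterations: int) -> None:
        self._is_running = True
        try:
            for _ in range(max_iterations):
                if self.should_stop():   # poll once per iteration
                    break
                time.sleep(0.001)        # stands in for one iteration of work
                self.iterations_done += 1
        finally:
            self._is_running = False


loop = StoppableLoop()
worker = threading.Thread(target=loop.run, args=(100_000,))
worker.start()
time.sleep(0.05)            # let the loop run briefly
loop.stop_optimization()    # request early termination from the main thread
worker.join(timeout=5)
print(loop.iterations_done, loop.is_running())
```

The loop only stops at the next poll, so an implementation that checks should_stop() rarely will respond slowly to a stop request.
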
Progress and Logging

def report_progress(self, iteration: int, best_value: float, **metrics):
    """
    Report progress to callback if available.
    Call this from _optimize_implementation to provide real-time updates.
    """

def log_message(self, level: str, message: str, **context):
    """
    Log a message during optimization if callback is available.
    Levels: 'debug', 'info', 'warning', 'error'
    """

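On the receiving side, callbacks matching the signatures documented for optimize() could look like the sketch below. ProgressRecorder, make_log_callback, and the numeric level ordering are hypothetical; only the callback signatures and the four level names come from the text above.

```python
from typing import Any, Callable, Dict, List


class ProgressRecorder:
    """Hypothetical progress callback that keeps every `every`-th update."""

    def __init__(self, every: int = 10):
        self.every = every
        self.records: List[Dict[str, Any]] = []

    def __call__(self, progress_data: Dict[str, Any]) -> None:
        if progress_data["iteration"] % self.every == 0:
            self.records.append(dict(progress_data))


# Assumed ordering of the documented level names.
LEVELS = {"debug": 10, "info": 20, "warning": 30, "error": 40}


def make_log_callback(min_level: str = "info", sink: Callable[[str], Any] = print):
    """Hypothetical factory: build a log callback that drops low-priority messages."""
    threshold = LEVELS[min_level]

    def log_callback(level: str, message: str, source: str) -> None:
        if LEVELS.get(level, 0) >= threshold:
            sink(f"[{level.upper()}] {source}: {message}")

    return log_callback


# Simulate what an optimizer would send to the callbacks.
recorder = ProgressRecorder(every=10)
lines: List[str] = []
log = make_log_callback("warning", sink=lines.append)
for i in range(25):
    recorder({"iteration": i, "best_value": 100.0 / (i + 1)})
log("info", "starting", "demo")
log("error", "diverged", "demo")
print(len(recorder.records), lines)
```
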
Parameter Management

def get_parameter_info(self) -> Dict[str, Any]:
    """
    Get information about optimizer parameters.

    Returns:
        Dictionary with required_parameters, optional_parameters,
        parameter_ranges, and current_values
    """

def set_parameters(self, **parameters):
    """Update optimizer parameters with validation."""

def get_parameter(self, name: str, default: Any = None) -> Any:
    """Get a specific parameter value."""

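The text does not spell out what "with validation" covers. One plausible reading, sketched below under that assumption, checks values against the required_parameters and parameter_ranges declared in the metadata. The validate_parameters function is hypothetical and is not a qubots API.

```python
from typing import Any, Dict, Iterable, List, Tuple


def validate_parameters(values: Dict[str, Any],
                        required: Iterable[str],
                        ranges: Dict[str, Tuple[float, float]]) -> List[str]:
    """Hypothetical validator: return a list of problems, empty if all is well."""
    problems = [f"missing required parameter '{name}'"
                for name in required if name not in values]
    for name, (low, high) in ranges.items():
        # Only check ranges for parameters that were actually supplied.
        if name in values and not (low <= values[name] <= high):
            problems.append(f"'{name}' = {values[name]} is outside [{low}, {high}]")
    return problems


ranges = {"max_iterations": (1, 100000), "tolerance": (1e-12, 1e-3)}
ok = validate_parameters({"max_iterations": 500, "tolerance": 1e-6},
                         ["max_iterations"], ranges)
bad = validate_parameters({"tolerance": 0.5}, ["max_iterations"], ranges)
print(ok)
print(bad)
```
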
Statistics and Information

def get_statistics(self) -> Dict[str, Any]:
    """
    Get comprehensive statistics about optimizer usage.

    Returns:
        Dictionary with run_count, total_runtime, average_runtime,
        metadata, current_parameters, and more
    """

def reset_statistics(self):
    """Reset optimization statistics."""

def export_optimizer_definition(self) -> Dict[str, Any]:
    """Export complete optimizer definition for sharing or storage."""

Properties

@property
def metadata(self) -> OptimizerMetadata:
    """Get optimizer metadata."""

@property
def parameters(self) -> Dict[str, Any]:
    """Get current optimizer parameters (copy)."""

@property
def run_count(self) -> int:
    """Get number of optimization runs performed."""

@property
def average_runtime_seconds(self) -> float:
    """Get average runtime in seconds across all runs."""

@property
def instance_id(self) -> str:
    """Get unique instance identifier."""

@property
def current_result(self) -> Optional[OptimizationResult]:
    """Get the most recent optimization result."""

Practical Usage Example

This section demonstrates how to create a complete optimizer implementation with a concrete mathematical optimization problem.

Problem: Quadratic Function Minimization

Let’s create a simple quadratic function minimization problem and solve it with a custom optimizer:

Step 1: Define the Optimization Problem

import random
from typing import Any, List, Optional

from qubots import BaseProblem, BaseOptimizer, OptimizerMetadata, OptimizationResult
from qubots import ProblemMetadata, ProblemType, ObjectiveType, EvaluationResult
from qubots import OptimizerType, OptimizerFamily

class QuadraticProblem(BaseProblem):
    """
    Minimize f(x) = sum(a_i * x_i^2 + b_i * x_i + c_i) for i in range(dimension)
    Subject to: -10 <= x_i <= 10 for all i
    """

    def __init__(self, dimension: int = 5, coefficients: Optional[List] = None):
        self.dimension = dimension
        # Default coefficients: a=1, b=0, c=0 for each dimension
        self.coefficients = coefficients or [(1.0, 0.0, 0.0)] * dimension
        super().__init__()

    def _get_default_metadata(self) -> ProblemMetadata:
        return ProblemMetadata(
            name=f"Quadratic Function ({self.dimension}D)",
            description=f"Minimize quadratic function in {self.dimension} dimensions",
            problem_type=ProblemType.CONTINUOUS,
            objective_type=ObjectiveType.MINIMIZE,
            dimension=self.dimension,
            constraints_count=self.dimension * 2,  # Box constraints
        )

    def evaluate_solution(self, solution: List[float]) -> EvaluationResult:
        """Evaluate the quadratic function at given point."""
        if len(solution) != self.dimension:
            raise ValueError(f"Solution must have {self.dimension} dimensions")

        # Calculate objective value
        objective_value = 0.0
        for i, (a, b, c) in enumerate(self.coefficients):
            x = solution[i]
            objective_value += a * x**2 + b * x + c

        # Check constraints (box constraints: -10 <= x_i <= 10)
        constraint_violations = []
        for i, x in enumerate(solution):
            if x < -10 or x > 10:
                constraint_violations.append(f"x[{i}] = {x:.3f} violates bounds [-10, 10]")

        return EvaluationResult(
            objective_value=objective_value,
            is_feasible=len(constraint_violations) == 0,
            constraint_violations=constraint_violations
        )

    def generate_random_solution(self) -> List[float]:
        """Generate a random feasible solution."""
        return [random.uniform(-10, 10) for _ in range(self.dimension)]

    def get_neighbor_solution(self, solution: List[float], step_size: float = 1.0) -> List[float]:
        """Generate a neighboring solution for local search."""
        neighbor = solution.copy()

        # Choose strategy: modify one dimension (70%) or multiple dimensions (30%)
        if random.random() < 0.7:
            # Modify one random dimension
            idx = random.randint(0, self.dimension - 1)
            neighbor[idx] += random.uniform(-step_size, step_size)
            neighbor[idx] = max(-10, min(10, neighbor[idx]))
        else:
            # Modify multiple dimensions with smaller steps
            num_dims = random.randint(1, min(3, self.dimension))
            indices = random.sample(range(self.dimension), num_dims)
            for idx in indices:
                neighbor[idx] += random.uniform(-step_size * 0.5, step_size * 0.5)
                neighbor[idx] = max(-10, min(10, neighbor[idx]))

        return neighbor
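
Because the objective is a sum of independent one-dimensional quadratics, its exact minimum over the box can be computed in closed form: for a_i > 0 the minimizer of each term is -b_i / (2 a_i) clipped to [-10, 10], and otherwise it lies on a bound. The sketch below computes this reference value; analytic_minimum is a hypothetical helper, useful mainly for checking an optimizer's output.

```python
from typing import List, Sequence, Tuple


def analytic_minimum(coefficients: Sequence[Tuple[float, float, float]],
                     low: float = -10.0,
                     high: float = 10.0) -> Tuple[List[float], float]:
    """Exact minimizer of sum(a*x^2 + b*x + c) over the box [low, high]^n."""
    point, total = [], 0.0
    for a, b, c in coefficients:
        def term(x, a=a, b=b, c=c):
            return a * x**2 + b * x + c

        # The minimum of one term is at a bound or, if convex, at the clipped vertex.
        candidates = [low, high]
        if a > 0:
            candidates.append(min(high, max(low, -b / (2 * a))))
        x_best = min(candidates, key=term)
        point.append(x_best)
        total += term(x_best)
    return point, total


point, value = analytic_minimum([(1.0, 0.0, 0.0), (2.0, 0.0, 0.0), (1.0, -4.0, 3.0)])
print(point, value)
```

With nonzero b_i or c_i the minimum value is generally not 0, so a convergence test that compares the objective value directly against a tolerance only makes sense for coefficient sets whose minimum is 0.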

Step 2: Implement a Hill Climbing Optimizer

class HillClimbingOptimizer(BaseOptimizer):
    """
    Simple hill climbing optimizer for continuous problems.
    Uses random restarts and adaptive step size.
    """

    def __init__(self, max_iterations: int = 1000, step_size: float = 1.0,
                 tolerance: float = 1e-6, restarts: int = 5, **kwargs):
        super().__init__(
            max_iterations=max_iterations,
            step_size=step_size,
            tolerance=tolerance,
            restarts=restarts,
            **kwargs
        )

    def _get_default_metadata(self) -> OptimizerMetadata:
        return OptimizerMetadata(
            name="Hill Climbing with Restarts",
            description="Hill climbing algorithm with random restarts and adaptive step size",
            optimizer_type=OptimizerType.HEURISTIC,
            optimizer_family=OptimizerFamily.LOCAL_SEARCH,
            is_deterministic=False,
            supports_continuous=True,
            supports_discrete=False,
            supports_constraints=True,
            time_complexity="O(max_iterations * dimension)",  # budget is split across restarts
            space_complexity="O(dimension)",
            required_parameters=["max_iterations"],
            optional_parameters=["step_size", "tolerance", "restarts"],
            parameter_ranges={
                "max_iterations": (1, 100000),
                "step_size": (0.001, 10.0),
                "tolerance": (1e-12, 1e-3),
                "restarts": (1, 100)
            }
        )

    def _optimize_implementation(self, problem: BaseProblem,
                               initial_solution: Optional[Any] = None) -> OptimizationResult:
        # Extract parameters
        max_iterations = self.get_parameter('max_iterations', 1000)
        step_size = self.get_parameter('step_size', 1.0)
        tolerance = self.get_parameter('tolerance', 1e-6)
        restarts = self.get_parameter('restarts', 5)

        best_solution = None
        best_value = float('inf')
        total_evaluations = 0
        optimization_history = []
        total_iterations = 0

        # Multiple restarts
        for restart in range(restarts):
            if self.should_stop():
                break

            # Initialize for this restart
            if initial_solution is not None and restart == 0:
                current_solution = initial_solution.copy()
            else:
                current_solution = problem.generate_random_solution()

            current_result = problem.evaluate_solution(current_solution)
            current_value = current_result.objective_value
            total_evaluations += 1

            # Track best across all restarts
            if current_value < best_value:
                best_solution = current_solution.copy()
                best_value = current_value

            current_step_size = step_size
            no_improvement_count = 0
            restart_iterations = max(1, max_iterations // restarts)  # at least one iteration per restart

            # Hill climbing for this restart
            for iteration in range(restart_iterations):
                if self.should_stop():
                    break

                # Try multiple neighbors and pick the best
                best_neighbor = None
                best_neighbor_value = current_value

                # Generate and evaluate multiple neighbors
                for _ in range(5):  # Try 5 neighbors per iteration
                    neighbor = problem.get_neighbor_solution(current_solution, current_step_size)
                    neighbor_result = problem.evaluate_solution(neighbor)
                    neighbor_value = neighbor_result.objective_value
                    total_evaluations += 1

                    if neighbor_value < best_neighbor_value:
                        best_neighbor = neighbor
                        best_neighbor_value = neighbor_value

                # Accept if better neighbor found
                if best_neighbor is not None and best_neighbor_value < current_value:
                    current_solution = best_neighbor
                    current_value = best_neighbor_value
                    no_improvement_count = 0

                    # Update global best
                    if current_value < best_value:
                        best_solution = current_solution.copy()
                        best_value = current_value
                else:
                    no_improvement_count += 1

                # Adaptive step size: shrink after every 6 consecutive
                # non-improving iterations. The counter is not reset here,
                # so the stall check further down can still trigger.
                if no_improvement_count > 0 and no_improvement_count % 6 == 0:
                    current_step_size *= 0.8

                    # If step size becomes too small, try a larger step
                    if current_step_size < 0.001:
                        current_step_size = step_size * 0.1

                # Count iterations actually performed, so the total stays
                # correct when a restart ends early
                global_iteration = total_iterations
                total_iterations += 1

                # Record progress
                optimization_history.append({
                    "iteration": global_iteration,
                    "restart": restart,
                    "best_value": best_value,
                    "current_value": current_value,
                    "step_size": current_step_size
                })

                # Progress reporting
                self.report_progress(
                    iteration=global_iteration,
                    best_value=best_value,
                    current_value=current_value,
                    restart=restart,
                    step_size=current_step_size
                )

                # Convergence check: comparing the value against tolerance is
                # valid here only because this problem's minimum value is 0
                if best_value < tolerance:
                    break

                # End this restart after more than 50 consecutive
                # non-improving iterations
                if no_improvement_count > 50:
                    break

            # If we found a very good solution, no need for more restarts
            if best_value < tolerance:
                break

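        if best_solution is None:
            # Stopped before any candidate was evaluated
            raise RuntimeError("Optimization stopped before any solution was evaluated")

        # Final evaluation to get detailed result
        final_result = problem.evaluate_solution(best_solution)
        total_evaluations += 1

        converged = best_value < tolerance
        if converged:
            termination_reason = "convergence"
        elif self.should_stop():
            termination_reason = "stopped"  # label chosen for this example
        else:
            termination_reason = "max_iterations"

        return OptimizationResult(
            best_solution=best_solution,
            best_value=best_value,
            is_feasible=final_result.is_feasible,
            iterations=total_iterations,
            evaluations=total_evaluations,
            convergence_achieved=converged,
            termination_reason=termination_reason,
            optimization_history=optimization_history,
            parameter_values=self.parameters,
            additional_metrics={
                "restarts_completed": restart + 1,
                "final_step_size": current_step_size
            }
        )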

Step 3: Running the Optimization

def main():
    # Create the problem instance
    # Minimize f(x) = x1^2 + 2*x2^2 + x3^2 + x4^2 + x5^2
    # Optimal solution: [0, 0, 0, 0, 0] with value 0
    coefficients = [(1.0, 0.0, 0.0), (2.0, 0.0, 0.0), (1.0, 0.0, 0.0),
                   (1.0, 0.0, 0.0), (1.0, 0.0, 0.0)]
    problem = QuadraticProblem(dimension=5, coefficients=coefficients)

    # Create the optimizer with explicit parameters
    optimizer = HillClimbingOptimizer(
        max_iterations=5000,  # Total budget, split evenly across restarts
        step_size=2.0,        # Larger initial step than the default of 1.0
        tolerance=1e-6,       # Stop once the best value drops below this
        restarts=10           # More restarts than the default of 5
    )

    # Set up progress tracking
    def progress_callback(progress_data):
        iteration = progress_data['iteration']
        best_value = progress_data['best_value']
        if iteration % 50 == 0:  # Print every 50th iteration
            print(f"Iteration {iteration}: Best value = {best_value:.6f}")

    def log_callback(level, message, source):
        print(f"[{level.upper()}] {source}: {message}")

    # Run optimization
    print("Starting optimization...")
    print(f"Problem: {problem.metadata.name}")
    print(f"Optimizer: {optimizer.metadata.name}")
    print(f"Parameters: {optimizer.parameters}")
    print("-" * 50)

    result = optimizer.optimize(
        problem=problem,
        progress_callback=progress_callback,
        log_callback=log_callback
    )

    # Display results
    print("\nOptimization completed!")
    print(f"Best solution: {[f'{x:.6f}' for x in result.best_solution]}")
    print(f"Best value: {result.best_value:.6f}")
    print(f"Is feasible: {result.is_feasible}")
    print(f"Iterations: {result.iterations}")
    print(f"Evaluations: {result.evaluations}")
    print(f"Runtime: {result.runtime_seconds:.3f} seconds")
    print(f"Convergence achieved: {result.convergence_achieved}")
    print(f"Termination reason: {result.termination_reason}")

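    # Additional metrics (both keys are set by HillClimbingOptimizer)
    metrics = result.additional_metrics
    if metrics:
        print(f"Restarts completed: {metrics.get('restarts_completed', 'N/A')}")
        final_step_size = metrics.get('final_step_size')
        if final_step_size is not None:  # Avoid applying a float format to a missing value
            print(f"Final step size: {final_step_size:.6f}")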

    # Optimizer statistics, read from the run_count and
    # average_runtime_seconds properties
    print("\nOptimizer statistics:")
    print(f"Total runs: {optimizer.run_count}")
    print(f"Average runtime: {optimizer.average_runtime_seconds:.3f} seconds")

if __name__ == "__main__":
    main()

This example demonstrates:

  • Complete implementation of both problem and optimizer
  • Proper parameter handling with validation and defaults
  • Progress tracking with callbacks
  • Comprehensive results with detailed metrics
  • Integration with the qubots framework architecture
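
Since the hill climbing optimizer is stochastic (is_deterministic=False), repeated runs generally give different results. A common way to make runs repeatable is to pass in a seeded random generator. The standalone sketch below shows the idea on a plain function without the framework; hill_climb and its rng parameter are hypothetical and are not part of the example classes above.

```python
import random
from typing import Callable, List, Sequence, Tuple


def hill_climb(objective: Callable[[Sequence[float]], float],
               start: Sequence[float],
               step_size: float = 1.0,
               iterations: int = 200,
               rng=random) -> Tuple[List[float], float]:
    """Minimal single-neighbor hill climbing on the box [-10, 10]^n."""
    current = list(start)
    current_value = objective(current)
    for _ in range(iterations):
        # Perturb one random coordinate and clamp it to the bounds.
        idx = rng.randrange(len(current))
        neighbor = list(current)
        neighbor[idx] += rng.uniform(-step_size, step_size)
        neighbor[idx] = max(-10.0, min(10.0, neighbor[idx]))
        value = objective(neighbor)
        if value < current_value:   # accept strict improvements only
            current, current_value = neighbor, value
    return current, current_value


def sphere(x: Sequence[float]) -> float:
    return sum(v * v for v in x)


# Two runs with identically seeded generators follow the same trajectory.
run_a = hill_climb(sphere, [5.0, -3.0], rng=random.Random(42))
run_b = hill_climb(sphere, [5.0, -3.0], rng=random.Random(42))
print(run_a == run_b, run_a[1])
```

Passing a dedicated random.Random instance, instead of seeding the module-level generator, keeps the optimizer's randomness independent of other code that uses random.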

Implementation Guidelines

Creating a New Optimizer

Follow these steps to create a robust optimizer implementation:

  1. Inherit from BaseOptimizer
  2. Implement required abstract methods
  3. Define comprehensive metadata
  4. Handle parameters properly
  5. Implement progress tracking
  6. Add error handling

Best Practices

Metadata Definition

def _get_default_metadata(self) -> OptimizerMetadata:
    return OptimizerMetadata(
        name="My Algorithm",
        description="Detailed description of what the algorithm does",
        optimizer_type=OptimizerType.HEURISTIC,  # Choose appropriate type
        optimizer_family=OptimizerFamily.LOCAL_SEARCH,  # Choose appropriate family

        # Specify capabilities accurately
        supports_continuous=True,
        supports_discrete=False,
        supports_constraints=False,

        # Provide complexity information
        time_complexity="O(n²)",
        space_complexity="O(n)",

        # Document parameters
        required_parameters=["max_iterations"],
        optional_parameters=["tolerance", "step_size"],
        parameter_ranges={
            "max_iterations": (1, 100000),
            "tolerance": (1e-12, 1e-3)
        }
    )

Parameter Handling

def _optimize_implementation(self, problem, initial_solution=None):
    # Extract parameters with defaults
    max_iterations = self.get_parameter('max_iterations', 1000)
    tolerance = self.get_parameter('tolerance', 1e-6)

    # Validate parameter values
    if max_iterations <= 0:
        raise ValueError("max_iterations must be positive")

    # Use parameters in algorithm...

Progress Reporting

def _optimize_implementation(self, problem, initial_solution=None):
    max_iterations = self.get_parameter('max_iterations', 1000)

    for iteration in range(max_iterations):
        if self.should_stop():  # Check for early termination
            break

        # ... optimization logic that updates best_value and current_value,
        #     and sets new_best_found when the best solution improves ...

        # Report progress every 10 iterations
        if iteration % 10 == 0:
            self.report_progress(
                iteration=iteration,
                best_value=best_value,
                current_value=current_value,
                # Custom metrics are passed as extra keyword arguments
                # (temperature and acceptance rate are placeholders here)
                temperature=current_temperature,
                acceptance_rate=acceptance_rate
            )

        # Log important events
        if new_best_found:
            self.log_message('info', f'New best solution found: {best_value}')

Error Handling

def _optimize_implementation(self, problem, initial_solution=None):
    try:
        # Validate initial solution if provided
        if initial_solution is not None:
            if not problem.validate_solution_format(initial_solution):
                raise ValueError("Invalid initial solution format")

        # ... optimization logic ...

    except ValueError as e:
        # Invalid input: log it and let the ValueError propagate unchanged,
        # matching the documented "Raises" contract
        self.log_message('error', f'Invalid input: {e}')
        raise
    except Exception as e:
        # Log other errors and re-raise with context
        self.log_message('error', f'Optimization failed: {e}')
        raise RuntimeError(f"Optimization failed in {self.metadata.name}: {e}") from e

Performance Considerations

  • Low Framework Overhead: The framework adds minimal overhead during optimization, so runtime is typically dominated by the algorithm and its calls to problem.evaluate_solution()
  • Efficient Memory Usage: Store only necessary history and intermediate results
  • Optional Callbacks: Progress and logging callbacks are optional and don’t impact performance when not used
  • Lazy Evaluation: Expensive computations should be performed only when needed
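
One way to follow the memory advice is to sample the history and cap its length instead of appending an entry on every iteration. The sketch below uses collections.deque with maxlen for this; BoundedHistory is a hypothetical helper and is not part of the framework.

```python
from collections import deque
from typing import Any, Dict, List


class BoundedHistory:
    """Hypothetical helper: keep every `every`-th entry, at most `max_entries` of them."""

    def __init__(self, every: int = 10, max_entries: int = 1000):
        self.every = every
        # A deque with maxlen silently discards the oldest entry when full.
        self._entries: deque = deque(maxlen=max_entries)

    def record(self, iteration: int, **fields: Any) -> None:
        if iteration % self.every == 0:
            self._entries.append({"iteration": iteration, **fields})

    def as_list(self) -> List[Dict[str, Any]]:
        return list(self._entries)


history = BoundedHistory(every=10, max_entries=50)
for i in range(1000):
    history.record(i, best_value=1.0 / (i + 1))

entries = history.as_list()
print(len(entries), entries[0]["iteration"], entries[-1]["iteration"])
```

The resulting list can be passed as optimization_history when building the result, at the cost of losing the earliest part of the trajectory on long runs.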

Next Steps