BaseOptimizer
The BaseOptimizer class is the foundation of all optimization algorithms in the Qubots framework. It provides a standardized interface, a comprehensive metadata system, and advanced features for algorithm development and integration.
Key Features
- Standardized Interface: Consistent API across all optimization algorithms
- Comprehensive Metadata: Rich algorithm descriptions and capabilities
- Progress Tracking: Built-in support for real-time monitoring and early termination
- Parameter Management: Flexible parameter system with validation
- Integration Ready: Seamless integration with problems, benchmarking, and platform features
File Components
The base_optimizer.py file defines:
- BaseOptimizer: Abstract base class for all optimization algorithms
- OptimizerMetadata: Comprehensive metadata structure for optimizers
- OptimizationResult: Standardized result format for optimization runs
- Enumerations: Optimizer types, families, and algorithm categories
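In the examples later on this page, these names are imported from the top-level qubots package:

    from qubots import (
        BaseOptimizer,        # abstract base class for all optimizers
        OptimizerMetadata,    # rich metadata dataclass
        OptimizationResult,   # standardized result dataclass
        OptimizerType,        # algorithm category enumeration
        OptimizerFamily,      # algorithm family enumeration
    )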
Core Enumerations
OptimizerType
Categorizes optimization algorithms by their approach:
    class OptimizerType(Enum):
        EXACT = "exact"                          # Guaranteed optimal solutions
        HEURISTIC = "heuristic"                  # Good solutions, no optimality guarantee
        METAHEURISTIC = "metaheuristic"          # High-level heuristic frameworks
        HYBRID = "hybrid"                        # Combination of multiple approaches
        MACHINE_LEARNING = "machine_learning"    # ML-based optimization
        QUANTUM = "quantum"                      # Quantum optimization algorithms
OptimizerFamily
Groups algorithms by their underlying methodology:
    class OptimizerFamily(Enum):
        EVOLUTIONARY = "evolutionary"                       # Genetic algorithms, evolution strategies
        SWARM_INTELLIGENCE = "swarm_intelligence"           # PSO, ACO, etc.
        LOCAL_SEARCH = "local_search"                       # Hill climbing, simulated annealing
        GRADIENT_BASED = "gradient_based"                   # Gradient descent, Newton methods
        TREE_SEARCH = "tree_search"                         # Branch and bound, A*
        CONSTRAINT_PROGRAMMING = "constraint_programming"   # CP solvers
        LINEAR_PROGRAMMING = "linear_programming"           # LP/MIP solvers
        REINFORCEMENT_LEARNING = "reinforcement_learning"   # RL-based optimization
OptimizerMetadata Class
Comprehensive metadata structure for optimization algorithms:
    @dataclass
    class OptimizerMetadata:
        # Basic information (required)
        name: str                                # Human-readable name
        description: str                         # Detailed description
        optimizer_type: OptimizerType            # Algorithm category
        optimizer_family: OptimizerFamily        # Algorithm family

        # Authorship and versioning
        author: str = ""                         # Algorithm author/implementer
        version: str = "1.0.0"                   # Version number
        license: str = "MIT"                     # License type
        created_at: datetime = field(default_factory=datetime.now)
        updated_at: datetime = field(default_factory=datetime.now)

        # Algorithm characteristics
        is_deterministic: bool = True            # Whether algorithm is deterministic
        supports_constraints: bool = False       # Can handle constrained problems
        supports_multi_objective: bool = False   # Multi-objective optimization
        supports_continuous: bool = True         # Continuous variable problems
        supports_discrete: bool = True           # Discrete variable problems
        supports_mixed_integer: bool = False     # Mixed-integer problems

        # Performance characteristics
        time_complexity: str = "O(n)"            # Time complexity in Big-O notation
        space_complexity: str = "O(n)"           # Space complexity in Big-O notation
        convergence_guaranteed: bool = False     # Convergence guarantee
        parallel_capable: bool = False           # Can utilize multiple cores

        # Parameter information
        required_parameters: List[str] = field(default_factory=list)
        optional_parameters: List[str] = field(default_factory=list)
        parameter_ranges: Dict[str, Tuple[float, float]] = field(default_factory=dict)

        # Benchmarking and references
        typical_problems: List[str] = field(default_factory=list)
        benchmark_results: Dict[str, float] = field(default_factory=dict)
        reference_papers: List[str] = field(default_factory=list)
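A minimal instantiation uses only a handful of the fields above; the values here are illustrative, not taken from any shipped optimizer:

    metadata = OptimizerMetadata(
        name="Random Restart Search",
        description="Illustrative metadata for a simple stochastic search",
        optimizer_type=OptimizerType.HEURISTIC,
        optimizer_family=OptimizerFamily.LOCAL_SEARCH,
        is_deterministic=False,
        supports_constraints=False,
        time_complexity="O(iterations)",
        required_parameters=["max_iterations"],
        optional_parameters=["seed"],
        parameter_ranges={"max_iterations": (1, 100000)},
    )

All remaining fields fall back to the defaults shown above.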
OptimizationResult Class
Standardized result format for optimization runs:
    @dataclass
    class OptimizationResult:
        # Core results (required)
        best_solution: Any                       # Best solution found
        best_value: float                        # Best objective value
        is_feasible: bool = True                 # Whether best solution is feasible

        # Optimization statistics
        iterations: int = 0                      # Number of iterations performed
        evaluations: int = 0                     # Number of function evaluations
        runtime_seconds: float = 0.0             # Total runtime in seconds
        convergence_achieved: bool = False       # Whether convergence was achieved
        termination_reason: str = "max_iterations"  # Reason for termination

        # Detailed tracking
        optimization_history: List[Dict[str, Any]] = field(default_factory=list)
        parameter_values: Dict[str, Any] = field(default_factory=dict)
        additional_metrics: Dict[str, float] = field(default_factory=dict)

        # Problem and optimizer metadata
        problem_metadata: Optional[Dict[str, Any]] = None
        optimizer_metadata: Optional[Dict[str, Any]] = None
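Because OptimizationResult is a plain dataclass, a returned result can be inspected field by field; a short sketch (the optimizer and problem instances are assumed to exist):

    result = optimizer.optimize(problem)
    print(result.best_value, result.is_feasible)
    print(result.termination_reason, f"{result.runtime_seconds:.3f}s")
    for record in result.optimization_history[:5]:  # first few tracked iterations
        print(record)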
BaseOptimizer Class
The BaseOptimizer class provides the foundation for all optimization algorithms in the qubots framework.
Constructor
    def __init__(self, metadata: Optional[OptimizerMetadata] = None, **parameters):
        """
        Initialize the optimizer with metadata and parameters.

        Args:
            metadata: Optimizer metadata. If None, subclasses should provide default metadata.
            **parameters: Algorithm-specific parameters
        """
Internal State:
- _metadata: Optimizer metadata containing algorithm information
- _parameters: Algorithm-specific parameters dictionary
- _run_count: Number of optimization runs performed
- _total_runtime: Total runtime across all runs
- _instance_id: Unique identifier for this optimizer instance
- _is_running: Whether optimization is currently running
- _should_stop: Flag for early termination requests
- _current_result: Most recent optimization result
- _progress_callback: Optional callback for progress updates
- _log_callback: Optional callback for real-time logging
Abstract Methods
Subclasses must implement these two abstract methods to create a functional optimizer:
_get_default_metadata()
    @abstractmethod
    def _get_default_metadata(self) -> OptimizerMetadata:
        """
        Return default metadata for this optimizer.
        Subclasses must implement this method to provide algorithm information.

        Returns:
            OptimizerMetadata: Complete metadata describing the algorithm

        Example:
            return OptimizerMetadata(
                name="My Algorithm",
                description="Description of the algorithm",
                optimizer_type=OptimizerType.HEURISTIC,
                optimizer_family=OptimizerFamily.LOCAL_SEARCH,
                is_deterministic=False,
                supports_continuous=True,
                time_complexity="O(n²)",
                required_parameters=["max_iterations"],
                optional_parameters=["tolerance"]
            )
        """
        pass
_optimize_implementation()
    @abstractmethod
    def _optimize_implementation(self, problem: BaseProblem,
                                 initial_solution: Optional[Any] = None) -> OptimizationResult:
        """
        Core optimization implementation that performs the actual optimization.

        This method should:
        1. Extract parameters from self._parameters
        2. Implement the optimization algorithm
        3. Use problem.evaluate_solution() to evaluate candidates
        4. Check self._should_stop for early termination
        5. Call self.report_progress() for progress updates
        6. Return a comprehensive OptimizationResult

        Args:
            problem: The optimization problem to solve (BaseProblem instance)
            initial_solution: Optional initial solution to start from

        Returns:
            OptimizationResult: Comprehensive result with solution, metrics, and metadata

        Raises:
            ValueError: If problem is incompatible with this optimizer
            RuntimeError: If optimization fails due to algorithm-specific issues
        """
        pass
Public Methods
optimize()
Main optimization interface with comprehensive tracking and control:
    def optimize(self, problem: BaseProblem,
                 initial_solution: Optional[Any] = None,
                 progress_callback: Optional[Callable] = None,
                 log_callback: Optional[Callable] = None,
                 **kwargs) -> OptimizationResult:
        """
        Main optimization interface with comprehensive tracking and control.

        This method handles:
        - Parameter validation and merging
        - Problem compatibility checking
        - Progress and logging callbacks
        - Runtime measurement
        - Result finalization with metadata

        Args:
            problem: The optimization problem to solve (BaseProblem instance)
            initial_solution: Optional initial solution to start optimization from
            progress_callback: Optional callback function for progress updates
                Signature: callback(progress_data: Dict[str, Any])
            log_callback: Optional callback for real-time logging
                Signature: callback(level: str, message: str, source: str)
            **kwargs: Additional parameters that override instance parameters

        Returns:
            OptimizationResult: Comprehensive result with solution, metrics, and metadata

        Raises:
            ValueError: If problem is incompatible or parameters are invalid
        """
optimize_legacy()
Legacy interface for backward compatibility:
    def optimize_legacy(self, problem: BaseProblem,
                        initial_solution: Optional[Any] = None,
                        **kwargs) -> Tuple[Any, float]:
        """
        Legacy optimization interface for backward compatibility.

        Args:
            problem: The optimization problem to solve
            initial_solution: Optional initial solution
            **kwargs: Additional parameters

        Returns:
            Tuple of (best_solution, best_value)
        """
Control Methods
    def stop_optimization(self) -> None:
        """Request early termination of current optimization."""

    def is_running(self) -> bool:
        """Check if optimization is currently running."""

    def should_stop(self) -> bool:
        """Check if optimization should stop (for use in _optimize_implementation)."""
Progress and Logging
    def report_progress(self, iteration: int, best_value: float, **metrics):
        """
        Report progress to callback if available.
        Call this from _optimize_implementation to provide real-time updates.
        """

    def log_message(self, level: str, message: str, **context):
        """
        Log a message during optimization if callback is available.
        Levels: 'debug', 'info', 'warning', 'error'
        """
Parameter Management
    def get_parameter_info(self) -> Dict[str, Any]:
        """
        Get information about optimizer parameters.

        Returns:
            Dictionary with required_parameters, optional_parameters,
            parameter_ranges, and current_values
        """

    def set_parameters(self, **parameters):
        """Update optimizer parameters with validation."""

    def get_parameter(self, name: str, default: Any = None) -> Any:
        """Get a specific parameter value."""
Statistics and Information
    def get_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive statistics about optimizer usage.

        Returns:
            Dictionary with run_count, total_runtime, average_runtime,
            metadata, current_parameters, and more
        """

    def reset_statistics(self):
        """Reset optimization statistics."""

    def export_optimizer_definition(self) -> Dict[str, Any]:
        """Export complete optimizer definition for sharing or storage."""
Properties
    @property
    def metadata(self) -> OptimizerMetadata:
        """Get optimizer metadata."""

    @property
    def parameters(self) -> Dict[str, Any]:
        """Get current optimizer parameters (copy)."""

    @property
    def run_count(self) -> int:
        """Get number of optimization runs performed."""

    @property
    def average_runtime_seconds(self) -> float:
        """Get average runtime in seconds across all runs."""

    @property
    def instance_id(self) -> str:
        """Get unique instance identifier."""

    @property
    def current_result(self) -> Optional[OptimizationResult]:
        """Get the most recent optimization result."""
Practical Usage Example
This section demonstrates how to create a complete optimizer implementation with a concrete mathematical optimization problem.
Problem: Quadratic Function Minimization
Let’s create a simple quadratic function minimization problem and solve it with a custom optimizer:
    from qubots import BaseProblem, BaseOptimizer, OptimizerMetadata, OptimizationResult
    from qubots import ProblemMetadata, ProblemType, ObjectiveType, EvaluationResult
    from qubots import OptimizerType, OptimizerFamily
    import random
    from typing import List, Optional, Any

    # Step 1: Define the optimization problem
    class QuadraticProblem(BaseProblem):
        """
        Minimize f(x) = sum(a_i * x_i^2 + b_i * x_i + c_i) for i in range(dimension)
        Subject to: -10 <= x_i <= 10 for all i
        """

        def __init__(self, dimension: int = 5, coefficients: Optional[List] = None):
            self.dimension = dimension
            # Default coefficients: a=1, b=0, c=0 for each dimension
            self.coefficients = coefficients or [(1.0, 0.0, 0.0)] * dimension
            super().__init__()

        def _get_default_metadata(self) -> ProblemMetadata:
            return ProblemMetadata(
                name=f"Quadratic Function ({self.dimension}D)",
                description=f"Minimize quadratic function in {self.dimension} dimensions",
                problem_type=ProblemType.CONTINUOUS,
                objective_type=ObjectiveType.MINIMIZE,
                dimension=self.dimension,
                constraints_count=self.dimension * 2,  # Box constraints
            )

        def evaluate_solution(self, solution: List[float]) -> EvaluationResult:
            """Evaluate the quadratic function at given point."""
            if len(solution) != self.dimension:
                raise ValueError(f"Solution must have {self.dimension} dimensions")

            # Calculate objective value
            objective_value = 0.0
            for i, (a, b, c) in enumerate(self.coefficients):
                x = solution[i]
                objective_value += a * x**2 + b * x + c

            # Check constraints (box constraints: -10 <= x_i <= 10)
            constraint_violations = []
            for i, x in enumerate(solution):
                if x < -10 or x > 10:
                    constraint_violations.append(f"x[{i}] = {x:.3f} violates bounds [-10, 10]")

            return EvaluationResult(
                objective_value=objective_value,
                is_feasible=len(constraint_violations) == 0,
                constraint_violations=constraint_violations
            )

        def generate_random_solution(self) -> List[float]:
            """Generate a random feasible solution."""
            return [random.uniform(-10, 10) for _ in range(self.dimension)]

        def get_neighbor_solution(self, solution: List[float], step_size: float = 1.0) -> List[float]:
            """Generate a neighboring solution for local search."""
            neighbor = solution.copy()
            # Choose strategy: modify one dimension (70%) or multiple dimensions (30%)
            if random.random() < 0.7:
                # Modify one random dimension
                idx = random.randint(0, self.dimension - 1)
                neighbor[idx] += random.uniform(-step_size, step_size)
                neighbor[idx] = max(-10, min(10, neighbor[idx]))
            else:
                # Modify multiple dimensions with smaller steps
                num_dims = random.randint(1, min(3, self.dimension))
                indices = random.sample(range(self.dimension), num_dims)
                for idx in indices:
                    neighbor[idx] += random.uniform(-step_size * 0.5, step_size * 0.5)
                    neighbor[idx] = max(-10, min(10, neighbor[idx]))
            return neighbor
Step 2: Implement a Hill Climbing Optimizer
    class HillClimbingOptimizer(BaseOptimizer):
        """
        Simple hill climbing optimizer for continuous problems.
        Uses random restarts and adaptive step size.
        """

        def __init__(self, max_iterations: int = 1000, step_size: float = 1.0,
                     tolerance: float = 1e-6, restarts: int = 5, **kwargs):
            super().__init__(
                max_iterations=max_iterations,
                step_size=step_size,
                tolerance=tolerance,
                restarts=restarts,
                **kwargs
            )

        def _get_default_metadata(self) -> OptimizerMetadata:
            return OptimizerMetadata(
                name="Hill Climbing with Restarts",
                description="Hill climbing algorithm with random restarts and adaptive step size",
                optimizer_type=OptimizerType.HEURISTIC,
                optimizer_family=OptimizerFamily.LOCAL_SEARCH,
                is_deterministic=False,
                supports_continuous=True,
                supports_discrete=False,
                supports_constraints=True,
                time_complexity="O(iterations * restarts)",
                space_complexity="O(dimension)",
                required_parameters=["max_iterations"],
                optional_parameters=["step_size", "tolerance", "restarts"],
                parameter_ranges={
                    "max_iterations": (1, 100000),
                    "step_size": (0.001, 10.0),
                    "tolerance": (1e-12, 1e-3),
                    "restarts": (1, 100)
                }
            )
        def _optimize_implementation(self, problem: BaseProblem,
                                     initial_solution: Optional[Any] = None) -> OptimizationResult:
            # Extract parameters
            max_iterations = self.get_parameter('max_iterations', 1000)
            step_size = self.get_parameter('step_size', 1.0)
            tolerance = self.get_parameter('tolerance', 1e-6)
            restarts = self.get_parameter('restarts', 5)

            best_solution = None
            best_value = float('inf')
            total_evaluations = 0
            optimization_history = []
            total_iterations = 0

            # Multiple restarts
            for restart in range(restarts):
                if self.should_stop():
                    break

                # Initialize for this restart
                if initial_solution is not None and restart == 0:
                    current_solution = initial_solution.copy()
                else:
                    current_solution = problem.generate_random_solution()

                current_result = problem.evaluate_solution(current_solution)
                current_value = current_result.objective_value
                total_evaluations += 1

                # Track best across all restarts
                if current_value < best_value:
                    best_solution = current_solution.copy()
                    best_value = current_value

                current_step_size = step_size
                no_improvement_count = 0
                restart_iterations = max_iterations // restarts

                # Hill climbing for this restart
                for iteration in range(restart_iterations):
                    if self.should_stop():
                        break

                    # Try multiple neighbors and pick the best
                    best_neighbor = None
                    best_neighbor_value = current_value

                    # Generate and evaluate multiple neighbors
                    for _ in range(5):  # Try 5 neighbors per iteration
                        neighbor = problem.get_neighbor_solution(current_solution, current_step_size)
                        neighbor_result = problem.evaluate_solution(neighbor)
                        neighbor_value = neighbor_result.objective_value
                        total_evaluations += 1

                        if neighbor_value < best_neighbor_value:
                            best_neighbor = neighbor
                            best_neighbor_value = neighbor_value

                    # Accept if better neighbor found
                    if best_neighbor is not None and best_neighbor_value < current_value:
                        current_solution = best_neighbor
                        current_value = best_neighbor_value
                        no_improvement_count = 0

                        # Update global best
                        if current_value < best_value:
                            best_solution = current_solution.copy()
                            best_value = current_value
                    else:
                        no_improvement_count += 1

                        # Adaptive step size - more aggressive reduction
                        if no_improvement_count > 5:
                            current_step_size *= 0.8
                            no_improvement_count = 0

                            # If step size becomes too small, try a larger step
                            if current_step_size < 0.001:
                                current_step_size = step_size * 0.1

                    # Record progress
                    global_iteration = restart * restart_iterations + iteration
                    optimization_history.append({
                        "iteration": global_iteration,
                        "restart": restart,
                        "best_value": best_value,
                        "current_value": current_value,
                        "step_size": current_step_size
                    })

                    # Progress reporting
                    self.report_progress(
                        iteration=global_iteration,
                        best_value=best_value,
                        current_value=current_value,
                        restart=restart,
                        step_size=current_step_size
                    )

                    # Convergence check - works here because the known optimum of this problem is 0
                    if best_value < tolerance:
                        total_iterations = global_iteration + 1
                        break

                    # Early termination if no improvement for too long
                    if no_improvement_count > 50:
                        break

                total_iterations = restart * restart_iterations + iteration + 1

                # If we found a very good solution, no need for more restarts
                if best_value < tolerance:
                    break

            # Final evaluation to get a detailed result
            final_result = problem.evaluate_solution(best_solution)

            return OptimizationResult(
                best_solution=best_solution,
                best_value=best_value,
                is_feasible=final_result.is_feasible,
                iterations=total_iterations,
                evaluations=total_evaluations,
                convergence_achieved=best_value < tolerance,
                termination_reason="convergence" if best_value < tolerance else "max_iterations",
                optimization_history=optimization_history,
                parameter_values=self.parameters,
                additional_metrics={
                    "restarts_completed": restart + 1,
                    "final_step_size": current_step_size
                }
            )
Step 3: Running the Optimization
    def main():
        # Create the problem instance
        # Minimize f(x) = x1^2 + 2*x2^2 + x3^2 + x4^2 + x5^2
        # Optimal solution: [0, 0, 0, 0, 0] with value 0
        coefficients = [(1.0, 0.0, 0.0), (2.0, 0.0, 0.0), (1.0, 0.0, 0.0),
                        (1.0, 0.0, 0.0), (1.0, 0.0, 0.0)]
        problem = QuadraticProblem(dimension=5, coefficients=coefficients)

        # Create the optimizer
        optimizer = HillClimbingOptimizer(
            max_iterations=5000,
            step_size=2.0,     # Start with larger steps
            tolerance=1e-6,    # Strict tolerance
            restarts=10        # More restarts for better exploration
        )

        # Set up progress tracking
        def progress_callback(progress_data):
            iteration = progress_data['iteration']
            best_value = progress_data['best_value']
            if iteration % 50 == 0:  # Report every 50 iterations
                print(f"Iteration {iteration}: Best value = {best_value:.6f}")

        def log_callback(level, message, source):
            print(f"[{level.upper()}] {source}: {message}")

        # Run optimization
        print("Starting optimization...")
        print(f"Problem: {problem.metadata.name}")
        print(f"Optimizer: {optimizer.metadata.name}")
        print(f"Parameters: {optimizer.parameters}")
        print("-" * 50)

        result = optimizer.optimize(
            problem=problem,
            progress_callback=progress_callback,
            log_callback=log_callback
        )

        # Display results
        print("\nOptimization completed!")
        print(f"Best solution: {[f'{x:.6f}' for x in result.best_solution]}")
        print(f"Best value: {result.best_value:.6f}")
        print(f"Is feasible: {result.is_feasible}")
        print(f"Iterations: {result.iterations}")
        print(f"Evaluations: {result.evaluations}")
        print(f"Runtime: {result.runtime_seconds:.3f} seconds")
        print(f"Convergence achieved: {result.convergence_achieved}")
        print(f"Termination reason: {result.termination_reason}")

        # Additional metrics
        if result.additional_metrics:
            print(f"Restarts completed: {result.additional_metrics.get('restarts_completed', 'N/A')}")
            final_step_size = result.additional_metrics.get('final_step_size')
            if final_step_size is not None:
                print(f"Final step size: {final_step_size:.6f}")

        # Optimizer statistics
        stats = optimizer.get_statistics()
        print("\nOptimizer statistics:")
        print(f"Total runs: {stats['run_count']}")
        print(f"Average runtime: {stats['average_runtime_seconds']:.3f} seconds")

    if __name__ == "__main__":
        main()
This example demonstrates:
- Complete implementation of both problem and optimizer
- Proper parameter handling with validation and defaults
- Progress tracking with callbacks
- Comprehensive results with detailed metrics
- Integration with the qubots framework architecture
Implementation Guidelines
Creating a New Optimizer
Follow these steps to create a robust optimizer implementation; a minimal skeleton follows the list:
- Inherit from BaseOptimizer
- Implement required abstract methods
- Define comprehensive metadata
- Handle parameters properly
- Implement progress tracking
- Add error handling
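The skeleton below wires these steps together. The search itself (pure random sampling, assuming minimization) is only a placeholder, but the method names come from the framework interface described above:

    class MyOptimizer(BaseOptimizer):
        def _get_default_metadata(self) -> OptimizerMetadata:
            return OptimizerMetadata(
                name="My Optimizer",
                description="Placeholder description",
                optimizer_type=OptimizerType.HEURISTIC,
                optimizer_family=OptimizerFamily.LOCAL_SEARCH,
                required_parameters=["max_iterations"],
            )

        def _optimize_implementation(self, problem, initial_solution=None):
            max_iterations = self.get_parameter('max_iterations', 100)

            # Start from the given solution or a random one (minimization assumed)
            best = initial_solution if initial_solution is not None else problem.generate_random_solution()
            best_value = problem.evaluate_solution(best).objective_value

            iterations_run = 0
            for iteration in range(max_iterations):
                if self.should_stop():  # honor early termination requests
                    break
                candidate = problem.generate_random_solution()
                value = problem.evaluate_solution(candidate).objective_value
                if value < best_value:
                    best, best_value = candidate, value
                self.report_progress(iteration=iteration, best_value=best_value)
                iterations_run += 1

            return OptimizationResult(
                best_solution=best,
                best_value=best_value,
                iterations=iterations_run,
                evaluations=iterations_run + 1,
                termination_reason="max_iterations",
            )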
Best Practices
Metadata Definition
    def _get_default_metadata(self) -> OptimizerMetadata:
        return OptimizerMetadata(
            name="My Algorithm",
            description="Detailed description of what the algorithm does",
            optimizer_type=OptimizerType.HEURISTIC,          # Choose appropriate type
            optimizer_family=OptimizerFamily.LOCAL_SEARCH,   # Choose appropriate family

            # Specify capabilities accurately
            supports_continuous=True,
            supports_discrete=False,
            supports_constraints=False,

            # Provide complexity information
            time_complexity="O(n²)",
            space_complexity="O(n)",

            # Document parameters
            required_parameters=["max_iterations"],
            optional_parameters=["tolerance", "step_size"],
            parameter_ranges={
                "max_iterations": (1, 100000),
                "tolerance": (1e-12, 1e-3)
            }
        )
Parameter Handling
    def _optimize_implementation(self, problem, initial_solution=None):
        # Extract parameters with defaults
        max_iterations = self.get_parameter('max_iterations', 1000)
        tolerance = self.get_parameter('tolerance', 1e-6)

        # Validate parameter values
        if max_iterations <= 0:
            raise ValueError("max_iterations must be positive")

        # Use parameters in algorithm...
Progress Reporting
    def _optimize_implementation(self, problem, initial_solution=None):
        for iteration in range(max_iterations):
            if self.should_stop():  # Check for early termination
                break

            # ... optimization logic ...

            # Report progress every N iterations
            if iteration % 10 == 0:
                self.report_progress(
                    iteration=iteration,
                    best_value=best_value,
                    current_value=current_value,
                    # Add custom metrics
                    temperature=current_temperature,
                    acceptance_rate=acceptance_rate
                )

            # Log important events
            if new_best_found:
                self.log_message('info', f'New best solution found: {best_value}')
Error Handling
    def _optimize_implementation(self, problem, initial_solution=None):
        try:
            # Validate initial solution if provided
            if initial_solution is not None:
                if not problem.validate_solution_format(initial_solution):
                    raise ValueError("Invalid initial solution format")

            # ... optimization logic ...

        except Exception as e:
            # Log error and re-raise with context
            self.log_message('error', f'Optimization failed: {str(e)}')
            raise RuntimeError(f"Optimization failed in {self.metadata.name}: {str(e)}") from e
Performance Considerations
- Minimal Framework Overhead: The framework itself adds only minimal overhead during optimization
- Efficient Memory Usage: Store only necessary history and intermediate results
- Optional Callbacks: Progress and logging callbacks are optional and don’t impact performance when not used
- Lazy Evaluation: Expensive computations should be performed only when needed