Playground Integration

The playground_integration.py file connects the Qubots framework to the Rastion platform’s cloud playground. It provides cloud execution, real-time monitoring, and integration with the web-based playground interface.

File Purpose

The playground integration provides:

  • Cloud Execution: Run optimizations in the cloud with scalable resources
  • Real-time Monitoring: Live progress tracking and result streaming
  • Web Integration: Seamless integration with the playground web interface
  • Resource Management: Automatic resource allocation and cleanup
  • Result Synchronization: Sync results between local and cloud environments

Core Classes

PlaygroundExecutor

Main class for executing optimizations in the cloud:

class PlaygroundExecutor:
    """
    Executor for running optimizations in the Rastion cloud playground.
    Handles cloud execution, monitoring, and result retrieval.
    """
    
    def __init__(self, client: Optional[RastionClient] = None):
        """
        Initialize playground executor.
        
        Args:
            client: Rastion client for API access (uses global client if None)
        """
        self.client = client or get_global_client()
        self.active_executions: Dict[str, Dict[str, Any]] = {}
        self.execution_callbacks: Dict[str, List[Callable]] = {}

PlaygroundResult

Container for playground execution results:

@dataclass
class PlaygroundResult:
    """Result from playground execution."""
    
    # Execution information
    execution_id: str                    # Unique execution identifier
    status: str                         # "pending", "running", "completed", "failed"
    
    # Optimization results
    optimization_result: Optional[OptimizationResult] = None
    
    # Execution metadata
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    total_runtime_seconds: Optional[float] = None
    
    # Resource usage
    cpu_time_seconds: Optional[float] = None
    memory_usage_mb: Optional[float] = None
    
    # Configuration used
    problem_config: Dict[str, Any] = field(default_factory=dict)
    optimizer_config: Dict[str, Any] = field(default_factory=dict)
    
    # Progress information
    progress_updates: List[Dict[str, Any]] = field(default_factory=list)
    
    # Error information
    error_message: Optional[str] = None
    error_details: Optional[Dict[str, Any]] = None
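
The timing fields are presumably related to one another. As a minimal sketch, assuming total_runtime_seconds is the wall-clock difference between end_time and start_time (the source does not state this), a trimmed stand-in for the dataclass could derive it as follows. MiniResult and finalize are hypothetical names used only for illustration, not part of qubots.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class MiniResult:
    """Trimmed, hypothetical stand-in for PlaygroundResult (timing fields only)."""
    execution_id: str
    status: str = "pending"
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    total_runtime_seconds: Optional[float] = None


def finalize(result: MiniResult, end_time: datetime, status: str = "completed") -> MiniResult:
    """Mark a result as finished and derive its wall-clock runtime."""
    result.end_time = end_time
    result.status = status
    # Assumption: runtime is simply end_time minus start_time.
    result.total_runtime_seconds = (end_time - result.start_time).total_seconds()
    return result


start = datetime(2024, 1, 1, 12, 0, 0)
done = finalize(MiniResult("exec-1", start_time=start), start + timedelta(seconds=90))
print(done.status, done.total_runtime_seconds)
```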

ModelInfo

Information about models available in the playground:

@dataclass
class ModelInfo:
    """Information about a model in the playground."""
    
    name: str                           # Model name
    username: str                       # Owner username
    model_type: str                     # "problem" or "optimizer"
    description: str                    # Model description
    tags: List[str] = field(default_factory=list)
    
    # Version information
    version: str = "latest"
    available_versions: List[str] = field(default_factory=list)
    
    # Compatibility
    compatible_with: List[str] = field(default_factory=list)
    
    # Usage statistics
    usage_count: int = 0
    average_rating: float = 0.0
    
    # Metadata
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

Core Functions

execute_playground_optimization

High-level function for cloud optimization:

def execute_playground_optimization(
    problem_name: str,
    problem_username: str,
    optimizer_name: str,
    optimizer_username: str,
    problem_params: Optional[Dict[str, Any]] = None,
    optimizer_params: Optional[Dict[str, Any]] = None,
    execution_config: Optional[Dict[str, Any]] = None,
    progress_callback: Optional[Callable] = None,
    client: Optional[RastionClient] = None
) -> PlaygroundResult:
    """
    Execute optimization in the cloud playground.
    
    Args:
        problem_name: Name of the problem model
        problem_username: Username of problem owner
        optimizer_name: Name of the optimizer model
        optimizer_username: Username of optimizer owner
        problem_params: Parameters for problem configuration
        optimizer_params: Parameters for optimizer configuration
        execution_config: Configuration for cloud execution
        progress_callback: Callback for progress updates
        client: Rastion client (uses global if None)
        
    Returns:
        PlaygroundResult with execution results
        
    Raises:
        PlaygroundError: If execution fails
        AuthenticationError: If not authenticated
        NotFoundError: If models not found
    """

Execution Flow:

  1. Validation: Validate model names and parameters
  2. Authentication: Ensure client is authenticated
  3. Submission: Submit execution request to playground
  4. Monitoring: Monitor execution progress
  5. Result Retrieval: Retrieve and parse results
  6. Cleanup: Clean up cloud resources
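
The six steps above can be sketched against an in-memory stand-in. This is only an illustration of the ordering, assuming cleanup should run even when monitoring fails. FakeClient, its methods, and run_flow are hypothetical and do not reflect the RastionClient API, and the returned value is made up.

```python
from typing import Any, Dict, List


class FakeClient:
    """Hypothetical in-memory stand-in for a playground client; not the Rastion API."""

    def __init__(self) -> None:
        self.authenticated = True
        self.calls: List[str] = []

    def submit(self, request: Dict[str, Any]) -> str:
        self.calls.append("submit")
        return "exec-1"

    def wait(self, execution_id: str) -> Dict[str, Any]:
        self.calls.append("monitor")
        return {"status": "completed", "best_value": 42.0}  # made-up value

    def cleanup(self, execution_id: str) -> None:
        self.calls.append("cleanup")


def run_flow(client: FakeClient, problem_name: str, optimizer_name: str) -> Dict[str, Any]:
    # 1. Validation
    if not problem_name or not optimizer_name:
        raise ValueError("problem_name and optimizer_name are required")
    # 2. Authentication
    if not client.authenticated:
        raise PermissionError("client is not authenticated")
    # 3. Submission
    execution_id = client.submit({"problem": problem_name, "optimizer": optimizer_name})
    try:
        # 4. Monitoring and 5. result retrieval
        raw = client.wait(execution_id)
        return {"execution_id": execution_id, **raw}
    finally:
        # 6. Cleanup runs even if monitoring raises
        client.cleanup(execution_id)


client = FakeClient()
outcome = run_flow(client, "tsp_problem", "genetic_tsp")
print(outcome["status"], client.calls)
```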

get_available_models

Retrieve available models in the playground:

def get_available_models(
    model_type: Optional[str] = None,
    username: Optional[str] = None,
    tags: Optional[List[str]] = None,
    client: Optional[RastionClient] = None
) -> List[ModelInfo]:
    """
    Get list of available models in the playground.
    
    Args:
        model_type: Filter by "problem" or "optimizer"
        username: Filter by username
        tags: Filter by tags
        client: Rastion client
        
    Returns:
        List of ModelInfo objects
    """

monitor_execution

Monitor ongoing execution:

def monitor_execution(
    execution_id: str,
    progress_callback: Optional[Callable] = None,
    poll_interval: float = 2.0,
    client: Optional[RastionClient] = None
) -> PlaygroundResult:
    """
    Monitor playground execution until completion.
    
    Args:
        execution_id: ID of execution to monitor
        progress_callback: Callback for progress updates
        poll_interval: Polling interval in seconds
        client: Rastion client
        
    Returns:
        Final PlaygroundResult
    """

PlaygroundExecutor Methods

execute_async

Start an execution without blocking and return its ID for later monitoring:

def execute_async(
    self,
    problem_name: str,
    problem_username: str,
    optimizer_name: str,
    optimizer_username: str,
    problem_params: Optional[Dict[str, Any]] = None,
    optimizer_params: Optional[Dict[str, Any]] = None,
    execution_config: Optional[Dict[str, Any]] = None
) -> str:
    """
    Start asynchronous execution in playground.
    
    Args:
        problem_name: Name of problem model
        problem_username: Problem owner username
        optimizer_name: Name of optimizer model
        optimizer_username: Optimizer owner username
        problem_params: Problem parameters
        optimizer_params: Optimizer parameters
        execution_config: Execution configuration
        
    Returns:
        Execution ID for monitoring
    """

get_execution_status

Check execution status:

def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
    """
    Get current status of execution.
    
    Args:
        execution_id: ID of execution
        
    Returns:
        Status information dictionary
    """

cancel_execution

Cancel running execution:

def cancel_execution(self, execution_id: str) -> bool:
    """
    Cancel a running execution.
    
    Args:
        execution_id: ID of execution to cancel
        
    Returns:
        True if cancellation successful
    """

add_progress_callback

Add callback for progress updates:

def add_progress_callback(
    self,
    execution_id: str,
    callback: Callable[[Dict[str, Any]], None]
) -> None:
    """
    Add progress callback for execution.
    
    Args:
        execution_id: ID of execution
        callback: Callback function for progress updates
    """

Usage Examples

Basic Cloud Execution

from qubots import execute_playground_optimization
import qubots.rastion as rastion

# Authenticate with platform
rastion.authenticate("your_api_token")

# Execute optimization in cloud with logging
def log_callback(level, message, source):
    print(f"[{level}] {source}: {message}")

result = execute_playground_optimization(
    problem_name="tsp_problem",
    problem_username="community",
    optimizer_name="genetic_tsp",
    optimizer_username="research_group",
    problem_params={
        "n_cities": 50,
        "city_distribution": "random"
    },
    optimizer_params={
        "population_size": 100,
        "generations": 500,
        "mutation_rate": 0.1
    },
    log_callback=log_callback
)

print(f"Success: {result['success']}")
if result['success']:
    print(f"Best value: {result['best_value']}")
    print(f"Runtime: {result['execution_time']:.2f} seconds")
    print(f"Iterations: {result.get('iterations', 'N/A')}")

    # Access dashboard data
    if 'dashboard' in result:
        dashboard = result['dashboard']
        print(f"Dashboard metrics: {dashboard.get('metrics', {})}")
else:
    print(f"Error: {result.get('error_message', 'Unknown error')}")

Direct Execution with Real-time Monitoring

from qubots import PlaygroundExecutor

# Create executor with callbacks
def progress_callback(message, progress):
    print(f"Progress: {message} ({progress:.1f}%)")

def log_callback(level, message, source):
    print(f"[{level}] {source}: {message}")

executor = PlaygroundExecutor(
    progress_callback=progress_callback,
    log_callback=log_callback
)

# Execute with real-time monitoring
result = executor.execute_optimization(
    problem_name="maxcut_problem",
    problem_username="examples",
    optimizer_name="ortools_maxcut",
    optimizer_username="examples",
    problem_params={"n_vertices": 30, "density": 0.4},
    optimizer_params={"time_limit": 120.0}
)

# Process results
if result.success:
    print(f"Optimization completed: {result.best_value}")
    print(f"Solution: {result.best_solution}")
    print(f"Metadata: {result.metadata}")
else:
    print(f"Execution failed: {result.error_message}")

Directory-based Execution

from qubots import execute_playground_optimization

# Execute using local model directories
result = execute_playground_optimization(
    problem_dir="./my_tsp_problem",
    optimizer_dir="./my_genetic_optimizer",
    problem_params={"n_cities": 50},
    optimizer_params={"population_size": 100, "generations": 500}
)

print(f"Success: {result['success']}")
if result['success']:
    print(f"Best value: {result['best_value']}")
    print(f"Execution time: {result['execution_time']:.2f}s")

    # Access dashboard data
    if 'dashboard' in result:
        dashboard = result['dashboard']
        print(f"Dashboard plots: {len(dashboard.get('plots', []))}")
        print(f"Dashboard metrics: {dashboard.get('metrics', {})}")

Batch Execution

from qubots import execute_playground_optimization

# Test multiple optimizers on same problem
optimizers = [
    ("genetic_tsp", "algorithms"),
    ("simulated_annealing_tsp", "algorithms"),
    ("ortools_tsp", "algorithms")
]

results = []
for optimizer_name, username in optimizers:
    result = execute_playground_optimization(
        problem_name="tsp_berlin52",
        problem_username="benchmarks",
        optimizer_name=optimizer_name,
        optimizer_username=username
    )

    if result['success']:
        results.append((optimizer_name, result['best_value']))
        print(f"{optimizer_name}: {result['best_value']:.4f}")
    else:
        print(f"{optimizer_name}: Failed - {result.get('error_message', 'Unknown error')}")

# Find best result
if results:
    best_optimizer, best_value = min(results, key=lambda x: x[1])
    print(f"Best optimizer: {best_optimizer} (value: {best_value:.4f})")

Real-time Progress Tracking

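from qubots import PlaygroundExecutor
from qubots.playground_integration import monitor_execution
import matplotlib.pyplot as plt

# Executor used below to start the asynchronous run
executor = PlaygroundExecutor()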

# Lists to store progress data
iterations = []
best_values = []

def real_time_callback(update):
    if 'iteration' in update and 'best_value' in update:
        iterations.append(update['iteration'])
        best_values.append(update['best_value'])
        
        # Update plot in real-time
        plt.clf()
        plt.plot(iterations, best_values)
        plt.xlabel('Iteration')
        plt.ylabel('Best Value')
        plt.title('Optimization Progress')
        plt.pause(0.1)

# Start execution and monitor with real-time plotting
execution_id = executor.execute_async(
    problem_name="vrp_problem",
    problem_username="logistics",
    optimizer_name="genetic_vrp",
    optimizer_username="evolutionary_algorithms"
)

plt.ion()  # Turn on interactive mode
result = monitor_execution(execution_id, progress_callback=real_time_callback)
plt.ioff()  # Turn off interactive mode
plt.show()

Integration Points

With Local Development

# Test locally first, then run in cloud
from qubots import AutoProblem, AutoOptimizer, execute_playground_optimization

# Local testing
problem = AutoProblem.from_repo("examples/tsp")
optimizer = AutoOptimizer.from_repo("examples/genetic_tsp")
local_result = optimizer.optimize(problem)

# Cloud execution with same configuration
cloud_result = execute_playground_optimization(
    problem_name="tsp",
    problem_username="examples",
    optimizer_name="genetic_tsp",
    optimizer_username="examples",
    problem_params=problem.get_parameters(),
    optimizer_params=optimizer.get_parameters()
)

# Compare results
print(f"Local: {local_result.best_value}")
if cloud_result['success']:
    print(f"Cloud: {cloud_result['best_value']}")
else:
    print(f"Cloud execution failed: {cloud_result.get('error_message', 'Unknown error')}")

With Benchmarking

# Use playground for large-scale benchmarking
from qubots import execute_playground_optimization

# Define problems and optimizers
problems = ["tsp_berlin52", "tsp_kroA100", "tsp_lin318"]
optimizers = ["genetic_tsp", "ortools_tsp", "simulated_annealing_tsp"]

# Run benchmarks in cloud
cloud_results = []
for problem in problems:
    for optimizer in optimizers:
        result = execute_playground_optimization(
            problem_name=problem,
            problem_username="benchmarks",
            optimizer_name=optimizer,
            optimizer_username="algorithms"
        )

        if result['success']:
            cloud_results.append({
                "problem": problem,
                "optimizer": optimizer,
                "best_value": result['best_value'],
                "execution_time": result['execution_time']
            })

# Analyze results
for result in cloud_results:
    print(f"{result['problem']} + {result['optimizer']}: "
          f"{result['best_value']:.4f} ({result['execution_time']:.2f}s)")

# Find best performers
best_by_problem = {}
for result in cloud_results:
    problem = result['problem']
    if problem not in best_by_problem or result['best_value'] < best_by_problem[problem]['best_value']:
        best_by_problem[problem] = result

print("\nBest performers by problem:")
for problem, result in best_by_problem.items():
    print(f"{problem}: {result['optimizer']} ({result['best_value']:.4f})")

With Leaderboard Submission

# Use playground results for leaderboard submission
from qubots import PlaygroundExecutor

# Create executor for leaderboard-eligible execution
executor = PlaygroundExecutor()

# Run optimization on standardized problem
result = executor.execute_optimization(
    problem_name="standardized_tsp_1",
    problem_username="standardized",
    optimizer_name="my_advanced_tsp",
    optimizer_username="my_username"
)

# Submit to leaderboard if successful and eligible
if result.success and result.leaderboard_eligible:
    submission_result = executor.submit_to_leaderboard(
        result=result,
        solver_repository="my_username/my_advanced_tsp",
        solver_config={"population_size": 100, "generations": 1000},
        solver_version="v1.2.0"
    )

    if submission_result:
        print(f"Successfully submitted to leaderboard: {submission_result.get('id', 'unknown')}")
    else:
        print("Leaderboard submission failed")

Error Handling

Result-based Error Handling

from qubots import execute_playground_optimization

result = execute_playground_optimization(
    problem_name="large_tsp",
    problem_username="benchmarks",
    optimizer_name="memory_intensive_optimizer",
    optimizer_username="research"
)

# Check for errors in result
if not result['success']:
    error_message = result.get('error_message', 'Unknown error')
    error_type = result.get('error_type', 'Unknown')

    print(f"Execution failed: {error_message}")
    print(f"Error type: {error_type}")

    # Handle specific error types
    if 'timeout' in error_message.lower():
        print("Consider reducing problem size or increasing timeout")
    elif 'memory' in error_message.lower():
        print("Try a less memory-intensive optimizer")
    elif 'authentication' in error_message.lower():
        print("Check your API token and permissions")
    elif 'not found' in error_message.lower():
        print("Verify model names and usernames")
else:
    print(f"Execution successful: {result['best_value']}")

Robust Execution with Retry

import time
from typing import Dict, Any

from qubots import execute_playground_optimization

def execute_with_retry(max_retries: int = 3, **kwargs) -> Dict[str, Any]:
    """Execute with exponential backoff retry."""
    for attempt in range(max_retries):
        result = execute_playground_optimization(**kwargs)

        if result['success']:
            return result

        error_message = result.get('error_message', '')

        # Don't retry on certain errors
        if any(term in error_message.lower() for term in ['authentication', 'not found', 'permission']):
            print(f"Non-retryable error: {error_message}")
            return result

        if attempt < max_retries - 1:
            wait_time = 2 ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
        else:
            print(f"All {max_retries} attempts failed")
            return result

# Usage
result = execute_with_retry(
    max_retries=3,
    problem_name="tsp_berlin52",
    problem_username="benchmarks",
    optimizer_name="genetic_tsp",
    optimizer_username="algorithms"
)
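
To check the retry and backoff behaviour without calling the cloud, the same logic can be written with the execution call and the sleep function passed in. This is a sketch under that assumption. retry_with_backoff, run, and sleep are hypothetical names, and the scripted outcomes are made up.

```python
import time
from typing import Any, Callable, Dict, List

NON_RETRYABLE = ("authentication", "not found", "permission")


def retry_with_backoff(
    run: Callable[[], Dict[str, Any]],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Retry `run`, waiting 1, 2, 4, ... seconds between failed attempts."""
    result: Dict[str, Any] = {"success": False, "error_message": "no attempts made"}
    for attempt in range(max_retries):
        result = run()
        if result.get("success"):
            break
        message = str(result.get("error_message", "")).lower()
        if any(term in message for term in NON_RETRYABLE):
            break  # retrying cannot fix these errors
        if attempt < max_retries - 1:
            sleep(2 ** attempt)
    return result


# Scripted outcomes: two transient failures, then success.
outcomes = iter([
    {"success": False, "error_message": "connection reset"},
    {"success": False, "error_message": "connection reset"},
    {"success": True, "best_value": 1.0},
])
waits: List[float] = []
final_result = retry_with_backoff(lambda: next(outcomes), max_retries=3, sleep=waits.append)
print(final_result["success"], waits)
```

Injecting sleep lets the waits be recorded instead of actually slept through. In normal use the default time.sleep applies.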

Developer Notes

Design Decisions

  1. Asynchronous by Default: All executions are asynchronous for scalability
  2. Real-time Monitoring: Live progress updates for better user experience
  3. Resource Management: Automatic resource allocation and cleanup
  4. Error Recovery: Comprehensive error handling and recovery mechanisms

Performance Considerations

  • Connection Pooling: Efficient HTTP connection management
  • Streaming: Real-time result streaming for large optimizations
  • Caching: Cache model information and execution metadata
  • Parallel Execution: Support for multiple concurrent executions
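
As an illustration of the parallel-execution point, several blocking calls can be run at once from the client side with a thread pool. The sketch assumes the underlying call is safe to invoke from multiple threads, which the source does not confirm. fake_execute is a hypothetical stand-in that returns made-up values instead of contacting the playground.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple


def fake_execute(problem_name: str, optimizer_name: str) -> Dict[str, Any]:
    """Hypothetical stand-in for a blocking cloud call; returns a made-up value."""
    return {
        "success": True,
        "problem": problem_name,
        "optimizer": optimizer_name,
        "best_value": float(len(problem_name) + len(optimizer_name)),
    }


def run_concurrently(pairs: List[Tuple[str, str]], max_workers: int = 4) -> List[Dict[str, Any]]:
    """Run one execution per (problem, optimizer) pair; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fake_execute, problem, optimizer) for problem, optimizer in pairs]
        return [future.result() for future in futures]


pairs = [("tsp_berlin52", "genetic_tsp"), ("tsp_berlin52", "ortools_tsp")]
results = run_concurrently(pairs)
print([(r["optimizer"], r["success"]) for r in results])
```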

Security Features

  • Authentication: Secure authentication for all API calls
  • Sandboxing: Isolated execution environments
  • Resource Limits: Automatic resource limiting and monitoring
  • Access Control: Proper permission checking for models and executions

Next Steps