Playground Integration
The playground_integration.py file implements the integration between the Qubots framework and the Rastion platform's cloud playground. It provides cloud execution, real-time progress monitoring, and a seamless bridge to the web-based playground interface.
File Purpose
The playground integration provides:
- Cloud Execution: Run optimizations in the cloud with scalable resources
- Real-time Monitoring: Live progress tracking and result streaming
- Web Integration: Seamless integration with the playground web interface
- Resource Management: Automatic resource allocation and cleanup
- Result Synchronization: Sync results between local and cloud environments
Core Classes
PlaygroundExecutor
Main class for executing optimizations in the cloud:
class PlaygroundExecutor:
    """
    Executor for running optimizations in the Rastion cloud playground.

    Handles cloud execution, monitoring, and result retrieval.
    """

    def __init__(self, client: Optional[RastionClient] = None):
        """
        Initialize playground executor.

        Args:
            client: Rastion client for API access (uses global client if None)
        """
        self.client = client or get_global_client()
        self.active_executions: Dict[str, Dict[str, Any]] = {}
        self.execution_callbacks: Dict[str, List[Callable]] = {}
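A minimal construction sketch, assuming authentication has already been performed via qubots.rastion.authenticate (as in the usage examples later on this page):

    from qubots import PlaygroundExecutor

    # Falls back to the global client when no client is passed
    executor = PlaygroundExecutor()

    # A fresh executor tracks no executions yet
    print(executor.active_executions)   # {}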
PlaygroundResult
Container for playground execution results:
@dataclass
class PlaygroundResult:
    """Result from playground execution."""

    # Execution information
    execution_id: str    # Unique execution identifier
    status: str          # "pending", "running", "completed", "failed"

    # Optimization results
    optimization_result: Optional[OptimizationResult] = None

    # Execution metadata
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    total_runtime_seconds: Optional[float] = None

    # Resource usage
    cpu_time_seconds: Optional[float] = None
    memory_usage_mb: Optional[float] = None

    # Configuration used
    problem_config: Dict[str, Any] = field(default_factory=dict)
    optimizer_config: Dict[str, Any] = field(default_factory=dict)

    # Progress information
    progress_updates: List[Dict[str, Any]] = field(default_factory=list)

    # Error information
    error_message: Optional[str] = None
    error_details: Optional[Dict[str, Any]] = None
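Since end_time and total_runtime_seconds start unset, they are filled in when an execution finishes. A hand-built illustration of the field lifecycle (assuming PlaygroundResult is importable from qubots.playground_integration, the module documented here; real results come from the executor):

    from datetime import datetime
    from qubots.playground_integration import PlaygroundResult  # assumed import path

    # Built by hand purely for illustration
    result = PlaygroundResult(execution_id="exec-123", status="running")

    # ... execution finishes ...
    result.status = "completed"
    result.end_time = datetime.now()
    result.total_runtime_seconds = (result.end_time - result.start_time).total_seconds()

    if result.status == "failed":
        print(result.error_message)
    else:
        print(f"Finished in {result.total_runtime_seconds:.2f}s "
              f"with {len(result.progress_updates)} progress updates")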
ModelInfo
Information about models available in the playground:
@dataclass
class ModelInfo:
    """Information about a model in the playground."""

    name: str          # Model name
    username: str      # Owner username
    model_type: str    # "problem" or "optimizer"
    description: str   # Model description
    tags: List[str] = field(default_factory=list)

    # Version information
    version: str = "latest"
    available_versions: List[str] = field(default_factory=list)

    # Compatibility
    compatible_with: List[str] = field(default_factory=list)

    # Usage statistics
    usage_count: int = 0
    average_rating: float = 0.0

    # Metadata
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
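Because ModelInfo is a plain dataclass, lists returned by get_available_models (documented below) can be ranked client-side. A small sketch with hand-built, illustrative entries (assuming the same qubots.playground_integration import path):

    from qubots.playground_integration import ModelInfo  # assumed import path

    models = [
        ModelInfo(name="genetic_tsp", username="algorithms", model_type="optimizer",
                  description="Genetic algorithm for TSP", usage_count=120, average_rating=4.2),
        ModelInfo(name="ortools_tsp", username="algorithms", model_type="optimizer",
                  description="OR-Tools exact TSP solver", usage_count=85, average_rating=4.8),
    ]

    # Rank by popularity, breaking ties on rating
    for m in sorted(models, key=lambda m: (m.usage_count, m.average_rating), reverse=True):
        print(f"{m.username}/{m.name}: {m.usage_count} uses, {m.average_rating:.1f} rating")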
Core Functions
execute_playground_optimization
High-level function for cloud optimization:
def execute_playground_optimization(
    problem_name: str,
    problem_username: str,
    optimizer_name: str,
    optimizer_username: str,
    problem_params: Optional[Dict[str, Any]] = None,
    optimizer_params: Optional[Dict[str, Any]] = None,
    execution_config: Optional[Dict[str, Any]] = None,
    progress_callback: Optional[Callable] = None,
    client: Optional[RastionClient] = None
) -> PlaygroundResult:
    """
    Execute optimization in the cloud playground.

    Args:
        problem_name: Name of the problem model
        problem_username: Username of problem owner
        optimizer_name: Name of the optimizer model
        optimizer_username: Username of optimizer owner
        problem_params: Parameters for problem configuration
        optimizer_params: Parameters for optimizer configuration
        execution_config: Configuration for cloud execution
        progress_callback: Callback for progress updates
        client: Rastion client (uses global if None)

    Returns:
        PlaygroundResult with execution results

    Raises:
        PlaygroundError: If execution fails
        AuthenticationError: If not authenticated
        NotFoundError: If models are not found
    """
Execution Flow:
1. Validation: Validate model names and parameters
2. Authentication: Ensure the client is authenticated
3. Submission: Submit the execution request to the playground
4. Monitoring: Monitor execution progress
5. Result Retrieval: Retrieve and parse results
6. Cleanup: Clean up cloud resources
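A minimal call following the signature above, with a progress callback that simply prints each streamed update (model names are reused from the usage examples later on this page):

    from qubots import execute_playground_optimization

    def on_progress(update):
        # Each update is a progress dictionary streamed from the playground
        print(f"progress: {update}")

    result = execute_playground_optimization(
        problem_name="tsp_problem",
        problem_username="community",
        optimizer_name="genetic_tsp",
        optimizer_username="research_group",
        problem_params={"n_cities": 25},
        optimizer_params={"generations": 100},
        progress_callback=on_progress,
    )
    print(result.status, result.total_runtime_seconds)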
get_available_models
Retrieve available models in the playground:
def get_available_models(
    model_type: Optional[str] = None,
    username: Optional[str] = None,
    tags: Optional[List[str]] = None,
    client: Optional[RastionClient] = None
) -> List[ModelInfo]:
    """
    Get list of available models in the playground.

    Args:
        model_type: Filter by "problem" or "optimizer"
        username: Filter by username
        tags: Filter by tags
        client: Rastion client

    Returns:
        List of ModelInfo objects
    """
monitor_execution
Monitor ongoing execution:
def monitor_execution(
    execution_id: str,
    progress_callback: Optional[Callable] = None,
    poll_interval: float = 2.0,
    client: Optional[RastionClient] = None
) -> PlaygroundResult:
    """
    Monitor playground execution until completion.

    Args:
        execution_id: ID of execution to monitor
        progress_callback: Callback for progress updates
        poll_interval: Polling interval in seconds
        client: Rastion client

    Returns:
        Final PlaygroundResult
    """
PlaygroundExecutor Methods
execute_async
Asynchronous execution with monitoring:
def execute_async(
    self,
    problem_name: str,
    problem_username: str,
    optimizer_name: str,
    optimizer_username: str,
    problem_params: Optional[Dict[str, Any]] = None,
    optimizer_params: Optional[Dict[str, Any]] = None,
    execution_config: Optional[Dict[str, Any]] = None
) -> str:
    """
    Start asynchronous execution in playground.

    Args:
        problem_name: Name of problem model
        problem_username: Problem owner username
        optimizer_name: Name of optimizer model
        optimizer_username: Optimizer owner username
        problem_params: Problem parameters
        optimizer_params: Optimizer parameters
        execution_config: Execution configuration

    Returns:
        Execution ID for monitoring
    """
get_execution_status
Check execution status:
def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
    """
    Get current status of execution.

    Args:
        execution_id: ID of execution

    Returns:
        Status information dictionary
    """
cancel_execution
Cancel running execution:
def cancel_execution(self, execution_id: str) -> bool:
    """
    Cancel a running execution.

    Args:
        execution_id: ID of execution to cancel

    Returns:
        True if cancellation successful
    """
add_progress_callback
Add callback for progress updates:
def add_progress_callback(
    self,
    execution_id: str,
    callback: Callable[[Dict[str, Any]], None]
) -> None:
    """
    Add progress callback for execution.

    Args:
        execution_id: ID of execution
        callback: Callback function for progress updates
    """
Usage Examples
Basic Cloud Execution
from qubots import execute_playground_optimization
import qubots.rastion as rastion

# Authenticate with platform
rastion.authenticate("your_api_token")

# Execute optimization in cloud with logging
def log_callback(level, message, source):
    print(f"[{level}] {source}: {message}")

result = execute_playground_optimization(
    problem_name="tsp_problem",
    problem_username="community",
    optimizer_name="genetic_tsp",
    optimizer_username="research_group",
    problem_params={
        "n_cities": 50,
        "city_distribution": "random"
    },
    optimizer_params={
        "population_size": 100,
        "generations": 500,
        "mutation_rate": 0.1
    },
    log_callback=log_callback
)

print(f"Success: {result['success']}")
if result['success']:
    print(f"Best value: {result['best_value']}")
    print(f"Runtime: {result['execution_time']:.2f} seconds")
    print(f"Iterations: {result.get('iterations', 'N/A')}")

    # Access dashboard data
    if 'dashboard' in result:
        dashboard = result['dashboard']
        print(f"Dashboard metrics: {dashboard.get('metrics', {})}")
else:
    print(f"Error: {result.get('error_message', 'Unknown error')}")
Direct Execution with Real-time Monitoring
from qubots import PlaygroundExecutor

# Create executor with callbacks
def progress_callback(message, progress):
    print(f"Progress: {message} ({progress:.1f}%)")

def log_callback(level, message, source):
    print(f"[{level}] {source}: {message}")

executor = PlaygroundExecutor(
    progress_callback=progress_callback,
    log_callback=log_callback
)

# Execute with real-time monitoring
result = executor.execute_optimization(
    problem_name="maxcut_problem",
    problem_username="examples",
    optimizer_name="ortools_maxcut",
    optimizer_username="examples",
    problem_params={"n_vertices": 30, "density": 0.4},
    optimizer_params={"time_limit": 120.0}
)

# Process results
if result.success:
    print(f"Optimization completed: {result.best_value}")
    print(f"Solution: {result.best_solution}")
    print(f"Metadata: {result.metadata}")
else:
    print(f"Execution failed: {result.error_message}")
Directory-based Execution
from qubots import execute_playground_optimization

# Execute using local model directories
result = execute_playground_optimization(
    problem_dir="./my_tsp_problem",
    optimizer_dir="./my_genetic_optimizer",
    problem_params={"n_cities": 50},
    optimizer_params={"population_size": 100, "generations": 500}
)

print(f"Success: {result['success']}")
if result['success']:
    print(f"Best value: {result['best_value']}")
    print(f"Execution time: {result['execution_time']:.2f}s")

    # Access dashboard data
    if 'dashboard' in result:
        dashboard = result['dashboard']
        print(f"Dashboard plots: {len(dashboard.get('plots', []))}")
        print(f"Dashboard metrics: {dashboard.get('metrics', {})}")
Batch Execution
from qubots import execute_playground_optimization

# Test multiple optimizers on the same problem
optimizers = [
    ("genetic_tsp", "algorithms"),
    ("simulated_annealing_tsp", "algorithms"),
    ("ortools_tsp", "algorithms")
]

results = []
for optimizer_name, username in optimizers:
    result = execute_playground_optimization(
        problem_name="tsp_berlin52",
        problem_username="benchmarks",
        optimizer_name=optimizer_name,
        optimizer_username=username
    )

    if result['success']:
        results.append((optimizer_name, result['best_value']))
        print(f"{optimizer_name}: {result['best_value']:.4f}")
    else:
        print(f"{optimizer_name}: Failed - {result.get('error_message', 'Unknown error')}")

# Find the best result
if results:
    best_optimizer, best_value = min(results, key=lambda x: x[1])
    print(f"Best optimizer: {best_optimizer} (value: {best_value:.4f})")
Real-time Progress Tracking
from qubots import PlaygroundExecutor
from qubots.playground_integration import monitor_execution
import matplotlib.pyplot as plt

executor = PlaygroundExecutor()

# Lists to store progress data
iterations = []
best_values = []

def real_time_callback(update):
    if 'iteration' in update and 'best_value' in update:
        iterations.append(update['iteration'])
        best_values.append(update['best_value'])

        # Update plot in real time
        plt.clf()
        plt.plot(iterations, best_values)
        plt.xlabel('Iteration')
        plt.ylabel('Best Value')
        plt.title('Optimization Progress')
        plt.pause(0.1)

# Start execution and monitor with real-time plotting
execution_id = executor.execute_async(
    problem_name="vrp_problem",
    problem_username="logistics",
    optimizer_name="genetic_vrp",
    optimizer_username="evolutionary_algorithms"
)

plt.ion()  # Turn on interactive mode
result = monitor_execution(execution_id, progress_callback=real_time_callback)
plt.ioff()  # Turn off interactive mode
plt.show()
Integration Points
With Local Development
# Test locally first, then run in the cloud
from qubots import AutoProblem, AutoOptimizer, execute_playground_optimization

# Local testing
problem = AutoProblem.from_repo("examples/tsp")
optimizer = AutoOptimizer.from_repo("examples/genetic_tsp")
local_result = optimizer.optimize(problem)

# Cloud execution with the same configuration
cloud_result = execute_playground_optimization(
    problem_name="tsp",
    problem_username="examples",
    optimizer_name="genetic_tsp",
    optimizer_username="examples",
    problem_params=problem.get_parameters(),
    optimizer_params=optimizer.get_parameters()
)

# Compare results
print(f"Local: {local_result.best_value}")
if cloud_result['success']:
    print(f"Cloud: {cloud_result['best_value']}")
else:
    print(f"Cloud execution failed: {cloud_result.get('error_message', 'Unknown error')}")
With Benchmarking
# Use the playground for large-scale benchmarking
from qubots import execute_playground_optimization

# Define problems and optimizers
problems = ["tsp_berlin52", "tsp_kroA100", "tsp_lin318"]
optimizers = ["genetic_tsp", "ortools_tsp", "simulated_annealing_tsp"]

# Run benchmarks in the cloud
cloud_results = []
for problem in problems:
    for optimizer in optimizers:
        result = execute_playground_optimization(
            problem_name=problem,
            problem_username="benchmarks",
            optimizer_name=optimizer,
            optimizer_username="algorithms"
        )

        if result['success']:
            cloud_results.append({
                "problem": problem,
                "optimizer": optimizer,
                "best_value": result['best_value'],
                "execution_time": result['execution_time']
            })

# Analyze results
for result in cloud_results:
    print(f"{result['problem']} + {result['optimizer']}: "
          f"{result['best_value']:.4f} ({result['execution_time']:.2f}s)")

# Find the best performers
best_by_problem = {}
for result in cloud_results:
    problem = result['problem']
    if problem not in best_by_problem or result['best_value'] < best_by_problem[problem]['best_value']:
        best_by_problem[problem] = result

print("\nBest performers by problem:")
for problem, result in best_by_problem.items():
    print(f"{problem}: {result['optimizer']} ({result['best_value']:.4f})")
With Leaderboard Submission
# Use playground results for leaderboard submission
from qubots import PlaygroundExecutor

# Create an executor for leaderboard-eligible execution
executor = PlaygroundExecutor()

# Run optimization on a standardized problem
result = executor.execute_optimization(
    problem_name="standardized_tsp_1",
    problem_username="standardized",
    optimizer_name="my_advanced_tsp",
    optimizer_username="my_username"
)

# Submit to the leaderboard if successful and eligible
if result.success and result.leaderboard_eligible:
    submission_result = executor.submit_to_leaderboard(
        result=result,
        solver_repository="my_username/my_advanced_tsp",
        solver_config={"population_size": 100, "generations": 1000},
        solver_version="v1.2.0"
    )

    if submission_result:
        print(f"Successfully submitted to leaderboard: {submission_result.get('id', 'unknown')}")
    else:
        print("Leaderboard submission failed")
Error Handling
Result-based Error Handling
from qubots import execute_playground_optimization

result = execute_playground_optimization(
    problem_name="large_tsp",
    problem_username="benchmarks",
    optimizer_name="memory_intensive_optimizer",
    optimizer_username="research"
)

# Check for errors in the result
if not result['success']:
    error_message = result.get('error_message', 'Unknown error')
    error_type = result.get('error_type', 'Unknown')
    print(f"Execution failed: {error_message}")
    print(f"Error type: {error_type}")

    # Handle specific error types
    if 'timeout' in error_message.lower():
        print("Consider reducing problem size or increasing the timeout")
    elif 'memory' in error_message.lower():
        print("Try a less memory-intensive optimizer")
    elif 'authentication' in error_message.lower():
        print("Check your API token and permissions")
    elif 'not found' in error_message.lower():
        print("Verify model names and usernames")
else:
    print(f"Execution successful: {result['best_value']}")
Robust Execution with Retry
import time
from typing import Any, Dict

from qubots import execute_playground_optimization

def execute_with_retry(max_retries: int = 3, **kwargs) -> Dict[str, Any]:
    """Execute with exponential-backoff retry."""
    for attempt in range(max_retries):
        result = execute_playground_optimization(**kwargs)
        if result['success']:
            return result

        error_message = result.get('error_message', '')

        # Don't retry on certain errors
        if any(term in error_message.lower() for term in ['authentication', 'not found', 'permission']):
            print(f"Non-retryable error: {error_message}")
            return result

        if attempt < max_retries - 1:
            wait_time = 2 ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
        else:
            print(f"All {max_retries} attempts failed")

    return result

# Usage
result = execute_with_retry(
    max_retries=3,
    problem_name="tsp_berlin52",
    problem_username="benchmarks",
    optimizer_name="genetic_tsp",
    optimizer_username="algorithms"
)
Developer Notes
Design Decisions
- Asynchronous by Default: All executions are asynchronous for scalability
- Real-time Monitoring: Live progress updates for better user experience
- Resource Management: Automatic resource allocation and cleanup
- Error Recovery: Comprehensive error handling and recovery mechanisms
- Connection Pooling: Efficient HTTP connection management
- Streaming: Real-time result streaming for large optimizations
- Caching: Cache model information and execution metadata
- Parallel Execution: Support for multiple concurrent executions (see the sketch after this list)
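Because each execution runs server-side and the client call simply blocks until results arrive, concurrent executions can be driven from an ordinary thread pool. A sketch, assuming the dictionary-style results used in the usage examples above:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from qubots import execute_playground_optimization

    jobs = [("genetic_tsp", "algorithms"), ("ortools_tsp", "algorithms")]

    def run(optimizer_name, username):
        return optimizer_name, execute_playground_optimization(
            problem_name="tsp_berlin52",
            problem_username="benchmarks",
            optimizer_name=optimizer_name,
            optimizer_username=username,
        )

    # Each worker thread blocks on its own cloud execution
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [pool.submit(run, name, user) for name, user in jobs]
        for future in as_completed(futures):
            name, result = future.result()
            print(f"{name}: {'ok' if result['success'] else 'failed'}")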
Security Features
- Authentication: Secure authentication for all API calls
- Sandboxing: Isolated execution environments
- Resource Limits: Automatic resource limiting and monitoring
- Access Control: Proper permission checking for models and executions
Next Steps