Creating VRP Qubots

This example shows how to create qubots for the Vehicle Routing Problem (VRP) - finding optimal routes for multiple vehicles to serve customers while respecting capacity constraints. You’ll learn to build both problem and optimizer qubots, connect them, run locally, and upload to Rastion.

Problem Overview

VRP optimizes routes for a fleet of vehicles to serve customers with known demands while minimizing total travel distance and respecting vehicle capacity constraints.

Applications: Delivery logistics, waste collection, school bus routing, field service scheduling.

Creating the VRP Problem Qubot

Step 1: Problem Implementation

Create qubot.py inside a vrp_problem/ directory (the directory uploaded in Step 6) for the VRP problem:

# qubot.py
import numpy as np
import random
import math
from qubots import BaseProblem, ProblemMetadata, ProblemType, ObjectiveType

class VRPProblem(BaseProblem):
    """Capacitated vehicle routing problem on a seeded random instance.

    A solution is a list with exactly ``n_vehicles`` routes. Each route is a
    list of customer ids (``1..n_customers``) in visiting order; the depot
    (index 0 of the distance matrix) is implied at the start and end of every
    non-empty route.
    """

    # Number of shuffled first-fit packings get_random_solution tries before
    # it switches to the deterministic largest-demand-first packing.
    _MAX_RANDOM_ATTEMPTS = 50

    def __init__(self, n_customers=20, n_vehicles=3, vehicle_capacity=100, seed=42):
        """Generate the instance and register the problem metadata.

        Args:
            n_customers: Number of customers to generate.
            n_vehicles: Number of routes every solution must contain.
            vehicle_capacity: Maximum total demand allowed on one route.
            seed: Seed applied to both ``random`` and ``numpy.random``.
        """
        self.n_customers = n_customers
        self.n_vehicles = n_vehicles
        self.vehicle_capacity = vehicle_capacity

        # Generate random problem instance.
        # NOTE: this seeds the *global* generators, because customer
        # generation below draws from the module-level ``random`` functions.
        np.random.seed(seed)
        random.seed(seed)
        self.depot = (50, 50)  # Depot at center
        self.customers = self._generate_customers()
        self.distance_matrix = self._calculate_distances()

        # Set metadata
        metadata = ProblemMetadata(
            name="VRP Problem",
            description=f"Vehicle routing with {n_customers} customers and {n_vehicles} vehicles",
            problem_type=ProblemType.COMBINATORIAL,
            objective_type=ObjectiveType.MINIMIZE,
            dimension=n_customers * n_vehicles,
            variable_bounds=[(0, n_customers)] * (n_customers * n_vehicles)
        )
        super().__init__(metadata)

    def _generate_customers(self):
        """Generate random customer locations and demands.

        Returns:
            A list of dicts with keys ``'id'`` (1-based), ``'x'``, ``'y'``
            (each uniform in [10, 90]) and ``'demand'`` (integer in [5, 25]).
        """
        customers = []
        for i in range(self.n_customers):
            # Draw order (x, y, demand) is kept so a given seed always
            # produces the same instance.
            x = random.uniform(10, 90)
            y = random.uniform(10, 90)
            demand = random.randint(5, 25)
            customers.append({
                'id': i + 1,
                'x': x,
                'y': y,
                'demand': demand
            })
        return customers

    def _calculate_distances(self):
        """Calculate the symmetric Euclidean distance matrix.

        Returns:
            A ``(n_customers + 1) x (n_customers + 1)`` numpy array where
            index 0 is the depot and index ``k`` is customer id ``k``.
        """
        # Put depot and customers in one coordinate list so a single pair
        # loop covers depot-customer and customer-customer distances.
        points = [self.depot] + [(c['x'], c['y']) for c in self.customers]
        n = len(points)
        distances = np.zeros((n, n))

        for i in range(n):
            xi, yi = points[i]
            for j in range(i + 1, n):
                xj, yj = points[j]
                dist = math.sqrt((xi - xj)**2 + (yi - yj)**2)
                distances[i][j] = dist
                distances[j][i] = dist

        return distances

    def evaluate_solution(self, solution):
        """Calculate total travel distance for a VRP solution.

        Args:
            solution: List of routes (lists of customer ids).

        Returns:
            The summed length of all routes, each starting and ending at the
            depot, or ``float('inf')`` when ``is_valid_solution`` rejects the
            solution.
        """
        if not self.is_valid_solution(solution):
            return float('inf')

        total_distance = 0.0

        for route in solution:
            if not route:  # An unused vehicle travels nowhere
                continue

            # Depot -> first customer
            route_distance = 0.0
            route_distance += self.distance_matrix[0][route[0]]

            # Consecutive customers
            for i in range(len(route) - 1):
                route_distance += self.distance_matrix[route[i]][route[i + 1]]

            # Last customer -> depot
            route_distance += self.distance_matrix[route[-1]][0]

            total_distance += route_distance

        return total_distance

    def is_valid_solution(self, solution):
        """Check whether a VRP solution is valid.

        A solution is valid when it has exactly ``n_vehicles`` routes, every
        customer id in ``1..n_customers`` appears exactly once, and no route's
        total demand exceeds ``vehicle_capacity``.

        Returns:
            ``True`` if all three conditions hold, otherwise ``False``.
        """
        if len(solution) != self.n_vehicles:
            return False

        # Check all customers are served exactly once
        served_customers = set()
        for route in solution:
            for customer in route:
                if customer < 1 or customer > self.n_customers:
                    return False
                if customer in served_customers:
                    return False
                served_customers.add(customer)

        if len(served_customers) != self.n_customers:
            return False

        # Check capacity constraints
        for route in solution:
            route_demand = sum(self.customers[customer - 1]['demand'] for customer in route)
            if route_demand > self.vehicle_capacity:
                return False

        return True

    def _first_fit(self, order):
        """Pack customers, in the given order, into the first vehicle that fits.

        Returns:
            A ``(routes, loads, unplaced)`` tuple: the routes built, the total
            demand on each route, and the customers no vehicle could take.
        """
        routes = [[] for _ in range(self.n_vehicles)]
        loads = [0] * self.n_vehicles  # running demand per vehicle
        unplaced = []

        for customer in order:
            demand = self.customers[customer - 1]['demand']
            for vehicle in range(self.n_vehicles):
                if loads[vehicle] + demand <= self.vehicle_capacity:
                    routes[vehicle].append(customer)
                    loads[vehicle] += demand
                    break
            else:
                unplaced.append(customer)

        return routes, loads, unplaced

    def get_random_solution(self):
        """Generate a random VRP solution that serves every customer.

        Customers are shuffled and packed first-fit. If a shuffle leaves some
        customer without a vehicle, further shuffles are tried (up to
        ``_MAX_RANDOM_ATTEMPTS``) instead of silently dropping that customer.
        If none succeeds, customers are packed largest-demand-first; any
        customer that still fits nowhere is assigned to the least-loaded
        vehicle, so the returned solution always contains every customer but
        may then exceed capacity (``evaluate_solution`` scores it ``inf``).

        Returns:
            A list of ``n_vehicles`` routes (empty list if there are no
            vehicles).
        """
        if self.n_vehicles < 1:
            return []

        order = list(range(1, self.n_customers + 1))
        for _ in range(self._MAX_RANDOM_ATTEMPTS):
            random.shuffle(order)
            routes, loads, unplaced = self._first_fit(order)
            if not unplaced:
                return routes

        # Last resort: placing large demands first packs tighter than a
        # random order does.
        order.sort(key=lambda c: self.customers[c - 1]['demand'], reverse=True)
        routes, loads, unplaced = self._first_fit(order)
        for customer in unplaced:
            lightest = loads.index(min(loads))
            routes[lightest].append(customer)
            loads[lightest] += self.customers[customer - 1]['demand']

        # Visiting order does not affect capacity; shuffle it so repeated
        # calls do not all return the same deterministic packing.
        for route in routes:
            random.shuffle(route)

        return routes

Step 2: Problem Configuration

Create config.json for the VRP problem in the vrp_problem/ directory, next to its qubot.py:

{
  "type": "problem",
  "class_name": "VRPProblem",
  "default_params": {
    "n_customers": 20,
    "n_vehicles": 3,
    "vehicle_capacity": 100,
    "seed": 42
  },
  "parameters": {
    "n_customers": {
      "type": "integer",
      "default": 20,
      "min": 5,
      "max": 200,
      "description": "Number of customers to serve"
    },
    "n_vehicles": {
      "type": "integer",
      "default": 3,
      "min": 1,
      "max": 20,
      "description": "Number of available vehicles"
    },
    "vehicle_capacity": {
      "type": "integer",
      "default": 100,
      "min": 50,
      "max": 500,
      "description": "Maximum capacity per vehicle"
    },
    "seed": {
      "type": "integer",
      "default": 42,
      "description": "Random seed for problem generation"
    }
  },
  "metadata": {
    "name": "VRP Problem",
    "description": "Vehicle routing problem with capacity constraints",
    "domain": "logistics",
    "tags": ["vrp", "routing", "logistics", "combinatorial"]
  }
}

Creating the VRP Optimizer Qubot

Step 3: Optimizer Implementation

Create a separate qubot.py inside a genetic_vrp_optimizer/ directory (the directory uploaded in Step 7) for a genetic algorithm optimizer:

# qubot.py
import random
import numpy as np
from qubots import BaseOptimizer, OptimizationResult, OptimizerMetadata, OptimizerType

class GeneticVRPOptimizer(BaseOptimizer):
    """Genetic algorithm for VRP problems.

    Works on solutions shaped as a list of routes (lists of 1-based customer
    ids) and reads ``n_vehicles``, ``vehicle_capacity`` and ``customers`` from
    the problem it is given.
    """

    def __init__(self, population_size=50, generations=100, mutation_rate=0.1, crossover_rate=0.8):
        """Store the algorithm parameters and register optimizer metadata.

        Args:
            population_size: Number of solutions kept in each generation.
            generations: Number of breeding rounds.
            mutation_rate: Probability that a child is mutated.
            crossover_rate: Probability that a child is produced by crossover
                rather than copied from its first parent.
        """
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

        # Set metadata
        metadata = OptimizerMetadata(
            name="Genetic VRP Optimizer",
            description="Genetic algorithm for vehicle routing problems",
            optimizer_type=OptimizerType.METAHEURISTIC,
            required_parameters=["population_size", "generations"],
            optional_parameters=["mutation_rate", "crossover_rate"],
            parameter_ranges={
                "population_size": (10, 200),
                "generations": (10, 1000),
                "mutation_rate": (0.01, 0.5),
                "crossover_rate": (0.5, 1.0)
            }
        )
        super().__init__(metadata)

    def optimize(self, problem, progress_callback=None, log_callback=None):
        """Solve VRP using a genetic algorithm.

        Args:
            problem: Problem exposing ``get_random_solution``,
                ``evaluate_solution``, ``n_vehicles``, ``vehicle_capacity``
                and ``customers``.
            progress_callback: Accepted but not invoked by this optimizer.
            log_callback: Optional callable invoked every 20 generations as
                ``log_callback('info', message, 'optimizer')``.

        Returns:
            An ``OptimizationResult`` holding the best solution evaluated and
            its value. ``is_feasible`` is ``True`` only when a solution with a
            finite value was found.
        """
        # Initialize population
        population = [problem.get_random_solution() for _ in range(self.population_size)]
        best_solution = None
        best_value = float('inf')

        for generation in range(self.generations):
            fitness_scores, best_solution, best_value = self._evaluate_population(
                problem, population, best_solution, best_value
            )

            # Log progress
            if log_callback and generation % 20 == 0:
                log_callback('info', f'Generation {generation}: Best = {best_value:.2f}', 'optimizer')

            population = self._next_generation(population, fitness_scores, problem)

        # The offspring bred in the last round have not been scored yet;
        # without this pass that final round of work would be thrown away.
        _, best_solution, best_value = self._evaluate_population(
            problem, population, best_solution, best_value
        )

        return OptimizationResult(
            best_solution=best_solution,
            best_value=best_value,
            # An infinite value means every evaluated solution was rejected.
            is_feasible=best_solution is not None and best_value < float('inf'),
            iterations=self.generations,
            termination_reason="max_generations_reached"
        )

    def _evaluate_population(self, problem, population, best_solution, best_value):
        """Score every individual and update the best-so-far record.

        Returns:
            A ``(fitness_scores, best_solution, best_value)`` tuple.
        """
        fitness_scores = []
        for individual in population:
            fitness = problem.evaluate_solution(individual)
            fitness_scores.append(fitness)

            # "is None" keeps a concrete solution on record even when every
            # individual scores inf, so callers never receive None while a
            # population exists.
            if best_solution is None or fitness < best_value:
                best_value = fitness
                best_solution = [route.copy() for route in individual]

        return fitness_scores, best_solution, best_value

    def _next_generation(self, population, fitness_scores, problem):
        """Build the next population from elites plus bred offspring."""
        # Elitism: keep best solutions
        elite_size = max(1, self.population_size // 10)
        ranked = sorted(range(len(fitness_scores)), key=lambda i: fitness_scores[i])
        new_population = [
            [route.copy() for route in population[idx]] for idx in ranked[:elite_size]
        ]

        # Generate offspring
        while len(new_population) < self.population_size:
            # Selection
            parent1 = self._tournament_selection(population, fitness_scores)
            parent2 = self._tournament_selection(population, fitness_scores)

            # Crossover
            if random.random() < self.crossover_rate:
                child = self._crossover(parent1, parent2, problem)
            else:
                child = [route.copy() for route in parent1]

            # Mutation
            if random.random() < self.mutation_rate:
                child = self._mutate(child, problem)

            new_population.append(child)

        return new_population

    def _tournament_selection(self, population, fitness_scores, tournament_size=3):
        """Select an individual using tournament selection.

        Samples up to ``tournament_size`` distinct individuals and returns a
        copy of the one with the lowest fitness.
        """
        tournament_indices = random.sample(range(len(population)), min(tournament_size, len(population)))
        best_idx = min(tournament_indices, key=lambda i: fitness_scores[i])
        return [route.copy() for route in population[best_idx]]

    def _crossover(self, parent1, parent2, problem):
        """Order crossover adapted for VRP.

        Both parents are flattened to customer sequences, a slice of parent1
        is kept in place, the remaining positions are filled in parent2's
        order, and the result is re-packed into routes. A copy of parent1 is
        returned whenever order crossover is not well defined for the pair.
        """
        # Collect all customers from both parents
        all_customers1 = [customer for route in parent1 for customer in route]
        all_customers2 = [customer for route in parent2 for customer in route]

        if len(all_customers1) != len(all_customers2):
            return [route.copy() for route in parent1]

        n = len(all_customers1)
        if n == 0:
            return [[] for _ in range(problem.n_vehicles)]

        # Order crossover needs two cut points and two permutations of the
        # same customers; otherwise the fill step would leave holes or
        # overwrite the kept slice.
        customers1 = set(all_customers1)
        if n < 2 or len(customers1) != n or customers1 != set(all_customers2):
            return [route.copy() for route in parent1]

        start, end = sorted(random.sample(range(n), 2))

        child_sequence = [None] * n
        child_sequence[start:end] = all_customers1[start:end]
        placed = set(all_customers1[start:end])  # O(1) membership checks

        pointer = end
        for customer in all_customers2[end:] + all_customers2[:end]:
            if customer not in placed:
                child_sequence[pointer % n] = customer
                placed.add(customer)
                pointer += 1

        # Convert back to routes respecting capacity constraints
        return self._sequence_to_routes(child_sequence, problem)

    def _mutate(self, individual, problem):
        """Swap two random customers, within or between routes.

        The individual is flattened, two positions are swapped, and the
        sequence is re-packed into routes. Individuals with fewer than two
        customers are returned as an unchanged copy.
        """
        mutated = [route.copy() for route in individual]

        # Collect all customers
        all_customers = [customer for route in mutated for customer in route]
        if len(all_customers) < 2:
            return mutated

        # Swap two random customers
        i, j = random.sample(range(len(all_customers)), 2)
        all_customers[i], all_customers[j] = all_customers[j], all_customers[i]

        # Convert back to routes
        return self._sequence_to_routes(all_customers, problem)

    def _sequence_to_routes(self, customer_sequence, problem):
        """Convert a customer sequence to routes without losing customers.

        Each customer goes to the first vehicle with enough remaining
        capacity. A customer that fits in no vehicle is assigned to the
        least-loaded vehicle instead of being dropped, so the result always
        contains the whole sequence; such a route exceeds capacity.

        Returns:
            A list of ``problem.n_vehicles`` routes.
        """
        routes = [[] for _ in range(problem.n_vehicles)]
        if not routes:
            return routes

        loads = [0] * len(routes)  # running demand per vehicle

        for customer in customer_sequence:
            customer_demand = problem.customers[customer - 1]['demand']

            for vehicle in range(len(routes)):
                if loads[vehicle] + customer_demand <= problem.vehicle_capacity:
                    break
            else:
                # Nothing fits: overload the lightest vehicle rather than
                # silently removing the customer from the solution.
                vehicle = loads.index(min(loads))

            routes[vehicle].append(customer)
            loads[vehicle] += customer_demand

        return routes

Step 4: Optimizer Configuration

Create config.json for the genetic algorithm in the genetic_vrp_optimizer/ directory, next to its qubot.py:

{
  "type": "optimizer",
  "class_name": "GeneticVRPOptimizer",
  "default_params": {
    "population_size": 50,
    "generations": 100,
    "mutation_rate": 0.1,
    "crossover_rate": 0.8
  },
  "parameters": {
    "population_size": {
      "type": "integer",
      "default": 50,
      "min": 10,
      "max": 200,
      "description": "Number of solutions in population"
    },
    "generations": {
      "type": "integer",
      "default": 100,
      "min": 10,
      "max": 1000,
      "description": "Number of generations to evolve"
    },
    "mutation_rate": {
      "type": "number",
      "default": 0.1,
      "min": 0.01,
      "max": 0.5,
      "description": "Probability of mutation"
    },
    "crossover_rate": {
      "type": "number",
      "default": 0.8,
      "min": 0.5,
      "max": 1.0,
      "description": "Probability of crossover"
    }
  },
  "metadata": {
    "name": "Genetic VRP Optimizer",
    "description": "Genetic algorithm for vehicle routing optimization",
    "optimizer_type": "metaheuristic",
    "tags": ["genetic", "evolutionary", "vrp", "logistics"]
  }
}

Connecting Problem and Optimizer

Step 5: Local Testing

Test your qubots locally before uploading:

# test_local.py
# Run this from the directory that contains the vrp_problem/ and
# genetic_vrp_optimizer/ folders. Each folder has its own qubot.py, so the two
# classes cannot both be imported from a single "qubot" module.
from vrp_problem.qubot import VRPProblem
from genetic_vrp_optimizer.qubot import GeneticVRPOptimizer

# Create problem instance
problem = VRPProblem(n_customers=15, n_vehicles=3, vehicle_capacity=80, seed=42)
print(f"Created VRP with {problem.n_customers} customers and {problem.n_vehicles} vehicles")

# Create optimizer instance
optimizer = GeneticVRPOptimizer(
    population_size=30,
    generations=50,
    mutation_rate=0.15,
    crossover_rate=0.9
)
print("Created genetic VRP optimizer")

# Solve the problem
print("Solving VRP...")
result = optimizer.optimize(problem)

# Treat a missing solution as "no routes" so the report below still runs
# instead of failing on None.
best_routes = result.best_solution or []

print(f"Total distance: {result.best_value:.2f}")
print(f"Valid solution: {problem.is_valid_solution(best_routes)}")
print(f"Generations: {result.iterations}")

# Analyze solution
for i, route in enumerate(best_routes):
    if route:  # Skip empty routes
        route_demand = sum(problem.customers[customer - 1]['demand'] for customer in route)
        print(f"Vehicle {i+1}: {route} (demand: {route_demand}/{problem.vehicle_capacity})")

Uploading to Rastion

Step 6: Upload Problem Qubot

Upload your VRP problem to Rastion:

# upload_problem.py
import qubots.rastion as rastion

# Everything the upload call needs, gathered in one place.
upload_options = {
    "model_path": "./vrp_problem",  # Directory containing qubot.py and config.json
    "model_name": "vrp_problem",
    "description": "VRP problem with configurable customers and vehicles",
    "tags": ["vrp", "routing", "logistics", "combinatorial"],
    "public": True,
}

# Authenticate with Rastion, then upload the VRP problem
rastion.authenticate("your_api_token")
problem_result = rastion.upload_qubots_model(**upload_options)

print(f"Problem uploaded: {problem_result['repository_url']}")

Step 7: Upload Optimizer Qubot

Upload your genetic algorithm optimizer:

# upload_optimizer.py
import qubots.rastion as rastion

# Authenticate with Rastion. This file is a separate script from
# upload_problem.py, so it has to authenticate on its own before uploading.
rastion.authenticate("your_api_token")

# Upload the genetic VRP optimizer
optimizer_result = rastion.upload_qubots_model(
    model_path="./genetic_vrp_optimizer",  # Directory with qubot.py and config.json
    model_name="genetic_vrp_optimizer",
    description="Genetic algorithm for VRP with specialized crossover",
    tags=["genetic", "evolutionary", "vrp", "metaheuristic"],
    public=True
)

print(f"Optimizer uploaded: {optimizer_result['repository_url']}")

Using AutoLoading Functions

Step 8: Load and Use Your Qubots

Once uploaded, use AutoProblem and AutoOptimizer to load your qubots:

from qubots import AutoProblem, AutoOptimizer

# Parameter overrides applied on top of each repository's defaults
problem_overrides = {
    "n_customers": 25,
    "n_vehicles": 4,
    "vehicle_capacity": 120,
    "seed": 123
}
optimizer_overrides = {
    "population_size": 80,
    "generations": 150,
    "mutation_rate": 0.12,
    "crossover_rate": 0.85
}

# Load the uploaded problem, then the uploaded optimizer
problem = AutoProblem.from_repo("your_username/vrp_problem", override_params=problem_overrides)
optimizer = AutoOptimizer.from_repo(
    "your_username/genetic_vrp_optimizer", override_params=optimizer_overrides
)

# Run the optimizer on the problem
result = optimizer.optimize(problem)

# Headline numbers
print(f"Solution found!")
print(f"Total distance: {result.best_value:.2f}")
print(f"Runtime: {result.runtime_seconds:.3f}s")
print(f"Valid solution: {problem.is_valid_solution(result.best_solution)}")

# Per-vehicle report; vehicles without customers are left out
total_demand = 0
for i, route in enumerate(result.best_solution):
    if not route:
        continue
    route_demand = sum(problem.customers[customer - 1]['demand'] for customer in route)
    total_demand += route_demand
    print(f"Vehicle {i+1}: {len(route)} customers, demand {route_demand}/{problem.vehicle_capacity}")

print(f"Total demand served: {total_demand}")

Step 9: Load Others’ Qubots

Use qubots created by the community:

# Community benchmark instances
small_vrp = AutoProblem.from_repo("benchmarks/vrp_small_20")
medium_vrp = AutoProblem.from_repo("benchmarks/vrp_medium_50")

# Community optimizers to compare against
sa_optimizer = AutoOptimizer.from_repo("algorithms/simulated_annealing_vrp")
tabu_optimizer = AutoOptimizer.from_repo("algorithms/tabu_search_vrp")

# Run every optimizer (the `optimizer` loaded earlier plus the community
# ones) on every benchmark instance
problems = [small_vrp, medium_vrp]
optimizers = [optimizer, sa_optimizer, tabu_optimizer]

for prob in problems:
    print(f"\nSolving {prob.metadata.name}:")
    for opt in optimizers:
        result = opt.optimize(prob)
        # Count only the vehicles that actually serve at least one customer
        n_routes = sum(1 for route in result.best_solution if route)
        print(f"  {opt.metadata.name}: {result.best_value:.2f} ({n_routes} routes)")

Summary

You’ve learned how to create VRP qubots from scratch:

  1. Problem Qubot: Implemented VRPProblem class with customer generation and route evaluation
  2. Optimizer Qubot: Created GeneticVRPOptimizer with specialized VRP crossover and mutation
  3. Configuration: Set up config.json files for both qubots
  4. Local Testing: Connected problem and optimizer to verify functionality
  5. Upload to Rastion: Made qubots available to the community
  6. AutoLoading: Used AutoProblem and AutoOptimizer to load and run qubots

Your qubots are now ready for benchmarking, cloud execution, and sharing with the community!