I am new to use this OptaPy solver. I'm trying to solve my workload balance problem by using it.
My assignment problem is: I have a list of users and a list of jobs. Each job requires only one user from a user list base on its business requirements. One user can work on multiple jobs. After the assignments, the user's workload should be as even as possible.
The calculation of workload for one user is: user's exists cost + job cost to be assigned divided by user's capacity.
Variance calculation is: σ² = ( Σ (x-μ)² ) / N. X is individual's workload, μ is an average workload among all users. N is the number of users.
My current implementation can assignments based on user's final lowest workload. I still need to get the average workload and then calculate the variance. Unfortunately, I don't get a way to achieve this base on the documentation and example. Anyone can help me on this? Many thanks in advance.
Attached my code here for your reference.
import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration
@problem_fact
class Users:
def __init__(self, id, capacity, exists_cost):
self.id = id
self.capacity = capacity
self.exists_cost = exists_cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return (
f"User("
f"id={self.id}, "
f"capacity={self.capacity}, "
f"exists_cost={self.exists_cost})"
)
@planning_entity
class Jobs:
def __init__(self, id, job_id, role_id, user, cost, selected=None):
self.id = id
self.job_id = job_id
self.role_id = role_id
self.user = user
self.cost = cost
self.selected = selected
@planning_id
def get_id(self):
return self.id
@planning_variable(int, ["selected_list"])
def get_selected(self):
return self.selected
def set_selected(self, selected):
self.selected = selected
def __str__(self):
return (
f"Job("
f"id={self.id}, "
f"job_id={self.job_id}, "
f"role_id={self.role_id}, "
f"user={self.user}, "
f"selected={self.selected}, "
f"cost={self.cost}"
f")"
)
def format_list(a_list):
return ',\n'.join(map(str, a_list))
@planning_solution
class Assignments:
def __init__(self, selected_list, job_list, score=None):
self.selected_list = selected_list
self.job_list = job_list
self.score = score
@problem_fact_collection_property(int)
@value_range_provider("selected_list")
def get_selected_list(self):
return self.selected_list
@planning_entity_collection_property(Jobs)
def get_job_list(self):
return self.job_list
@planning_score(HardSoftScore)
def get_score(self):
return self.score
def set_score(self, score):
self.score = score
def __str__(self):
return (
f"Assignments("
f"selected_list={format_list(self.selected_list)},\n"
f"job_list={format_list(self.job_list)},\n"
f"score={str(self.score.toString()) if self.score is not None else 'None'}"
f")"
)
@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
return [job_conflict(constraint_factory),
job_conflict1(constraint_factory),
user_conflict(constraint_factory),
]
def job_conflict(constraint_factory: ConstraintFactory):
return constraint_factory.forEach(Jobs) \
.groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
.filter(lambda job_id, selected: selected > 1) \
.penalize("only_one_user", HardSoftScore.ONE_HARD, lambda job_id, selected: selected - 1)
def job_conflict1(constraint_factory: ConstraintFactory):
return constraint_factory.forEach(Jobs) \
.groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
.filter(lambda job_id, selected: selected == 0) \
.penalize("must_one_user", HardSoftScore.ONE_HARD)
def calculation(user, cost):
return (user.exists_cost + cost) * 10000 / user.capacity
def user_conflict(constraint_factory: ConstraintFactory):
return constraint_factory.forEach(Jobs) \
.groupBy(lambda job: job.user,
ConstraintCollectors.conditionally(lambda job: job.selected == 1,
ConstraintCollectors.sum(lambda job: job.cost))) \
.penalize("Minimize Total Cost", HardSoftScore.ONE_SOFT,
lambda user, cost: int(calculation(user, cost)))
def generate_problem():
user_alice = Users('Alice', 100, 45)
user_bob = Users('Bob', 90, 54)
user_chris = Users('Chris', 80, 56)
user_dave = Users('Dave', 80, 52)
selected_list = [0, 1]
job_list = [
Jobs(1, 'Job2', 'Leader', user_bob, 10),
Jobs(2, 'Job2', 'Leader', user_chris, 10),
Jobs(3, 'Job3', 'Leader', user_alice, 5),
Jobs(4, 'Job3', 'Leader', user_bob, 5),
Jobs(5, 'Job6', 'Leader', user_alice, 5),
Jobs(6, 'Job6', 'Leader', user_bob, 5),
Jobs(7, 'Job6', 'Leader', user_dave, 5),
Jobs(8, 'Job7', 'Leader', user_bob, 10),
Jobs(9, 'Job7', 'Leader', user_dave, 10),
]
job = job_list[0]
job.set_selected(selected_list[0])
return Assignments(selected_list, job_list)
solver_config = optapy.config.solver.SolverConfig() \
.withEntityClasses(get_class(Jobs)) \
.withSolutionClass(get_class(Assignments)) \
.withConstraintProviderClass(get_class(constraints)) \
.withTerminationSpentLimit(Duration.ofSeconds(30))
solution = solver_factory_create(solver_config) \
.buildSolver() \
.solve(generate_problem())
print(solution)
Update #2nd
import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors, Joiners
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration
@problem_fact
class Users:
def __init__(self, id, capacity, exists_cost):
self.id = id
self.capacity = capacity
self.exists_cost = exists_cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return (
f"User("
f"id={self.id}, "
f"capacity={self.capacity}, "
f"exists_cost={self.exists_cost})"
)
@planning_entity
class Jobs:
def __init__(self, id, job_id, role_id, user, cost, selected=None):
self.id = id
self.job_id = job_id
self.role_id = role_id
self.user = user
self.cost = cost
self.selected = selected
@planning_id
def get_id(self):
return self.id
@planning_variable(int, ["selected_list"])
def get_selected(self):
return self.selected
def set_selected(self, selected):
self.selected = selected
def __str__(self):
return (
f"Job("
f"id={self.id}, "
f"job_id={self.job_id}, "
f"role_id={self.role_id}, "
f"user={self.user}, "
f"selected={self.selected}, "
f"cost={self.cost}"
f")"
)
def format_list(a_list):
return ',\n'.join(map(str, a_list))
@planning_solution
class Assignments:
def __init__(self, selected_list, job_list, score=None):
self.selected_list = selected_list
self.job_list = job_list
self.score = score
@problem_fact_collection_property(int)
@value_range_provider("selected_list")
def get_selected_list(self):
return self.selected_list
@planning_entity_collection_property(Jobs)
def get_job_list(self):
return self.job_list
@planning_score(HardSoftScore)
def get_score(self):
return self.score
def set_score(self, score):
self.score = score
def __str__(self):
return (
f"Assignments("
f"selected_list={format_list(self.selected_list)},\n"
f"job_list={format_list(self.job_list)},\n"
f"score={str(self.score.toString()) if self.score is not None else 'None'}"
f")"
)
@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
return [job_conflict(constraint_factory),
job_conflict1(constraint_factory),
get_total_job_cost_per_user(constraint_factory),
get_user_workload(constraint_factory),
]
def job_conflict(constraint_factory: ConstraintFactory):
return constraint_factory.forEach(Jobs) \
.groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
.filter(lambda job_id, selected: selected > 1) \
.penalize("only_one_user", HardSoftScore.ONE_HARD, lambda job_id, selected: selected - 1)
def job_conflict1(constraint_factory: ConstraintFactory):
return constraint_factory.forEach(Jobs) \
.groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
.filter(lambda job_id, selected: selected == 0) \
.penalize("must_one_user", HardSoftScore.ONE_HARD)
def calculation(user, cost):
return (user.exists_cost + cost) * 10000 / user.capacity
def get_total_job_cost_per_user(constraint_factory: ConstraintFactory):
return constraint_factory.for_each(Jobs) \
.groupBy(lambda job: job.user,
ConstraintCollectors.conditionally(lambda job: job.selected == 1,
ConstraintCollectors.sum(lambda job: job.cost)))
def get_user_workload(constraint_factory: ConstraintFactory):
return get_total_job_cost_per_user(constraint_factory) \
.groupBy(lambda user, cost: int(calculation(user, cost))) \
.penalize('Minimize', HardSoftScore.ONE_SOFT, lambda user, cost, workload: workload)
def generate_problem():
user_alice = Users('Alice', 100, 45)
user_bob = Users('Bob', 90, 54)
user_chris = Users('Chris', 80, 56)
user_dave = Users('Dave', 80, 52)
selected_list = [0, 1]
job_list = [
Jobs(1, 'Job2', 'Leader', user_bob, 10),
Jobs(2, 'Job2', 'Leader', user_chris, 10),
Jobs(3, 'Job3', 'Leader', user_alice, 5),
Jobs(4, 'Job3', 'Leader', user_bob, 5),
Jobs(5, 'Job6', 'Leader', user_alice, 5),
Jobs(6, 'Job6', 'Leader', user_bob, 5),
Jobs(7, 'Job6', 'Leader', user_dave, 5),
Jobs(8, 'Job7', 'Leader', user_bob, 10),
Jobs(9, 'Job7', 'Leader', user_dave, 10),
]
job = job_list[0]
job.set_selected(selected_list[0])
return Assignments(selected_list, job_list)
solver_config = optapy.config.solver.SolverConfig() \
.withEntityClasses(get_class(Jobs)) \
.withSolutionClass(get_class(Assignments)) \
.withConstraintProviderClass(get_class(constraints)) \
.withTerminationSpentLimit(Duration.ofSeconds(30))
solution = solver_factory_create(solver_config) \
.buildSolver() \
.solve(generate_problem())
print(solution)