0

I am new to use this OptaPy solver. I'm trying to solve my workload balance problem by using it.

My assignment problem is: I have a list of users and a list of jobs. Each job requires only one user from a user list base on its business requirements. One user can work on multiple jobs. After the assignments, the user's workload should be as even as possible.

The calculation of workload for one user is: user's exists cost + job cost to be assigned divided by user's capacity.

Variance calculation is: σ² = ( Σ (x-μ)² ) / N. X is individual's workload, μ is an average workload among all users. N is the number of users.

My current implementation can assignments based on user's final lowest workload. I still need to get the average workload and then calculate the variance. Unfortunately, I don't get a way to achieve this base on the documentation and example. Anyone can help me on this? Many thanks in advance.

Attached my code here for your reference.

import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
    planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
    get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration


@problem_fact
class Users:
    def __init__(self, id, capacity, exists_cost):
        self.id = id
        self.capacity = capacity
        self.exists_cost = exists_cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return (
            f"User("
            f"id={self.id}, "
            f"capacity={self.capacity}, "
            f"exists_cost={self.exists_cost})"
        )


@planning_entity
class Jobs:
    def __init__(self, id, job_id, role_id, user, cost, selected=None):
        self.id = id
        self.job_id = job_id
        self.role_id = role_id
        self.user = user
        self.cost = cost
        self.selected = selected

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(int, ["selected_list"])
    def get_selected(self):
        return self.selected

    def set_selected(self, selected):
        self.selected = selected

    def __str__(self):
        return (
            f"Job("
            f"id={self.id}, "
            f"job_id={self.job_id}, "
            f"role_id={self.role_id}, "
            f"user={self.user}, "
            f"selected={self.selected}, "
            f"cost={self.cost}"
            f")"
        )


def format_list(a_list):
    return ',\n'.join(map(str, a_list))


@planning_solution
class Assignments:
    def __init__(self, selected_list, job_list, score=None):
        self.selected_list = selected_list
        self.job_list = job_list
        self.score = score

    @problem_fact_collection_property(int)
    @value_range_provider("selected_list")
    def get_selected_list(self):
        return self.selected_list

    @planning_entity_collection_property(Jobs)
    def get_job_list(self):
        return self.job_list

    @planning_score(HardSoftScore)
    def get_score(self):
        return self.score

    def set_score(self, score):
        self.score = score

    def __str__(self):
        return (
            f"Assignments("
            f"selected_list={format_list(self.selected_list)},\n"
            f"job_list={format_list(self.job_list)},\n"
            f"score={str(self.score.toString()) if self.score is not None else 'None'}"
            f")"
        )


@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
    return [job_conflict(constraint_factory),
            job_conflict1(constraint_factory),
            user_conflict(constraint_factory),
            ]


def job_conflict(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected > 1) \
        .penalize("only_one_user", HardSoftScore.ONE_HARD, lambda job_id, selected: selected - 1)


def job_conflict1(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected == 0) \
        .penalize("must_one_user", HardSoftScore.ONE_HARD)


def calculation(user, cost):
    return (user.exists_cost + cost) * 10000 / user.capacity


def user_conflict(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.user,
                 ConstraintCollectors.conditionally(lambda job: job.selected == 1,
                                                    ConstraintCollectors.sum(lambda job: job.cost))) \
        .penalize("Minimize Total Cost", HardSoftScore.ONE_SOFT,
                  lambda user, cost: int(calculation(user, cost)))


def generate_problem():
    user_alice = Users('Alice', 100, 45)
    user_bob = Users('Bob', 90, 54)
    user_chris = Users('Chris', 80, 56)
    user_dave = Users('Dave', 80, 52)

    selected_list = [0, 1]
    job_list = [
        Jobs(1, 'Job2', 'Leader', user_bob, 10),
        Jobs(2, 'Job2', 'Leader', user_chris, 10),
        Jobs(3, 'Job3', 'Leader', user_alice, 5),
        Jobs(4, 'Job3', 'Leader', user_bob, 5),
        Jobs(5, 'Job6', 'Leader', user_alice, 5),
        Jobs(6, 'Job6', 'Leader', user_bob, 5),
        Jobs(7, 'Job6', 'Leader', user_dave, 5),
        Jobs(8, 'Job7', 'Leader', user_bob, 10),
        Jobs(9, 'Job7', 'Leader', user_dave, 10),
    ]
    job = job_list[0]
    job.set_selected(selected_list[0])

    return Assignments(selected_list, job_list)


solver_config = optapy.config.solver.SolverConfig() \
    .withEntityClasses(get_class(Jobs)) \
    .withSolutionClass(get_class(Assignments)) \
    .withConstraintProviderClass(get_class(constraints)) \
    .withTerminationSpentLimit(Duration.ofSeconds(30))

solution = solver_factory_create(solver_config) \
    .buildSolver() \
    .solve(generate_problem())

print(solution)

Update #2nd

import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
    planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
    get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors, Joiners
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration


@problem_fact
class Users:
    def __init__(self, id, capacity, exists_cost):
        self.id = id
        self.capacity = capacity
        self.exists_cost = exists_cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return (
            f"User("
            f"id={self.id}, "
            f"capacity={self.capacity}, "
            f"exists_cost={self.exists_cost})"
        )


@planning_entity
class Jobs:
    def __init__(self, id, job_id, role_id, user, cost, selected=None):
        self.id = id
        self.job_id = job_id
        self.role_id = role_id
        self.user = user
        self.cost = cost
        self.selected = selected

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(int, ["selected_list"])
    def get_selected(self):
        return self.selected

    def set_selected(self, selected):
        self.selected = selected

    def __str__(self):
        return (
            f"Job("
            f"id={self.id}, "
            f"job_id={self.job_id}, "
            f"role_id={self.role_id}, "
            f"user={self.user}, "
            f"selected={self.selected}, "
            f"cost={self.cost}"
            f")"
        )


def format_list(a_list):
    return ',\n'.join(map(str, a_list))


@planning_solution
class Assignments:
    def __init__(self, selected_list, job_list, score=None):
        self.selected_list = selected_list
        self.job_list = job_list
        self.score = score

    @problem_fact_collection_property(int)
    @value_range_provider("selected_list")
    def get_selected_list(self):
        return self.selected_list

    @planning_entity_collection_property(Jobs)
    def get_job_list(self):
        return self.job_list

    @planning_score(HardSoftScore)
    def get_score(self):
        return self.score

    def set_score(self, score):
        self.score = score

    def __str__(self):
        return (
            f"Assignments("
            f"selected_list={format_list(self.selected_list)},\n"
            f"job_list={format_list(self.job_list)},\n"
            f"score={str(self.score.toString()) if self.score is not None else 'None'}"
            f")"
        )


@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
    return [job_conflict(constraint_factory),
            job_conflict1(constraint_factory),
            get_total_job_cost_per_user(constraint_factory),
            get_user_workload(constraint_factory),
            ]


def job_conflict(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected > 1) \
        .penalize("only_one_user", HardSoftScore.ONE_HARD, lambda job_id, selected: selected - 1)


def job_conflict1(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected == 0) \
        .penalize("must_one_user", HardSoftScore.ONE_HARD)


def calculation(user, cost):
    return (user.exists_cost + cost) * 10000 / user.capacity                                            


def get_total_job_cost_per_user(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(Jobs) \
        .groupBy(lambda job: job.user,
                 ConstraintCollectors.conditionally(lambda job: job.selected == 1,
                                                    ConstraintCollectors.sum(lambda job: job.cost)))


def get_user_workload(constraint_factory: ConstraintFactory):
    return get_total_job_cost_per_user(constraint_factory) \
        .groupBy(lambda user, cost: int(calculation(user, cost))) \
        .penalize('Minimize', HardSoftScore.ONE_SOFT, lambda user, cost, workload: workload)


def generate_problem():
    user_alice = Users('Alice', 100, 45)
    user_bob = Users('Bob', 90, 54)
    user_chris = Users('Chris', 80, 56)
    user_dave = Users('Dave', 80, 52)

    selected_list = [0, 1]
    job_list = [
        Jobs(1, 'Job2', 'Leader', user_bob, 10),
        Jobs(2, 'Job2', 'Leader', user_chris, 10),
        Jobs(3, 'Job3', 'Leader', user_alice, 5),
        Jobs(4, 'Job3', 'Leader', user_bob, 5),
        Jobs(5, 'Job6', 'Leader', user_alice, 5),
        Jobs(6, 'Job6', 'Leader', user_bob, 5),
        Jobs(7, 'Job6', 'Leader', user_dave, 5),
        Jobs(8, 'Job7', 'Leader', user_bob, 10),
        Jobs(9, 'Job7', 'Leader', user_dave, 10),
    ]
    job = job_list[0]
    job.set_selected(selected_list[0])

    return Assignments(selected_list, job_list)


solver_config = optapy.config.solver.SolverConfig() \
    .withEntityClasses(get_class(Jobs)) \
    .withSolutionClass(get_class(Assignments)) \
    .withConstraintProviderClass(get_class(constraints)) \
    .withTerminationSpentLimit(Duration.ofSeconds(30))

solution = solver_factory_create(solver_config) \
    .buildSolver() \
    .solve(generate_problem())

print(solution)
Meccanico
  • 1
  • 1

1 Answers1

0

Variance can be calculated as a composition of ConstraintCollectors (below assume a class DataPoint and you want to minimize variance of 'value'):

def minimize_variance(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(DataPoint)
            .group_by(ConstraintCollectors.average(lambda point: point.value))
            .join(DataPoint)
            .group_by(ConstraintCollectors.compose(
                ConstraintCollectors.sum(lambda avg, point: (point.value - avg)**2),
                ConstraintCollectors.count_bi(),
                lambda diff_sum, count: int(((diff_sum / max(1, count)) * 100))
            ))
            .penalize('Minimize variance', SimpleScore.ONE,
                      lambda variance: variance)
    )

The int(((diff_sum / max(1, count)) * 100)) is to convert the variance sum to an int (keeping 2 decimals of precision). You can increase or decrease the constant as needed to get more/less precision. You need to penalize by an int in OptaPlanner/OptaPy.

Adapting it to your particular problem, you need to first compute the load for a particular user from Users/Jobs pairs:

def get_users_loads(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Users)
            .join(constraint_factory.for_each(Jobs).filter(lambda job: job.selected == 1), 
                  Joiners.equal(lambda user: user, lambda job: job.user))
            .group_by(lambda user, job: user, ConstraintCollectors.count_bi())
    )

From which you can calculate average load

def get_average_load(constraint_factory: ConstraintFactory):
    return (
        get_users_loads(constraint_factory)
            .group_by(ConstraintCollectors.average(lambda user, load: load))
    )

And combining it all together:

def minimize_user_load_variance(constraint_factory: ConstraintFactory):
    (
        get_user_loads(constraint_factory)
            .join(get_average_load(constraint_factory))
            .group_by(ConstraintCollectors.compose(
                      ConstraintCollectors.sum(lambda user, load, avg: (load - avg)**2),
                      ConstraintCollectors.count_tri(),
                      lambda diff_sum, count: int(((diff_sum / max(1, count)) * 100))
            ))
            .penalize('Minimize variance', HardSoftScore.ONE_SOFT,
                      lambda variance: variance)
    )

This blog post on fairness in OptaPlanner might be of interest: https://www.optaplanner.org/blog/2017/02/03/FormulaForMeasuringUnfairness.html

Christopher Chianelli
  • 1,163
  • 1
  • 8
  • 8
  • Thank for you quick reply Chris. What I got from your code are: 1. constraints can be stacked. 2. constraint is referenced by another constraint has no penalize. 3 the last constraint in a chain must have penalize. 4. you can rename the calculated column when you use it in another place. I tried to rewrite my code by following these rules, but I still got error. Could you please have a look at my 2nd version code. thanks. – Meccanico Oct 11 '22 at 07:38
  • I updated my 2nd version code in the original post. Please let me know if any official documentation is introducing these kinds of usage. thank you. – Meccanico Oct 11 '22 at 07:53
  • In `get_user_workload`, the lambda in `penalize` is incorrect; you have a UniConstraintStream, so the lambda must take 1 parameter (when you perform a group by, the cardinality of the stream is set to the number of parameters the group by has; 1 parameter = UniStream, 2 parameters = BiStream, etc). Additional, the `groupBy` probably will not do what you expect: currently, you are setting the grouping key to `calculation`, whereas you probably want to sum all calculations (which you do by wrapping it in a `ConstraintCollectors.sum(...)`. – Christopher Chianelli Oct 11 '22 at 13:20
  • https://www.optapy.org/docs/latest/constraint-streams/constraint-streams.html should describe most of the API; In regards to (1, 2, 3), that feature is "joining with a UniConstraintStream"; you can only do it on join, and the constraint stream that is joined with must be a `UniConstraintStream`. (4) is true; we don't look at parameter names, so you can use different names for the same parameter in different places. – Christopher Chianelli Oct 11 '22 at 13:25
  • Chris, thanks for your guide. I realized that the number of parameters in the lower lambda should be equal to the number of items I manipulated in the prior 'data container'. I finally implemented my objective function in that way. However, when I try to change values in the dataset, I encountered an error as following: – Meccanico Oct 13 '22 at 08:16
  • `RuntimeError: An error occurred during solving. This can occur when functions take the wrong number of parameters (ex: a setter that does not take exactly one parameter) or by a function returning an incompatible return type (ex: returning a str in a filter, which expects a bool). This can also occur when an exception is raised when evaluating constraints/getters/setters.` – Meccanico Oct 13 '22 at 08:19
  • I found one similar issue here: https://stackoverflow.com/questions/73293229/optapy-error-runtimeerror-an-error-occurred-during-solving-this-can-occur-whe . I did same changes as you mentioned. but this error still here. Could you help to have a look if this is a bug. My code is uploaded here: https://ideone.com/UBJ6cU. Thank you. – Meccanico Oct 13 '22 at 08:24
  • I changed 'Job6' s cost from 5 to 15, then it happend. – Meccanico Oct 13 '22 at 08:25
  • That a bug in OptaPlanner it seems (reproduced on latest final); thanks for reporting. FWIW, changing the ConstraintStreamImplType from Drools to Bavet works (do `from org.optaplanner.core.api.score.stream import ConstraintStreamImplType`, to import ConstraintStreamImplType and then do `solver_config.getScoreDirectorFactoryConfig().setConstraintStreamImplType(ConstraintStreamImplType.BAVET)` before passing the SolverConfig to `solver_factory_create`). (however, you shouldn't need to change the ConstraintStreamImplType; this is a bug). – Christopher Chianelli Oct 14 '22 at 13:35