Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from ortools.sat.python import cp_model
- from dataclasses import dataclass
- def flatten(container):
- for i in container:
- if isinstance(i, (list,tuple)):
- for j in flatten(i):
- yield j
- else:
- yield i
- model = cp_model.CpModel()
- horizon = 15
- workers = [1, 2, 3, 4] # worker id
- workers_job_num = [3, 1, 1, 2] # how much jobs every worker have
- job_types = [1, 2, 3, 4, 5] # job type id
- job_type_num = [2, 1, 2, 1, 1] #how much jobs every type have
- jobs = [(1, 1, 1, 3, 1), # job id, job type, start, length, worker id
- (2, 1, 6, 3, 1),
- (3, 2, 1, 1, 1),
- (4, 3, 4, 2, 2),
- (5, 3, 10, 2, 3),
- (6, 4, 7, 4, 4),
- (7, 5, 9, 1, 4)]
- intervals_by_hour = []
- bools_by_hour = []
- start_by_hour = []
- end_by_hour = []
- for h in range(horizon):
- intervals_by_worker = []
- bools_by_worker = []
- start_by_worker = []
- end_by_worker = []
- for w in workers:
- intervals_by_type = []
- bools_by_type = []
- start_by_type = []
- end_by_type = []
- for jt in job_types:
- intervals_by_job_by_type = [] # all the jobs of some type for a worker
- bools_by_job_by_type = []
- start_by_job_by_type = []
- end_by_job_by_type = []
- for job in jobs:
- if (jt == job[1]) and (w == job[4]): # if jobType == current type of job and worker assigned to the job
- performed_at = model.NewBoolVar("performed by %dh %dw %dj %dt" % (h, w, job[0], jt))
- start = model.NewIntVar(h, h, "start of %dh %dw %dj %dt" % (h, w, job[0], jt))
- end = model.NewIntVar(0, horizon, "end of %dh %dw %dj %dt" % (h, w, job[0], jt))
- optint = model.NewOptionalIntervalVar(start, job[3], end, performed_at, "interval of %dh %dw %dj %dt" % (h, w, job[0], jt))
- bools_by_job_by_type.append(performed_at)
- intervals_by_job_by_type.append(optint)
- start_by_job_by_type.append(start)
- end_by_job_by_type.append(end)
- intervals_by_type.append(intervals_by_job_by_type)
- bools_by_type.append(bools_by_job_by_type)
- start_by_type.append(start_by_job_by_type)
- end_by_type.append(end_by_job_by_type)
- intervals_by_worker.append(intervals_by_type)
- bools_by_worker.append(bools_by_type)
- start_by_worker.append(start_by_type)
- end_by_worker.append(end_by_type)
- intervals_by_hour.append(intervals_by_worker)
- bools_by_hour.append(bools_by_worker)
- start_by_hour.append(start_by_worker)
- end_by_hour.append(end_by_worker)
- print("eereer")
- ########################################################
- # разделяем работы по рабочему
- jobIntervals_by_worker = []
- for w in workers:
- jobIntervals_by_worker.append([])
- s = 4
- l = 1
- f = s+1
- sleep_interval = model.NewIntervalVar(s, l, f, "test sleep interval for every worker")
- # sleep_interval = model.NewIntervalVar(1, 1, 2, "test sleep interval for every worker")
- for ih in intervals_by_hour:
- for iw in range(len(ih)):
- jobIntervals_by_worker[iw] += flatten(ih[iw])
- # jobs of worker
- for ji in jobIntervals_by_worker:
- ji += [sleep_interval] # append test sleep interval
- model.AddNoOverlap(ji)
- ########################################################
- # jobs by job type
- jobIntervals_by_type = []
- for t in job_types:
- jobIntervals_by_type.append([])
- for i_hour in intervals_by_hour:
- for i_worker in i_hour:
- for i_type in range(len(i_worker)):
- jobIntervals_by_type[i_type] += flatten(i_worker[i_type])
- # jobs of one type dont overlap
- for ji in jobIntervals_by_type:
- model.AddNoOverlap(ji)
- ########################################################
- # performed_at by job type
- performedAt_by_type = []
- for t in job_types:
- performedAt_by_type.append([])
- for b_hour in bools_by_hour:
- for b_worker in b_hour:
- for b_type in range(len(b_worker)):
- performedAt_by_type[b_type] += flatten(b_worker[b_type])
- # performed_at (and their intervals) <= needed amount of jobs
- for i in range(len(performedAt_by_type)):
- model.Add(cp_model.LinearExpr.Sum(performedAt_by_type[i]) <= job_type_num[i])
- ########################################################
- # performed_at by workers
- performedAt_by_worker = []
- for t in workers:
- performedAt_by_worker.append([])
- for b_hour in bools_by_hour:
- for b_w in range(len(b_hour)):
- performedAt_by_worker[b_w] += flatten(b_hour[b_w])
- # performed_at (and their intervals) <= needed amount of jobs
- for i in range(len(performedAt_by_worker)):
- model.Add(cp_model.LinearExpr.Sum(performedAt_by_worker[i]) <= workers_job_num[i])
- ########################################################
- model.Maximize(cp_model.LinearExpr.Sum(flatten(bools_by_hour)))
- solver = cp_model.CpSolver()
- solver.parameters.log_search_progress = True
- status = solver.Solve(model)
- print("##################################")
- print("Amount of jobs: ", len(jobs))
- print("Jobs scheduled: ", solver.ObjectiveValue())
- print("##################################")
- if status == cp_model.FEASIBLE or status == cp_model.OPTIMAL or status == cp_model.UNKNOWN:
- for h in range(len(bools_by_hour)):
- for w in range(len(bools_by_hour[h])):
- for jt in range(len(bools_by_hour[h][w])):
- for job in range(len(bools_by_hour[h][w][jt])): # jobs of one type
- # print(solver.Value(bools_by_hour[h][w][jt][job]))
- if solver.Value(bools_by_hour[h][w][jt][job]) == 1:
- print(bools_by_hour[h][w][jt][job].Name())
- print(solver.Value(start_by_hour[h][w][jt][job]), solver.Value(end_by_hour[h][w][jt][job]))
- # print("qwqwqw")
- else:
- print("Unfeasible")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement