Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from ortools.sat.python import cp_model
- from dataclasses import dataclass
- from collections.abc import Iterable
- import itertools
- def flatten(l):
- for el in l:
- if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
- yield from flatten(el)
- else:
- yield el
- # def flatten(container):
- # for i in container:
- # if isinstance(i, (list,tuple)):
- # for j in flatten(i):
- # yield j
- # else:
- # yield i
- model = cp_model.CpModel()
- horizon = 15
- workers = [1, 2, 3, 4] # worker id
- workers_job_num = [3, 1, 1, 2] # how much jobs every worker have
- job_types = [1, 2, 3, 4, 5] # job type id
- job_type_num = [2, 1, 2, 1, 1] #how much jobs every type have
- jobs = [(1, 1, 1, 3, 1), # job id, job type, start, length, worker id
- (2, 1, 6, 3, 1),
- (3, 2, 1, 1, 1),
- (4, 3, 4, 2, 2),
- (5, 3, 10, 2, 3),
- (6, 4, 7, 4, 4),
- (7, 5, 9, 1, 4)]
- intervals_by_worker = []
- bools_by_worker = []
- start_by_worker = []
- end_by_worker = []
- for w in workers:
- intervals_by_type = []
- bools_by_type = []
- start_by_type = []
- end_by_type = []
- for jt in job_types:
- intervals_by_job_by_type = [] # all the jobs of some type for a worker
- bools_by_job_by_type = []
- start_by_job_by_type = []
- end_by_job_by_type = []
- for job in jobs:
- if (jt == job[1]) and (w == job[4]): # if jobType == current type of job and worker assigned to the job
- performed_at = model.NewBoolVar("performed by %dw %dj %dt" % (w, job[0], jt))
- start = model.NewIntVar(0, horizon, "start of %dw %dj %dt" % (w, job[0], jt))
- end = model.NewIntVar(0, horizon, "end of %dw %dj %dt" % (w, job[0], jt))
- optint = model.NewOptionalIntervalVar(start, job[3], end, performed_at, "interval of %dw %dj %dt" % (w, job[0], jt))
- bools_by_job_by_type.append(performed_at)
- intervals_by_job_by_type.append(optint)
- start_by_job_by_type.append(start)
- end_by_job_by_type.append(end)
- intervals_by_type.append(intervals_by_job_by_type)
- bools_by_type.append(bools_by_job_by_type)
- start_by_type.append(start_by_job_by_type)
- end_by_type.append(end_by_job_by_type)
- intervals_by_worker.append(intervals_by_type)
- bools_by_worker.append(bools_by_type)
- start_by_worker.append(start_by_type)
- end_by_worker.append(end_by_type)
- print("eereer")
- ########################################################
- # разделяем работы по рабочему
- jobIntervals_by_worker = []
- for w in intervals_by_worker:
- merged = list(itertools.chain(*w))
- jobIntervals_by_worker.append(merged)
- print("eereer")
- # s = 4
- # l = 1
- # f = s+1
- # sleep_interval = model.NewIntervalVar(s, l, f, "test sleep interval for every worker")
- # sleep_interval = model.NewIntervalVar(1, 1, 2, "test sleep interval for every worker")
- # for iw in range(len(ih)):
- # jobIntervals_by_worker[iw] += flatten(ih[iw])
- # работы одного рабочего
- for ji in jobIntervals_by_worker:
- # ji += [sleep_interval] # append test sleep interval
- model.AddNoOverlap(ji)
- ########################################################
- # разделяем работы по типу работы
- jobIntervals_by_type = []
- for t in job_types:
- jobIntervals_by_type.append([])
- # for i_hour in intervals_by_hour:
- for i_worker in intervals_by_worker:
- for i_type in range(len(i_worker)):
- jobIntervals_by_type[i_type] += flatten(i_worker[i_type])
- # работы одного типа не пересекаются
- for ji in jobIntervals_by_type:
- model.AddNoOverlap(ji)
- ########################################################
- # разделяем performed_at по типам работы
- performedAt_by_type = []
- for t in job_types:
- performedAt_by_type.append([])
- # for b_hour in bools_by_hour:
- for b_worker in bools_by_worker:
- for b_type in range(len(b_worker)):
- performedAt_by_type[b_type] += flatten(b_worker[b_type])
- # Количество performed_at (и соответственно интервалов) должно быть меньше равно необходимому
- for i in range(len(performedAt_by_type)):
- model.Add(cp_model.LinearExpr.Sum(performedAt_by_type[i]) <= job_type_num[i])
- ########################################################
- # разделяем performed_at по рабочим
- # разделяем работы по рабочему
- # jobIntervals_by_worker = []
- performedAt_by_worker = []
- for b in bools_by_worker:
- merged = list(itertools.chain(*b))
- jobIntervals_by_worker.append(merged)
- # performedAt_by_worker = []
- # for t in workers:
- # performedAt_by_worker.append([])
- #
- # for b_hour in bools_by_hour:
- # for b_w in range(len(b_hour)):
- # performedAt_by_worker[b_w] += flatten(b_hour[b_w])
- # Количество performed_at (и соответственно интервалов) должно быть меньше равно необходимому
- for i in range(len(performedAt_by_worker)):
- model.Add(cp_model.LinearExpr.Sum(performedAt_by_worker[i]) <= workers_job_num[i])
- ########################################################
- # maxit = list(itertools.chain(*bools_by_worker))
- model.Maximize(cp_model.LinearExpr.Sum(flatten(bools_by_worker)))
- print("ssssss")
- solver = cp_model.CpSolver()
- solver.parameters.log_search_progress = True
- status = solver.Solve(model)
- print("##################################")
- print("Всего работ", len(jobs))
- print("Распределено работ", solver.ObjectiveValue())
- print("##################################")
- if status == cp_model.FEASIBLE or status == cp_model.OPTIMAL or status == cp_model.UNKNOWN:
- # for h in range(len(bools_by_hour)):
- for w in range(len(bools_by_worker)):
- for jt in range(len(bools_by_worker[w])):
- for job in range(len(bools_by_worker[w][jt])): # jobs of one type
- # print(solver.Value(bools_by_hour[h][w][jt][job]))
- if solver.Value(bools_by_worker[w][jt][job]) == 1:
- print(bools_by_worker[w][jt][job].Name())
- print(solver.Value(start_by_worker[w][jt][job]), solver.Value(end_by_worker[w][jt][job]))
- # print("qwqwqw")
- else:
- print("Unfeasible")
- # сделать список работ для каждого рабочего по типу работы. Количество таких работ <= количеству для рабочего
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement