在Python中寻找雇用K名工人的最小费用
在实际工作中,往往需要将一项大型任务分配给多名工人同时完成,而这些工人的费用也不同。为了保证任务的完成同时节省费用,我们需要找到一种方法能够以最小费用雇用K名工人来完成任务。接下来我们将使用Python语言来解决这个问题。
问题描述
有N个工人和N项任务,每个工人可以只完成一项任务。每个工人完成任务i所需要的费用为cost[i][j]。需要从这N个工人中选择K名工人完成这N项任务。如何选择这K名工人,才能使得完成这N项任务的总费用最小。
解决思路
我们可以将这个问题转化为一个 bipartite matching 问题。可以将工人看作x集合,任务看作y集合,费用看作权重。然后我们就可以使用二分图匹配的算法来解决这个问题。下面就从建图,生成匹配图,使用匈牙利算法和计算最小费用四个部分来详细介绍。
建图
我们先建立一个含有N个工人和N个任务的连接图,其中工人节点集合用X表示,任务节点集合用Y表示。为了每个工人只完成一项任务,这断开图中所有在工人节点集合X中的边和在任务节点集合Y中的边。接下来,需要遍历所有的工人和任务,针对这个图的每一条边e,添加一条边,其权重为e的费用。而为了最小化费用,我们将需要在每条边的权重中添加一个负符号。这样才能保证Dijkstra算法的正确性。
import math
n = 3 # 工人和任务的数量
k = 3 # 选择k名工人
INF = -int(math.pow(2, 31)) # 初始化无穷小
cost = [[0, 3, -4], [-3, 0, 5], [4, -5, 0]]
class BipartiteMatching:
def __init__(self, n, k, INF, cost):
self.n = n # 工人和任务的数量
self.k = k # 选择k名工人
self.INF = INF # 初始化无穷小
self.cost = [[] for i in range(n)] # 二维数组用于存储费用
def generateGraph(self):
for i in range(self.n):
for j in range(self.n):
self.cost[i].append(-self.INF)
for k in range(self.n):
self.cost[i][j] = max(self.cost[i][j], cost[i][k] - cost[j][k])
self.cost[i][j] = -self.cost[i][j]
上述代码中我们首先定义了三个变量:n表示工人和任务的数量,k表示选择k名工人,INF初始化为无限小,通过cost数组来存储二维矩阵表示费用。当cost[i][j]的值表示让第i个工人完成j个任务所需要的费用。接下来定义了BipartiteMatching类。在类的初始化函数中我们首先将n,k,INF,cost设置为实例变量,其次通过循环依次初始化cost数组中的每一个元素。最后我们在每条边的权重中添加了一个负符号,这样就保证了Dijkstra算法的正确性。
生成匹配图
在生成匹配图中,我们需要使用到匈牙利算法。这个算法可以在任意二分图中找到最大匹配。通常,在许多算法中,匈牙利算法被用于构造可行解,同时提供了一个优秀的最优化算法的基础。在本问题中,我们需要先将图转化为最大权最大匹配的图,然后再将其转化为最小权最大匹配的图。
class BipartiteMatching:
def generateMatchingGraph(self):
# 初始化pairs、lx和ly
pairs = [-1] * self.n
lx = [-self.INF] * self.n
ly = [0] * self.n
# 求解pairs
for i in range(self.n):
slack = [self.INF] * self.n
slackx = [-1] * self.n
sy = [False] * self.n
sy[i] = True
q = [i]
flag = False
while not flag:
k = -1
for j in q:
for l in range(self.n):
if not sy[l]:
if lx[j] + ly[l] - self.cost[j][l] < slack[l]:
slack[l] = lx[j] + ly[l] - self.cost[j][l]
slackx[l] = j
if k == -1 or slack[l] < slack[k]:
k = l
delta = slack[k]
for j in range(self.n):
if sy[j]:
lx[pairs[j]] -= delta
ly[j] += delta
else:
slack[j] -= delta
lx[i] += delta
sy[k] = True
q.append(k)
if pairs[k] == -1:
flag = True
else:
j = pairs[k]
for l in range(self.n):
if not sy[l]:
if lx[j] + ly[l] - self.cost[j][l] < slack[l]:
slack[l] = lx[j] + ly[l] - self.cost[j][l]
slackx[l] = j
q.append(pairs[k])
while k != -1:
j = slackx[k]
pairs[k], j = j, pairs[k]
k = pairs[j]
# 转化为最小权重最大匹配的图
for i in range(self.n):
self.cost[i] = list(map(lambda x: x+lx[i]+ly, self.cost[i]))
上述代码中我们定义了BipartiteMatching类,并在类中定义了一个生成匹配图的方法generateMatchingGraph。在这个方法中,我们首先初始化pairs、lx和ly,pairs用于存储每个工人分配到的任务编号,lx和ly分别表示工人节点和任务节点的顶标(初始都为0)。然后我们通过循环求解pairs数组。为了找到增广路径,我们需要借助一个slack数组表示关联了任务节点的顶部集合中的所有节点的深度(即匈牙利树的深度)和其匹配的最小边权值之和与其所关联工人节点的顶部集合中所有节点的顶标之和的差值。
最后,我们将图转化为最小权重最大匹配的图。我们需要将每个任务节点的费用减去其关联工人顶标和任务顶标的差值在转化为相反数。这样就得到了最小费用最大匹配的图。
使用匈牙利算法
在我们得到最小费用最大匹配的图之后,我们就可以使用匈牙利算法来完成对最小费用的解的求解。由于我们已经将匹配图转化为最小权重最大匹配的图,所以我们需要找到的就是最大匹配(最小权重的和)。
class BipartiteMatching:
def hungarian(self):
res = 0
for i in range(self.n):
self.generateMatchingGraph()
match = [-1] * self.n
lx = [-self.INF] * self.n
ly = [-self.INF] * self.n
matches = 0
# 进行最大匹配计算
for j in range(self.n):
if match[j] == -1:
path = [-1] * self.n slack = [self.INF] * self.n
slackx = [-1] * self.n
sy = [False] * self.n
q = []
q.append(j)
flag = False
while not flag:
k = -1
for l in q:
for i in range(self.n):
if not sy[i]:
if lx[l] + ly[i] - self.cost[l][i] < slack[i]:
slack[i] = lx[l] + ly[i] - self.cost[l][i]
slackx[i] = l
if k == -1 or slack[i] < slack[k]:
k = i
delta = slack[k]
for l in range(self.n):
if sy[l]:
lx[match[l]] -= delta
ly[l] += delta
else:
slack[l] -= delta
lx[j] += delta
sy[k] = True
q.append(match[k])
if match[k] != -1:
continue
ap = k
cp = j
while True:
if cp == -1:
break
t = match[cp]
match[cp] = ap
match[ap] = cp
ap = t
cp = path[cp]
matches += 1
flag = True
if self.n - matches < self.k:
return False # 工人人数小于K个时需要返回False
# 计算最小费用
k = 0
for i in range(self.n):
if match[i] != -1:
k += self.cost[match[i]][i]
res += k
return res
上述代码中我们定义了BipartiteMatching类,并在类中定义了一个求解最小费用的方法hungarian。在这个方法中,我们首先在外循环中循环执行n次(因为我们要选择n个工人完成n项任务),并通过generateMatchingGraph方法计算得到最小费用的最大匹配图。
接下来,我们定义match、lx、ly和matches。match用于存储每个工人分配到的任务编号,lx和ly分别表示工人节点和任务节点的顶标,初始值均为负无穷。matches用于存储计算得到的最大匹配数。
我们执行内循环,判断是否存在未匹配的工人节点。如果存在就找到从该工人节点开始的增广路径,同时将匹配数加1。在找到增广路径之后,我们需要注意将匹配图转化为最小权重的最大匹配图。
最后,在每次循环结束后我们需要判断选出来的工人人数是不是小于K,如果是的话,需要返回False。
完整代码
“`python
import math
n = 3 # 工人和任务的数量
k = 3 # 选择k名工人
INF = -int(math.pow(2, 31)) # 初始化无穷小
cost = [[0, 3, -4], [-3, 0, 5], [4, -5, 0]]
class BipartiteMatching:
def init(self, n, k, INF, cost):
self.n = n # 工人和任务的数量
self.k = k # 选择k名工人
self.INF = INF # 初始化无穷小
self.cost = [[] for i in range(n)] # 二维数组用于存储费用
def generateGraph(self):
for i in range(self.n):
for j in range(self.n):
self.cost[i].append(-self.INF)
for k in range(self.n):
self.cost[i][j] = max(self.cost[i][j], cost[i][k] - cost[j][k])
self.cost[i][j] = -self.cost[i][j]
def generateMatchingGraph(self):
# 初始化pairs、lx和ly
pairs = [-1] * self.n
lx = [-self.INF] * self.n
ly = [0] * self.n
# 求解pairs
for i in range(self.n):
slack = [self.INF] * self.n
slackx = [-1] * self.n
sy = [False] * self.n
sy[i] = True
q = [i]
flag = False
while not flag:
k = -1
for j in q:
for l in range(self.n):
if not sy[l]:
if lx[j] + ly[l] - self.cost[j][l] < slack[l]:
slack[l] = lx[j] + ly[l] - self.cost[j][l]
slackx[l] = j
if k == -1 or slack[l] < slack[k]:
k = l
delta = slack[k]
for j in range(self.n):
if sy[j]:
lx[pairs[j]] -= delta
ly[j] += delta
else:
slack[j] -= delta
lx[i] += delta
sy[k] = True
q.append(k)
if pairs[k] == -1:
flag = True
else:
j = pairs[k]
for l in range(self.n):
if not sy[l]:
if lx[j] + ly[l] - self.cost[j][l] < slack[l]:
slack[l] = lx[j] + ly[l] - self.cost[j][l]
slackx[l] = j
q.append(pairs[k])
while k != -1:
j = slackx[k]
pairs[k], j = j, pairs[k]
k = pairs[j]
# 转化为最小权重最大匹配的图
for i in range(self.n):
self.cost[i] = list(map(lambda x: x+lx[i]+ly, self.cost[i]))
def hungarian(self):
res = 0
for i in range(self.n):
self.generateMatchingGraph()
match = [-1] * self.n
lx = [-self.INF] * self.n
ly = [-self.INF] * self.n
matches = 0
# 进行最大匹配计算
for j in range(self.n):
if match[j] == -1:
path = [-1] * self.n
slack = [self.INF] * self.n
slackx = [-1] * self.n
sy = [False] * self.n
sy[j] = True
q = [j]
flag = False
while not flag:
k = -1
for l in q:
for i in range(self.n):
if not sy[i]:
if lx[l] + ly[i] - self.cost[l][i] < slack[i]:
slack[i] = lx[l] + ly[i] - self.cost[l][i]
slackx[i] = l
if k == -1 or slack[i] < slack[k]:
k = i
delta = slack[k]
for l in range(self.n):
if sy[l]:
lx[match[l]] -= delta
ly[l] += delta
else:
slack[l] -= delta
lx[j] += delta
sy[k] = True
q.append(match[k])
if match[k] != -1:
continue
ap = k
cp = j
while True:
if cp == -1:
break
t = match[cp]
match[cp] = ap
match[ap] = cp
ap = t
cp = path[cp]
matches += 1
flag = True
if self.n - matches < self.k:
return False # 工人人数小于K个时需要返回False
# 计算最小费用
k = 0
for i in range(self.n):
if match[i] != -1:
k += self.cost[match[i]][i]
res += k
return res
bm = BipartiteMatching(n, k, INF, cost)
bm.generateGraph()
res = bm.hungarian()
if res False:
print(“无法完成工作分配”)
else:
print(“最小花费为:”, -res)