Houdini PCG: 对多线程的Foreach节点做负载均衡
Houdini的Foreach节点可以使用Compile节点进行多线程加速。
多线程Foreach的总运行时间取决于运行最慢的线程;较快的线程完成后只能等待,长时间处于闲置状态。
该方法意在运行Foreach前,对若干组同Class的Prims/Points进行线程分配(在该方法中用thread属性体现),以达到负载均衡、算力不浪费。
因此,问题可以抽象为:“将n个数分配到m个组中,使每个组中数的和尽量相近”。
import math
import psutil
def GetAvgArr(number_list, group_list, arr_num):
    """Split weighted items into ``arr_num`` groups whose sums are as close as possible.

    Greedy heuristic: the items are ordered from the largest weight to the
    smallest, then groups are built one at a time. If the largest remaining
    weight already reaches the current target mean it forms a group on its
    own and the mean of the remaining groups is recomputed; otherwise
    ``getList`` picks a subset whose sum approaches the mean. The last group
    receives everything that is left.

    Args:
        number_list: Item weights; each value is converted with ``float()``.
        group_list: Item identifiers, parallel to ``number_list``.
        arr_num: Number of groups to build.

    Returns:
        A tuple ``(avg_arrays, group_arrays)``. ``avg_arrays[i]`` holds the
        weights of group ``i`` converted by ``transFloatToIntList`` and
        ``group_arrays[i]`` holds the matching identifiers. Both lists are
        empty when ``number_list`` is empty, when ``arr_num`` is not positive,
        or when there are fewer items than groups.
    """
    avg_arrays = []
    group_arrays = []
    # arr_num <= 0 would otherwise divide by zero when computing the mean.
    if arr_num <= 0 or len(number_list) == 0 or len(number_list) < arr_num:
        return avg_arrays, group_arrays

    # The selection logic below treats index 0 as the largest remaining
    # weight, so order the weights (and their identifiers) descending.
    # sorted() is stable, so input that is already descending is unchanged.
    paired = sorted(
        zip((float(num) for num in number_list), group_list),
        key=lambda pair: pair[0],
        reverse=True,
    )
    number_list_float = [weight for weight, _ in paired]
    group_list = [group for _, group in paired]

    sum_value = sum(number_list_float)
    mean_value = sum_value / float(arr_num)

    for cnt in range(arr_num):
        if cnt == arr_num - 1:
            # Last group: it takes every item that is still unassigned.
            avg_arrays.append(transFloatToIntList(number_list_float))
            group_arrays.append(group_list)
            break

        if len(number_list_float) > 0 and number_list_float[0] >= mean_value:
            # The largest weight already reaches the mean: give it its own group.
            arr = [number_list_float[0]]
            avg_arrays.append(transFloatToIntList(arr))
            group_arr = [group_list[0]]
            group_arrays.append(group_arr)
            sum_value = sum_value - number_list_float[0]
            # Recompute the target mean for the groups that remain.
            mean_value = sum_value / float(arr_num - len(avg_arrays))
        else:
            # Otherwise search for a subset whose sum approaches the mean.
            arr, group_arr, _ = getList(
                number_list_float, group_list, mean_value, math.pow(mean_value, 2)
            )
            avg_arrays.append(transFloatToIntList(arr))
            group_arrays.append(group_arr)

        # Drop the items just assigned before building the next group.
        number_list_float = removeFromFloatList(number_list_float, arr)
        group_list = removeFromFloatList(group_list, group_arr)

    return avg_arrays, group_arrays
def transFloatToIntList(float_list):
    """Return a new list with every item of ``float_list`` converted by ``int()``.

    ``int()`` truncates floats toward zero, so ``1.9`` becomes ``1`` and
    ``-1.5`` becomes ``-1``.
    """
    return [int(item) for item in float_list]
def removeFromFloatList(original_list, remove_nums):
    """Return a copy of ``original_list`` without the values in ``remove_nums``.

    Each value of ``remove_nums`` removes a single occurrence. The search for
    a value starts right after the position of the previous removal, so the
    values in ``remove_nums`` must appear in the same relative order as in
    ``original_list``; a value that is not found from that position on is
    ignored. Neither input list is modified.

    Args:
        original_list: The list to filter.
        remove_nums: Values to remove, in order of appearance.

    Returns:
        A new list with the remaining items in their original order.
    """
    res = []
    start = 0
    for remove in remove_nums:
        for i in range(start, len(original_list)):
            if original_list[i] == remove:
                # Keep everything between the previous removal and this one.
                res.extend(original_list[start:i])
                start = i + 1
                break
    # Keep the tail that follows the last removed item.
    res.extend(original_list[start:])
    return res
def getList(arr, group_arr, delta, distance):
    """Recursively pick items from ``arr`` whose sum approaches ``delta``.

    NOTE(review): the branch logic (skipping entries larger than ``delta`` and
    comparing ``arr[i - 1]`` against ``arr[i]``) assumes ``arr`` is ordered
    from largest to smallest - confirm callers pass it that way.

    Args:
        arr: Candidate weights.
        group_arr: Identifiers parallel to ``arr``.
        delta: Remaining amount the selection should add up to.
        distance: Squared residual to beat; the last element of ``arr`` is
            only selected when its squared distance to ``delta`` is smaller.

    Returns:
        A tuple ``(weights, groups, dist)``: the selected weights, their
        identifiers, and a distance value. ``dist`` is ``0`` for an exact
        match and ``-1`` when the innermost step selected nothing.
        NOTE(review): ``-1`` is also propagated upward unchanged, so a
        non-empty selection can carry ``dist == -1``.
    """
    res = []
    res1 = []
    if len(arr) == 0:
        return res, res1, -1

    # The last element is handled separately after the loop.
    for i in range(len(arr) - 1):
        if delta == arr[i]:
            # Exact match: this single item completes the target.
            res.append(arr[i])
            res1.append(group_arr[i])
            return res, res1, 0
        elif delta < arr[i]:
            # Item is larger than what is still needed; try the next one.
            continue
        elif delta > arr[i]:
            if i == 0:
                # The first item fits: take it and fill the rest recursively.
                res.append(arr[i])
                res1.append(group_arr[i])
                delta = delta - arr[i]
                distance = math.pow(delta, 2)
                tmp, tmp1, d = getList(arr[i + 1:], group_arr[i + 1:], delta, distance)
                res.extend(tmp)
                res1.extend(tmp1)
                return res, res1, d
            else:
                # arr[i - 1] overshoots delta and arr[i] undershoots it.
                dis1 = math.pow(arr[i - 1] - delta, 2)
                dis2 = math.pow(delta - arr[i], 2)
                if dis1 > dis2:
                    # Undershooting is closer: take arr[i] and keep filling.
                    res.append(arr[i])
                    res1.append(group_arr[i])
                    delta = delta - arr[i]
                    tmp, tmp1, d = getList(arr[i + 1:], group_arr[i + 1:], delta, dis2)
                    res.extend(tmp)
                    res1.extend(tmp1)
                    return res, res1, d
                else:
                    # Try a combination starting at arr[i]; keep it only if
                    # it beats taking the single overshooting item.
                    tmp, tmp1, d = getList(arr[i:], group_arr[i:], delta, dis2)
                    if dis1 > d:
                        res.extend(tmp)
                        res1.extend(tmp1)
                        return res, res1, d
                    res.append(arr[i - 1])
                    res1.append(group_arr[i - 1])
                    return res, res1, dis1

    # Only the last element is left: take it if it improves on ``distance``.
    dis = math.pow(delta - arr[-1], 2)
    if dis < distance:
        return arr[-1:], group_arr[-1:], dis
    return [], [], -1
# Read the weight ("count") and the piece id ("class") of every primitive.
geo = hou.pwd().geometry()
prims = geo.prims()
partition_list = [prim.attribValue("count") for prim in prims]
group_list = [prim.attribValue("class") for prim in prims]

# psutil.cpu_count() may return None when the count cannot be determined.
# GetAvgArr also returns nothing when there are fewer items than groups, so
# never request more groups than there are pieces.
thread_count = max(1, min(psutil.cpu_count() or 1, len(partition_list)))
arrays, group_arrays = GetAvgArr(partition_list, group_list, thread_count)

# Map every class value to the index of the first group that contains it.
thread_of_class = {}
for i, a in enumerate(group_arrays):
    for class_value in a:
        thread_of_class.setdefault(class_value, i)

for prim in prims:
    class_value = prim.attribValue("class")
    if class_value in thread_of_class:
        # The "thread" attribute is meant to be used as the Piece Attribute
        # of the foreach end node.
        prim.setAttribValue("thread", thread_of_class[class_value])
结果如下图:
28个连通块被分配到了16个线程之中,避免了12次多余的循环。

- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0协议,完整转载请注明来自 零度冰山
评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果
