处理函数写得不太稳健,会吊死而导致
FutureRetList[i].running()一直认为是 True
粗暴做了个超时认定
看了 concurrent.futures 的一些方法,看不出有什么方法能在外部把进程池里某个进程推倒?
#是否进行多线程处理 MultiProcFlag == True #定义进程数量 ProcessAmount = 12 if MultiProcFlag == True: FutureList = [] FutureRetList = [] FutureStartTimeList = [] FutureProcSuit = [] for i in range(ProcessAmount): FutureList.append(futures.ProcessPoolExecutor(max_workers=1)) for i in range(ProcessAmount): FutureRetList.append(futures.Future()) for i in range(ProcessAmount): FutureProcSuit.append('0') for i in range(ProcessAmount): FutureStartTimeList.append(time.time()) pass # 多进程处理 if MultiProcFlag == True: insertPoolFlag = False # 在 while 循环中判断是否成功加入进程池 while insertPoolFlag == False: for i in range(ProcessAmount): Process_i = str(i + 1) if FutureRetList[i].running() == True: #进行时间计算,如果某进程超时,关闭进程,重新提交任务 tmpTime = (time.clock() - FutureStartTimeList[i]) # 如果进程持续时间超过 15 分钟,认定任务失败,需要重新进行 if tmpTime > 900: FutureList[i].shutdown() # <-恐怕不是这样乱来的? #FutureList[i].xxxx? # <-- how? time.sleep(5) FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,FutureProcSuit[i],countt,ErrorLogFilePath) print("进程池:[" + Process_i + "] [重新提交]了: [" + FutureProcIDs[i] + "]",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}") else: print("进程池:[" + Process_i + "] 已经运行了 [" + '[%.2f] 秒' % (time.clock() - FutureStartTimeList[i])) time.sleep(1) pass else: # 见到有空闲的进程就提交任务 FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath) FutureStartTimeList[i] = time.time() FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止 print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}") insertPoolFlag = True break pass time.sleep(2) else: # 单进程处理 ProcessRet = SProcessFunc("1",TaskDict,Task[j],countt,ErrorLogFilePath) 