# Implementation of the {hotpath} module.
# Last edited on 2021-11-10 16:00:08 by stolfi

import hotpath
import path
import move
import block
import contact
import job_parms
import rn
import plot_data
import txt_read
from math import sqrt, sin, cos, log, exp, floor, ceil, inf, nan, pi
import sys
import time

# NOTE(review): {solve_aux} below calls {seam.valid} and {remove_seams}, but
# this file neither imports a {seam} module nor defines {remove_seams}.
# Both must be supplied before the solver can actually run.


def _thread_cpu_time():
    # Returns the CPU time (in seconds) used so far by the calling thread.
    # The clock identifier is an attribute of the {time} module; the bare
    # name {CLOCK_THREAD_CPUTIME_ID} is not defined by the imports above.
    return time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID)


def solve(preph, BCS, SMS, sufph, Qbest, report, get_cands, tcpulim, mp_jump):
    # Runs the recursive search {solve_aux} starting from the path {preph},
    # the blocks {BCS}, the seams {SMS}, and the suffix {sufph}, with
    # {Qbest} as the best path known so far (or {None}).
    #
    # The CPU time budget {tcpulim} (seconds of thread CPU time) is turned
    # into an absolute deadline.  The parameters {report}, {get_cands}, and
    # {mp_jump} are passed through to {solve_aux} unchanged.
    #
    # Returns the triple {(Qbest, ncalls, tcputot)}: the best path found,
    # the number of calls of {solve_aux}, and the CPU time spent.
    tcpuini = _thread_cpu_time()
    tcpufin = tcpuini + tcpulim
    Qbest, ncalls = solve_aux(
        preph, BCS, SMS, sufph,
        report = report,
        get_cands = get_cands,
        tcpufin = tcpufin,
        Qbest = Qbest,
        mp_jump = mp_jump,
        ncalls = 0,
        lev = 0
    )
    tcputot = _thread_cpu_time() - tcpuini
    return Qbest, ncalls, tcputot
    # ----------------------------------------------------------------------


def solve_2(preph, BCS, SMS, sufph, Qbest, get_cands, tcpulim, mp_jump, report = None):
    # Two-pass search.  The first pass calls {solve} with the candidate
    # generator {get_cands_prune} and a zero time budget; its result is
    # used as the initial {Qbest} of the second pass, which uses the
    # client-given {get_cands} and the time budget {tcpulim}.
    #
    # The optional {report} (default {None}) is handed to both passes.
    #
    # Returns {(Qbest_2, ncalls_1, tcputot_1, ncalls_2, tcputot_2)}.
    #
    # NOTE(review): as in the original, the first pass starts from
    # {Qbest = None}, so the {Qbest} parameter is not used -- confirm intent.
    Qbest_1, ncalls_1, tcputot_1 = solve(
        preph, BCS, SMS, sufph,
        report = report,
        get_cands = get_cands_prune,
        tcpulim = 0,
        Qbest = None,
        mp_jump = mp_jump
    )
    Qbest_2, ncalls_2, tcputot_2 = solve(
        preph, BCS, SMS, sufph,
        report = report,
        get_cands = get_cands,
        tcpulim = tcpulim,
        Qbest = Qbest_1,
        mp_jump = mp_jump
    )
    return Qbest_2, ncalls_1, tcputot_1, ncalls_2, tcputot_2
    # ----------------------------------------------------------------------


def solve_aux(Q, BCR, SMR, sufph, report, get_cands, tcpufin, Qbest, mp_jump, ncalls, lev):
    # Recursive branch-and-bound step.  Tries to extend the partial path
    # {Q} with choices of the remaining blocks {BCR}, subject to the
    # remaining seams {SMR}.  Stops early once some solution is known and
    # the thread CPU clock has reached {tcpufin}.
    #
    # The procedure {get_cands(Q, BCR, sufph, SMR)} must return a sequence
    # of candidates whose first two items are a block and a choice index;
    # any further items (e.g. a raster link) are ignored here.
    #
    # Returns the pair {(Qbest, ncalls)}: the best full path known after
    # this call, and the updated call count.
    Q_fabtime = path.fabtime(Q)  # Time already taken by {Q}.
    Q_valid = seam.valid(Q, SMR)
    tcpunow = _thread_cpu_time()
    if report is not None:
        report(Q, Q_fabtime, Q_valid, ncalls, tcpufin - tcpunow, BCR, SMR, sufph)

    # We made one more call:
    ncalls += 1

    # Could {Q} be the beginning of a valid path?
    if not predvalid(Q, BCR, sufph, SMR):
        # No -- Not worth continuing, return what we got so far:
        return Qbest, ncalls

    # Could {Q} be extended to a path better than {Qbest}?
    # {rest_min_fabtime} is called with the block list only, so that the
    # call works with its one-parameter form.  Leaving {sufph} out only
    # makes the lower bound looser.
    BCR_fabtime = rest_min_fabtime(BCR)  # Minimum time needed to execute the remaining blocks.
    if Qbest is not None and Q_fabtime + BCR_fabtime >= path.fabtime(Qbest):
        # No -- Not worth continuing, return what we got so far:
        return Qbest, ncalls

    # Is {Q} a full path?
    if len(BCR) == 0 and sufph is None:
        # If we got here, it must be valid:
        assert len(SMR) == 0
        # If we got here, it must be better than {Qbest}:
        assert Qbest is None or Q_fabtime < path.fabtime(Qbest)
        sys.stderr.write("!! found a better path")
        sys.stderr.write(" n_calls = %d fab_time = %.3f\n" % (ncalls, Q_fabtime))
        # {Q} is the new best path.  (The original returned the old
        # {Qbest} here, so an improvement was never recorded.)
        return Q, ncalls

    # Did we do enough work?
    if Qbest is not None and tcpunow >= tcpufin:
        return Qbest, ncalls

    # Try to extend {Q} with each of the blocks in {BCR}, in all possible choices:
    CANDS = get_cands(Q, BCR, sufph, SMR)
    for cand in CANDS:
        # Accept both {(bc, ich)} pairs and longer tuples:
        bc, ich = cand[0], cand[1]
        cph = block.choice(bc, ich)
        use_links = False
        Q_plus = path.concat((Q, cph,), use_links, mp_jump)
        BCR_minus = remove_block(BCR, bc)
        SMR_minus = remove_seams(SMR, bc)
        if ncalls % 100000 == 0:
            sys.stderr.write("!! current best path")
            sys.stderr.write(" n_calls = %d valid = %s fab_time = %.3f\n" % (ncalls, Q_valid, Q_fabtime))
        Qbest, ncalls = solve_aux(
            Q_plus, BCR_minus, SMR_minus, sufph,
            report = report,
            get_cands = get_cands,
            tcpufin = tcpufin,
            Qbest = Qbest,
            mp_jump = mp_jump,
            ncalls = ncalls,
            lev = lev + 1
        )
        # Did we do enough work:
        tcpunow = _thread_cpu_time()
        if Qbest is not None and tcpunow >= tcpufin:
            return Qbest, ncalls

    # We tried all possible extensions of {Q}:
    return Qbest, ncalls
    # ----------------------------------------------------------------------


def predvalid(Q, BCR, sufph, SMR):
    # Should tell whether {Q} could be the beginning of a valid path, given
    # the remaining blocks {BCR}, suffix {sufph}, and seams {SMR}.
    #
    # ??? The original source had only the placeholder "???" as its body,
    # so the test is not implemented.  It fails loudly instead of guessing.
    raise NotImplementedError("predvalid is not implemented")
# ----------------------------------------------------------------------

def _match_raster_link(qEnd, pIni, links):
    # Looks in {links} (the pair of raster links of a contact, either of
    # which may be {None}) for one that joins the points {qEnd} and {pIni}.
    # Returns that link oriented from {qEnd} to {pIni} (reversed with
    # {path.rev} if needed), or {None} if neither link fits.
    for lk in (links[0], links[1]):
        if lk is not None:
            p = path.pini(lk)
            q = path.pfin(lk)
            if qEnd == p and pIni == q:
                return lk
            elif qEnd == q and pIni == p:
                return path.rev(lk)
    return None


def _find_raster_link(qEnd, bc, ip, SMR):
    # Scans the contacts of {SMR} that have the block {bc} on either side,
    # and returns the first raster link that joins {qEnd} to the initial
    # point of choice {ip} of {bc}.  Returns {None} if there is none.
    pIni = path.pini(block.choice(bc, ip))
    for ct in SMR:
        if contact.side_block(ct, 0) == bc or contact.side_block(ct, 1) == bc:
            lk = _match_raster_link(qEnd, pIni, contact.get_raster_link(ct))
            if lk is not None:
                return lk
    return None


def _choice_start_dist(qEnd, bcp):
    # Distance from {qEnd} to the initial point of the block choice
    # described by the candidate triple {bcp = (bc, ip, raster_link)}.
    bc, ip, raster_link = bcp
    return rn.dist(qEnd, path.pini(block.choice(bc, ip)))


def get_cands_prune(Q, BCR, sufph, SMR):
    # Pruned candidate generator.  For each selected block, proposes its
    # choice 0 and, if the block has more than two choices, its choice 2.
    # Returns a tuple of triples {(bc, ip, raster_link)}, where
    # {raster_link} is a link found in {SMR} or {None}.
    #
    # NOTE(review): the original body used {qEnd} and {y} without defining
    # them.  Here {qEnd} is taken to be the final point of {Q} -- confirm.
    # No definition of {y} could be recovered, so it is set to {None};
    # that disables the Y filter and selects only the first block of {BCR}.
    if len(BCR) == 0:
        return ()
    qEnd = path.pfin(Q)
    y = None

    BCR_AUX = []
    if y is not None:
        BCR_AUX = [bc for bc in BCR if y > block.get_y(bc, 1)]
    if len(BCR_AUX) == 0:
        BCR_AUX = [BCR[0]]

    NEW_BCPS = []
    for bc in BCR_AUX:
        np = block.nchoices(bc)
        NEW_BCPS.append((bc, 0, _find_raster_link(qEnd, bc, 0, SMR)))
        if np > 2:
            NEW_BCPS.append((bc, 2, _find_raster_link(qEnd, bc, 2, SMR)))
    return tuple(NEW_BCPS)


def candidatesA(qEnd, BCR, SMR):
    # Returns a tuple of candidate triples {(bc, ip, raster_link)} for all
    # choices of all blocks of {BCR}.  Choices of blocks that are the
    # unused side of a contact of {SMR} whose other side is already used
    # come first; the other blocks follow, with {raster_link = None}.
    # Each group is sorted by distance from {qEnd} to the choice's start.
    BOC = []  # Choices of blocks with open contacts.
    BCC = []  # Choices of blocks without open contacts.
    for ct in SMR:
        bc0 = contact.side_block(ct, 0)
        bc1 = contact.side_block(ct, 1)
        done0 = block.used(bc0)
        done1 = block.used(bc1)
        if (done0, done1) in ((True, False), (False, True)):
            # Exactly one side is done; {bc} is the other one:
            bc = bc1 if done0 else bc0
            if bc in BCR:
                for ip in range(block.nchoices(bc)):
                    pIni = path.pini(block.choice(bc, ip))
                    raster_link = _match_raster_link(qEnd, pIni, contact.get_raster_link(ct))
                    bcp = (bc, ip, raster_link)
                    if bcp not in BOC:
                        BOC.append(bcp)
                # The block is taken care of; do not list it again below:
                BCR = remove_block(BCR, bc)

    for bc in BCR:
        for ip in range(block.nchoices(bc)):
            bcp = (bc, ip, None)
            if bcp not in BOC and bcp not in BCC:
                BCC.append(bcp)

    BOC.sort(key = lambda bcp: _choice_start_dist(qEnd, bcp))
    BCC.sort(key = lambda bcp: _choice_start_dist(qEnd, bcp))
    return tuple(BOC + BCC)


def candidatesB(qEnd, BCR, SMR):
    # Returns a tuple of candidate triples {(bc, ip, raster_link)} for all
    # choices of all blocks of {BCR}.  The blocks are grouped by the value
    # of {block.get_y(bc, 1)}, with tolerance 0.05; the groups are taken in
    # increasing order of that value, and inside each group the choices
    # are sorted by distance from {qEnd} to the choice's start.
    y_dict = dict()  # Maps a representative Y to the blocks of its group.
    y_list = list()  # The representative Ys, in order of creation.
    for bc in BCR:
        y_aux = block.get_y(bc, 1)
        for y in y_dict:
            if txt_read.is_equal(y, y_aux, 0.05):
                y_dict[y].append(bc)
                # One group per block, else its candidates get duplicated:
                break
        else:
            # No existing group is close enough; start a new one:
            y_dict[y_aux] = [bc]
            y_list.append(y_aux)

    NEW_BCR = []
    y_list.sort()
    for key in y_list:
        BCR_AUX = [
            (bc, ip, _find_raster_link(qEnd, bc, ip, SMR))
            for bc in y_dict[key]
            for ip in range(block.nchoices(bc))
        ]
        BCR_AUX.sort(key = lambda bcp: _choice_start_dist(qEnd, bcp))
        NEW_BCR = NEW_BCR + BCR_AUX
    return tuple(NEW_BCR)

##########

def remove_block(BCR, bc):
    # Returns a tuple with the elements of {BCR} that differ from {bc}.
    return tuple(bcx for bcx in BCR if bcx != bc)


def remove_contacts(SMR):
    # Returns a tuple with the contacts of {SMR} that are not excluded
    # by {exclude_contact}.
    return tuple(ctx for ctx in SMR if not exclude_contact(ctx))


def exclude_contact(ct):
    # Returns {True} iff both side blocks of the contact {ct} are used.
    # Assumes {block.used} returns a boolean -- confirm.
    done0 = block.used(contact.side_block(ct, 0))
    done1 = block.used(contact.side_block(ct, 1))
    return bool(done0 and done1)

##########

def describe_input(wr, BS, CS, start_point, number_lines, Delta, max_time, parms):
    # Writes to {wr} a summary of the problem instance: the parameters
    # {parms}, the number of blocks {BS}, lines, and contacts {CS}, lower
    # bounds for the execution and cooling times, the limits {Delta} and
    # {max_time}, and the time of every choice of every block.
    # The list {BS} must not be empty.
    wr.write('------- PARAMETERS:\n')
    job_parms.write(wr, None, parms, None)
    wr.write("----------------------------------------------------------------------\n")
    wr.write('------- DETAILS:\n')
    wr.write("num_blocks = %d\n" % len(BS))
    wr.write("number of lines = %d\n" % number_lines)
    wr.write("number of contacts = %d\n" % len(CS))
    wr.write("average choices per block = %.3f\n" % blocks_avg_choices(BS))
    tmin = blocks_min_starttime(start_point, BS, parms) + rest_min_fabtime(BS)
    wr.write("lower bound to execution time = %.3f s\n" % tmin)
    wr.write("lower bound to max cooling time = %.3f s\n" % blocks_min_max_tcool(CS, BS, parms))
    if Delta == +inf:
        wr.write("no limit on cooling time\n")
    else:
        wr.write("max allowed cooling time = %.3f s\n" % Delta)
    if max_time is None:
        wr.write("no runtime limit\n")
    else:
        wr.write("max allowed time = %d\n" % max_time)
    block_times(wr, BS)
    wr.write("----------------------------------------------------------------------\n")
    return
    # ----------------------------------------------------------------------


def blocks_avg_choices(BS):
    # Returns the geometric average of the number of choices
    # of the blocks in the list {BS}, which must not be empty.
    slog = 0
    for bc in BS:
        np = block.nchoices(bc)
        assert np >= 1
        slog += log(np)
    return exp(slog/len(BS))


def blocks_min_starttime(p, BCR, parms):
    # Given a point {p} and the list {BCR} of blocks that have not been yet
    # included in the tentative tool-path, returns the minimum time needed
    # to jump to the nearest starting point of any choice of any of those
    # blocks.
    dmin = min(
        ( rn.dist(p, path.pini(block.choice(bc, ip)))
          for bc in BCR
          for ip in range(block.nchoices(bc)) ),
        default = +inf
    )
    assert dmin != +inf
    return nozzle_travel_time(dmin, True, None, parms)
    # ----------------------------------------------------------------------


def rest_min_fabtime(BCR, sufph = None):
    # Given the list {BCR} of blocks that have not been yet included in the
    # tentative tool-path, returns an estimate of the minimum time needed to
    # execute them, even if the best alternative could be chosen in each
    # block.
    #
    # The optional {sufph} lets callers use the two-argument form
    # {rest_min_fabtime(BCR, sufph)}.  When it is not {None}, its
    # fabrication time is added to the estimate.
    # NOTE(review): this assumes {sufph} is a path accepted by
    # {path.fabtime} -- confirm.
    mex = sum(block.min_fabtime(bc) for bc in BCR)
    if sufph is not None:
        mex += path.fabtime(sufph)
    return mex


def blocks_min_max_tcool(SMR, BCR, parms):
    # Given a list of contacts {SMR} and a list of blocks {BCR}, returns a
    # lower bound to the value of {contact.max_tcool(P,SMR)} for any path {P}
    # built from those blocks.
    #
    # The list {SMR} must contain only relevant contacts, that have at least
    # one side covered by one of blocks of {BCR}.
    #
    # For each contact {ct} of {SMR}, it finds the two blocks {bc0} and {bc1}
    # of {BCR} that include the sides of {ct}. They must both exist, be
    # unique, and distinct. Computes a min cooling time {tcm(ct)} assuming
    # that the best possible choices {ph0,ph1} of those blocks that contain
    # the sides of {ct} are added in sequence to the tool-path. Ignores any
    # choice of either block that does not contain those sides, assuming
    # that the contact {ct} will become irrelevant if that choice is
    # selected for the tool-path.
    #
    # Then it returns the maximum of {tcm(ct)} among all contacts {ct} of {SMR}.
    # If {SMR} is empty, returns 0.
    tcmax = 0
    for ct in SMR:
        tcmin = blocks_min_tcool(BCR, ct, parms)
        assert tcmin < +inf, "some side of a contact is not covered by {BCR}"
        tcmax = max(tcmax, tcmin)
    return tcmax
    # ----------------------------------------------------------------------


def blocks_min_tcool(BS, ct, parms):
    # Finds the blocks of {BS} that have choices containing the two sides
    # of the contact {ct}.  If both blocks exist, returns the result of
    # {block_pair_min_tcool} for them; if only one exists, returns {+inf}.
    # Fails if neither exists.
    mv0 = contact.side(ct, 0)
    mv1 = contact.side(ct, 1)

    # Find the two blocks that have {mv0} and {mv1}:
    bc0 = None
    bc1 = None
    for bc in BS:
        has0 = block.find_choice_with_move(bc, mv0)
        has1 = block.find_choice_with_move(bc, mv1)
        assert has0 is None or has1 is None, "contact has two sides in same block"
        if has0 is not None:
            assert bc0 is None, "same move occurs in two different blocks"
            bc0 = bc
        if has1 is not None:
            assert bc1 is None, "same move occurs in two different blocks"
            bc1 = bc
        if bc0 is not None and bc1 is not None:
            break

    # Now {bc0} and {bc1} are the blocks that contain {mv0} and {mv1}, if any.
    assert bc0 is not None or bc1 is not None, "contact is not relevant to blocks of {BS}"
    if bc0 is None or bc1 is None:
        return +inf
    assert bc0 != bc1, "contact has two sides on the same block"
    return block_pair_min_tcool(bc0, bc1, ct, parms)


def block_pair_min_tcool(bc0, bc1, ct, parms):
    # Returns the minimum cooling time of the contact {ct} over all pairs
    # of choices of {bc0} and {bc1} that both cover a side of {ct},
    # assuming the choice of {bc1} is executed right after that of {bc0}.
    # Returns {+inf} if there is no such pair.
    tcmin = +inf
    # Enumerate choices of {bc0}:
    for ip0 in range(block.nchoices(bc0)):
        # Get choice {oph0} and the cooling time {tc0} to the end of it:
        oph0 = block.choice(bc0, ip0)
        tcs0 = contact.covtimes(path.rev(oph0), ct)
        assert tcs0[0] is None or tcs0[1] is None, "contact has both sides on {bc0}"
        tc0 = tcs0[0] if tcs0[0] is not None else tcs0[1]
        if tc0 is None:
            continue
        p0 = path.pfin(oph0)
        # Enumerate choices of {bc1}:
        for ip1 in range(block.nchoices(bc1)):
            # Get choice {oph1} and the cooling time {tc1} to the end of it:
            oph1 = block.choice(bc1, ip1)
            tcs1 = contact.covtimes(oph1, ct)
            assert tcs1[0] is None or tcs1[1] is None, "contact has both sides on {bc1}"
            tc1 = tcs1[0] if tcs1[0] is not None else tcs1[1]
            if tc1 is None:
                continue
            # Compute min time for jump or link between them:
            p1 = path.pini(oph1)
            dist = rn.dist(p0, p1)
            tjmp = nozzle_travel_time(dist, True, None, parms)   # Jump time.
            tlnk = nozzle_travel_time(dist, False, None, parms)  # Link time. Assumes possible.
            # Cooling time for this pair of choices:
            tc = tc0 + min(tjmp, tlnk) + tc1
            if tc < tcmin:
                tcmin = tc
    return tcmin


def block_times(wr, BS):
    # Writes to {wr} the number of lines of each block of {BS} and the
    # fabrication time of each of its choices.
    wr.write('------- BLOCKS CHOICE TIME:\n')
    for ib, bc in enumerate(BS):
        nl = block.get_lines(bc)
        wr.write('--- BLOCK %d: %d LINES\n' % (ib, nl))
        for ip in range(block.nchoices(bc)):
            choice_time = path.fabtime(block.choice(bc, ip))
            wr.write('CHOICE %d | %.3f\n' % (ip, choice_time))
    return


def nozzle_travel_time(dpq, jmp, dpm, parms):
    # Returns the time for the nozzle to travel a straight move of length
    # {dpq}, or, if {dpm} is not {None} and is less than {dpq}, the time
    # to reach the point at distance {dpm} from the start of that move.
    #
    # The nozzle accelerates at {parms['acceleration']} up to the cruise
    # speed, cruises, and decelerates at the same rate.  If {jmp} is true
    # the cruise speed is {parms['job_jump_speed']} and twice
    # {parms['extrusion_on_off_time']} is added to the result; otherwise
    # the cruise speed is {parms['job_filling_speed']} and nothing is added.
    acc = parms['acceleration']  # Acceleration/deceleration at ends (mm/s^2).
    if jmp:
        vel_cru = parms['job_jump_speed']     # Cruise speed (mm/s).
        tud = parms['extrusion_on_off_time']  # Nozzle up/down time (s).
    else:
        # ??? Speed should be an attribute of the move.???
        vel_cru = parms['job_filling_speed']  # Cruise speed (mm/s).
        tud = 0

    # Figure out acceleration, cruise, and deceleration times and distances:
    acc_time = 0 if acc == inf else vel_cru/acc  # Time accelerating or decelerating to {vel_cru}.
    if acc != inf and vel_cru*acc_time >= dpq:
        # Not enough space to reach cruise speed:
        acc_time = sqrt(dpq/acc)
        acc_dist = dpq/2
        vel_max = acc*acc_time
        cru_dist = 0
        cru_time = 0
    else:
        # Enough distance to reach cruise speed:
        acc_dist = 0 if acc == inf else vel_cru*acc_time/2  # Distance while accelerating or decelerating.
        vel_max = vel_cru
        cru_dist = dpq - 2*acc_dist  # Cruise distance.
        cru_time = cru_dist/vel_cru

    # Compute the passage time {ttot}:
    ttot = 0
    if dpm is None or dpm >= dpq:
        # Acceleration, cruise, deceleration:
        ttot += 2*acc_time + cru_time
    elif dpm > acc_dist + cru_dist:
        # Passage time is during deceleration:
        ttot += acc_time + cru_time
        dcm = dpm - (acc_dist + cru_dist)  # Distance after start of decel.
        # Round-off may make {delta} slightly negative near the end of
        # the move, hence the clamp before taking the square root:
        delta = max(vel_max*vel_max - 2*dcm*acc, 0)
        tcm = (vel_max - sqrt(delta))/acc  # Extra time to passage.
        ttot += tcm
    elif dpm >= acc_dist:
        # Passage time is during cruising phase:
        ttot += acc_time
        dam = dpm - acc_dist  # Distance after acceleration.
        tam = dam/vel_cru     # Extra time to passage.
        ttot += tam
    elif dpm >= 0:
        # Passage during acceleration:
        tpm = sqrt(2*dpm/acc)  # Extra time to passage.
        ttot += tpm
    else:
        # Passage at very start:
        pass
    return ttot + 2*tud


def print_contact_table(wr, BS, CS, MS):
    # Writes to {wr} a table with two rows per contact of {CS} (one for
    # each side) and one column per choice of each block of {BS}.  Each
    # entry is the result of {path.find} for that side's move in that
    # choice, or dashes if the result is {None}.  If {MS} is not {None},
    # each row also shows the index of the side's move in {MS}.
    nbc = len(BS)
    nct = len(CS)
    nmv = 0 if MS is None else len(MS)

    def fmt1(i, c, n):
        # Letter {c} and index {i}, zero-padded to 2 or 3 digits.
        fmt = c + "%0" + ("2" if n < 100 else "3") + "d"
        return fmt % i

    def fmt2(i, j, c, n):
        # Same as {fmt1}, followed by ":" and the sub-index {j}.
        fmt = c + "%0" + ("2" if n < 100 else "3") + "d:%d"
        return fmt % (i, j)

    # Table header:
    ctsp = " "*len(fmt2(0, 0, "C", nct))
    mvsp = " "*len(fmt1(0, "M", nmv))
    bcds = "-"*len(fmt2(0, 0, "B", nbc))
    wr.write("%s %s " % (ctsp, mvsp))
    for ib, bci in enumerate(BS):
        for jpi in range(block.nchoices(bci)):
            wr.write(" " + fmt2(ib, jpi, "B", nbc))
    wr.write("\n")

    # Table body:
    for i, cti in enumerate(CS):
        for j in range(2):
            ctID = fmt2(i, j, "C", nct)
            mvij = contact.side(cti, j)
            if MS is None:
                mvID = mvsp
            else:
                mvID = fmt1(MS.index(mvij), "M", nmv)
            wr.write("%s %s " % (ctID, mvID))
            for bck in BS:
                for l in range(block.nchoices(bck)):
                    t = path.find(block.choice(bck, l), mvij)
                    if t is None:
                        wr.write(" " + bcds)
                    else:
                        wr.write(" %*d" % (len(bcds), t))
            wr.write("\n")
        wr.write("\n")
    return
    # ----------------------------------------------------------------------

##########

def describe_solution(wr, Qbest, CS, Delta, ncalls, ntime):
    # Writes to {wr} a summary of the solution {Qbest}: the limit {Delta},
    # the call count {ncalls}, the run time {ntime}, the maximum cooling
    # time, the fabrication time, the jump and raster statistics stored as
    # attributes of {Qbest}, and the coverage and cooling times of each
    # contact of {CS}.
    wr.write("----------------------------------------------------------------------\n")
    wr.write("Delta: %f\n" % Delta)
    wr.write("num_calls = %s\n" % str(ncalls))
    wr.write("exec_time = %s\n" % str(ntime))
    Q_maxtc = contact.max_tcool(Qbest, CS)
    wr.write("max_tcool = %.3f\n" % Q_maxtc)
    Q_fabtime = path.fabtime(Qbest)
    wr.write("fab_time = %.3f\n" % Q_fabtime)
    wr.write("num_jumps = %d\n" % Qbest.number_jumps)
    wr.write("jump_time = %.3f\n" % Qbest.jump_time)
    wr.write("jump_len = %.3f\n" % Qbest.jump_len)
    wr.write("raster_time = %.3f\n" % Qbest.raster_time)
    wr.write("raster_len = %.3f\n" % Qbest.raster_len)

    # Report contact coverage and cooling times:
    wr.write("coverage times and cooling time of each contact\n")
    for k, ct in enumerate(CS):
        wr.write("CS[%03d] (" % k)
        tcs = contact.covtimes(Qbest, ct)
        for i in range(2):
            wr.write(" "+ (" . " if tcs[i] is None else ("%7.3f" % tcs[i])))
        tcool = contact.tcool(Qbest, ct)
        wr.write(" ) " + (" . " if tcool is None else ("%7.3f" % tcool)))
        wr.write("\n")
    wr.write("----------------------------------------------------------------------\n")
    return


def describe_fail(wr, Delta, ncalls, ntime):
    # Writes to {wr} the summary used when no solution was found: the
    # limit {Delta}, the call count {ncalls}, the run time {ntime}, and
    # "None" for every path statistic.
    wr.write("----------------------------------------------------------------------\n")
    wr.write('! Qbest is None\n')
    wr.write("Delta: %f\n" % Delta)
    wr.write("num_calls = %s\n" % str(ncalls))
    wr.write("exec_time = %s\n" % str(ntime))
    wr.write("fab_time = None\n")
    wr.write("max_tcool = None\n")
    wr.write("num_blocks = None\n")
    wr.write("num_jumps = None\n")
    wr.write("jump_time = None\n")
    wr.write("jump_len = None\n")
    wr.write("raster_time = None\n")
    wr.write("raster_len = None\n")
    return

##########

def remove_first_jump(Q):
    # If the first move of the path {Q} is a jump, returns a new path,
    # built with {path.from_moves}, with all moves of {Q} except the
    # first.  Otherwise returns {Q} itself.
    first_move = path.elem(Q, 0)
    if not move.is_jump(first_move):
        return Q
    moves_list = [path.elem(Q, k) for k in range(1, path.nelems(Q))]
    return path.from_moves(moves_list)