# Implementation of the module {hotpath_js}.
# Last edited on 2021-10-03 14:49:11 by stolfi

import hotpath_js
import block
import block_hp
import path
import contact
import contact_hp
import move
import move_parms
import hacks
import rn
import sys
from math import sqrt, sin, cos, log, exp, floor, ceil, inf, nan, pi

# NOTE(review): the three modules below are used by the code in this file
# but were never imported. Their names are taken from the calls below
# ({seam.side}, {path_hp.get_owner}, {input_data.describe_js}); confirm
# that these modules exist under these names.
import seam
import path_hp
import input_data

# NOTE(review): the helpers {min_tot_fabtime}, {closed_max_rcool} and
# {restore_state} are called below but are not defined in this part of the
# file. Confirm that they are defined elsewhere in the file.


def best_path(o, BCS, SMS, Delta, maxcalls, mp_jump):
    # Searches for the valid tool-path with the smallest total execution
    # time that starts at the point {o} and is a concatenation of one
    # choice from each block of the list {BCS}, subject to the cooling
    # constraints of the seams {SMS}.
    #
    # The parameters {Delta}, {maxcalls} and {mp_jump} are handed down to
    # {best_path_aux}: {Delta} is the bound used by {excessive_cooling},
    # {maxcalls} (if not {None}) limits the number of recursive calls once
    # some valid path has been found, and {mp_jump} is passed to
    # {path.concat} when joining paths.
    #
    # Returns the best path found, or {None} if no valid path was found.
    # Writes a description of the input and a report on the result to
    # {sys.stderr}.

    input_data.describe_js(sys.stderr, o, BCS, SMS, Delta, maxcalls, mp_jump)

    # {initialize_state} needs the starting point {o} and returns four values:
    Q, Q_max_rc, BCS_R, SMS_A = initialize_state(o, BCS, SMS)

    # {best_path_aux} returns the pair (best path, number of calls):
    Pbest, ncalls = best_path_aux(
        Q, BCS_R, SMS_A, Delta, maxcalls,
        Q_max_rc = Q_max_rc, lev = 0, ncalls = 0, Pbest = None, mp_jump = mp_jump
    )
    sys.stderr.write("made %d recursive calls to {best_path_aux}\n" % ncalls)

    if Pbest is None:
        sys.stderr.write("{best_path} failed to find a valid tool-path\n")
    else:
        # NOTE(review): {SMS} is a list of seams (see {initialize_state}), but
        # the check and the report below treat its elements as contacts.
        # Confirm whether they should iterate over {seam.contacts(sm)} instead.
        assert contact.max_rcool(Pbest, SMS) <= 1.0
        # Report contact coverage and cooling times:
        sys.stderr.write("coverage times and cooling time of each contact\n")
        for k, ct in enumerate(SMS):
            sys.stderr.write("SMS[%03d] (" % k)
            tcs = contact.covtimes(Pbest, ct)
            for i in range(2):
                sys.stderr.write(" "+ (" . " if tcs[i] is None else ("%7.3f" % tcs[i])))
            rcool = contact.rcool(Pbest, ct)
            sys.stderr.write(" ) " + (" . " if rcool is None else ("%7.3f" % rcool)))
            sys.stderr.write("\n")
    return Pbest


def best_path_aux(
    Q, BCS_R, SMS_A, Delta, maxcalls, Q_max_rc, lev, ncalls, Pbest, mp_jump
):
    # This procedure is like {best_path}, but considers only paths that
    # begin with the path {Q}, the /current prefix/, which must be the
    # concatenation of choices of some of the original input blocks.
    #
    # A block from the original set {BCS} is /used/ if one of its choices
    # is included in the prefix path {Q}. The parameter {BCS_R} must be the
    # list of blocks that have not been used yet.
    #
    # A seam from the original set {SMS} is /closed/ if its side
    # blocks are both used. The seam is /active/ if only one of those
    # blocks is used, and /inactive/ if neither of those blocks has been
    # used. The parameter {SMS_A} must be the list of active seams.
    #
    # A contact that belongs to a closed seam is /closed/ if both its
    # sides are traces of {Q}, and is /irrelevant/ otherwise. The parameter
    # {Q_max_rc} must be {contact.max_rcool(Q,CTSC)} where {CTSC} is the
    # list of all closed contacts.
    #
    # The parameter {lev} should be the level of the recursion, namely how
    # many of the original blocks have been used. The parameter {ncalls}
    # should be the number of times this procedure has been entered before
    # the current call.
    #
    # The parameter {Pbest} should be the best valid path found so far, in the
    # sense of having the smallest total execution time. This argument
    # should be {None} if no valid path has been found yet.
    #
    # The parameters {maxcalls,Delta,mp_jump} are as in {best_path}.
    #
    # The procedure returns two results: [0] the best valid path found so
    # far, or {None} if it could not find any such path; and [1] the given
    # number {ncalls} plus the count of all recursive calls made by this
    # call (at least 1). If the procedure can't find a better path than
    # {Pbest}, it returns {Pbest}.
    #
    # The procedure also assumes that the following heuristic speedup
    # attributes are consistent with the original {BCS} and {SMS} data and
    # the current prefix path {Q}:
    #
    #   path_hp.get_owner(oph) for every choice {oph} of every block of {BCS}:
    #     the block {bc} of {BCS} and the index {ich} such that
    #     {oph=block.choice(bc,ich)}.
    #
    #   block_hp.get_seams(bc) for every block {bc} of {BCS}:
    #     the seams of {SMS} that have block {bc} as a side.
    #
    #   block_hp.get_used(bc) for every block {bc} of {BCS}:
    #     The index of the choice of {bc} that was used by {Q},
    #     {None} if {bc} is still unused.
    #
    #   block_hp.get_stops(bc) for every block {bc} of {BCS}:
    #     count of inactive directed seams whose side 1 is {bc}.
    #
    #   contact_hp.get_paths(ct) for every contact {ct} in every seam of {SMS}:
    #     a pair {(L0, L1)} of lists, where {L0} tells which choices
    #     of blocks of {BCS} include the move that is side 0 of {ct},
    #     and similarly for {L1}.

    # We made one more call:
    ncalls += 1

    Q_fabtime = path.fabtime(Q)  # Total execution time of {Q}.

    # Progress report: every call at first, then only once in a while.
    if ncalls < 1000 or (ncalls % 10000) == 1:
        sys.stderr.write("%s" % ("."*lev))
        sys.stderr.write("ncalls = %d Q_max_rc = %.3f Q_fabtime = %.3f\n" % (ncalls,Q_max_rc,Q_fabtime))

    # Is {Q} the beginning of a valid path?
    # The list of inactive seams is not tracked by this procedure, so {None}
    # is passed for {SMS_I}; {excessive_cooling} does not use it at present.
    if excessive_cooling(Q, Q_max_rc, BCS_R, SMS_A, None, mp_jump, Delta):
        # No -- Not worth continuing, return what we got so far:
        return Pbest, ncalls

    # Could {Q} be extended to a path better than {Pbest}?
    rest_mex = min_tot_fabtime(BCS_R)  # Minimum time needed to execute the remaining blocks.
    if Pbest is not None and Q_fabtime + rest_mex >= path.fabtime(Pbest):
        # No -- Not worth continuing, return what we got so far:
        return Pbest, ncalls

    # Is {Q} a full path?
    if len(BCS_R) == 0:
        # If we got here, {Q} must be valid:
        assert len(SMS_A) == 0  # With no blocks left, no seams can be active.
        # If we got here, {Q} must be better than {Pbest}:
        assert Pbest is None or Q_fabtime < path.fabtime(Pbest)
        sys.stderr.write("!! found a better path")
        sys.stderr.write(" ncalls = %d max_rcool = %.3f fabtime = %.3f\n" % (ncalls,Q_max_rc,Q_fabtime))
        Pbest = Q
        return Pbest, ncalls

    # Did we do enough work?
    if Pbest is not None and maxcalls is not None and ncalls >= maxcalls:
        return Pbest, ncalls

    # Find the list {OPHS} of candidates (all the choices from the blocks
    # of {BCS_R} that could be added to {Q}), sorted in "most promising
    # first" order:
    OPHS = candidates(BCS_R, path.pfin(Q), SMS_A)

    # Try to extend {Q} with each of those choices in turn:
    for oph in OPHS:
        # Add {oph} to the current prefix {Q} and update the state accordingly:
        SMScl, SMSac = affected_seams(oph)
        Qnew, Qnew_max_rc, BCSRnew, SMSAnew = \
            update_state(Q, Q_max_rc, BCS_R, SMS_A, oph, SMScl, SMSac, mp_jump)
        Pbest, ncalls = best_path_aux(
            Qnew, BCSRnew, SMSAnew, Delta, maxcalls,
            Q_max_rc = Qnew_max_rc, lev = lev+1, ncalls = ncalls, Pbest = Pbest, mp_jump = mp_jump
        )
        # Did we do enough work?
        # NOTE(review): this early exit skips {restore_state}, and uses {>}
        # where the test before the loop uses {>=}. Confirm both are intended.
        if Pbest is not None and maxcalls is not None and ncalls > maxcalls:
            return Pbest, ncalls
        # Restore the state to be correct for {Q} and {BCS_R}:
        restore_state(SMScl, SMSac)

    # We tried all possible extensions of {Q}:
    return Pbest, ncalls


def initialize_state(o, BCS, SMS):
    # Receives the parameters {o,BCS,SMS} as in {best_path}.
    # Returns the initial values of the arguments {Q,Q_max_rc,BCS_R,SMS_A}
    # for {best_path_aux}.
    #
    # Also sets the heuristic fields of the blocks of {BCS} and of the
    # contacts of the seams of {SMS}, through {block_hp} and {contact_hp}.

    Q = path.make_empty(o)  # The initial prefix has no moves but ends at {o}.
    Q_max_rc = 0            # No contacts are closed by {Q} initially.
    BCS_R = BCS             # All blocks are unused initially.
    SMS_A = []              # No seams are active initially.

    # Initialize the heuristic fields of blocks:
    for bc in BCS:
        # No blocks are used initially:
        block_hp.set_used_choice(bc, None)
        # Prepare to collect the seams incident on each block {bc}:
        block_hp.set_seams(bc, ([],[]))
        # Prepare to count stops by directed seams:
        block_hp.set_stops(bc, 0)

    for sm in SMS:
        # Prepare to collect the choices and moves that cover each contact {ct}:
        for ct in seam.contacts(sm):
            contact_hp.set_coverage(ct, ([], []))

        for i in range(2):
            bc = seam.side(sm, i)
            # Add the seam {sm} to the seams for which {bc} is side {i}:
            block_hp.get_seams(bc)[i].append(sm)
            # If the seam is directed, count one more stop on its side 1:
            if seam.is_directed(sm) and i == 1:
                block_hp.inc_stops(bc, +1)
            # Collect the choices of {bc} which contain side {i} of the contacts of {sm}:
            for ct in seam.contacts(sm):
                # NOTE(review): the coverage lists were stored with
                # {contact_hp.set_coverage} but are read with {contact.coverage};
                # confirm the name of the getter.
                Li = contact.coverage(ct)[i]
                mv = contact.side_move(ct, i)
                for ich in range(block.nchoices(bc)):
                    oph = block.choice(bc, ich)
                    imv = path.find_move(oph, mv)
                    if imv is not None:
                        Li.append((oph, imv))

    # ??? What else ???
    return Q, Q_max_rc, BCS_R, SMS_A

# ----------------------------------------------------------------------

def affected_seams(oph):
    # Returns two lists of seams that will be affected
    # by adding the path {oph} to the current prefix path:
    #
    #   {SMScl} the seams that will change from active to closed.
    #   {SMSac} the seams that will change from inactive to active.
    #
    # {path_hp.get_owner} returns the pair (block, choice index); only the
    # block of which {oph} is a choice is needed here.
    bc, _ = path_hp.get_owner(oph)
    SMSP = block_hp.get_seams(bc)
    SMScl = []
    SMSac = []
    for i in range(2):
        for sm in SMSP[i]:
            assert seam.side(sm,i) == bc
            bc1 = seam.side(sm,1-i)  # The block on the other side of {sm}.
            if block_hp.get_used(bc1) is not None:
                # Seam {sm} goes from active to closed:
                SMScl.append(sm)
            else:
                # Seam {sm} goes from inactive to active:
                SMSac.append(sm)
    return SMScl, SMSac

# ----------------------------------------------------------------------

def update_state(Q, Q_max_rc, BCS_R, SMS_A, oph, SMScl, SMSac, mp_jump = None):
    # Assumes that {Q, Q_max_rc, BCS_R, SMS_A} are as described
    # in {best_path_aux}, and that {oph} is a choice of some
    # block of {BCS_R}.
    #
    # Returns the values {Qnew, Qnew_max_rc, BCSRnew, SMSAnew} of those
    # variables modified so as to account for the appending of the path
    # {oph} to the current prefix path {Q}.
    #
    # Also assumes that {SMScl} is the list of the seams that will change
    # from active to closed as a consequence of that appending, and
    # {SMSac} is the list of those that will change from inactive to
    # active.
    #
    # The parameter {mp_jump} is passed on to {path.concat}.
    #
    # The procedure also assumes that heuristic-related fields in
    # {Path},{Block}, etc. are up-to-date for the current prefix path {Q},
    # and updates them for the new situation.

    # Append {oph} to the path {Q}. Let {concat} choose link or jump:
    # NOTE(review): assumes {path.concat} takes the list of paths to join
    # as its first argument -- confirm against the {path} module.
    use_links = True
    Qnew = path.concat([Q, oph], use_links, mp_jump)

    # Recompute the max cooling time of contacts closed in {Qnew}:
    Qnew_max_rc = max(Q_max_rc, closed_max_rcool(Q,oph,Qnew,SMScl))

    # Remove from {BCS_R} the block of which {oph} is a choice, and mark it used:
    bc, ich = path_hp.get_owner(oph)
    BCSRnew = tuple(bcx for bcx in BCS_R if bcx != bc)
    assert len(BCSRnew) == len(BCS_R) - 1
    block_hp.set_used_choice(bc, ich)

    # Update the list of active seams to be correct for {Qnew}:
    SMSAnew = [ sm for sm in SMS_A if sm not in SMScl ] + SMSac

    # Update the front indicators:
    update_front(SMScl, SMSac)

    # Clear the stops that were due to this block:
    for sm in SMSac:
        if seam.is_directed(sm) and seam.side(sm, 0) == bc:
            bc1 = seam.side(sm, 1)
            block_hp.inc_stops(bc1, -1)

    return Qnew, Qnew_max_rc, BCSRnew, SMSAnew

# ----------------------------------------------------------------------

def update_front(SMScl, SMSac):
    # Updates the front marks on blocks to account for the changes
    # in the active seam set. Assumes that {SMScl} and {SMSac} are
    # the seams that changed to closed and to active, respectively.

    # Both sides of a closed seam are used blocks:
    for sm in SMScl:
        for i in range(2):
            bc = seam.side(sm, i)
            assert block_hp.get_used(bc) is not None
            block_hp.set_front(bc,False)

    # Exactly one side of an active seam is used; the other side is front:
    for sm in SMSac:
        bc0 = seam.side(sm, 0)
        bc1 = seam.side(sm, 1)
        if block_hp.get_used(bc0) is not None:
            assert block_hp.get_used(bc1) is None
            block_hp.set_front(bc1,True)
        else:
            # Side 0 is unused here, so side 1 must be the used one:
            assert block_hp.get_used(bc1) is not None
            block_hp.set_front(bc0,True)
    return

# ----------------------------------------------------------------------

def excessive_cooling(Q, Q_max_rc, BCS_R, SMS_A, SMS_I, mp_jump, Delta):
    # The procedure receives the current tentative path {Q}, the maximum
    # cooling time {Q_max_rc} of all the original contacts that are closed
    # by it, the list {BCS_R} of the blocks that have not yet been included
    # in it, and the list {SMS_A} of active seams.
    #
    # The parameters {BCS_R}, {SMS_I} and {mp_jump} are not used at present.
    #
    # The procedure will return {True} only if it determines that ANY
    # extension of {Q} by the blocks of {BCS_R} will violate the cooling
    # constraint. If it cannot reach that conclusion, it returns {False},
    # meaning "the prefix {Q} may or may not be extensible to a valid full
    # path".

    if Q_max_rc > Delta:
        return True

    # Estimate the worst cooling time {max_rc_active} for active seams:
    max_rc_active = 0
    for sm in SMS_A:
        # One side of {sm} must be a used block {bcu}, the other not:
        bc0 = seam.side(sm, 0)
        bc1 = seam.side(sm, 1)
        if block_hp.get_used(bc0) is not None:
            bcu = bc0; bcf = bc1
        else:
            # A directed seam can only become active through its side 0:
            assert not seam.is_directed(sm)
            bcu = bc1; bcf = bc0
        assert block_hp.get_used(bcu) is not None
        assert block_hp.get_used(bcf) is None

        # TODO: the draft started to compute here the minimum cooling time
        # for this seam over all choices of the front block {bcf}; that
        # computation was left unfinished and had no effect, so it is omitted.

        # NOTE(review): {seam.contacts} is used in {initialize_state} and
        # {contact.covtimes(P,ct)} in {best_path}; confirm that the
        # accessors {seam.get_contacts} and {contact.get_covtimes} exist.
        for ct in seam.get_contacts(sm):
            tcov = contact.get_covtimes(ct)
            assert tcov[0] is None or tcov[1] is None  # Contact must be active or irrelevant.
            if tcov[0] is not None:
                # ??? Should ACTIVE contacts only ???
                max_rc_active = path.fabtime(Q) - seam.min_tcov(Q, SMS_A)

    # NOTE(review): this compares with 1 while the test above compares with
    # {Delta}; confirm which bound is intended.
    if max_rc_active > 1:
        return True

    # ??? Should also estimate the max cooling time among {BCS_R,SMS_I} ???

    # Can't see why {Q} can't be extended:
    return False


def block_pini_dist(p, bcp):
    # Returns the distance from point {p} to the nearest endpoint of the
    # path that is the block pick {bcp}.
    bc, ich = block.unpack(bcp)
    oph = block.choice(bc, ich)
    d_ini = rn.dist(p, path.pini(oph))
    d_fin = rn.dist(p, path.pfin(oph))
    return min(d_ini, d_fin)


def candidates(BCS_R, p, SMS_A):
    # Receives the list {BCS_R} of blocks that have not been incorporated yet in
    # {Q}, the final point {p} of {Q}, and the list {SMS_A} of active seams.
    #
    # Returns a tuple {OPHS} of candidates for the next element to be
    # added to {Q}. That is a tuple of block choices (paths), containing
    # all choices of all blocks of {BCS_R} that could be added next to {Q},
    # sorted by priority order.
    #
    # A block of {BCS_R} is excluded if it is side 1 of some inactive
    # directed seam, as counted by {block_hp.get_stops}. The remaining
    # blocks are split into /front/ blocks (unused sides of active seams)
    # and the others. The choices of front blocks come first; within each
    # group, choices are sorted by increasing distance from {p} to their
    # initial point.
    #
    # As a side effect, resets the front marks of the blocks of {BCS_R}
    # through {block_hp.set_front}.

    # Mark the front blocks, namely the unused sides of active seams.
    # They are also collected by identity, for the test further down:
    front_ids = set()
    for bc in BCS_R:
        block_hp.set_front(bc, False)
    for sm in SMS_A:
        for i in range(2):
            bc = seam.side(sm, i)
            if block_hp.get_used(bc) is None:
                block_hp.set_front(bc, True)
                front_ids.add(id(bc))

    def chdist(oph):
        # Distance from {p} to the initial point of the choice {oph}.
        return rn.dist(p, path.pini(oph))

    # Collect the choices of front blocks {OPHF} and of the other blocks {OPHE}:
    OPHF = []
    OPHE = []
    for bc in BCS_R:
        # NOTE(review): {block_hp.get_stops} is named only in the comments of
        # {best_path_aux}; confirm it exists alongside {set_stops}/{inc_stops}.
        if block_hp.get_stops(bc) > 0:
            # Barred: some directed seam requires another block to be added first.
            continue
        dest = OPHF if id(bc) in front_ids else OPHE
        for ich in range(block.nchoices(bc)):
            dest.append(block.choice(bc, ich))

    # Now sort the lists and concatenate:
    OPHF.sort(key=chdist)
    OPHE.sort(key=chdist)
    return tuple(OPHF + OPHE)