#! /usr/bin/python3 # Last edited on 2021-03-06 11:40:02 by jstolfi import egg_interp import egg_phase import egg_resample import rn import rmxn import sys import math import re # Reads a set of {nf} data files produced by {analyze_egg_shape.py} which # give curvature {cvt} and tangent angle {ang} as a function of arclength # {arc}. Assumes that they are (or should be) of the same shape. # # Assumes that each data file includes a /closer/, a last point that is the same as the # first point except that the {arc} is increments by the total length # of the curve and {ang} is incremented by {2*pi}. # # Scales each curve so that the total arc length is {2*pi}. # # Converts each curve by choosing either arclength {arc} or tangent angle {ang} as paramter # and either curvature {cvt}, osculating radius {R = 1/cvt}, or the anomaly {ang - arc} # as value. # # Tries to align the curves by shifting so that the apparent maximum # value is at {1.5*pi}. # # Resamples them to the same equally-spaced parameter values. Computes # the average curve. # # Finds the argument values where the average curve crosses an appropriate value. # # Writes a file with {nf+2} columns: the chosen parameter, the average value, # and the values of the {nf} aligned and resampled curves. def main(): # egg_interp.test_interpolate(cubic,kt,A) # egg_phase.test_find_phase() # egg_resample.test_resample() # Check arguments: eggname = sys.argv[1]; sys.stderr.write("egg name = %s\n" % eggname) rsmooth = sys.argv[2]; sys.stderr.write("rsmooth = %s\n" % rsmooth) arg = sys.argv[3]; sys.stderr.write("argument = %s\n" % arg) # Curve parameter. assert arg == "arc" or arg == "ang" val = sys.argv[4]; sys.stderr.write("value = %s\n" % val) # Dependent variable. assert val == "rad" or val == "cvt" or val == "ano" ofname = ("tests/out/%s-sm%s-%s-%s.txt" % (eggname,rsmooth,arg,val)) # Output file name ifnames = sys.argv[5:] # File names # Read and adjust the curves: nf = len(ifnames) sys.stderr.write("reading %d files\n" % nf) assert nf > 0 curves = [] np_min = +math.inf # Min number of data points (including closer). for kf in range(nf): curve = read_curve(ifnames[kf]) curve = normalize_curve(curve) curve = convert_curve(curve, arg, val) curves.append(curve) np_min = min(len(curve), np_min) # Shift the curves to same phase and resample to same number of points ns = min(1001, np_min) # Number of samples including the closer. phase = 0 if val == "ano" else (31/16)*math.pi align_and_resample_curves(curves, ns, phase) # Compute the mean curve: mean = compute_mean(curves) lev = 0.0 if val == "ano" else 1.0 # Interesting value for crossings. report_crossings(mean, lev) # Write the curves: write_curves(ofname, mean, curves) return # ---------------------------------------------------------------------- def read_curve(fname): # Reads file {fname} extracting the columns {arc,ang,cvt} # Returns a list of those triples. sys.stderr.write("reading %s\n" % fname) rd = open(fname, 'r', encoding="ISO-8859-1") lnum = 0 curve = [] for line in rd: lnum += 1 line = re.sub(r'[#].*$', '', line) line = line.strip() fld = re.split(r'[ ]+', line) if len(fld) == 0: continue if len(fld) == 1 and fld[0] == '': continue if len(fld) != 9: sys.stderr.write(" File %s, line %d\n" % (fname,lnum)) sys.stderr.write(" bad data line format\n") sys.stderr.write(" '%s'\n" % line) assert False arc = float(fld[0]) cvt = float(fld[7]) ang = float(fld[8]) curve.append((arc, ang, cvt)) rd.close() np = len(curve) sys.stderr.write("read %d data points\n" % np) return curve # ---------------------------------------------------------------------- def normalize_curve(curve): # Given a curve in the form of a list of triples {(arc,ang,cvt)} as # produced by {read_curve}, returns a new list with that data rescaled so that the total # arclength is {2*pi}. Also implicitly rotates the shape and changes the # arclength starting point so that both arclength and tangent angle # start at 0 and end at {2*pi}. np = len(curve) # Including the closer. arc0 = curve[0][0] arcn = curve[np-1][0] totlen = arcn - arc0 tht0 = curve[0][1] thtn = curve[np-1][1] if abs((thtn - tht0) - 2*math.pi) > 1.0e-7: sys.stderr.write("theta[0] = %.9f theta[%d] = %.9f err = %.9f\n" % (tht0,np-1,thtn,(thtn - tht0) - 2*math.pi)) assert False scale = (2*math.pi)/totlen sys.stderr.write("total length %.9f - rescaling by %.9f\n" % (totlen,scale)) res = [] for arc, ang, cvt in curve: arc_n = (arc - arc0)*scale ang_n = ang - tht0 cvt_n = cvt/scale res.append((arc_n, ang_n, cvt_n)) assert res[0][0] == 0 assert res[0][1] == 0 assert abs(res[np-1][0] - 2*math.pi) < 1.0e-8 assert abs(res[np-1][1] - 2*math.pi) < 1.0e-8 assert abs(res[np-1][2] - res[0][2]) < 1.0e-8 return res # ---------------------------------------------------------------------- def convert_curve(curve, arg, val): # Given a curve in the form of a list of triples {(arc,ang,cvt)} as # produced by {read_curve}, returns a list of pairs {(a,v)} where {a} # is the parameter selected by {arg} ("arc" or "ang") and {v} # is the attribute selected by {val} ("cvt", "rad", or # "ano"). # # If {val} is "ano", also normalizes the curve to have # zero sum. np = len(curve) # Includes the closer. res = [] for arc,ang,cvt in curve: if arg == "arc": a = arc elif arg == "ang": a = ang else: assert False, "invalid argument variable" if val == "cvt": v = cvt elif val == "rad": v = 1/cvt elif val == "ano": v = ang - arc else: assert False, "invalid value variable" res.append((a, v,)) if val == "ano": # Normalize to zero sum: v_sum = 0 # Sum of {v} excluding the closer. for k in range(np-1): v_sum += res[k][1] v_avg = v_sum/(np-1) for k in range(np): a, v = res[k] res[k] = (a, v - v_avg) a0, v0 = res[0]; sys.stderr.write("res[0] = ( %.8f %.8f )\n" % (a0,v0)) an, vn = res[np-1]; sys.stderr.write("res[%d] = ( %.8f %.8f )\n" % (np-1,an,vn)) assert a0 == 0 assert abs(an - 2*math.pi) < 1.0e-8 assert abs(vn - v0) < 1.0e-8 # Just to make sure: vm = (v0 + vn)/2 res[0] = (a0, vm) res[np-1] = (a0 + 2*math.pi, vm) return res # ---------------------------------------------------------------------- def align_and_resample_curves(curves, ns, phase): # Given a list of curves, each a lst of pairs {(a,v)} # shifts each curve so that its apparent phase # is {phase} (in the range {[0_2*pi)}, and resamples it to {ns+1} equally spaced samples # (including a closer). # Shift and resample to common point count: sys.stderr.write("aligning and resampling to %d points\n" % ns) nf = len(curves) for kf in range(nf): curve = curves[kf] phasek = egg_phase.find_phase(curve) sys.stderr.write("curve[%d] phase = %.6f\n" % (kf,phasek)) curves[kf] = egg_resample.resample(curve, ns, phasek+phase) assert len(curves[kf]) == ns return # ---------------------------------------------------------------------- def write_curves(ofname, mean, curves): sys.stderr.write("writing %s\n" % ofname) nf = len(curves) ns = len(curves[0]) wr = open(ofname, 'w') for ks in range(ns): ak = curves[0][ks][0] wr.write("%12.9f" % ak) assert mean[ks][0] == ak mk = mean[ks][1] wr.write("%14.9f" % mk) for kf in range(nf): assert curves[kf][ks][0] == ak vk = curves[kf][ks][1] wr.write(" %14.9f" % vk) wr.write("\n") wr.close() return # ---------------------------------------------------------------------- def compute_mean(curves): mean = [] nf = len(curves) ns = len(curves[0]) for ks in range(ns): ak = curves[0][ks][0] tot = 0 for kf in range(nf): assert curves[kf][ks][0] == ak vk = curves[kf][ks][1] tot += vk mk = tot/nf mean.append((ak, mk,)) return mean # ---------------------------------------------------------------------- def report_crossings(curve, lev): # Finds the argument values where the value of the {curve} # crosses the level {lev}. Assumes that the curve has been # scaled and resampled. ns = len(curve) raw = [] # Argument values of crossings, beefore condensing. # Detect all crossings assuming affine interpolation # between samples: for ks in range(ns-1): a0, v0 = curve[ks] a1, v1 = curve[ks+1] if (v0 - lev)*(v1 - lev) <= 0: # Crossing detected: rc = (lev - v0)/(v1 - v0) if v1 != v0 else 0.5 assert 0 <= rc and rc <= 1 ac = (1-rc)*a0 + rc*a1 raw.append(ac) min_sep = math.pi/6 merged = merge_crossings(raw, min_sep) if len(merged) > 0: # Report crossings: sys.stderr.write("crossings of level %.9f:\n" % lev) for a_ini, a_fin in merged: sys.stderr.write(" %6.3f" % ((a_ini + a_fin)/2)) if a_ini != a_fin: sys.stderr.write(" [ %6.3f _ %6.3f ]" % (a_ini, a_fin)) sys.stderr.write("\n") return # ---------------------------------------------------------------------- def merge_crossings(raw, min_sep): # Assumes that {raw} is a list of argument values. Breaks the list # into clusters at gaps of size {min_sep} or more. Returns a list of # pairs {(a_ini,a_fin)}, one per cluster with the first and last # arguments of the cluster. # # Assumes that the last cluster is well-separated from the first one, # even if their distance in the flat {2*pi} circle is less than {min_sep}. merged = [] # List of argument pairs {(a_ini,a_fin)}. if len(raw) > 0: a_ini = raw[0]; a_fin = a_ini for ak in raw: if ak - a_fin > math.pi/6: # Large gap -- end previous cluster: merged.append((a_ini,a_fin)) # Start new cluster: a_ini = ak; a_fin = a_ini else: # Join with previous cluster: a_fin = ak # End last cluster: merged.append((a_ini,a_fin)) return merged # ---------------------------------------------------------------------- main()