PIDStepper
is pretty archaic (last substantive change was about fourteen years ago) and, as you've found, is neither documented nor used anywhere publicly. Since we don't test it, either, we should probably cornfield it.
That said, after rummaging around on my hard drive, I've found a few cases where I experimented with it and, amazingly, it still works.
Using a trivial diffusion problem as an example:
import fipy as fp
from fipy.tools import numerix
nx = 50
dx = 1.
L = nx * dx
mesh = fp.Grid1D(nx=nx, dx=dx)
x = mesh.cellCenters[0]
phi = fp.CellVariable(mesh=mesh, name=r"$\phi$", hasOld=True)
phi.value = 0.
phi.setValue(1., where=(x > L/2. - L/10.) & (x < L/2. + L/10.))
viewer = fp.Viewer(vars=phi, datamin=-0.1, datamax=1.1)
D = 1.
eq = fp.TransientTerm() == fp.DiffusionTerm(D * (1 - phi))
total_elapsed = 0.
dt = 100. * dx**2 / (2 * D)
totaltime = 1000.
checkpoints = (fp.numerix.arange(int(totaltime / dt)) + 1) * dt
To solve this with PIDStepper
, you need to define a sweepFn()
to actually solve your equation(s). The vardata
argument is a tuple of tuples, each of which holds a CellVariable
to solve for, the equation to apply, and the old-style boundary conditions to use (all of this radically predates coupled equations or constraints). The dt
argument is the adapted time step to attempt. *args
and **kwargs
are any additional agruments you wish to pass at each step. This function must return an error normalized to one; here we return the L1 norm of var - var.old
normalized to the L1 norm of var.old
.
def L1error(var):
denom = numerix.L1norm(var.old)
return numerix.L1norm(var - var.old) / (denom + (denom == 0))
def sweepFn(vardata, dt, *args, **kwargs):
error = 0.
for var, eqn, bcs in vardata:
res = eqn.sweep(var=var,
dt=dt,
boundaryConditions=bcs)
error = max(error, L1error(var))
return error
You can optionally define a successFn()
to perform after a successful adaptive solution step. The vardata
argument is as above. The dt
argument is the time step that was requested. The dtPrev
argument is the time step that was actually taken. The elapsed
argument is how much time is elapsed for this time step. *args
and **kwargs
are any additional agruments you wish to pass at each step and must be the same as for sweepFn()
. Here, we'll want access to the viewer
, the list of time steps that were taken dts
, the list of elapsed times each was taken at elapseds
, and the global clock total_elapsed
.
def successFn(vardata, dt, dtPrev, elapsed, viewer, dts, elapseds, total_elapsed, *args, **kwargs):
dts.append(dtPrev)
elapseds.append(total_elapsed + elapsed)
viewer.plot()
You could also optionally define a failFn()
to perform whenever sweepFn()
returns an error greater than one. This would take the same arguments as successFn()
.
Finally, instantiate a stepper
with appropriate vardata
and call step()
for each desired time step. Pass the desired time step dt
, the time step to try first dtTry
, the smallest time step to allow dtMin
, the last time step attempted dtPrev
, the sweepFn()
we just defined, the successFn()
we just defined, and the optional arguments that our successFn()
makes use of.
from fipy.steppers import PIDStepper
stepper = PIDStepper(vardata=((phi, eq, ()),))
pid_dts = []
pid_elapseds = []
dtMin = dt / 1000
dtTry = dtMin
dtPrev = dtTry
for until in checkpoints:
dtPrev, dtTry = stepper.step(dt=dt, dtTry=dtTry,
dtMin=dtMin, dtPrev=dtPrev,
sweepFn=sweepFn,
successFn=successFn,
viewer=viewer,
dts=pid_dts,
elapseds=pid_elapseds,
total_elapsed=total_elapsed)
total_elapsed += dt
You could also do a similar thing by defining a PseudoRKQSStepper
:
stepper = PseudoRKQSStepper(vardata=((phi, eq, ()),))
and use the same sweepFn()
and successFn()
.
My current practice
While this works, I found the whole process of defining and passing around helper functions to be kind of opaque and more trouble than it's worth. Until now, I couldn't be bothered to document it or show anybody else how I'd used it.
These days, I do something more like this:
explicit_dts = []
explicit_elapseds = []
dt = dt / 1000
for until in checkpoints:
while total_elapsed < until:
phi.updateOld()
dt_until = (until - total_elapsed)
dt_save = dt
if dt_until < dt:
dt = dt_until
res = eq.sweep(var=phi, dt=dt)
if L1error(phi) < 1.:
total_elapsed += dt
explicit_dts.append(dt)
explicit_elapseds.append(total_elapsed)
dt = dt_save
dt *= 1.2
viewer.plot()
else:
phi.value = phi.old.value
dt /= 2.
This involves about the same amount of code in your script (a little less, actually) and none of the additional 100+ lines of code in PIDStepper
and Stepper
.
None of these have been tuned, and this diffusion problem isn't much of a test as it's unconditionally stable, but once they get going, all exhibit about the same acceleration.
