IMP logo
IMP Reference Guide  develop.f15d1720a4,2026/04/01
The Integrative Modeling Platform
samplers.py
1 """@namespace IMP.pmi.samplers
2  Sampling of the system.
3 """
4 
5 import IMP
6 import IMP.core
7 from IMP.pmi.tools import get_restraint_set
8 
9 
10 class _SerialReplicaExchange:
11  """Dummy replica exchange class used in non-MPI builds.
12  It should act similarly to IMP.mpi.ReplicaExchange
13  on a single processor.
14  """
15  def __init__(self):
16  self.__params = {}
17 
18  def get_number_of_replicas(self):
19  return 1
20 
21  def create_temperatures(self, tmin, tmax, nrep):
22  return [tmin]
23 
24  def get_my_index(self):
25  return 0
26 
27  def set_my_parameter(self, key, val):
28  self.__params[key] = val
29 
30  def get_my_parameter(self, key):
31  return self.__params[key]
32 
33  def get_friend_index(self, step):
34  return 0
35 
36  def get_friend_parameter(self, key, findex):
37  return self.get_my_parameter(key)
38 
39  def do_exchange(self, myscore, fscore, findex):
40  return False
41 
42  def set_was_used(self, was_used):
43  self.was_used = was_used
44 
45 
46 class _SamplerBase:
47  def __init__(self, model):
48  self.model = model
49  # that is -1 because mc/md has not yet run
50  self.nframe = -1
51  self.simulated_annealing = False
52 
53  def set_simulated_annealing(self, min_temp, max_temp, min_temp_time,
54  max_temp_time):
55  self.simulated_annealing = True
56  self.tempmin = min_temp
57  self.tempmax = max_temp
58  self.timemin = min_temp_time
59  self.timemax = max_temp_time
60 
61  def temp_simulated_annealing(self):
62  if self.nframe % (self.timemin + self.timemax) < self.timemin:
63  value = 0.0
64  else:
65  value = 1.0
66  temp = self.tempmin + (self.tempmax - self.tempmin) * value
67  return temp
68 
69 
70 class MonteCarlo(_SamplerBase):
71  """Sample using Monte Carlo"""
72 
73  # check that isd is installed
74  try:
75  import IMP.isd
76  isd_available = True
77  except ImportError:
78  isd_available = False
79 
80  def __init__(self, model, objects=None, temp=1.0, filterbyname=None,
81  score_moved=False, use_jax=False):
82  """Setup Monte Carlo sampling
83  @param model The IMP Model
84  @param objects What to sample (a list of Movers)
85  @param temp The MC temperature
86  @param filterbyname Not used
87  @param score_moved If True, attempt to speed up sampling by
88  caching scoring function terms on particles that didn't move
89  @param use_jax If set to True, sample the scoring function using
90  JAX instead of IMP's internal C++ implementation (requires
91  that all PMI restraints used have a JAX implementation).
92  """
93  super().__init__(model)
94  self.losp = [
95  "Rigid_Bodies",
96  "Floppy_Bodies",
97  "Nuisances",
98  "X_coord",
99  "Weights"
100  "Surfaces"]
101  self.selfadaptive = False
102  self.temp = temp
103  self.mvs = []
104  self.mvslabels = []
105  self.label = "None"
106  self.movers_data = {}
107  self.use_jax = use_jax
108  self._jax_optimizer = None
109  self._jax_state = None
110 
111  self.mvs = objects
112 
113  # SerialMover
114  self.smv = IMP.core.SerialMover(self.mvs)
115 
116  self.mc = IMP.core.MonteCarlo(self.model)
117  self.mc.set_scoring_function(get_restraint_set(self.model))
118  self.mc.set_return_best(False)
119  self.mc.set_score_moved(score_moved)
120  self.mc.set_kt(self.temp)
121  self.mc.add_mover(self.smv)
122 
123  def set_kt(self, temp):
124  self.temp = temp
125  self.mc.set_kt(temp)
126  if self._jax_state is not None:
127  self._jax_state.temperature = temp
128 
129  def get_mc(self):
130  return self.mc
131 
132  def set_scoring_function(self, objectlist):
133  rs = IMP.RestraintSet(self.model, 1.0, 'sfo')
134  for ob in objectlist:
135  rs.add_restraint(ob.get_restraint())
137  self.mc.set_scoring_function(sf)
138 
139  def set_self_adaptive(self, isselfadaptive=True):
140  self.selfadaptive = isselfadaptive
141 
143  '''
144  Return a dictionary with the mover parameters for nuisance parameters
145  '''
146  output = {}
147  for i in range(self.get_number_of_movers()):
148  mv = self.smv.get_mover(i)
149  name = mv.get_name()
150  if "Nuisances" in name:
151  stepsize = IMP.core.NormalMover.get_from(mv).get_sigma()
152  output[name] = stepsize
153  return output
154 
155  def get_number_of_movers(self):
156  return len(self.smv.get_movers())
157 
158  def get_particle_types(self):
159  return self.losp
160 
161  def optimize(self, nstep):
162  self.nframe += 1
163  if self.use_jax:
164  if self._jax_optimizer is None:
165  self._jax_optimizer = self.mc._get_jax_optimizer(
166  nstep * self.get_number_of_movers())
167  self._jax_state = self._jax_optimizer.get_initial_state()
168  score, self._jax_state = self._jax_optimizer.optimize(
169  self._jax_state)
170  else:
171  score = self.mc.optimize(nstep * self.get_number_of_movers())
172 
173  # apply simulated annealing protocol
174  if self.simulated_annealing:
175  self.set_kt(self.temp_simulated_annealing())
176 
177  # apply self adaptive protocol
178  if self.selfadaptive:
179  if self.use_jax:
180  raise NotImplementedError(
181  "Adaptive protocol is not yet implemented for JAX")
182  for i, mv in enumerate(self.mvs):
183 
184  mvacc = mv.get_number_of_accepted()
185  mvprp = mv.get_number_of_proposed()
186  if mv not in self.movers_data:
187  accept = float(mvacc) / float(mvprp)
188  self.movers_data[mv] = (mvacc, mvprp)
189  else:
190  oldmvacc, oldmvprp = self.movers_data[mv]
191  accept = float(mvacc-oldmvacc) / float(mvprp-oldmvprp)
192  self.movers_data[mv] = (mvacc, mvprp)
193  if accept < 0.05:
194  accept = 0.05
195  if accept > 1.0:
196  accept = 1.0
197 
198  if isinstance(mv, IMP.core.NormalMover):
199  stepsize = mv.get_sigma()
200  if 0.4 > accept or accept > 0.6:
201  mv.set_sigma(stepsize * 2 * accept)
202 
203  if isinstance(mv, IMP.isd.WeightMover):
204  stepsize = mv.get_radius()
205  if 0.4 > accept or accept > 0.6:
206  mv.set_radius(stepsize * 2 * accept)
207 
208  if isinstance(mv, IMP.core.RigidBodyMover):
209  mr = mv.get_maximum_rotation()
210  mt = mv.get_maximum_translation()
211  if 0.4 > accept or accept > 0.6:
212  mv.set_maximum_rotation(mr * 2 * accept)
213  mv.set_maximum_translation(mt * 2 * accept)
214 
215  if isinstance(mv, IMP.pmi.TransformMover):
216  mr = mv.get_maximum_rotation()
217  mt = mv.get_maximum_translation()
218  if 0.4 > accept or accept > 0.6:
219  mv.set_maximum_rotation(mr * 2 * accept)
220  mv.set_maximum_translation(mt * 2 * accept)
221 
222  if isinstance(mv, IMP.core.BallMover):
223  mr = mv.get_radius()
224  if 0.4 > accept or accept > 0.6:
225  mv.set_radius(mr * 2 * accept)
226  return score
227 
228  def get_nuisance_movers(self, nuisances, maxstep):
229  mvs = []
230  for nuisance in nuisances:
231  print(nuisance, maxstep)
232  mvs.append(
233  IMP.core.NormalMover([nuisance],
234  IMP.FloatKeys([IMP.FloatKey("nuisance")]),
235  maxstep))
236  return mvs
237 
238  def get_rigid_body_movers(self, rbs, maxtrans, maxrot):
239  mvs = []
240  for rb in rbs:
241  mvs.append(IMP.core.RigidBodyMover(rb.get_model(), rb,
242  maxtrans, maxrot))
243  return mvs
244 
245  def get_super_rigid_body_movers(self, rbs, maxtrans, maxrot):
246  mvs = []
247  for rb in rbs:
248  if len(rb) == 2:
249  # normal Super Rigid Body
250  srbm = IMP.pmi.TransformMover(self.model, maxtrans, maxrot)
251  elif len(rb) == 3:
252  if isinstance(rb[2], tuple) and len(rb[2]) == 3 \
253  and isinstance(rb[2][0], float) \
254  and isinstance(rb[2][1], float) \
255  and isinstance(rb[2][2], float):
256  # super rigid body with 2D rotation, rb[2] is the axis
257  srbm = IMP.pmi.TransformMover(
258  self.model, IMP.algebra.Vector3D(rb[2]), maxtrans,
259  maxrot)
260  else:
261  print(
262  "Setting up a super rigid body with wrong parameters")
263  raise
264 
265  for xyz in rb[0]:
266  srbm.add_xyz_particle(xyz)
267  for rb in rb[1]:
268  srbm.add_rigid_body_particle(rb)
269  mvs.append(srbm)
270  return mvs
271 
272  def get_floppy_body_movers(self, fbs, maxtrans):
273  mvs = []
274  for fb in fbs:
275  # check is that is a rigid body member:
277  # if so force the particles to move anyway
278  floatkeys = \
280  for fk in floatkeys:
281  fb.set_is_optimized(fk, True)
282  mvs.append(
283  IMP.core.BallMover(fb.get_model(), fb,
284  IMP.FloatKeys(floatkeys),
285  maxtrans))
286  else:
287  # otherwise use the normal ball mover
288  mvs.append(IMP.core.BallMover(fb.get_model(), fb, maxtrans))
289  return mvs
290 
291  def get_X_movers(self, fbs, maxtrans):
292  mvs = []
293  Xfloatkey = IMP.core.XYZ.get_xyz_keys()[0]
294  for fb in fbs:
295  # check is that is a rigid body member:
297  raise ValueError("particle is part of a rigid body")
298  else:
299  # otherwise use the normal ball mover
300  mvs.append(IMP.core.NormalMover([fb], [Xfloatkey], maxtrans))
301  return mvs
302 
303  def get_weight_movers(self, weights, maxstep):
304  mvs = []
305  for weight in weights:
306  if weight.get_number_of_weights() > 1:
307  mvs.append(IMP.isd.WeightMover(weight, maxstep))
308  return mvs
309 
310  def get_surface_movers(self, surfaces, maxtrans, maxrot, refprob):
311  mvs = []
312  for surface in surfaces:
313  mvs.append(IMP.core.SurfaceMover(surface, maxtrans, maxrot,
314  refprob))
315  return mvs
316 
317  def set_label(self, label):
318  self.label = label
319 
320  def get_frame_number(self):
321  return self.nframe
322 
323  def get_output(self):
324  output = {}
325  for i, mv in enumerate(self.smv.get_movers()):
326  mvname = mv.get_name()
327  mvacc = mv.get_number_of_accepted()
328  mvprp = mv.get_number_of_proposed()
329  try:
330  mvacr = float(mvacc) / float(mvprp)
331  except: # noqa: E722
332  mvacr = 0.0
333  output["MonteCarlo_Acceptance_" +
334  mvname + "_" + str(i)] = str(mvacr)
335  if "Nuisances" in mvname:
336  output["MonteCarlo_StepSize_" + mvname + "_" + str(i)] = \
337  str(IMP.core.NormalMover.get_from(mv).get_sigma())
338  if "Weights" in mvname:
339  output["MonteCarlo_StepSize_" + mvname + "_" + str(i)] = \
340  str(IMP.isd.WeightMover.get_from(mv).get_radius())
341  output["MonteCarlo_Temperature"] = str(self.mc.get_kt())
342  output["MonteCarlo_Nframe"] = str(self.nframe)
343  return output
344 
345 
346 class MolecularDynamics(_SamplerBase):
347  """Sample using molecular dynamics"""
348 
349  def __init__(self, model, objects, kt, gamma=0.01, maximum_time_step=1.0,
350  sf=None, use_jax=False):
351  """Setup MD
352  @param model The IMP Model
353  @param objects What to sample. Use flat list of particles
354  @param kt Temperature
355  @param gamma Viscosity parameter
356  @param maximum_time_step MD max time step
357  @param use_jax If set to True, sample the scoring function using
358  JAX instead of IMP's internal C++ implementation (requires
359  that all PMI restraints used have a JAX implementation).
360  """
361  super().__init__(model)
362  if use_jax:
363  raise NotImplementedError("JAX currently only supported for MC")
364 
365  # check if using PMI1 objects dictionary, or just list of particles
366  try:
367  for obj in objects:
368  psamp = obj.get_particles_to_sample()
369  to_sample = psamp['Floppy_Bodies_SimplifiedModel'][0]
370  except: # noqa: E722
371  to_sample = objects
372 
374  self.model, to_sample, kt/0.0019872041, gamma)
375  self.md = IMP.atom.MolecularDynamics(self.model)
376  self.md.set_maximum_time_step(maximum_time_step)
377  if sf:
378  self.md.set_scoring_function(sf)
379  else:
380  self.md.set_scoring_function(get_restraint_set(self.model))
381  self.md.add_optimizer_state(self.ltstate)
382 
383  def set_kt(self, kt):
384  temp = kt/0.0019872041
385  self.ltstate.set_temperature(temp)
386  self.md.assign_velocities(temp)
387 
388  def set_gamma(self, gamma):
389  self.ltstate.set_gamma(gamma)
390 
391  def optimize(self, nsteps):
392  # apply simulated annealing protocol
393  self.nframe += 1
394  if self.simulated_annealing:
395  self.set_kt(self.temp_simulated_annealing())
396  return self.md.optimize(nsteps)
397 
398  def get_output(self):
399  output = {}
400  output["MolecularDynamics_KineticEnergy"] = \
401  str(self.md.get_kinetic_energy())
402  return output
403 
404 
406  """Sample using conjugate gradients"""
407 
408  def __init__(self, model, objects):
409  self.model = model
410  self.nframe = -1
411  self.cg = IMP.core.ConjugateGradients(self.model)
412  self.cg.set_scoring_function(get_restraint_set(self.model))
413 
414  def set_label(self, label):
415  self.label = label
416 
417  def get_frame_number(self):
418  return self.nframe
419 
420  def optimize(self, nstep):
421  self.nframe += 1
422  self.cg.optimize(nstep)
423 
424  def set_scoring_function(self, objectlist):
425  rs = IMP.RestraintSet(self.model, 1.0, 'sfo')
426  for ob in objectlist:
427  rs.add_restraint(ob.get_restraint())
429  self.cg.set_scoring_function(sf)
430 
431  def get_output(self):
432  output = {}
433  output["ConjugatedGradients_Nframe"] = str(self.nframe)
434  return output
435 
436 
438  """Sample using replica exchange"""
439 
440  def __init__(self, model, tempmin, tempmax, samplerobjects, test=True,
441  replica_exchange_object=None):
442  '''
443  samplerobjects can be a list of MonteCarlo or MolecularDynamics
444  '''
445 
446  self.model = model
447  self.samplerobjects = samplerobjects
448  # min and max temperature
449  self.TEMPMIN_ = tempmin
450  self.TEMPMAX_ = tempmax
451 
452  if replica_exchange_object is None:
453  # initialize Replica Exchange class
454  try:
455  import IMP.mpi
456  print('ReplicaExchange: MPI was found. '
457  'Using Parallel Replica Exchange')
458  self.rem = IMP.mpi.ReplicaExchange()
459  except ImportError:
460  print('ReplicaExchange: Could not find MPI. '
461  'Using Serial Replica Exchange')
462  self.rem = _SerialReplicaExchange()
463 
464  else:
465  # get the replica exchange class instance from elsewhere
466  print('got existing rex object')
467  self.rem = replica_exchange_object
468 
469  # get number of replicas
470  nproc = self.rem.get_number_of_replicas()
471 
472  if nproc % 2 != 0 and not test:
473  raise Exception(
474  "number of replicas has to be even. "
475  "set test=True to run with odd number of replicas.")
476  # create array of temperatures, in geometric progression
477  temp = self.rem.create_temperatures(
478  self.TEMPMIN_,
479  self.TEMPMAX_,
480  nproc)
481  # get replica index
482  self.temperatures = temp
483 
484  myindex = self.rem.get_my_index()
485  # set initial value of the parameter (temperature) to exchange
486  self.rem.set_my_parameter("temp", [self.temperatures[myindex]])
487  for so in self.samplerobjects:
488  so.set_kt(self.temperatures[myindex])
489  self.nattempts = 0
490  self.nmintemp = 0
491  self.nmaxtemp = 0
492  self.nsuccess = 0
493 
494  def get_temperatures(self):
495  return self.temperatures
496 
497  def get_my_temp(self):
498  return self.rem.get_my_parameter("temp")[0]
499 
500  def get_my_index(self):
501  return self.rem.get_my_index()
502 
503  def swap_temp(self, nframe, score=None):
504  if score is None:
505  score = self.model.evaluate(False)
506  # get my replica index and temperature
507  _ = self.rem.get_my_index()
508  mytemp = self.rem.get_my_parameter("temp")[0]
509 
510  if mytemp == self.TEMPMIN_:
511  self.nmintemp += 1
512 
513  if mytemp == self.TEMPMAX_:
514  self.nmaxtemp += 1
515 
516  # score divided by kbt
517  myscore = score / mytemp
518 
519  # get my friend index and temperature
520  findex = self.rem.get_friend_index(nframe)
521  ftemp = self.rem.get_friend_parameter("temp", findex)[0]
522  # score divided by kbt
523  fscore = score / ftemp
524 
525  # try exchange
526  flag = self.rem.do_exchange(myscore, fscore, findex)
527 
528  self.nattempts += 1
529  # if accepted, change temperature
530  if (flag):
531  for so in self.samplerobjects:
532  so.set_kt(ftemp)
533  self.nsuccess += 1
534 
535  def get_output(self):
536  output = {}
537  if self.nattempts != 0:
538  output["ReplicaExchange_SwapSuccessRatio"] = str(
539  float(self.nsuccess) / self.nattempts)
540  output["ReplicaExchange_MinTempFrequency"] = str(
541  float(self.nmintemp) / self.nattempts)
542  output["ReplicaExchange_MaxTempFrequency"] = str(
543  float(self.nmaxtemp) / self.nattempts)
544  else:
545  output["ReplicaExchange_SwapSuccessRatio"] = str(0)
546  output["ReplicaExchange_MinTempFrequency"] = str(0)
547  output["ReplicaExchange_MaxTempFrequency"] = str(0)
548  output["ReplicaExchange_CurrentTemp"] = str(self.get_my_temp())
549  return output
550 
551 
552 class MPI_values:
553  def __init__(self, replica_exchange_object=None):
554  """Query values (ie score, and others)
555  from a set of parallel jobs"""
556 
557  if replica_exchange_object is None:
558  # initialize Replica Exchange class
559  try:
560  import IMP.mpi
561  print('MPI_values: MPI was found. '
562  'Using Parallel Replica Exchange')
563  self.rem = IMP.mpi.ReplicaExchange()
564  except ImportError:
565  print('MPI_values: Could not find MPI. '
566  'Using Serial Replica Exchange')
567  self.rem = _SerialReplicaExchange()
568 
569  else:
570  # get the replica exchange class instance from elsewhere
571  print('got existing rex object')
572  self.rem = replica_exchange_object
573 
574  def set_value(self, name, value):
575  self.rem.set_my_parameter(name, [value])
576 
577  def get_values(self, name):
578  values = []
579  for i in range(self.rem.get_number_of_replicas()):
580  v = self.rem.get_friend_parameter(name, i)[0]
581  values.append(v)
582  return values
583 
584  def get_percentile(self, name):
585  value = self.rem.get_my_parameter(name)[0]
586  values = sorted(self.get_values(name))
587  ind = values.index(value)
588  percentile = float(ind)/len(values)
589  return percentile
def __init__
samplerobjects can be a list of MonteCarlo or MolecularDynamics
Definition: samplers.py:440
A Monte Carlo optimizer.
Definition: MonteCarlo.h:44
A class to implement Hamiltonian Replica Exchange.
Maintains temperature during molecular dynamics.
Sample using molecular dynamics.
Definition: samplers.py:346
def get_nuisance_movers_parameters
Return a dictionary with the mover parameters for nuisance parameters.
Definition: samplers.py:142
Miscellaneous utilities.
Definition: pmi/tools.py:1
Modify the transformation of a rigid body.
Simple conjugate gradients optimizer.
Sample using conjugate gradients.
Definition: samplers.py:405
Create a scoring function on a list of restraints.
Move continuous particle variables by perturbing them within a ball.
Modify a surface orientation.
Definition: SurfaceMover.h:37
static FloatKeys get_internal_coordinate_keys()
Definition: rigid_bodies.h:725
Object used to hold a set of restraints.
Definition: RestraintSet.h:41
Simple molecular dynamics simulator.
Code that uses the MPI parallel library.
A mover that perturbs a Weight particle.
Definition: WeightMover.h:20
Modify the transformation of a rigid body.
def __init__
Setup Monte Carlo sampling.
Definition: samplers.py:81
Modify a set of continuous variables using a normal distribution.
Definition: NormalMover.h:23
Basic functionality that is expected to be used by a wide variety of IMP users.
Sample using Monte Carlo.
Definition: samplers.py:70
The general base class for IMP exceptions.
Definition: exception.h:48
VectorD< 3 > Vector3D
Definition: VectorD.h:408
static const FloatKeys & get_xyz_keys()
Get a vector containing the keys for x,y,z.
Applies a list of movers one at a time.
Definition: SerialMover.h:26
static bool get_is_setup(const IMP::ParticleAdaptor &p)
Definition: rigid_bodies.h:768
Sample using replica exchange.
Definition: samplers.py:437
Inferential scoring building on methods developed as part of the Inferential Structure Determination ...