IMP  2.4.0
The Integrative Modeling Platform
demux_trajs.py
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 import sys
5 import os
6 import re
7 import shutil
8 
9 
class LogStep:

    """Everything known about one replica at one time step.
    stepno : step number taken from the stats file
    statline : the raw stats line of this step
    header : the raw header line of the stats file
    Dump and trajectory files registered through add() are kept per category.
    """

    def __init__(self, stepno, statline, header):
        self.stepno, self.stats, self.header = stepno, statline, header
        # category -> full path of the dump file written at this step
        self.dumps = {}
        # category -> dict describing the trajectory file holding this step
        self.trajs = {}

    def add(self, ftype, category, *data):
        """Register a file for this step under the given category.
        ftype 'dump' : data is (fullpath,)
        ftype 'traj' : data is (ftype, fullpath, stepno, tail)
        Any other ftype raises ValueError.
        """
        if ftype not in ('dump', 'traj'):
            raise ValueError("unknown file type")
        if ftype == 'dump':
            self.dumps[category] = data[0]
            return
        kind, path, step, tail = data[0], data[1], data[2], data[3]
        self.trajs[category] = {'ftype': kind,
                                'fullpath': path,
                                'stepno': step,
                                'tail': tail}

    def get_stats_header(self):
        """Return the header line of the stats file."""
        return self.header

    def get_stats(self):
        """Return the stats line of this step."""
        return self.stats

    def get_dumps(self):
        """Return the category -> dump file path mapping."""
        return self.dumps

    def get_trajs(self):
        """Return the category -> trajectory description mapping."""
        return self.trajs
41 
42 
class LogHolder:

    """Manages information on a given simulation.
    Assumes the existence of a stats file, and handles more files if
    available.
    folder : the folder which contains the stats file
    prefix : the stats file is supposed to be prefix+'stats.txt' (the prefix
             carries its own trailing separator, e.g. 'r01_')
    """

    def __init__(self, folder, prefix):
        """Locate the stats file and classify every other prefix* file.
        Files are grouped by category (the part of the name between the
        prefix and the first '_'). A category made of several files, or of a
        single file whose last '_' field is a number, is stored in
        self.dumpfiles as {index: tail}. A single '.rmf*' file is stored in
        self.trajfiles as (extension, tail).
        Raises ValueError if the stats file is missing, if two files of a
        category share an index, or if a single file has an unknown
        extension.
        """
        self.folder = folder
        self.prefix = prefix
        # verify that stats file exists
        self.stats_file = os.path.join(folder, prefix + 'stats.txt')
        if not os.path.isfile(self.stats_file):
            raise ValueError('cannot find stats file %s' % self.stats_file)
        # scan for other files
        files = {}
        for fl in os.listdir(folder):
            # the prefix is a literal string: compare it as such, so that
            # characters like '+' or '.' are not interpreted as a regex
            if not fl.startswith(prefix):
                continue
            # get variable part of file, exclude stats file
            tail = fl[len(prefix):]
            if tail == 'stats.txt':
                continue
            # get file category and add to files dict
            category = tail.split('_')[0]
            files.setdefault(category, []).append(tail)
        # see if there are multiple files in the same category, and store them
        self.dumpfiles = {}
        self.trajfiles = {}
        for cat, fnames in files.items():
            last_field = os.path.splitext(fnames[0].split('_')[-1])[0]
            if len(fnames) > 1 or last_field.isdigit():
                # there are multiple files, no need to understand their content
                indexed = {}
                for fname in fnames:
                    # parse tail and find index number
                    indexno = int(os.path.splitext(fname.split('_')[1])[0])
                    # check while building the dict: once stored, a
                    # duplicate would silently overwrite the earlier file
                    if indexno in indexed:
                        raise ValueError("found duplicates in %s %s %s"
                                         % (folder, prefix, fname))
                    indexed[indexno] = fname
                self.dumpfiles[cat] = indexed
            else:
                # this is a trajectory, need to be able to parse it
                fname = fnames[0]
                ext = os.path.splitext(fname)[1]
                if ext.startswith('.rmf'):
                    self.trajfiles[cat] = (ext[1:], fname)
                else:
                    raise ValueError("Unknown extension: %s in file %s"
                                     % (ext, fname))

    def get_stats_header(self):
        """Return the header (first line) of the stats file.
        The first call opens the stats file and leaves self.stats_handle
        positioned just after the header. Raises ValueError if the second
        line also starts with '#', since only one-line headers are supported.
        """
        if not hasattr(self, 'stats_handle'):
            # probe header and first data line with a short-lived handle so
            # that no file object is leaked and a failed check leaves the
            # instance untouched
            with open(self.stats_file) as probe:
                header = probe.readline()
                first_line = probe.readline()
            # for now, be compatible with only one line
            if first_line.startswith('#'):
                raise ValueError('stats file must be 1-line only')
            self.stats_header = header
            self.stats_first_line = first_line
            # long-lived handle used by _get_next_stats, header skipped
            self.stats_handle = open(self.stats_file)
            self.stats_handle.readline()
        return self.stats_header

    def get_first_stats_line(self):
        """Return the first data line of the stats file."""
        # make sure file is open, skip header
        self.get_stats_header()
        return self.stats_first_line

    def _get_next_stats(self):
        """Yield the remaining data lines of the stats file, one by one."""
        # make sure file is open, skip header
        self.get_stats_header()
        for line in self.stats_handle:
            yield line

    def items(self):
        """iterate over all time steps"""
        header = self.get_stats_header()
        # loop over stats file
        for stat in self._get_next_stats():
            # extract step number (second column) and create LogStep
            stepno = int(stat.split()[1])
            step = LogStep(stepno, stat, header)
            # get other files' entries at this step if available
            for cat, df in self.dumpfiles.items():
                if stepno in df:
                    fullpath = os.path.join(self.folder,
                                            self.prefix + df[stepno])
                    step.add('dump', cat, fullpath)
            # every trajectory file is attached to every step
            for cat, tf in self.trajfiles.items():
                fullpath = os.path.join(self.folder,
                                        self.prefix + tf[1])
                step.add('traj', cat, tf[0], fullpath, stepno, tf[1])
            # yield a LogStep containing these entries
            yield step
146 
147 
class Demuxer:

    """uses column to demux a replica trajectory. Assumes column points to a
    float or integer type, which is allowed to change over time. Attribution is
    based on order of float params. State 0 will be lowest param etc. Use
    reverse=True to start with highest.
    logs : one log object per replica, providing get_stats_header() and
           items()
    outfolder : folder in which the p<state> folders are written
    column : substring identifying exactly one column of the stats header
    """

    def __init__(self, logs, outfolder, column, reverse=False):
        """Create the output folders and locate the demux column.
        Raises ValueError if the headers of the logs differ, or if column
        matches no header field or more than one.
        """
        self.logs = logs
        self.reverse = reverse
        self.column = column
        self.outfolder = outfolder
        self.stat_handles = {}
        self.traj_handles_in = {}
        self.traj_handles_out = {}
        self.folders = {}
        # create needed folders (makedirs also handles a nested outfolder)
        if not os.path.isdir(outfolder):
            os.makedirs(outfolder)
        for stateno in range(len(self.logs)):
            fname = os.path.join(outfolder, 'p%d' % stateno)
            if not os.path.isdir(fname):
                os.mkdir(fname)
            self.folders[stateno] = fname
        # make sure every log has the same header
        h0 = self.logs[0].get_stats_header()
        for log in self.logs[1:]:
            if h0 != log.get_stats_header():
                raise ValueError("headers must be identical!")
        # get column number from header
        tokens = [idx for idx, field in enumerate(h0.split())
                  if self.column in field]
        # column is a string, so it must be formatted with %s
        if not tokens:
            raise ValueError("column %s not found in this header\n%s"
                             % (column, h0))
        if len(tokens) > 1:
            raise ValueError("column %s found multiple times!\n%s"
                             % (column, h0))
        self.colno = tokens[0]

    def get_param(self, statline):
        """Return the demux parameter of a stats line as a float."""
        return float(statline.split()[self.colno])

    def _write_step_stats(self, stateno, lstep):
        """Append the stats line of lstep to the stats file of stateno."""
        # check if stats file is open
        if stateno not in self.stat_handles:
            self.stat_handles[stateno] = open(
                os.path.join(self.folders[stateno],
                             str(stateno) + '_stats.txt'), 'w')
            self.stat_handles[stateno].write(lstep.get_stats_header())
        # write stats
        self.stat_handles[stateno].write(lstep.get_stats())

    def _write_step_dump(self, stateno, lstep):
        """Copy the dump files of lstep into the folder of stateno.
        The copy is named '<stateno>_<category>_<rest>' where <rest> is what
        follows '<category>_' in the source file name.
        """
        for cat, fname in lstep.get_dumps().items():
            # work on the file name only, and on the last occurrence of the
            # category, so that neither a directory nor a prefix containing
            # the category string can corrupt the destination name
            base = os.path.basename(fname)
            _, sep, rest = base.rpartition(cat + '_')
            if not sep:
                raise ValueError("category %s not found in dump file %s"
                                 % (cat, fname))
            shutil.copyfile(fname, os.path.join(self.folders[stateno],
                                                str(stateno) + '_' + cat
                                                + '_' + rest))

    def _write_traj_rmf(self, infile, instep, outfile, stateno, cat):
        """Append frame number instep of RMF file infile to outfile.
        Input and output RMF handles are opened on first use and cached.
        stateno and cat are not used here.
        """
        import RMF
        # make sure infile is open
        if infile not in self.traj_handles_in:
            src = RMF.open_rmf_file_read_only(infile)
            self.traj_handles_in[infile] = src
        src = self.traj_handles_in[infile]
        # make sure outfile is open
        if outfile not in self.traj_handles_out:
            dest = RMF.create_rmf_file(outfile)
            self.traj_handles_out[outfile] = dest
            RMF.clone_file_info(src, dest)
            RMF.clone_hierarchy(src, dest)
            RMF.clone_static_frame(src, dest)
        dest = self.traj_handles_out[outfile]
        # clone frame
        # NOTE(review): assumes step numbers start at 1 and map one-to-one
        # onto RMF frames -- confirm against the program writing the files
        frameid = src.get_frames()[instep - 1]
        src.set_current_frame(frameid)
        dest.add_frame(src.get_name(frameid), src.get_type(frameid))
        RMF.clone_loaded_frame(src, dest)

    def _write_step_traj(self, stateno, lstep):
        """Append the trajectory frames of lstep to the files of stateno."""
        # loop over categories
        for cat, data in lstep.get_trajs().items():
            destfile = os.path.join(self.folders[stateno],
                                    str(stateno) + '_' + data['tail'])
            if data['ftype'].startswith('rmf'):
                self._write_traj_rmf(data['fullpath'], data['stepno'],
                                     destfile, stateno, cat)
            else:
                raise ValueError("unknown trajectory file type")

    def _write_step(self, stateno, lstep):
        """Write stats, dumps and trajectory frames of lstep for stateno."""
        self._write_step_stats(stateno, lstep)
        self._write_step_dump(stateno, lstep)
        self._write_step_traj(stateno, lstep)

    def _close_stat_handles(self):
        """Close (and thereby flush) every open output stats file."""
        for handle in self.stat_handles.values():
            handle.close()
        self.stat_handles.clear()

    def write(self):
        """Demux all logs: at every time step, rank the replicas by their
        parameter and write each one to the folder of its rank."""
        print("Demuxing", len(self.logs), "replicas")
        # walk the logs in lockstep instead of loading them all in memory;
        # iteration stops with the shortest log
        log_iterators = [log.items() for log in self.logs]
        try:
            # loop over time steps
            for idx, steps in enumerate(zip(*log_iterators)):
                if idx % 10 == 0 and idx > 0:
                    print("step", idx, '\r', end=' ')
                    sys.stdout.flush()
                # assign state numbers to these logs
                params = [(self.get_param(i.get_stats()), i) for i in steps]
                # sort on the parameter only: on a tie the step objects
                # themselves must not be compared (they are not orderable);
                # the sort is stable, so tied replicas keep their log order
                params.sort(key=lambda item: item[0], reverse=self.reverse)
                # write them
                for stateno, (_, lstep) in enumerate(params):
                    self._write_step(stateno, lstep)
        finally:
            # make sure the stats files reach the disk even on error
            self._close_stat_handles()
        print("Done")
258 
259 
def get_prefix(folder):
    """Return the prefix of the stats file found in folder.
    The stats file is the one whose name ends with '_stats.txt'; the prefix
    is everything before 'stats.txt', trailing underscore included.
    Raises ValueError unless exactly one such file exists.
    """
    suffix = 'stats.txt'
    # exact suffix test: names such as 'x_stats.txt.bak' or 'x_statsXtxt'
    # are not stats files
    prefixes = [f[:-len(suffix)] for f in os.listdir(folder)
                if f.endswith('_' + suffix)]
    if len(prefixes) != 1:
        raise ValueError("stats file not unique, found %d" % len(prefixes))
    return prefixes[0]
266 
if __name__ == '__main__':
    # command line: demux_trajs.py column [infolder [outfolder]]
    if len(sys.argv) == 1 or len(sys.argv) > 4:
        sys.exit("""demux_trajs.py column [infolder [outfolder]]
 expects r?? folders in infolder and will write p?? folders in outfolder.
 infolder must contain a _stats.txt file which will contain a header.
 column must be a substring matching to one of the columns in the
 _stats.txt files. It will typically be a temperature, or a state number.
 That column will be used for demuxing. Folders are optional and will be
 taken as ./ if not indicated.
 """)
    column = sys.argv[1]
    # both folders are optional and default to the current directory
    infolder = sys.argv[2] if len(sys.argv) >= 3 else './'
    outfolder = sys.argv[3] if len(sys.argv) == 4 else './'
    # loop over infolder and read stats files
    # - keep directories only: a plain file named like a replica folder
    #   cannot be listed
    # - sort: os.listdir returns entries in arbitrary order, and the replica
    #   order should not depend on it
    candidates = [os.path.join(infolder, f)
                  for f in os.listdir(infolder) if re.match(r'r\d+', f)]
    folders = sorted(f for f in candidates if os.path.isdir(f))
    if not folders:
        sys.exit("no replica folders (r followed by digits) found in %s"
                 % infolder)
    replica_logs = [LogHolder(f, get_prefix(f)) for f in folders]
    demux = Demuxer(replica_logs, outfolder, column, reverse=True)
    demux.write()
Cross-references:
LogHolder.items: iterate over all time steps. Definition: demux_trajs.py:127
LogHolder: Manages information on a given simulation. Definition: demux_trajs.py:43
Demuxer: uses column to demux a replica trajectory. Definition: demux_trajs.py:148