1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """
20 Simulation cases.
21 """
22
23 from .gendata import SingleAssignDict, AttributeDict
26 """
27 Arrangement registry class. A simulation is a callable that returns a case
28 object.
29 """
33 arrangements = ArrangementRegistry()
51 """
52 Generic case information abstract class. It's the base class that all case
53 information classes should subclass, to form a hierarchical information
54 object.
55 """
56 __metaclass__ = CaseInfoMeta
57 defdict = {}
59 """
60 Consult self dictionary for attribute. It's a shorthand.
61 """
62 if name == '__setstate__':
63 raise AttributeError
64 return self[name]
66 """
67 Save to self dictionary first, then self object table.
68
69 @note: This method is overridden as a stupid-preventer. It makes
70 attribute setting consistent with attribute getting.
71 """
72 if name in self:
73 self[name] = value
74 else:
75 super(CaseInfo, self).__setattr__(name, value)
77 """
78 Set to self with the dot-separated key.
79 """
80 tokens = key.split('.', 1)
81 fkey = tokens[0]
82 if len(tokens) == 2:
83 self[fkey]._set_through(tokens[1], val)
84 else:
85 self[fkey] = val
def __init__(self, _defdict=None, *args, **kw):
    """
    Populate self with default values right after the base initializer ran.

    The defaults come from the class-level ``defdict``, optionally overlaid
    with ``_defdict``.  A key of the form ``head.rest`` is collected under
    ``head`` and turned into a nested CaseInfo; a plain key is stored
    directly.  A default that is a type is called without arguments to
    produce the stored value, falling back to the type itself when that
    raises TypeError.

    @keyword _defdict: customized defdict; internal use only.
    @type _defdict: dict
    """
    super(CaseInfo, self).__init__(*args, **kw)

    # Choose the defaults: the class table as-is, or a private copy of it
    # overlaid with the caller-supplied entries.
    defaults = self.defdict
    if _defdict is not None:
        defaults = dict(self.defdict)
        defaults.update(_defdict)

    # Group the defaults by the first component of each dotted key.
    grouped = {}
    for fullkey in defaults:
        if fullkey is None or fullkey == '':
            continue
        head, dot, rest = fullkey.partition('.')
        if dot:
            grouped.setdefault(head, {})[rest] = defaults[fullkey]
        else:
            grouped[head] = defaults[head]

    # Materialize each group into self.
    for head, payload in grouped.items():
        if isinstance(payload, dict):
            # A group of dotted keys becomes a nested information object.
            self[head] = CaseInfo(_defdict=payload)
            continue
        if not isinstance(payload, type):
            self[head] = payload
            continue
        try:
            self[head] = payload()
        except TypeError:
            self[head] = payload
124
126 """
127 Base class for simulation cases.
128
129 init() and run() are the two primary methods responsible for the
130 execution of the simulation case object. Both methods accept a keyword
131 parameter ``level'' which indicates the run level of the run:
132 - run level 0: fresh run (default),
133 - run level 1: restart run,
134 - run level 2: initialization only.
135
136 @ivar runhooks: a special list containing all the hook objects to be run.
137 @itype runhooks: solvcon.hook.HookList
138 """
139
140 CSEFN_DEFAULT = 'solvcon.dump.case.obj'
141
142 from . import conf, batch
143 defdict = {
144
145 'execution.fpdtype': conf.env.fpdtypestr,
146 'execution.batch': batch.Batch,
147 'execution.resources': dict,
148 'execution.stop': False,
149 'execution.time': 0.0,
150 'execution.time_increment': 0.0,
151 'execution.step_init': 0,
152 'execution.step_current': None,
153 'execution.steps_run': None,
154 'execution.steps_stride': 1,
155 'execution.marchret': None,
156 'execution.neq': 0,
157 'execution.var': dict,
158 'execution.varstep': None,
159 'execution.ncore': -1,
160
161 'dynamic.inputfn': 'solvcon.input',
162 'dynamic.bakfn': 'solvcon.input.bak',
163 'dynamic.preserve': False,
164
165 'io.abspath': False,
166 'io.rootdir': None,
167 'io.basedir': None,
168 'io.basefn': None,
169 'io.empty_jobdir': False,
170 'io.solver_output': False,
171
172 'condition.mtrllist': list,
173
174 'solver.solvertype': None,
175 'solver.solverobj': None,
176
177 'log.time': dict,
178 }
179 del conf, batch
180 from .helper import info
181
182 - def _log_start(self, action, msg='', postmsg=' ... '):
183 """
184 Print to user and record start time for certain action.
185
186 @param action: action key.
187 @type action: str
188 @keyword msg: trailing message for the action key.
189 @type msg: str
190 @return: nothing.
191 """
192 from time import time
193 info = self.info
194 tarr = [0,0,0]
195 tarr[0] = time()
196 self.log.time[action] = tarr
197 info(
198 info.prefix * (info.width-info.level*info.nchar),
199 travel=1
200 )
201 info('\nStart %s%s%s' % (action, msg, postmsg))
202 info(
203 '\n' + info.prefix * (info.width-info.level*info.nchar) + '\n',
204 )
205
206 - def _log_end(self, action, msg='', postmsg=' . '):
207 """
208 Print to user and record end time for certain action.
209
210 @param action: action key.
211 @type action: str
212 @keyword msg: supplemental message.
213 @type msg: str
214 @return: nothing
215 """
216 from time import time
217 info = self.info
218 tarr = self.log.time.setdefault(action, [0,0,0])
219 tarr[1] = time()
220 tarr[2] = tarr[1] - tarr[0]
221 info(
222 info.prefix * (info.width-info.level*info.nchar) + \
223 '\nEnd %s%s%sElapsed time (sec) = %g' % (
224 action, msg, postmsg, tarr[2]
225 )
226 )
227 info(
228 '\n' + info.prefix * (info.width-(info.level-1)*info.nchar) + '\n',
229 travel=-1
230 )
231
233 """
234 Initialize the basic case. Set through keyword parameters.
235 """
236 import os
237 from .hook import HookList
238
239 initpairs = list()
240 for cinfok in self.defdict.keys():
241 lkey = cinfok.split('.')[-1]
242 initpairs.append((cinfok, kw.pop(lkey, None)))
243
244 super(BaseCase, self).__init__(**kw)
245
246 for cinfok, val in initpairs:
247 if val is not None:
248 self._set_through(cinfok, val)
249
250 self.runhooks = HookList(self)
251
252 if self.io.abspath:
253 self.io.basedir = os.path.abspath(self.io.basedir)
254 if self.io.basedir is not None and not os.path.exists(self.io.basedir):
255 os.makedirs(self.io.basedir)
256
258 """
259 Dynamically execute the codes stored in the input file specified by
260 the case.
261
262 @return: nothing
263 """
264 import os
265 import traceback
266 from cStringIO import StringIO
267 if not self.dynamic.inputfn: return
268 try:
269
270 if os.path.exists(self.dynamic.inputfn):
271 f = open(self.dynamic.inputfn, 'r')
272 codes = f.read()
273 f.close()
274 else:
275 codes = ''
276 if codes.strip():
277
278 f = open(self.dynamic.inputfn, 'w')
279 if self.dynamic.preserve:
280 f.write(codes)
281 f.close()
282
283 f = open(self.dynamic.bakfn, 'a')
284 f.write('\n### %s step %d/%d\n' % (self.io.basefn,
285 self.execution.step_current, self.execution.steps_run,
286 ))
287 f.write(codes)
288 f.close()
289
290 exec(codes)
291 except Exception, e:
292 f = StringIO()
293 f.write('\n@@@ dynamic execution at step %d @@@' %
294 self.execution.step_current)
295 f.write('\nCode:\n %s\n' % codes)
296 traceback.print_exc(file=f)
297 self.info(f.getvalue())
298 f.close()
299
300 self.dynamic.preserve = False
301
def init(self, level=0):
    """
    Initialize the solver.  This base implementation does nothing.

    @keyword level: run level; higher level does less work.
    @type level: int

    @return: nothing.
    """
    return None
312
def run(self, level=0):
    """
    Run the simulation case; time marching.

    The method logs the start of the run, resets the current step to zero,
    fires the 'preloop' hooks, and then marches the solver object one step
    at a time until ``execution.step_current`` reaches
    ``execution.steps_run``.  Each step fires the 'premarch' hooks, calls
    ``solver.solverobj.march(time, time_increment)`` with the time being
    ``step_current * time_increment``, stores the return value in
    ``execution.marchret``, and fires the 'postmarch' hooks.  After the
    loop the 'postloop' hooks are fired and the end of the run is logged.

    @keyword level: run level; higher level does less work.  It is not
        consulted by this base implementation.
    @type level: int

    @return: nothing.
    """
    # start log.
    self._log_start('run', msg=' '+self.io.basefn)
    self.info("\n")
    # prepare for time marching.
    self.execution.step_current = 0
    self.runhooks('preloop')
    self._log_start('loop_march')
    while self.execution.step_current < self.execution.steps_run:
        self.runhooks('premarch')
        self.execution.marchret = self.solver.solverobj.march(
            self.execution.step_current*self.execution.time_increment,
            self.execution.time_increment)
        self.execution.step_current += 1
        self.runhooks('postmarch')
    # Close the timer opened before the loop.  Calling _log_start here
    # again would overwrite the recorded start time and leave the
    # 'loop_march' record without an end or elapsed time.
    self._log_end('loop_march')
    self.runhooks('postloop')
    # end log.
    self._log_end('run')
340
def cleanup(self, signum=None, frame=None):
    """
    Clean up the case; usable as a signal handler.

    @keyword signum: signal number when called as a signal handler.
    @type signum: int
    @keyword frame: stack frame when called as a signal handler; unused.

    @return: nothing.
    @raise KeyboardInterrupt: when signum equals SIGINT.
    """
    from signal import SIGINT
    # Turn an interrupt signal into the usual Python exception.
    if signum == SIGINT:
        raise KeyboardInterrupt
    return None
345
346 @classmethod
348 """
349 Decorate simulation functions. This function asserts required
350 signature which is necessary for a function to be a valid simulation
351 function. Moreover, all the simulation functions should be decorated by
352 this decorator.
353
354 @return: simulation function.
355 @rtype: callable
356 """
357 import signal
358 import cPickle as pickle
359 from .batch import Batch
360 if casename is None: casename = func.__name__
361 def simu(*args, **kw):
362 kw.pop('casename', None)
363 resources = kw.pop('resources', dict())
364 batch = kw.get('batch', Batch)
365 submit = kw.pop('submit')
366 use_mpi = kw.pop('use_mpi', False)
367 postpone = kw.pop('postpone', False)
368 runlevel = kw.pop('runlevel')
369
370 if runlevel == 1:
371 case = pickle.load(open(cls.CSEFN_DEFAULT, 'rb'))
372 else:
373 case = func(casename=casename, *args, **kw)
374
375 try:
376 if submit:
377 sbm = batch(case, arnname=casename, use_mpi=use_mpi,
378 **resources)
379 sbm(runlevel=runlevel, postpone=postpone)
380 else:
381 signal.signal(signal.SIGTERM, case.cleanup)
382 signal.signal(signal.SIGINT, case.cleanup)
383 case.init(level=runlevel)
384 case.info('\n')
385 case.run(level=runlevel)
386 case.cleanup()
387 except:
388 case.cleanup()
389 raise
390 return case
391
392 cls.arrangements[casename] = simu
393 arrangements[casename] = simu
394 return simu
395
397 """
398 Base class for multi-dimensional cases using block.
399
400 @ivar pythonpaths: extra python paths.
401 @itype pythonpaths: list
402 """
403 defdict = {
404
405 'execution.npart': None,
406 'execution.step_restart': None,
407 'execution.steps_dump': None,
408
409 'io.mesher': None,
410 'io.meshfn': None,
411 'io.domain.with_arrs': True,
412 'io.domain.with_whole': True,
413 'io.domain.wholefn': None,
414 'io.domain.splitfns': None,
415 'io.rkillfn': 'solvcon.kill.sh',
416 'io.dump.csefn': 'solvcon.dump.case.obj',
417 'io.dump.svrfntmpl': 'solvcon.dump.solver%s.obj',
418
419 'condition.bcmap': None,
420 'condition.bcmod': None,
421
422 'solver.use_incenter': False,
423 'solver.domaintype': None,
424 'solver.domainobj': None,
425 'solver.dealer': None,
426 'solver.envar': dict,
427 'solver.ibcthread': False,
428 }
429
431 self.pythonpaths = kw.pop('pythonpaths', [])
432 super(BlockCase, self).__init__(**kw)
433
434 @property
436 """
437 Determine if self should do parallel or not.
438
439 @return: 0 means sequential; 1 means local parallel.
440 @rtype: int
441 """
442 from . import domain
443 domaintype = self.solver.domaintype
444 if domaintype == domain.Domain:
445 assert self.execution.npart == None
446 flag_parallel = 0
447 elif domaintype == domain.Collective:
448 assert isinstance(self.execution.npart, int)
449 flag_parallel = 1
450 elif domaintype == domain.Distributed:
451 assert isinstance(self.execution.npart, int)
452 flag_parallel = 2
453 else:
454 raise TypeError, 'domaintype shouldn\'t be %s' % domaintype
455 return flag_parallel
456
458 """
459 Dump case and remote solver objects for later restart.
460
461 @return: nothing
462 """
463 import cPickle as pickle
464 dealer = self.solver.dealer
465 flag_parallel = self.is_parallel
466
467 self.execution.step_restart = self.execution.step_current
468
469 if flag_parallel:
470 for iblk in range(len(self.solver.domainobj)):
471 dealer[iblk].cmd.dump(self.io.dump.svrfntmpl % str(iblk))
472 else:
473 self.solver.domainobj.unbind()
474 self.solver.solverobj.unbind()
475 svrholds = dict()
476 for key in ['mesg',]:
477 svrholds[key] = getattr(self.solver.solverobj, key)
478 setattr(self.solver.solverobj, key, None)
479
480 self.solver.dealer = None
481 pickle.dump(self, open(self.io.dump.csefn, 'wb'),
482 pickle.HIGHEST_PROTOCOL)
483
484 if flag_parallel:
485 self.solver.dealer = dealer
486 else:
487 for key in svrholds:
488 setattr(self.solver.solverobj, key, svrholds[key])
489 self.solver.solverobj.bind()
490 self.solver.domainobj.bind()
491
492 - def cleanup(self, signum=None, frame=None):
496
497
498
499
500
501
502
503 - def init(self, level=0):
571
573 """
574 Return a block for init.
575
576 @return: a block object.
577 @rtype: solvcon.block.Block
578 """
579 import os, gzip
580 from .io.gmsh import Gmsh
581 from .io.genesis import Genesis
582 from .io.gambit import GambitNeutral
583 from .io.block import BlockIO
584 from .io.domain import DomainIO
585 meshfn = self.io.meshfn
586 bcmapper = self.condition.bcmap
587 self.info('mesh file: %s\n' % meshfn)
588 if callable(self.io.mesher):
589 self._log_start('create_block')
590 obj = self.io.mesher(self)
591 self._log_end('create_block')
592 elif os.path.isdir(meshfn):
593 dof = DomainIO(dirname=meshfn)
594 obj, whole, split = dof.load(bcmapper=bcmapper,
595 with_arrs=self.io.domain.with_arrs,
596 with_whole=self.io.domain.with_whole, with_split=False,
597 return_filenames=True, domaintype=self.solver.domaintype)
598 self.io.domain.wholefn = whole
599 self.io.domain.splitfn = split
600 elif '.msh' in meshfn:
601 self._log_start('create_gmsh_object')
602 if meshfn.endswith('.gz'):
603 stream = gzip.open(meshfn)
604 else:
605 stream = open(meshfn)
606 gmh = Gmsh(stream)
607 gmh.load()
608 stream.close()
609 self._log_end('create_gmsh_object')
610 self._log_start('convert_gmsh_to_block')
611 obj = gmh.toblock(bcname_mapper=bcmapper,
612 use_incenter=self.solver.use_incenter)
613 self._log_end('convert_msh_to_block')
614 elif meshfn.endswith('.g'):
615 self._log_start('create_genesis_object')
616 gn = Genesis(meshfn)
617 gn.load()
618 gn.close_file()
619 self._log_end('create_genesis_object')
620 self._log_start('convert_genesis_to_block')
621 obj = gn.toblock(bcname_mapper=bcmapper,
622 use_incenter=self.solver.use_incenter)
623 self._log_end('convert_genesis_to_block')
624 elif '.neu' in meshfn:
625 self._log_start('read_neu_data', msg=' from %s'%meshfn)
626 if meshfn.endswith('.gz'):
627 stream = gzip.open(meshfn)
628 else:
629 stream = open(meshfn)
630 data = stream.read()
631 stream.close()
632 self._log_end('read_neu_data')
633 self._log_start('create_neu_object')
634 neu = GambitNeutral(data)
635 self._log_end('create_neu_object')
636 self._log_start('convert_neu_to_block')
637 obj = neu.toblock(bcname_mapper=bcmapper,
638 use_incenter=self.solver.use_incenter)
639 self._log_end('convert_neu_to_block')
640 elif '.blk' in meshfn:
641 self._log_start('load_block')
642 obj = BlockIO().load(stream=meshfn, bcmapper=bcmapper)
643 self._log_end('load_block')
644 else:
645 raise ValueError(meshfn)
646 return obj
647
649 """
650 Return keywords to initialize solvers.
651
652 @return: keywords
653 @rtype: dict
654 """
655 return dict(
656 ncore=self.execution.ncore,
657 neq=self.execution.neq,
658 fpdtype=self.execution.fpdtype,
659 enable_mesg=self.io.solver_output,
660 )
661
662
675 """
676 @return: nothing
677 """
678 self.solver.solverobj.bind()
679 self.solver.solverobj.ibcthread = self.solver.ibcthread
680 self.solver.domainobj.bind()
682 """
683 @return: nothing
684 """
685 dealer = self.solver.dealer
686 solvertype = self.solver.solvertype
687 dom = self.solver.domainobj
688 nblk = dom.nblk
689 for iblk in range(nblk):
690 svrkw = self.make_solver_keywords()
691 self.info('solver #%d/(%d-1): ' % (iblk, nblk))
692 if dom.presplit:
693 dealer[iblk].create_solver(self.condition.bcmap,
694 self.io.meshfn, self.io.domain.splitfn[iblk],
695 iblk, nblk, solvertype, svrkw)
696 self.runhooks.drop_anchor(dealer[iblk])
697 else:
698 sbk = dom[iblk]
699 svr = solvertype(sbk, **svrkw)
700 self.info('sending ... ')
701 svr.svrn = iblk
702 svr.nsvr = nblk
703 self.runhooks.drop_anchor(svr)
704 svr.unbind()
705 dealer[iblk].remote_setattr('muscle', svr)
706 self.info('done.\n')
707 self.info('Bind/Init ... ')
708 for sdw in dealer: sdw.cmd.bind()
709 for sdw in dealer: sdw.cmd.remote_setattr('ibcthread',
710 self.solver.ibcthread)
711 for sdw in dealer: sdw.cmd.init()
712 dealer.barrier()
713 self.info('done.\n')
727
728
730 """
731 Make dealer and create workers for the dealer.
732
733 @return: worker manager.
734 @rtype: solvcon.rpc.Dealer
735 """
736 from .rpc import Dealer
737 nblk = self.solver.domainobj.nblk
738 flag_parallel = self.is_parallel
739 if flag_parallel == 1:
740 family = None
741 create_workers = self._create_workers_local
742 elif flag_parallel == 2:
743 family = 'AF_INET'
744 create_workers = self._create_workers_remote
745 dealer = Dealer(family=family)
746 create_workers(dealer, nblk)
747 return dealer
749 from .conf import env
750 if env.command != None:
751 ops, args = env.command.opargs
752 if getattr(ops, 'use_profiler'):
753 return (
754 ops.profiler_dat+'%d'%iblk,
755 ops.profiler_log+'%d'%iblk,
756 ops.profiler_sort,
757 )
758 return None
765 import os, sys
766 from .rpc import DEFAULT_AUTHKEY
767 from .conf import env
768 info = self.info
769 authkey = DEFAULT_AUTHKEY
770 paths = dict([(key, os.environ.get(key, '').split(':')) for key in
771 'LD_LIBRARY_PATH',
772 'PYTHONPATH',
773 ])
774 paths['PYTHONPATH'].extend(self.pythonpaths)
775 paths['PYTHONPATH'].insert(0, self.io.rootdir)
776
777 info('Appoint remote workers')
778 bat = self.execution.batch(self)
779 nodelist = bat.nodelist()
780 if env.command != None and env.command.opargs[0].compress_nodelist:
781 info(' (compressed)')
782 if env.mpi:
783 info(' (head excluded for MPI)')
784 info(':\n')
785 iworker = 0
786 for node in nodelist:
787 info(' %s' % node.name)
788 port = bat.create_worker(node, authkey,
789 envar=self.solver.envar, paths=paths,
790 profiler_data=self._get_profiler_data(iworker))
791 info(' worker #%d created' % iworker)
792 dealer.appoint(node.address, port, authkey)
793 info(' and appointed.\n')
794 iworker += 1
795 if len(dealer) != nblk:
796 raise IndexError('%d != %d' % (len(dealer), nblk))
797
798 if self.io.rkillfn:
799 f = open(self.io.rkillfn, 'w')
800 f.write("""#!/bin/sh
801 nodes="
802 %s
803 "
804 for node in $nodes; do rsh $node killall %s; done
805 """ % (
806 '\n'.join([node.name for node in nodelist]),
807 os.path.split(sys.executable)[-1],
808 ))
809
810
812 """
813 Make interconnections for distributed solver objects.
814
815 @return: nothing
816 """
817 dom = self.solver.domainobj
818 dealer = self.solver.dealer
819 dwidth = len(str(dom.nblk-1))
820 oblk = -1
821 for iblk, jblk in dom.ifparr:
822 if iblk != oblk:
823 if oblk != -1:
824 self.info('.\n')
825 self.info(('%%0%dd ->' % dwidth) % iblk)
826 oblk = iblk
827 dealer.bridge((iblk, jblk))
828 self.info((' %%0%dd'%dwidth) % jblk)
829 self.info('.\n')
830 dealer.barrier()
831
832
834 """
835 Exchange meta data.
836
837 @return: nothing
838 """
839 dom = self.solver.domainobj
840 dealer = self.solver.dealer
841 nblk = dom.nblk
842 iflists = dom.make_iflist_per_block()
843 self.info('Interface exchanging pairs (%d phases):\n' % len(
844 iflists[0]))
845 dwidth = len(str(nblk-1))
846 for iblk in range(nblk):
847 ifacelist = iflists[iblk]
848 sdw = dealer[iblk]
849 sdw.cmd.init_exchange(ifacelist)
850
851 self.info(('%%0%dd ->' % dwidth) % iblk)
852 for pair in ifacelist:
853 if pair < 0:
854 stab = '-' * (2*dwidth+1)
855 else:
856 stab = '-'.join([('%%0%dd'%dwidth)%item for item in pair])
857 self.info(' %s' % stab)
858 self.info('\n')
859
861 """
862 Exchange metric data for solver.
863
864 @return: nothing
865 """
866 dealer = self.solver.dealer
867 for arrname in self.solver.solvertype._interface_init_:
868 for sdw in dealer: sdw.cmd.exchangeibc(arrname,
869 with_worker=True)
870
871
872
873
874
875
876
877
878
879
880
881
882
def run(self, level=0):
    """
    Run the simulation case; time marching.

    The stages executed depend on the run level: below level 1 the
    provide and preloop stages run first; below level 2 the march,
    postloop and exhaust stages run, otherwise the case is dumped
    instead.  The final stage always runs.

    @keyword level: run level; higher level does less work.
    @type level: int

    @return: nothing.
    """
    self._log_start('run', msg=' (level %d) %s' % (level, self.io.basefn))
    self.execution.step_current = self.execution.step_init
    # Collect the stage names first; each one is looked up on self only
    # when its turn comes.
    stages = []
    if level < 1:
        stages.extend(['_run_provide', '_run_preloop'])
    if level >= 2:
        stages.append('dump')
    else:
        stages.extend(['_run_march', '_run_postloop', '_run_exhaust'])
    stages.append('_run_final')
    for stage in stages:
        getattr(self, stage)()
    self._log_end('run', msg=' '+self.io.basefn)
905
906
929
931 from time import time as timer
932 dealer = self.solver.dealer
933 flag_parallel = self.is_parallel
934 self.log.time['solver_march'] = 0.0
935 self.info('\n')
936 self._log_start('loop_march')
937 while self.execution.step_current < self.execution.steps_run:
938
939 self._dynamic_execute()
940 if self.execution.stop: break
941
942 if self.execution.steps_dump != None and \
943 self.execution.step_current != self.execution.step_restart and \
944 self.execution.step_current%self.execution.steps_dump == 0:
945 self.dump()
946
947 self.runhooks('premarch')
948
949 solver_march_marker = timer()
950 steps_stride = self.execution.steps_stride
951 time_increment = self.execution.time_increment
952 time = self.execution.step_current*time_increment
953 if flag_parallel:
954 for sdw in dealer: sdw.cmd.march(time, time_increment,
955 steps_stride, with_worker=True)
956 self.execution.marchret = [sdw.recv() for sdw in dealer]
957 else:
958 self.execution.marchret = self.solver.solverobj.march(time,
959 time_increment, steps_stride)
960 self.execution.time += time_increment*steps_stride
961 self.log.time['solver_march'] += timer() - solver_march_marker
962 self.execution.step_current += steps_stride
963
964 self.runhooks('postmarch')
965
966 self._log_end('loop_march')
967 self.info('\n')
968
969
970 - def _run_postloop(self):
971 dealer = self.solver.dealer
972 flag_parallel = self.is_parallel
973
974 if flag_parallel:
975 for sdw in dealer: sdw.cmd.postloop()
976 else:
977 self.solver.solverobj.postloop()
978 self.runhooks('postloop')
996
997
998
999
1000
1001
1002