Package solvcon :: Module case
[hide private]
[frames] | no frames]

Source Code for Module solvcon.case

   1  # -*- coding: UTF-8 -*- 
   2  # 
   3  # Copyright (C) 2008-2010 Yung-Yu Chen <yyc@solvcon.net>. 
   4  # 
   5  # This program is free software; you can redistribute it and/or modify 
   6  # it under the terms of the GNU General Public License as published by 
   7  # the Free Software Foundation; either version 2 of the License, or 
   8  # (at your option) any later version. 
   9  # 
  10  # This program is distributed in the hope that it will be useful, 
  11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
  12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  13  # GNU General Public License for more details. 
  14  # 
  15  # You should have received a copy of the GNU General Public License along 
  16  # with this program; if not, write to the Free Software Foundation, Inc., 
  17  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
  18   
  19  """ 
  20  Simulation cases. 
  21  """ 
  22   
  23  from .gendata import SingleAssignDict, AttributeDict 
24 25 -class ArrangementRegistry(SingleAssignDict, AttributeDict):
26 """ 27 Arrangement registry class. A simulation is a callable that returns a case 28 object. 29 """
30 - def __setitem__(self, key, value):
31 assert callable(value) 32 super(ArrangementRegistry, self).__setitem__(key, value)
33 arrangements = ArrangementRegistry() # overall registry singleton.
34 35 -class CaseInfoMeta(type):
36 """ 37 Meta class for case information class. 38 """
39 - def __new__(cls, name, bases, namespace):
40 newcls = super(CaseInfoMeta, cls).__new__(cls, name, bases, namespace) 41 # incremental modification of defdict. 42 defdict = {} 43 for base in bases: 44 defdict.update(getattr(base, 'defdict', {})) 45 defdict.update(newcls.defdict) 46 newcls.defdict = defdict 47 # create different simulation registry objects for case classes. 48 newcls.arrangements = ArrangementRegistry() 49 return newcls
50 -class CaseInfo(dict):
51 """ 52 Generic case information abstract class. It's the base class that all case 53 information classes should subclass, to form hierarchical information 54 object. 55 """ 56 __metaclass__ = CaseInfoMeta 57 defdict = {}
58 - def __getattr__(self, name):
59 """ 60 Consult self dictionary for attribute. It's a shorthand. 61 """ 62 if name == '__setstate__': 63 raise AttributeError 64 return self[name]
65 - def __setattr__(self, name, value):
66 """ 67 Save to self dictionary first, then self object table. 68 69 @note: This method is overriden as a stupid-preventer. It makes 70 attribute setting consistent with attribute getting. 71 """ 72 if name in self: 73 self[name] = value 74 else: 75 super(CaseInfo, self).__setattr__(name, value)
76 - def _set_through(self, key, val):
77 """ 78 Set to self with the dot-separated key. 79 """ 80 tokens = key.split('.', 1) 81 fkey = tokens[0] 82 if len(tokens) == 2: 83 self[fkey]._set_through(tokens[1], val) 84 else: 85 self[fkey] = val
86 - def __init__(self, _defdict=None, *args, **kw):
87 """ 88 Assign default values to self after initiated. 89 90 @keyword _defdict: customized defdict; internal use only. 91 @type _defdict: dict 92 """ 93 super(CaseInfo, self).__init__(*args, **kw) 94 # customize defdict. 95 if _defdict is None: 96 defdict = self.defdict 97 else: 98 defdict = dict(self.defdict) 99 defdict.update(_defdict) 100 # parse first hierarchy to form key groups. 101 keygrp = dict() 102 for key in defdict.keys(): 103 if key is None or key == '': 104 continue 105 tokens = key.split('.', 1) 106 if len(tokens) == 2: 107 fkey, rkey = tokens 108 keygrp.setdefault(fkey, dict())[rkey] = defdict[key] 109 else: 110 fkey = tokens[0] 111 keygrp[fkey] = defdict[fkey] 112 # set up first layer keys recursively. 113 for fkey in keygrp.keys(): 114 data = keygrp[fkey] 115 if isinstance(data, dict): 116 self[fkey] = CaseInfo(_defdict=data) 117 elif isinstance(data, type): 118 try: 119 self[fkey] = data() 120 except TypeError: 121 self[fkey] = data 122 else: 123 self[fkey] = data
124
125 -class BaseCase(CaseInfo):
126 """ 127 Base class for simulation cases. 128 129 init() and run() are the two primary methods responsible for the 130 execution of the simulation case object. Both methods accept a keyword 131 parameter ``level'' which indicates the run level of the run: 132 - run level 0: fresh run (default), 133 - run level 1: restart run, 134 - run level 2: initialization only. 135 136 @ivar runhooks: a special list containing all the hook objects to be run. 137 @itype runhooks: solvcon.hook.HookList 138 """ 139 140 CSEFN_DEFAULT = 'solvcon.dump.case.obj' 141 142 from . import conf, batch 143 defdict = { 144 # execution related. 145 'execution.fpdtype': conf.env.fpdtypestr, 146 'execution.batch': batch.Batch, 147 'execution.resources': dict, # for batch. 148 'execution.stop': False, 149 'execution.time': 0.0, 150 'execution.time_increment': 0.0, 151 'execution.step_init': 0, 152 'execution.step_current': None, 153 'execution.steps_run': None, 154 'execution.steps_stride': 1, 155 'execution.marchret': None, 156 'execution.neq': 0, # number of unknowns. 157 'execution.var': dict, # for Calculator hooks. 158 'execution.varstep': None, # the step for which var and dvar are valid. 159 'execution.ncore': -1, # number of cores to use in solver. 160 # dynamic related. 161 'dynamic.inputfn': 'solvcon.input', 162 'dynamic.bakfn': 'solvcon.input.bak', 163 'dynamic.preserve': False, 164 # io related. 165 'io.abspath': False, # flag to use abspath or not. 166 'io.rootdir': None, 167 'io.basedir': None, 168 'io.basefn': None, 169 'io.empty_jobdir': False, 170 'io.solver_output': False, 171 # conditions. 172 'condition.mtrllist': list, 173 # solver. 174 'solver.solvertype': None, 175 'solver.solverobj': None, 176 # logging. 177 'log.time': dict, 178 } 179 del conf, batch 180 from .helper import info 181
182 - def _log_start(self, action, msg='', postmsg=' ... '):
183 """ 184 Print to user and record start time for certain action. 185 186 @param action: action key. 187 @type action: str 188 @keyword msg: trailing message for the action key. 189 @type msg: str 190 @return: nothing. 191 """ 192 from time import time 193 info = self.info 194 tarr = [0,0,0] 195 tarr[0] = time() 196 self.log.time[action] = tarr 197 info( 198 info.prefix * (info.width-info.level*info.nchar), 199 travel=1 200 ) 201 info('\nStart %s%s%s' % (action, msg, postmsg)) 202 info( 203 '\n' + info.prefix * (info.width-info.level*info.nchar) + '\n', 204 )
205
206 - def _log_end(self, action, msg='', postmsg=' . '):
207 """ 208 Print to user and record end time for certain action. 209 210 @param action: action key. 211 @type action: str 212 @keyword msg: supplemental message. 213 @type msg: str 214 @return: nothing 215 """ 216 from time import time 217 info = self.info 218 tarr = self.log.time.setdefault(action, [0,0,0]) 219 tarr[1] = time() 220 tarr[2] = tarr[1] - tarr[0] 221 info( 222 info.prefix * (info.width-info.level*info.nchar) + \ 223 '\nEnd %s%s%sElapsed time (sec) = %g' % ( 224 action, msg, postmsg, tarr[2] 225 ) 226 ) 227 info( 228 '\n' + info.prefix * (info.width-(info.level-1)*info.nchar) + '\n', 229 travel=-1 230 )
231
232 - def __init__(self, **kw):
233 """ 234 Initiailize the basic case. Set through keyword parameters. 235 """ 236 import os 237 from .hook import HookList 238 # populate value from keywords. 239 initpairs = list() 240 for cinfok in self.defdict.keys(): 241 lkey = cinfok.split('.')[-1] 242 initpairs.append((cinfok, kw.pop(lkey, None))) 243 # initialize with the left keywords. 244 super(BaseCase, self).__init__(**kw) 245 # populate value from keywords. 246 for cinfok, val in initpairs: 247 if val is not None: 248 self._set_through(cinfok, val) 249 # create runhooks. 250 self.runhooks = HookList(self) 251 # expand basedir. 252 if self.io.abspath: 253 self.io.basedir = os.path.abspath(self.io.basedir) 254 if self.io.basedir is not None and not os.path.exists(self.io.basedir): 255 os.makedirs(self.io.basedir)
256
257 - def _dynamic_execute(self):
258 """ 259 Dynamically execute the codes stored in the input file specified by 260 the case. 261 262 @return: nothing 263 """ 264 import os 265 import traceback 266 from cStringIO import StringIO 267 if not self.dynamic.inputfn: return 268 try: 269 # load codes. 270 if os.path.exists(self.dynamic.inputfn): 271 f = open(self.dynamic.inputfn, 'r') 272 codes = f.read() 273 f.close() 274 else: 275 codes = '' 276 if codes.strip(): 277 # clear/preserve code file. 278 f = open(self.dynamic.inputfn, 'w') 279 if self.dynamic.preserve: 280 f.write(codes) 281 f.close() 282 # backup codes. 283 f = open(self.dynamic.bakfn, 'a') 284 f.write('\n### %s step %d/%d\n' % (self.io.basefn, 285 self.execution.step_current, self.execution.steps_run, 286 )) 287 f.write(codes) 288 f.close() 289 # run codes. 290 exec(codes) 291 except Exception, e: 292 f = StringIO() 293 f.write('\n@@@ dynamic execution at step %d @@@' % 294 self.execution.step_current) 295 f.write('\nCode:\n %s\n' % codes) 296 traceback.print_exc(file=f) 297 self.info(f.getvalue()) 298 f.close() 299 # reset preservation flag. 300 self.dynamic.preserve = False
301
302 - def init(self, level=0):
303 """ 304 Initialize solver. Nothing inside now. 305 306 @keyword level: run level; higher level does less work. 307 @type level: int 308 309 @return: nothing. 310 """ 311 pass
312
313 - def run(self, level=0):
314 """ 315 Run the simulation case; time marching. 316 317 @keyword level: run level; higher level does less work. 318 @type level: int 319 320 @return: nothing. 321 """ 322 # start log. 323 self._log_start('run', msg=' '+self.io.basefn) 324 self.info("\n") 325 # prepare for time marching. 326 self.execution.step_current = 0 327 self.runhooks('preloop') 328 self._log_start('loop_march') 329 while self.execution.step_current < self.execution.steps_run: 330 self.runhooks('premarch') 331 self.execution.marchret = self.solver.solverobj.march( 332 self.execution.step_current*self.execution.time_increment, 333 self.execution.time_increment) 334 self.execution.step_current += 1 335 self.runhooks('postmarch') 336 self._log_start('loop_march') 337 self.runhooks('postloop') 338 # end log. 339 self._log_end('run')
340
341 - def cleanup(self, signum=None, frame=None):
342 import signal 343 if signum == signal.SIGINT: 344 raise KeyboardInterrupt
345 346 @classmethod
347 - def register_arrangement(cls, func, casename=None):
348 """ 349 Decorate simulation functions. This function asserts required 350 signature which is necessary for a function to be a valid simulation 351 function. Moreover, all the simulation function should be decorated by 352 this decorator. 353 354 @return: simulation function. 355 @rtype: callable 356 """ 357 import signal 358 import cPickle as pickle 359 from .batch import Batch 360 if casename is None: casename = func.__name__ 361 def simu(*args, **kw): 362 kw.pop('casename', None) 363 resources = kw.pop('resources', dict()) 364 batch = kw.get('batch', Batch) 365 submit = kw.pop('submit') 366 use_mpi = kw.pop('use_mpi', False) 367 postpone = kw.pop('postpone', False) 368 runlevel = kw.pop('runlevel') 369 # obtain the case object. 370 if runlevel == 1: 371 case = pickle.load(open(cls.CSEFN_DEFAULT, 'rb')) 372 else: 373 case = func(casename=casename, *args, **kw) 374 # submit/run. 375 try: 376 if submit: 377 sbm = batch(case, arnname=casename, use_mpi=use_mpi, 378 **resources) 379 sbm(runlevel=runlevel, postpone=postpone) 380 else: 381 signal.signal(signal.SIGTERM, case.cleanup) 382 signal.signal(signal.SIGINT, case.cleanup) 383 case.init(level=runlevel) 384 case.info('\n') 385 case.run(level=runlevel) 386 case.cleanup() 387 except: 388 case.cleanup() 389 raise 390 return case
391 # register self to simulation registries. 392 cls.arrangements[casename] = simu 393 arrangements[casename] = simu 394 return simu
395
396 -class BlockCase(BaseCase):
397 """ 398 Base class for multi-dimensional cases using block. 399 400 @ivar pythonpaths: extra python paths. 401 @itype pythonpaths: list 402 """ 403 defdict = { 404 # execution related. 405 'execution.npart': None, # number of decomposed blocks. 406 'execution.step_restart': None, 407 'execution.steps_dump': None, 408 # IO. 409 'io.mesher': None, 410 'io.meshfn': None, 411 'io.domain.with_arrs': True, 412 'io.domain.with_whole': True, 413 'io.domain.wholefn': None, 414 'io.domain.splitfns': None, 415 'io.rkillfn': 'solvcon.kill.sh', 416 'io.dump.csefn': 'solvcon.dump.case.obj', 417 'io.dump.svrfntmpl': 'solvcon.dump.solver%s.obj', 418 # conditions. 419 'condition.bcmap': None, 420 'condition.bcmod': None, 421 # solver. 422 'solver.use_incenter': False, 423 'solver.domaintype': None, 424 'solver.domainobj': None, 425 'solver.dealer': None, 426 'solver.envar': dict, 427 'solver.ibcthread': False, 428 } 429
430 - def __init__(self, **kw):
431 self.pythonpaths = kw.pop('pythonpaths', []) 432 super(BlockCase, self).__init__(**kw)
433 434 @property
435 - def is_parallel(self):
436 """ 437 Determine if self should do parallel or not. 438 439 @return: 0 means sequential; 1 means local parallel. 440 @rtype: int 441 """ 442 from . import domain 443 domaintype = self.solver.domaintype 444 if domaintype == domain.Domain: 445 assert self.execution.npart == None 446 flag_parallel = 0 # means sequential. 447 elif domaintype == domain.Collective: 448 assert isinstance(self.execution.npart, int) 449 flag_parallel = 1 # means local parallel. 450 elif domaintype == domain.Distributed: 451 assert isinstance(self.execution.npart, int) 452 flag_parallel = 2 # means network parallel. 453 else: 454 raise TypeError, 'domaintype shouldn\'t be %s' % domaintype 455 return flag_parallel
456
457 - def dump(self):
458 """ 459 Dump case and remote solver objects for later restart. 460 461 @return: nothing 462 """ 463 import cPickle as pickle 464 dealer = self.solver.dealer 465 flag_parallel = self.is_parallel 466 # record the step can be restarted from. 467 self.execution.step_restart = self.execution.step_current 468 # unbind. 469 if flag_parallel: 470 for iblk in range(len(self.solver.domainobj)): 471 dealer[iblk].cmd.dump(self.io.dump.svrfntmpl % str(iblk)) 472 else: 473 self.solver.domainobj.unbind() 474 self.solver.solverobj.unbind() 475 svrholds = dict() 476 for key in ['mesg',]: 477 svrholds[key] = getattr(self.solver.solverobj, key) 478 setattr(self.solver.solverobj, key, None) 479 # pickle. 480 self.solver.dealer = None 481 pickle.dump(self, open(self.io.dump.csefn, 'wb'), 482 pickle.HIGHEST_PROTOCOL) 483 # bind. 484 if flag_parallel: 485 self.solver.dealer = dealer 486 else: 487 for key in svrholds: 488 setattr(self.solver.solverobj, key, svrholds[key]) 489 self.solver.solverobj.bind() 490 self.solver.domainobj.bind()
491
492 - def cleanup(self, signum=None, frame=None):
493 if self.solver.solverobj != None: 494 self.solver.solverobj.unbind() 495 super(BlockCase, self).cleanup(signum=signum, frame=frame)
496 497 ############################################################################ 498 ### 499 ### Begin of block of case initialization logics. 500 ### 501 ############################################################################ 502
503 - def init(self, level=0):
504 """ 505 Load block and initialize solver from the geometry information in the 506 block and conditions in the self case. If parallel run is specified 507 (through domaintype), split the domain and perform corresponding tasks. 508 """ 509 from .boundcond import interface 510 self._log_start('init', msg=' (level %d) %s' % (level, self.io.basefn)) 511 # initilize the whole solver and domain. 512 if level != 1: 513 self._log_start('build_domain') 514 loaded = self.load_block() 515 if callable(self.condition.bcmod): 516 self.condition.bcmod(loaded) 517 if isinstance(loaded, self.solver.domaintype): 518 self.solver.domainobj = loaded 519 else: 520 self.solver.domainobj = self.solver.domaintype(loaded) 521 self._log_end('build_domain') 522 # for serial execution. 523 if not self.is_parallel: 524 # create and initialize solver. 525 if level != 1: 526 self._local_init_solver() 527 else: 528 self._local_bind_solver() 529 # for parallel execution. 530 else: 531 # split the domain. 532 if level != 1 and not self.solver.domainobj.presplit: 533 self.info('\n') 534 self._log_start('split_domain') 535 self.solver.domainobj.split( 536 nblk=self.execution.npart, interface_type=interface) 537 self._log_end('split_domain') 538 # make dealer and create workers for the dealer. 539 self.info('\n') 540 self._log_start('build_dealer') 541 self.solver.dealer = self._create_workers() 542 self._log_end('build_dealer') 543 # make interconnections for rpc. 544 self.info('\n') 545 self._log_start('interconnect') 546 self._interconnect() 547 self._log_end('interconnect') 548 # spread out and initialize decomposed solvers. 549 if level != 1: 550 self.info('\n') 551 self._log_start('remote_init_solver') 552 self._remote_init_solver() 553 self._log_end('remote_init_solver') 554 else: 555 self.info('\n') 556 self._log_start('remote_load_solver') 557 self._remote_load_solver() 558 self._log_end('remote_load_solver') 559 # initialize interfaces. 560 self.info('\n') 561 self._log_start('init_interface') 562 self._init_interface() 563 self._log_end('init_interface') 564 # initialize exchange for remote solver objects. 565 if level != 1: 566 self.info('\n') 567 self._log_start('exchange_metric') 568 self._exchange_metric() 569 self._log_end('exchange_metric') 570 self._log_end('init', msg=' '+self.io.basefn)
571
572 - def load_block(self):
573 """ 574 Return a block for init. 575 576 @return: a block object. 577 @rtype: solvcon.block.Block 578 """ 579 import os, gzip 580 from .io.gmsh import Gmsh 581 from .io.genesis import Genesis 582 from .io.gambit import GambitNeutral 583 from .io.block import BlockIO 584 from .io.domain import DomainIO 585 meshfn = self.io.meshfn 586 bcmapper = self.condition.bcmap 587 self.info('mesh file: %s\n' % meshfn) 588 if callable(self.io.mesher): 589 self._log_start('create_block') 590 obj = self.io.mesher(self) 591 self._log_end('create_block') 592 elif os.path.isdir(meshfn): 593 dof = DomainIO(dirname=meshfn) 594 obj, whole, split = dof.load(bcmapper=bcmapper, 595 with_arrs=self.io.domain.with_arrs, 596 with_whole=self.io.domain.with_whole, with_split=False, 597 return_filenames=True, domaintype=self.solver.domaintype) 598 self.io.domain.wholefn = whole 599 self.io.domain.splitfn = split 600 elif '.msh' in meshfn: 601 self._log_start('create_gmsh_object') 602 if meshfn.endswith('.gz'): 603 stream = gzip.open(meshfn) 604 else: 605 stream = open(meshfn) 606 gmh = Gmsh(stream) 607 gmh.load() 608 stream.close() 609 self._log_end('create_gmsh_object') 610 self._log_start('convert_gmsh_to_block') 611 obj = gmh.toblock(bcname_mapper=bcmapper, 612 use_incenter=self.solver.use_incenter) 613 self._log_end('convert_msh_to_block') 614 elif meshfn.endswith('.g'): 615 self._log_start('create_genesis_object') 616 gn = Genesis(meshfn) 617 gn.load() 618 gn.close_file() 619 self._log_end('create_genesis_object') 620 self._log_start('convert_genesis_to_block') 621 obj = gn.toblock(bcname_mapper=bcmapper, 622 use_incenter=self.solver.use_incenter) 623 self._log_end('convert_genesis_to_block') 624 elif '.neu' in meshfn: 625 self._log_start('read_neu_data', msg=' from %s'%meshfn) 626 if meshfn.endswith('.gz'): 627 stream = gzip.open(meshfn) 628 else: 629 stream = open(meshfn) 630 data = stream.read() 631 stream.close() 632 self._log_end('read_neu_data') 633 self._log_start('create_neu_object') 634 neu = GambitNeutral(data) 635 self._log_end('create_neu_object') 636 self._log_start('convert_neu_to_block') 637 obj = neu.toblock(bcname_mapper=bcmapper, 638 use_incenter=self.solver.use_incenter) 639 self._log_end('convert_neu_to_block') 640 elif '.blk' in meshfn: 641 self._log_start('load_block') 642 obj = BlockIO().load(stream=meshfn, bcmapper=bcmapper) 643 self._log_end('load_block') 644 else: 645 raise ValueError(meshfn) 646 return obj
647
648 - def make_solver_keywords(self):
649 """ 650 Return keywords to initialize solvers. 651 652 @return: keywords 653 @rtype: dict 654 """ 655 return dict( 656 ncore=self.execution.ncore, 657 neq=self.execution.neq, 658 fpdtype=self.execution.fpdtype, 659 enable_mesg=self.io.solver_output, 660 )
661 662 # solver object initialization/binding/loading.
663 - def _local_init_solver(self):
664 """ 665 @return: nothing 666 """ 667 svr = self.solver.solvertype( 668 self.solver.domainobj.blk, **self.make_solver_keywords()) 669 self.runhooks.drop_anchor(svr) 670 svr.bind() 671 svr.ibcthread = self.solver.ibcthread 672 svr.init() 673 self.solver.solverobj = svr
674 - def _local_bind_solver(self):
675 """ 676 @return: nothing 677 """ 678 self.solver.solverobj.bind() 679 self.solver.solverobj.ibcthread = self.solver.ibcthread 680 self.solver.domainobj.bind()
681 - def _remote_init_solver(self):
682 """ 683 @return: nothing 684 """ 685 dealer = self.solver.dealer 686 solvertype = self.solver.solvertype 687 dom = self.solver.domainobj 688 nblk = dom.nblk 689 for iblk in range(nblk): 690 svrkw = self.make_solver_keywords() 691 self.info('solver #%d/(%d-1): ' % (iblk, nblk)) 692 if dom.presplit: 693 dealer[iblk].create_solver(self.condition.bcmap, 694 self.io.meshfn, self.io.domain.splitfn[iblk], 695 iblk, nblk, solvertype, svrkw) 696 self.runhooks.drop_anchor(dealer[iblk]) 697 else: 698 sbk = dom[iblk] 699 svr = solvertype(sbk, **svrkw) 700 self.info('sending ... ') 701 svr.svrn = iblk 702 svr.nsvr = nblk 703 self.runhooks.drop_anchor(svr) 704 svr.unbind() # ensure no pointers (unpicklable) in solver. 705 dealer[iblk].remote_setattr('muscle', svr) 706 self.info('done.\n') 707 self.info('Bind/Init ... ') 708 for sdw in dealer: sdw.cmd.bind() 709 for sdw in dealer: sdw.cmd.remote_setattr('ibcthread', 710 self.solver.ibcthread) 711 for sdw in dealer: sdw.cmd.init() 712 dealer.barrier() 713 self.info('done.\n')
714 - def _remote_load_solver(self):
715 """ 716 @return: nothing 717 """ 718 dealer = self.solver.dealer 719 nblk = self.solver.domainobj.nblk 720 for iblk in range(nblk): 721 dealer[iblk].remote_loadobj('muscle', 722 self.io.dump.svrfntmpl % str(iblk)) 723 for sdw in dealer: sdw.cmd.bind() 724 for sdw in dealer: sdw.cmd.remote_setattr('ibcthread', 725 self.solver.ibcthread) 726 dealer.barrier()
727 728 # workers and worker manager (dealer) creation.
729 - def _create_workers(self):
730 """ 731 Make dealer and create workers for the dealer. 732 733 @return: worker manager. 734 @rtype: solvcon.rpc.Dealer 735 """ 736 from .rpc import Dealer 737 nblk = self.solver.domainobj.nblk 738 flag_parallel = self.is_parallel 739 if flag_parallel == 1: 740 family = None 741 create_workers = self._create_workers_local 742 elif flag_parallel == 2: 743 family = 'AF_INET' 744 create_workers = self._create_workers_remote 745 dealer = Dealer(family=family) 746 create_workers(dealer, nblk) 747 return dealer
748 - def _get_profiler_data(self, iblk):
749 from .conf import env 750 if env.command != None: 751 ops, args = env.command.opargs 752 if getattr(ops, 'use_profiler'): 753 return ( 754 ops.profiler_dat+'%d'%iblk, 755 ops.profiler_log+'%d'%iblk, 756 ops.profiler_sort, 757 ) 758 return None
759 - def _create_workers_local(self, dealer, nblk):
760 from .rpc import Worker 761 for iblk in range(nblk): 762 dealer.hire(Worker(None, 763 profiler_data=self._get_profiler_data(iblk)))
764 - def _create_workers_remote(self, dealer, nblk):
765 import os, sys 766 from .rpc import DEFAULT_AUTHKEY 767 from .conf import env 768 info = self.info 769 authkey = DEFAULT_AUTHKEY 770 paths = dict([(key, os.environ.get(key, '').split(':')) for key in 771 'LD_LIBRARY_PATH', 772 'PYTHONPATH', 773 ]) # TODO: make sure VTK in LD_LIBRARY_PATH. 774 paths['PYTHONPATH'].extend(self.pythonpaths) 775 paths['PYTHONPATH'].insert(0, self.io.rootdir) 776 # appoint remote worker objects. 777 info('Appoint remote workers') 778 bat = self.execution.batch(self) 779 nodelist = bat.nodelist() 780 if env.command != None and env.command.opargs[0].compress_nodelist: 781 info(' (compressed)') 782 if env.mpi: 783 info(' (head excluded for MPI)') 784 info(':\n') 785 iworker = 0 786 for node in nodelist: 787 info(' %s' % node.name) 788 port = bat.create_worker(node, authkey, 789 envar=self.solver.envar, paths=paths, 790 profiler_data=self._get_profiler_data(iworker)) 791 info(' worker #%d created' % iworker) 792 dealer.appoint(node.address, port, authkey) 793 info(' and appointed.\n') 794 iworker += 1 795 if len(dealer) != nblk: 796 raise IndexError('%d != %d' % (len(dealer), nblk)) 797 # create remote killer script. 798 if self.io.rkillfn: 799 f = open(self.io.rkillfn, 'w') 800 f.write("""#!/bin/sh 801 nodes=" 802 %s 803 " 804 for node in $nodes; do rsh $node killall %s; done 805 """ % ( 806 '\n'.join([node.name for node in nodelist]), 807 os.path.split(sys.executable)[-1], 808 ))
809 810 # interconnection.
811 - def _interconnect(self):
812 """ 813 Make interconnections for distributed solver objects. 814 815 @return: nothing 816 """ 817 dom = self.solver.domainobj 818 dealer = self.solver.dealer 819 dwidth = len(str(dom.nblk-1)) 820 oblk = -1 821 for iblk, jblk in dom.ifparr: 822 if iblk != oblk: 823 if oblk != -1: 824 self.info('.\n') 825 self.info(('%%0%dd ->' % dwidth) % iblk) 826 oblk = iblk 827 dealer.bridge((iblk, jblk)) 828 self.info((' %%0%dd'%dwidth) % jblk) 829 self.info('.\n') 830 dealer.barrier()
831 832 # interface.
833 - def _init_interface(self):
834 """ 835 Exchange meta data. 836 837 @return: nothing 838 """ 839 dom = self.solver.domainobj 840 dealer = self.solver.dealer 841 nblk = dom.nblk 842 iflists = dom.make_iflist_per_block() 843 self.info('Interface exchanging pairs (%d phases):\n' % len( 844 iflists[0])) 845 dwidth = len(str(nblk-1)) 846 for iblk in range(nblk): 847 ifacelist = iflists[iblk] 848 sdw = dealer[iblk] 849 sdw.cmd.init_exchange(ifacelist) 850 # print. 851 self.info(('%%0%dd ->' % dwidth) % iblk) 852 for pair in ifacelist: 853 if pair < 0: 854 stab = '-' * (2*dwidth+1) 855 else: 856 stab = '-'.join([('%%0%dd'%dwidth)%item for item in pair]) 857 self.info(' %s' % stab) 858 self.info('\n')
859
860 - def _exchange_metric(self):
861 """ 862 Exchange metric data for solver. 863 864 @return: nothing 865 """ 866 dealer = self.solver.dealer 867 for arrname in self.solver.solvertype._interface_init_: 868 for sdw in dealer: sdw.cmd.exchangeibc(arrname, 869 with_worker=True)
870 871 ############################################################################ 872 ### 873 ### End of block of case initialization logics. 874 ### 875 ############################################################################ 876 877 ############################################################################ 878 ### 879 ### Begin of block of case execution. 880 ### 881 ############################################################################ 882
883 - def run(self, level=0):
884 """ 885 Run the simulation case; time marching. 886 887 @keyword level: run level; higher level does less work. 888 @type level: int 889 890 @return: nothing. 891 """ 892 self._log_start('run', msg=' (level %d) %s' % (level, self.io.basefn)) 893 self.execution.step_current = self.execution.step_init 894 if level < 1: 895 self._run_provide() 896 self._run_preloop() 897 if level < 2: 898 self._run_march() 899 self._run_postloop() 900 self._run_exhaust() 901 else: # level == 2. 902 self.dump() 903 self._run_final() 904 self._log_end('run', msg=' '+self.io.basefn)
905 906 # logics before entering main loop (march).
907 - def _run_provide(self):
908 dealer = self.solver.dealer 909 flag_parallel = self.is_parallel 910 # anchor: provide. 911 if flag_parallel: 912 for sdw in dealer: sdw.cmd.provide() 913 else: 914 self.solver.solverobj.provide()
915 - def _run_preloop(self):
916 dealer = self.solver.dealer 917 flag_parallel = self.is_parallel 918 # hook: preloop. 919 self.runhooks('preloop') 920 if flag_parallel: 921 for sdw in dealer: sdw.cmd.preloop() 922 for arrname in self.solver.solvertype._solution_array_: 923 for sdw in dealer: 924 sdw.cmd.exchangeibc(arrname, with_worker=True) 925 for sdw in dealer: sdw.cmd.boundcond() 926 else: 927 self.solver.solverobj.preloop() 928 self.solver.solverobj.boundcond()
929
930 - def _run_march(self):
931 from time import time as timer 932 dealer = self.solver.dealer 933 flag_parallel = self.is_parallel 934 self.log.time['solver_march'] = 0.0 935 self.info('\n') 936 self._log_start('loop_march') 937 while self.execution.step_current < self.execution.steps_run: 938 # the first thing is detecting for dynamic codes. 939 self._dynamic_execute() 940 if self.execution.stop: break 941 # dump before hooks. 942 if self.execution.steps_dump != None and \ 943 self.execution.step_current != self.execution.step_restart and \ 944 self.execution.step_current%self.execution.steps_dump == 0: 945 self.dump() 946 # hook: premarch. 947 self.runhooks('premarch') 948 # march. 949 solver_march_marker = timer() 950 steps_stride = self.execution.steps_stride 951 time_increment = self.execution.time_increment 952 time = self.execution.step_current*time_increment 953 if flag_parallel: 954 for sdw in dealer: sdw.cmd.march(time, time_increment, 955 steps_stride, with_worker=True) 956 self.execution.marchret = [sdw.recv() for sdw in dealer] 957 else: 958 self.execution.marchret = self.solver.solverobj.march(time, 959 time_increment, steps_stride) 960 self.execution.time += time_increment*steps_stride 961 self.log.time['solver_march'] += timer() - solver_march_marker 962 self.execution.step_current += steps_stride 963 # hook: postmarch. 964 self.runhooks('postmarch') 965 # end log. 966 self._log_end('loop_march') 967 self.info('\n')
968 969 # logics after exiting main loop (march).
970 - def _run_postloop(self):
971 dealer = self.solver.dealer 972 flag_parallel = self.is_parallel 973 # hook: postloop. 974 if flag_parallel: 975 for sdw in dealer: sdw.cmd.postloop() 976 else: 977 self.solver.solverobj.postloop() 978 self.runhooks('postloop')
979 - def _run_exhaust(self):
980 dealer = self.solver.dealer 981 flag_parallel = self.is_parallel 982 # anchor: exhaust. 983 if flag_parallel: 984 for sdw in dealer: sdw.cmd.exhaust() 985 else: 986 self.solver.solverobj.exhaust()
987 - def _run_final(self):
988 dealer = self.solver.dealer 989 flag_parallel = self.is_parallel 990 # finalize. 991 if flag_parallel: 992 for sdw in dealer: sdw.cmd.final() 993 self.solver.dealer.terminate() 994 else: 995 self.solver.solverobj.final()
996 997 ############################################################################ 998 ### 999 ### End of block of case execution. 1000 ### 1001 ############################################################################ 1002