Package solvcon :: Module batch
[hide private]
[frames | no frames]

Source Code for Module solvcon.batch

  1  # -*- coding: UTF-8 -*- 
  2  # 
  3  # Copyright (C) 2008-2010 Yung-Yu Chen <yyc@solvcon.net>. 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 2 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License along 
 16  # with this program; if not, write to the Free Software Foundation, Inc., 
 17  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 18   
 19  """ 
 20  Basic support for cluster batch systems. 
 21   
  22  * batregy: The registry containing all usable batch system abstractions. 
 23  * Batch: The fundamental class for batch systems. 
  24  * Localhost: A dummy batch system for domain decomposition working on 
  25      localhost. 
 26  * Torque: The Torque batch system. 
 27  """ 
 28   
 29  from .gendata import SingleAssignDict, AttributeDict 
class Node(object):
    """
    Description of one entry in a batch system's node list.

    @ivar name: host name (or address string) of the node.
    @itype name: str
    @ivar ncore: number of cores counted for the node.
    @itype ncore: int
    @ivar serial: serial number of the node; may be reassigned later.
    @ivar pserial: copy of the serial number given at construction time.
    @ivar attrs: attribute strings attached to the node.
    @itype attrs: list
    """

    def __init__(self, name, ncore=1, serial=None, attrs=None):
        """
        @param name: host name of the node.
        @type name: str
        @keyword ncore: number of cores on the node.
        @type ncore: int
        @keyword serial: serial number; also stored as C{pserial}.
        @keyword attrs: attribute strings; a fresh list is created when
            omitted so that instances never share one.
        @type attrs: list
        """
        self.name = name
        self.ncore = ncore
        self.serial = serial
        self.pserial = serial
        # identity test: "!= None" would invoke the argument's own rich
        # comparison, which may lie (custom __ne__) or be ambiguous (arrays).
        self.attrs = attrs if attrs is not None else list()

    def __str__(self):
        """
        @return: C{{name(ncore)}}, or C{{name(ncore): a,b}} when attributes
            are present.
        @rtype: str
        """
        attrstr = ','.join(self.attrs)
        suffix = ': ' + attrstr if attrstr else ''
        return '{%s(%d)%s}' % (self.name, self.ncore, suffix)

    @property
    def address(self):
        """
        The result of resolving C{name} with C{socket.gethostbyname}.
        """
        from socket import gethostbyname
        return gethostbyname(self.name)
class BatchRegistry(SingleAssignDict, AttributeDict):
    """
    Name-keyed collection of batch system classes.
    """

    def register(self, cmdtype):
        """
        Store a class under its own C{__name__}.

        @param cmdtype: the class to be stored.
        @type cmdtype: type
        @return: the class that was passed in.
        @rtype: type
        @raise KeyError: when a class of the same name is already stored.
        """
        key = cmdtype.__name__
        if key not in self:
            self[key] = cmdtype
            return cmdtype
        raise KeyError('%s was already registered as a batch' % key)


batregy = BatchRegistry()  # registry singleton.
class BatchMeta(type):
    """
    Metaclass which puts every class it creates into C{batregy}.
    """

    def __new__(cls, name, bases, namespace):
        """
        Build the class object as C{type} would, then register it.

        @return: the newly created class.
        @rtype: type
        """
        created = super(BatchMeta, cls).__new__(cls, name, bases, namespace)
        # registration happens as a side effect of class creation.
        batregy.register(created)
        return created
class Batch(object):
    """
    Batch system submitter.

    Instances render themselves (through C{str()}) into the text of a
    submission script; subclasses supply the batch-system specific lines.

    @cvar _subcmd_: the command name for the batch submission.
    @ctype _subcmd_: str
    @cvar DEFAULT_OUTPUT: default value for C{output}.
    @ctype DEFAULT_OUTPUT: str
    @cvar DEFAULT_SHELL: default value for C{shell}.
    @ctype DEFAULT_SHELL: str
    @cvar BASH_HOME_SOURCE: the files to be sourced in user's home.
    @ctype BASH_HOME_SOURCE: list
    @ivar case: The corresponding Case object.
    @itype case: solvcon.case.core.Case
    @ivar arnname: The name of the arrangement to be run.
    @itype arnname: str
    @ivar jobname: The name to send to batch system.
    @itype jobname: str
    @ivar jobdir: The absolute path for job.
    @itype jobdir: str
    @ivar output: Batch output type.
    @itype output: str
    @ivar shell: Shell to be used on the cluster.
    @itype shell: str
    @ivar use_mpi: Indicate to use MPI as transport layer.
    @itype use_mpi: bool
    @ivar resource: Specified resources.
    @itype resource: dict
    """

    __metaclass__ = BatchMeta

    _subcmd_ = 'qsub'

    DEFAULT_OUTPUT = 'oe'
    DEFAULT_SHELL = '/bin/sh'
    BASH_HOME_SOURCE = []

    def __init__(self, case, **kw):
        """
        @param case: the case to be submitted.
        @keyword arnname: The arrangement to be run.
        @type arnname: str
        @keyword jobname: The name of the job; defaults to C{arnname}.
        @type jobname: str
        @keyword output: Batch output type; defaults to DEFAULT_OUTPUT.
        @type output: str
        @keyword shell: Shell for the job; defaults to DEFAULT_SHELL.
        @type shell: str
        @keyword use_mpi: Use MPI as transport layer; defaults to False.
        @type use_mpi: bool

        Every other keyword is merged into a copy of
        C{case.execution.resources} and stored as C{resource}.
        """
        import os
        self.case = case
        self.arnname = kw.pop('arnname', None)
        self.jobname = kw.pop('jobname', self.arnname)
        # an explicit base directory of the case wins; otherwise the job
        # directory is derived from the job name (if there is one).
        if case.io.basedir is not None:
            self.jobdir = case.io.basedir
        elif self.jobname is not None:
            self.jobdir = os.path.abspath(os.path.join(
                self.case.io.rootdir, self.jobname))
        else:
            self.jobdir = None
        self.output = kw.pop('output', self.DEFAULT_OUTPUT)
        self.shell = kw.pop('shell', self.DEFAULT_SHELL)
        self.use_mpi = kw.pop('use_mpi', False)
        self.resource = case.execution.resources.copy()
        self.resource.update(kw)
        super(Batch, self).__init__()

    @property
    def str_header(self):
        """
        First line of the script.
        """
        return '#!/bin/sh'

    @property
    def str_resource(self):
        """
        Resource request line(s); to be provided by subclasses.
        """
        raise NotImplementedError

    @property
    def str_jobname(self):
        """
        Job name line; to be provided by subclasses.
        """
        raise NotImplementedError

    @property
    def str_output(self):
        """
        Output setting line; to be provided by subclasses.
        """
        raise NotImplementedError

    @property
    def str_shell(self):
        """
        Shell setting line; to be provided by subclasses.
        """
        raise NotImplementedError

    @property
    def str_path(self):
        """
        Script lines which source the existing files of BASH_HOME_SOURCE
        from the user's home and prepend C{case.io.rootdir} to PYTHONPATH.

        @raise KeyError: when the HOME environment variable is not set.
        """
        import os
        ret = list()
        ret.append('echo "Customized paths for job:"')
        # source bash rc files per user.
        home = os.environ['HOME']
        for fn in self.BASH_HOME_SOURCE:
            if os.path.exists(os.path.join(home, fn)):
                ret.append('. $HOME/%s' % fn)
        ret.append('export PYTHONPATH=%s:$PYTHONPATH' % self.case.io.rootdir)
        return '\n'.join(ret)

    @property
    def str_prerun(self):
        """
        Script lines which export C{case.solver.envar} (when it is set) and
        echo the starting time.
        """
        msgs = list()
        envar = self.case.solver.envar
        if envar is not None:
            for key in envar:
                msgs.append('export %s=%s' % (key, envar[key]))
        msgs.append('echo "Run @`date`:"')
        return '\n'.join(msgs)

    @property
    def str_postrun(self):
        """
        Script line which echoes the finishing time.
        """
        return 'echo "Finish @`date`."'

    def build_scg_command(self):
        """
        Build the command line to be put in the script.

        The returned string contains a literal C{%d} after C{--runlevel};
        it is meant to be filled in later by %-formatting (see L{tofile}).

        @return: the command line.
        @rtype: str
        """
        import os
        from .conf import env
        scgargs = ' '.join(['run', self.arnname])
        if env.command is not None:
            ops, args = env.command.opargs
            scgops = list()
            # each '\\\n' item is a backslash followed by a newline, i.e.,
            # a shell line continuation that starts a new group of options.
            if ops.npart is not None:
                scgops.append('\\\n')
                scgops.append('--npart=%d' % ops.npart)
                scgops.append('--batch=%s' % ops.batch)
            if ops.envar:
                scgops.append('\\\n')
                envar = env.command.envar
                scgops.append('--envar %s' % ':'.join([
                    '%s=%s' % (key, envar[key]) for key in envar
                ]))
            if ops.compress_nodelist:
                scgops.append('--compress-nodelist')
            if ops.use_profiler:
                scgops.append('\\\n')
                scgops.append('--use-profiler')
                scgops.extend([
                    '--profiler-sort=%s' % ops.profiler_sort,
                    '--profiler-dat=%s' % ops.profiler_dat,
                    '--profiler-log=%s' % ops.profiler_log,
                ])
            if ops.solver_output:
                scgops.append('--solver-output')
            if ops.basedir:
                scgops.append('\\\n')
                scgops.append('--basedir=%s' % os.path.abspath(ops.basedir))
            scgops = ' '.join(scgops)
        else:
            scgops = ''
        # '%%d' leaves a '%d' placeholder for the run level in the result.
        scgops = '--runlevel %%d %s' % scgops
        return ' '.join([env.get_entry_point(), scgargs, scgops])

    def build_mpi_runner(self):
        """
        @return: the command prefix that launches MPI; the base class has
            none and returns an empty string.
        @rtype: str
        """
        return ''

    @property
    def str_run(self):
        """
        Script lines which change to the job directory and run the timed
        command.

        @raise RuntimeError: when MPI is requested but build_mpi_runner()
            returns nothing.
        """
        cmds = ['time']
        if self.use_mpi:
            mpi_runner = self.build_mpi_runner()
            if not mpi_runner:
                raise RuntimeError(
                    '%s gave null mpi_runner' % str(self.__class__))
            cmds.append(mpi_runner)
        cmds.append(self.build_scg_command())
        return '\n'.join([
            'cd %s' % self.jobdir,
            ' '.join(cmds).strip()
        ])

    def __str__(self):
        """
        Collect all string properties.
        """
        return '\n'.join([
            self.str_header,
            self.str_resource,
            self.str_jobname,
            self.str_output,
            self.str_shell,
            self.str_path,
            self.str_prerun,
            self.str_run,
            self.str_postrun,
        ])

    def tofile(self, basename=None):
        """
        Write self into the file for the submitting script.

        One script is written for each of the run levels 0, 1, and 2; the
        level number is appended to the file name and substituted into the
        script text.  The job directory is created when missing.

        @keyword basename: base name of the script files; defaults to the
            job name followed by C{.pbs}.
        @type basename: str
        @return: absolute paths of the written files, ordered by run level.
        @rtype: list
        """
        import os
        from glob import glob
        info = self.case.info
        if basename is None:
            basename = self.jobname + '.pbs'
        if os.path.exists(self.jobdir):
            info('Job directory was there: %s\n' % self.jobdir)
            if self.case.io.empty_jobdir:
                info('Delete all file in job directory.\n')
                for fn in glob(os.path.join(self.jobdir, '*')):
                    os.unlink(fn)
        else:
            os.makedirs(self.jobdir)
        fn = os.path.abspath(os.path.join(self.jobdir, basename))
        # the script text does not depend on the run level; render it once.
        script = str(self)
        fnlist = list()
        for ilevel in range(3):
            path = fn + str(ilevel)
            fnlist.append(path)
            f = open(path, 'w')
            try:
                f.write(script % ilevel)
            finally:
                # close the file even if formatting or writing fails.
                f.close()
        return fnlist

    def __call__(self, runlevel=0, basename=None, postpone=False):
        """
        Make submitting script and invoke the batch system.

        The current working directory of the process is changed to the job
        directory as a side effect.

        @keyword runlevel: index of the script to be submitted.
        @type runlevel: int
        @keyword basename: forwarded to L{tofile}.
        @type basename: str
        @keyword postpone: write the scripts but do not submit.
        @type postpone: bool
        @return: the return code of the submission command, or None when
            postponed.
        """
        import os
        from subprocess import call
        info = self.case.info
        fnlist = self.tofile(basename=basename)
        os.chdir(self.jobdir)
        if postpone:
            return
        info('submit runlevel %d\n' % runlevel)
        return call('%s %s' % (self._subcmd_, fnlist[runlevel]), shell=True)

    def nodelist(self):
        """
        @return: the nodes available for the job; to be provided by
            subclasses.
        @rtype: list
        """
        raise NotImplementedError

    def create_worker(self, *args, **kw):
        """
        True implementations are in create_worker_*() and have identical spec.
        """
        raise NotImplementedError

    def create_worker_ssh(self, node, authkey,
            envar=None, paths=None, profiler_data=None):
        """
        Use secure shell to create worker object.

        @param node: node information.
        @type node: Node
        @param authkey: the authkey for the worker.
        @type authkey: str
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword paths: path for remote execution.
        @type paths: dict
        @keyword profiler_data: profiler setting for remote worker.
        @type profiler_data: tuple
        @return: the port that the remote worker listen on.
        @rtype: int
        @raise IOError: when the output of the remote port detection cannot
            be converted to an integer.
        """
        import os
        from subprocess import PIPE
        from .rpc import SecureShell
        remote = SecureShell(node.address, paths=paths)
        # determine remotely available port.
        # NOTE(review): presumably the call returns what the remote code
        # wrote to stdout -- confirm against SecureShell.
        val = remote([
            'import sys',
            'from solvcon.connection import pick_unused_port',
            'sys.stdout.write(str(pick_unused_port()))',
        ], stdout=PIPE)
        # the conversion must happen inside the try block, otherwise a
        # malformed reply escapes as a bare ValueError.
        try:
            port = int(val)
        except ValueError:
            raise IOError('remote port detection fails')
        # create remote worker objects and return.
        pdata = str(profiler_data).replace("'", '"')
        remote([
            'import os',
            'os.chdir("%s")' % os.path.abspath(os.getcwd()),
            'from solvcon.rpc import Worker',
            'wkr = Worker(None, profiler_data=%s)' % pdata,
            'wkr.run(("%s", %d), "%s")' % (node.address, port, authkey),
        ], envar=envar)
        return port

    def create_worker_mpi(self, node, authkey,
            envar=None, paths=None, profiler_data=None):
        """
        Use MPI to link remote worker object.

        Only C{node.serial} is used; the other arguments are accepted to
        keep the signature identical to the other create_worker_*().

        @param node: node information.
        @type node: Node
        @param authkey: the authkey for the worker.
        @type authkey: str
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword paths: path for remote execution.
        @type paths: dict
        @keyword profiler_data: profiler setting for remote worker.
        @type profiler_data: tuple
        @return: the port that the remote worker listen on.
        @rtype: int
        """
        from .conf import env
        return env.mpi.recv(node.serial, 1)
class Localhost(Batch):
    """
    Dummy batch abstraction for localhost.
    """

    def nodelist(self):
        """
        @return: one single-core node at 127.0.0.1 for each part of the
            case, numbered from zero.
        @rtype: list
        """
        nodes = list()
        for serial in range(self.case.execution.npart):
            nodes.append(Node('127.0.0.1', ncore=1, serial=serial))
        return nodes

    def create_worker(self, *args, **kw):
        """
        Delegate to create_worker_ssh() with unchanged arguments.
        """
        return self.create_worker_ssh(*args, **kw)
class Torque(Batch):
    """
    Torque/OpenPBS.

    @ivar _nodelist: cached result of nodelist(); None before the first
        call.
    @itype _nodelist: list
    """

    def __init__(self, case, **kw):
        """
        Take the same arguments as L{Batch.__init__}.
        """
        super(Torque, self).__init__(case, **kw)
        self._nodelist = None

    @property
    def str_resource(self):
        """
        The C{#PBS -l} line built from the resource dictionary, or an empty
        string when there is nothing to request.

        When the case has a number of parts, it overrides the C{nodes}
        resource (plus one more node when MPI is used).  Resources whose
        value is None are written as the bare key.  The C{ppn} entry is
        attached to the C{nodes} entry with a colon.
        """
        res = self.resource.copy()
        npart = self.case.execution.npart
        if npart is not None:
            res['nodes'] = npart
            if self.use_mpi:
                res['nodes'] += 1
        # build resource tokens, remembering which key produced each one.
        names = list()
        tokens = list()
        for key in sorted(res.keys()):
            val = res[key]
            if val is None:
                token = key
            else:
                token = '%s=%s' % (key, val)
            if token:
                names.append(key)
                tokens.append(token)
        # nodes and ppn must be together.  Match on the exact key rather
        # than searching the token text, so that an unrelated entry such as
        # "host=gpunodes1" is never mistaken for the nodes request.
        if 'nodes' in names and 'ppn' in names:
            inodes = names.index('nodes')
            ippn = names.index('ppn')
            tokens[inodes] = ':'.join([tokens[inodes], tokens[ippn]])
            del tokens[ippn]
        # return resource string.
        if tokens:
            return '#PBS -l %s' % ','.join(tokens)
        return ''

    @property
    def str_jobname(self):
        """
        The C{#PBS -N} line.
        """
        return '#PBS -N %s' % self.jobname

    @property
    def str_output(self):
        """
        The C{#PBS -j} line.
        """
        return '#PBS -j %s' % self.output

    @property
    def str_shell(self):
        """
        The C{#PBS -S} line.
        """
        return '#PBS -S %s' % self.shell

    def nodelist(self):
        """
        Read the file named by the PBS_NODEFILE environment variable and
        turn each line into a single-core Node.

        When the command-line option C{compress_nodelist} is on, adjacent
        entries resolving to the same address are merged into one node with
        a larger core count and the surviving nodes get new C{serial}
        values (C{pserial} is left alone).  When MPI is active the first
        node is dropped.  The list is finally truncated to the number of
        parts of the case and cached.

        @return: the nodes to be used.
        @rtype: list
        @raise KeyError: when PBS_NODEFILE is not set.
        """
        import os
        from .conf import env
        # an empty cached list is treated the same as "not computed yet".
        if not self._nodelist:
            # read node file.
            f = open(os.environ['PBS_NODEFILE'])
            try:
                entries = [line.strip() for line in f]
            finally:
                f.close()
            nodelist = [Node(entry, ncore=1, serial=it)
                        for it, entry in enumerate(entries)]
            # compress nodelist; nothing to do for an empty node file.
            if env.command is not None:
                ops, args = env.command.opargs
                if ops.compress_nodelist and nodelist:
                    cnodelist = [nodelist[0]]
                    for nodeitem in nodelist[1:]:
                        cnodeitem = cnodelist[-1]
                        if nodeitem.address == cnodeitem.address:
                            cnodeitem.ncore += 1
                        else:
                            nodeitem.serial = len(cnodelist)
                            cnodelist.append(nodeitem)
                    nodelist = cnodelist
            # exclude head when using MPI.
            if env.mpi:
                nodelist = nodelist[1:]
            # cut nodelist.
            self._nodelist = nodelist[:self.case.execution.npart]
        return self._nodelist

    def create_worker_torque(self, node, authkey,
            envar=None, paths=None, profiler_data=None):
        """
        Use Torque TM API to create worker object.

        A listener is opened on the first node of nodelist() in a helper
        thread; the spawned remote process is given that address and the
        value it sends back is returned.

        @param node: node information.
        @type node: Node
        @param authkey: the authkey for the worker.
        @type authkey: str
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword paths: path for remote execution.
        @type paths: dict
        @keyword profiler_data: profiler setting for remote worker.
        @type profiler_data: tuple
        @return: the port that the remote worker listen on.
        @rtype: int
        """
        import sys, os
        from threading import Thread
        from Queue import Queue
        from .batch_torque import TaskManager
        from .connection import pick_unused_port
        # setup listener for remote port.
        myhost = self.nodelist()[0].address
        myport = pick_unused_port()
        portq = Queue()
        def get_port():
            # accept exactly one connection and pass on what it sends.
            from .connection import Listener
            lsnr = Listener((myhost, myport), authkey=authkey)
            conn = lsnr.accept()
            portq.put(conn.recv())
            conn.close()
        thd = Thread(target=get_port)
        thd.start()
        # start remote worker.
        tm = TaskManager(paths=paths)
        tm.spawn(sys.executable, '-c',
            "from solvcon.batch_torque import run_worker; "
            "run_worker(%s, %s, %s, %s, %s)" % (
                "('%s', %d)" % (myhost, myport),
                "'%s'" % os.getcwd(),
                str(profiler_data),
                "'%s'" % node.address,
                "'%s'" % authkey,
            ), where=node.pserial, envar=envar)
        # stop listening thread.
        thd.join()
        return portq.get()

    def create_worker(self, *args, **kw):
        """
        Pick the worker creation method: MPI when C{env.mpi} is set, the
        Torque TM API when C{TaskManager._clib_torque} is available, and
        secure shell otherwise.
        """
        from .conf import env
        from .batch_torque import TaskManager
        if env.mpi:
            return self.create_worker_mpi(*args, **kw)
        elif TaskManager._clib_torque:
            return self.create_worker_torque(*args, **kw)
        else:
            return self.create_worker_ssh(*args, **kw)