Package solvcon :: Module batch
[hide private]
[frames] | [no frames]

Source Code for Module solvcon.batch

  1  # -*- coding: UTF-8 -*- 
  2  # 
  3  # Copyright (C) 2008-2010 Yung-Yu Chen <yyc@solvcon.net>. 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 2 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License along 
 16  # with this program; if not, write to the Free Software Foundation, Inc., 
 17  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 18   
 19  """ 
 20  Basic support for cluster batch systems. 
 21   
 22  * batregy: The registry containing all usable batch system abstractions. 
 23  * Batch: The fundamental class for batch systems. 
 24  * Localhost: A dummy batch system for domain decomposition working on 
 25      localhost. 
 26  * Torque: The Torque batch system. 
 27  """ 
 28   
 29  from .gendata import TypeNameRegistry 
class Node(object):
    """
    A host entry on which workers can be created.

    @ivar name: host name (or address string) of the node.
    @itype name: str
    @ivar ncore: number of cores counted for this node.
    @itype ncore: int
    @ivar serial: serial number of the node in the (possibly compressed)
        node list.
    @ivar pserial: serial number as originally assigned at construction.
    @ivar attrs: attribute strings of the node.
    @itype attrs: list
    """

    def __init__(self, name, ncore=1, serial=None, attrs=None):
        self.name = name
        self.ncore = ncore
        self.serial = serial
        # keep the serial given at construction: Torque.nodelist() may
        # renumber self.serial when compressing the list, while
        # Torque.create_worker_torque() spawns on pserial.
        self.pserial = serial
        # a fresh list per instance when attrs is not given.
        self.attrs = attrs if attrs is not None else list()

    def __str__(self):
        attrstr = ','.join(self.attrs)
        return '{%s(%d)%s}' % (self.name, self.ncore,
            ': '+attrstr if attrstr else '')

    @property
    def address(self):
        """
        The IPv4 address of the node, resolved from its name on every access.
        """
        from socket import gethostbyname
        return gethostbyname(self.name)
# Registry singleton.  Every class created through BatchMeta is registered
# into it (presumably keyed by type name, as TypeNameRegistry suggests --
# confirm in solvcon.gendata).
batregy = TypeNameRegistry()


class BatchMeta(type):
    """
    Metaclass that registers each class it creates into batregy.
    """

    def __new__(mcs, name, bases, namespace):
        klass = super(BatchMeta, mcs).__new__(mcs, name, bases, namespace)
        # side effect: record the new class in the module registry.
        batregy.register(klass)
        return klass
class Batch(object):
    """
    Batch system submitter.

    Subclasses provide the batch-specific header lines (str_resource,
    str_jobname, str_output, str_shell), nodelist() and create_worker();
    this class assembles the submission script and submits it.

    @cvar _subcmd_: the command name for the batch submission.
    @ctype _subcmd_: str
    @cvar DEFAULT_OUTPUT: output type used when no 'output' keyword is given.
    @ctype DEFAULT_OUTPUT: str
    @cvar DEFAULT_SHELL: shell used when no 'shell' keyword is given.
    @ctype DEFAULT_SHELL: str
    @cvar BASH_HOME_SOURCE: the files to be sourced in user's home.
    @ctype BASH_HOME_SOURCE: list
    @ivar case: The Case corresponding object.
    @itype case: solvcon.case.core.Case
    @ivar arnname: The name of the arrangement to be run.
    @itype arnname: str
    @ivar jobname: The name to send to batch system.
    @itype jobname: str
    @ivar jobdir: The absolute path for job.
    @itype jobdir: str
    @ivar output: Batch output type.
    @itype output: str
    @ivar shell: Shell to be used on the cluster.
    @itype shell: str
    @ivar use_mpi: Indicate to use MPI as transport layer.
    @itype use_mpi: bool
    @ivar resource: Specified resources.
    @itype resource: dict
    """

    __metaclass__ = BatchMeta

    _subcmd_ = 'qsub'

    DEFAULT_OUTPUT = 'oe'
    DEFAULT_SHELL = '/bin/sh'
    BASH_HOME_SOURCE = []

    def __init__(self, case, **kw):
        """
        @param case: the case to be submitted.
        @type case: solvcon.case.core.Case
        @keyword arnname: The arrangement to be run.
        @type arnname: str
        @keyword jobname: name sent to the batch system; defaults to arnname.
        @type jobname: str
        @keyword output: batch output type; defaults to DEFAULT_OUTPUT.
        @type output: str
        @keyword shell: shell on the cluster; defaults to DEFAULT_SHELL.
        @type shell: str
        @keyword use_mpi: use MPI as transport layer; defaults to False.
        @type use_mpi: bool

        All remaining keywords are treated as resources: they update (and
        override) a copy of case.execution.resources.
        """
        import os
        self.case = case
        self.arnname = kw.pop('arnname', None)
        self.jobname = kw.pop('jobname', self.arnname)
        # job directory: case.io.basedir when set, otherwise derived from the
        # case root directory and the job name (None if neither is known).
        if case.io.basedir is not None:
            self.jobdir = case.io.basedir
        elif self.jobname is not None:
            self.jobdir = os.path.abspath(os.path.join(
                self.case.io.rootdir, self.jobname))
        else:
            self.jobdir = None
        self.output = kw.pop('output', self.DEFAULT_OUTPUT)
        self.shell = kw.pop('shell', self.DEFAULT_SHELL)
        self.use_mpi = kw.pop('use_mpi', False)
        self.resource = case.execution.resources.copy()
        self.resource.update(kw)
        super(Batch, self).__init__()

    @property
    def str_header(self):
        return '#!/bin/sh'

    @property
    def str_resource(self):
        raise NotImplementedError

    @property
    def str_jobname(self):
        raise NotImplementedError

    @property
    def str_output(self):
        raise NotImplementedError

    @property
    def str_shell(self):
        raise NotImplementedError

    @property
    def str_path(self):
        """
        Shell lines that source the existing BASH_HOME_SOURCE files and
        prepend the case root directory to PYTHONPATH.
        """
        import os
        ret = list()
        ret.append('echo "Customized paths for job:"')
        # source bash rc files per user; only files found under $HOME of the
        # submitting host are emitted.
        home = os.environ['HOME']
        for fn in self.BASH_HOME_SOURCE:
            if os.path.exists(os.path.join(home, fn)):
                ret.append('. $HOME/%s' % fn)
        ret.append('export PYTHONPATH=%s:$PYTHONPATH' % self.case.io.rootdir)
        return '\n'.join(ret)

    @property
    def str_prerun(self):
        """
        Shell lines exporting the solver environment variables, followed by a
        timestamp echo.
        """
        msgs = list()
        envar = self.case.solver.envar
        if envar is not None:
            for key in envar:
                msgs.append('export %s=%s' % (key, envar[key]))
        msgs.append('echo "Run @`date`:"')
        return '\n'.join(msgs)

    @property
    def str_postrun(self):
        return 'echo "Finish @`date`."'

    def build_scg_command(self):
        """
        Build the command line that runs the arrangement through the solvcon
        entry point, forwarding options of the current command.

        @return: the command line.  It contains a literal '%d' placeholder
            for the runlevel, which tofile() fills in.
        @rtype: str
        """
        import os
        from .conf import env
        scgargs = ' '.join(['run', self.arnname])
        if env.command is not None:
            ops, args = env.command.opargs
            scgops = list()
            # '\\\n' items become shell line continuations after joining.
            if ops.npart is not None:
                scgops.append('\\\n')
                scgops.append('--npart=%d' % ops.npart)
                scgops.append('--batch=%s' % ops.batch)
            if ops.envar:
                scgops.append('\\\n')
                envar = env.command.envar
                scgops.append('--envar %s' % ':'.join([
                    '%s=%s' % (key, envar[key]) for key in envar
                ]))
            if ops.compress_nodelist:
                scgops.append('--compress-nodelist')
            if ops.use_profiler:
                scgops.append('\\\n')
                scgops.append('--use-profiler')
                scgops.extend([
                    '--profiler-sort=%s' % ops.profiler_sort,
                    '--profiler-dat=%s' % ops.profiler_dat,
                    '--profiler-log=%s' % ops.profiler_log,
                ])
            if ops.solver_output:
                scgops.append('--solver-output')
            if ops.basedir:
                scgops.append('\\\n')
                scgops.append('--basedir=%s' % os.path.abspath(ops.basedir))
            scgops = ' '.join(scgops)
        else:
            scgops = ''
        # '%%d' survives this formatting as '%d' for tofile() to fill.
        scgops = '--runlevel %%d %s' % scgops
        return ' '.join([env.get_entry_point(), scgargs, scgops])

    def build_mpi_runner(self):
        """
        Return the MPI launcher prefix; empty here, so str_run() raises when
        use_mpi is set unless a subclass overrides this.
        """
        return ''

    @property
    def str_run(self):
        """
        Shell lines that change into jobdir and run the timed command.

        @raise RuntimeError: use_mpi is set but build_mpi_runner() is empty.
        """
        cmds = ['time']
        if self.use_mpi:
            mpi_runner = self.build_mpi_runner()
            if not mpi_runner:
                raise RuntimeError(
                    '%s gave null mpi_runner' % str(self.__class__))
            cmds.append(mpi_runner)
        cmds.append(self.build_scg_command())
        return '\n'.join([
            'cd %s' % self.jobdir,
            ' '.join(cmds).strip()
        ])

    def __str__(self):
        """
        Collect all string properties.
        """
        return '\n'.join([
            self.str_header,
            self.str_resource,
            self.str_jobname,
            self.str_output,
            self.str_shell,
            self.str_path,
            self.str_prerun,
            self.str_run,
            self.str_postrun,
        ])

    def tofile(self, basename=None):
        """
        Write self into the file for the submitting script.

        One script is written per runlevel 0, 1, 2, named basename + the
        runlevel, with the '%d' placeholder replaced by that runlevel.
        The job directory is created if missing, or emptied when
        case.io.empty_jobdir is set.

        @keyword basename: script base name; defaults to jobname + '.pbs'.
        @type basename: str
        @return: absolute paths of the written scripts, indexed by runlevel.
        @rtype: list
        """
        import os
        from glob import glob
        info = self.case.info
        if basename is None:
            basename = self.jobname + '.pbs'
        if os.path.exists(self.jobdir):
            info('Job directory was there: %s\n' % self.jobdir)
            if self.case.io.empty_jobdir:
                info('Delete all file in job directory.\n')
                for oldfn in glob(os.path.join(self.jobdir, '*')):
                    os.unlink(oldfn)
        else:
            os.makedirs(self.jobdir)
        fn = os.path.abspath(os.path.join(self.jobdir, basename))
        # NOTE(review): the whole script is %-formatted with the runlevel, so
        # any other '%' in it (e.g. in environment values) breaks this step.
        script = str(self)
        fnlist = list()
        for ilevel in range(3):
            fnlist.append(fn + str(ilevel))
            with open(fnlist[-1], 'w') as fobj:
                fobj.write(script % ilevel)
        return fnlist

    def __call__(self, runlevel=0, basename=None, postpone=False):
        """
        Make submitting script and invoke the batch system.

        The process working directory is changed to jobdir even when the
        submission is postponed.

        @keyword runlevel: which written script to submit.
        @type runlevel: int
        @keyword basename: forwarded to tofile().
        @type basename: str
        @keyword postpone: only write the scripts, do not submit.
        @type postpone: bool
        @return: None when postponed, else the exit code of the submission.
        """
        import os
        from subprocess import call
        info = self.case.info
        fnlist = self.tofile(basename=basename)
        os.chdir(self.jobdir)
        if postpone:
            return
        else:
            info('submit runlevel %d\n' % runlevel)
            return call('%s %s'%(self._subcmd_, fnlist[runlevel]), shell=True)

    def nodelist(self):
        raise NotImplementedError

    def create_worker(self, *args, **kw):
        """
        True implementations are in create_worker_*() and have identical spec.
        """
        raise NotImplementedError

    def create_worker_ssh(self, node, authkey,
            envar=None, paths=None, profiler_data=None):
        """
        Use secure shell to create worker object.

        @param node: node information.
        @type node: Node
        @param authkey: the authkey for the worker.
        @type authkey: str
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword paths: path for remote execution.
        @type paths: dict
        @keyword profiler_data: profiler setting for remote worker.
        @type profiler_data: tuple
        @return: the port that the remote worker listen on.
        @rtype: int
        @raise IOError: the remote port cannot be determined.
        """
        import os
        from subprocess import PIPE
        from .rpc import SecureShell
        remote = SecureShell(node.address, paths=paths)
        # determine remotely available port; the remote snippet writes it to
        # stdout (presumably returned by remote() because stdout=PIPE).
        val = remote([
            'import sys',
            'from solvcon.connection import pick_unused_port',
            'sys.stdout.write(str(pick_unused_port()))',
        ], stdout=PIPE)
        # convert inside the try so malformed output surfaces as IOError.
        try:
            port = int(val)
        except (TypeError, ValueError):
            raise IOError('remote port detection fails')
        # create remote worker objects and return.
        pdata = str(profiler_data).replace("'", '"')
        remote([
            'import os',
            # repr() quotes the path safely for the remote interpreter.
            'os.chdir(%r)' % os.path.abspath(os.getcwd()),
            'from solvcon.rpc import Worker',
            'wkr = Worker(None, profiler_data=%s)' % pdata,
            'wkr.run(("%s", %d), "%s")' % (node.address, port, authkey),
        ], envar=envar)
        return port

    def create_worker_mpi(self, node, authkey,
            envar=None, paths=None, profiler_data=None):
        """
        Use MPI to link remote worker object.

        @param node: node information.
        @type node: Node
        @param authkey: the authkey for the worker.
        @type authkey: str
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword paths: path for remote execution.
        @type paths: dict
        @keyword profiler_data: profiler setting for remote worker.
        @type profiler_data: tuple
        @return: the port that the remote worker listen on.
        @rtype: int
        """
        from .conf import env
        # receive from the rank given by node.serial (tag 1).
        return env.mpi.recv(node.serial, 1)
class Localhost(Batch):
    """
    Dummy batch abstraction for localhost.

    Each partition gets its own single-core Node at 127.0.0.1, and workers
    are always started through create_worker_ssh().
    """

    def nodelist(self):
        npart = self.case.execution.npart
        nodes = []
        for serial in range(npart):
            nodes.append(Node('127.0.0.1', ncore=1, serial=serial))
        return nodes

    def create_worker(self, *args, **kw):
        # no batch-specific spawner for localhost: delegate to SSH.
        ssh_creator = self.create_worker_ssh
        return ssh_creator(*args, **kw)
class Torque(Batch):
    """
    Torque/OpenPBS.
    """

    def __init__(self, case, **kw):
        super(Torque, self).__init__(case, **kw)
        # cache filled lazily by nodelist().
        self._nodelist = None

    @property
    def str_resource(self):
        """
        The '#PBS -l' line built from self.resource, or '' when empty.

        When case.execution.npart is set it becomes the 'nodes' resource
        (plus one extra node when use_mpi is set).  The 'nodes' and 'ppn'
        tokens are joined with ':' as Torque expects.
        """
        res = self.resource.copy()
        npart = self.case.execution.npart
        if npart is not None:
            res['nodes'] = npart + 1 if self.use_mpi else npart
        # build resource tokens; a None value means a bare flag.
        tokens = list()
        for key in sorted(res.keys()):
            val = res[key]
            token = key if val is None else '%s=%s' % (key, val)
            if token:
                tokens.append(token)
        # nodes and ppn must be together.
        def _first_containing(word):
            for idx, tok in enumerate(tokens):
                if word in tok:
                    return idx
            return len(tokens)
        idx1 = _first_containing('nodes')
        idx2 = _first_containing('ppn')
        if idx1 != len(tokens) and idx2 != len(tokens) and idx1 != idx2:
            tokens[idx1] = ':'.join([tokens[idx1], tokens[idx2]])
            del tokens[idx2]
        # return resource string.
        if tokens:
            return '#PBS -l %s' % ','.join(tokens)
        return ''

    @property
    def str_jobname(self):
        return '#PBS -N %s' % self.jobname

    @property
    def str_output(self):
        return '#PBS -j %s' % self.output

    @property
    def str_shell(self):
        return '#PBS -S %s' % self.shell

    def nodelist(self):
        """
        Nodes read from the file named by $PBS_NODEFILE, one per non-blank
        line.

        The list is optionally compressed (consecutive entries with the same
        address merged, counting cores), loses its first entry when env.mpi
        is set, is cut to case.execution.npart entries, and is cached.

        @return: the node list.
        @rtype: list of Node
        """
        import os
        from .conf import env
        if not self._nodelist:
            # read node file; blank lines do not describe nodes.
            with open(os.environ['PBS_NODEFILE']) as fobj:
                entries = [line.strip() for line in fobj]
            entries = [entry for entry in entries if entry]
            nodelist = [Node(entry, ncore=1, serial=it)
                for it, entry in enumerate(entries)]
            # compress nodelist (guarded against an empty node file).
            if env.command is not None:
                ops, args = env.command.opargs
                if ops.compress_nodelist and nodelist:
                    cnodelist = [nodelist[0]]
                    for nodeitem in nodelist[1:]:
                        cnodeitem = cnodelist[-1]
                        if nodeitem.address == cnodeitem.address:
                            cnodeitem.ncore += 1
                        else:
                            # renumber serial; pserial keeps the file index.
                            nodeitem.serial = len(cnodelist)
                            cnodelist.append(nodeitem)
                    nodelist = cnodelist
            # exclude head when using MPI.
            if env.mpi:
                nodelist = nodelist[1:]
            # cut nodelist.
            self._nodelist = nodelist[:self.case.execution.npart]
        return self._nodelist

    def create_worker_torque(self, node, authkey,
            envar=None, paths=None, profiler_data=None):
        """
        Use Torque TM API to create worker object.

        @param node: node information.
        @type node: Node
        @param authkey: the authkey for the worker.
        @type authkey: str
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword paths: path for remote execution.
        @type paths: dict
        @keyword profiler_data: profiler setting for remote worker.
        @type profiler_data: tuple
        @return: the port that the remote worker listen on.
        @rtype: int
        """
        import sys, os
        from threading import Thread
        try:
            from Queue import Queue
        except ImportError:
            from queue import Queue
        from .batch_torque import TaskManager
        from .connection import pick_unused_port
        # setup listener for remote port.
        myhost = self.nodelist()[0].address
        myport = pick_unused_port()
        portq = Queue()
        def get_port():
            from .connection import Listener
            lsnr = Listener((myhost, myport), authkey=authkey)
            conn = lsnr.accept()
            portq.put(conn.recv())
            conn.close()
            # NOTE(review): lsnr itself is never closed here -- confirm
            # whether solvcon.connection.Listener needs an explicit close().
        thd = Thread(target=get_port)
        thd.start()
        # start remote worker; repr() quotes the values safely inside the
        # generated Python source.
        tm = TaskManager(paths=paths)
        tm.spawn(sys.executable, '-c',
            "from solvcon.batch_torque import run_worker; "
            "run_worker(%s, %s, %s, %s, %s)" % (
                repr((myhost, myport)),
                repr(os.getcwd()),
                str(profiler_data),
                repr(node.address),
                repr(authkey),
            ), where=node.pserial, envar=envar)
        # stop listening thread.
        # NOTE(review): join() has no timeout; if the remote worker never
        # connects back this blocks indefinitely.
        thd.join()
        return portq.get()

    def create_worker(self, *args, **kw):
        """
        Dispatch to create_worker_mpi() when env.mpi is set, else to
        create_worker_torque() when the Torque C library is available, else
        to create_worker_ssh().
        """
        from .conf import env
        from .batch_torque import TaskManager
        if env.mpi:
            return self.create_worker_mpi(*args, **kw)
        elif TaskManager._clib_torque:
            return self.create_worker_torque(*args, **kw)
        else:
            return self.create_worker_ssh(*args, **kw)