1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """
20 Basic support for cluster batch systems.
21
22 * batregy: The registry containing all usable batch system abstractions.
23 * Batch: The fundamental class for batch systems.
24 * Localhost: A dummy batch system for domain decomposition running on
25 localhost.
26 * Torque: The Torque batch system.
27 """
28
29 from .gendata import SingleAssignDict, AttributeDict
30
31 -class Node(object):
def __init__(self, name, ncore=1, serial=None, attrs=None):
    """
    Describe one compute node usable by a batch job.

    @param name: host name or IP string of the node.
    @type name: str
    @keyword ncore: number of cores (slots) grouped on this node.
    @type ncore: int
    @keyword serial: position of the node in the node list; None if unset.
    @type serial: int
    @keyword attrs: extra attribute strings.  When omitted a fresh empty
        list is created, so instances never share a default list.
    @type attrs: list
    """
    self.name = name
    self.ncore = ncore
    self.serial = serial
    # pserial keeps the serial given at construction; node-list
    # compression may later renumber serial while pserial is untouched.
    self.pserial = serial
    # Identity test: "!= None" would dispatch to the argument's __ne__,
    # which may raise or return a non-bool for some types.
    self.attrs = attrs if attrs is not None else []
38
40 attrstr = ','.join(self.attrs)
41 return '{%s(%d)%s}' % (self.name, self.ncore,
42 ': '+attrstr if attrstr else '')
43
44 @property
46 from socket import gethostbyname
47 return gethostbyname(self.name)
48
51 name = cmdtype.__name__
52 if name in self:
53 raise KeyError('%s was already registered as a batch' % name)
54 self[name] = cmdtype
55 return cmdtype
56 batregy = BatchRegistry()
64
66 """
67 Batch system submitter.
68
69 @cvar _subcmd_: the command name for the batch submission.
70 @ctype _subcmd_: str
71 @cvar BASH_HOME_SOURCE: the files to be sourced in user's home.
72 @ctype BASH_HOME_SOURCE: list
73 @ivar case: The corresponding Case object.
74 @itype case: solvcon.case.core.Case
75 @ivar arnname: The name of the arrangement to be run.
76 @itype arnname: str
77 @ivar jobname: The name to send to batch system.
78 @itype jobname: str
79 @ivar jobdir: The absolute path for job.
80 @itype jobdir: str
81 @ivar output: Batch output type.
82 @itype output: str
83 @ivar shell: Shell to be used on the cluster.
84 @itype shell: str
85 @ivar use_mpi: Indicate to use MPI as transport layer.
86 @itype use_mpi: bool
87 @ivar resource: Specified resources.
88 @itype resource: dict
89 """
90
91 __metaclass__ = BatchMeta
92
93 _subcmd_ = 'qsub'
94
95 DEFAULT_OUTPUT = 'oe'
96 DEFAULT_SHELL = '/bin/sh'
97 BASH_HOME_SOURCE = []
98
100 """
101 @keyword rootdir: Root directory for the project/code.
102 @type rootdir: str
103 @keyword arnname: The arrangement to be run.
104 @type arnname: str
105 """
106 import os
107 self.case = case
108 self.arnname = kw.pop('arnname', None)
109 self.jobname = kw.pop('jobname', self.arnname)
110 if case.io.basedir == None:
111 if self.jobname != None:
112 self.jobdir = os.path.abspath(os.path.join(
113 self.case.io.rootdir, self.jobname))
114 else:
115 self.jobdir = None
116 else:
117 self.jobdir = case.io.basedir
118 self.output = kw.pop('output', self.DEFAULT_OUTPUT)
119 self.shell = kw.pop('shell', self.DEFAULT_SHELL)
120 self.use_mpi = kw.pop('use_mpi', False)
121 self.resource = case.execution.resources.copy()
122 self.resource.update(kw)
123 super(Batch, self).__init__()
124
125 @property
128
129 @property
131 raise NotImplementedError
132
133 @property
135 raise NotImplementedError
136
137 @property
139 raise NotImplementedError
140
141 @property
143 raise NotImplementedError
144
145 @property
147 import os
148 ret = list()
149 ret.append('echo "Customized paths for job:"')
150
151 home = os.environ['HOME']
152 for fn in self.BASH_HOME_SOURCE:
153 if os.path.exists(os.path.join(home, fn)):
154 ret.append('. $HOME/%s'%fn)
155 ret.append('export PYTHONPATH=%s:$PYTHONPATH' % self.case.io.rootdir)
156 return '\n'.join(ret)
157
158 @property
167
@property
def str_postrun(self):
    """
    Shell line emitted after the run commands.

    The backquoted C{date} is left for the shell to expand when the script
    executes, so the echoed timestamp is the finishing time.
    """
    finish_line = 'echo "Finish @`date`."'
    return finish_line
171
173 import os
174 from .conf import env
175 scgargs = ' '.join(['run', self.arnname])
176 if env.command != None:
177 ops, args = env.command.opargs
178 scgops = list()
179 if ops.npart != None:
180 scgops.append('\\\n')
181 scgops.append('--npart=%d' % ops.npart)
182 scgops.append('--batch=%s' % ops.batch)
183 if ops.envar:
184 scgops.append('\\\n')
185 envar = env.command.envar
186 scgops.append('--envar %s' % ':'.join([
187 '%s=%s' % (key, envar[key]) for key in envar
188 ]))
189 if ops.compress_nodelist:
190 scgops.append('--compress-nodelist')
191 if ops.use_profiler:
192 scgops.append('\\\n')
193 scgops.append('--use-profiler')
194 scgops.extend([
195 '--profiler-sort=%s' % ops.profiler_sort,
196 '--profiler-dat=%s' % ops.profiler_dat,
197 '--profiler-log=%s' % ops.profiler_log,
198 ])
199 if ops.solver_output:
200 scgops.append('--solver-output')
201 if ops.basedir:
202 scgops.append('\\\n')
203 scgops.append('--basedir=%s' % os.path.abspath(ops.basedir))
204 scgops = ' '.join(scgops)
205 else:
206 scgops = ''
207 scgops = '--runlevel %%d %s' % scgops
208 return ' '.join([env.get_entry_point(), scgargs, scgops])
209
212
213 @property
215 cmds = ['time']
216 if self.use_mpi:
217 mpi_runner = self.build_mpi_runner()
218 if not mpi_runner:
219 raise RuntimeError(
220 '%s gave null mpi_runner' % str(self.__class__))
221 cmds.append(mpi_runner)
222 cmds.append(self.build_scg_command())
223 return '\n'.join([
224 'cd %s' % self.jobdir,
225 ' '.join(cmds).strip()
226 ])
227
243
def tofile(self, basename=None):
    """
    Write self into the file for the submitting script.

    One script is written for each runlevel 0, 1 and 2.  Each script's
    content is C{str(self) % ilevel}, i.e. the string form of self is used
    as a %-format template filled with the runlevel.

    @keyword basename: base file name of the scripts; defaults to
        C{self.jobname + '.pbs'}.  The runlevel digit is appended to it.
    @type basename: str
    @return: absolute paths of the written scripts, indexed by runlevel.
    @rtype: list
    """
    import os
    from glob import glob
    info = self.case.info
    if basename is None:
        basename = self.jobname + '.pbs'
    if os.path.exists(self.jobdir):
        info('Job directory was there: %s\n' % self.jobdir)
        if self.case.io.empty_jobdir:
            info('Delete all file in job directory.\n')
            # glob('*') does not match dot-files; os.unlink raises on
            # subdirectories.
            for stale in glob(os.path.join(self.jobdir, '*')):
                os.unlink(stale)
    else:
        os.makedirs(self.jobdir)
    prefix = os.path.abspath(os.path.join(self.jobdir, basename))
    fnlist = list()
    for ilevel in range(3):
        path = prefix + str(ilevel)
        # Context manager closes the handle even if formatting or the
        # write raises.
        with open(path, 'w') as fobj:
            fobj.write(str(self) % ilevel)
        fnlist.append(path)
    return fnlist
268
def __call__(self, runlevel=0, basename=None, postpone=False):
    """
    Make submitting script and invoke the batch system.

    All runlevel scripts are (re)written through C{self.tofile()}, and
    the process working directory is changed to C{self.jobdir}.

    @keyword runlevel: index into the written scripts selecting which one
        to submit.
    @type runlevel: int
    @keyword basename: forwarded to C{tofile()}.
    @type basename: str
    @keyword postpone: when true, only write the scripts; do not submit.
    @type postpone: bool
    @return: None when postponed, otherwise the exit status returned by
        C{subprocess.call} for the submission command.
    @rtype: int
    """
    import os
    from subprocess import call
    try:
        from shlex import quote
    except ImportError:  # Python 2 keeps quote() in the pipes module.
        from pipes import quote
    info = self.case.info
    fnlist = self.tofile(basename=basename)
    os.chdir(self.jobdir)
    if postpone:
        return None
    info('submit runlevel %d\n' % runlevel)
    # _subcmd_ may carry its own options, so it stays shell-interpreted;
    # only the script path is quoted so spaces or metacharacters in the
    # job directory cannot split or alter the command line.
    return call('%s %s' % (self._subcmd_, quote(fnlist[runlevel])),
                shell=True)
283
285 raise NotImplementedError
286
288 """
289 True implementations are in create_worker_*() and have identical spec.
290 """
291 raise NotImplementedError
292
def create_worker_ssh(self, node, authkey,
        envar=None, paths=None, profiler_data=None):
    """
    Use secure shell to create worker object.

    Two remote executions are made on C{node.address}: the first prints
    an unused port picked on the remote host, the second starts a
    C{solvcon.rpc.Worker} there that runs on that port, after changing to
    the local process's current working directory.

    @param node: node information.
    @type node: Node
    @param authkey: the authkey for the worker.
    @type authkey: str
    @keyword envar: additional environment variables to remote.
    @type envar: dict
    @keyword paths: path for remote execution.
    @type paths: dict
    @keyword profiler_data: profiler setting for remote worker.
    @type profiler_data: tuple
    @return: the port that the remote worker listen on.
    @rtype: int
    @raise IOError: the remote host did not report an integer port.
    """
    import os
    from subprocess import PIPE
    from .rpc import SecureShell
    remote = SecureShell(node.address, paths=paths)

    # Ask the remote host for a free port.  The conversion must happen
    # inside the try, otherwise a malformed reply escapes as a bare
    # ValueError/TypeError instead of the intended IOError.
    reply = remote([
        'import sys',
        'from solvcon.connection import pick_unused_port',
        'sys.stdout.write(str(pick_unused_port()))',
    ], stdout=PIPE)
    try:
        port = int(reply)
    except (TypeError, ValueError):
        raise IOError('remote port detection fails')

    # NOTE(review): single quotes are swapped for double quotes in the
    # profiler repr, presumably because SecureShell wraps the generated
    # code in single quotes -- confirm before changing the quoting below.
    pdata = str(profiler_data).replace("'", '"')
    remote([
        'import os',
        'os.chdir("%s")' % os.path.abspath(os.getcwd()),
        'from solvcon.rpc import Worker',
        'wkr = Worker(None, profiler_data=%s)' % pdata,
        'wkr.run(("%s", %d), "%s")' % (node.address, port, authkey),
    ], envar=envar)
    return port
336
def create_worker_mpi(self, node, authkey,
        envar=None, paths=None, profiler_data=None):
    """
    Use MPI to link remote worker object.

    Only C{node.serial} is used here: the result is whatever
    C{env.mpi.recv(node.serial, 1)} delivers.  C{authkey}, C{envar},
    C{paths} and C{profiler_data} are accepted so the signature matches
    the other create_worker_* implementations, but are not used.

    @param node: node information.
    @type node: Node
    @param authkey: the authkey for the worker (unused here).
    @type authkey: str
    @keyword envar: additional environment variables to remote (unused).
    @type envar: dict
    @keyword paths: path for remote execution (unused).
    @type paths: dict
    @keyword profiler_data: profiler setting for remote worker (unused).
    @type profiler_data: tuple
    @return: the port that the remote worker listen on, as received over
        MPI.
    @rtype: int
    """
    from .conf import env
    peer = node.serial
    # NOTE(review): the second argument (1) is presumably a message tag --
    # confirm against the env.mpi wrapper.
    received = env.mpi.recv(peer, 1)
    return received
357
359 """
360 Dummy batch abstraction for localhost.
361 """
363 return [Node('127.0.0.1', ncore=1, serial=i)
364 for i in range(self.case.execution.npart)]
367
369 """
370 Torque/OpenPBS.
371 """
372
376
377 @property
379 res = self.resource.copy()
380 if self.case.execution.npart != None:
381 res['nodes'] = self.case.execution.npart
382 if self.use_mpi:
383 res['nodes'] += 1
384
385 tokens = list()
386 for key in sorted(res.keys()):
387 val = res[key]
388 if val == None:
389 token = key
390 else:
391 token = '%s=%s' % (key, val)
392 if token:
393 tokens.append(token)
394
395 idx1 = 0
396 while idx1 < len(tokens):
397 if 'nodes' in tokens[idx1]:
398 break
399 idx1 += 1
400 idx2 = 0
401 while idx2 < len(tokens):
402 if 'ppn' in tokens[idx2]:
403 break
404 idx2 += 1
405 if idx1 != len(tokens) and idx2 != len(tokens) and idx1 != idx2:
406 tokens[idx1] = ':'.join([tokens[idx1], tokens[idx2]])
407 del tokens[idx2]
408
409 if tokens:
410 return '#PBS -l %s' % ','.join(tokens)
411 else:
412 return ''
413
414 @property
416 return '#PBS -N %s' % self.jobname
417
418 @property
420 return '#PBS -j %s' % self.output
421
422 @property
424 return '#PBS -S %s' % self.shell
425
427 import os
428 from .conf import env
429 if not self._nodelist:
430
431 f = open(os.environ['PBS_NODEFILE'])
432 entries = [item.strip() for item in f.readlines()]
433 f.close()
434 nodelist = [Node(entries[it], ncore=1, serial=it) for it in
435 range(len(entries))]
436
437 if env.command != None:
438 ops, args = env.command.opargs
439 if ops.compress_nodelist:
440 cnodelist = [nodelist[0]]
441 for nodeitem in nodelist[1:]:
442 cnodeitem = cnodelist[-1]
443 if nodeitem.address == cnodeitem.address:
444 cnodeitem.ncore += 1
445 else:
446 nodeitem.serial = len(cnodelist)
447 cnodelist.append(nodeitem)
448 nodelist = cnodelist
449
450 if env.mpi:
451 nodelist = nodelist[1:]
452
453 self._nodelist = nodelist[:self.case.execution.npart]
454 return self._nodelist
455
458 """
459 Use Torque TM API to create worker object.
460
461 @param node: node information.
462 @type node: Node
463 @param authkey: the authkey for the worker.
464 @type authkey: str
465 @keyword envar: additional environment variables to remote.
466 @type envar: dict
467 @keyword paths: path for remote execution.
468 @type paths: dict
469 @keyword profiler_data: profiler setting for remote worker.
470 @type profiler_data: tuple
471 @return: the port that the remote worker listen on.
472 @rtype: int
473 """
474 import sys, os
475 from threading import Thread
476 from Queue import Queue
477 from .batch_torque import TaskManager
478 from .connection import pick_unused_port
479
480 myhost = self.nodelist()[0].address
481 myport = pick_unused_port()
482 portq = Queue()
483 def get_port():
484 from .connection import Listener
485 lsnr = Listener((myhost, myport), authkey=authkey)
486 conn = lsnr.accept()
487 portq.put(conn.recv())
488 conn.close()
489 thd = Thread(target=get_port)
490 thd.start()
491
492 tm = TaskManager(paths=paths)
493 tm.spawn(sys.executable, '-c',
494 "from solvcon.batch_torque import run_worker; "
495 "run_worker(%s, %s, %s, %s, %s)" % (
496 "('%s', %d)" % (myhost, myport),
497 "'%s'" % os.getcwd(),
498 str(profiler_data),
499 "'%s'" % node.address,
500 "'%s'" % authkey,
501 ), where=node.pserial, envar=envar)
502
503 thd.join()
504 return portq.get()
505
515