1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"""
Basic support for cluster batch systems.

* batregy: The registry containing all usable batch-system abstractions.
* Batch: The base class for batch systems.
* Localhost: A dummy batch system for domain decomposition on localhost.
* Torque: The Torque/OpenPBS batch system.
"""
28
29 from .gendata import TypeNameRegistry
30
31 -class Node(object):
def __init__(self, name, ncore=1, serial=None, attrs=None):
    """
    Describe one compute node available to a batch job.

    @param name: host name or IP string of the node.
    @type name: str
    @keyword ncore: number of cores attributed to the node.
    @type ncore: int
    @keyword serial: index of the node in its node list (may be None).
    @type serial: int
    @keyword attrs: extra attribute strings; a fresh empty list is created
        when omitted, so instances never share a default list.
    @type attrs: list
    """
    self.name = name
    self.ncore = ncore
    self.serial = serial
    # pserial keeps the original serial even when ``serial`` is renumbered
    # later (the Torque node-list compression reassigns ``serial``).
    self.pserial = serial
    self.attrs = attrs if attrs is not None else list()
38
40 attrstr = ','.join(self.attrs)
41 return '{%s(%d)%s}' % (self.name, self.ncore,
42 ': '+attrstr if attrstr else '')
43
44 @property
46 from socket import gethostbyname
47 return gethostbyname(self.name)
48
# Registry of all usable batch-system abstractions (per the module
# docstring). How classes get registered is not visible here -- presumably
# through the BatchMeta metaclass; TODO confirm.
batregy = TypeNameRegistry()
56
58 """
59 Batch system submitter.
60
61 @cvar _subcmd_: the command name for the batch submission.
62 @ctype _subcmd_: str
63 @cvar BASH_HOME_SOURCE: the files to be sourced in user's home.
64 @ctype BASH_HOME_SOURCE: list
65 @ivar case: The Case corresponding object.
66 @itype case: solvcon.case.core.Case
67 @ivar arnname: The name of the arrangement to be run.
68 @itype arnname: str
69 @ivar jobname: The name to send to batch system.
70 @itype jobname: str
71 @ivar jobdir: The absolute path for job.
72 @itype jobdir: str
73 @ivar output: Batch output type.
74 @itype output: str
75 @ivar shell: Shell to be used on the cluster.
76 @itype shell: str
77 @ivar use_mpi: Indicate to use MPI as transport layer.
78 @itype use_mpi: bool
79 @ivar resource: Specified resources.
80 @itype resource: dict
81 """
82
83 __metaclass__ = BatchMeta
84
85 _subcmd_ = 'qsub'
86
87 DEFAULT_OUTPUT = 'oe'
88 DEFAULT_SHELL = '/bin/sh'
89 BASH_HOME_SOURCE = []
90
92 """
93 @keyword rootdir: Root directory for the project/code.
94 @type rootdir: str
95 @keyword arnname: The arrangement to be run.
96 @type arnname: str
97 """
98 import os
99 self.case = case
100 self.arnname = kw.pop('arnname', None)
101 self.jobname = kw.pop('jobname', self.arnname)
102 if case.io.basedir == None:
103 if self.jobname != None:
104 self.jobdir = os.path.abspath(os.path.join(
105 self.case.io.rootdir, self.jobname))
106 else:
107 self.jobdir = None
108 else:
109 self.jobdir = case.io.basedir
110 self.output = kw.pop('output', self.DEFAULT_OUTPUT)
111 self.shell = kw.pop('shell', self.DEFAULT_SHELL)
112 self.use_mpi = kw.pop('use_mpi', False)
113 self.resource = case.execution.resources.copy()
114 self.resource.update(kw)
115 super(Batch, self).__init__()
116
117 @property
120
121 @property
123 raise NotImplementedError
124
125 @property
127 raise NotImplementedError
128
129 @property
131 raise NotImplementedError
132
133 @property
135 raise NotImplementedError
136
137 @property
139 import os
140 ret = list()
141 ret.append('echo "Customized paths for job:"')
142
143 home = os.environ['HOME']
144 for fn in self.BASH_HOME_SOURCE:
145 if os.path.exists(os.path.join(home, fn)):
146 ret.append('. $HOME/%s'%fn)
147 ret.append('export PYTHONPATH=%s:$PYTHONPATH' % self.case.io.rootdir)
148 return '\n'.join(ret)
149
150 @property
159
@property
def str_postrun(self):
    """
    Post-run shell snippet: echo a completion message stamped with the
    output of `date`.
    """
    finish_line = 'echo "Finish @`date`."'
    return finish_line
163
165 import os
166 from .conf import env
167 scgargs = ' '.join(['run', self.arnname])
168 if env.command != None:
169 ops, args = env.command.opargs
170 scgops = list()
171 if ops.npart != None:
172 scgops.append('\\\n')
173 scgops.append('--npart=%d' % ops.npart)
174 scgops.append('--batch=%s' % ops.batch)
175 if ops.envar:
176 scgops.append('\\\n')
177 envar = env.command.envar
178 scgops.append('--envar %s' % ':'.join([
179 '%s=%s' % (key, envar[key]) for key in envar
180 ]))
181 if ops.compress_nodelist:
182 scgops.append('--compress-nodelist')
183 if ops.use_profiler:
184 scgops.append('\\\n')
185 scgops.append('--use-profiler')
186 scgops.extend([
187 '--profiler-sort=%s' % ops.profiler_sort,
188 '--profiler-dat=%s' % ops.profiler_dat,
189 '--profiler-log=%s' % ops.profiler_log,
190 ])
191 if ops.solver_output:
192 scgops.append('--solver-output')
193 if ops.basedir:
194 scgops.append('\\\n')
195 scgops.append('--basedir=%s' % os.path.abspath(ops.basedir))
196 scgops = ' '.join(scgops)
197 else:
198 scgops = ''
199 scgops = '--runlevel %%d %s' % scgops
200 return ' '.join([env.get_entry_point(), scgargs, scgops])
201
204
205 @property
207 cmds = ['time']
208 if self.use_mpi:
209 mpi_runner = self.build_mpi_runner()
210 if not mpi_runner:
211 raise RuntimeError(
212 '%s gave null mpi_runner' % str(self.__class__))
213 cmds.append(mpi_runner)
214 cmds.append(self.build_scg_command())
215 return '\n'.join([
216 'cd %s' % self.jobdir,
217 ' '.join(cmds).strip()
218 ])
219
235
def tofile(self, basename=None):
    """
    Write the submission scripts for runlevels 0, 1 and 2 into the job
    directory.

    The script text is ``str(self)``. It must be a %-format template that
    takes the runlevel as its single argument. Each rendering is written to
    ``<jobdir>/<basename><runlevel>``.

    If the job directory already exists and ``case.io.empty_jobdir`` is
    true, the entries matched by ``<jobdir>/*`` are unlinked first.
    Otherwise the directory is created.

    @keyword basename: base file name of the scripts; defaults to
        ``<jobname>.pbs``.
    @type basename: str
    @return: absolute paths of the written scripts, indexed by runlevel.
    @rtype: list
    """
    import os
    from glob import glob
    info = self.case.info
    if basename is None:
        basename = self.jobname + '.pbs'
    if os.path.exists(self.jobdir):
        info('Job directory was there: %s\n' % self.jobdir)
        if self.case.io.empty_jobdir:
            info('Delete all file in job directory.\n')
            for stale in glob(os.path.join(self.jobdir, '*')):
                os.unlink(stale)
    else:
        os.makedirs(self.jobdir)
    # Render the script text once; only the runlevel differs per file.
    template = str(self)
    fn = os.path.abspath(os.path.join(self.jobdir, basename))
    fnlist = list()
    for ilevel in range(3):
        path = fn + str(ilevel)
        # Format before opening so a bad template leaves no empty file;
        # the context manager closes the handle even if writing fails.
        content = template % ilevel
        with open(path, 'w') as fobj:
            fobj.write(content)
        fnlist.append(path)
    return fnlist
260
def __call__(self, runlevel=0, basename=None, postpone=False):
    """
    Make the submission scripts and, unless postponed, submit one of them.

    @keyword runlevel: index into the script list returned by tofile();
        that script is submitted.
    @type runlevel: int
    @keyword basename: forwarded to tofile().
    @type basename: str
    @keyword postpone: when true, only write the scripts without submitting.
    @type postpone: bool
    @return: exit status of the submission command, or None when postponed.
    @rtype: int
    """
    import os
    from subprocess import call
    try:
        from shlex import quote
    except ImportError:  # Python 2 keeps quote() in the pipes module.
        from pipes import quote
    info = self.case.info
    fnlist = self.tofile(basename=basename)
    # Changes the process-wide working directory, even when postponed; the
    # submission command below then runs from the job directory.
    os.chdir(self.jobdir)
    if postpone:
        return None
    info('submit runlevel %d\n' % runlevel)
    # _subcmd_ is left as-is for the shell; the script path is quoted so
    # spaces or shell metacharacters in it stay a single argument.
    cmd = '%s %s' % (self._subcmd_, quote(fnlist[runlevel]))
    return call(cmd, shell=True)
275
277 raise NotImplementedError
278
280 """
281 True implementations are in create_worker_*() and have identical spec.
282 """
283 raise NotImplementedError
284
def create_worker_ssh(self, node, authkey,
        envar=None, paths=None, profiler_data=None):
    """
    Use secure shell to create worker object.

    A first remote invocation asks the node for an unused port. A second
    one starts a solvcon.rpc.Worker on the node that runs on that port.

    @param node: node information.
    @type node: Node
    @param authkey: the authkey for the worker.
    @type authkey: str
    @keyword envar: additional environment variables to remote.
    @type envar: dict
    @keyword paths: path for remote execution.
    @type paths: dict
    @keyword profiler_data: profiler setting for remote worker.
    @type profiler_data: tuple
    @return: the port that the remote worker listen on.
    @rtype: int
    @raise IOError: the remote reply could not be parsed as a port number.
    """
    import os
    from subprocess import PIPE
    from .rpc import SecureShell
    remote = SecureShell(node.address, paths=paths)

    # Ask the remote node for a free port; it writes the number to stdout.
    reply = remote([
        'import sys',
        'from solvcon.connection import pick_unused_port',
        'sys.stdout.write(str(pick_unused_port()))',
    ], stdout=PIPE)
    # The conversion must be inside the try block, otherwise a garbled
    # reply escapes as ValueError/TypeError instead of the IOError below.
    try:
        port = int(reply)
    except (TypeError, ValueError):
        raise IOError('remote port detection fails')

    pdata = str(profiler_data).replace("'", '"')
    remote([
        'import os',
        # %r yields a valid Python string literal even for paths that
        # contain quotes or backslashes.
        'os.chdir(%r)' % os.path.abspath(os.getcwd()),
        'from solvcon.rpc import Worker',
        'wkr = Worker(None, profiler_data=%s)' % pdata,
        'wkr.run(("%s", %d), "%s")' % (node.address, port, authkey),
    ], envar=envar)
    return port
328
def create_worker_mpi(self, node, authkey,
        envar=None, paths=None, profiler_data=None):
    """
    Link an already-running remote worker through MPI.

    The result is received via env.mpi.recv with node.serial as the first
    argument (presumably the source rank) and 1 as the second (presumably
    a message tag; TODO confirm). The authkey, envar, paths and
    profiler_data arguments are not used here. They are accepted so the
    signature matches the other create_worker_* implementations.

    @param node: node information; only its serial is read.
    @type node: Node
    @param authkey: unused by this implementation.
    @type authkey: str
    @keyword envar: unused by this implementation.
    @type envar: dict
    @keyword paths: unused by this implementation.
    @type paths: dict
    @keyword profiler_data: unused by this implementation.
    @type profiler_data: tuple
    @return: the port that the remote worker listen on, as received.
    @rtype: int
    """
    from .conf import env
    mpi = env.mpi
    port = mpi.recv(node.serial, 1)
    return port
349
351 """
352 Dummy batch abstraction for localhost.
353 """
355 return [Node('127.0.0.1', ncore=1, serial=i)
356 for i in range(self.case.execution.npart)]
359
361 """
362 Torque/OpenPBS.
363 """
364
368
369 @property
371 res = self.resource.copy()
372 if self.case.execution.npart != None:
373 res['nodes'] = self.case.execution.npart
374 if self.use_mpi:
375 res['nodes'] += 1
376
377 tokens = list()
378 for key in sorted(res.keys()):
379 val = res[key]
380 if val == None:
381 token = key
382 else:
383 token = '%s=%s' % (key, val)
384 if token:
385 tokens.append(token)
386
387 idx1 = 0
388 while idx1 < len(tokens):
389 if 'nodes' in tokens[idx1]:
390 break
391 idx1 += 1
392 idx2 = 0
393 while idx2 < len(tokens):
394 if 'ppn' in tokens[idx2]:
395 break
396 idx2 += 1
397 if idx1 != len(tokens) and idx2 != len(tokens) and idx1 != idx2:
398 tokens[idx1] = ':'.join([tokens[idx1], tokens[idx2]])
399 del tokens[idx2]
400
401 if tokens:
402 return '#PBS -l %s' % ','.join(tokens)
403 else:
404 return ''
405
406 @property
408 return '#PBS -N %s' % self.jobname
409
410 @property
412 return '#PBS -j %s' % self.output
413
414 @property
416 return '#PBS -S %s' % self.shell
417
419 import os
420 from .conf import env
421 if not self._nodelist:
422
423 f = open(os.environ['PBS_NODEFILE'])
424 entries = [item.strip() for item in f.readlines()]
425 f.close()
426 nodelist = [Node(entries[it], ncore=1, serial=it) for it in
427 range(len(entries))]
428
429 if env.command != None:
430 ops, args = env.command.opargs
431 if ops.compress_nodelist:
432 cnodelist = [nodelist[0]]
433 for nodeitem in nodelist[1:]:
434 cnodeitem = cnodelist[-1]
435 if nodeitem.address == cnodeitem.address:
436 cnodeitem.ncore += 1
437 else:
438 nodeitem.serial = len(cnodelist)
439 cnodelist.append(nodeitem)
440 nodelist = cnodelist
441
442 if env.mpi:
443 nodelist = nodelist[1:]
444
445 self._nodelist = nodelist[:self.case.execution.npart]
446 return self._nodelist
447
450 """
451 Use Torque TM API to create worker object.
452
453 @param node: node information.
454 @type node: Node
455 @param authkey: the authkey for the worker.
456 @type authkey: str
457 @keyword envar: additional environment variables to remote.
458 @type envar: dict
459 @keyword paths: path for remote execution.
460 @type paths: dict
461 @keyword profiler_data: profiler setting for remote worker.
462 @type profiler_data: tuple
463 @return: the port that the remote worker listen on.
464 @rtype: int
465 """
466 import sys, os
467 from threading import Thread
468 from Queue import Queue
469 from .batch_torque import TaskManager
470 from .connection import pick_unused_port
471
472 myhost = self.nodelist()[0].address
473 myport = pick_unused_port()
474 portq = Queue()
475 def get_port():
476 from .connection import Listener
477 lsnr = Listener((myhost, myport), authkey=authkey)
478 conn = lsnr.accept()
479 portq.put(conn.recv())
480 conn.close()
481 thd = Thread(target=get_port)
482 thd.start()
483
484 tm = TaskManager(paths=paths)
485 tm.spawn(sys.executable, '-c',
486 "from solvcon.batch_torque import run_worker; "
487 "run_worker(%s, %s, %s, %s, %s)" % (
488 "('%s', %d)" % (myhost, myport),
489 "'%s'" % os.getcwd(),
490 str(profiler_data),
491 "'%s'" % node.address,
492 "'%s'" % authkey,
493 ), where=node.pserial, envar=envar)
494
495 thd.join()
496 return portq.get()
497
507