1
2
3 from unittest import TestCase
4
10
# A Batch built directly (not through a concrete batch system) must raise
# NotImplementedError for each string-rendering attribute and for str().
from ..case import BlockCase
from ..batch import Batch
plain_batch = Batch(BlockCase(rootdir='/tmp'), arnname='arn')
for attr_name in ('str_resource', 'str_jobname', 'str_output', 'str_shell'):
    # getattr() performs the same attribute read that is expected to raise.
    self.assertRaises(NotImplementedError, getattr, plain_batch, attr_name)
# Rendering the whole object as a string has to fail the same way.
self.assertRaises(NotImplementedError, str, plain_batch)
21
# Expected submit-script text with the case rooted at /tmp.
# The template holds one ``%s`` slot (the command placed after ``time``) and
# an escaped ``%%d`` that is still a literal ``%d`` after the first
# formatting pass, so the run level can be filled in by a second pass.
# Built from a tuple of lines so the value does not depend on how this
# statement is indented in the file.
SCRIPT = '\n'.join((
    '#!/bin/sh',
    '',
    '#PBS -N arn',
    '#PBS -j oe',
    '#PBS -S /bin/sh',
    'echo "Customized paths for job:"',
    'export PYTHONPATH=/tmp:$PYTHONPATH',
    'echo "Run @`date`:"',
    'cd /tmp/arn',
    'time %s run arn --runlevel %%d',
    'echo "Finish @`date`."',
))
# Expected content of a submit script written to disk.
# Format slots, in order: PYTHONPATH entry (%s), directory that contains the
# ``arn`` job directory (%s), command placed after ``time`` (%s), and the
# run level (%d).
# Built from a tuple of lines so the value does not depend on how this
# statement is indented in the file.
SCRIPT_FILE = '\n'.join((
    '#!/bin/sh',
    '',
    '#PBS -N arn',
    '#PBS -j oe',
    '#PBS -S /bin/sh',
    'echo "Customized paths for job:"',
    'export PYTHONPATH=%s:$PYTHONPATH',
    'echo "Run @`date`:"',
    'cd %s/arn',
    'time %s run arn --runlevel %d',
    'echo "Finish @`date`."',
))
45
56
# Write the Torque submit scripts for a case rooted in a fresh temporary
# directory, then check every returned file name and the text of each file
# against SCRIPT_FILE.  Skipped on Windows.
import sys
from nose.plugins.skip import SkipTest
if sys.platform.startswith('win'):
    raise SkipTest
import os
import shutil
from tempfile import mkdtemp
from ..case import BlockCase
from ..batch import batregy
from ..conf import env
wdir = mkdtemp()
try:
    case = BlockCase(rootdir=wdir)
    # Collect info messages in a list instead of the case's own handler.
    msg = []
    case.info = lambda m: msg.append(m)
    sbm = batregy.Torque(case, arnname='arn')
    for it, fn in enumerate(sbm.tofile()):
        self.assertEqual(fn, os.path.join(wdir, 'arn', 'arn.pbs%d' % it))
        # Read through a context manager so the handle is closed even when
        # the comparison below fails.
        with open(fn) as f:
            written = f.read()
        self.assertEqual(written,
            self.SCRIPT_FILE % (wdir, wdir, env.get_entry_point(), it))
finally:
    # Always remove the temporary tree, including when an assertion failed.
    shutil.rmtree(wdir)
self.assertFalse(os.path.exists(wdir))
82
# With the job directory already present, tofile() must still return the
# expected script paths and report exactly one info message saying the
# directory was there.  Skipped on Windows.
import sys
from nose.plugins.skip import SkipTest
if sys.platform.startswith('win'):
    raise SkipTest
import os
import shutil
from tempfile import mkdtemp
from ..case import BlockCase
from ..batch import batregy
wdir = mkdtemp()
try:
    case = BlockCase(rootdir=wdir)
    # Collect info messages so they can be asserted on below.
    msg = []
    case.info = lambda m: msg.append(m)
    sbm = batregy.Torque(case, arnname='arn')
    jobdir = os.path.join(wdir, 'arn')
    # Make sure the job directory exists before the scripts are written.
    if not os.path.exists(jobdir):
        os.makedirs(jobdir)
    for it, fn in enumerate(sbm.tofile()):
        self.assertEqual(fn, os.path.join(jobdir, 'arn.pbs%d' % it))
    self.assertEqual(len(msg), 1)
    self.assertEqual(msg[0], 'Job directory was there: %s\n' % (jobdir,))
finally:
    # Always remove the temporary tree, including when an assertion failed.
    shutil.rmtree(wdir)
self.assertFalse(os.path.exists(wdir))
107
# With no job directory present beforehand, tofile() must return the
# expected script paths and emit no info message.  Skipped on Windows.
import sys
from nose.plugins.skip import SkipTest
if sys.platform.startswith('win'):
    raise SkipTest
import os
import shutil
from tempfile import mkdtemp
from ..case import BlockCase
from ..batch import batregy
wdir = mkdtemp()
try:
    case = BlockCase(rootdir=wdir)
    # Collect info messages so their absence can be asserted below.
    msg = []
    case.info = lambda m: msg.append(m)
    sbm = batregy.Torque(case, arnname='arn')
    jobdir = os.path.join(wdir, 'arn')
    # Make sure the job directory is absent before the scripts are written.
    if os.path.exists(jobdir):
        shutil.rmtree(jobdir)
    for it, fn in enumerate(sbm.tofile()):
        self.assertEqual(fn, os.path.join(jobdir, 'arn.pbs%d' % it))
    self.assertEqual(len(msg), 0)
finally:
    # Always remove the temporary tree, including when an assertion failed.
    shutil.rmtree(wdir)
self.assertFalse(os.path.exists(wdir))
130
# With empty_jobdir=True and the job directory already present, tofile()
# must return the expected script paths and emit two info messages: one
# saying the directory was there, one saying its files are deleted.
# Skipped on Windows.
import sys
from nose.plugins.skip import SkipTest
if sys.platform.startswith('win'):
    raise SkipTest
import os
import shutil
from tempfile import mkdtemp
from ..case import BlockCase
from ..batch import batregy
wdir = mkdtemp()
try:
    case = BlockCase(rootdir=wdir, empty_jobdir=True)
    # Collect info messages so they can be asserted on below.
    msg = []
    case.info = lambda m: msg.append(m)
    sbm = batregy.Torque(case, arnname='arn')
    jobdir = os.path.join(wdir, 'arn')
    # Make sure the job directory exists before the scripts are written.
    if not os.path.exists(jobdir):
        os.makedirs(jobdir)
    for it, fn in enumerate(sbm.tofile()):
        self.assertEqual(fn, os.path.join(jobdir, 'arn.pbs%d' % it))
    self.assertEqual(len(msg), 2)
    self.assertEqual(msg[0], 'Job directory was there: %s\n' % (jobdir,))
    self.assertEqual(msg[1], 'Delete all file in job directory.\n')
finally:
    # Always remove the temporary tree, including when an assertion failed.
    shutil.rmtree(wdir)
self.assertFalse(os.path.exists(wdir))
156