Package solvcon :: Package tests :: Module test_case
[hide private]
[frames] | no frames]

Source Code for Module solvcon.tests.test_case

  1  # -*- coding: UTF-8 -*- 
  2  # Copyright (C) 2008-2011 by Yung-Yu Chen.  See LICENSE.txt for terms of usage. 
  3   
  4  from unittest import TestCase 
  5  from ..anchor import VtkAnchor 
  6   
7 -class TestArrangement(TestCase):
9 from ..case import BaseCase, BlockCase 10 self.assertNotEqual( 11 id(BaseCase.arrangements), 12 id(BlockCase.arrangements) 13 )
14
15 -class TestCaseInfo(TestCase):
16 - def test_CaseInfo(self):
17 from ..case import CaseInfo 18 tdic = CaseInfo(_defdict={ 19 'lay1_1': '1_1', 20 'lay1_2.lay2_1': '1_2/2_1', 21 'lay1_2.lay2_2': '1_2/2_2', 22 'lay1_3.': '1_3/', 23 }) 24 self.assertEqual(tdic.lay1_1, '1_1') 25 self.assertEqual(tdic.lay1_2.lay2_1, '1_2/2_1') 26 self.assertEqual(tdic.lay1_2.lay2_2, '1_2/2_2') 27 self.assertEqual(len(tdic.lay1_3), 0)
28
29 -class TestBaseCase(TestCase):
30 - def test_empty_fields(self):
31 from ..case import BaseCase 32 case = BaseCase() 33 # execution related. 34 self.assertTrue(isinstance(case.runhooks, list)) 35 self.assertEqual(case.execution.time, 0.0) 36 self.assertEqual(case.execution.time_increment, 0.0) 37 self.assertEqual(case.execution.step_init, 0) 38 self.assertEqual(case.execution.step_current, None) 39 self.assertEqual(case.execution.steps_run, None) 40 self.assertEqual(case.execution.neq, 0) 41 self.assertEqual(case.execution.var, dict()) 42 self.assertEqual(case.execution.varstep, None) 43 # io related. 44 self.assertEqual(case.io.abspath, False) 45 self.assertEqual(case.io.basedir, None) 46 self.assertEqual(case.io.basefn, None) 47 # condition related. 48 self.assertEqual(case.condition.mtrllist, list()) 49 # solver related. 50 self.assertEqual(case.solver.solvertype, None) 51 self.assertEqual(case.solver.solverobj, None) 52 # logging. 53 self.assertEqual(case.log.time, dict())
54
55 - def test_abspath(self):
56 import os 57 from ..case import BaseCase 58 case = BaseCase(basedir='.', abspath=True) 59 path = os.path.abspath('.') 60 self.assertEqual(case.io.basedir, path)
61
62 - def test_init(self):
63 from ..case import BaseCase 64 case = BaseCase() 65 case.init()
66 67 from ..hook import BlockHook
68 -class CaseCollect(BlockHook):
69 - def postmarch(self):
70 self._collect_interior('soln', tovar=True) 71 self._collect_interior('dsoln', tovar=True)
72 - def preloop(self):
73 self.postmarch()
74 del BlockHook 75
76 -class TestBlockCaseRun(TestCase):
77 time = 0.0 78 time_increment = 1.0 79 nsteps = 10 80
81 - def _get_case(self, **kw):
82 import os 83 from ..testing import TestingSolver 84 from ..conf import env 85 from ..case import BlockCase 86 from ..anchor import FillAnchor 87 from ..helper import Information 88 meshfn = kw.get('meshfn', 'sample.neu') 89 kw['meshfn'] = os.path.join(env.datadir, meshfn) 90 case = BlockCase(basedir='.', basefn='blockcase', bcmap=None, 91 solvertype=TestingSolver, neq=1, 92 steps_run=self.nsteps, time_increment=self.time_increment, 93 **kw 94 ) 95 case.info = Information() 96 case.runhooks.append(FillAnchor, 97 keys=('soln', 'dsoln'), value=0.0, 98 ) 99 case.runhooks.append(CaseCollect) 100 case.init() 101 return case
102
103 -class TestSequential(TestBlockCaseRun):
104 - def test_soln(self):
105 from numpy import zeros 106 from ..domain import Domain 107 case = self._get_case(domaintype=Domain) 108 svr = case.solver.solverobj 109 case.run() 110 ngstcell = svr.ngstcell 111 # get result. 112 soln = svr.soln[ngstcell:,0] 113 # calculate reference 114 clvol = zeros(soln.shape, dtype=soln.dtype) 115 for iistep in range(self.nsteps*2): 116 clvol += svr.clvol[ngstcell:]*self.time_increment/2 117 # compare. 118 self.assertTrue((soln==clvol).all())
119
120 - def test_dsoln(self):
121 from numpy import zeros 122 from ..domain import Domain 123 case = self._get_case(domaintype=Domain) 124 svr = case.solver.solverobj 125 case.run() 126 ngstcell = svr.ngstcell 127 # get result. 128 dsoln = svr.dsoln[ngstcell:,0,:] 129 # calculate reference 130 clcnd = zeros(dsoln.shape, dtype=dsoln.dtype) 131 for iistep in range(self.nsteps*2): 132 clcnd += svr.clcnd[ngstcell:]*self.time_increment/2 133 # compare. 134 self.assertTrue((dsoln==clcnd).all())
135
136 -class TestLocalParallel(TestBlockCaseRun):
137 npart = 3 138
139 - def test_soln(self):
140 import sys 141 from nose.plugins.skip import SkipTest 142 if sys.platform.startswith('win'): raise SkipTest 143 from numpy import zeros 144 from ..domain import Collective 145 case = self._get_case(npart=self.npart, domaintype=Collective) 146 case.run() 147 # get result. 148 soln = case.execution.var['soln'][:,0] 149 # calculate reference 150 blk = case.solver.domainobj.blk 151 clvol = zeros(soln.shape, dtype=soln.dtype) 152 for iistep in range(self.nsteps*2): 153 clvol += blk.clvol*self.time_increment/2 154 # compare. 155 self.assertTrue((soln==clvol).all())
156
157 - def test_dsoln(self):
158 import sys 159 from nose.plugins.skip import SkipTest 160 if sys.platform.startswith('win'): raise SkipTest 161 from numpy import zeros 162 from ..domain import Collective 163 case = self._get_case(npart=self.npart, domaintype=Collective) 164 case.run() 165 # get result. 166 dsoln = case.execution.var['dsoln'][:,0,:] 167 # calculate reference 168 blk = case.solver.domainobj.blk 169 clcnd = zeros(dsoln.shape, dtype=dsoln.dtype) 170 for iistep in range(self.nsteps*2): 171 clcnd += blk.clcnd*self.time_increment/2 172 # compare. 173 self.assertTrue((dsoln==clcnd).all())
174
175 - def test_ibcthread(self):
176 import sys 177 from nose.plugins.skip import SkipTest 178 if sys.platform.startswith('win'): raise SkipTest 179 from numpy import zeros 180 from ..domain import Collective 181 case = self._get_case(npart=self.npart, domaintype=Collective, 182 ibcthread=True) 183 case.run() 184 # get result. 185 soln = case.execution.var['soln'][:,0] 186 # calculate reference 187 blk = case.solver.domainobj.blk 188 clvol = zeros(soln.shape, dtype=soln.dtype) 189 for iistep in range(self.nsteps*2): 190 clvol += blk.clvol*self.time_increment/2 191 # compare. 192 self.assertTrue((soln==clvol).all())
193
194 -class TestRemoteParallel(TestBlockCaseRun):
195 npart = 2 196
197 - def test_dsoln(self):
198 import sys 199 from nose.plugins.skip import SkipTest 200 if sys.platform.startswith('win'): raise SkipTest 201 from numpy import zeros 202 from ..batch import Localhost 203 from ..domain import Distributed 204 case = self._get_case(npart=self.npart, domaintype=Distributed, 205 batch=Localhost, rkillfn='') 206 case.run() 207 # get result. 208 dsoln = case.execution.var['dsoln'][:,0,:] 209 # calculate reference 210 blk = case.solver.domainobj.blk 211 clcnd = zeros(dsoln.shape, dtype=dsoln.dtype) 212 for iistep in range(self.nsteps*2): 213 clcnd += blk.clcnd*self.time_increment/2 214 # compare. 215 self.assertTrue((dsoln==clcnd).all())
216
217 -class TestPresplitLocalParallel(TestBlockCaseRun):
218 npart = 3
219 - def test_soln(self):
220 import sys 221 from nose.plugins.skip import SkipTest 222 if sys.platform.startswith('win'): raise SkipTest 223 from numpy import zeros 224 from ..domain import Collective 225 case = self._get_case(npart=self.npart, domaintype=Collective, 226 meshfn='sample.dom') 227 case.run() 228 # get result. 229 soln = case.execution.var['soln'][:,0] 230 # calculate reference 231 blk = case.solver.domainobj.blk 232 clvol = zeros(soln.shape, dtype=soln.dtype) 233 for iistep in range(self.nsteps*2): 234 clvol += blk.clvol*self.time_increment/2 235 # compare. 236 self.assertTrue((soln==clvol).all())
237
238 -class TestPresplitLocalParallelNoArrs(TestBlockCaseRun):
239 npart = 3
240 - def _get_case_nocollect(self, **kw):
241 import os 242 from ..testing import TestingSolver 243 from ..conf import env 244 from ..case import BlockCase 245 from ..anchor import FillAnchor 246 from ..helper import Information 247 meshfn = kw.get('meshfn', 'sample.neu') 248 kw['meshfn'] = os.path.join(env.datadir, meshfn) 249 case = BlockCase(basedir='.', basefn='blockcase', bcmap=None, 250 solvertype=TestingSolver, neq=1, 251 steps_run=self.nsteps, time_increment=self.time_increment, 252 **kw 253 ) 254 case.info = Information() 255 case.runhooks.append(FillAnchor, 256 keys=('soln', 'dsoln'), value=0.0, 257 ) 258 case.init() 259 return case
260 - def test_run(self):
261 import sys 262 from nose.plugins.skip import SkipTest 263 if sys.platform.startswith('win'): raise SkipTest 264 from numpy import zeros 265 from ..domain import Collective 266 case = self._get_case_nocollect(npart=self.npart, 267 domaintype=Collective, meshfn='sample.dom', 268 with_arrs=False, with_whole=False) 269 case.run()
270
271 -class SampleVtkAnchor(VtkAnchor):
272 - def process(self, istep):
273 from ..visual_vtk import Vop 274 self._aggregate() 275 usp = Vop.c2p(self.svr.ust) 276 cut = Vop.cut(usp, (0,0,0), (0,1,0)) 277 cut.Update() 278 Vop.write_poly(cut, self.vtkfn)
279 -class TestPresplitRemoteParallel(TestBlockCaseRun):
280 npart = 3
282 import sys, os 283 import shutil, glob 284 from tempfile import mkdtemp 285 from nose.plugins.skip import SkipTest 286 if sys.platform.startswith('win'): raise SkipTest 287 try: 288 import vtk 289 except ImportError: 290 raise SkipTest 291 from numpy import zeros 292 from ..conf import env 293 from ..testing import TestingSolver 294 from ..batch import Localhost 295 from ..domain import Distributed 296 from ..helper import Information 297 from .. import case, hook, anchor 298 # construct case. 299 cse = case.BlockCase(basedir='.', basefn='blockcase', bcmap=None, 300 solvertype=TestingSolver, neq=1, 301 steps_run=self.nsteps, time_increment=self.time_increment, 302 npart=self.npart, domaintype=Distributed, 303 batch=Localhost, rkillfn='', 304 meshfn=os.path.join(env.datadir, 'sample.dom')) 305 cse.info = Information() 306 cse.runhooks.append(anchor.FillAnchor, 307 keys=('soln', 'dsoln'), value=0.0, 308 ) 309 cse.runhooks.append(CaseCollect) 310 tdir = mkdtemp() 311 cse.runhooks.append(hook.PMarchSave, anames=[ 312 ('soln', False, -1), 313 ], fpdtype='float32', psteps=1, compressor='gz', altdir=tdir) 314 cse.runhooks.append(hook.PVtkHook, anames=[ 315 ('soln', False, -1), 316 ], ankcls=SampleVtkAnchor, psteps=1, altdir=tdir) 317 cse.init() 318 cse.run() 319 # verify output files. 320 self.assertEqual(len(glob.glob(os.path.join(tdir, '*.pvtu'))), 321 self.nsteps+1) 322 self.assertEqual(len(glob.glob(os.path.join(tdir, '*.pvtp'))), 323 self.nsteps+1) 324 self.assertEqual(len(glob.glob(os.path.join(tdir, '*.vtu'))), 325 (self.nsteps+1)*self.npart) 326 self.assertEqual(len(glob.glob(os.path.join(tdir, '*.vtp'))), 327 (self.nsteps+1)*self.npart) 328 shutil.rmtree(tdir) 329 # get result. 330 dsoln = cse.execution.var['dsoln'][:,0,:] 331 # calculate reference 332 blk = cse.solver.domainobj.blk 333 clcnd = zeros(dsoln.shape, dtype=dsoln.dtype) 334 for iistep in range(self.nsteps*2): 335 clcnd += blk.clcnd*self.time_increment/2 336 # compare. 337 self.assertTrue((dsoln==clcnd).all())
338
339 -class TestTorqueParallel(TestBlockCaseRun):
340 npart = 2 341
342 - def test_runparallel(self):
343 import sys, os 344 from nose.plugins.skip import SkipTest 345 if sys.platform.startswith('win'): raise SkipTest 346 from ..batch_torque import TaskManager 347 if TaskManager._clib_torque == None or \ 348 'PBS_NODEFILE' not in os.environ: raise SkipTest 349 from numpy import zeros 350 from ..batch import Torque 351 from ..domain import Distributed 352 case = self._get_case(npart=self.npart, domaintype=Distributed, 353 batch=Torque, rkillfn='') 354 case.run()
355