Package solvcon :: Package io :: Package tests :: Module test_domain
[hide private]
[frames | no frames]

Source Code for Module solvcon.io.tests.test_domain

  1  # -*- coding: UTF-8 -*- 
  2   
  3  from unittest import TestCase 
  4   
def get_sample_neu():
    """
    Load the ``sample.neu`` test file and convert it into a block.

    The file content is obtained through ``loadfile`` and parsed by
    ``GambitNeutral``; the parsed object is converted with ``toblock()``.
    Every boundary condition of the resulting block whose name is
    ``'unspecified'`` is renamed to ``'unspec_orig'`` before the block is
    returned.
    """
    from ...testing import loadfile
    from ..gambit import GambitNeutral
    neutral = GambitNeutral(loadfile('sample.neu'))
    block = neutral.toblock()
    for bndcond in block.bclist:
        if bndcond.name != 'unspecified':
            continue
        bndcond.name = 'unspec_orig'
    return block
16
class CheckDomainIO(TestCase):
    """
    Assertion helpers that compare a reloaded domain or block against the
    original object.  This class defines no test methods of its own; the
    helpers are meant to be called from subclasses.

    In every helper the first argument is the new (reloaded) object and the
    second argument is the original object.
    """

    # Array attributes of a block compared by _check_block_array(), listed
    # in the order: metrics, type, connectivity.
    _BLOCK_ARRAY_NAMES = (
        'ndcrd', 'fccnd', 'fcnml', 'fcara', 'clcnd', 'clvol',
        'fctpn', 'cltpn', 'clgrp',
        'fcnds', 'fccls', 'clnds', 'clfcs',
    )
    # The same attributes are compared with no prefix, then with the "gst"
    # (ghost) prefix, then with the "sh" (shared) prefix.
    _BLOCK_ARRAY_PREFIXES = ('', 'gst', 'sh')

    def _check_domain_shape(self, don, doo):
        """Assert that both domains have an equal ``edgecut``."""
        self.assertEqual(don.edgecut, doo.edgecut)

    def _check_domain_array(self, don, doo):
        """
        Assert element-wise equality of the domain arrays: ``part``,
        ``shapes``, ``ifparr``, the three ``mappers`` and, for each of the
        ``len(doo)`` entries, the three arrays in ``idxinfo``.

        A failure inside the ``idxinfo`` loop is re-raised with
        ``'<n>-th block'`` appended to the exception arguments.
        """
        for name in ('part', 'shapes', 'ifparr'):
            self.assertTrue(
                (getattr(don, name) == getattr(doo, name)).all(), name)
        for imap in range(3):
            self.assertTrue((don.mappers[imap] == doo.mappers[imap]).all())
        for it in range(len(doo)):
            try:
                for iarr in range(3):
                    self.assertTrue((don.idxinfo[it][iarr] ==
                                     doo.idxinfo[it][iarr]).all())
            # Exception (not StandardError, which is absent in Python 3) so
            # that the assertion failure is annotated instead of masked by a
            # NameError.
            except Exception as e:
                msgs = list(e.args)
                msgs.append('%d-th block' % it)
                e.args = tuple(msgs)
                raise

    def _check_block_shape(self, newblk, blk):
        """
        Assert that the shape counters and the serial number (``blkn``) of
        both blocks are equal.
        """
        for name in ('ndim', 'nnode', 'nface', 'ncell', 'nbound',
                     'ngstnode', 'ngstface', 'ngstcell',
                     'blkn'):
            self.assertEqual(getattr(newblk, name), getattr(blk, name), name)

    def _check_block_group(self, newblk, blk):
        """Assert that both blocks carry the same group names in order."""
        self.assertEqual(len(newblk.grpnames), len(blk.grpnames))
        for newname, name in zip(newblk.grpnames, blk.grpnames):
            self.assertEqual(newname, name)

    def _check_block_bc(self, newblk, blk):
        """
        Assert that the boundary faces and every boundary condition of both
        blocks match.

        A failure for a particular boundary condition is re-raised with
        ``'<n>-th BC'`` appended to the exception arguments.
        """
        from ...boundcond import interface
        self.assertTrue((newblk.bndfcs == blk.bndfcs).all())
        self.assertEqual(len(newblk.bclist), len(blk.bclist))
        for ibc, (newbc, bc) in enumerate(zip(newblk.bclist, blk.bclist)):
            try:
                # meta data.
                self.assertEqual(newbc.sern, bc.sern)
                self.assertEqual(newbc.name, bc.name)
                # the BC must be attached to a different block object.
                self.assertNotEqual(newbc.blk, bc.blk)
                self.assertEqual(newbc.blkn, bc.blkn)
                # identity test; "== None" may be overloaded by the object.
                self.assertTrue(newbc.svr is None)
                # faces (only the first two columns are compared).
                self.assertTrue((newbc.facn[:,:2] == bc.facn[:,:2]).all())
                # values.
                self.assertEqual(newbc.value.shape[1], bc.value.shape[1])
                if newbc.value.shape[1] > 0:
                    self.assertTrue((newbc.value == bc.value).all())
                # for interface.
                if isinstance(bc, interface):
                    self.assertTrue(isinstance(newbc, interface))
                    self.assertEqual(newbc.rblkn, bc.rblkn)
                    self.assertTrue((newbc.rblkinfo == bc.rblkinfo).all())
                    self.assertTrue((newbc.rclp == bc.rclp).all())
            except Exception as e:
                msgs = list(e.args)
                msgs.append('%d-th BC' % ibc)
                e.args = tuple(msgs)
                raise

    def _check_block_array(self, newblk, blk):
        """
        Assert element-wise equality of the metric, type and connectivity
        arrays of both blocks, for the plain, ``gst``-prefixed and
        ``sh``-prefixed variants of every attribute.
        """
        for prefix in self._BLOCK_ARRAY_PREFIXES:
            for base in self._BLOCK_ARRAY_NAMES:
                name = prefix + base
                self.assertTrue(
                    (getattr(newblk, name) == getattr(blk, name)).all(),
                    name)
138
class TestReloadTrivial(CheckDomainIO):
    """
    Save a split ``Collective`` domain built from the sample block with
    ``DomainIO`` (``compressor='gz'``), reload it in different ways and
    compare the reloaded objects against the originals.
    """

    def _check_block_all(self, newblk, blk):
        """Run the shape, group, BC and array checks on a pair of blocks."""
        self._check_block_shape(newblk, blk)
        self._check_block_group(newblk, blk)
        self._check_block_bc(newblk, blk)
        self._check_block_array(newblk, blk)

    def test_whole_domain(self):
        """
        Reload the domain with ``with_split=True`` and compare the domain,
        the whole block and every split block.
        """
        from tempfile import mkdtemp
        from shutil import rmtree
        from ...domain import Collective
        from ..domain import DomainIO
        npart = 3
        # create original domain.
        blk = get_sample_neu()
        doo = Collective(blk=blk)
        doo.split(npart)
        dio = DomainIO(compressor='gz')
        # save and reload to new domain.
        dirname = mkdtemp()
        try:
            dio.save(dom=doo, dirname=dirname)
            don = dio.load(dirname=dirname, with_split=True)
        finally:
            # remove the temporary directory even when save/load fails.
            rmtree(dirname)
        # check domain.
        self._check_domain_shape(don, doo)
        self._check_domain_array(don, doo)
        # check whole block.
        self._check_block_all(don.blk, doo.blk)
        # check split blocks.
        for iblk in range(npart):
            try:
                self._check_block_all(don[iblk], doo[iblk])
            # Exception instead of StandardError, which Python 3 lacks.
            except Exception as e:
                msgs = list(e.args)
                msgs.append('%d-th block' % iblk)
                e.args = tuple(msgs)
                raise

    def test_single_block(self):
        """
        Reload the whole block and each split block individually with
        ``load_block`` and compare them against the originals.
        """
        from tempfile import mkdtemp
        from shutil import rmtree
        from ...domain import Collective
        from ..domain import DomainIO
        npart = 3
        # create original domain.
        blk = get_sample_neu()
        doo = Collective(blk=blk)
        doo.split(npart)
        dio = DomainIO(compressor='gz')
        # save and reload to new domain.
        dirname = mkdtemp()
        try:
            dio.save(dom=doo, dirname=dirname)
            # call with only dirname given; the result is not inspected.
            dio.load_block(dirname=dirname)
            # check whole block.
            blk = dio.load_block(dirname=dirname, blkid=None, bcmapper=None)
            self._check_block_all(blk, doo.blk)
            # check split blocks.
            for iblk in range(npart):
                try:
                    blk = dio.load_block(dirname=dirname, blkid=iblk,
                        bcmapper=None)
                    self._check_block_all(blk, doo[iblk])
                except Exception as e:
                    msgs = list(e.args)
                    msgs.append('%d-th block' % iblk)
                    e.args = tuple(msgs)
                    raise
        finally:
            # finalize; runs even when a check above fails.
            rmtree(dirname)

    def test_limited_domain(self):
        """
        Reload the domain without ``with_split`` and verify that the domain
        and whole block match while no split block is loaded
        (``len(don) == 0``).
        """
        from tempfile import mkdtemp
        from shutil import rmtree
        from ...domain import Collective
        from ..domain import DomainIO
        npart = 3
        # create original domain.
        blk = get_sample_neu()
        doo = Collective(blk=blk)
        doo.split(npart)
        dio = DomainIO(compressor='gz')
        # save and reload to new domain.
        dirname = mkdtemp()
        try:
            dio.save(dom=doo, dirname=dirname)
            don = dio.load(dirname=dirname)
        finally:
            # remove the temporary directory even when save/load fails.
            rmtree(dirname)
        # check domain.
        self._check_domain_shape(don, doo)
        self._check_domain_array(don, doo)
        # check whole block.
        self._check_block_all(don.blk, doo.blk)
        # check split blocks.
        self.assertEqual(len(don), 0)
241