1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """
20 Definition of the structure of solvers.
21 """
22
23 from ctypes import Structure
24 from .gendata import TypeWithBinder
25
26 ALMOST_ZERO = 1.e-200
29 """
30 Execution information for BaseSolver.
31 """
32 from ctypes import c_int, c_double
33 _fields_ = [
34 ('ncore', c_int), ('neq', c_int),
35 ('time', c_double), ('time_increment', c_double),
36 ]
37 del c_int, c_double
39 svr = kw.pop('svr', None)
40 super(BaseSolverExedata, self).__init__(*args, **kw)
41 if svr == None:
42 return
43 for key in ('ncore', 'neq', 'time', 'time_increment'):
44 setattr(self, key, getattr(svr, key))
45
47 """
48 Generic solver definition. It is an abstract class and should not be used
49 to any concrete simulation case. The concrete solver sub-classes should
50 override the empty init and final methods for initialization and
51 finalization, respectively.
52
53 @cvar _clib_solve: the external dll (accessible through ctypes) which do
54 the cell loop. Subclass should override it.
55 @ctype _clib_solve: ctypes.CDLL
56 @cvar _exedatatype_: the C struct definition in ctypes.Structure.
57 @ctype _exedatatype_: ctypes.Structure
58
59 @cvar MESG_FILENAME_DEFAULT = the default file name for serial solver
60 object.
61
62 @ivar _fpdtype: dtype for the floating point data in the block instance.
63 @itype _fpdtype: numpy.dtype
64
65 @ivar enable_mesg: flag if mesg device should be enabled.
66 @itype enable_mesg: bool
67 @ivar mesg: message printer attached to a certain solver object; designed
68 and mainly used for parallel solver.
69 @itype mesg: solvcon.helper.Printer
70
71 @ivar runanchors: the list for the anchor objects to be run.
72 @itype runanchors: solvcon.anchor.AnchorList
73
74 @ivar exd: execution information for the solver.
75 @itype exd: ctypes.Structure
76 @ivar enable_tpool: flag to enable thread pool on binding.
77 @itype enable_tpool: bool
78 @ivar tpool: thread pool for solver.
79 @itype tpool: solvcon.mthread.ThreadPool
80 @ivar arglists: argument lists for C functions to be executed in the
81 thread pool.
82 @itype arglists: list
83 @ivar mmnames: marching methods name.
84 @itype mmnames: list
85 @ivar marchret: return value set for march.
86
87 @ivar der: the dictionary to put derived data arrays. Mostly used by
88 Anchors.
89 @itype der: dict
90 """
91
92 __metaclass__ = TypeWithBinder
93 _pointers_ = ['exd', 'tpool', 'arglists']
94
95 _exedatatype_ = BaseSolverExedata
96 _clib_solve = None
97
98 MESG_FILENAME_DEFAULT = 'solvcon.solver.log'
99 MMNAMES = []
100
102 from .conf import env
103 from .anchor import AnchorList
104 from .gendata import Timer
105 self._fpdtype = kw.pop('fpdtype', env.fpdtype)
106 self._fpdtype = env.fpdtype if self._fpdtype==None else self._fpdtype
107 self.enable_mesg = kw.pop('enable_mesg', False)
108 self.mesg = None
109 self.enable_tpool = kw.pop('enable_tpool', True)
110 self.ncore = kw.pop('ncore', -1)
111 self.neq = kw.pop('neq')
112 self.time = kw.pop('time', 0.0)
113 self.time_increment = kw.pop('time_increment', 0.0)
114 self.step_global = 0
115 self.step_current = 0
116 self.substep_current = 0
117 self.substep_run = kw.pop('substep_run', 2)
118
119 self.runanchors = AnchorList(self)
120
121 self.exd = None
122 self.tpool = None
123 self.arglists = None
124
125 self.mmnames = self.MMNAMES[:]
126 self.marchret = None
127
128 self.timer = Timer(vtype=float)
129 self.ticker = dict()
130
131 self.der = dict()
132
133 @property
135 import numpy
136 _fpdtype = self._fpdtype
137 if isinstance(_fpdtype, str):
138 return getattr(numpy, _fpdtype)
139 else:
140 return self._fpdtype
141 @property
145 @property
149
150 @staticmethod
152 f = open('/proc/stat')
153 data = f.read()
154 f.close()
155 cpulist = [line for line in data.split('\n') if
156 line.startswith('cpu')]
157 cpulist = [line for line in cpulist if line.split()[0] != 'cpu']
158 return len(cpulist)
159
161 """
162 Create the message outputing device, which is intended for debugging
163 and outputing messages related to the solver. The outputing device is
164 most useful when running distributed solvers. The created device will
165 be attach to self.
166
167 @keyword force: flag to force the creation. Default False,
168 @type force: bool
169
170 @return: nothing
171 """
172 import os
173 from .helper import Printer
174 if force: self.enable_mesg = True
175 if self.enable_mesg:
176 if self.svrn != None:
177 main, ext = os.path.splitext(self.MESG_FILENAME_DEFAULT)
178 tmpl = main + '%d' + ext
179 dfn = tmpl % self.svrn
180 dprefix = 'SOLVER%d: ' % self.svrn
181 else:
182 dfn = self.MESG_FILENAME_DEFAULT
183 dprefix = ''
184 else:
185 dfn = os.devnull
186 dprefix = ''
187 self.mesg = Printer(dfn, prefix=dprefix, override=True)
188
189 - def dump(self, objfn):
190 """
191 Pickle self into the given filename.
192
193 @parameter objfn: the output filename.
194 @type objfn: str
195 """
196 import cPickle as pickle
197 holds = dict()
198 self.unbind()
199 for key in ['mesg',]:
200 holds[key] = getattr(self, key)
201 setattr(self, key, None)
202 pickle.dump(self, open(objfn, 'wb'), pickle.HIGHEST_PROTOCOL)
203 for key in holds:
204 setattr(self, key, holds[key])
205 self.bind()
206
207 - def _tcall(self, *args, **kw):
208 """
209 Use thread pool to call C functions in parallel (shared-memory).
210 """
211 if not self.tpool:
212 raise RuntimeError('tpool is not available in %s'%str(self))
213 from ctypes import byref, c_int
214 from numpy import zeros
215 ncore = self.ncore
216 cfunc = args[0]
217 iter_start = args[1]
218 iter_end = args[2]
219 tickerkey = kw.pop('tickerkey', None)
220 if ncore > 0:
221 if len(args)>3:
222 alsts = list()
223 for it in range(self.ncore):
224 alst = [byref(self.exd), c_int(0), c_int(0)]
225 alst.extend(args[3:])
226 alsts.append(alst)
227 else:
228 alsts = self.arglists
229 incre = (iter_end-iter_start)/ncore + 1
230 istart = iter_start
231 for it in range(ncore):
232 iend = min(istart+incre, iter_end)
233 alsts[it][1].value = istart
234 alsts[it][2].value = iend
235 istart = iend
236 ret = self.tpool(cfunc, alsts)
237 else:
238 alst = [byref(self.exd), c_int(iter_start), c_int(iter_end)]
239 alst.extend(args[3:])
240 ret = [cfunc(*alst)]
241 if tickerkey != None:
242 if tickerkey not in self.ticker:
243 self.ticker[tickerkey] = zeros(len(ret), dtype='int32')
244 for it in range(len(ret)):
245 self.ticker[tickerkey][it] += ret[it]
246 return ret
247
249 """
250 Put everything that cannot be pickled, such as file objects, ctypes
251 pointers, etc., into self.
252
253 @return: nothing
254 """
255 import sys
256 from ctypes import byref, c_int
257 from solvcon.mthread import ThreadPool
258
259 if self.mesg == None: self.__create_mesg()
260
261 if self.ncore == -1 and sys.platform.startswith('linux2'):
262 self.ncore = self.detect_ncore()
263
264 exdtype = self._exedatatype_
265 self.exd = exdtype(svr=self)
266
267 if self.enable_tpool:
268 self.tpool = ThreadPool(nthread=self.ncore)
269 self.arglists = list()
270 for it in range(self.ncore):
271 self.arglists.append([byref(self.exd), c_int(0), c_int(0)])
272
273 - def init(self, **kw):
277
279 self.runanchors('provide')
281 self.runanchors('preloop')
282 - def postloop(self):
283 self.runanchors('postloop')
285 self.runanchors('exhaust')
286
293 - def march(self, time, time_increment, steps_run, worker=None):
294 """
295 Default marcher for the solver object.
296
297 @param time: starting time of marching.
298 @type time: float
299 @param time_increment: temporal interval for the time step.
300 @type time_increment: float
301 @param steps_run: the count of time steps to run.
302 @type steps_run: int
303 @return: arbitrary return value.
304 @rtype: float
305 """
306 from time import time as _time
307 self.marchret = dict()
308 self.step_current = 0
309 self.runanchors('premarch')
310 while self.step_current < steps_run:
311 self.substep_current = 0
312 self.runanchors('prefull')
313 t0= _time()
314 while self.substep_current < self.substep_run:
315 self._set_time(time, time_increment)
316 self.runanchors('presub')
317
318 for mmname in self.mmnames:
319 method = getattr(self, mmname)
320 t1 = _time()
321 self.runanchors('pre'+mmname)
322 t2 = _time()
323 method(worker=worker)
324 self.timer.increase(mmname, _time() - t2)
325 self.runanchors('post'+mmname)
326 self.timer.increase(mmname+'_a', _time() - t1)
327
328 time += time_increment/self.substep_run
329 self._set_time(time, time_increment)
330 self.substep_current += 1
331 self.runanchors('postsub')
332 self.timer.increase('march', _time() - t0)
333 self.step_global += 1
334 self.step_current += 1
335 self.runanchors('postfull')
336 self.runanchors('postmarch')
337 if worker:
338 worker.conn.send(self.marchret)
339 return self.marchret
340
342 """
343 Faked block from solver for being used by VTK.
344 """
353
355 """
356 Generic class for multi-dimensional (implemented with Block)
357 sequential/parallel solvers. Meta, metric, and connectivity data arrays
358 are absorbed into the instance of this class.
359
360 Before the invocation of init() method, bind() method must be called.
361
362 @note: When subclass BlockSolver, in the init() method in the subclass must
363 be initilized first, and the super().init() can then be called. Otherwise
364 the BCs can't set correct information to the solver.
365
366 @cvar _interface_init_: list of attributes (arrays) to be exchanged on
367 interface when initialized.
368 @ctype _interface_init_: list
369
370 @ivar ibcthread: flag if using threads.
371 @itype ibcthread: bool
372
373 @ivar svrn: serial number of solver object.
374 @itype svrn: int
375 @ivar nsvr: number of solver objects.
376 @itype nsvr: int
377
378 @ivar grpnames: list of names of groups.
379 @itype grpnames: list
380 @ivar ngroup: number of groups.
381 @itype ngroup: int
382
383 @ivar bclist: list of BCs.
384 @itype bclist: list
385 @ivar ibclist: list of interface BCs.
386 @itype ibclist: list
387 @ivar all_simplex: True if the mesh is all-simplex, False otherwise.
388 @itype all_simplex: bool
389 @ivar use_incenter: True if the mesh uses incenters, False otherwise.
390 @itype use_incenter: bool
391 """
392
393 _interface_init_ = []
394 _solution_array_ = []
395
396 from .block import Block
397 FCMND = Block.FCMND
398 CLMND = Block.CLMND
399 CLMFC = Block.CLMFC
400 del Block
401
448
449 @property
458
460 """
461 Bind all the boundary condition objects.
462
463 @note: BC must be bound AFTER solver "pointers". Overridders to the
464 method should firstly bind all pointers, secondly super binder, and
465 then methods/subroutines.
466 """
467 super(BlockSolver, self).bind()
468
469 for bc in self.bclist:
470 bc.bind()
471
473 """
474 Unbind all the boundary condition objects.
475 """
476 super(BlockSolver, self).unbind()
477 for bc in self.bclist:
478 bc.unbind()
479
480 @property
482 """
483 Check boundness for solver as well as BC objects.
484 """
485 if not super(BlockSolver, self).is_bound:
486 return False
487 else:
488 for bc in self.bclist:
489 if not bc.is_bound:
490 return False
491 return True
492
493 @property
495 """
496 Check unboundness for solver as well as BC objects.
497 """
498 if not super(BlockSolver, self).is_unbound:
499 return False
500 else:
501 for bc in self.bclist:
502 if not bc.is_unbound:
503 return False
504 return True
505
506 - def init(self, **kw):
507 """
508 Check and initialize BCs.
509 """
510 for arrname in self._solution_array_:
511 arr = getattr(self, arrname)
512 arr.fill(ALMOST_ZERO)
513 for bc in self.bclist:
514 bc.init(**kw)
515 super(BlockSolver, self).init(**kw)
516
518 """
519 Update the boundary conditions.
520
521 @return: nothing.
522 """
523 pass
524
526 """
527 Call method of each of non-interface BC objects in my list.
528
529 @param name: name of the method of BC to call.
530 @type name: str
531 @return: nothing
532 """
533 from .boundcond import interface
534 bclist = [bc for bc in self.bclist if not isinstance(bc, interface)]
535 for bc in bclist:
536 try:
537 getattr(bc, name)(*args, **kw)
538 except Exception as e:
539 e.args = tuple([str(bc), name] + list(e.args))
540 raise
541
542
543
544
546 """
547 Remotely set attribute of worker.
548 """
549 return setattr(self, name, var)
550
551 - def pull(self, arrname, inder=False, worker=None):
552 """
553 Pull data array to dealer (rpc) through worker object.
554
555 @param arrname: the array to pull to master.
556 @type arrname: str
557 @param inder: the data array is derived data array.
558 @type inder: bool
559 @keyword worker: the worker object for communication.
560 @type worker: solvcon.rpc.Worker
561 @return: nothing.
562 """
563 conn = worker.conn
564 if inder:
565 arr = self.der[arrname]
566 else:
567 arr = getattr(self, arrname)
568 conn.send(arr)
569
570 - def push(self, marr, arrname, start=0, inder=False):
571 """
572 Push data array received from dealer (rpc) into self.
573
574 @param marr: the array passed in.
575 @type marr: numpy.ndarray
576 @param arrname: the array to pull to master.
577 @type arrname: str
578 @param start: the starting index of pushing.
579 @type start: int
580 @param inder: the data array is derived data array.
581 @type inder: bool
582 @return: nothing.
583 """
584 if inder:
585 arr = self.der[arrname]
586 else:
587 arr = getattr(self, arrname)
588 arr[start:] = marr[start:]
589
590 - def pullank(self, ankname, objname, worker=None):
591 """
592 Pull data array to dealer (rpc) through worker object.
593
594 @param ankname: the name of related anchor.
595 @type ankname: str
596 @param objname: the object to pull to master.
597 @type objname: str
598 @keyword worker: the worker object for communication.
599 @type worker: solvcon.rpc.Worker
600 @return: nothing.
601 """
602 conn = worker.conn
603 obj = getattr(self.runanchors[ankname], objname)
604 conn.send(obj)
605
607 from .boundcond import interface
608
609 ibclist = list()
610 for pair in ifacelist:
611 if pair < 0:
612 ibclist.append(pair)
613 else:
614 assert len(pair) == 2
615 assert self.svrn in pair
616 ibclist.append(sum(pair)-self.svrn)
617
618 for bc in self.bclist:
619 if not isinstance(bc, interface):
620 continue
621 it = ibclist.index(bc.rblkn)
622 sendn, recvn = ifacelist[it]
623 ibclist[it] = bc, sendn, recvn
624 self.ibclist = ibclist
625
627 from time import sleep
628 from threading import Thread
629 threads = list()
630 for ibc in self.ibclist:
631
632 if ibc < 0:
633 continue
634 bc, sendn, recvn = ibc
635
636 if self.svrn == sendn:
637 target = self.pushibc
638 args = arrname, bc, recvn
639 elif self.svrn == recvn:
640 target = self.pullibc
641 args = arrname, bc, sendn
642 else:
643 raise ValueError, 'bc.rblkn = %d != %d or %d' % (
644 bc.rblkn, sendn, recvn)
645 kwargs = {'worker': worker}
646
647 if self.ibcthread:
648 threads.append(Thread(
649 target=target,
650 args=args,
651 kwargs=kwargs,
652 ))
653 threads[-1].start()
654 else:
655 target(*args, **kwargs)
656 if self.ibcthread:
657 for thread in threads:
658 thread.join()
659
660 - def pushibc(self, arrname, bc, recvn, worker=None):
661 """
662 Push data toward selected interface which connect to blocks with larger
663 serial number than myself.
664
665 @param arrname: name of the array in the object to exchange.
666 @type arrname: str
667 @param bc: the interface BC to push.
668 @type bc: solvcon.boundcond.interface
669 @param recvn: serial number of the peer to exchange data with.
670 @type recvn: int
671 @keyword worker: the wrapping worker object for parallel processing.
672 @type worker: solvcon.rpc.Worker
673 """
674 from numpy import empty
675 conn = worker.pconns[bc.rblkn]
676 ngstcell = self.ngstcell
677 arr = getattr(self, arrname)
678
679 shape = list(arr.shape)
680 shape[0] = bc.rclp.shape[0]
681 rarr = empty(shape, dtype=arr.dtype)
682 conn.recvarr(rarr)
683 slct = bc.rclp[:,0] + ngstcell
684 arr[slct] = rarr[:]
685
686 slct = bc.rclp[:,2] + ngstcell
687 conn.sendarr(arr[slct])
688
689 - def pullibc(self, arrname, bc, sendn, worker=None):
690 """
691 Pull data from the interface determined by the serial of peer.
692
693 @param arrname: name of the array in the object to exchange.
694 @type arrname: str
695 @param bc: the interface BC to pull.
696 @type bc: solvcon.boundcond.interface
697 @param sendn: serial number of the peer to exchange data with.
698 @type sendn: int
699 @keyword worker: the wrapping worker object for parallel processing.
700 @type worker: solvcon.rpc.Worker
701 """
702 from numpy import empty
703 conn = worker.pconns[bc.rblkn]
704 ngstcell = self.ngstcell
705 arr = getattr(self, arrname)
706
707 slct = bc.rclp[:,2] + ngstcell
708 conn.sendarr(arr[slct])
709
710 shape = list(arr.shape)
711 shape[0] = bc.rclp.shape[0]
712 rarr = empty(shape, dtype=arr.dtype)
713 conn.recvarr(rarr)
714 slct = bc.rclp[:,0] + ngstcell
715 arr[slct] = rarr[:]
716