1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """
20 Remote procedure call and inter-process communication.
21 """
22
23 DEFAULT_AUTHKEY = 'solvcon.rpc'
24 DEFAULT_SLEEP = 0.1
27 """
28 Signaling termination of Worker event loop.
29 """
30 pass
31
33 """
34 Base class for notification for ipc.
35 """
36 pass
40 """
41 The command to remote process. The base class is for all kinds of commands
42 designated to Muscle object.
43 """
def __init__(self, methodname, *args, **kw):
    """
    Record the name of the method to call and the arguments to call it with.

    @param methodname: name of the attribute that the receiving event loop
        looks up and calls.
    @type methodname: str
    @keyword with_worker: flag stored on the command instead of being kept
        among the call keywords; False when not given.
    @type with_worker: bool
    """
    # Take the flag out first so that self.kw holds only the keywords meant
    # for the remote call itself.
    forward_worker = kw.pop('with_worker', False)
    self.with_worker = forward_worker
    self.methodname, self.args, self.kw = methodname, args, kw
50 """
51 Special commands to Worker object.
52 """
53 pass
54
56 """
57 The whole worker object will run remotely (means in separated process).
58
59 @ivar muscle: the muscle object.
60 @itype muscle: Muscle
61 @ivar serial: serial number of the worker process.
62 @itype serial: int
63 @ivar lsnr: the listener object for master.
64 @itype lsnr: solvcon.connection.Listener
65 @ivar conn: the connection object to master.
66 @itype conn: solvcon.connection.Client
67 @ivar plsnrs: dictionary of listener objects for peers.
68 @itype plsnrs: dict
69 @ivar pconns: dictionary of connection objects to peers.
70 @itype pconns: dict
71 """
def __init__(self, muscle, profiler_data=None):
    """
    Set up a worker that is not yet connected to anything.

    @param muscle: the object that plain commands are dispatched to.
    @type muscle: Muscle
    @keyword profiler_data: when given and non-empty, profiling is switched
        on and items 0, 1 and 2 are stored as the profile data file name,
        the profile log file name and the comma-separated sort keys.  Any
        falsy value switches profiling off.
    @type profiler_data: sequence
    """
    self.muscle = muscle
    # Serial number, listener and connections start out empty; they are
    # assigned by other methods once the worker is running.
    self.serial = None
    self.lsnr = None
    self.conn = None
    self.plsnrs = dict()
    self.pconns = dict()
    self.do_profile = bool(profiler_data)
    if self.do_profile:
        self.profiler_dat = profiler_data[0]
        self.profiler_log = profiler_data[1]
        self.profiler_sort = profiler_data[2]
    else:
        self.profiler_dat = None
        self.profiler_log = None
        self.profiler_sort = None
84
86 """
87 Event loop.
88 """
89 while True:
90 ntc = self.conn.recv()
91 try:
92 if isinstance(ntc, Command):
93 obj = self.muscle
94 if isinstance(ntc, Control):
95 obj = self
96 method = getattr(obj, ntc.methodname)
97 if ntc.with_worker:
98 ntc.kw.update(worker=self)
99 ret = method(*ntc.args, **ntc.kw)
100 except Terminate:
101 break
102
104 import cProfile
105 import pstats
106 from .conf import env
107 if self.do_profile:
108 if env.mpi:
109 self.profiler_dat += '%d' % env.mpi.rank
110 self.profiler_log += '%d' % env.mpi.rank
111 cProfile.runctx('self._eventloop()', globals(), locals(),
112 self.profiler_dat)
113 plog = open(self.profiler_log, 'w')
114 p = pstats.Stats(self.profiler_dat, stream=plog)
115 p.sort_stats(*self.profiler_sort.split(','))
116 p.dump_stats(self.profiler_dat)
117 p.print_stats()
118 plog.close()
119 else:
120 self._eventloop()
121
def register(self, address, authkey, *args, **kw):
    """
    Connect to a remote listener and run the event loop.  The worker acts
    as a client here, so it does not own a listener of its own.

    The first connection is only used to receive a three-item message
    (address, authkey, seconds to wait).  It is closed, the worker sleeps
    for the given time, and the connection actually used by the event loop
    is then opened to the received address.

    @param address: address to connect to first.
    @type address: str or tuple
    @param authkey: authentication key for the first connection.
    @type authkey: str
    """
    import time
    from .connection import Client

    # Step 1: ask where the real connection should go.
    handshake = Client(address=address, authkey=authkey)
    real_address, real_authkey, delay = handshake.recv()
    handshake.close()

    # Step 2: wait as instructed before connecting to the received address.
    time.sleep(delay)
    self.conn = Client(address=real_address, authkey=real_authkey)

    # Step 3: serve commands until told to stop.
    self.eventloop()
145
def run(self, address, authkey, *args, **kw):
    """
    Listen on the given address, accept one connection and run the event
    loop on it.  The worker owns the listener and acts as a server here.

    @param address: address to listen on.
    @type address: str or tuple
    @param authkey: authentication key for connection.
    @type authkey: str
    """
    from .conf import env
    from .connection import Listener

    listener = Listener(address=address, authkey=authkey)
    self.lsnr = listener
    # When the second item of the requested address is 0 and MPI is
    # available, the second item of the listener's actual address is sent
    # through MPI (presumably the port the OS picked -- TODO confirm).
    requested_zero = address[1] == 0
    if requested_zero and env.mpi:
        env.mpi.send(self.lsnr.address[1], 0, 1)
    self.conn = self.lsnr.accept()

    self.eventloop()
165
def chdir(self, dirname):
    """
    Change the working directory of the process this method runs in.

    @param dirname: the directory to switch to.
    @type dirname: str
    """
    from os import chdir as change_directory
    change_directory(dirname)
169
171 """
172 Remotely set attribute of worker.
173 """
174 return setattr(self, name, var)
175
177 """
178 Remotely unpickle a file and set it to self with the specified name.
179 """
180 import cPickle as pickle
181 setattr(self, name, pickle.load(open(objfn)))
182
184 """
185 Send barrier signal for synchronization.
186 """
187 self.conn.send(Barrier)
188
190 """
191 Accept connection from specified peer.
192
193 @param peern: index of the peer who wants to connect.
194 @type peern: int
195 @param family: the family of address needed to be guessed.
196 @type family: str
197 @param authkey: authentication key for connection.
198 @type authkey: str
199 """
200 lsnr = self.lsnr
201 self.conn.send(lsnr.address)
202
203 conn = lsnr.accept()
204
205 self.plsnrs[peern] = lsnr
206 self.pconns[peern] = conn
207
209 """
210 Make a connection to specified peer (it has to be accepting
211 connection).
212
213 @param peern: index of the peer who wants to connect.
214 @type peern: int
215 @param address: the address to connect to.
216 @type address: str or tuple
217 @param authkey: authentication key for connection.
218 @type authkey: str
219 """
220 from .connection import Client
221 conn = Client(address=address, authkey=authkey)
222 self.pconns[peern] = conn
223
225 """
226 Create MPI proxy for a pair of p2p connection.
227
228 @param src: source worker ID.
229 @type src: int
230 @param dst: destination worker ID.
231 @type dst: int
232 """
233 from .connection import MPIConnection
234 self.pconns[dst] = MPIConnection(src+1, dst+1)
235
237 port = self.mpi.recv(dst, tag)
238 self.conn.send(port)
239
242
def create_solver(self, bcmap, dirname, iblk, nblk, solvertype, svrkw):
    """
    Load one block from a saved domain, build a solver on it and make that
    solver the muscle of this worker.

    @param bcmap: BC mapper handed to the block loader.
    @type bcmap: dict
    @param dirname: the directory of the saved domain object.
    @type dirname: str
    @param iblk: index of the block to be loaded; also stored on the solver
        as svrn.
    @type iblk: int
    @param nblk: number of total blocks (sub-domains); stored on the solver
        as nsvr.
    @type nblk: int
    @param solvertype: the type of solver to be created.
    @type solvertype: type
    @param svrkw: keywords passed to the constructor of the solver.
    @type svrkw: dict
    @return: nothing
    """
    from .io.domain import DomainIO
    block = DomainIO().load_block(
        dirname=dirname, blkid=iblk, bcmapper=bcmap)
    solver = solvertype(block, **svrkw)
    # Tell the solver which piece it is and how many pieces exist.
    solver.svrn, solver.nsvr = iblk, nblk
    solver.unbind()
    self.muscle = solver
270
272 """
273 Create an anchor object and append it to the solver muscle.
274
275 @param ankcls: anchor type.
276 @type ankcls: type
277 @param ankkw: keywords to the constructor of the anchor.
278 @type ankkw: dict
279 @return: nothing
280 """
281 self.muscle.runanchors.append(ankcls, **ankkw)
282
284 """
285 Remote agent to worker.
286
287 @ivar conn: connection to the worker.
288 @itype conn: solvcon.connection.Client
289 @ivar noticetype: type of notice object to send.
290 @itype noticetype: Notice
291 """
293 self.conn = conn
294 self.noticetype = noticetype
295
297 conn = self.conn
298 ntype = self.noticetype
299 def func(*arg, **kw):
300 conn.send(ntype(name, *arg, **kw))
301 return func
302
304 """
305 Convenient wrapper for two agents that send commands to remote worker and
306 muscle. The default agent is to the worker (ctl).
307
308 @ivar lsnr: listener to worker.
309 @itype lsnr: solvcon.connection.Listener
310 @ivar conn: connection to the worker.
311 @itype conn: solvcon.connection.Client
312 @ivar address: remote address.
313 @itype address: tuple or str
314 @ivar cmd: agent to muscle.
315 @itype cmd: Agent
316 @ivar ctl: agent to worker.
317 @itype ctl: Agent
318 """
319 - def __init__(self, lsnr=None, conn=None, address=None):
325
327 """
328 Default to worker.
329 """
330 return getattr(self.ctl, name)
331
def recv(self, *args, **kw):
    """
    Receive one object from the connection to the worker.  All arguments
    are handed to the connection's recv() unchanged.

    @return: whatever the connection's recv() returns.
    """
    channel = self.conn
    received = channel.recv(*args, **kw)
    return received
337
339 """
340 Contains shadows to workers. Workers can be hired or recruited. A hired
341 worker is local to the dealer so that the dealer can directly start it. A
342 recruited worker is remote, and the dealer can only wait for it to
343 register. A recruited worker is instantiated by itself, in a standalone
344 process, and usually remotely.
345
346 @ivar publicaddress: the public address for worker to gain connection
347 information. It is used for recruitment.
348 @itype publicaddress: tuple or str
349 @ivar authkey: authentication key for worker connection.
350 @itype authkey: str
351 @ivar family: connection family for automatic connection generation.
352 Can be 'AF_PIPE', 'AF_UNIX', or 'AF_INET'.
353 @itype family: str
354 """
355 WAIT_FOR_ACCEPT = 0.1
356
358 import sys
359 self.publicaddress = kw.pop('publicaddress', None)
360 self.authkey = kw.pop('authkey', DEFAULT_AUTHKEY)
361 self.family = kw.pop('family', None)
362 if self.family == None:
363 if sys.platform.startswith('win'):
364 self.family = 'AF_PIPE'
365 elif sys.platform.startswith('linux'):
366 self.family = 'AF_UNIX'
367 else:
368 self.family = 'AF_INET'
369 super(Dealer, self).__init__(*args, **kw)
370 self.spanhead = None
371
def hire(self, worker, inetaddr=None, wait_for_accept=None):
    """
    Start a local process that runs the given worker, connect to it and
    append a shadow for it to this dealer.

    The worker's run() method is the process target; it is given an
    address guessed from self.family and the dealer's authkey.  After
    the pause, the dealer connects to that address as a client, sets the
    remote 'serial' attribute to the current length of the dealer, and
    appends the shadow.

    @param worker: worker object.
    @type worker: Worker
    @keyword inetaddr: accepted but not used by this method.
    @keyword wait_for_accept: seconds to wait between starting the process
        and connecting to it.  If None, self.WAIT_FOR_ACCEPT is used.
    @type wait_for_accept: float
    """
    from time import sleep
    from multiprocessing import Process
    from .connection import guess_address, Client

    address = guess_address(self.family)
    proc = Process(
        target=worker.run,
        args=(address, self.authkey),
    )
    proc.start()
    # Identity test: only a missing value selects the default, so an
    # explicit 0 still means "do not wait".
    if wait_for_accept is None:
        wait_for_accept = self.WAIT_FOR_ACCEPT
    sleep(wait_for_accept)

    conn = Client(address=address, authkey=self.authkey)
    shadow = Shadow(conn=conn, address=address)
    shadow.remote_setattr('serial', len(self))
    self.append(shadow)
399
def appoint(self, inetaddr, port, authkey):
    """
    Connect to a worker that is already listening and append a shadow for
    it to this dealer.

    @param inetaddr: the IP/DN of the machine the worker listens on.
    @type inetaddr: str
    @param port: the port that the remote worker listens on.
    @type port: int
    @param authkey: the authkey for the worker.
    @type authkey: str
    @return: nothing
    """
    from .connection import Client
    remote = (inetaddr, port)
    link = Client(address=remote, authkey=authkey)
    newcomer = Shadow(conn=link, address=remote)
    # The serial of the new worker is its position in the dealer.
    newcomer.remote_setattr('serial', len(self))
    self.append(newcomer)
415
def bridge(self, peers, wait_for_accept=None):
    """
    Tell two peering workers to establish a connection.

    With MPI available, each worker is simply told about the other through
    set_peer().  Otherwise the higher-indexed worker is told to accept,
    the address it reports back is checked against the address recorded
    on its shadow, and after a pause the lower-indexed worker is told to
    connect to that address.

    @param peers: the two distinct worker indices to connect.
    @type peers: tuple
    @keyword wait_for_accept: seconds to wait before telling the lower
        worker to connect.  If None, self.WAIT_FOR_ACCEPT is used.
    @type wait_for_accept: float
    @raise ValueError: the accepting worker reported an address different
        from the one stored on its shadow.
    """
    from time import sleep
    from .conf import env
    plow, phigh = peers
    assert plow != phigh
    # Order the pair so that plow < phigh.
    if plow > phigh:
        plow, phigh = phigh, plow
    if env.mpi:
        self[phigh].set_peer(phigh, plow)
        self[plow].set_peer(plow, phigh)
    else:
        # The higher-indexed worker accepts and reports where it listens.
        self[phigh].accept_peer(plow, self.family, self.authkey)
        address = self[phigh].recv()
        if address != self[phigh].address:
            raise ValueError('%s != %s' % (
                str(address), str(self[phigh].address)))
        # Only a missing value selects the default pause.
        if wait_for_accept is None:
            wait_for_accept = self.WAIT_FOR_ACCEPT
        sleep(wait_for_accept)
        # The lower-indexed worker connects to the reported address.
        self[plow].connect_peer(phigh, address, self.authkey)
442
443 - def span(self, graph):
449
def terminate(self, idx=slice(None,None,None), msg=None):
    """
    Terminate workers.

    @param idx: which entries of the dealer to terminate; the default
        selects all of them.
    @type idx: slice or list
    @param msg: message written to standard output after termination;
        nothing is written when it is empty or None.
    @type msg: str
    """
    import sys
    selected = self[idx]
    for shadow in selected:
        shadow.terminate()
    if not msg:
        return
    sys.stdout.write(msg)
464
def barrier(self, idx=slice(None,None,None), msg=None):
    """
    Ask the selected workers for a barrier signal and wait until each of
    them has answered.  Used for synchronization.

    @param idx: what to synchronize; the default selects every worker.
    @type idx: slice or list
    @param msg: message to output after synchronization.
    @type msg: str
    @raise AssertionError: a worker answered with a class that is not a
        subclass of Barrier.
    """
    import sys
    shadows = self[idx]
    # Send all requests first so the workers can reach the barrier
    # independently before any reply is collected.
    for sdw in shadows:
        sdw.barrier()
    for sdw in shadows:
        # The receive is deliberately kept outside an assert statement:
        # asserts are removed under "python -O", which would leave the
        # replies unread on the connection.
        reply = sdw.recv()
        if not issubclass(reply, Barrier):
            raise AssertionError(
                'expected Barrier from worker, got %r' % (reply,))
    if msg:
        sys.stdout.write(msg)
481
487 """
488 Remote execution through ssh.
489
490 @cvar DEFAULT_SSH_CONFIG: default ssh configuration options.
491 @ctype DEFAULT_SSH_CONFIG: dict
492 @ivar address: inet address.
493 @itype address: str
494 @ivar username: username for the connecting machine.
495 @itype username: str
496 @ivar prescript: list of the Python statements to prepend before the main
497 body of execution.
498 @itype prescript: list
499 @ivar paths: dict of lists for various environmental variables for paths.
500 @itype paths: dict
501 @ivar ssh_config: ssh configuration options.
502 @itype ssh_config: dict
503 """
504
505 DEFAULT_SSH_CONFIG = {
506 'UserKnownHostsFile': '/dev/null',
507 'StrictHostKeyChecking': 'no',
508 'LogLevel': 'ERROR',
509 }
510
def __init__(self, address,
             username=None, prescript=None, paths=None, ssh_config=None):
    """
    Store the connection target and the settings for the remote environment.

    @param address: inet address.
    @type address: str
    @keyword username: username for the connecting machine; when empty or
        None the result of get_username() is used.
    @type username: str
    @keyword prescript: list of the Python statements to prepend before the
        main body of execution; an empty list when None.
    @type prescript: list
    @keyword paths: dict of lists for path-like environment variables.  The
        dict and its lists are copied, and env.pkgdir is added to the
        'PYTHONPATH' entry of the copy when it is not already there.
    @type paths: dict
    @keyword ssh_config: ssh configuration options that override entries
        of DEFAULT_SSH_CONFIG.
    @type ssh_config: dict
    """
    from .helper import get_username
    from .conf import env
    self.address = address
    self.username = username if username else get_username()
    self.prescript = prescript if prescript is not None else list()

    # Work on a private copy so that the caller's dict and lists are not
    # modified by the PYTHONPATH adjustment below.
    if paths is not None:
        paths = dict((key, list(val)) for key, val in paths.items())
    else:
        paths = dict()
    pythonpath = paths.setdefault('PYTHONPATH', list())
    if env.pkgdir not in pythonpath:
        pythonpath.append(env.pkgdir)
    self.paths = paths

    # Start from the class defaults and let the caller override them.
    self.ssh_config = self.DEFAULT_SSH_CONFIG.copy()
    if ssh_config is not None:
        self.ssh_config.update(ssh_config)
528
529 @staticmethod
531 from .helper import iswin
532 pathlist = pathlist[:]
533
534 ip = 0
535 while ip < len(pathlist):
536 jp = ip + 1
537 while jp < len(pathlist):
538 if pathlist[ip] == pathlist[jp]:
539 del pathlist[jp]
540 else:
541 jp += 1
542 ip += 1
543
544 sep = ';' if iswin() else ':'
545 pathstr = sep.join([path for path in pathlist if path])
546 return 'export %s=%s:$%s' % (key, pathstr, key)
547
548 @property
556
def shell(self, script):
    """
    Run statements with the command line given by self.ssh_cmds and return
    what the process printed.

    @param script: statements to run; they are joined with '; ' and passed
        as one extra argument after self.ssh_cmds.
    @type script: list
    @return: everything the process wrote to standard output, with
        standard error merged into it.
    """
    from subprocess import Popen, PIPE, STDOUT
    # Build a new argument list instead of appending to self.ssh_cmds, so
    # the script cannot leak into later calls if that attribute hands out
    # a shared list.
    cmds = list(self.ssh_cmds) + ['; '.join(script)]
    subp = Popen(cmds, stdout=PIPE, stderr=STDOUT)
    # communicate() reads to end-of-file, closes the pipe and waits for the
    # child, so neither a file descriptor nor a zombie process is left over.
    output, _ = subp.communicate()
    return output
563
def __call__(self, script, envar=None, stdout=None):
    """
    Run Python statements with the interpreter named by sys.executable,
    launched through the command line in self.ssh_cmds.

    The remote command string consists of one path export per entry of
    self.paths, one export per entry of envar, and finally the interpreter
    invocation with self.prescript followed by script.

    @param script: the script to be sent to the remote machine to execute.
    @type script: list
    @keyword envar: additional environment variables for the remote side.
    @type envar: dict
    @keyword stdout: passed to Popen as its stdout argument.
    @return: the data read from the child's standard output when Popen
        created a pipe for it, otherwise None.
    """
    import sys
    from subprocess import Popen
    statements = self.prescript + script

    # Path variables first, then plain variables, then the interpreter call.
    remote_cmds = [
        self._pathmunge(name, entries)
        for name, entries in self.paths.items()
    ]
    if envar:
        for name in envar:
            remote_cmds.append('export %s=%s' % (name, envar[name]))
    python_call = "%s -c '%s'" % (sys.executable, '; '.join(statements))
    remote_cmds.append(python_call)

    launcher = self.ssh_cmds

    child = Popen(launcher + ['; '.join(remote_cmds)], stdout=stdout)

    if child.stdout is None:
        return None
    return child.stdout.read()
590