Package solvcon :: Module rpc
[hide private]
[frames] | no frames]

Source Code for Module solvcon.rpc

  1  # -*- coding: UTF-8 -*- 
  2  # 
  3  # Copyright (C) 2008-2010 Yung-Yu Chen <yyc@solvcon.net>. 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 2 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License along 
 16  # with this program; if not, write to the Free Software Foundation, Inc., 
 17  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 18   
 19  """ 
 20  Remote procedure call and inter-process communication. 
 21  """ 
 22   
# Authentication key that Dealer falls back to when no 'authkey' keyword is
# given to its constructor.
DEFAULT_AUTHKEY = 'solvcon.rpc'
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably a default sleeping interval in seconds -- confirm with callers.
DEFAULT_SLEEP = 0.1
class Terminate(Exception):
    """
    Exception used as the stop signal of the Worker event loop: the loop
    leaves as soon as a dispatched method raises it.
    """
31
class Notice(object):
    """
    Root of the notification types exchanged for ipc.  It carries no data
    or behavior by itself.
    """
class Barrier(Notice):
    """
    Notification type used as the synchronization (barrier) signal.
    """
class Command(Notice):
    """
    A request to call a method in the remote process.  Plain Command
    objects are the ones designated to the Muscle object.

    @ivar with_worker: value of the 'with_worker' keyword (False if absent).
    @itype with_worker: bool
    @ivar methodname: name of the method to be called.
    @itype methodname: str
    @ivar args: positional arguments for the method.
    @itype args: tuple
    @ivar kw: keyword arguments for the method ('with_worker' removed).
    @itype kw: dict
    """

    def __init__(self, methodname, *args, **kw):
        # 'with_worker' is addressed to the receiver of the command, so it
        # is taken out of the keywords before they are stored.
        self.with_worker = kw.pop('with_worker', False)
        self.methodname, self.args, self.kw = methodname, args, kw
class Control(Command):
    """
    Command variant that is addressed to the Worker object itself rather
    than to its muscle.
    """
54
class Worker(object):
    """
    The whole worker object will run remotely (means in separated process).

    @ivar muscle: the muscle object.
    @itype muscle: Muscle
    @ivar serial: serial number of the worker process.
    @itype serial: int
    @ivar lsnr: the listener object for master.
    @itype lsnr: solvcon.connection.Listener
    @ivar conn: the connection object to master.
    @itype conn: solvcon.connection.Client
    @ivar plsnrs: dictionary of listener objects for peers.
    @itype plsnrs: dict
    @ivar pconns: dictionary of connection objects to peers.
    @itype pconns: dict
    @ivar do_profile: whether the event loop runs under cProfile.
    @itype do_profile: bool
    @ivar profiler_dat: file name for the profiler statistics (or None).
    @ivar profiler_log: file name for the printed profiler report (or None).
    @ivar profiler_sort: comma-separated sorting keys for the report (or
        None).
    """

    def __init__(self, muscle, profiler_data=None):
        """
        @param muscle: the muscle object.
        @type muscle: Muscle
        @keyword profiler_data: if true, a sequence indexed as (statistics
            file name, report file name, comma-separated sorting keys); it
            turns profiling on.
        @type profiler_data: sequence
        """
        self.muscle = muscle
        self.serial = None
        self.lsnr = None
        self.conn = None
        self.plsnrs = dict()
        self.pconns = dict()
        self.do_profile = bool(profiler_data)
        self.profiler_dat = profiler_data[0] if profiler_data else None
        self.profiler_log = profiler_data[1] if profiler_data else None
        self.profiler_sort = profiler_data[2] if profiler_data else None

    def _eventloop(self):
        """
        Event loop.  Receive notices from self.conn forever and dispatch
        every Command to the method it names: Control objects go to the
        worker itself, other Command objects go to the muscle.  Anything
        that is not a Command is ignored.  The loop ends when the called
        method raises Terminate; other exceptions propagate.
        """
        while True:
            ntc = self.conn.recv()
            try:
                if isinstance(ntc, Command):
                    obj = self if isinstance(ntc, Control) else self.muscle
                    method = getattr(obj, ntc.methodname)
                    if ntc.with_worker:
                        # the callee asked for a reference to this worker.
                        ntc.kw.update(worker=self)
                    method(*ntc.args, **ntc.kw)
            except Terminate:
                break

    def eventloop(self):
        """
        Run the event loop, under cProfile if profiling was requested.  With
        profiling, the statistics are dumped to self.profiler_dat and the
        sorted report is printed to self.profiler_log; when env.mpi is set,
        its rank is appended to both file names.
        """
        import cProfile
        import pstats
        from .conf import env
        if not self.do_profile:
            self._eventloop()
            return
        if env.mpi:
            self.profiler_dat += '%d' % env.mpi.rank
            self.profiler_log += '%d' % env.mpi.rank
        cProfile.runctx('self._eventloop()', globals(), locals(),
            self.profiler_dat)
        # the context manager closes the log even if reporting fails.
        with open(self.profiler_log, 'w') as plog:
            p = pstats.Stats(self.profiler_dat, stream=plog)
            p.sort_stats(*self.profiler_sort.split(','))
            p.dump_stats(self.profiler_dat)
            p.print_stats()

    def register(self, address, authkey, *args, **kw):
        """
        Connect to remote listener and run event loop.  In this case, the
        worker doesn't have a valid listener since it acts as a client to the
        dealer.

        @param address: address to connect.
        @type address: str or tuple
        @param authkey: authentication key for connection.
        @type authkey: str
        """
        from time import sleep
        from .connection import Client
        # connect to the public address to the dealer.
        conn = Client(address=address, authkey=authkey)
        # get the actual/random/private address from dealer.
        address, authkey, wait_for_connect = conn.recv()
        # close the original public connection and wait for a period of time.
        conn.close()
        sleep(wait_for_connect)
        # make the private connection and save for self.
        self.conn = Client(address=address, authkey=authkey)
        # start eventloop.
        self.eventloop()

    def run(self, address, authkey, *args, **kw):
        """
        Listen to given address and run event loop.  In this case, the worker
        has the listener and acts as a server.

        @param address: address to listen.
        @type address: str or tuple
        @param authkey: authentication key for connection.
        @type authkey: str
        """
        from .conf import env
        from .connection import Listener
        # listen on the given address and accept connection.
        self.lsnr = Listener(address=address, authkey=authkey)
        if address[1] == 0 and env.mpi:
            # port 0 was requested, so report the port actually bound.
            env.mpi.send(self.lsnr.address[1], 0, 1)
        self.conn = self.lsnr.accept()
        # start eventloop.
        self.eventloop()

    def chdir(self, dirname):
        """
        Change the working directory of the worker process.

        @param dirname: the directory to change to.
        @type dirname: str
        """
        import os
        os.chdir(dirname)

    def remote_setattr(self, name, var):
        """
        Remotely set attribute of worker.

        @param name: name of the attribute.
        @type name: str
        @param var: value to set.
        @return: nothing
        """
        return setattr(self, name, var)

    def remote_loadobj(self, name, objfn):
        """
        Remotely unpickle a file and set it to self with the specified name.

        @param name: name of the attribute to set.
        @type name: str
        @param objfn: file name of the pickled object.
        @type objfn: str
        """
        try:
            import cPickle as pickle
        except ImportError:
            import pickle
        # NOTE: unpickling runs arbitrary code; objfn must be a trusted file.
        # Pickles are binary data, so read in binary mode, and close the file
        # deterministically instead of leaking the handle.
        with open(objfn, 'rb') as objfile:
            setattr(self, name, pickle.load(objfile))

    def barrier(self):
        """
        Send barrier signal for synchronization.  The Barrier class itself
        (not an instance) is what gets sent.
        """
        self.conn.send(Barrier)

    def accept_peer(self, peern, family, authkey):
        """
        Accept connection from specified peer.  The address of the worker's
        own listener is sent back over self.conn before accepting.  The
        family and authkey arguments are currently not used by the body.

        @param peern: index of the peer who wants to connect.
        @type peern: int
        @param family: the family of address needed to be guessed.
        @type family: str
        @param authkey: authentication key for connection.
        @type authkey: str
        """
        lsnr = self.lsnr
        self.conn.send(lsnr.address)
        # bind the address to set up a connection.
        conn = lsnr.accept()
        # after get connected, save the listener and connection.
        self.plsnrs[peern] = lsnr
        self.pconns[peern] = conn

    def connect_peer(self, peern, address, authkey):
        """
        Make a connection to specified peer (it has to be accepting
        connection).

        @param peern: index of the peer to connect to.
        @type peern: int
        @param address: the address to connect to.
        @type address: str or tuple
        @param authkey: authentication key for connection.
        @type authkey: str
        """
        from .connection import Client
        conn = Client(address=address, authkey=authkey)
        self.pconns[peern] = conn

    def set_peer(self, src, dst):
        """
        Create MPI proxy for a pair of p2p connection.

        @param src: source worker ID.
        @type src: int
        @param dst: destination worker ID.
        @type dst: int
        """
        from .connection import MPIConnection
        # NOTE(review): IDs are shifted by one, presumably because MPI rank 0
        # is not a worker -- confirm.
        self.pconns[dst] = MPIConnection(src+1, dst+1)

    def get_port_by_mpi(self, dst, tag):
        """
        Receive a port number through MPI and forward it over self.conn.

        @param dst: first argument handed to the MPI recv call.
        @type dst: int
        @param tag: second argument handed to the MPI recv call.
        @type tag: int
        """
        # Nothing in this class assigns a 'mpi' attribute, so honor one that
        # was set from outside and otherwise use the MPI object of the
        # environment, as run() does.
        mpi = getattr(self, 'mpi', None)
        if mpi is None:
            from .conf import env
            mpi = env.mpi
        port = mpi.recv(dst, tag)
        self.conn.send(port)

    def terminate(self):
        """
        Stop the event loop by raising Terminate.
        """
        raise Terminate

    def create_solver(self, bcmap, dirname, iblk, nblk, solvertype, svrkw):
        """
        Load a block and create a solver object with the given information,
        and set it to muscle.

        @param bcmap: BC mapper.
        @type bcmap: dict
        @param dirname: the directory of saved domain object.
        @type dirname: str
        @param iblk: index of the block to be loaded.
        @type iblk: int
        @param nblk: number of total blocks (sub-domains).
        @type nblk: int
        @param solvertype: the type of solver to be created.
        @type solvertype: type
        @param svrkw: keywords passed to the constructor of solver.
        @type svrkw: dict
        @return: nothing
        """
        from .io.domain import DomainIO
        dio = DomainIO()
        blk = dio.load_block(dirname=dirname, blkid=iblk, bcmapper=bcmap)
        svr = solvertype(blk, **svrkw)
        svr.svrn = iblk
        svr.nsvr = nblk
        svr.unbind()
        self.muscle = svr

    def drop_anchor(self, ankcls, ankkw):
        """
        Create an anchor object and append it to the solver muscle.

        @param ankcls: anchor type.
        @type ankcls: type
        @param ankkw: keywords to the constructor of the anchor.
        @type ankkw: dict
        @return: nothing
        """
        self.muscle.runanchors.append(ankcls, **ankkw)
282
class Agent(object):
    """
    Remote agent to worker.  Looking up any attribute that the agent does
    not have gives a function which, when called, wraps the attribute name
    and the call arguments in a notice and sends it over the connection.

    @ivar conn: connection to the worker.
    @itype conn: solvcon.connection.Client
    @ivar noticetype: type of notice object to send.
    @itype noticetype: Notice
    """

    def __init__(self, conn=None, noticetype=Command):
        self.conn, self.noticetype = conn, noticetype

    def __getattr__(self, name):
        # connection and notice type are fixed at look-up time, not at the
        # time the returned function is called.
        channel, make_notice = self.conn, self.noticetype

        def remote_call(*arg, **kw):
            notice = make_notice(name, *arg, **kw)
            channel.send(notice)

        return remote_call
302
class Shadow(object):
    """
    Pair of agents sharing one connection: one sends Command notices (for
    the remote muscle) and the other sends Control notices (for the remote
    worker).  Attributes the shadow does not have are looked up on the
    worker agent (ctl).

    @ivar lsnr: listener to worker.
    @itype lsnr: solvcon.connection.Listener
    @ivar conn: connection to the worker.
    @itype conn: solvcon.connection.Client
    @ivar address: remote address.
    @itype address: tuple or str
    @ivar cmd: agent to muscle.
    @itype cmd: Agent
    @ivar ctl: agent to worker.
    @itype ctl: Agent
    """

    def __init__(self, lsnr=None, conn=None, address=None):
        self.lsnr, self.conn, self.address = lsnr, conn, address
        # both agents talk through the very same connection.
        self.cmd = Agent(conn=conn, noticetype=Command)
        self.ctl = Agent(conn=conn, noticetype=Control)

    def __getattr__(self, name):
        """
        Fall back to the agent of the worker.
        """
        worker_agent = self.ctl
        return getattr(worker_agent, name)

    def recv(self, *args, **kw):
        """
        Read from the connection to the worker and return what was read.
        """
        received = self.conn.recv(*args, **kw)
        return received
337
class Dealer(list):
    """
    Contains shadows to workers.  Workers can be hired or recruited.  A hired
    worker is local to the dealer so that the dealer can directly start it.  A
    recruited worker is remote, and a dealer can only wait for it to
    register.  A recruited worker is instantiated by itself, in a standalone
    process, and usually remotely.

    @cvar WAIT_FOR_ACCEPT: default seconds to sleep before connecting to a
        process that is about to accept.
    @ctype WAIT_FOR_ACCEPT: float
    @ivar publicaddress: the public address for worker to gain connection
        information.  It is used for recruitment.
    @itype publicaddress: tuple or str
    @ivar authkey: authentication key for worker connection.
    @itype authkey: str
    @ivar family: connection family for automatically connection generation.
        Can be 'AF_PIPE', 'AF_UNIX', or 'AF_INET'.
    @itype family: str
    @ivar spanhead: head node of the spanning tree built by span(), or None.
    """

    WAIT_FOR_ACCEPT = 0.1

    def __init__(self, *args, **kw):
        """
        The keywords 'publicaddress', 'authkey' and 'family' are consumed
        here; everything else is passed to the list constructor.  If no
        family is given, it is chosen by platform: 'AF_PIPE' on Windows,
        'AF_UNIX' on Linux and 'AF_INET' elsewhere.
        """
        import sys
        self.publicaddress = kw.pop('publicaddress', None)
        self.authkey = kw.pop('authkey', DEFAULT_AUTHKEY)
        self.family = kw.pop('family', None)
        if self.family is None:
            if sys.platform.startswith('win'):
                self.family = 'AF_PIPE'
            elif sys.platform.startswith('linux'):
                self.family = 'AF_UNIX'
            else:
                self.family = 'AF_INET'
        super(Dealer, self).__init__(*args, **kw)
        self.spanhead = None

    def _accept_delay(self, wait_for_accept):
        """
        Return the seconds to sleep: the given value, or WAIT_FOR_ACCEPT if
        it is None.
        """
        if wait_for_accept is None:
            return self.WAIT_FOR_ACCEPT
        return wait_for_accept

    def _select(self, idx):
        """
        Return the shadows designated by idx, which is either a slice or an
        iterable of indices.
        """
        if isinstance(idx, slice):
            return self[idx]
        return [self[it] for it in idx]

    def hire(self, worker, inetaddr=None, wait_for_accept=None):
        """
        Create a process for a worker object.  The worker will be sent to
        the process.  The shadow of the worker is appended to self and the
        worker's serial is set to its position.

        @param worker: worker object.
        @type worker: Worker
        @keyword inetaddr: not used by the body.
        @keyword wait_for_accept: seconds to wait after accepting.  If None
            use DEFAULT.
        @type wait_for_accept: float
        """
        from time import sleep
        from multiprocessing import Process
        from .connection import guess_address, Client
        # create and start the process.
        address = guess_address(self.family)
        proc = Process(
            target=worker.run,
            args=(address, self.authkey),
        )
        proc.start()
        sleep(self._accept_delay(wait_for_accept))
        # connect to the created process and make its shadow.
        conn = Client(address=address, authkey=self.authkey)
        shadow = Shadow(conn=conn, address=address)
        shadow.remote_setattr('serial', len(self))
        self.append(shadow)

    def appoint(self, inetaddr, port, authkey):
        """
        Connect to a worker that is already listening and append its shadow
        to self.

        @param inetaddr: the IP/DN of the machine to build the worker.
        @type inetaddr: str
        @param port: the port that the remote worker listen on.
        @type port: int
        @param authkey: the authkey for the worker.
        @type authkey: str
        @return: nothing
        """
        from .connection import Client
        conn = Client(address=(inetaddr, port), authkey=authkey)
        shadow = Shadow(conn=conn, address=(inetaddr, port))
        shadow.remote_setattr('serial', len(self))
        self.append(shadow)

    def bridge(self, peers, wait_for_accept=None):
        """
        Tell two peering worker to establish a connection.

        @param peers: the indices of the two workers.
        @type peers: tuple
        @keyword wait_for_accept: seconds to wait before asking the lower
            worker to connect.  If None use WAIT_FOR_ACCEPT.
        @type wait_for_accept: float
        @raise ValueError: the address reported by the accepting worker
            differs from the one recorded in its shadow.
        """
        from time import sleep
        from .conf import env
        plow, phigh = peers
        assert plow != phigh    # makes no sense.
        if plow > phigh:
            plow, phigh = phigh, plow
        if env.mpi:
            self[phigh].set_peer(phigh, plow)
            self[plow].set_peer(plow, phigh)
        else:
            # ask higher to accept connection.
            self[phigh].accept_peer(plow, self.family, self.authkey)
            address = self[phigh].recv()
            if address != self[phigh].address:
                raise ValueError('%s != %s' % (
                    str(address), str(self[phigh].address)))
            # ask lower to make connection.
            sleep(self._accept_delay(wait_for_accept))
            self[plow].connect_peer(phigh, address, self.authkey)

    def span(self, graph):
        """
        Build a spanning tree over graph, starting from node 0, and keep its
        head in self.spanhead.  Every node of the graph must be visited.

        @param graph: the graph to traverse.
        """
        from .connection import SpanningTreeNode
        self.spanhead = SpanningTreeNode(val=0, level=0)
        visited = dict()
        self.spanhead.traverse(graph, visited)
        assert len(graph) == len(visited)

    def terminate(self, idx=slice(None,None,None), msg=None):
        """
        Terminate workers.

        @param idx: what to terminate
        @type idx: slice or list
        @param msg: message to output after termination.
        @type msg: str
        """
        import sys
        for sdw in self._select(idx):
            sdw.terminate()
        if msg:
            sys.stdout.write(msg)

    def barrier(self, idx=slice(None,None,None), msg=None):
        """
        Check for barrier signals sent from workers.  Used for
        synchronization.

        @param idx: what to synchronize.
        @type idx: slice or list
        @param msg: message to output after synchronization.
        @type msg: str
        @raise AssertionError: a worker replied with something that is not a
            Barrier.
        """
        import sys
        shadows = self._select(idx)
        for sdw in shadows:
            sdw.barrier()
        for sdw in shadows:
            # The reply must be read unconditionally: receiving inside an
            # assert statement would be skipped under "python -O" and leave
            # the signals unread in the connections.
            signal = sdw.recv()
            if not issubclass(signal, Barrier):
                raise AssertionError(
                    'expected Barrier but got %s' % str(signal))
        if msg:
            sys.stdout.write(msg)
481
###############################################################################
# Remote invocation.
###############################################################################

class SecureShell(object):
    """
    Remote execution through ssh.

    @cvar DEFAULT_SSH_CONFIG: default ssh configuration options.
    @ctype DEFAULT_SSH_CONFIG: dict
    @ivar address: inet address.
    @itype address: str
    @ivar username: username for the connecting machine.
    @itype username: str
    @ivar prescript: list of the Python statements to prepend before the main
        body of execution.
    @itype prescript: list
    @ivar paths: dict of lists for various environmental variables for paths.
    @itype paths: dict
    @ivar ssh_config: ssh configuration options.
    @itype ssh_config: dict
    """

    DEFAULT_SSH_CONFIG = {
        'UserKnownHostsFile': '/dev/null',
        'StrictHostKeyChecking': 'no',
        'LogLevel': 'ERROR',
    }

    def __init__(self, address,
            username=None, prescript=None, paths=None, ssh_config=None):
        """
        @param address: inet address of the remote machine.
        @type address: str
        @keyword username: remote user name; if not given, the result of
            solvcon.helper.get_username() is used.
        @type username: str
        @keyword prescript: Python statements to prepend to each script.
        @type prescript: list
        @keyword paths: lists of paths keyed by environment variable name.
            env.pkgdir is added to 'PYTHONPATH' if it is not there.
        @type paths: dict
        @keyword ssh_config: options overriding DEFAULT_SSH_CONFIG.
        @type ssh_config: dict
        """
        from .helper import get_username
        from .conf import env
        self.address = address
        self.username = username if username else get_username()
        self.prescript = prescript if prescript is not None else list()
        # customize paths.  Work on copies so that neither the dict nor the
        # lists handed in by the caller get modified.
        if paths is None:
            paths = dict()
        else:
            paths = dict((key, list(paths[key])) for key in paths)
        paths.setdefault('PYTHONPATH', list())
        if env.pkgdir not in paths['PYTHONPATH']:
            paths['PYTHONPATH'].append(env.pkgdir)
        self.paths = paths
        # ssh options.
        self.ssh_config = self.DEFAULT_SSH_CONFIG.copy()
        if ssh_config is not None:
            self.ssh_config.update(ssh_config)

    @staticmethod
    def _pathmunge(key, pathlist):
        """
        Build the shell statement that prepends the paths to an environment
        variable.  Duplicated paths are dropped (the first occurrence wins)
        and empty entries are skipped.

        @param key: name of the environment variable.
        @type key: str
        @param pathlist: the paths to prepend.
        @type pathlist: list
        @return: the "export" statement.
        @rtype: str
        """
        from .helper import iswin
        # drop duplication while keeping the order of first appearance.
        unique = []
        for path in pathlist:
            if path not in unique:
                unique.append(path)
        # join the path.
        sep = ';' if iswin() else ':'
        pathstr = sep.join([path for path in unique if path])
        # NOTE(review): the separator before $KEY is always ':' regardless
        # of sep -- confirm that this is intended on Windows.
        return 'export %s=%s:$%s' % (key, pathstr, key)

    @property
    def ssh_cmds(self):
        """
        A fresh list holding the ssh command line up to and including the
        user@address argument: 'ssh', '-n', one '-o key=value' pair per entry
        of ssh_config, and the destination.
        """
        ssh_cmds = ['ssh', '-n']
        for key in self.ssh_config:
            ssh_cmds.append('-o')
            ssh_cmds.append('%s=%s' % (key, str(self.ssh_config[key])))
        ssh_cmds.append('%s@%s' % (self.username, self.address))
        return ssh_cmds

    def shell(self, script):
        """
        Run shell statements on the remote machine and wait for them.

        @param script: the statements; they are joined with '; '.
        @type script: list
        @return: what the command wrote to stdout and stderr.
        @rtype: str
        """
        from subprocess import Popen, PIPE, STDOUT
        cmds = self.ssh_cmds
        cmds.append('; '.join(script))
        subp = Popen(cmds, stdout=PIPE, stderr=STDOUT)
        # communicate() reads everything, closes the pipe and reaps the
        # child; a bare read() would leave the finished process unreaped.
        return subp.communicate()[0]

    def __call__(self, script, envar=None, stdout=None):
        """
        Run Python statements on the remote machine through ssh.

        @param script: the script to be send to remote machine to execute.
        @type script: list
        @keyword envar: additional environment variables to remote.
        @type envar: dict
        @keyword stdout: passed to subprocess.Popen as its stdout argument.
        @return: the output of the command if stdout is a pipe; otherwise
            None, and the command is not waited for.
        """
        import sys
        from subprocess import Popen
        script = self.prescript + script
        # build the commands to be run remotely.
        remote_cmds = [self._pathmunge(k, self.paths[k]) for k in self.paths]
        if envar:
            remote_cmds.extend([
                'export %s=%s' % (key, envar[key]) for key in envar
            ])
        # NOTE(review): the script is wrapped in single quotes without
        # escaping, so a statement containing a single quote breaks the
        # remote command line.
        remote_cmds.append('%s -c \'%s\''%(sys.executable, '; '.join(script)))
        # build the commands for ssh.
        ssh_cmds = self.ssh_cmds
        # join ssh commands and remote commands and fire.
        subp = Popen(ssh_cmds + ['; '.join(remote_cmds)], stdout=stdout)
        # get the return from ssh.
        if subp.stdout is not None:
            # read to the end and reap the child.
            return subp.communicate()[0]
        else:
            return None
590