Package solvcon :: Module connection
[hide private]
[frames] | no frames]

Source Code for Module solvcon.connection

  1  # -*- coding: UTF-8 -*- 
  2  # 
  3  # Copyright (C) 2008-2010 Yung-Yu Chen <yyc@solvcon.net>. 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 2 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License along 
 16  # with this program; if not, write to the Free Software Foundation, Inc., 
 17  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 18   
 19  """ 
 20  Remote connection and communication. 
 21  """ 
 22   
23 -def pick_unused_port():
24 """ 25 Use socket to find out a unused (inet) port. 26 """ 27 import socket 28 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 29 s.bind(('localhost', 0)) 30 addr, port = s.getsockname() 31 s.close() 32 return port
33
34 -def guess_address(family, localhost=True):
35 """ 36 Guess a unused address according to given family. 37 38 @param family: AF_INET, AF_UNIX, AF_PIPE. 39 @type family: str 40 @keyword localhost: use 'localhost' as hostname or not. 41 @type localhost: bool 42 """ 43 from socket import gethostname 44 from random import sample 45 string = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 46 strlen = 8 47 port = pick_unused_port() 48 if family == 'AF_INET': 49 if localhost: 50 hostname = 'localhost' 51 else: 52 hostname = gethostname() 53 address = (hostname, port) 54 elif family == 'AF_UNIX': 55 strpart = ''.join(sample(string, strlen)) 56 address = '/tmp/srpc%s%d' % (strpart, port) 57 elif family == 'AF_PIPE': 58 strpart = ''.join(sample(string, strlen)) 59 address = r'\\.\pipe\srpc' + "%s%d"%(strpart, port) 60 else: 61 raise ValueError, "family can't be %s" % family 62 return address
63
64 -def guess_family(address):
65 """ 66 Determine the family of address. 67 """ 68 if type(address) == tuple: 69 return 'AF_INET' 70 elif type(address) is str and address.startswith('\\\\'): 71 raise ValueError('Windows pipe is not supported') 72 #return 'AF_PIPE' 73 elif type(address) is str: 74 return 'AF_UNIX' 75 else: 76 raise ValueError('address type of %r unrecognized' % address)
77
78 -class Credential(object):
79 """ 80 Authenticating information to be exchanged between two ends of a 81 connection. 82 """ 83 MSGCAP = 256 84 MSGLEN = 20 85 CHALLENGE = b'#CHALLENGE#' 86 SUCCESS = b'#SUCCESS#' 87 FAILURE = b'#FAILURE#'
88 - def __init__(self, conn, authkey):
89 """ 90 @param conn: the communicating connection. 91 @type conn: solvcon.connection.Connection 92 @param authkey: authenticating key. 93 @type authkey: str 94 """ 95 assert isinstance(authkey, bytes) 96 self.conn = conn 97 self.authkey = authkey
98 - def question(self):
99 import os, hmac 100 conn = self.conn 101 msg = os.urandom(self.MSGLEN) 102 conn.send_bytes(self.CHALLENGE+msg) 103 digest = hmac.new(self.authkey, msg).digest() 104 res = conn.recv_bytes(self.MSGCAP) 105 if res == digest: 106 conn.send_bytes(self.SUCCESS) 107 else: 108 conn.send_bytes(self.FAILURE) 109 raise IOError('digest received was wrong')
110 - def answer(self):
111 import hmac 112 conn = self.conn 113 msg = conn.recv_bytes(self.MSGCAP) 114 assert msg[:len(self.CHALLENGE)] == self.CHALLENGE, 'msg = %r'%msg 115 msg = msg[len(self.CHALLENGE):] 116 digest = hmac.new(self.authkey, msg).digest() 117 conn.send_bytes(digest) 118 res = conn.recv_bytes(self.MSGCAP) 119 if res != self.SUCCESS: 120 raise IOError('digest sent was rejected')
121
122 -class SocketConnection(object):
123 - def __init__(self, *args, **kw):
124 from _multiprocessing import Connection 125 self.conn = Connection(*args, **kw)
126 - def send_bytes(self, *args, **kw):
127 return self.conn.send_bytes(*args, **kw)
128 - def recv_bytes(self, *args, **kw):
129 return self.conn.recv_bytes(*args, **kw)
130 - def send(self, *args, **kw):
131 return self.conn.send(*args, **kw)
132 - def recv(self, *args, **kw):
133 return self.conn.recv(*args, **kw)
134 - def close(self, *args, **kw):
135 return self.conn.close(*args, **kw)
136 - def sendarr(self, arr):
137 self.send(arr)
138 - def recvarr(self, arr):
139 arr[:] = self.recv()[:]
140
141 -class MPIConnection(object):
142 TAG = 1
143 - def __init__(self, src, dst):
144 self.src = src 145 self.dst = dst
146 - def send(self, dat):
147 from .conf import env 148 env.mpi.send(dat, self.dst, self.TAG)
149 - def recv(self):
150 from .conf import env 151 return env.mpi.recv(self.dst, self.TAG)
152 - def sendarr(self, arr):
153 from .conf import env 154 env.mpi.sendarr(arr, self.dst, self.TAG)
155 - def recvarr(self, arr):
156 from .conf import env 157 env.mpi.recvarr(arr, self.dst, self.TAG)
158 159 CLIENT_TIMEOUT = 20.
160 -def Client(address, family=None, authkey=None):
161 """ 162 Establish a connection to a Listener. 163 164 @param address: The address of a Unix or TCP/IP socket. 165 @type address: str or tuple 166 @keyword family: The family of address. 167 @type family: str 168 @keyword authkey: Authenticating key. 169 @type authkey: str 170 """ 171 import os, time, errno, socket 172 timeout = time.time() + CLIENT_TIMEOUT 173 family = family or guess_family(address) 174 # create socket. 175 skt = socket.socket(getattr(socket, family)) 176 while True: 177 try: 178 skt.connect(address) 179 except socket.error, e: 180 if e.args[0] != errno.ECONNREFUSED or time.time() > timeout: 181 raise 182 time.sleep(0.01) 183 else: 184 break 185 # create connection. 186 conn = SocketConnection(os.dup(skt.fileno())) 187 skt.close() 188 # authenticate. 189 if authkey is not None: 190 if not isinstance(authkey, bytes): 191 raise TypeError('authkey must be a byte string') 192 credit = Credential(conn, authkey) 193 credit.answer() 194 credit.question() 195 return conn
196
197 -class Listener(object):
198 """ 199 Socket listener for connection. 200 """
201 - def __init__(self, address, family=None, authkey=None):
202 """ 203 @param address: The address of a Unix or TCP/IP socket. 204 @type address: str or tuple 205 @keyword family: The family of address. 206 @type family: str 207 @keyword authkey: Authenticating key. 208 @type authkey: str 209 """ 210 import socket 211 family = family or (address and guess_family(address)) 212 # create socket. 213 self._socket = socket.socket(getattr(socket, family)) 214 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 215 self._socket.bind(address) 216 self._socket.listen(1) 217 # store extra information. 218 self.address = self._socket.getsockname() 219 self._last_accepted = None 220 self._authkey = authkey
221 - def accept(self):
222 import os 223 # accepting connection. 224 skt, self._last_accepted = self._socket.accept() 225 # establish connection. 226 conn = SocketConnection(os.dup(skt.fileno())) 227 skt.close() 228 # authenticate. 229 if self._authkey: 230 credit = Credential(conn, self._authkey) 231 credit.question() 232 credit.answer() 233 return conn
234 - def close(self):
235 self._socket.close()
236
237 -class SpanningTreeNode(dict):
238 - def __init__(self, *args, **kw):
239 self.val = kw.pop('val') 240 self.level = kw.pop('level') 241 super(SpanningTreeNode, self).__init__(*args, **kw)
242 - def traverse(self, graph, visited):
243 if self.val not in visited: 244 visited[self.val] = True 245 for it in graph[self.val]: 246 nd = SpanningTreeNode(val=it, level=self.level+1) 247 if nd.traverse(graph, visited) == True: 248 self[it] = nd 249 return True 250 else: 251 return False
252