Package qb :: Module remoteMessaging
[hide private]
[frames] | no frames]

Source Code for Module qb.remoteMessaging

  1  ''' 
  2  A framework for socket-based client/server persistent remote messaging sessions. 
  3   
  4  Neither the server nor the client are bi-directional.  Each is intended to either receive messages 
  5  (the server) or send messages (the client). 
  6   
  7  The server functions as the receiver; it is multi-threaded and can receive from any number of 
  8  clients simultaneously.  The message content is not limited to strings.  It is possible to send 
  9  anything that can be serialzed, including traceback objects, classes, class instances, or code 
 10  objects.  Child threads of the main thread are created as needed to handle inbound messages, and are 
 11  shut down as soon as the message is deemed to have been received in full.  The message length is 
 12  sent in the first 4 bytes of the message; in this way, the message is not deemed to be completely 
 13  received until the number of bytes received matches the payload size.  This provides robustness on 
 14  congested or noisy networks that do not provide reliable transport. 
 15   
 16  The client functions as the sender.  It is single threaded and can communicate with only 1 server. 
 17  The server's INET socket address (ip address & port number) is an argument to the client's constructor 
 18  method. 
 19   
 20  Main concepts: 
 21       
 22      * The central host in a messaging session communicates with multiple remote hosts. 
 23      * Each remote host in a messaging session communicates with only 1 central host. 
 24      * The central host runs 1 instance of a server. 
 25      * Each remote host runs 1 instance of a client as well as 1 instance of a server. 
 26      * The central host runs a client instance for each remote host. 
 27   
 28  Port assignment for the server in python versions 2.5 and above is dynamically assigned by the 
 29  kernel, and will be different for each instance.  In python versions 2.4 and below, the server port 
 30  is randomly assigned a port between 45000 and 60000. It is the responsibility of the developer to 
 31  inform the app instantiating the client of the server's host/port socket address. 
 32       
 33  The creation of a persistent messaging session between a local and remote host is a 4-step process: 
 34   
 35      1.) create a server on a local host.  The INET socket address (ip & port) of the server is used when 
 36      the clients are started up on remote machines.  This server instance running on the local host 
 37      is the 'central' server and is expected to have sessions with clients on multiple remote hosts. 
 38   
 39      2.) create a server on the remote host. 
 40       
 41      3.) create a client on the remote host, passing the socket address of the 'central' server to 
 42      the constructor. Send a 'hello' message back to the 'central' server informing it of the socket 
 43      address on the server running on the same machine as the client (the "remote" host from the POV 
 44      of the "central server). 
 45   
 46      4.) when the 'central' server running on the local host receives the 'hello' message from the 
 47      remote host, it creates a corresponding client for the server running on the remote host, using 
 48      the socket address of the server running on the remote machine.  Since the local host is 
 49      expected to have messaging sessions with multiple remote hosts, the client which is associated 
 50      with the server on the remote host is stored in a dictionary so that it may be easily accessed 
 51      at a later time. 
 52   
 53  Communication from a remote host back to the central host is accomplished by using the remote client 
 54  to send a message back to the central server. 
 55   
 56  Communcation from the central server to a remote host is accomplished by looking up the client which 
 57  corresponds to the remote host and using that client to send messages. 
 58   
 59  The app which utilizes the server and clients are meant to implement a heartbeat scheme such that the 
 60  server periodically sends heartbeats to the clients; if the clients do not receive heartbeats within 
 61  some interval past the timeout, they shut themselves down. 
 62   
 63  The app on the remote machine should also be written so that it inspects each message sent to the 
 64  server instance running on that machine, and if it receives a 'shutdown' message, will shutdown the 
 65  client and server and exit. 
 66  ''' 
 67  #====================================== 
 68  #  $Revision: #4 $ 
 69  #  $Change: 9080 $ 
 70  #====================================== 
 71   
 72  import sys 
 73  import os 
 74  import time 
 75  import socket 
 76  import logging 
 77  import logging.handlers 
 78  import cPickle 
 79  import SocketServer 
 80  import struct 
 81  import threading 
 82   
 83  # Provide a method of finding the IP address bound to a NIC, even when the hostname resolves to the 
 84  # loopback addr when the host looks up itself.  This only works on linux, but linux is the only OS 
 85  # that typically exhibits this loopback behavior. 
 86  if os.name != 'nt': 
 87      import fcntl 
 88      import struct 
89 - def getInterfaceIP(ifName):
90 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 91 92 return socket.inet_ntoa(fcntl.ioctl( 93 s.fileno(), 94 0x8915, # SIOCGIFADDR 95 struct.pack('256s', ifName[:15]) 96 )[20:24])
97 98
99 -def getLanIP():
100 # handle cases on some linux where gethostbyname returns loopback addr 101 ip = socket.gethostbyname(socket.gethostname()) 102 103 if ip.startswith("127.") and os.name != "nt": 104 for ifName in ["eth0","eth1","eth2"]: 105 try: 106 ip = getInterfaceIP(ifName) 107 break 108 except IOError: 109 pass 110 111 return ip
112
113 -def pyVer():
114 return float('%s.%s' % (sys.version_info[0], sys.version_info[1]))
115 116
117 -class RemoteMessagingError(Exception):
118 - def __init__(self, value=''):
119 self.value = value
120
121 - def __str__(self):
122 return repr(self.value)
123 124
125 -class RemoteMessagingStreamRequestHandler(SocketServer.StreamRequestHandler):
126 """ 127 Handler for a streaming logging request. 128 129 This basically logs the record using whatever logging policy is 130 configured locally. 131 132 G{classtree RemoteMessagingStreamRequestHandler} 133 """
134 - def handle(self):
135 """ 136 Handle multiple requests - each expected to be encode the payload size in the first 4-bytes, 137 followed by the LogRecord in pickle format. Logs the record according to whatever policy is 138 configured locally. 139 """ 140 while True: 141 payloadSize = self.connection.recv(4) 142 if len(payloadSize) < 4: 143 # TODO: handle the case when the client can't even receive 4 bytes 144 break 145 146 msgLen = struct.unpack(">L", payloadSize)[0] 147 msg = self.connection.recv(msgLen) 148 while len(msg) < msgLen: 149 msg = msg + self.connection.recv(msgLen - len(msg)) 150 151 obj = cPickle.loads(msg) 152 record = logging.makeLogRecord(obj) 153 self.handleLogRecord(record)
154
155 - def handleLogRecord(self, record):
156 ''' 157 Pass a LogRecord to the appropriate logging.Handler. 158 159 @param record: The logRecord received over the socket. 160 @type record: logging.LogRecord 161 ''' 162 # if a name is specified, we use the named logger rather than the one 163 # implied by the record. 164 if self.server.logname is not None: 165 logger = logging.getLogger(self.server.logname) 166 else: 167 logger = logging.getLogger(record.name) 168 if not logger.handlers: 169 logger.setLevel(logging.INFO) 170 logger.addHandler(logging.StreamHandler(sys.stdout)) 171 172 logger.handle(record)
173 174
175 -class RemoteMessagingReceiver(SocketServer.ThreadingTCPServer):
176 """ 177 A simple TCP socket-based logging receiver. 178 179 G{classtree RemoteMessagingReceiver} 180 """ 181 allow_reuse_address = 1
182 - def __init__(self, 183 host='localhost', 184 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, 185 loggerName=None, 186 handler=RemoteMessagingStreamRequestHandler):
187 ''' 188 @param host: The name of the host running the socket server. 189 @type host: C{str} 190 191 @param port: The port number for the INET socket on the listening host. 192 @type port: C{int} 193 194 @param loggerName: The name of the logging.Logger object which is to 195 receive the logRecords sent over the socket. 196 @type loggerName: C{str} 197 198 @param handler: The request handler for the SocketServer 199 @type handler: L{RemoteMessagingStreamRequestHandler} 200 ''' 201 SocketServer.ThreadingTCPServer.__init__(self, (host, port), handler) 202 self.abort = False 203 self.timeout = 3 204 self.logname = loggerName
205
206 - def serveUntilStopped(self):
207 ''' 208 Handle incoming requests until told to shut down. 209 ''' 210 import select 211 while not self.abort: 212 rd = select.select([self.socket.fileno()], [], [], self.timeout)[0] 213 if rd: 214 self.handle_request()
215 216
217 -class RemoteMessagingServer(object):
218 ''' 219 Create and manage a logger running in a thread. 220 221 G{classtree RemoteMessagingServer} 222 ''' 223 if pyVer() > 2.4: 224 DEFAULT_LOGGER_PORT = 0 # let the kernel assign the port dynamically, doesn't work in 2.4 or below 225 else: 226 import random 227 DEFAULT_LOGGER_PORT = random.randint(45000,60000) 228 229 loggerIP = getLanIP() 230 231 THREADNAME = 'remoteMessagingServerMainThead' 232
233 - def __init__(self, verbose=False, port=None):
234 ''' 235 Create a logging object and L{RemoteMessagingReceiver} which will send logRecords to it. 236 237 @param verbose: Print out open/close messages or not 238 @type verbose: C{bool} 239 ''' 240 self.serverThread = None 241 self.receiver = None 242 243 if port: 244 self.loggerPort = port 245 else: 246 self.loggerPort = self.DEFAULT_LOGGER_PORT 247 248 self.verbose = verbose
249
250 - def getServerAddr(self):
251 ''' 252 @return: Return the IP address and port number of the host running the socket server. 253 @rtype: C{tuple} (ip, port) 254 ''' 255 return self.receiver.server_address
256 serverAddress = property(getServerAddr) 257
258 - def getServerIP(self):
259 ''' 260 @return: Return the IP address of the host running the socket server. 261 @rtype: C{str} An IP address in dotted notation: n.n.n.n 262 ''' 263 return self.receiver.server_address[0]
264 serverIP = property(getServerIP) 265
266 - def getServerPort(self):
267 ''' 268 @return: Return the port number of the host running the socket server. 269 @rtype: C{int} A port number for an INET socket. 270 ''' 271 return self.receiver.server_address[1]
272 serverPort = property(getServerPort) 273
274 - def start(self):
275 ''' 276 Start the socket server in a thread. 277 ''' 278 self.receiver = RemoteMessagingReceiver(host=self.loggerIP, port=self.loggerPort) 279 if self.verbose: 280 print '\n%s: started on %s:%s\n' % (self.__class__.__name__, self.serverIP, self.serverPort) 281 282 self.serverThread = threading.Thread(target=self.receiver.serveUntilStopped, name=self.THREADNAME) 283 self.serverThread.setDaemon(True) 284 self.serverThread.start()
285
286 - def stop(self):
287 ''' 288 Shut down the socket server 289 ''' 290 self.receiver.abort = True 291 # give the server time to loop once more through the select() loop in serveUntilStopped() 292 time.sleep(self.receiver.timeout + 0.1) 293 294 self.receiver.server_close() 295 # wait for all the threads from the server to finish up 296 self.serverThread.join() 297 298 if self.verbose: 299 print '%s: server shut down at %s:%s' % (self.__class__.__name__, self.serverIP, self.serverPort)
300 301
302 -class RemoteMessagingSocketHandler(logging.handlers.SocketHandler):
303 - def send(self, s):
304 """ 305 Send a pickled string to the socket. 306 307 This function allows for partial sends which can happen when the 308 network is busy. 309 """ 310 if self.sock is None: 311 self.createSocket() 312 #self.sock can be None either because we haven't reached the retry 313 #time yet, or because we have reached the retry time and retried, 314 #but are still unable to connect. 315 if self.sock: 316 try: 317 if hasattr(self.sock, "sendall"): 318 self.sock.sendall(s) 319 else: 320 sentsofar = 0 321 left = len(s) 322 while left > 0: 323 sent = self.sock.send(s[sentsofar:]) 324 sentsofar = sentsofar + sent 325 left = left - sent 326 except socket.error: 327 self.sock.close() 328 self.sock = None # so we can call createSocket next time 329 else: 330 raise RemoteMessagingError
331
332 - def handleError(self, record):
334 335
336 -class RemoteMessagingSender(logging.Logger):
337 ''' 338 Create a logging object which will send messages over a socket to a logger running on a remote host. 339 340 G{classtree RemoteMessagingSender} 341 '''
342 - def __init__(self, name, host, port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, logLevel=logging.DEBUG):
343 ''' 344 @param name: The logger name 345 @type name: C{str} 346 347 @param host: The host which is listening to this sender. 348 @type host: C{str} 349 350 @param port: The INET socket port number on the listening host. 351 @type port: C{int} 352 353 @param logLevel: The level for the logging object. 354 @type logLevel: C{int} 355 ''' 356 logging.Logger.__init__(self, name) 357 358 self.setLevel(logLevel) 359 360 self.socketHandler = RemoteMessagingSocketHandler(host, int(port)) 361 self.addHandler(self.socketHandler)
362
363 - def close(self):
364 ''' 365 Close the socket in use by the SocketHandler 366 ''' 367 self.socketHandler.close()
368 369 370 if __name__ == '__main__': 371 ''' A hello world example ''' 372 LOGNAME = 'helloWorld' 373 374 from optparse import OptionParser 375 parser = OptionParser() 376 377 parser.set_defaults( 378 server=False, 379 verbose=False, 380 runtime=0, 381 ip=socket.gethostbyname(socket.gethostname()), 382 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT) 383 384 parser.add_option('-s', '--server', action='store_true', dest='server') 385 parser.add_option('-c', '--client', action='store_false', dest='server') 386 parser.add_option('-i', '--ip') 387 parser.add_option('-p', '--port', type='int', help='try a value of 0 to allow the kernel to dynamically allocate a port') 388 parser.add_option('-r', '--runtime', type='int') 389 parser.add_option('-v', '--verbose', action='store_true') 390 391 if len(sys.argv) == 0: 392 parser.print_help() 393 sys.exit(2) 394 395 (opts, args) = parser.parse_args() 396 397 if opts.server: # configure the server (receiving side) 398 logger = logging.getLogger(LOGNAME) 399 logger.setLevel(logging.DEBUG) 400 401 logFormatter = logging.Formatter('%(asctime)s - %(levelname)8s : loop %(message)s', '[%x %X]') 402 403 logHandler = logging.StreamHandler() 404 logHandler.setFormatter(logFormatter) 405 406 logger.addHandler(logHandler) 407 408 # now start up the receiver, which will use the locally configured logger by default 409 server = RemoteMessagingServer(port=opts.port, verbose=opts.verbose) 410 server.start() 411 412 # normally at this point the app using the server would start doing something, and shut down 413 # the server once it was finished. We'll just go to sleep instead and let the server thread 414 # run. 415 if opts.runtime > 0: 416 print 'server started - will run for %ss' % opts.runtime 417 time.sleep(opts.runtime) 418 server.stop() 419 else: 420 # just loop through once a minute, we'll exit when the user hits <CTRL>-c to kill the 421 # running script 422 while True: 423 time.sleep(60) 424 425 else: # configure the client (sending side) 426 sndr = RemoteMessagingSender(host=opts.ip, port=opts.port, name=LOGNAME) 427 428 for i in range(5): 429 print 'loop: %s' % i 430 extra = (i, socket.gethostname().split('.')[0], getLanIP()) 431 432 sndr.info('%s : %s %s some sort of informational message', *extra) 433 #sndr.warning('a warning message', extra) 434 #sndr.debug('a debug message', extra) 435 436 time.sleep(1) 437 438 sndr.info('Done\n') 439 sndr.close() 440