|
Package qb ::
Module remoteMessaging
|
|
1 '''
2 A framework for socket-based client/server persistent remote messaging sessions.
3
4 Neither the server nor the client are bi-directional. Each is intended to either receive messages
5 (the server) or send messages (the client).
6
7 The server functions as the receiver; it is multi-threaded and can receive from any number of
8 clients simultaneously. The message content is not limited to strings. It is possible to send
9 anything that can be serialzed, including traceback objects, classes, class instances, or code
10 objects. Child threads of the main thread are created as needed to handle inbound messages, and are
11 shut down as soon as the message is deemed to have been received in full. The message length is
12 sent in the first 4 bytes of the message; in this way, the message is not deemed to be completely
13 received until the number of bytes received matches the payload size. This provides robustness on
14 congested or noisy networks that do not provide reliable transport.
15
16 The client functions as the sender. It is single threaded and can communicate with only 1 server.
17 The server's INET socket address (ip address & port number) is an argument to the client's constructor
18 method.
19
20 Main concepts:
21
22 * The central host in a messaging session communicates with multiple remote hosts.
23 * Each remote host in a messaging session communicates with only 1 central host.
24 * The central host runs 1 instance of a server.
25 * Each remote host runs 1 instance of a client as well as 1 instance of a server.
26 * The central host runs a client instance for each remote host.
27
28 Port assignment for the server in python versions 2.5 and above is dynamically assigned by the
29 kernel, and will be different for each instance. In python versions 2.4 and below, the server port
30 is randomly assigned a port between 45000 and 60000. It is the responsibility of the developer to
31 inform the app instantiating the client of the server's host/port socket address.
32
33 The creation of a persistent messaging session between a local and remote host is a 4-step process:
34
35 1.) create a server on a local host. The INET socket address (ip & port) of the server is used when
36 the clients are started up on remote machines. This server instance running on the local host
37 is the 'central' server and is expected to have sessions with clients on multiple remote hosts.
38
39 2.) create a server on the remote host.
40
41 3.) create a client on the remote host, passing the socket address of the 'central' server to
42 the constructor. Send a 'hello' message back to the 'central' server informing it of the socket
43 address on the server running on the same machine as the client (the "remote" host from the POV
44 of the "central server).
45
46 4.) when the 'central' server running on the local host receives the 'hello' message from the
47 remote host, it creates a corresponding client for the server running on the remote host, using
48 the socket address of the server running on the remote machine. Since the local host is
49 expected to have messaging sessions with multiple remote hosts, the client which is associated
50 with the server on the remote host is stored in a dictionary so that it may be easily accessed
51 at a later time.
52
53 Communication from a remote host back to the central host is accomplished by using the remote client
54 to send a message back to the central server.
55
56 Communcation from the central server to a remote host is accomplished by looking up the client which
57 corresponds to the remote host and using that client to send messages.
58
59 The app which utilizes the server and clients are meant to implement a heartbeat scheme such that the
60 server periodically sends heartbeats to the clients; if the clients do not receive heartbeats within
61 some interval past the timeout, they shut themselves down.
62
63 The app on the remote machine should also be written so that it inspects each message sent to the
64 server instance running on that machine, and if it receives a 'shutdown' message, will shutdown the
65 client and server and exit.
66 '''
67
68
69
70
71
72 import sys
73 import os
74 import time
75 import socket
76 import logging
77 import logging.handlers
78 import cPickle
79 import SocketServer
80 import struct
81 import threading
82
83
84
85
86 if os.name != 'nt':
87 import fcntl
88 import struct
90 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
91
92 return socket.inet_ntoa(fcntl.ioctl(
93 s.fileno(),
94 0x8915,
95 struct.pack('256s', ifName[:15])
96 )[20:24])
97
98
100
101 ip = socket.gethostbyname(socket.gethostname())
102
103 if ip.startswith("127.") and os.name != "nt":
104 for ifName in ["eth0","eth1","eth2"]:
105 try:
106 ip = getInterfaceIP(ifName)
107 break
108 except IOError:
109 pass
110
111 return ip
112
114 return float('%s.%s' % (sys.version_info[0], sys.version_info[1]))
115
116
120
122 return repr(self.value)
123
124
126 """
127 Handler for a streaming logging request.
128
129 This basically logs the record using whatever logging policy is
130 configured locally.
131
132 G{classtree RemoteMessagingStreamRequestHandler}
133 """
135 """
136 Handle multiple requests - each expected to be encode the payload size in the first 4-bytes,
137 followed by the LogRecord in pickle format. Logs the record according to whatever policy is
138 configured locally.
139 """
140 while True:
141 payloadSize = self.connection.recv(4)
142 if len(payloadSize) < 4:
143
144 break
145
146 msgLen = struct.unpack(">L", payloadSize)[0]
147 msg = self.connection.recv(msgLen)
148 while len(msg) < msgLen:
149 msg = msg + self.connection.recv(msgLen - len(msg))
150
151 obj = cPickle.loads(msg)
152 record = logging.makeLogRecord(obj)
153 self.handleLogRecord(record)
154
156 '''
157 Pass a LogRecord to the appropriate logging.Handler.
158
159 @param record: The logRecord received over the socket.
160 @type record: logging.LogRecord
161 '''
162
163
164 if self.server.logname is not None:
165 logger = logging.getLogger(self.server.logname)
166 else:
167 logger = logging.getLogger(record.name)
168 if not logger.handlers:
169 logger.setLevel(logging.INFO)
170 logger.addHandler(logging.StreamHandler(sys.stdout))
171
172 logger.handle(record)
173
174
176 """
177 A simple TCP socket-based logging receiver.
178
179 G{classtree RemoteMessagingReceiver}
180 """
181 allow_reuse_address = 1
187 '''
188 @param host: The name of the host running the socket server.
189 @type host: C{str}
190
191 @param port: The port number for the INET socket on the listening host.
192 @type port: C{int}
193
194 @param loggerName: The name of the logging.Logger object which is to
195 receive the logRecords sent over the socket.
196 @type loggerName: C{str}
197
198 @param handler: The request handler for the SocketServer
199 @type handler: L{RemoteMessagingStreamRequestHandler}
200 '''
201 SocketServer.ThreadingTCPServer.__init__(self, (host, port), handler)
202 self.abort = False
203 self.timeout = 3
204 self.logname = loggerName
205
207 '''
208 Handle incoming requests until told to shut down.
209 '''
210 import select
211 while not self.abort:
212 rd = select.select([self.socket.fileno()], [], [], self.timeout)[0]
213 if rd:
214 self.handle_request()
215
216
218 '''
219 Create and manage a logger running in a thread.
220
221 G{classtree RemoteMessagingServer}
222 '''
223 if pyVer() > 2.4:
224 DEFAULT_LOGGER_PORT = 0
225 else:
226 import random
227 DEFAULT_LOGGER_PORT = random.randint(45000,60000)
228
229 loggerIP = getLanIP()
230
231 THREADNAME = 'remoteMessagingServerMainThead'
232
233 - def __init__(self, verbose=False, port=None):
234 '''
235 Create a logging object and L{RemoteMessagingReceiver} which will send logRecords to it.
236
237 @param verbose: Print out open/close messages or not
238 @type verbose: C{bool}
239 '''
240 self.serverThread = None
241 self.receiver = None
242
243 if port:
244 self.loggerPort = port
245 else:
246 self.loggerPort = self.DEFAULT_LOGGER_PORT
247
248 self.verbose = verbose
249
251 '''
252 @return: Return the IP address and port number of the host running the socket server.
253 @rtype: C{tuple} (ip, port)
254 '''
255 return self.receiver.server_address
256 serverAddress = property(getServerAddr)
257
259 '''
260 @return: Return the IP address of the host running the socket server.
261 @rtype: C{str} An IP address in dotted notation: n.n.n.n
262 '''
263 return self.receiver.server_address[0]
264 serverIP = property(getServerIP)
265
267 '''
268 @return: Return the port number of the host running the socket server.
269 @rtype: C{int} A port number for an INET socket.
270 '''
271 return self.receiver.server_address[1]
272 serverPort = property(getServerPort)
273
275 '''
276 Start the socket server in a thread.
277 '''
278 self.receiver = RemoteMessagingReceiver(host=self.loggerIP, port=self.loggerPort)
279 if self.verbose:
280 print '\n%s: started on %s:%s\n' % (self.__class__.__name__, self.serverIP, self.serverPort)
281
282 self.serverThread = threading.Thread(target=self.receiver.serveUntilStopped, name=self.THREADNAME)
283 self.serverThread.setDaemon(True)
284 self.serverThread.start()
285
287 '''
288 Shut down the socket server
289 '''
290 self.receiver.abort = True
291
292 time.sleep(self.receiver.timeout + 0.1)
293
294 self.receiver.server_close()
295
296 self.serverThread.join()
297
298 if self.verbose:
299 print '%s: server shut down at %s:%s' % (self.__class__.__name__, self.serverIP, self.serverPort)
300
301
304 """
305 Send a pickled string to the socket.
306
307 This function allows for partial sends which can happen when the
308 network is busy.
309 """
310 if self.sock is None:
311 self.createSocket()
312
313
314
315 if self.sock:
316 try:
317 if hasattr(self.sock, "sendall"):
318 self.sock.sendall(s)
319 else:
320 sentsofar = 0
321 left = len(s)
322 while left > 0:
323 sent = self.sock.send(s[sentsofar:])
324 sentsofar = sentsofar + sent
325 left = left - sent
326 except socket.error:
327 self.sock.close()
328 self.sock = None
329 else:
330 raise RemoteMessagingError
331
334
335
337 '''
338 Create a logging object which will send messages over a socket to a logger running on a remote host.
339
340 G{classtree RemoteMessagingSender}
341 '''
342 - def __init__(self, name, host, port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, logLevel=logging.DEBUG):
343 '''
344 @param name: The logger name
345 @type name: C{str}
346
347 @param host: The host which is listening to this sender.
348 @type host: C{str}
349
350 @param port: The INET socket port number on the listening host.
351 @type port: C{int}
352
353 @param logLevel: The level for the logging object.
354 @type logLevel: C{int}
355 '''
356 logging.Logger.__init__(self, name)
357
358 self.setLevel(logLevel)
359
360 self.socketHandler = RemoteMessagingSocketHandler(host, int(port))
361 self.addHandler(self.socketHandler)
362
364 '''
365 Close the socket in use by the SocketHandler
366 '''
367 self.socketHandler.close()
368
369
370 if __name__ == '__main__':
371 ''' A hello world example '''
372 LOGNAME = 'helloWorld'
373
374 from optparse import OptionParser
375 parser = OptionParser()
376
377 parser.set_defaults(
378 server=False,
379 verbose=False,
380 runtime=0,
381 ip=socket.gethostbyname(socket.gethostname()),
382 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT)
383
384 parser.add_option('-s', '--server', action='store_true', dest='server')
385 parser.add_option('-c', '--client', action='store_false', dest='server')
386 parser.add_option('-i', '--ip')
387 parser.add_option('-p', '--port', type='int', help='try a value of 0 to allow the kernel to dynamically allocate a port')
388 parser.add_option('-r', '--runtime', type='int')
389 parser.add_option('-v', '--verbose', action='store_true')
390
391 if len(sys.argv) == 0:
392 parser.print_help()
393 sys.exit(2)
394
395 (opts, args) = parser.parse_args()
396
397 if opts.server:
398 logger = logging.getLogger(LOGNAME)
399 logger.setLevel(logging.DEBUG)
400
401 logFormatter = logging.Formatter('%(asctime)s - %(levelname)8s : loop %(message)s', '[%x %X]')
402
403 logHandler = logging.StreamHandler()
404 logHandler.setFormatter(logFormatter)
405
406 logger.addHandler(logHandler)
407
408
409 server = RemoteMessagingServer(port=opts.port, verbose=opts.verbose)
410 server.start()
411
412
413
414
415 if opts.runtime > 0:
416 print 'server started - will run for %ss' % opts.runtime
417 time.sleep(opts.runtime)
418 server.stop()
419 else:
420
421
422 while True:
423 time.sleep(60)
424
425 else:
426 sndr = RemoteMessagingSender(host=opts.ip, port=opts.port, name=LOGNAME)
427
428 for i in range(5):
429 print 'loop: %s' % i
430 extra = (i, socket.gethostname().split('.')[0], getLanIP())
431
432 sndr.info('%s : %s %s some sort of informational message', *extra)
433
434
435
436 time.sleep(1)
437
438 sndr.info('Done\n')
439 sndr.close()
440