Package qb :: Package backend :: Module pythonChildHandler
[hide private]
[frames] | no frames]

Source Code for Module qb.backend.pythonChildHandler

  1  """ 
  2      This module defines 2 tightly-coupled classes; PyCmdDispatcher and PyCmdExecutor, along with 3 other 
  3      classes used as FIFO's, PfxUnixFIFO and PfxNamedPipeFIFO, which derive from PfxFIFO. 
  4   
  5      They are used to spawn a python child process and send commands to that process.  The PyCmdDispatcher 
  6      instance is the parent and dispatches commands to the child, which has a PyCmdExecutor instance. 
  7   
  8      Commands are dispatched over the child's stdin. 
  9   
 10      The success or failure of the PyCmdExecutor's ability to execute the command is communicated back to 
 11      the PyCmdDispatcher over a FIFO-like object, which is an PfxFIFO instance. 
 12  """ 
 13  #====================================== 
 14  #  $Revision: #11 $ 
 15  #  $Change: 16370 $ 
 16  #  $File: //depot/main/qube/src/api/python/qb/backend/pythonChildHandler.py $ 
 17  #====================================== 
 18   
 19  import sys 
 20  import os 
 21  import time 
 22  import logging 
 23  import subprocess 
 24   
 25  import inspect 
 26  import traceback as tb 
 27   
 28  import socket 
 29  import SocketServer 
 30  import threading 
 31  import xdrlib 
 32   
 33  from qb.utils import translateQbConvertPathStrings 
 34  from qb.backend.utils import PFXSimpleTimer, flushPrint 
 35   
 36   
37 -def getThisModulePath():
38 frame = inspect.currentframe() 39 thisModulePath = os.path.abspath(inspect.getfile(frame)) 40 jobtypeDir = os.path.dirname(thisModulePath) 41 del frame 42 return (thisModulePath, jobtypeDir)
43 44 45 # A global lock used to arbitrate access to the PFXSimpleServer buffer string - the PFXSimpleServer's 46 # requestHandler writes to the buffer, and the PyCmdDispatcher resets the buffer once it's read. 47 buffer_lock = threading.Lock() 48 49
50 -class PyCmdDispatcher(object):
51 """ 52 The PyCmdDispatcher class is used by qube python jobtypes to execute commands through an 53 abstracted application or shell. 54 55 The jobtype can have several sets of commands, defined as either a list of strings or a single 56 string, for job setup, job teardown, and per-agendaItem commands. 57 58 Command Execution 59 ================= 60 Commands are sent as string to the child process via stdin. The child is responsible for 61 communicating the success/failure of the command over a FIFO-like object known as the backChannel. 62 A successful command execution results in a zero being written to the backChannel by the 63 child. A non-zero indicates some sort of an error occurred attempting to execute the 64 command. 65 66 What Constitutes Command Failure 67 -------------------------------- 68 In the case of a python child process, failure is defined as any command that raises an 69 uncaught exception. The traceback is passed as a string from the child to the parent to be 70 printed to sys.stderr. 71 72 @cvar BLOCKSIZE: Blocksize for reading from an i/o stream. 73 @type BLOCKSIZE: int 74 75 @cvar POLL_TIMEOUT: Timeout value in milliseconds for select.poll() objects. 76 @type POLL_TIMEOUT: int 77 78 @cvar CHILD_CHECK_INTERVAL: how long (in seconds) to sleep between checks to see if the child 79 process is finsihed executing the command 80 @type CHILD_CHECK_INTERVAL: C{int} 81 82 @cvar LOG_PARSE_INTERVAL: how long (in seconds) to wait between parsing the logs for errors and 83 other regex matches 84 @type LOG_PARSE_INTERVAL: C{int} 85 86 @ivar child: The running child process, which will contain a PyCmdExecutor instance. 87 @type child: U{subprocess.Popen<http://www.python.org/doc/2.5.2/lib/node532.html>} 88 89 @ivar backChannel: The "out of band" pipe used to communicate success/failure from the 90 PyCmdExecutor to the PyCmdDispatcher; distinct from the PyCmdExecutor's stdout/stderr, avoids 91 having to parse the child's stdout/stderr for a sign that the last-sent command has completed 92 executing. 93 @type backChannel: L{PFXSimpleSocketServer} 94 """ 95 CHILD_CHECK_INTERVAL = 0.2 96 LOG_PARSE_INTERVAL = 5 97
98 - def __init__(self, logHandler=None, mergeStderr=False, debug=False):
99 """ 100 init function to set up members of the class 101 102 @param logHandler: a code object, it's the C{logHandler()} method of the backend class that's running the job 103 that's instantiating this. 104 105 @type logHandler: C{function} 106 107 @param mergeStderr: whether to merge stderr->stdout 108 109 @type mergeStderr: C{bool} 110 """ 111 self.logging = logging.getLogger('%s' % self.__class__.__name__) 112 113 loggingLevel = logging.INFO 114 loggingFormat = logging.BASIC_FORMAT 115 116 self.debug = debug 117 if self.debug: 118 loggingLevel = logging.DEBUG 119 loggingFormat = '%(name)20s : %(levelname)-8s : %(message)s (%(filename)s:%(lineno)d, %(threadName)s)' 120 121 loggerHdlr = logging.StreamHandler(None) 122 loggerHdlr.setFormatter(logging.Formatter(loggingFormat, None)) 123 self.logging.addHandler(loggerHdlr) 124 125 self.logging.setLevel(loggingLevel) 126 127 self.logHandler = logHandler 128 self.mergeStderr = mergeStderr 129 130 # startup the backChannel for out-of-band communication between this object and the child 131 self.backChannel = self.__startupBackChannel() 132 self.packer = xdrlib.Packer() 133 self.simple_server_thread = None 134 135 # a timer so that we only check the job logs every 5s or so 136 self.timer = PFXSimpleTimer() 137 138 # Start up the child which will instantiate a PyCmdExecutor object that we will talk to. 139 self.child = None 140 self.child_ready_for_cmd = False
141
142 - def __startupBackChannel(self):
143 """ 144 Instantiate a L{PFXSimpleSocketServer} and start it in another thread 145 """ 146 backChannel = PFXSimpleSocketServer() 147 148 self.simple_server_thread = threading.Thread(target=backChannel.start, kwargs={'poll_interval': 0.1}) 149 self.simple_server_thread.setName('ChildHandlerBackChannel') 150 self.simple_server_thread.setDaemon(True) 151 self.simple_server_thread.start() 152 153 while backChannel.port is None: 154 self.logging.warning('looping waiting for backChannel SimpleServer to start') 155 time.sleep(0.5) 156 157 return backChannel
158
159 - def __send(self, msg):
160 """ 161 Send a string to the child process's stdin. 162 163 @param msg: string to send 164 @type msg: C{string} 165 166 @return: the message length, or -1 if the child is non-responsive 167 @rtype: int 168 """ 169 # ensure that the child is still alive 170 if self.child.returncode is not None: 171 self.logging.debug('child returncode is not None') 172 return -1 173 else: 174 self.packer.pack_int(len(msg)) 175 msgLenPacked = self.packer.get_buffer() 176 177 self.logging.debug('sending %s bytes...' % len(msg)) 178 # send the packed msg length to the child first as the payload size 179 self.child.stdin.write(msgLenPacked) 180 self.logging.debug('sent %s in a packed format...' % len(msg)) 181 182 # now send the actual message 183 self.logging.debug('sending msg: %s' % msg) 184 self.child.stdin.write(msg) 185 self.logging.debug('sent msg: %s' % msg) 186 self.child.stdin.flush() 187 188 self.packer.reset() 189 self.logging.debug('reset XDRLib packer') 190 191 return len(msg)
192
193 - def __getBackChannelBuffer(self):
194 """ 195 Read the first 4 bytes from the PFXSimpleServer's buffer; the first 4 bytes which are 196 expected to contain the payload length as an integer in a packed format. Retrieve the 197 message 198 199 @return: The message length and the message, message length is -1 if the pipe has closed. 200 @rtype: C{tuple} 201 """ 202 (msgLenPacked, msg) = self.backChannel.buffer 203 204 if len(msgLenPacked) == 4: 205 unpacker = xdrlib.Unpacker(msgLenPacked) 206 msgLen = unpacker.unpack_int() 207 else: 208 # indicate that the pipe has closed 209 msgLen = -1 210 211 # reset the buffer 212 buffer_lock.acquire() 213 self.backChannel.buffer = () 214 buffer_lock.release() 215 216 return msgLen, msg
217
218 - def startChild(self, subprocessArgs):
219 """ 220 Start up a child process, which will actually do the work for the job. 221 222 @param subprocessArgs: arg is a 3-element array, suitable for passing as the first parameter 223 to U{subprocess.Popen<http://www.python.org/doc/2.5.2/lib/node528.html>}. 224 225 1. full path the the shell eg:'/bin/tcsh' 226 2. '-c' 227 3. the entire command to run to launch the child process, as a single string 228 229 @type subprocessArgs: list 230 """ 231 def __printChildArgs(): 232 flushPrint('Subprocess initialization command:') 233 flushPrint('%s' % '-'*50) 234 if sys.platform == 'win32': 235 flushPrint(subprocessArgs[0]) 236 else: 237 flushPrint('%s %s' % tuple(subprocessArgs[0:2])) 238 for x in subprocessArgs[2].split('; '): 239 flushPrint('%s%s' % (' '*4, x)) 240 flushPrint('%s' % '-'*50)
241 242 __printChildArgs() 243 244 #============================================================ 245 # redirect stderr -> stdout if desired, set at job submission 246 #============================================================ 247 childStderr = None 248 if self.mergeStderr: 249 childStderr = subprocess.STDOUT 250 251 if os.name == 'nt': 252 childArgs = ''.join(subprocessArgs) 253 else: 254 childArgs = subprocessArgs 255 256 ON_POSIX = 'posix' in sys.builtin_module_names 257 258 self.child = subprocess.Popen(childArgs, 259 stdin=subprocess.PIPE, 260 stderr=childStderr, 261 close_fds=ON_POSIX, 262 env=os.environ) 263 264 self.logging.debug('child started') 265 self.logging.debug('child pid: %s' % self.child.pid) 266 self.child_ready_for_cmd = True 267 self.timer.startTimer()
268
269 - def execute(self, commands, work=None):
270 """ 271 Pass the commands in a list to a child process one by one for execution by C{eval}. 272 273 @param commands: A list (optionally a string for a single command) of commands to be 274 executed by a child process. If commands is a string, it will be re-cast as 275 a single-element list. 276 277 @type commands: list 278 279 The following two forms yield identical results:: 280 281 >>> cmds = ['import sys', 'print sys.version_info' ] 282 >>> execute(cmds) 283 284 >>> execute('import sys') 285 >>> execute('print sys.version_info') 286 287 @return: 0 for success, 1 for failure. 288 @rtype: C{int} 289 """ 290 if isinstance(commands, str): 291 commands = [commands] 292 293 self.logging.debug('commands: %s' % commands.__repr__()) 294 return_code = 0 295 error_regex_matched = False 296 file_size_check_failed = False 297 298 #======================================= 299 # setup to parse the logs 300 #======================================= 301 if not self.timer.timerIsRunning: 302 self.timer.startTimer() 303 304 while self.child.returncode is None and not (error_regex_matched or file_size_check_failed): 305 if self.child_ready_for_cmd: 306 # send the child the next command 307 try: 308 cmd = commands.pop(0) 309 self.logging.debug('cmd: "%s"' % cmd) 310 cmd = translateQbConvertPathStrings(cmd) 311 self.logging.debug('cmd - post-Xlate: "%s"' % cmd) 312 except IndexError: 313 break 314 315 send_result = self.__send(cmd) 316 if send_result > -1: 317 # this will be set True again when the child returns something on the backChannel 318 self.child_ready_for_cmd = False 319 else: 320 self.logging.warning('child process seems to have died\n') 321 sys.stdout.write('child process seems to have died\n') 322 return_code = 1 323 break 324 325 if len(self.backChannel.buffer) == 2: 326 #=================================================================================== 327 # once the child executes the command and the command returns, this buffer will hold 328 # a tuple of payloadSize and an optional message - the child returns a zero-length 329 # message if the command executed does not raise an exception; if it does, the 330 # message is the traceback itself 331 #=================================================================================== 332 333 # the child responded, it's ready for the next command to be sent 334 self.child_ready_for_cmd = True 335 336 # get the payload size from the backChannel 337 (msgLen, msg) = self.__getBackChannelBuffer() 338 339 #=================================================================================== 340 # a 0-length msg == successful command execution 341 #=================================================================================== 342 # now deal with any message that was not zero bytes long, 343 if msgLen != 0: # an unnecessary conditional, added for visual clarity 344 if msgLen > 0: 345 #=========================================================================== 346 # a non-zero msgLen indicates that a message has been passed back up from 347 # the child on the back-channel 348 # 349 # this only happens when there's been an error executing a command,the msg 350 # is the string representation of the traceback. 351 #=========================================================================== 352 return_code = 1 353 flushPrint(msg, fhList=[sys.stderr]) 354 break 355 elif msgLen < 0: 356 self.logging.debug('msgLen < 0') 357 self.logging.error('backChannel closed unexpectedly') 358 return_code = 1 359 break 360 361 # parse any new data written to the job logs for errors, highlights, etc... 362 if self.timer.elapsedTime() > self.LOG_PARSE_INTERVAL and self.logHandler: 363 (error_regex_matched, file_size_check_failed) = self.logHandler(work) 364 365 if (error_regex_matched or file_size_check_failed): 366 self.logging.warning('error_regex matched or file size check failed...') 367 break 368 369 self.timer.startTimer() 370 371 for fh in [sys.stdout, sys.stderr]: 372 fh.flush() 373 374 time.sleep(self.CHILD_CHECK_INTERVAL) 375 376 # check to see if the cmd just executed shut down the child 377 if self.child.poll(): 378 break 379 380 if commands and (commands[0].count('sys.exit(0)') == 0): 381 return_code = 1 382 sys.stderr.write('%s' % '-'*60 + '\n') 383 sys.stderr.write('ERROR: from %s.execute()\n' % self.__class__.__name__) 384 sys.stderr.write('\tChild process has exited prematurely with commands remaining\n') 385 sys.stderr.write('\tReturning a code of %i, indicating failure\n' % self.child.wait()) 386 sys.stderr.write('%s' % '-'*60 + '\n') 387 sys.stderr.flush() 388 389 # now check the job logs one last time 390 if self.logHandler and not (error_regex_matched or file_size_check_failed): 391 (error_regex_matched, file_size_check_failed) = self.logHandler(work) 392 if (error_regex_matched or file_size_check_failed): 393 self.logging.warning('error_regex matched or file size check failed...') 394 395 if error_regex_matched or file_size_check_failed: 396 return_code = 1 397 398 return return_code
399
400 - def close(self, exitcode=0):
401 """ 402 Signal the child process to shut down. 403 404 @param exitcode: Suggested exit code of the child process. 405 @type exitcode: int 406 407 @return: Actual value the child exited with. 408 @rtype: int 409 """ 410 if not self.child.returncode: 411 self.execute('sys.exit(%i)' % exitcode) 412 else: 413 sys.stderr.write('ERROR: from %s.close()\n' % self.__class__.__name__) 414 sys.stderr.write('\tUnable to close child process, it seems to already be closed\n') 415 416 # shut down the PFXSimpleServer 417 self.backChannel.stop() 418 419 if self.simple_server_thread: 420 self.simple_server_thread.join(2.0) 421 422 return self.child.wait()
423 424
425 -class PyCmdExecutor(object):
426 """ 427 The PyCmdExecutor class is used by qube loadOnce python jobtypes. It is tightly coupled to the 428 PyCmdDispatcher class; the 2 classes are server/client relations. 429 430 A PyCmdExecutor object is instantiated by the child process and is responsible for executing 431 commands sent from the PyCmdDispatcher over stdin. It is normally instantiated inside a child 432 bootstrapper script. 433 434 Initialize 2 empty dicts to serve as the locals() and globals() for the C{exec} statement used 435 to execute the command. 436 437 B{Note}: The definition of failure is not a boolean result of the command executed, it is the 438 inability of the PyCmdExecutor object to actually execute the command without raising an 439 exception. 440 """ 441
442 - def __init__(self, buffer_port, promptType=None, debug=False):
443 """ 444 Function to set up members of the class. 445 446 @param buffer_port: The port on which a PFXSimpleServer instance is running on the localhost, 447 used as a backChannel for communication between this instance and the pyCmdDispatcher 448 instance sending it commands to be executed. 449 450 @type buffer_port: C{int} 451 452 @param promptType: Used to determine the string used as a prompt that precedes the command 453 when echoing the command to stdout. 454 455 @type promptType: C{str} 456 """ 457 self.logging = logging.getLogger('%s' % self.__class__.__name__) 458 459 loggingLevel = logging.WARNING 460 loggingFormat = logging.BASIC_FORMAT 461 462 if debug: 463 loggingLevel = logging.DEBUG 464 loggingFormat = '%(name)20s : %(levelname)-8s : %(message)s (%(filename)s:%(lineno)d, %(threadName)s)' 465 466 loggerHdlr = logging.StreamHandler(None) 467 loggerHdlr.setFormatter(logging.Formatter(loggingFormat, None)) 468 469 self.logging.addHandler(loggerHdlr) 470 self.logging.setLevel(loggingLevel) 471 472 self.backChannel = PFXSimpleClient(int(buffer_port)) 473 474 self.exitCode = 0 475 self.packer = xdrlib.Packer() 476 self.ZERO_PACKED = self.__initPacker() 477 478 self.globals = {} 479 self.locals = {} 480 481 self.cmdPrompt = self.__getCommandPrompt(promptType)
482
483 - def __initPacker(self):
484 """ 485 Initialize an U{xdrlib.Packer<p://www.python.org/doc/2.5.2/lib/xdr-packer-objects.html>} 486 instance for internal use by the class member. 487 488 The packer instance is accessed via the self.packer attribute. 489 490 @return: the integer 0 packed into 4 bytes. This is the data most commonly sent back to the 491 PyCmdDispatcher parent, so it's packed once and re-used. 492 493 @rtype: C{str} 494 """ 495 self.packer.pack_int(0) 496 ZERO_PACKED = self.packer.get_buffer() 497 self.packer.reset() 498 499 return ZERO_PACKED
500
501 - def __getCommandPrompt(self, promptType='>>'):
502 """ 503 Determine the string that is printed prior to echoing out to stdout the command about to be run. 504 505 @return: The string to be used as a command prompt. 506 507 @rtype: str 508 """ 509 prompt = { 510 'maya': 'mayaPy>', 511 'houdini': 'houdiniPy>', 512 'nuke': 'nukePy>', 513 } 514 515 return prompt.get(promptType, '%s>' % promptType)
516
517 - def _send(self, msg):
518 """ 519 Write a message via a PFXSimpleClient to the pyCmdDispatcher sending this instance commands. 520 521 Successful execution results in the integer 0, packed into 4 bytes, being sent with no other 522 message behind it. Failure, defined as any command which raises an uncaught exception, 523 results in the traceback from the exception being sent, preceded by the packed messageLength. 524 525 @param msg: The message to be sent, prepended with the payload size packed into 4 bytes. 526 527 @type msg: C{string} 528 """ 529 530 if msg == self.ZERO_PACKED: 531 self.backChannel.send(msg) 532 self.logging.debug('sent: ZERO_PACKED = %s' % self.ZERO_PACKED.__repr__()) 533 else: 534 self.logging.debug('sending msg: %s' % msg) 535 self.packer.pack_int(len(msg)) 536 msgLenPacked = self.packer.get_buffer() 537 self.packer.reset() 538 539 sent = 0 540 while sent < len(msg): 541 sent = self.backChannel.send(msg[sent:]) 542 543 self.logging.debug('bytes sent: %s' % sent)
544
545 - def execute(self, cmd):
546 """ 547 C{exec} a string as a python command. 548 549 Most exceptions raised by the failure of C{exec} will be caught and handled by 550 L{PyCmdExecutor.mainloop}. The only exception caught here will be SystemExit. 551 552 @param cmd: The command to be run. 553 @type cmd: C{string} 554 555 @return: C{None} on success or the exitcode passed to sys.exit() 556 557 @rtype: NoneType or int 558 """ 559 560 sys.stdout.write('%s %s\n' % (self.cmdPrompt, cmd)) 561 if hasattr(sys.stdout, 'flush'): 562 sys.stdout.flush() 563 564 try: 565 exec cmd in self.globals, self.locals 566 #================================================================= 567 # definition of successful execution == no exception 568 # send an int 0, which the Dispatcher will interpret as success 569 #================================================================= 570 self._send(self.ZERO_PACKED) 571 return None 572 573 except SystemExit, exitValue: 574 self.logging.debug('SystemExit, returncode:%s' % exitValue) 575 return exitValue
576
577 - def mainloop(self):
578 """ 579 Main loop of the PyCmdExecutor. Responsible solely for reading commands to execute over 580 stdin. 581 582 Successful execution results in a zero integer being passed back to the the PyCmdDispatcher 583 via the PFXClient instance (the backChannel). Unsuccessful commands result in an error 584 message of some sort being returned via the backChannel. 585 586 The traceback generated by the failure to execute the command is what gets written to the 587 backChannel. 588 589 @return: The exit code of the child process 590 591 @rtype: int 592 """ 593 594 while True: 595 #======================================================================================= 596 # Read a command over stdin. 597 # 598 # The commands are preceded by a 4-byte packed integer which describes the length of the 599 # string to follow. 600 # 601 # The string is the command(s) to be executed. 602 #======================================================================================= 603 604 unpacker = xdrlib.Unpacker(sys.stdin.read(4)) 605 cmd = sys.stdin.read(unpacker.unpack_int()) 606 unpacker.reset('') 607 608 try: 609 self.logging.debug('executing: %s' % cmd) 610 611 result = self.execute(cmd) 612 613 self.logging.debug('result: %s' % result) 614 615 if result is not None: 616 # if we're here, the command was sys.exit() 617 self.exitCode = result.code 618 break 619 except Exception: 620 # some sort of exception has occurred, 621 # so send the traceback up to the parent 622 tback = '%s' % '-'*80 + '\n' 623 try: 624 tback += ' ERROR: %s\n' % cmd.__str__() 625 tback += '%s' % '-'*80 + '\n' 626 except Exception: 627 pass 628 629 tback += tb.format_exc() 630 tback += '%s' % '-'*80 + '\n' 631 632 self._send(tback) 633 634 return self.exitCode
635 636
637 -class PFXRequestHandler(SocketServer.BaseRequestHandler):
638 """ 639 """
640 - def __init__(self, request, client_address, server):
641 self.logging = logging.getLogger(self.__class__.__name__) 642 SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
643
644 - def handle(self):
645 """ 646 Each request is expected to have the payload size packed by xdrlib.Packer encoded in the first 647 4 bytes, optionally followed by a message. 648 """ 649 payloadSize = self.request.recv(4) 650 while len(payloadSize) < 4: 651 payloadSize = self.request.recv(4-len(payloadSize)) 652 653 self.logging.debug('payloadSize: %s' % payloadSize.__repr__()) 654 655 if len(payloadSize) == 4: 656 unpacker = xdrlib.Unpacker(payloadSize) 657 msgLen = unpacker.unpack_int() 658 unpacker.reset('') 659 self.logging.debug('recvd: %s' % msgLen) 660 else: 661 raise Exception 662 663 msg = '' 664 while len(msg) < msgLen: 665 msg = msg + self.request.recv(msgLen - len(msg)) 666 667 if len(msg): 668 self.logging.debug('MSG: %s' % msg) 669 670 buffer_lock.acquire() 671 self.server.buffer = (payloadSize, msg) 672 buffer_lock.release()
673 674
675 -class PFXSimpleSocketServer(SocketServer.TCPServer):
676 """ 677 Listens for output from PFXSimpleClient on the same host, usually running in another python 678 interpreter inside a 3rd-party application such as Maya. 679 680 Uses a port auto-assigned by the OS if possible, doesn't seem to be available on python 2.4 681 running on CentOS 5.x, otherwise use a port in the range 55000-56000 682 683 @ivar buffer: The requestHandler class will store any received messages as a tuple in the 684 instance's buffer attribute; the tuple will contain the message length as a packed integer, 685 and the message itself. 686 687 @type buffer: C{tuple} 688 """ 689 690 # only used for IPC on the same host 691 # port 0 allows the OS to auto-assign the next available port 692 SERVER_ADDRESS = ('127.0.0.1', 0) 693 694 if sys.version_info[1] < 6: 695 import random 696 SERVER_ADDRESS = (SERVER_ADDRESS[0], 55000 + random.randrange(1, 1000)) 697
698 - def __init__(self, handlerClass=PFXRequestHandler, timeout=1):
699 SocketServer.TCPServer.__init__(self, self.SERVER_ADDRESS, handlerClass) 700 self.port = self.server_address[1] 701 self.logging = logging.getLogger(' %s @ %i' % (self.__class__.__name__, self.port)) 702 703 self.buffer = tuple()
704
705 - def start(self, **kwargs):
706 self.logging.debug('Starting mainloop') 707 self.logging.debug(sys.version_info.__repr__()) 708 if sys.version_info[1] < 6: 709 if kwargs: 710 self.logging.warning('SocketServer.BaseServer.serve_forever takes no args in python versions < 2.6') 711 self.logging.warning('Skipping args: %s' % kwargs.keys()) 712 self.logging.debug('Starting serve_forever') 713 self.serve_forever() 714 else: 715 self.logging.debug('Starting serve_forever with kwargs: %s' % kwargs.__repr__()) 716 self.serve_forever(**kwargs)
717
718 - def stop(self):
719 if hasattr(self, 'shutdown'): 720 # no shutdown() in SocketServer from python 2.4 721 self.shutdown() 722 self.server_close()
723 724
725 -class PFXSimpleClient(object):
726 """ 727 """
728 - def __init__(self, server_port):
729 self.logging = logging.getLogger(self.__class__.__name__) 730 731 self.server_address = ('127.0.0.1', server_port) 732 self.logging.debug('server address: %s' % self.server_address.__repr__()) 733 734 self.packer = xdrlib.Packer() 735 self.ZERO_PACKED = self.__initPacker()
736
737 - def __initPacker(self):
738 """ 739 Initialize an U{xdrlib.Packer<p://www.python.org/doc/2.5.2/lib/xdr-packer-objects.html>} 740 object for internal use by the class member. 741 742 The packer object is accessed via the self.packer attribute. 743 744 @return: the integer 0 packed into 4 bytes. This is the data most commonly sent back to the 745 PyCmdDispatcher parent, so it's packed once and re-used. 746 747 @rtype: C{str} 748 """ 749 self.packer.pack_int(0) 750 ZERO_PACKED = self.packer.get_buffer() 751 self.packer.reset() 752 753 return ZERO_PACKED
754
755 - def send(self, msg):
756 skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 757 skt.connect(self.server_address) 758 759 if msg == self.ZERO_PACKED: 760 sent = skt.send(msg) 761 else: 762 self.packer.pack_int(len(msg)) 763 msgLenPacked = self.packer.get_buffer() 764 self.packer.reset() 765 766 skt.send(msgLenPacked) 767 768 sent = 0 769 while sent < len(msg): 770 sent += skt.send(msg[sent:]) 771 772 self.logging.debug('sent: %s' % sent) 773 774 skt.close() 775 776 return sent
777 778 if False: # __name__ == '__main__': #or os.environ.get('PFX_DEV'): 779 logging.basicConfig(level=logging.INFO) 780 thisDir = getThisModulePath()[1] 781 782 childBootStrapper = os.path.join(thisDir, 'child_bootstrapper.py') 783 pyCmdLine = '%s -u "%s" --port __PORT__' % (sys.executable, os.path.normpath(childBootStrapper)) 784 785 if sys.platform == 'win32': 786 childArgs = [pyCmdLine] 787 else: 788 childArgs = ['/bin/tcsh', '-c', pyCmdLine] 789 cd = PyCmdDispatcher(childArgs, debug=False) 790 791 cd.execute('print "Hello World"') 792 cd.execute('import sys') 793 cd.execute('import os') 794 cd.execute('print "%s" % "+"*16*1024') 795 cd.execute('print "that was a 16K string, check the length.."') 796 cd.execute(('1/0')) 797 cd.execute("""import time 798 for i in range(10): 799 print 'sleeping, %s' % i 800 time.sleep(1)""") 801 cd.execute('pid = os.getpid()') 802 cd.execute('pgid = os.getpgid(pid)') 803 cd.execute('print "from child - pgid:%s" % pgid') 804 805 try: 806 print '__main__: Parent pid: %s' % os.getpid() 807 print '__main__: Child pid: %s' % cd.child.pid 808 if hasattr(os, 'getpgid'): 809 print '__main__: Child pgrp: %s' % os.getpgid(cd.child.pid) 810 except: 811 print 'child process died' 812 813 exit_code = cd.close() 814 logging.info('PyCmdDispatcher closed with exitCode: %s' % exit_code) 815 sys.exit(exit_code) 816