Package qb :: Package backend :: Module pythonChildBackEnd
[hide private]
[frames | no frames]

Source Code for Module qb.backend.pythonChildBackEnd

  1  ''' 
  2      Module defining the base class for Qube python jobTypes which communicate with a separate 
  3      running python interpreter, usually running inside a 3rd-party application. 
  4   
  5      Copyright: Pipelinefx L.L.C.  
  6  ''' 
  7   
  8  #====================================== 
  9  #  $Revision: #16 $ 
 10  #  $Change: 16052 $ 
 11  #====================================== 
 12   
 13  import sys 
 14  import os.path 
 15  import time 
 16  import pprint 
 17  import traceback as tb 
 18   
 19  import qb.utils 
 20  import qb.backend.utils as backendUtils 
 21  import qb.backend.pythonBackEnd 
 22  import qb.backend.pythonChildHandler 
 23   
 24   
class PythonChildBackEnd(qb.backend.pythonBackEnd.PythonQubeBackEnd):
    """
    A python-based Qube backend that has a PyCmdDispatcher.

    It will run another python interpreter of some sort (usually a 2D or 3D app) as a child process,
    and interact with it via the PyCmdDispatcher.
    """

    # Interpreter used for the child process when the job package does not name one.
    DEFAULT_PYTHON = sys.executable

    def __init__(self, job):
        """
        Initialize the backend for a job.

        @param job: the job this backend instance is servicing; its C{'package'} entry is read
        for the optional C{'pyExecutable'} value.

        @type job: C{dict}
        """
        super(PythonChildBackEnd, self).__init__(job)

        # Both are populated later by jobSetup() / initPyCmdDispatcher().
        self.cmdDispatcher = None
        self.childBootstrapper = None

        # The job package may override the interpreter to launch as the child process.
        self.pyExecutable = self.job['package'].get('pyExecutable', self.DEFAULT_PYTHON)

    def getSubprocessArgs(self, port):
        """
        Determine the arguments necessary to invoke the child process.

        This is the main method that differentiates derived classes, and is probably the only method
        that will need to be overridden.

        @param port: the port on which the PyCmdDispatcher's backchannel instance is listening, usually
        passed as a parameter to the child_bootstrapper.py script which starts up the pyCmdExecutor
        inside child process started by the PyCmdDispatcher

        @type port: C{int}

        @return: Return a tuple of a list of args to start the python interpreter, and a list of
        python commands to initialize the python working environment.

        @rtype: C{tuple} C{([childArgs], [pyInitCmds])}

        @raise NotImplementedError: Raised when this method is not overridden in a derived class.
        """
        raise NotImplementedError

    def generateChildBootstrapper(self, port):
        """
        Return the path of the bootstrapper script to be run inside the child process.

        This is only overridden if a 3rd-party application which will be started cannot use the standard
        child_bootstrapper.py script.  This can happen if the 3rd-party app doesn't support passing
        arguments to the bootstrapper.

        @param port: the port on which the PyCmdDispatcher's backchannel instance is listening, usually
        passed as a parameter to the child_bootstrapper.py script which starts up the pyCmdExecutor
        inside child process started by the PyCmdDispatcher.  Unused by this default implementation.

        @type port: C{int}

        @return: the path to C{child_bootstrapper.py}, located in the directory that contains the
        directory returned by C{backendUtils.getModulePath()}.

        @rtype: C{str}
        """
        bootstrapperDir = os.path.dirname(backendUtils.getModulePath())
        return os.path.join(bootstrapperDir, 'child_bootstrapper.py')

    def initPyCmdDispatcher(self):
        """
        Initialize a PyCmdDispatcher instance.

        The PyCmdDispatcher is this side of the bi-directional communication with a python
        interpreter running in a child process.  It is the mechanism to dispatch the commands to the
        interpreter and handle any return values or exceptions.

        @return: Return a tuple of the pythonChildHandler.PyCmdDispatcher object (which will have a
        running python process of some sort (python, mayaPy, houdini, etc.) as a child attribute)
        and the accumulated return code of the initialization commands.  When the startup
        application does not exist the dispatcher is C{None} and the return code is 1.

        @rtype: C{tuple} C{(pythonChildHandler.PyCmdDispatcher, int)}
        """
        if not os.path.exists(self.pyExecutable):
            msg = 'Startup application not found: %s' % self.pyExecutable
            print('ERROR: %s' % msg)
            self.logging.error(msg)
            return (None, 1)

        #============================================================
        # redirect stderr -> stdout if the job package dictates
        #   - this defines the "stderr" value for the subprocess.Popen constructor
        #============================================================
        mergeStderr = self.job['package'].get('redirectStderrToStdout', False)

        # create the Dispatcher instance, but don't yet start the child subprocess
        cmdDispatcher = qb.backend.pythonChildHandler.PyCmdDispatcher(logHandler=self.logHandler,
                                                                      mergeStderr=mergeStderr)
        port = cmdDispatcher.backChannel.port

        self.childBootstrapper = self.generateChildBootstrapper(port)

        # inform the subprocess args of the port the PyCmdDispatcher is listening on
        childArgs, initCmds = self.getSubprocessArgs(port)

        # now actually start up the application that's going to do the work.
        cmdDispatcher.startChild(childArgs)

        # pump the cmdDispatcher for the app startup messages, pass them through the logHandler
        cmdDispatcher.execute('print', self.job)

        # Load up the minimal set of standard modules; any non-zero return code marks the
        # whole initialization as failed.
        initReturnCode = 0
        for stdImport in ('import sys', 'import os', 'import time'):
            initReturnCode += cmdDispatcher.execute(stdImport)

        for cmd in initCmds:
            initReturnCode += cmdDispatcher.execute(cmd, self.job)

        return (cmdDispatcher, initReturnCode)

    def jobSetup(self):
        """
        Perform any steps necessary to initialize the working environment prior to beginning any
        agendaItem-specific steps.

        Starts the child python session, then runs the optional C{'jobSetupCmds'} from the job
        package.  If the session cannot be initialized, or any setup command returns non-zero,
        the job instance is reported as failed and the process exits with that return code.
        """
        super(PythonChildBackEnd, self).jobSetup()

        backendUtils.bannerPrint('Starting Python initialization')
        (self.cmdDispatcher, initReturnCode) = self.initPyCmdDispatcher()

        if initReturnCode > 0:
            backendUtils.bannerPrint('ERROR: Python initialization failed', fhList=[sys.stdout, sys.stderr])
            self.logging.error('Unable to start application session.')
            self.job['status'] = 'failed'
            # NOTE(review): every other call in this class passes a status string to
            # qb.reportjob(); confirm that passing the job dict is supported.
            qb.reportjob(self.job)

            backendUtils.bannerPrint('Shutting down python session', fhList=[sys.stdout, sys.stderr])
            # The dispatcher is None when the startup application was not found; calling
            # close() on it would raise AttributeError and prevent the sys.exit() below.
            if self.cmdDispatcher is not None:
                try:
                    self.cmdDispatcher.close()
                except TypeError:
                    # best-effort shutdown, kept from the original implementation
                    pass

            sys.exit(initReturnCode)

        backendUtils.bannerPrint('Finished Python initialization')

        #=============================================
        # optional jobSetup commands
        #=============================================
        setupCmds = self.job['package'].get('jobSetupCmds')
        if setupCmds:
            # execute the setup commands in the job package
            backendUtils.bannerPrint('Starting job setup commands')
            for cmd in setupCmds:
                cmd = qb.utils.translateQbConvertPathStrings(cmd)
                cmdRetCode = self.cmdDispatcher.execute(cmd, self.job)

                # fail fast: stop at the first setup command that does not succeed
                if cmdRetCode != 0:
                    sys.stdout.write('ERROR: unable to successfully execute command "%s"\n' % cmd)
                    sys.stdout.write('WARNING: reporting job instance as failed\n')
                    qb.reportjob('failed')
                    sys.exit(cmdRetCode)
            backendUtils.bannerPrint('Finished job setup commands')

    def _runWorkCommands(self, work):
        """
        Dispatch the commands of a single agendaItem to the child process, one at a time.

        Stops at the first command that returns non-zero, or as soon as the child process reports
        a non-zero returncode; in the latter case C{self.status} is set to that returncode.

        @param work: the agendaItem whose C{work['package']['commands']} are to be executed.

        @type work: C{dict}

        @return: 0 when every command succeeded, non-zero otherwise (including when there are no
        commands at all, or when an exception is raised while dispatching).

        @rtype: C{int}
        """
        # Starts out as "failed" so that a work package without any commands is not
        # reported as complete.
        workStatus = 1
        try:
            for cmd in work['package']['commands']:
                # Do the actual work.
                cmdRetCode = self.cmdDispatcher.execute(cmd, work)
                if cmdRetCode != 0:
                    return cmdRetCode

                childRetCode = self.cmdDispatcher.child.returncode
                if childRetCode:
                    # if the child process has died, mark the job instance and the work as failed...
                    sys.stderr.write('subprocess seems to have died...\n')
                    self.status = childRetCode
                    return childRetCode

                workStatus = 0
        except Exception:
            sys.stderr.write(tb.format_exc())
            # A command that raised did not complete; without this an exception following an
            # earlier successful command would leave the item marked as successful.
            workStatus = 1

        return workStatus

    def executeWork(self):
        """
        Request an agendaItem (work) from the supervisor and do any steps necessary to perform the
        work.

        Loops until the supervisor reports the work as complete, pending or blocked, or until the
        child process has exited (C{self.status != 0}).
        """
        while True:
            backendUtils.bannerPrint('Requesting work', fhList=[sys.stdout, sys.stderr])
            work = qb.requestwork()

            if self.dev:
                print("DEBUG BEGIN:")
                print("WORK:")
                pprint.pprint(work)
                print("DEBUG END:")

            # Deal with the minimal set of work statuses
            requestStatus = work['status']
            if requestStatus == 'failed':
                # preflights failed, so skip this agenda item and mark it failed
                print('preflights for work [%s:%s] failed' % (self.job['id'], work['name']))
                work['status'] = 'failed'
                qb.reportwork(work)
                continue
            elif requestStatus == 'complete':
                break
            elif requestStatus == 'pending':
                # preempted -- bail out
                print('job %s has been preempted' % self.job['id'])
                qb.reportjob('pending')
                break
            elif requestStatus == 'blocked':
                # blocked -- perhaps part of a dependency chain
                print('job %s has been blocked' % self.job['id'])
                qb.reportjob('blocked')
                break
            elif requestStatus == 'waiting':
                # waiting -- rare, come back after QB_WAITING_TIMEOUT seconds
                print('job %s will be back in %s seconds' % (self.job['id'], self.QB_WAITING_TIMEOUT))
                sys.stdout.flush()
                # NOTE(review): sleeps in 10ms slices rather than one long sleep; presumably
                # to stay responsive to signals -- confirm before simplifying.
                for _ in range(self.QB_WAITING_TIMEOUT * 100):
                    time.sleep(0.01)
                continue

            # Dispatch the commands to the python running in the child process
            # one at a time.  If any of the commands fail, set the agenda item
            # as failed.  If the subprocess dies, set the job instance as failed,
            # and requeue the work.
            backendUtils.bannerPrint('Executing work package commands for work: %s' % work['name'], fhList=[sys.stdout, sys.stderr])
            workStatus = self._runWorkCommands(work)
            backendUtils.bannerPrint('Finished work package commands for work: %s' % work['name'])

            if work.get('resultpackage') is None:
                work['resultpackage'] = {}

            # -----------------------------------------------------------
            # set the work status, then report it back to the supervisor
            # so that it can update the server-side agenda
            # -----------------------------------------------------------
            if workStatus != 0:
                # either the work or the job instance itself has failed
                work['status'] = 'failed'
            elif self.outputPaths_required and not work['resultpackage'].get('outputPaths'):
                work['status'] = 'failed'
                backendUtils.flushPrint('WARNING: no "regex_outputPaths" match was found, setting agenda item status to "failed".', fhList=[sys.stdout, sys.stderr])
            else:
                work['status'] = 'complete'

            backendUtils.bannerPrint('Reporting work as %(status)s: %(name)s ' % work, fhList=[sys.stderr])
            qb.reportwork(work)

            # don't do another requestwork() if the job instance status != 0
            #   - child process has crashed
            if self.status != 0:
                print('ERROR: The child python process (mayapy, houdini, etc) has exited prematurely.')
                print('INFO: Will attempt to restart this job instance on another host, if its job instance retry limit has not been exceeded.')
                break

    def jobTeardown(self):
        """
        Perform any steps necessary to clean up the working environment prior to shutting down the
        job instance.

        Runs the optional C{'jobTeardownCmds'} from the job package, closes the child python
        session, then reports the job instance as complete when C{self.status} is 0 and as failed
        otherwise.
        """
        teardownCmds = self.job['package'].get('jobTeardownCmds')
        if teardownCmds:
            # NOTE(review): unlike jobSetupCmds, the teardown commands are handed to execute()
            # in one call, without path translation and without the job as a second argument --
            # confirm that this is intended.
            self.cmdDispatcher.execute(teardownCmds)

        retCode = self.cmdDispatcher.close(self.status)

        print('Python session exited with code %s' % retCode)

        if self.status == 0:
            qb.reportjob('complete')
        else:
            qb.reportjob('failed')

        sys.stderr.flush()