Package qb :: Package backend :: Module commandBackEnd
[hide private]
[frames] | no frames]

Source Code for Module qb.backend.commandBackEnd

  1  ''' 
  2      A base class for command-line and command range jobtypes written in Python 
  3  ''' 
  4  #====================================== 
  5  #  $Revision: #28 $ 
  6  #  $Change: 16521 $ 
  7  #====================================== 
  8   
  9  # vim: set foldcolumn=4 foldnestmax=3 : 
 10   
 11  import sys 
 12  import os 
 13  import re 
 14  import time  
 15  import pprint 
 16  import logging 
 17  import socket 
 18  import platform 
 19   
 20  import qb.backend.pythonBackEnd 
 21  import qb.backend.logParser 
 22  import qb.backend.utils as backendUtils 
 23   
 24  from qb.backend import QubeBackEndError 
 25  import qb.utils 
 26   
 27   
 28  try: 
 29      import subprocess 
 30  except ImportError: 
 31      if os.name == 'posix': 
 32          import popen2 
 33   
 34   
35 -class CommandBackEnd(qb.backend.pythonBackEnd.PythonQubeBackEnd):
36 """ 37 A base class for command-line and command range jobtypes written in Python. 38 39 The basic runtime log parsing functionality is defined here. 40 """ 41
42 - def runCmd(self, work, cmd, qbTokens=None):
43 """ 44 Determine which module we can use to run the command: 45 46 * subprocess if we're on python >= 2.4 47 * popen2 if we're on an older python on linux 48 49 @param work: either a Qube job instance or an agenda item 50 @type work: C{dict} 51 52 @param cmd: the cmd for the child process 53 @type cmd: C{str} 54 55 @param qbTokens: a dictionary containing the various QB_FRAME* cmdrange tokens evaluated in the 56 context of the current running work item to aid in calculating the in-chunk progress. 57 58 @type qbTokens: C{dict} 59 60 @return: the return code of the child process' command 61 @rtype: C{int} 62 """ 63 64 #============================================================= 65 # do run-time path translation by applying any worker-side 66 # path mappings to the entire cmd string 67 #============================================================= 68 preTranslationCmd = cmd 69 70 #=============================================== 71 # Do worker-side path translation if available 72 #=============================================== 73 if qb.utils.flags.isFlagSet('job_flags', 'convert_path', self.job['flags']): 74 # ============================================================ 75 # if convert_path job flag is set, apply the path conversion 76 # to the entire cmdline 77 # ============================================================ 78 cmd = qb.convertpath(cmd) 79 else: 80 # ============================================================ 81 # convert any paths wrapped in QB_CONVERT_PATH() 82 # ============================================================ 83 cmd = qb.utils.translateQbConvertPathStrings(cmd) 84 85 #============================================================= 86 # do the 'auto-pathing' for the job's application executable 87 #============================================================= 88 if 'appVersion' in self.job['package']: 89 self.logging.info('Scanning for 3rd-party application...') 90 try: 91 92 cmd = backendUtils.translateAppPath(cmd, self.job['package']['appVersion']) 93 94 except QubeBackEndError, e: 95 #===================================================== 96 # BAIL out here with an error condition, 97 # since the application can't be found 98 #===================================================== 99 100 # print an appropriate error message 101 hostname = socket.gethostname() 102 errMsg = backendUtils.formatExc(limit=1) 103 backendUtils.flushPrint('ERROR: %s - %s' % (hostname, errMsg), fhList=[sys.stdout, sys.stderr]) 104 105 #===================================================== 106 # update the job's resultpackage, then reportjob(), so that the errors show up in 107 # the highlights panel 108 #===================================================== 109 errors = self.job.get('resultpackage', {}).get('errors', []) 110 errors.append(e.value) 111 self.job.setdefault('resultpackage', {})['errors'] = errors 112 113 self.job['status'] = 'failed' 114 qb.reportjob(self.job) 115 116 # sleep, wait for supervisor to update the job resultpackage before this backend 117 # exits, so that the resultpackage is not tossed before it gets written to the db 118 time.sleep(5) 119 120 return 1 121 122 else: 123 self.logging.debug('not attempting 3rd-party application auto-pathing') 124 125 if cmd != preTranslationCmd: 126 self.logging.info('Paths in the command have been translated as per this worker\'s worker_path_map') 127 self.logging.info(r'%s %s' % (' '*4, preTranslationCmd)) 128 self.logging.info(r'%s -> %s' % (' '*1, cmd)) 129 130 #====================================================================================== 131 # CommandHandler instances provide the opportunity to setup the command for a specific 132 # application and OS, e.g, maya on OS X requires sourcing MayaEnv.sh 133 # 134 # For now, only appFinder jobs have a commandHandler in their job package, and it's added to 135 # the job by the appFinder's execute.py backend. But we're running the 136 # handler.prepareCommand() here so that in the future, other jobs have commandHandlers as 137 # well. 138 #====================================================================================== 139 preTranslationCmd = cmd 140 141 if 'commandHandler' in self.job['package']: 142 try: 143 cmd = self.job['package']['commandHandler'].prepareCommand(self.job, cmd) 144 if cmd != preTranslationCmd: 145 self.logging.info('The command has been modified via a application-specific commandHandler') 146 self.logging.info(r'%s %s' % (' '*4, preTranslationCmd)) 147 self.logging.info(r'%s -> %s' % (' '*1, cmd)) 148 except: 149 logging.error(backendUtils.formatExc()) 150 151 elif os.name == 'nt' and backendUtils.pyVerAsFloat() < 2.7: 152 # ============================================================ 153 # The default commandHandler for Windows already does this, 154 # but if there isn't one, then quote the cmd here 155 #------------------------------------------------------------- 156 # Why pre-python 2.7 ? 157 # Because subprocess.list2cmdline() behaves differently < 2.7 158 # ============================================================ 159 cmd = '"%s"' % cmd 160 161 #============================================================== 162 # build the argument list to start the child process 163 #============================================================== 164 try: 165 if os.name == 'posix': 166 if 'shell' in self.job['package']: 167 shell = self.job['package']['shell'] 168 else: 169 shell = '/bin/sh' 170 171 childArgs = [shell, '-c', r'%s' % cmd] 172 backendUtils.flushPrint('COMMAND: %s' % ' '.join(childArgs)) 173 174 else: 175 # Windows takes a single string, cast it as a raw string to avoid having to escape 176 # any potential esc-chars 177 childArgs = r'%s' % cmd 178 backendUtils.flushPrint('COMMAND: %s' % childArgs) 179 180 #================================== 181 # Actually run the command 182 #================================== 183 retCode = self.runCmdWithSubprocess(work, childArgs, qbTokens) 184 185 except: 186 retCode = 1 187 backendUtils.flushPrint(backendUtils.formatExc(), fhList=[sys.stderr]) 188 189 return retCode
190
191 - def runCmdWithSubprocess(self, work, childArgs, qbTokens=None):
192 """ 193 Run via subprocess(), monitor child process and parse worker's job logs while the child is alive 194 195 Log parsing is done while the child is alive, and once more after the job instance returns. 196 197 @param work: either a Qube job instance or an agenda item 198 @type work: C{dict} 199 200 @param childArgs: the arguments for the child process, the first element is the shell 201 @type childArgs: C{list} 202 203 @param qbTokens: a dictionary containing the various QB_FRAME* cmdrange tokens evaluated in the 204 context of the current running work item to aid in calculating the in-chunk progress. 205 206 @type qbTokens: C{dict} 207 208 @return: the return code of the child process' command 209 @rtype: C{int} 210 """ 211 212 #=========================================================================== 213 # scan the job for anything that will customize the execution environment 214 #=========================================================================== 215 childCwd = None 216 if self.job['cwd']: 217 if os.path.isdir(self.job['cwd']): 218 self.logging.debug('Running job from directory: %s' % self.job['cwd']) 219 childCwd = self.job['cwd'] 220 else: 221 self.logging.debug('The working directory specified in the job is invalid: %s' % self.job['cwd']) 222 self.logging.debug('Will use the following working directory: %s' % os.getcwd()) 223 224 if self.job['env']: 225 os.environ.update(self.job['env']) 226 227 # run-time OS environment variables 228 if 'env_runTimeOS' in self.job['package']: 229 runTimeOSEnv = self.job['package']['env_runTimeOS'].get(platform.system(), {}) 230 os.environ.update(runTimeOSEnv) 231 232 childStderr = None 233 if self.redirectingStdErr: 234 childStderr = subprocess.STDOUT 235 236 if os.name == 'nt': 237 shell = True 238 else: 239 shell = False 240 #=========================== 241 # actually fire up the job 242 #=========================== 243 child = subprocess.Popen(childArgs, stderr=childStderr, env=os.environ, cwd=childCwd, shell=shell) 244 245 #=========================== 246 # setup to parse the logs 247 #=========================== 248 errorRegexMatched = False 249 fileSizeCheckFailed = False 250 251 stdoutLog, stderrLog = backendUtils.getJobLogPaths(self.redirectingStdErr) 252 253 if len(stdoutLog) and \ 254 (self.redirectingStdErr or ( 255 not(self.redirectingStdErr) and len(stderrLog))): 256 self.jobLogs = {'stdout': stdoutLog} 257 258 if not self.redirectingStdErr: 259 self.jobLogs['stderr'] = stderrLog 260 261 #=========================== 262 # now parse the logs 263 #=========================== 264 while child.poll() is None and not (errorRegexMatched or fileSizeCheckFailed): 265 266 backendUtils.flushPrint('INFO: Scanning logs for errors, outputPaths, etc...', fhList=sys.stderr) 267 errorRegexMatched, fileSizeCheckFailed = self.logHandler(work, qbTokens) 268 if errorRegexMatched or fileSizeCheckFailed: 269 work['status'] = 'failed' 270 break 271 272 for i in range(1, self.LOGREAD_TIME_THRESHOLD+1): 273 # don't get caught waiting for up to 5s if the child has just exited) 274 if child.poll() is None: 275 time.sleep(1) 276 277 #================================================================ 278 # now that the child has exited, check the logs one more time 279 #================================================================ 280 errorRegexMatched, fileSizeCheckFailed = self.logHandler(work, qbTokens) 281 if errorRegexMatched or fileSizeCheckFailed: 282 work['status'] = 'failed' 283 else: 284 self.logging.warning('Unable to find job logs, no log parsing will occur during job instance execution') 285 child.wait() 286 287 sys.stdout.flush() 288 289 #=========================================================================================== 290 # the job instance may have had its status set as a result of an error found during the log 291 # parsing, so even if the cmd returns 0, there may still be an error condition 292 #=========================================================================================== 293 if self.job['status'] == 'failed' or work['status'] == 'failed': 294 retCode = 1 295 else: 296 retCode = child.returncode 297 298 return retCode
299
300 - def executeWork(self):
301 """ 302 This method must be defined for all derived classes. 303 """ 304 raise NotImplementedError
305