Package pyCmdrange :: Module cmdrangeBackEnd
[hide private]
[frames | no frames]

Source Code for Module pyCmdrange.cmdrangeBackEnd

  1  ''' 
  2      A class that implements the standard Qube! cmdrange jobtype in python. 
  3      It should be simpler to maintain and extend than the older C++ variant. 
  4  ''' 
  5  #====================================== 
  6  #  $Revision: #9 $ 
  7  #  $Change: 16052 $ 
  8  #====================================== 
  9   
 10   
 11  import os.path 
 12  import sys 
 13  import re 
 14  import time  
 15  import select 
 16  import pprint 
 17  import logging 
 18   
 19  import qb.backend.commandBackEnd 
 20  import qb.backend.logParser 
 21  import qb.backend.utils as backendUtils 
 22   
 23  try: 
 24      import subprocess 
 25  except ImportError: 
 26      if os.name == 'posix': 
 27          import popen2 
 28   
 29  #============================================================ 
 30  #       only compile the regex objects once 
 31  #============================================================ 
 32  QB_TOKEN_RGX = [ 
 33      re.compile('QB_FRAME_NUMBER'), 
 34      re.compile('QB_FRAME_START'), 
 35      re.compile('QB_FRAME_END'), 
 36      re.compile('QB_FRAME_STEP'), 
 37      re.compile('QB_FRAME_RANGE'), 
 38  ] 
 39   
 40   
41 -def evaluateQBTokens(cmd, fRange, padding=1):
42 ''' 43 Replace all QB_* tokens with the appropriate values from the object's frame range or job stage 44 45 fRange is either the job's frame range value, or the agenda item's specific frame range 46 47 @return: Both the cmd with all QB_FRAME* tokens evaluted, and a dictionary of all tokens and their values. 48 49 @rtype: C{tuple} 50 ''' 51 padding = int(padding) 52 fStep = 1 53 54 if not re.search('[-,]', fRange): 55 # find the trivial single-frame case, with neither a '-' nor a ',' 56 fEnd = fStart = fRange 57 58 elif fRange.count('-') and not fRange.count(','): 59 if not fRange.startswith('-'): 60 # most common case, simple frame range beginning with a positive number 61 (fStart, fEnd) = fRange.split('-', 1) 62 elif fRange.count('-') == 1: 63 # a single negative number 64 fStart = fEnd = fRange 65 else: 66 # rare, a frame range whose start is a negative number 67 m = re.search('(-\d+)-(-?\d+.*)', fRange) 68 (fStart, fEnd) = m.groups() 69 70 if fEnd.count('x'): 71 (fEnd, fStep) = fEnd.split('x') 72 else: 73 # a disjointed frame range containing a comma 74 fList = qb.rangesplit(fRange) 75 fStart = fList[0] 76 fEnd = fList[-1] 77 78 tokens = {} 79 tokens['QB_FRAME_START'] = tokens['QB_FRAME_NUMBER'] = str('%0*d' % (padding, int(fStart))) 80 tokens['QB_FRAME_END'] = str('%0*d' % (padding, int(fEnd))) 81 tokens['QB_FRAME_STEP'] = str(fStep) 82 tokens['QB_FRAME_RANGE'] = fRange 83 84 for rgx in QB_TOKEN_RGX: 85 cmd = rgx.sub(tokens[rgx.pattern], cmd) 86 87 return (cmd, tokens)
88 89
class CmdRangeBackEnd(qb.backend.commandBackEnd.CommandBackEnd):
    '''
    The python-based cmdrange backend; it builds on CommandBackEnd so that log parsing
    can be extended.
    '''

    def __init__(self, job):
        super(CmdRangeBackEnd, self).__init__(job)

        # original note: submit the job with job['package']['dev'] = True to set the
        # logging level to DEBUG (the level itself is not set in this class)
        loggerName = '%s' % self.__class__.__name__
        self.logging = logging.getLogger(loggerName)

    def executeWork(self):
        '''
        Request agenda items from the supervisor one at a time and process them until
        the supervisor reports that the job is complete, pending (preempted) or blocked.

        For every runnable item the job's cmdline has its QB_FRAME_* tokens evaluated
        against the item's name, the command is run through self.runCmd(), and the
        resulting status ('complete' or 'failed') is reported back with qb.reportwork().

        self.dev, self.job, self.runCmd, self.QB_WAITING_TIMEOUT and
        self.outputPaths_required are not defined in this class.
        '''
        if self.dev:
            printer = pprint.PrettyPrinter(indent=2, width=1)

        while True:
            work_status = 1

            backendUtils.bannerPrint('Requesting work', fhList=[sys.stdout, sys.stderr])
            work = qb.requestwork()

            if self.dev:
                print("WORK:")
                printer.pprint(work)

            # ------------------------------------------------------------
            # Deal with the minimal set of work statuses; every one of these
            # either asks for the next item or leaves the loop
            # ------------------------------------------------------------
            status = work['status']

            if status == 'failed':
                # preflights failed, so skip this agenda item and mark it failed
                print('preflights for work [%s:%s] failed' % (self.job['id'], work['name']))
                work['status'] = 'failed'
                qb.reportwork(work)
                continue

            if status == 'complete':
                work_status = 0
                break

            if status == 'pending':
                # preempted -- bail out
                print('job %s has been preempted' % self.job['id'])
                work_status = 0
                qb.reportjob('pending')
                break

            if status == 'blocked':
                # blocked -- perhaps part of a dependency chain
                print('job %s has been blocked' % self.job['id'])
                work_status = 0
                qb.reportjob('blocked')
                break

            if status == 'waiting':
                # waiting -- rare, come back in QB_WAITING_TIMEOUT seconds
                print('job %s will be back in %s seconds' % (self.job['id'], self.QB_WAITING_TIMEOUT))
                time.sleep(self.QB_WAITING_TIMEOUT)
                continue

            if not work['package']:
                work['package'] = {}

            #============================================================
            #       Do the actual work
            #============================================================
            backendUtils.bannerPrint('Processing work: %s' % work['name'])

            (cmd, qbTokens) = evaluateQBTokens(
                self.job['package']['cmdline'],
                work['name'],
                self.job['package'].get('padding', 1))

            exitCode = self.runCmd(work, cmd, qbTokens)

            # a 'failed' status on the work item overrides whatever the command returned
            work_status = exitCode
            if work['status'] == 'failed':
                backendUtils.flushPrint('ERROR: Processing work failed\n', fhList=[sys.stdout, sys.stderr])
                work_status = 1

            if work.get('resultpackage') is None:
                work['resultpackage'] = {}

            # -----------------------------------------------------------
            # set the work status, then report it back to the supervisor
            # so that it can update the server-side agenda
            # -----------------------------------------------------------
            if work_status != 0:
                # either the work or the job instance itself has failed
                work['status'] = 'failed'
            else:
                # the output paths are only inspected when they are required
                missingOutputPaths = False
                if self.outputPaths_required:
                    foundPaths = work.get('resultpackage', {}).get('outputPaths', '')
                    missingOutputPaths = len(foundPaths) == 0

                if missingOutputPaths:
                    work['status'] = 'failed'
                    backendUtils.flushPrint('WARNING: no "regex_outputPaths" match was found, setting agenda item status to "failed".', fhList=[sys.stdout, sys.stderr])
                else:
                    # mark the work as complete
                    work['status'] = 'complete'

            # report the work status back to the supervisor
            backendUtils.bannerPrint('Reporting work as %(status)s: %(name)s ' % work, fhList=[sys.stdout, sys.stderr])

            qb.reportwork(work)
if __name__ == '__main__':
    # A "hello world", meant to show how to build and submit a pyCmdrange job.
    #
    # Four sample jobs are built.  With --arc on the command line the first one is
    # archived to ./job.qja instead of anything being submitted; with --dev the jobs
    # are created with dev=True.
    import sys
    import os

    import qb
    import cmdrangeFrontEnd

    # It's only necessary to setsupervisor() if you have multiple supervisors in your environment
    #qb.setsupervisor('build01')

    fRange = '1-50'
    chunkSize = 25
    dev = '--dev' in sys.argv

    # --- job A: chunked, prints the frame numbers of each chunk
    cmd = '%s/test/printFrameNumber.py QB_FRAME_START QB_FRAME_END 1' % os.getcwd()
    jobA = cmdrangeFrontEnd.CmdRangeJob(cmd, fRange, chunkSize=chunkSize, padding=2, dev=dev)
    jobA['cpus'] = 2
    jobA['package']['regex_progress'] = 'Frame: (.*) of'
    jobA.setLogParser('CmdRangeChunkLogParser')
    jobA['name'] = 'print frame numbers'

    # --- job B: one agenda item per frame, progress reported as a percentage
    cmd = '%s/test/printProgressPercent.py 0.25' % os.getcwd()
    jobB = cmdrangeFrontEnd.CmdRangeJob(cmd, '1-8x2', padding=4, dev=dev)
    jobB['cpus'] = 2
    jobB['package']['regex_progress'] = 'Progress:(.*)%'
    jobB.setLogParser('ProgressPercentageLogParser')
    jobB['name'] = 'print progress percentage message'

    # --- job C: chunked, AfterEffects-style progress messages
    cmd = '%s/test/mimicAErender.py QB_FRAME_START QB_FRAME_END QB_FRAME_STEP 0.25' % os.getcwd()
    jobC = cmdrangeFrontEnd.CmdRangeJob(cmd, fRange, chunkSize=chunkSize, dev=dev)
    jobC['cpus'] = 2
    jobC['package']['regex_progress'] = r'^PROGRESS:.*\((\d+)\): \d+ Seconds'
    jobC.setLogParser('CmdRangeChunkLogParser')
    jobC['name'] = 'AfterEffects sequence render messages'

    # --- job D: the whole range as a single partition
    cmd = '%s/test/mimicAErender.py QB_FRAME_START QB_FRAME_END QB_FRAME_STEP 0.5' % os.getcwd()
    jobD = cmdrangeFrontEnd.CmdRangeJob(cmd, fRange, partitionCount=1, dev=dev)
    jobD['cpus'] = 2
    jobD['package']['regex_progress'] = r'^PROGRESS:.*\((\d+)\): \d+ Seconds'
    jobD.setLogParser('CmdRangeChunkLogParser')
    jobD['name'] = 'AfterEffects movie render messages'

    jobList = [jobA, jobB, jobC, jobD]

    if '--arc' in sys.argv:
        arcFile = './job.qja'
        arcSize = qb.archivejob(arcFile, jobA, qb.QB_API_BINARY)
        print('exported job. wrote: %s bytes to file: %s' % (arcSize, arcFile))
    else:
        for submitted in qb.submit(jobList):
            print('submitted: %(id)s' % submitted)

    sys.exit()