Package qb :: Package backend :: Module pythonBackEnd
[hide private]
[frames] | [no frames]

Source Code for Module qb.backend.pythonBackEnd

  1  ## ----------------------------------------------------------------------------- 
  2  ##    
  3  ##      Module defining the base class for Qube python jobTypes that communicate with a separate 
  4  ##      running python interpreter, usually running inside a 3rd-party application. 
  5  ## 
  6  ##      It also contains some convenience functions that are used by various backends. 
  7  ## 
  8  ##      Copyright: Pipelinefx L.L.C.  
  9  ## 
 10  ## ----------------------------------------------------------------------------- 
 11   
 12  #====================================== 
 13  #  $Revision: #20 $ 
 14  #  $Change: 16390 $ 
 15  #====================================== 
 16   
 17   
 18  import sys 
 19  import os 
 20  import time 
 21  import pprint 
 22  import logging 
 23  import stat 
 24   
 25  import qb.backend.utils as backendUtils 
 26  import qb.backend.logParser 
 27   
 28   
 29  #============================================================ 
 30  #       PythonQubeBackEnd class definition 
 31  #============================================================ 
 32   
class PythonQubeBackEnd(object):
    """
    The base class for PipelineFX's python back-end modules.

    Communicates with a separate running python interpreter, usually running
    inside a 3rd-party application.

    @cvar QB_WAITING_TIMEOUT: timeout in seconds; not referenced in this module,
        presumably consumed by subclasses when waiting on the supervisor — TODO confirm
    @type QB_WAITING_TIMEOUT: C{int}

    @cvar LOGREAD_TIME_THRESHOLD: how often to parse the job log data for regex matches
    @type LOGREAD_TIME_THRESHOLD: C{int}
    """

    QB_WAITING_TIMEOUT = 5
    LOGREAD_TIME_THRESHOLD = 30
    def __init__(self, job):
        """
        Function to initialize class members.

        @param job: The qube job object passed to the worker, this represents a job instance.

        @type job: qb.Job
        """
        # submit the job with job['package']['dev'] = True to set the logging level to DEBUG
        self.logging = logging.getLogger('%s' % self.__class__.__name__)

        self.job = job
        self.status = 0

        # dev mode flag, derived from the job package by the backend utilities
        self.dev = backendUtils.getDevBoolean(self.job)

        # prefer the newer helper when the installed backendUtils provides it
        if hasattr(backendUtils, 'getJobBoolean'):
            self.outputPaths_required = backendUtils.getJobBoolean(self.job.get('package', {}).get('outputPaths_required', False))
        else:
            # ------------------------------------------------------------
            # backwards compatibility - remove in the main branch for 6.9
            # ------------------------------------------------------------
            try:
                self.outputPaths_required = bool(self.job.get('package', {}).get('outputPaths_required', False))
            except ValueError:
                self.outputPaths_required = False

        # set the time to wait in seconds if you've updated the work result package for a running job
        self.resultPkgUpdateThrottleTimeout = 5

        # whether the job merges its stderr stream into stdout
        self.redirectingStdErr = backendUtils.getJobPackageBoolean(self.job['package'].get('redirectStderrToStdout', False))

        # track the file offsets for each job log stream
        # NOTE(review): assumes getJobLogPaths returns (stdoutPath, stderrPath) in that
        # order to line up with the zip() below — confirm against backendUtils
        self.jobLogs = dict(
            zip(['stdout', 'stderr'],
                backendUtils.getJobLogPaths(self.redirectingStdErr))
        )
        # catch development case, no job logs present, reset the jobLogs dict to empty, this will
        # short-circuit the logHandler method
        if '' in self.jobLogs.values():
            self.jobLogs = {}

        # per-stream read offsets used by getLogData/logHandler incremental reads
        self.fhOffsets = {'stderr': 0, 'stdout': 0}

        # init the logParser, fall back to a default if one is not specified in the job.
        if self.job.get('package', {}).get('logParser', {}).get('className'):
            logParserClass = backendUtils.getClassFromJobData(self.job['package']['logParser'])
        else:
            logParserClass = qb.backend.logParser.LogParser
        self.logParser = logParserClass(self.job)

        # worker-side path translation map, when the qb API supports it
        if hasattr(qb, 'workerpathmap'):
            self.workerPathMap = qb.workerpathmap()
        else:
            self.workerPathMap = {}
100 - def printClassInfo(self):
101 backendUtils.flushPrint('%16s: %s' % ('Backend class', self.__class__.__name__)) 102 backendUtils.flushPrint('%16s: %s' % ('Backend module', os.path.abspath(sys.modules[self.__class__.__module__].__file__))) 103 backendUtils.flushPrint('%16s: %s' % ('LogParser class', self.logParser.__class__.__name__)) 104 backendUtils.flushPrint('%16s: %s' % ('LogParser module', os.path.abspath(sys.modules[self.logParser.__class__.__module__].__file__)))
105
106 - def jobSetup(self):
107 """ 108 Perform any steps necessary to initialize the working enviroment prior to 109 beginning any agendaItem-specific steps. 110 """ 111 self.job['status'] = 'running' 112 113 if self.dev: 114 pp = pprint.PrettyPrinter(indent=2, width=1) 115 print "DEBUG BEGIN:" 116 print "JOB:" 117 pp.pprint(self.job) 118 print "DEBUG END:"
119
120 - def executeWork(self):
121 """ 122 Request an agendaItem (work) from the supervisor and do any steps necessary to 123 perform the work. 124 125 @raise NotImplementedError: Raised when this method is not overridden in a derived class. 126 """ 127 raise NotImplementedError
128
129 - def jobTeardown(self):
130 """ 131 Perform any steps necessary to clean up the working enviroment prior to 132 shutting down the job instance. 133 """ 134 pass
135
136 - def hasJobPhaseCmds(self, phase):
137 """ 138 Test for the existence of job setup/teardown cmds in the job package 139 """ 140 if 'package' in self.job and phase in self.job['package']: 141 hasCmds = True 142 else: 143 hasCmds = False 144 return hasCmds
145
146 - def getLogData(self, logpath, iostream, offset=0):
147 """ 148 seek fwd in a file, read to EOF, then record the file position, which will be the offset 149 the next time we enter this function 150 151 @param logpath: full path to a job log file to scan 152 @type logpath: C{str} 153 154 @param iostream: name of the iostream, usually "stderr" or "stdout" 155 @type iostream: C{str} 156 157 @param offset: position in file to begin to read data from 158 @type offset: C{int} 159 160 @ivar data: the section the the job log, usually multiple lines, as a single string 161 @type data: C{str} 162 163 @ivar fPos: the position in the file where reading stopped, length in bytes from BOF to EOF 164 @type fPos: C{int} 165 166 @return: a tuple containing the log data and the end position of the file 167 @rtype: C{tuple} 168 """ 169 data = '' 170 171 fh = open(logpath) 172 self.logging.debug('%s beginning offset: %s' % (iostream, offset)) 173 fh.seek(offset) 174 175 data = fh.read() 176 fPos = fh.tell() 177 178 self.logging.debug('%s offset: %s' % (iostream, offset)) 179 self.logging.debug('%s ending fPos: %s' % (iostream, fPos)) 180 self.logging.debug('%s len data (expected): %s' % (iostream, fPos-offset)) 181 182 fh.close() 183 return (data, fPos)
184
    def logHandler(self, work, qbTokens=None):
        """
        Parse a job log for regex matches (progress, errors, outputPaths, etc...), update the
        job or work packages with the logmatches, set the job status accordingly, and report
        back to the supervisor.

        @param work: the work (or job) object whose resultpackage gets the log matches;
            passing the job object itself triggers a job-level report instead of a work report

        @param qbTokens: a dictionary containing the various QB_FRAME* cmdrange tokens evaluated in the
        context of the current running work item to aid in calculating the in-chunk progress.

        @type qbTokens: C{dict}

        @return: Return booleans for each error condition checked; error regex found a match, and
        whether a size check failed

        @rtype: C{tuple}
        """
        errorMatched = False
        sizeCheckFailed = False

        logMatches = {}

        # jobLogs is emptied in __init__ when no log paths exist (dev case), which
        # short-circuits this whole handler
        if self.jobLogs:
            for (iostream, logpath) in self.jobLogs.items():

                self.logging.debug('%s %s %s' % ('-'*20, iostream, '-'*20))
                # incremental read: getLogData returns new data since the saved offset
                # and the new offset, which is stored back for the next call
                (logData, self.fhOffsets[iostream]) = self.getLogData(logpath, iostream, self.fhOffsets[iostream])

                if len(logData):
                    self.logging.debug('logData: %s' % len(logData))

                    # Cull lines where the backend code itself is reporting an error regex match,
                    # causes subsequent frames to be marked as failed, since the matched error
                    # string is reported back as part of the error message...
                    if iostream == 'stderr':
                        culledLogData = []
                        for l in logData.splitlines():
                            if l.startswith('ERROR') and l.count('regex_errors found match'):
                                continue
                            else:
                                culledLogData.append(l)
                        logData = '\n'.join(culledLogData)
                        del culledLogData

                    logMatches.update(
                        self.parseLogData(logData, qbTokens)
                    )

                self.logging.debug('%s\n' % ('-'*48))

        if logMatches:
            # push matches into the result package, then verify any matched output files
            self.updateResultPackage(work, logMatches)
            sizeCheckFailed = not(self.validateOutputFileSize(work))

            if 'errors' in logMatches:
                errorMatched = True
                # NOTE: this exact message is what the stderr-culling loop above filters out
                self.logging.error('regex_errors found match: "%s"' % ', '.join(logMatches['errors']))

            if not (sizeCheckFailed or errorMatched):
                # ------------------------------
                # TODO: get rid of these sleep()s as soon
                # as qb.updateresultpackage is available
                # ------------------------------
                # identity check: when the caller passed the job itself, report at
                # job level; otherwise report the individual work item
                if id(work) == id(self.job):
                    qb.reportjob(self.job)
                    time.sleep(5)
                else:
                    qb.reportwork(work)
                    time.sleep(5)

        return errorMatched, sizeCheckFailed
256 - def updateResultPackage(self, work, resultDict):
257 """ 258 Update the Qube qb.Work object's resultpackage dictionary. 259 260 The resultpackage is a dictionary that the Qube supervisor retrieves from 261 the worker after processing work. It can be used to update the job object 262 with data that is determined at execution time. 263 264 Any key/value pairs can be passed into the resultpackage as a dictionary; it's up to the 265 developer to pass key/values that are meaningful to Qube. 266 267 The most common use is to set the 'outputpaths' value for an agenda item. 268 This value is used by the Qube GUI to display the image in the "Output" 269 tab of the UI. 270 271 @param work: The Qube qb.Work object whose resultpackage dictionary is to be updated 272 273 @param resultDict: a dictionary containing key/value pairs to be inserted into the work's 274 resultpackage dictionary. 275 276 @type work: qb.Work 277 """ 278 if 'resultpackage' not in work or not work['resultpackage']: 279 work['resultpackage'] = {} 280 281 rpkg = work['resultpackage'] 282 283 for k in resultDict: 284 if k in rpkg: 285 if k == 'outputPaths': 286 outputPaths = rpkg[k] 287 # avoid inserting duplicates 288 for imgPath in resultDict[k].split(','): 289 if imgPath not in outputPaths: 290 outputPaths += ',%s' % imgPath 291 rpkg[k] = outputPaths 292 elif k == 'progress': 293 rpkg[k] = resultDict[k] 294 elif isinstance(rpkg[k], list): 295 rpkg[k].extend(resultDict[k]) 296 elif isinstance(rpkg[k], dict): 297 rpkg[k].update(resultDict[k]) 298 else: 299 rpkg[k] = resultDict[k]
300
301 - def parseLogData(self, data, *args):
302 """ 303 Find any matches to the job's regular expressions. 304 305 @param data: a portion of a job instance's job log 306 @type data: string 307 """ 308 if self.logParser: 309 matches = self.logParser.parse(data, *args) 310 else: 311 backendUtils.flushPrint('WARNING: parseLogData method called, but %s instance has no logParser defined\n' % self.__class__.__name__, fhList=sys.stderr) 312 313 self.logging.debug('Matches: %s' % len(matches)) 314 return matches
315
    def validateOutputFileSize(self, work):
        """
        Check the work/jobInstance's resultpackage for any files in the outputPaths, and ensure that
        their file size exceeds the job's minimum file size.

        @param work: The work/job object whose resultpackage dictionary is to be scanned
        @type work: qb.Work or qb.Subjob

        @return: return False if the contents of the outputPaths contain a file path whose size does not
        exceed the job's validate_fileMinSize, otherwise return True for all other cases.

        @rtype: C{bool}
        """
        fileSizeOK = True

        if 'validate_fileMinSize' in self.job['package']:

            minFileSize = int(self.job['package']['validate_fileMinSize'])
            if minFileSize > 0:
                # NOTE(review): this is the number of comma separators, i.e. pathCount - 1,
                # so the sleep below only fires when there are exactly two paths — confirm
                # whether a path count (or == 0 for a single frame) was intended
                chunk_size = work.get('resultpackage', {}).get('outputPaths', '').count(',')

                # only check the file size for up to the first five frames in a chunk
                # NOTE(review): the slice [:4] actually checks only the first FOUR paths,
                # contradicting the comment above — confirm which is intended
                for f in work.get('resultpackage', {}).get('outputPaths', '').split(',')[:4]:
                    if f:
                        if os.path.isfile(f):
                            if chunk_size == 1:
                                # sleep to allow the filesystem to finish writing the file to disk,
                                # occasionally it sees it as 0 bytes
                                time.sleep(0.5)
                                sys.stdout.flush()
                            fSize = os.stat(f)[stat.ST_SIZE]

                            if fSize >= minFileSize:
                                #sys.stdout.flush()
                                backendUtils.flushPrint('INFO: file size check passed: %s > %s' % (fSize, minFileSize))
                            else:
                                # undersized file: flag the failure and report on both streams
                                fileSizeOK = False
                                backendUtils.flushPrint('ERROR: An output file has failed a size verification check, and is smaller than %s bytes.'
                                                        % minFileSize, fhList=[sys.stdout, sys.stderr])
                                backendUtils.flushPrint('ERROR: %s is %s bytes' % (f, fSize), fhList=[sys.stdout, sys.stderr])
                        else:
                            # the outputPaths regex matched something that isn't a file on disk
                            backendUtils.flushPrint('Warning: A value that matched the job\'s "outputPaths" regular expression is not a file.')
                            backendUtils.flushPrint('Warning: The regular expression may need some refinement.')
                            backendUtils.flushPrint('Warning: regex = "%s"' % self.job['package'].get('regex_outputPaths', '< regex_outputPaths is undefined>'))
                            backendUtils.flushPrint('Warning: matched = "%s"' % f)

        return fileSizeOK