1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import sys
19 import os
20 import time
21 import pprint
22 import logging
23 import stat
24
25 import qb.backend.utils as backendUtils
26 import qb.backend.logParser
27
28
29
30
31
32
34 """
35 The base class for PipelineFX's python back-end modules.
36
37 @cvar LOGREAD_TIME_THRESHOLD: how often to parse the job log data for regex matches
38 @type LOGREAD_TIME_THRESHOLD: C{int}
39 """
40
41 QB_WAITING_TIMEOUT = 5
42 LOGREAD_TIME_THRESHOLD = 30
43
45 """
46 Function to initialize class members.
47
48 @param job: The qube job object passed to the worker, this represents a job instance.
49
50 @type job: qb.Job
51 """
52
53 self.logging = logging.getLogger('%s' % self.__class__.__name__)
54
55 self.job = job
56 self.status = 0
57
58 self.dev = backendUtils.getDevBoolean(self.job)
59
60 if hasattr(backendUtils, 'getJobBoolean'):
61 self.outputPaths_required = backendUtils.getJobBoolean(self.job.get('package', {}).get('outputPaths_required', False))
62 else:
63
64
65
66 try:
67 self.outputPaths_required = bool(self.job.get('package', {}).get('outputPaths_required', False))
68 except ValueError:
69 self.outputPaths_required = False
70
71
72 self.resultPkgUpdateThrottleTimeout = 5
73
74 self.redirectingStdErr = backendUtils.getJobPackageBoolean(self.job['package'].get('redirectStderrToStdout', False))
75
76
77 self.jobLogs = dict(
78 zip(['stdout', 'stderr'],
79 backendUtils.getJobLogPaths(self.redirectingStdErr))
80 )
81
82
83 if '' in self.jobLogs.values():
84 self.jobLogs = {}
85
86 self.fhOffsets = {'stderr': 0, 'stdout': 0}
87
88
89 if self.job.get('package', {}).get('logParser', {}).get('className'):
90 logParserClass = backendUtils.getClassFromJobData(self.job['package']['logParser'])
91 else:
92 logParserClass = qb.backend.logParser.LogParser
93 self.logParser = logParserClass(self.job)
94
95 if hasattr(qb, 'workerpathmap'):
96 self.workerPathMap = qb.workerpathmap()
97 else:
98 self.workerPathMap = {}
99
101 backendUtils.flushPrint('%16s: %s' % ('Backend class', self.__class__.__name__))
102 backendUtils.flushPrint('%16s: %s' % ('Backend module', os.path.abspath(sys.modules[self.__class__.__module__].__file__)))
103 backendUtils.flushPrint('%16s: %s' % ('LogParser class', self.logParser.__class__.__name__))
104 backendUtils.flushPrint('%16s: %s' % ('LogParser module', os.path.abspath(sys.modules[self.logParser.__class__.__module__].__file__)))
105
107 """
108 Perform any steps necessary to initialize the working environment prior to
109 beginning any agendaItem-specific steps.
110 """
111 self.job['status'] = 'running'
112
113 if self.dev:
114 pp = pprint.PrettyPrinter(indent=2, width=1)
115 print "DEBUG BEGIN:"
116 print "JOB:"
117 pp.pprint(self.job)
118 print "DEBUG END:"
119
121 """
122 Request an agendaItem (work) from the supervisor and do any steps necessary to
123 perform the work.
124
125 @raise NotImplementedError: Raised when this method is not overridden in a derived class.
126 """
127 raise NotImplementedError
128
130 """
131 Perform any steps necessary to clean up the working environment prior to
132 shutting down the job instance.
133 """
134 pass
135
137 """
138 Test for the existence of job setup/teardown cmds in the job package
139 """
140 if 'package' in self.job and phase in self.job['package']:
141 hasCmds = True
142 else:
143 hasCmds = False
144 return hasCmds
145
def getLogData(self, logpath, iostream, offset=0):
    """
    Seek forward in a file, read to EOF, then record the file position, which will
    be the offset the next time we enter this function.

    @param logpath: full path to a job log file to scan
    @type logpath: C{str}

    @param iostream: name of the iostream, usually "stderr" or "stdout"
    @type iostream: C{str}

    @param offset: position in file to begin to read data from
    @type offset: C{int}

    @return: a tuple containing the log data read and the end position of the
        file (length in bytes from BOF to EOF), i.e. C{(data, fPos)}
    @rtype: C{tuple}
    """
    self.logging.debug('%s beginning offset: %s' % (iostream, offset))

    # Use a context manager so the log file handle is always closed, even if
    # seek()/read() raises (the original open/close pair leaked on error).
    with open(logpath) as fh:
        fh.seek(offset)
        data = fh.read()
        # tell() after the read gives the absolute position where reading
        # stopped; callers store it as the offset for the next scan.
        fPos = fh.tell()

    self.logging.debug('%s offset: %s' % (iostream, offset))
    self.logging.debug('%s ending fPos: %s' % (iostream, fPos))
    self.logging.debug('%s len data (expected): %s' % (iostream, fPos - offset))

    return (data, fPos)
184
186 """
187 Parse a job log for regex matches (progress, errors, outputPaths, etc...), update the
188 job or work packages with the logmatches, set the job status accordingly, and report
189 back to the supervisor.
190
191 @return: Return booleans for each error condition checked; error regex found a match, and
192 whether a size check failed
193
194 @rtype: C{tuple}
195
196 @param qbTokens: a dictionary containing the various QB_FRAME* cmdrange tokens evaluated in the
197 context of the current running work item to aid in calculating the in-chunk progress.
198
199 @type qbTokens: C{dict}
200 """
201 errorMatched = False
202 sizeCheckFailed = False
203
204 logMatches = {}
205
206 if self.jobLogs:
207 for (iostream, logpath) in self.jobLogs.items():
208
209 self.logging.debug('%s %s %s' % ('-'*20, iostream, '-'*20))
210 (logData, self.fhOffsets[iostream]) = self.getLogData(logpath, iostream, self.fhOffsets[iostream])
211
212 if len(logData):
213 self.logging.debug('logData: %s' % len(logData))
214
215
216
217
218 if iostream == 'stderr':
219 culledLogData = []
220 for l in logData.splitlines():
221 if l.startswith('ERROR') and l.count('regex_errors found match'):
222 continue
223 else:
224 culledLogData.append(l)
225 logData = '\n'.join(culledLogData)
226 del culledLogData
227
228 logMatches.update(
229 self.parseLogData(logData, qbTokens)
230 )
231
232 self.logging.debug('%s\n' % ('-'*48))
233
234 if logMatches:
235 self.updateResultPackage(work, logMatches)
236 sizeCheckFailed = not(self.validateOutputFileSize(work))
237
238 if 'errors' in logMatches:
239 errorMatched = True
240 self.logging.error('regex_errors found match: "%s"' % ', '.join(logMatches['errors']))
241
242 if not (sizeCheckFailed or errorMatched):
243
244
245
246
247 if id(work) == id(self.job):
248 qb.reportjob(self.job)
249 time.sleep(5)
250 else:
251 qb.reportwork(work)
252 time.sleep(5)
253
254 return errorMatched, sizeCheckFailed
255
257 """
258 Update the Qube qb.Work object's resultpackage dictionary.
259
260 The resultpackage is a dictionary that the Qube supervisor retrieves from
261 the worker after processing work. It can be used to update the job object
262 with data that is determined at execution time.
263
264 Any key/value pairs can be passed into the resultpackage as a dictionary; it's up to the
265 developer to pass key/values that are meaningful to Qube.
266
267 The most common use is to set the 'outputpaths' value for an agenda item.
268 This value is used by the Qube GUI to display the image in the "Output"
269 tab of the UI.
270
271 @param work: The Qube qb.Work object whose resultpackage dictionary is to be updated
272
273 @param resultDict: a dictionary containing key/value pairs to be inserted into the work's
274 resultpackage dictionary.
275
276 @type work: qb.Work
277 """
278 if 'resultpackage' not in work or not work['resultpackage']:
279 work['resultpackage'] = {}
280
281 rpkg = work['resultpackage']
282
283 for k in resultDict:
284 if k in rpkg:
285 if k == 'outputPaths':
286 outputPaths = rpkg[k]
287
288 for imgPath in resultDict[k].split(','):
289 if imgPath not in outputPaths:
290 outputPaths += ',%s' % imgPath
291 rpkg[k] = outputPaths
292 elif k == 'progress':
293 rpkg[k] = resultDict[k]
294 elif isinstance(rpkg[k], list):
295 rpkg[k].extend(resultDict[k])
296 elif isinstance(rpkg[k], dict):
297 rpkg[k].update(resultDict[k])
298 else:
299 rpkg[k] = resultDict[k]
300
302 """
303 Find any matches to the job's regular expressions.
304
305 @param data: a portion of a job instance's job log
306 @type data: string
307 """
308 if self.logParser:
309 matches = self.logParser.parse(data, *args)
310 else:
311 backendUtils.flushPrint('WARNING: parseLogData method called, but %s instance has no logParser defined\n' % self.__class__.__name__, fhList=sys.stderr)
312
313 self.logging.debug('Matches: %s' % len(matches))
314 return matches
315
317 """
318 Check the work/jobInstance's resultpackage for any files in the outputPaths, and ensure that
319 their file size exceeds the job's minimum file size
320
321 @param work: The work/job object whose resultpackage dictionary is to be scanned
322 @type work: qb.Work or qb.Subjob
323
324 @return: return False if the contents of the outputPaths contain a file path whose size does not
325 exceed the job's validate_fileMinSize, otherwise return True for all other cases.
326
327 @rtype: C{bool}
328 """
329 fileSizeOK = True
330
331 if 'validate_fileMinSize' in self.job['package']:
332
333 minFileSize = int(self.job['package']['validate_fileMinSize'])
334 if minFileSize > 0:
335 chunk_size = work.get('resultpackage', {}).get('outputPaths', '').count(',')
336
337
338 for f in work.get('resultpackage', {}).get('outputPaths', '').split(',')[:4]:
339 if f:
340 if os.path.isfile(f):
341 if chunk_size == 1:
342
343
344 time.sleep(0.5)
345 sys.stdout.flush()
346 fSize = os.stat(f)[stat.ST_SIZE]
347
348 if fSize >= minFileSize:
349
350 backendUtils.flushPrint('INFO: file size check passed: %s > %s' % (fSize, minFileSize))
351 else:
352 fileSizeOK = False
353 backendUtils.flushPrint('ERROR: An output file has failed a size verification check, and is smaller than %s bytes.' % minFileSize, fhList=[sys.stdout, sys.stderr])
354 backendUtils.flushPrint('ERROR: %s is %s bytes' % (f, fSize), fhList=[sys.stdout, sys.stderr])
355 else:
356 backendUtils.flushPrint('Warning: A value that matched the job\'s "outputPaths" regular expression is not a file.')
357 backendUtils.flushPrint('Warning: The regular expression may need some refinement.')
358 backendUtils.flushPrint('Warning: regex = "%s"' % self.job['package'].get('regex_outputPaths', '< regex_outputPaths is undefined>'))
359 backendUtils.flushPrint('Warning: matched = "%s"' % f)
360
361 return fileSizeOK
362