1 '''
2 A base class for command-line and command range jobtypes written in Python
3 '''
4
5
6
7
8
9
10
11 import sys
12 import os
13 import re
14 import time
15 import pprint
16 import logging
17 import socket
18 import platform
19
20 import qb.backend.pythonBackEnd
21 import qb.backend.logParser
22 import qb.backend.utils as backendUtils
23
24 from qb.backend import QubeBackEndError
25 import qb.utils
26
27
28 try:
29 import subprocess
30 except ImportError:
31 if os.name == 'posix':
32 import popen2
33
34
36 """
37 A base class for command-line and command range jobtypes written in Python.
38
39 The basic runtime log parsing functionality is defined here.
40 """
41
42 - def runCmd(self, work, cmd, qbTokens=None):
43 """
44 Determine which module we can use to run the command:
45
46 * subprocess if we're on python >= 2.4
47 * popen2 if we're on an older python on linux
48
49 @param work: either a Qube job instance or an agenda item
50 @type work: C{dict}
51
52 @param cmd: the cmd for the child process
53 @type cmd: C{str}
54
55 @param qbTokens: a dictionary containing the various QB_FRAME* cmdrange tokens evaluated in the
56 context of the current running work item to aid in calculating the in-chunk progress.
57
58 @type qbTokens: C{dict}
59
60 @return: the return code of the child process' command
61 @rtype: C{int}
62 """
63
64
65
66
67
68 preTranslationCmd = cmd
69
70
71
72
73 if qb.utils.flags.isFlagSet('job_flags', 'convert_path', self.job['flags']):
74
75
76
77
78 cmd = qb.convertpath(cmd)
79 else:
80
81
82
83 cmd = qb.utils.translateQbConvertPathStrings(cmd)
84
85
86
87
88 if 'appVersion' in self.job['package']:
89 self.logging.info('Scanning for 3rd-party application...')
90 try:
91
92 cmd = backendUtils.translateAppPath(cmd, self.job['package']['appVersion'])
93
94 except QubeBackEndError, e:
95
96
97
98
99
100
101 hostname = socket.gethostname()
102 errMsg = backendUtils.formatExc(limit=1)
103 backendUtils.flushPrint('ERROR: %s - %s' % (hostname, errMsg), fhList=[sys.stdout, sys.stderr])
104
105
106
107
108
109 errors = self.job.get('resultpackage', {}).get('errors', [])
110 errors.append(e.value)
111 self.job.setdefault('resultpackage', {})['errors'] = errors
112
113 self.job['status'] = 'failed'
114 qb.reportjob(self.job)
115
116
117
118 time.sleep(5)
119
120 return 1
121
122 else:
123 self.logging.debug('not attempting 3rd-party application auto-pathing')
124
125 if cmd != preTranslationCmd:
126 self.logging.info('Paths in the command have been translated as per this worker\'s worker_path_map')
127 self.logging.info(r'%s %s' % (' '*4, preTranslationCmd))
128 self.logging.info(r'%s -> %s' % (' '*1, cmd))
129
130
131
132
133
134
135
136
137
138
139 preTranslationCmd = cmd
140
141 if 'commandHandler' in self.job['package']:
142 try:
143 cmd = self.job['package']['commandHandler'].prepareCommand(self.job, cmd)
144 if cmd != preTranslationCmd:
145 self.logging.info('The command has been modified via a application-specific commandHandler')
146 self.logging.info(r'%s %s' % (' '*4, preTranslationCmd))
147 self.logging.info(r'%s -> %s' % (' '*1, cmd))
148 except:
149 logging.error(backendUtils.formatExc())
150
151 elif os.name == 'nt' and backendUtils.pyVerAsFloat() < 2.7:
152
153
154
155
156
157
158
159 cmd = '"%s"' % cmd
160
161
162
163
164 try:
165 if os.name == 'posix':
166 if 'shell' in self.job['package']:
167 shell = self.job['package']['shell']
168 else:
169 shell = '/bin/sh'
170
171 childArgs = [shell, '-c', r'%s' % cmd]
172 backendUtils.flushPrint('COMMAND: %s' % ' '.join(childArgs))
173
174 else:
175
176
177 childArgs = r'%s' % cmd
178 backendUtils.flushPrint('COMMAND: %s' % childArgs)
179
180
181
182
183 retCode = self.runCmdWithSubprocess(work, childArgs, qbTokens)
184
185 except:
186 retCode = 1
187 backendUtils.flushPrint(backendUtils.formatExc(), fhList=[sys.stderr])
188
189 return retCode
190
192 """
193 Run via subprocess(), monitor child process and parse worker's job logs while the child is alive
194
195 Log parsing is done while the child is alive, and once more after the job instance returns.
196
197 @param work: either a Qube job instance or an agenda item
198 @type work: C{dict}
199
200 @param childArgs: the arguments for the child process, the first element is the shell
201 @type childArgs: C{list}
202
203 @param qbTokens: a dictionary containing the various QB_FRAME* cmdrange tokens evaluated in the
204 context of the current running work item to aid in calculating the in-chunk progress.
205
206 @type qbTokens: C{dict}
207
208 @return: the return code of the child process' command
209 @rtype: C{int}
210 """
211
212
213
214
215 childCwd = None
216 if self.job['cwd']:
217 if os.path.isdir(self.job['cwd']):
218 self.logging.debug('Running job from directory: %s' % self.job['cwd'])
219 childCwd = self.job['cwd']
220 else:
221 self.logging.debug('The working directory specified in the job is invalid: %s' % self.job['cwd'])
222 self.logging.debug('Will use the following working directory: %s' % os.getcwd())
223
224 if self.job['env']:
225 os.environ.update(self.job['env'])
226
227
228 if 'env_runTimeOS' in self.job['package']:
229 runTimeOSEnv = self.job['package']['env_runTimeOS'].get(platform.system(), {})
230 os.environ.update(runTimeOSEnv)
231
232 childStderr = None
233 if self.redirectingStdErr:
234 childStderr = subprocess.STDOUT
235
236 if os.name == 'nt':
237 shell = True
238 else:
239 shell = False
240
241
242
243 child = subprocess.Popen(childArgs, stderr=childStderr, env=os.environ, cwd=childCwd, shell=shell)
244
245
246
247
248 errorRegexMatched = False
249 fileSizeCheckFailed = False
250
251 stdoutLog, stderrLog = backendUtils.getJobLogPaths(self.redirectingStdErr)
252
253 if len(stdoutLog) and \
254 (self.redirectingStdErr or (
255 not(self.redirectingStdErr) and len(stderrLog))):
256 self.jobLogs = {'stdout': stdoutLog}
257
258 if not self.redirectingStdErr:
259 self.jobLogs['stderr'] = stderrLog
260
261
262
263
264 while child.poll() is None and not (errorRegexMatched or fileSizeCheckFailed):
265
266 backendUtils.flushPrint('INFO: Scanning logs for errors, outputPaths, etc...', fhList=sys.stderr)
267 errorRegexMatched, fileSizeCheckFailed = self.logHandler(work, qbTokens)
268 if errorRegexMatched or fileSizeCheckFailed:
269 work['status'] = 'failed'
270 break
271
272 for i in range(1, self.LOGREAD_TIME_THRESHOLD+1):
273
274 if child.poll() is None:
275 time.sleep(1)
276
277
278
279
280 errorRegexMatched, fileSizeCheckFailed = self.logHandler(work, qbTokens)
281 if errorRegexMatched or fileSizeCheckFailed:
282 work['status'] = 'failed'
283 else:
284 self.logging.warning('Unable to find job logs, no log parsing will occur during job instance execution')
285 child.wait()
286
287 sys.stdout.flush()
288
289
290
291
292
293 if self.job['status'] == 'failed' or work['status'] == 'failed':
294 retCode = 1
295 else:
296 retCode = child.returncode
297
298 return retCode
299
301 """
302 This method must be defined for all derived classes.
303 """
304 raise NotImplementedError
305