1 '''
2 Module defining the base class for Qube python jobTypes which communicate with a separate
3 running python interpreter, usually running inside a 3rd-party application.
4
5 Copyright: Pipelinefx L.L.C.
6 '''
7
8
9
10
11
12
13 import sys
14 import os.path
15 import time
16 import pprint
17 import traceback as tb
18
19 import qb.utils
20 import qb.backend.utils as backendUtils
21 import qb.backend.pythonBackEnd
22 import qb.backend.pythonChildHandler
23
24
# NOTE(review): the ``class`` line and every ``def`` line of this class were absent from the
# reviewed text and have been restored from the visible evidence: the
# ``super(PythonChildBackEnd, self)`` calls and the ``self.<method>(...)`` call sites.  The base
# class and the method names ``executeWork`` / ``jobTeardown`` are not shown anywhere in this
# file; they follow the Qube python backend API and must be confirmed before merging.
class PythonChildBackEnd(qb.backend.pythonBackEnd.PythonQubeBackEnd):
    """
    A python-based Qube backend that has a PyCmdDispatcher.

    It runs another python interpreter of some sort (usually a 2D or 3D app) as a child process,
    and interacts with it via the PyCmdDispatcher.
    """

    # Interpreter used when the job package does not carry a 'pyExecutable' entry.
    DEFAULT_PYTHON = sys.executable

    def __init__(self, job):
        """
        Prepare the backend for a job.  No child process is started here; that happens in
        L{jobSetup}.

        @param job: the job handed to the base class initializer.  It is read back afterwards as
        C{self.job} (presumably stored by the base class); its 'package' mapping may name the
        interpreter to launch under the 'pyExecutable' key, otherwise L{DEFAULT_PYTHON} is used.
        """
        super(PythonChildBackEnd, self).__init__(job)

        self.cmdDispatcher = None
        self.childBootstrapper = None
        self.pyExecutable = self.job['package'].get('pyExecutable', self.DEFAULT_PYTHON)

    def getSubprocessArgs(self, port):
        """
        Determine the arguments necessary to invoke the child process.

        This is the main method that differentiates derived classes, and is probably the only
        method that will need to be overridden.

        @param port: the port on which the PyCmdDispatcher's backchannel instance is listening,
        usually passed as a parameter to the child_bootstrapper.py script which starts up the
        pyCmdExecutor inside the child process started by the PyCmdDispatcher

        @type port: C{int}

        @return: Return a tuple of a list of args to start the python interpreter, and a list of
        python commands to initialize the python working environment.

        @rtype: C{tuple} C{([childArgs], [pyInitCmds])}

        @raise NotImplementedError: Raised when this method is not overridden in a derived class.
        """
        raise NotImplementedError

    def generateChildBootstrapper(self, port):
        """
        Return the path of the script used to bootstrap the child process.

        This is only overridden if a 3rd-party application which will be started cannot use the
        standard child_bootstrapper.py script.  This can happen if the 3rd-party app doesn't
        support passing arguments to the bootstrapper.

        @param port: the port on which the PyCmdDispatcher's backchannel instance is listening,
        usually passed as a parameter to the child_bootstrapper.py script which starts up the
        pyCmdExecutor inside the child process started by the PyCmdDispatcher.  Unused by this
        default implementation.

        @type port: C{int}

        @return: the path to 'child_bootstrapper.py', located next to the directory component of
        C{backendUtils.getModulePath()}

        @rtype: C{str}
        """
        moduleDir = os.path.dirname(backendUtils.getModulePath())
        return os.path.join(moduleDir, 'child_bootstrapper.py')

    def initPyCmdDispatcher(self):
        """
        Initialize a PyCmdDispatcher instance and start the child interpreter.

        The PyCmdDispatcher is this side of the bi-directional communication with a python
        interpreter running in a child process.  It is the mechanism to dispatch the commands to
        the interpreter and handle any return values or exceptions.

        @return: Return a tuple of the pythonChildHandler.PyCmdDispatcher object (which will have
        a running python process of some sort as a child attribute) and the summed return codes
        of the initialization commands.  When C{self.pyExecutable} does not exist on disk the
        tuple is C{(None, 1)}.

        @rtype: C{tuple} C{(pythonChildHandler.PyCmdDispatcher or None, int)}
        """
        if not os.path.exists(self.pyExecutable):
            msg = 'Startup application not found: %s' % self.pyExecutable
            print('ERROR: %s' % msg)
            self.logging.error(msg)
            return (None, 1)

        mergeStderr = self.job['package'].get('redirectStderrToStdout', False)
        cmdDispatcher = qb.backend.pythonChildHandler.PyCmdDispatcher(logHandler=self.logHandler,
                                                                      mergeStderr=mergeStderr)

        port = cmdDispatcher.backChannel.port
        self.childBootstrapper = self.generateChildBootstrapper(port)
        childArgs, initCmds = self.getSubprocessArgs(port)

        cmdDispatcher.startChild(childArgs)

        # The return code of this first command is deliberately not counted, as in the original.
        cmdDispatcher.execute('print', self.job)

        initReturnCode = 0
        for statement in ('import sys', 'import os', 'import time'):
            initReturnCode += cmdDispatcher.execute(statement)

        for cmd in initCmds:
            initReturnCode += cmdDispatcher.execute(cmd, self.job)

        return (cmdDispatcher, initReturnCode)

    def jobSetup(self):
        """
        Perform any steps necessary to initialize the working environment prior to beginning any
        agendaItem-specific steps.

        Starts the child interpreter, then runs the job package's optional 'jobSetupCmds' in it.

        @raise SystemExit: via C{sys.exit} with the failing return code when the child cannot be
        initialized or when a job setup command returns non-zero; the job is reported as failed
        first.
        """
        super(PythonChildBackEnd, self).jobSetup()

        backendUtils.bannerPrint('Starting Python initialization')
        (self.cmdDispatcher, initReturnCode) = self.initPyCmdDispatcher()

        if initReturnCode > 0:
            backendUtils.bannerPrint('ERROR: Python initialization failed', fhList=[sys.stdout, sys.stderr])
            self.logging.error('Unable to start application session.')
            self.job['status'] = 'failed'
            # NOTE(review): every other call in this class passes a status string to
            # qb.reportjob(); confirm that passing the job itself is intended.
            qb.reportjob(self.job)

            # The dispatcher is None when the interpreter was never found; calling close() on it
            # used to raise AttributeError, which skipped the sys.exit() below.
            if self.cmdDispatcher is not None:
                try:
                    backendUtils.bannerPrint('Shutting down python session', fhList=[sys.stdout, sys.stderr])
                    self.cmdDispatcher.close()
                except TypeError:
                    pass

            sys.exit(initReturnCode)

        backendUtils.bannerPrint('Finished Python initialization')

        setupCmds = self.job['package'].get('jobSetupCmds')
        if setupCmds:
            backendUtils.bannerPrint('Starting job setup commands')
            for cmd in setupCmds:
                cmd = qb.utils.translateQbConvertPathStrings(cmd)
                cmdRetCode = self.cmdDispatcher.execute(cmd, self.job)

                if cmdRetCode != 0:
                    sys.stdout.write('ERROR: unable to successfully execute command "%s"\n' % cmd)
                    sys.stdout.write('WARNING: reporting job instance as failed\n')
                    qb.reportjob('failed')
                    sys.exit(cmdRetCode)
            backendUtils.bannerPrint('Finished job setup commands')

    def _runWorkCommands(self, work):
        """
        Run every command of one agendaItem in the child interpreter.

        Stops at the first command that returns non-zero or once the child process reports a
        return code; in the latter case C{self.status} is set to that code.

        @param work: the agendaItem whose C{work['package']['commands']} are executed

        @return: 0 when at least one command ran and all of them succeeded, otherwise a non-zero
        status (an agendaItem without commands counts as failed, as before)

        @rtype: C{int}
        """
        work_status = 1
        try:
            for cmd in work['package']['commands']:
                retCode = self.cmdDispatcher.execute(cmd, work)

                if retCode != 0:
                    work_status = retCode
                    break

                if self.cmdDispatcher.child.returncode:
                    # the child has a (non-zero) return code, so it is no longer running
                    sys.stderr.write('subprocess seems to have died...\n')
                    self.status = self.cmdDispatcher.child.returncode
                    work_status = self.status
                    break

                work_status = 0

        except Exception:
            sys.stderr.write(tb.format_exc())
            # An earlier command may already have set the status to 0; without this reset the
            # agendaItem would be reported as complete despite the exception.
            work_status = 1

        return work_status

    def executeWork(self):
        """
        Request an agendaItem (work) from the supervisor and do any steps necessary to perform the
        work.

        Loops until the supervisor answers with a 'complete', 'pending' or 'blocked' status, or
        until C{self.status} becomes non-zero.
        """
        while True:
            backendUtils.bannerPrint('Requesting work', fhList=[sys.stdout, sys.stderr])
            work = qb.requestwork()

            if self.dev:
                print("DEBUG BEGIN:")
                print("WORK:")
                pprint.pprint(work)
                print("DEBUG END:")

            requestStatus = work['status']

            if requestStatus == 'failed':
                print('preflights for work [%s:%s] failed' % (self.job['id'], work['name']))
                work['status'] = 'failed'
                qb.reportwork(work)
                continue

            if requestStatus == 'complete':
                break

            if requestStatus == 'pending':
                print('job %s has been preempted' % self.job['id'])
                qb.reportjob('pending')
                break

            if requestStatus == 'blocked':
                print('job %s has been blocked' % self.job['id'])
                qb.reportjob('blocked')
                break

            if requestStatus == 'waiting':
                print('job %s will be back in %s seconds' % (self.job['id'], self.QB_WAITING_TIMEOUT))
                sys.stdout.flush()
                # wait out the timeout in 10ms slices rather than in one long sleep
                for _ in range(self.QB_WAITING_TIMEOUT*100):
                    time.sleep(0.01)
                continue

            # Any other status means there is work to do.
            backendUtils.bannerPrint('Executing work package commands for work: %s' % work['name'], fhList=[sys.stdout, sys.stderr])
            work_status = self._runWorkCommands(work)
            backendUtils.bannerPrint('Finished work package commands for work: %s' % work['name'])

            if work.get('resultpackage') is None:
                work['resultpackage'] = {}

            if work_status != 0:
                work['status'] = 'failed'
            elif self.outputPaths_required and not work['resultpackage'].get('outputPaths'):
                # a missing, empty or None 'outputPaths' all count as "no output found"
                work['status'] = 'failed'
                backendUtils.flushPrint('WARNING: no "regex_outputPaths" match was found, setting agenda item status to "failed".', fhList=[sys.stdout, sys.stderr])
            else:
                work['status'] = 'complete'

            backendUtils.bannerPrint('Reporting work as %(status)s: %(name)s ' % work, fhList=[sys.stderr])
            qb.reportwork(work)

            if self.status != 0:
                print('ERROR: The child python process (mayapy, houdini, etc) has exited prematurely.')
                print('INFO: Will attempt to restart this job instance on another host, if its job instance retry limit has not been exceeded.')
                break

    def jobTeardown(self):
        """
        Perform any steps necessary to clean up the working environment prior to shutting down
        the job instance.

        Runs the job package's optional 'jobTeardownCmds' in the child, closes the child session
        and reports the job as 'complete' or 'failed' depending on C{self.status}.
        """
        teardownCmds = self.job['package'].get('jobTeardownCmds')
        if teardownCmds:
            # Execute the commands one at a time, like 'jobSetupCmds'; a single bare command is
            # still accepted.
            if not isinstance(teardownCmds, (list, tuple)):
                teardownCmds = [teardownCmds]
            for cmd in teardownCmds:
                self.cmdDispatcher.execute(cmd)

        retCode = self.cmdDispatcher.close(self.status)

        print('Python session exited with code %s' % retCode)

        if self.status == 0:
            qb.reportjob('complete')
        else:
            qb.reportjob('failed')

        sys.stderr.flush()
296