1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
4 from Tools.CList import CList
7 NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8 def __init__(self, name):
10 self.resident_tasks = [ ]
11 self.workspace = "/tmp"
20 self.state_changed = CList()
22 self.status = self.NOT_STARTED
24 # description is a dict
25 def fromDescription(self, description):
28 def createDescription(self):
31 def getProgress(self):
32 if self.current_task == len(self.tasks):
34 t = self.tasks[self.current_task]
35 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
36 return int(jobprogress*self.weightScale)
38 progress = property(getProgress)
40 def getStatustext(self):
41 return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status]
43 def task_progress_changed_CB(self):
46 def addTask(self, task):
48 self.tasks.append(task)
50 def start(self, callback):
51 assert self.callback is None
52 self.callback = callback
56 self.status = self.IN_PROGRESS
59 sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
60 self.weightScale = self.end / float(sumTaskWeightings)
63 if self.current_task == len(self.tasks):
64 if len(self.resident_tasks) == 0:
67 self.status = self.FINISHED
71 print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
73 self.tasks[self.current_task].run(self.taskCallback, self.task_progress_changed_CB)
76 def taskCallback(self, task, res, stay_resident = False):
77 cb_idx = self.tasks.index(task)
79 if cb_idx not in self.resident_tasks:
80 self.resident_tasks.append(self.current_task)
81 print "task going resident:", task
83 print "task keeps staying resident:", task
86 print ">>> Error:", res
87 self.status = self.FAILED
89 self.callback(self, task, res)
90 if cb_idx != self.current_task:
91 if cb_idx in self.resident_tasks:
92 print "resident task finished:", task
93 self.resident_tasks.remove(cb_idx)
96 self.current_task += 1
100 assert self.status == self.FAILED
104 if self.current_task < len(self.tasks):
105 self.tasks[self.current_task].abort()
106 for i in self.resident_tasks:
107 self.tasks[i].abort()
110 # some Jobs might have a better idea of how to cancel a job
114 def __init__(self, job, name):
116 self.immediate_preconditions = [ ]
117 self.global_preconditions = [ ]
118 self.postconditions = [ ]
119 self.returncode = None
120 self.initial_input = None
129 self.task_progress_changed = None
130 self.output_line = ""
133 def setCommandline(self, cmd, args):
137 def setTool(self, tool):
140 self.global_preconditions.append(ToolExistsPrecondition())
141 self.postconditions.append(ReturncodePostcondition())
143 def checkPreconditions(self, immediate = False):
146 preconditions = self.immediate_preconditions
148 preconditions = self.global_preconditions
149 for precondition in preconditions:
150 if not precondition.check(self):
151 not_met.append(precondition)
154 def run(self, callback, task_progress_changed):
155 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
156 if len(failed_preconditions):
157 callback(self, failed_preconditions)
161 self.callback = callback
162 self.task_progress_changed = task_progress_changed
163 from enigma import eConsoleAppContainer
164 self.container = eConsoleAppContainer()
165 self.container.appClosed.get().append(self.processFinished)
166 self.container.stdoutAvail.get().append(self.processStdout)
167 self.container.stderrAvail.get().append(self.processStderr)
169 assert self.cmd is not None
170 assert len(self.args) >= 1
172 if self.cwd is not None:
173 self.container.setCWD(self.cwd)
175 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, " ".join(self.args)
176 if self.initial_input:
177 self.writeInput(self.initial_input)
182 def cleanup(self, failed):
185 def processStdout(self, data):
186 self.processOutput(data)
188 def processStderr(self, data):
189 self.processOutput(data)
191 def processOutput(self, data):
192 self.output_line += data
194 i = self.output_line.find('\n')
197 self.processOutputLine(self.output_line[:i+1])
198 self.output_line = self.output_line[i+1:]
200 def processOutputLine(self, line):
203 def processFinished(self, returncode):
204 self.returncode = returncode
208 self.container.kill()
209 self.finish(aborted = True)
211 def finish(self, aborted = False):
215 not_met.append(AbortedPostcondition())
217 for postcondition in self.postconditions:
218 if not postcondition.check(self):
219 not_met.append(postcondition)
220 self.cleanup(not_met)
221 self.callback(self, not_met)
226 def writeInput(self, input):
227 self.container.write(input)
229 def getProgress(self):
230 return self.__progress
232 def setProgress(self, progress):
233 if progress > self.end:
237 self.__progress = progress
238 self.task_progress_changed()
240 progress = property(getProgress, setProgress)
242 # The jobmanager will execute multiple jobs, each after another.
243 # later, it will also support suspending jobs (and continuing them after reboot etc)
244 # It also supports a notification when some error occured, and possibly a retry.
247 self.active_jobs = [ ]
248 self.failed_jobs = [ ]
249 self.job_classes = [ ]
250 self.in_background = False
251 self.active_job = None
253 def AddJob(self, job):
254 self.active_jobs.append(job)
258 if self.active_job is None:
259 if len(self.active_jobs):
260 self.active_job = self.active_jobs.pop(0)
261 self.active_job.start(self.jobDone)
263 def jobDone(self, job, task, problems):
264 print "job", job, "completed with", problems, "in", task
265 from Tools import Notifications
266 if self.in_background:
267 from Screens.TaskView import JobView
268 Notifications.AddNotification(JobView, self.active_job)
270 from Screens.MessageBox import MessageBox
271 if problems[0].RECOVERABLE:
272 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
274 Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
277 #self.failed_jobs.append(self.active_job)
279 self.active_job = None
282 def errorCB(self, answer):
285 self.active_job.retry()
287 print "not retrying job."
288 self.failed_jobs.append(self.active_job)
289 self.active_job = None
292 def getPendingJobs(self):
295 list.append(self.active_job)
296 list += self.active_jobs
299 #class PartitionExistsPostcondition:
300 # def __init__(self, device):
301 # self.device = device
303 # def check(self, task):
305 # return os.access(self.device + "part1", os.F_OK)
307 #class CreatePartitionTask(Task):
308 # def __init__(self, device):
309 # Task.__init__(self, _("Create Partition"))
310 # self.device = device
311 # self.setTool("/sbin/sfdisk")
312 # self.args += ["-f", self.device + "disc"]
313 # self.initial_input = "0,\n;\n;\n;\ny\n"
314 # self.postconditions.append(PartitionExistsPostcondition(self.device))
316 #class CreateFilesystemTask(Task):
317 # def __init__(self, device, partition = 1, largefile = True):
318 # Task.__init__(self, _("Create Filesystem"))
319 # self.setTool("/sbin/mkfs.ext")
321 # self.args += ["-T", "largefile"]
322 # self.args.append("-m0")
323 # self.args.append(device + "part%d" % partition)
325 #class FilesystemMountTask(Task):
326 # def __init__(self, device, partition = 1, filesystem = "ext3"):
327 # Task.__init__(self, _("Mounting Filesystem"))
328 # self.setTool("/bin/mount")
329 # if filesystem is not None:
330 # self.args += ["-t", filesystem]
331 # self.args.append(device + "part%d" % partition)
336 def getErrorMessage(self, task):
337 return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__)
339 class WorkspaceExistsPrecondition(Condition):
340 def check(self, task):
341 return os.access(task.job.workspace, os.W_OK)
343 class DiskspacePrecondition(Condition):
344 def __init__(self, diskspace_required):
345 self.diskspace_required = diskspace_required
346 self.diskspace_available = 0
348 def check(self, task):
351 s = os.statvfs(task.job.workspace)
352 self.diskspace_available = s.f_bsize * s.f_bavail
353 return self.diskspace_available >= self.diskspace_required
357 def getErrorMessage(self, task):
358 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
360 class ToolExistsPrecondition(Condition):
361 def check(self, task):
366 realpath = task.cwd + '/' + task.cmd
367 self.realpath = realpath
368 return os.access(realpath, os.X_OK)
370 def getErrorMessage(self, task):
371 return _("A required tool (%s) was not found.") % (self.realpath)
373 class AbortedPostcondition(Condition):
376 class ReturncodePostcondition(Condition):
377 def check(self, task):
378 return task.returncode == 0
380 #class HDDInitJob(Job):
381 # def __init__(self, device):
382 # Job.__init__(self, _("Initialize Harddisk"))
383 # self.device = device
384 # self.fromDescription(self.createDescription())
386 # def fromDescription(self, description):
387 # self.device = description["device"]
388 # self.addTask(CreatePartitionTask(self.device))
389 # self.addTask(CreateFilesystemTask(self.device))
390 # self.addTask(FilesystemMountTask(self.device))
392 # def createDescription(self):
393 # return {"device": self.device}
395 job_manager = JobManager()