1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
4 from Tools.CList import CList
7 NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8 def __init__(self, name):
10 self.resident_tasks = [ ]
11 self.workspace = "/tmp"
19 self.afterEvent = None
21 self.state_changed = CList()
23 self.status = self.NOT_STARTED
25 # description is a dict
26 def fromDescription(self, description):
29 def createDescription(self):
32 def getProgress(self):
33 if self.current_task == len(self.tasks):
35 t = self.tasks[self.current_task]
36 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
37 return int(jobprogress*self.weightScale)
39 progress = property(getProgress)
41 def getStatustext(self):
42 return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status]
44 def task_progress_changed_CB(self):
47 def addTask(self, task):
49 task.task_progress_changed = self.task_progress_changed_CB
50 self.tasks.append(task)
52 def start(self, callback):
53 assert self.callback is None
54 self.callback = callback
58 self.status = self.IN_PROGRESS
61 sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
62 self.weightScale = self.end / float(sumTaskWeightings)
65 if self.current_task == len(self.tasks):
66 if len(self.resident_tasks) == 0:
69 self.status = self.FINISHED
73 print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
75 self.tasks[self.current_task].run(self.taskCallback)
78 def taskCallback(self, task, res, stay_resident = False):
79 cb_idx = self.tasks.index(task)
81 if cb_idx not in self.resident_tasks:
82 self.resident_tasks.append(self.current_task)
83 print "task going resident:", task
85 print "task keeps staying resident:", task
88 print ">>> Error:", res
89 self.status = self.FAILED
91 self.callback(self, task, res)
92 if cb_idx != self.current_task:
93 if cb_idx in self.resident_tasks:
94 print "resident task finished:", task
95 self.resident_tasks.remove(cb_idx)
98 self.current_task += 1
102 assert self.status == self.FAILED
106 if self.current_task < len(self.tasks):
107 self.tasks[self.current_task].abort()
108 for i in self.resident_tasks:
109 self.tasks[i].abort()
112 # some Jobs might have a better idea of how to cancel a job
116 def __init__(self, job, name):
118 self.immediate_preconditions = [ ]
119 self.global_preconditions = [ ]
120 self.postconditions = [ ]
121 self.returncode = None
122 self.initial_input = None
132 self.task_progress_changed = None
133 self.output_line = ""
135 self.container = None
137 def setCommandline(self, cmd, args):
141 def setTool(self, tool):
144 self.global_preconditions.append(ToolExistsPrecondition())
145 self.postconditions.append(ReturncodePostcondition())
147 def setCmdline(self, cmdline):
148 self.cmdline = cmdline
150 def checkPreconditions(self, immediate = False):
153 preconditions = self.immediate_preconditions
155 preconditions = self.global_preconditions
156 for precondition in preconditions:
157 if not precondition.check(self):
158 not_met.append(precondition)
161 def run(self, callback):
162 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
163 if len(failed_preconditions):
164 callback(self, failed_preconditions)
168 self.callback = callback
169 from enigma import eConsoleAppContainer
170 self.container = eConsoleAppContainer()
171 self.container.appClosed.append(self.processFinished)
172 self.container.stdoutAvail.append(self.processStdout)
173 self.container.stderrAvail.append(self.processStderr)
175 if self.cwd is not None:
176 self.container.setCWD(self.cwd)
178 if not self.cmd and self.cmdline:
179 print "execute:", self.container.execute(self.cmdline), self.cmdline
181 assert self.cmd is not None
182 assert len(self.args) >= 1
183 print "execute:", self.container.execute(self.cmd, *self.args), ' '.join(self.args)
184 if self.initial_input:
185 self.writeInput(self.initial_input)
190 def cleanup(self, failed):
193 def processStdout(self, data):
194 self.processOutput(data)
196 def processStderr(self, data):
197 self.processOutput(data)
199 def processOutput(self, data):
200 self.output_line += data
202 i = self.output_line.find('\n')
205 self.processOutputLine(self.output_line[:i+1])
206 self.output_line = self.output_line[i+1:]
208 def processOutputLine(self, line):
211 def processFinished(self, returncode):
212 self.returncode = returncode
217 self.container.kill()
218 self.finish(aborted = True)
220 def finish(self, aborted = False):
224 not_met.append(AbortedPostcondition())
226 for postcondition in self.postconditions:
227 if not postcondition.check(self):
228 not_met.append(postcondition)
229 self.cleanup(not_met)
230 self.callback(self, not_met)
235 def writeInput(self, input):
236 self.container.write(input)
238 def getProgress(self):
239 return self.__progress
241 def setProgress(self, progress):
242 if progress > self.end:
246 self.__progress = progress
247 if self.task_progress_changed:
248 self.task_progress_changed()
250 progress = property(getProgress, setProgress)
252 # The jobmanager will execute multiple jobs, each after another.
253 # later, it will also support suspending jobs (and continuing them after reboot etc)
254 # It also supports a notification when some error occured, and possibly a retry.
257 self.active_jobs = [ ]
258 self.failed_jobs = [ ]
259 self.job_classes = [ ]
260 self.in_background = False
261 self.active_job = None
263 def AddJob(self, job):
264 self.active_jobs.append(job)
268 if self.active_job is None:
269 if len(self.active_jobs):
270 self.active_job = self.active_jobs.pop(0)
271 self.active_job.start(self.jobDone)
273 def jobDone(self, job, task, problems):
274 print "job", job, "completed with", problems, "in", task
275 from Tools import Notifications
276 if self.in_background:
277 from Screens.TaskView import JobView
278 self.in_background = False
279 Notifications.AddNotification(JobView, self.active_job)
281 from Screens.MessageBox import MessageBox
282 if problems[0].RECOVERABLE:
283 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
285 Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
288 #self.failed_jobs.append(self.active_job)
290 self.active_job = None
293 def errorCB(self, answer):
296 self.active_job.retry()
298 print "not retrying job."
299 self.failed_jobs.append(self.active_job)
300 self.active_job = None
303 def getPendingJobs(self):
306 list.append(self.active_job)
307 list += self.active_jobs
310 #class PartitionExistsPostcondition:
311 # def __init__(self, device):
312 # self.device = device
314 # def check(self, task):
316 # return os.access(self.device + "part1", os.F_OK)
318 #class CreatePartitionTask(Task):
319 # def __init__(self, device):
320 # Task.__init__(self, _("Create Partition"))
321 # self.device = device
322 # self.setTool("/sbin/sfdisk")
323 # self.args += ["-f", self.device + "disc"]
324 # self.initial_input = "0,\n;\n;\n;\ny\n"
325 # self.postconditions.append(PartitionExistsPostcondition(self.device))
327 #class CreateFilesystemTask(Task):
328 # def __init__(self, device, partition = 1, largefile = True):
329 # Task.__init__(self, _("Create Filesystem"))
330 # self.setTool("/sbin/mkfs.ext")
332 # self.args += ["-T", "largefile"]
333 # self.args.append("-m0")
334 # self.args.append(device + "part%d" % partition)
336 #class FilesystemMountTask(Task):
337 # def __init__(self, device, partition = 1, filesystem = "ext3"):
338 # Task.__init__(self, _("Mounting Filesystem"))
339 # self.setTool("/bin/mount")
340 # if filesystem is not None:
341 # self.args += ["-t", filesystem]
342 # self.args.append(device + "part%d" % partition)
347 def getErrorMessage(self, task):
348 return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__)
350 class WorkspaceExistsPrecondition(Condition):
351 def check(self, task):
352 return os.access(task.job.workspace, os.W_OK)
354 class DiskspacePrecondition(Condition):
355 def __init__(self, diskspace_required):
356 self.diskspace_required = diskspace_required
357 self.diskspace_available = 0
359 def check(self, task):
362 s = os.statvfs(task.job.workspace)
363 self.diskspace_available = s.f_bsize * s.f_bavail
364 return self.diskspace_available >= self.diskspace_required
368 def getErrorMessage(self, task):
369 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
371 class ToolExistsPrecondition(Condition):
372 def check(self, task):
376 self.realpath = task.cmd
377 print "[Task.py][ToolExistsPrecondition] WARNING: usage of absolute paths for tasks should be avoided!"
378 return os.access(self.realpath, os.X_OK)
380 self.realpath = task.cmd
381 path = os.environ.get('PATH', '').split(os.pathsep)
382 path.append(task.cwd + '/')
383 absolutes = filter(lambda file: os.access(file, os.X_OK), map(lambda directory, file = task.cmd: os.path.join(directory, file), path))
384 if len(absolutes) > 0:
385 self.realpath = task.cmd[0]
389 def getErrorMessage(self, task):
390 return _("A required tool (%s) was not found.") % (self.realpath)
392 class AbortedPostcondition(Condition):
393 def getErrorMessage(self, task):
394 return "Cancelled upon user request"
396 class ReturncodePostcondition(Condition):
397 def check(self, task):
398 return task.returncode == 0
400 #class HDDInitJob(Job):
401 # def __init__(self, device):
402 # Job.__init__(self, _("Initialize Harddisk"))
403 # self.device = device
404 # self.fromDescription(self.createDescription())
406 # def fromDescription(self, description):
407 # self.device = description["device"]
408 # self.addTask(CreatePartitionTask(self.device))
409 # self.addTask(CreateFilesystemTask(self.device))
410 # self.addTask(FilesystemMountTask(self.device))
412 # def createDescription(self):
413 # return {"device": self.device}
415 job_manager = JobManager()