X-Git-Url: https://git.cweiske.de/enigma2.git/blobdiff_plain/fa9ec93b201ff3312817e7562b100bdf1d23270a..08068420717613d1f0437ca51fb5a00039170ecc:/lib/python/Components/Task.py?ds=sidebyside diff --git a/lib/python/Components/Task.py b/lib/python/Components/Task.py index 66109ca0..2e4e757d 100644 --- a/lib/python/Components/Task.py +++ b/lib/python/Components/Task.py @@ -3,24 +3,22 @@ from Tools.CList import CList -class Job: +class Job(object): NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4) def __init__(self, name): self.tasks = [ ] + self.resident_tasks = [ ] self.workspace = "/tmp" self.current_task = 0 self.callback = None self.name = name self.finished = False self.end = 100 - self.progress = 0 + self.__progress = 0 self.weightScale = 1 + self.afterEvent = None - self.job_state_changed = CList() - #triggered when task finishes/fails - - self.task_state_changed = CList() - #triggered when external app generates output + self.state_changed = CList() self.status = self.NOT_STARTED @@ -31,46 +29,90 @@ class Job: def createDescription(self): return None - def task_state_changed_CB(self): + def getProgress(self): + if self.current_task == len(self.tasks): + return self.end t = self.tasks[self.current_task] jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]]) - self.progress = jobprogress*self.weightScale - self.task_state_changed() + return int(jobprogress*self.weightScale) + + progress = property(getProgress) + + def getStatustext(self): + return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status] + + def task_progress_changed_CB(self): + self.state_changed() def addTask(self, task): task.job = self + task.task_progress_changed = self.task_progress_changed_CB self.tasks.append(task) def start(self, callback): assert self.callback is None self.callback = callback + self.restart() + + def restart(self): self.status = self.IN_PROGRESS - self.job_state_changed() + self.state_changed() self.runNext() - sumTaskWeightings = sum([t.weighting for t in self.tasks]) - self.weightScale = (self.end+1) / float(sumTaskWeightings) + sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1 + self.weightScale = self.end / float(sumTaskWeightings) def runNext(self): if self.current_task == len(self.tasks): - self.callback(self, []) - self.status = self.FINISHED - self.job_state_changed() + if len(self.resident_tasks) == 0: + cb = self.callback + self.callback = None + self.status = self.FINISHED + self.state_changed() + cb(self, None, []) + else: + print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks)) else: - self.tasks[self.current_task].run(self.taskCallback,self.task_state_changed_CB) - self.job_state_changed() - - def taskCallback(self, res): + self.tasks[self.current_task].run(self.taskCallback) + self.state_changed() + + def taskCallback(self, task, res, stay_resident = False): + cb_idx = self.tasks.index(task) + if stay_resident: + if cb_idx not in self.resident_tasks: + self.resident_tasks.append(self.current_task) + print "task going resident:", task + else: + print "task keeps staying resident:", task + return if len(res): print ">>> Error:", res self.status = self.FAILED - self.job_state_changed() - self.callback(self, res) - else: - self.progress = (self.progress + self.tasks[self.current_task].weighting*self.weightScale ) + self.state_changed() + self.callback(self, task, res) + if cb_idx != self.current_task: + if cb_idx in self.resident_tasks: + print "resident task finished:", task + self.resident_tasks.remove(cb_idx) + if res == []: + self.state_changed() self.current_task += 1 self.runNext() -class Task: + def retry(self): + assert self.status == self.FAILED + self.restart() + + def abort(self): + if self.current_task < len(self.tasks): + self.tasks[self.current_task].abort() + for i in self.resident_tasks: + self.tasks[i].abort() + + def cancel(self): + # some Jobs might have a better idea of how to cancel a job + self.abort() + +class Task(object): def __init__(self, job, name): self.name = name self.immediate_preconditions = [ ] @@ -82,12 +124,15 @@ class Task: self.end = 100 self.weighting = 100 - self.progress = 0 + self.__progress = 0 self.cmd = None self.cwd = "/tmp" self.args = [ ] - self.task_state_changed = None + self.cmdline = None + self.task_progress_changed = None + self.output_line = "" job.addTask(self) + self.container = None def setCommandline(self, cmd, args): self.cmd = cmd @@ -99,6 +144,9 @@ class Task: self.global_preconditions.append(ToolExistsPrecondition()) self.postconditions.append(ReturncodePostcondition()) + def setCmdline(self, cmdline): + self.cmdline = cmdline + def checkPreconditions(self, immediate = False): not_met = [ ] if immediate: @@ -110,27 +158,29 @@ class Task: not_met.append(precondition) return not_met - def run(self, callback, task_state_changed): + def run(self, callback): failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False) if len(failed_preconditions): - callback(failed_preconditions) + callback(self, failed_preconditions) return self.prepare() self.callback = callback - self.task_state_changed = task_state_changed from enigma import eConsoleAppContainer self.container = eConsoleAppContainer() - self.container.appClosed.get().append(self.processFinished) - self.container.dataAvail.get().append(self.processOutput) + self.container.appClosed.append(self.processFinished) + self.container.stdoutAvail.append(self.processStdout) + self.container.stderrAvail.append(self.processStderr) - assert self.cmd is not None - assert len(self.args) >= 1 - if self.cwd is not None: self.container.setCWD(self.cwd) - print "execute:", self.container.execute(self.cmd, self.args), self.cmd, self.args + if not self.cmd and self.cmdline: + print "execute:", self.container.execute(self.cmdline), self.cmdline + else: + assert self.cmd is not None + assert len(self.args) >= 1 + print "execute:", self.container.execute(self.cmd, *self.args), ' '.join(self.args) if self.initial_input: self.writeInput(self.initial_input) @@ -139,22 +189,45 @@ class Task: def cleanup(self, failed): pass + + def processStdout(self, data): + self.processOutput(data) + + def processStderr(self, data): + self.processOutput(data) def processOutput(self, data): + self.output_line += data + while True: + i = self.output_line.find('\n') + if i == -1: + break + self.processOutputLine(self.output_line[:i+1]) + self.output_line = self.output_line[i+1:] + + def processOutputLine(self, line): pass def processFinished(self, returncode): self.returncode = returncode self.finish() - def finish(self): + def abort(self): + if self.container: + self.container.kill() + self.finish(aborted = True) + + def finish(self, aborted = False): self.afterRun() not_met = [ ] - for postcondition in self.postconditions: - if not postcondition.check(self): - not_met.append(postcondition) - - self.callback(not_met) + if aborted: + not_met.append(AbortedPostcondition()) + else: + for postcondition in self.postconditions: + if not postcondition.check(self): + not_met.append(postcondition) + self.cleanup(not_met) + self.callback(self, not_met) def afterRun(self): pass @@ -162,11 +235,29 @@ class Task: def writeInput(self, input): self.container.write(input) + def getProgress(self): + return self.__progress + + def setProgress(self, progress): + if progress > self.end: + progress = self.end + if progress < 0: + progress = 0 + self.__progress = progress + if self.task_progress_changed: + self.task_progress_changed() + + progress = property(getProgress, setProgress) + +# The jobmanager will execute multiple jobs, each after another. +# later, it will also support suspending jobs (and continuing them after reboot etc) +# It also supports a notification when some error occured, and possibly a retry. class JobManager: def __init__(self): self.active_jobs = [ ] self.failed_jobs = [ ] self.job_classes = [ ] + self.in_background = False self.active_job = None def AddJob(self, job): @@ -179,14 +270,42 @@ class JobManager: self.active_job = self.active_jobs.pop(0) self.active_job.start(self.jobDone) - def jobDone(self, job, problems): - print "job", job, "completed with", problems + def jobDone(self, job, task, problems): + print "job", job, "completed with", problems, "in", task + from Tools import Notifications + if self.in_background: + from Screens.TaskView import JobView + self.in_background = False + Notifications.AddNotification(JobView, self.active_job) if problems: - self.failed_jobs.append(self.active_job) + from Screens.MessageBox import MessageBox + if problems[0].RECOVERABLE: + Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task))) + else: + Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR ) + self.errorCB(False) + return + #self.failed_jobs.append(self.active_job) self.active_job = None self.kick() + def errorCB(self, answer): + if answer: + print "retrying job" + self.active_job.retry() + else: + print "not retrying job." + self.failed_jobs.append(self.active_job) + self.active_job = None + self.kick() + + def getPendingJobs(self): + list = [ ] + if self.active_job: + list.append(self.active_job) + list += self.active_jobs + return list # some examples: #class PartitionExistsPostcondition: # def __init__(self, device): @@ -221,24 +340,60 @@ class JobManager: # if filesystem is not None: # self.args += ["-t", filesystem] # self.args.append(device + "part%d" % partition) -# -#class DiskspacePrecondition: -# def __init__(self, diskspace_required): -# self.diskspace_required = diskspace_required -# -# def check(self, task): -# return getFreeDiskspace(task.workspace) >= self.diskspace_required -# -class ToolExistsPrecondition: + +class Condition: + RECOVERABLE = False + + def getErrorMessage(self, task): + return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__) + +class WorkspaceExistsPrecondition(Condition): + def check(self, task): + return os.access(task.job.workspace, os.W_OK) + +class DiskspacePrecondition(Condition): + def __init__(self, diskspace_required): + self.diskspace_required = diskspace_required + self.diskspace_available = 0 + + def check(self, task): + import os + try: + s = os.statvfs(task.job.workspace) + self.diskspace_available = s.f_bsize * s.f_bavail + return self.diskspace_available >= self.diskspace_required + except OSError: + return False + + def getErrorMessage(self, task): + return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024) + +class ToolExistsPrecondition(Condition): def check(self, task): import os + if task.cmd[0]=='/': - realpath = task.cmd + self.realpath = task.cmd + print "[Task.py][ToolExistsPrecondition] WARNING: usage of absolute paths for tasks should be avoided!" + return os.access(self.realpath, os.X_OK) else: - realpath = self.cwd + '/' + self.cmd - return os.access(realpath, os.X_OK) - -class ReturncodePostcondition: + self.realpath = task.cmd + path = os.environ.get('PATH', '').split(os.pathsep) + path.append(task.cwd + '/') + absolutes = filter(lambda file: os.access(file, os.X_OK), map(lambda directory, file = task.cmd: os.path.join(directory, file), path)) + if len(absolutes) > 0: + self.realpath = task.cmd[0] + return True + return False + + def getErrorMessage(self, task): + return _("A required tool (%s) was not found.") % (self.realpath) + +class AbortedPostcondition(Condition): + def getErrorMessage(self, task): + return "Cancelled upon user request" + +class ReturncodePostcondition(Condition): def check(self, task): return task.returncode == 0