save supposed afterEvent config in class Job member (fixes #394 DVDBurn shutdown...
[enigma2.git] / lib / python / Components / Task.py
index 66109ca0c1d337172a64063089782049b8bfb0e4..2e4e757de86525bdc2054d3535d8263d43f32de9 100644 (file)
@@ -3,24 +3,22 @@
 
 from Tools.CList import CList
 
-class Job:
+class Job(object):
        NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
        def __init__(self, name):
                self.tasks = [ ]
+               self.resident_tasks = [ ]
                self.workspace = "/tmp"
                self.current_task = 0
                self.callback = None
                self.name = name
                self.finished = False
                self.end = 100
-               self.progress = 0
+               self.__progress = 0
                self.weightScale = 1
+               self.afterEvent = None
 
-               self.job_state_changed = CList()
-               #triggered when task finishes/fails
-               
-               self.task_state_changed = CList()
-               #triggered when external app generates output
+               self.state_changed = CList()
 
                self.status = self.NOT_STARTED
 
@@ -31,46 +29,90 @@ class Job:
        def createDescription(self):
                return None
 
-       def task_state_changed_CB(self):
+       def getProgress(self):
+               if self.current_task == len(self.tasks):
+                       return self.end
                t = self.tasks[self.current_task]
                jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
-               self.progress = jobprogress*self.weightScale
-               self.task_state_changed()
+               return int(jobprogress*self.weightScale)
+
+       progress = property(getProgress)
+
+       def getStatustext(self):
+               return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status]
+
+       def task_progress_changed_CB(self):
+               self.state_changed()
 
        def addTask(self, task):
                task.job = self
+               task.task_progress_changed = self.task_progress_changed_CB
                self.tasks.append(task)
 
        def start(self, callback):
                assert self.callback is None
                self.callback = callback
+               self.restart()
+
+       def restart(self):
                self.status = self.IN_PROGRESS
-               self.job_state_changed()
+               self.state_changed()
                self.runNext()
-               sumTaskWeightings = sum([t.weighting for t in self.tasks])
-               self.weightScale = (self.end+1) / float(sumTaskWeightings)
+               sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
+               self.weightScale = self.end / float(sumTaskWeightings)
 
        def runNext(self):
                if self.current_task == len(self.tasks):
-                       self.callback(self, [])
-                       self.status = self.FINISHED
-                       self.job_state_changed()
+                       if len(self.resident_tasks) == 0:
+                               cb = self.callback
+                               self.callback = None
+                               self.status = self.FINISHED
+                               self.state_changed()
+                               cb(self, None, [])
+                       else:
+                               print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
                else:
-                       self.tasks[self.current_task].run(self.taskCallback,self.task_state_changed_CB)
-                       self.job_state_changed()
-
-       def taskCallback(self, res):
+                       self.tasks[self.current_task].run(self.taskCallback)
+                       self.state_changed()
+
+       def taskCallback(self, task, res, stay_resident = False):
+               cb_idx = self.tasks.index(task)
+               if stay_resident:
+                       if cb_idx not in self.resident_tasks:
+                               self.resident_tasks.append(self.current_task)
+                               print "task going resident:", task
+                       else:
+                               print "task keeps staying resident:", task
+                               return
                if len(res):
                        print ">>> Error:", res
                        self.status = self.FAILED
-                       self.job_state_changed()
-                       self.callback(self, res)
-               else:
-                       self.progress = (self.progress + self.tasks[self.current_task].weighting*self.weightScale )
+                       self.state_changed()
+                       self.callback(self, task, res)
+               if cb_idx != self.current_task:
+                       if cb_idx in self.resident_tasks:
+                               print "resident task finished:", task
+                               self.resident_tasks.remove(cb_idx)
+               if res == []:
+                       self.state_changed()
                        self.current_task += 1
                        self.runNext()
 
-class Task:
+       def retry(self):
+               assert self.status == self.FAILED
+               self.restart()
+
+       def abort(self):
+               if self.current_task < len(self.tasks):
+                       self.tasks[self.current_task].abort()
+               for i in self.resident_tasks:
+                       self.tasks[i].abort()
+
+       def cancel(self):
+               # some Jobs might have a better idea of how to cancel a job
+               self.abort()
+
+class Task(object):
        def __init__(self, job, name):
                self.name = name
                self.immediate_preconditions = [ ]
@@ -82,12 +124,15 @@ class Task:
 
                self.end = 100
                self.weighting = 100
-               self.progress = 0
+               self.__progress = 0
                self.cmd = None
                self.cwd = "/tmp"
                self.args = [ ]
-               self.task_state_changed = None
+               self.cmdline = None
+               self.task_progress_changed = None
+               self.output_line = ""
                job.addTask(self)
+               self.container = None
 
        def setCommandline(self, cmd, args):
                self.cmd = cmd
@@ -99,6 +144,9 @@ class Task:
                self.global_preconditions.append(ToolExistsPrecondition())
                self.postconditions.append(ReturncodePostcondition())
 
+       def setCmdline(self, cmdline):
+               self.cmdline = cmdline
+
        def checkPreconditions(self, immediate = False):
                not_met = [ ]
                if immediate:
@@ -110,27 +158,29 @@ class Task:
                                not_met.append(precondition)
                return not_met
 
-       def run(self, callback, task_state_changed):
+       def run(self, callback):
                failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
                if len(failed_preconditions):
-                       callback(failed_preconditions)
+                       callback(self, failed_preconditions)
                        return
                self.prepare()
 
                self.callback = callback
-               self.task_state_changed = task_state_changed
                from enigma import eConsoleAppContainer
                self.container = eConsoleAppContainer()
-               self.container.appClosed.get().append(self.processFinished)
-               self.container.dataAvail.get().append(self.processOutput)
+               self.container.appClosed.append(self.processFinished)
+               self.container.stdoutAvail.append(self.processStdout)
+               self.container.stderrAvail.append(self.processStderr)
 
-               assert self.cmd is not None
-               assert len(self.args) >= 1
-               
                if self.cwd is not None:
                        self.container.setCWD(self.cwd)
 
-               print "execute:", self.container.execute(self.cmd, self.args), self.cmd, self.args
+               if not self.cmd and self.cmdline:
+                       print "execute:", self.container.execute(self.cmdline), self.cmdline
+               else:
+                       assert self.cmd is not None
+                       assert len(self.args) >= 1
+                       print "execute:", self.container.execute(self.cmd, *self.args), ' '.join(self.args)
                if self.initial_input:
                        self.writeInput(self.initial_input)
 
@@ -139,22 +189,45 @@ class Task:
 
        def cleanup(self, failed):
                pass
+       
+       def processStdout(self, data):
+               self.processOutput(data)
+               
+       def processStderr(self, data):
+               self.processOutput(data)
 
        def processOutput(self, data):
+               self.output_line += data
+               while True:
+                       i = self.output_line.find('\n')
+                       if i == -1:
+                               break
+                       self.processOutputLine(self.output_line[:i+1])
+                       self.output_line = self.output_line[i+1:]
+
+       def processOutputLine(self, line):
                pass
 
        def processFinished(self, returncode):
                self.returncode = returncode
                self.finish()
 
-       def finish(self):
+       def abort(self):
+               if self.container:
+                       self.container.kill()
+               self.finish(aborted = True)
+
+       def finish(self, aborted = False):
                self.afterRun()
                not_met = [ ]
-               for postcondition in self.postconditions:
-                       if not postcondition.check(self):
-                               not_met.append(postcondition)
-
-               self.callback(not_met)
+               if aborted:
+                       not_met.append(AbortedPostcondition())
+               else:
+                       for postcondition in self.postconditions:
+                               if not postcondition.check(self):
+                                       not_met.append(postcondition)
+               self.cleanup(not_met)
+               self.callback(self, not_met)
 
        def afterRun(self):
                pass
@@ -162,11 +235,29 @@ class Task:
        def writeInput(self, input):
                self.container.write(input)
 
+       def getProgress(self):
+               return self.__progress
+
+       def setProgress(self, progress):
+               if progress > self.end:
+                       progress = self.end
+               if progress < 0:
+                       progress = 0
+               self.__progress = progress
+               if self.task_progress_changed:
+                       self.task_progress_changed()
+
+       progress = property(getProgress, setProgress)
+
+# The jobmanager will execute multiple jobs, each after another.
+# later, it will also support suspending jobs (and continuing them after reboot etc)
+# It also supports a notification when some error occured, and possibly a retry.
 class JobManager:
        def __init__(self):
                self.active_jobs = [ ]
                self.failed_jobs = [ ]
                self.job_classes = [ ]
+               self.in_background = False
                self.active_job = None
 
        def AddJob(self, job):
@@ -179,14 +270,42 @@ class JobManager:
                                self.active_job = self.active_jobs.pop(0)
                                self.active_job.start(self.jobDone)
 
-       def jobDone(self, job, problems):
-               print "job", job, "completed with", problems
+       def jobDone(self, job, task, problems):
+               print "job", job, "completed with", problems, "in", task
+               from Tools import Notifications
+               if self.in_background:
+                       from Screens.TaskView import JobView
+                       self.in_background = False
+                       Notifications.AddNotification(JobView, self.active_job)
                if problems:
-                       self.failed_jobs.append(self.active_job)
+                       from Screens.MessageBox import MessageBox
+                       if problems[0].RECOVERABLE:
+                               Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
+                       else:
+                               Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
+                               self.errorCB(False)
+                       return
+                       #self.failed_jobs.append(self.active_job)
 
                self.active_job = None
                self.kick()
 
+       def errorCB(self, answer):
+               if answer:
+                       print "retrying job"
+                       self.active_job.retry()
+               else:
+                       print "not retrying job."
+                       self.failed_jobs.append(self.active_job)
+                       self.active_job = None
+                       self.kick()
+
+       def getPendingJobs(self):
+               list = [ ]
+               if self.active_job:
+                       list.append(self.active_job)
+               list += self.active_jobs
+               return list
 # some examples:
 #class PartitionExistsPostcondition:
 #      def __init__(self, device):
@@ -221,24 +340,60 @@ class JobManager:
 #              if filesystem is not None:
 #                      self.args += ["-t", filesystem]
 #              self.args.append(device + "part%d" % partition)
-#
-#class DiskspacePrecondition:
-#      def __init__(self, diskspace_required):
-#              self.diskspace_required = diskspace_required
-#
-#      def check(self, task):
-#              return getFreeDiskspace(task.workspace) >= self.diskspace_required
-#
-class ToolExistsPrecondition:
+
+class Condition:
+       RECOVERABLE = False
+
+       def getErrorMessage(self, task):
+               return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__)
+
+class WorkspaceExistsPrecondition(Condition):
+       def check(self, task):
+               return os.access(task.job.workspace, os.W_OK)
+
+class DiskspacePrecondition(Condition):
+       def __init__(self, diskspace_required):
+               self.diskspace_required = diskspace_required
+               self.diskspace_available = 0
+
+       def check(self, task):
+               import os
+               try:
+                       s = os.statvfs(task.job.workspace)
+                       self.diskspace_available = s.f_bsize * s.f_bavail
+                       return self.diskspace_available >= self.diskspace_required
+               except OSError:
+                       return False
+
+       def getErrorMessage(self, task):
+               return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
+
+class ToolExistsPrecondition(Condition):
        def check(self, task):
                import os
+               
                if task.cmd[0]=='/':
-                       realpath = task.cmd
+                       self.realpath = task.cmd
+                       print "[Task.py][ToolExistsPrecondition] WARNING: usage of absolute paths for tasks should be avoided!" 
+                       return os.access(self.realpath, os.X_OK)
                else:
-                       realpath = self.cwd + '/' + self.cmd
-               return os.access(realpath, os.X_OK)
-
-class ReturncodePostcondition:
+                       self.realpath = task.cmd
+                       path = os.environ.get('PATH', '').split(os.pathsep)
+                       path.append(task.cwd + '/')
+                       absolutes = filter(lambda file: os.access(file, os.X_OK), map(lambda directory, file = task.cmd: os.path.join(directory, file), path))
+                       if len(absolutes) > 0:
+                               self.realpath = task.cmd[0]
+                               return True
+               return False 
+
+       def getErrorMessage(self, task):
+               return _("A required tool (%s) was not found.") % (self.realpath)
+
+class AbortedPostcondition(Condition):
+       def getErrorMessage(self, task):
+               return "Cancelled upon user request"
+
+class ReturncodePostcondition(Condition):
        def check(self, task):
                return task.returncode == 0