add support for retrying when a task failed.
[enigma2.git] / lib / python / Components / Task.py
1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
3
4 from Tools.CList import CList
5
6 class Job(object):
7         NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8         def __init__(self, name):
9                 self.tasks = [ ]
10                 self.workspace = "/tmp"
11                 self.current_task = 0
12                 self.callback = None
13                 self.name = name
14                 self.finished = False
15                 self.end = 100
16                 self.__progress = 0
17                 self.weightScale = 1
18
19                 self.state_changed = CList()
20
21                 self.status = self.NOT_STARTED
22
23         # description is a dict
24         def fromDescription(self, description):
25                 pass
26
27         def createDescription(self):
28                 return None
29
30         def getProgress(self):
31                 if self.current_task == len(self.tasks):
32                         return self.end
33                 t = self.tasks[self.current_task]
34                 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
35                 return int(jobprogress*self.weightScale)
36
37         progress = property(getProgress)
38
39         def task_progress_changed_CB(self):
40                 self.state_changed()
41
42         def addTask(self, task):
43                 task.job = self
44                 self.tasks.append(task)
45
46         def start(self, callback):
47                 assert self.callback is None
48                 self.callback = callback
49                 self.restart()
50
51         def restart(self):
52                 self.status = self.IN_PROGRESS
53                 self.state_changed()
54                 self.runNext()
55                 sumTaskWeightings = sum([t.weighting for t in self.tasks])
56                 self.weightScale = (self.end+1) / float(sumTaskWeightings)
57
58         def runNext(self):
59                 if self.current_task == len(self.tasks):
60                         cb = self.callback
61                         self.callback = None
62                         self.status = self.FINISHED
63                         self.state_changed()
64                         cb(self, None, [])
65                 else:
66                         self.tasks[self.current_task].run(self.taskCallback, self.task_progress_changed_CB)
67                         self.state_changed()
68
69         def taskCallback(self, task, res):
70                 if len(res):
71                         print ">>> Error:", res
72                         self.status = self.FAILED
73                         self.state_changed()
74                         self.callback(self, task, res)
75                 else:
76                         self.state_changed();
77                         self.current_task += 1
78                         self.runNext()
79
80         def retry(self):
81                 assert self.status == self.FAILED
82                 self.restart()
83
84         def abort(self):
85                 if self.current_task < len(self.tasks):
86                         self.tasks[self.current_task].abort()
87
88         def cancel(self):
89                 # some Jobs might have a better idea of how to cancel a job
90                 self.abort()
91
92 class Task(object)      :
93         def __init__(self, job, name):
94                 self.name = name
95                 self.immediate_preconditions = [ ]
96                 self.global_preconditions = [ ]
97                 self.postconditions = [ ]
98                 self.returncode = None
99                 self.initial_input = None
100                 self.job = None
101
102                 self.end = 100
103                 self.weighting = 100
104                 self.__progress = 0
105                 self.cmd = None
106                 self.cwd = "/tmp"
107                 self.args = [ ]
108                 self.task_progress_changed = None
109                 self.output_line = ""
110                 job.addTask(self)
111
112         def setCommandline(self, cmd, args):
113                 self.cmd = cmd
114                 self.args = args
115
116         def setTool(self, tool):
117                 self.cmd = tool
118                 self.args = [tool]
119                 self.global_preconditions.append(ToolExistsPrecondition())
120                 self.postconditions.append(ReturncodePostcondition())
121
122         def checkPreconditions(self, immediate = False):
123                 not_met = [ ]
124                 if immediate:
125                         preconditions = self.immediate_preconditions
126                 else:
127                         preconditions = self.global_preconditions
128                 for precondition in preconditions:
129                         if not precondition.check(self):
130                                 not_met.append(precondition)
131                 return not_met
132
133         def run(self, callback, task_progress_changed):
134                 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
135                 if len(failed_preconditions):
136                         callback(self, failed_preconditions)
137                         return
138                 self.prepare()
139
140                 self.callback = callback
141                 self.task_progress_changed = task_progress_changed
142                 from enigma import eConsoleAppContainer
143                 self.container = eConsoleAppContainer()
144                 self.container.appClosed.get().append(self.processFinished)
145                 self.container.dataAvail.get().append(self.processOutput)
146
147                 assert self.cmd is not None
148                 assert len(self.args) >= 1
149
150                 if self.cwd is not None:
151                         self.container.setCWD(self.cwd)
152
153                 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, self.args
154                 if self.initial_input:
155                         self.writeInput(self.initial_input)
156
157         def prepare(self):
158                 pass
159
160         def cleanup(self, failed):
161                 pass
162
163         def processOutput(self, data):
164                 self.output_line += data
165                 while True:
166                         i = self.output_line.find('\n')
167                         if i == -1:
168                                 break
169                         self.processOutputLine(self.output_line[:i+1])
170                         self.output_line = self.output_line[i+1:]
171
172         def processOutputLine(self, line):
173                 pass
174
175         def processFinished(self, returncode):
176                 self.returncode = returncode
177                 self.finish()
178
179         def abort(self):
180                 self.container.kill()
181                 self.finish(aborted = True)
182
183         def finish(self, aborted = False):
184                 self.afterRun()
185                 not_met = [ ]
186                 if aborted:
187                         not_met.append(AbortedPostcondition())
188                 else:
189                         for postcondition in self.postconditions:
190                                 if not postcondition.check(self):
191                                         not_met.append(postcondition)
192
193                 self.cleanup(not_met)
194                 self.callback(self, not_met)
195
196         def afterRun(self):
197                 pass
198
199         def writeInput(self, input):
200                 self.container.write(input)
201
202         def getProgress(self):
203                 return self.__progress
204
205         def setProgress(self, progress):
206                 if progress > self.end:
207                         progress = self.end
208                 if progress < 0:
209                         progress = 0
210                 print "progress now", progress
211                 self.__progress = progress
212                 self.task_progress_changed()
213
214         progress = property(getProgress, setProgress)
215
216 # The jobmanager will execute multiple jobs, each after another.
217 # later, it will also support suspending jobs (and continuing them after reboot etc)
218 # It also supports a notification when some error occured, and possibly a retry.
219 class JobManager:
220         def __init__(self):
221                 self.active_jobs = [ ]
222                 self.failed_jobs = [ ]
223                 self.job_classes = [ ]
224                 self.active_job = None
225
226         def AddJob(self, job):
227                 self.active_jobs.append(job)
228                 self.kick()
229
230         def kick(self):
231                 if self.active_job is None:
232                         if len(self.active_jobs):
233                                 self.active_job = self.active_jobs.pop(0)
234                                 self.active_job.start(self.jobDone)
235
236         def jobDone(self, job, task, problems):
237                 print "job", job, "completed with", problems, "in", task
238                 if problems:
239                         from Tools import Notifications
240                         from Screens.MessageBox import MessageBox
241                         Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
242                         return
243                         #self.failed_jobs.append(self.active_job)
244
245                 self.active_job = None
246                 self.kick()
247
248         def errorCB(self, answer):
249                 if answer:
250                         print "retrying job"
251                         self.active_job.retry()
252                 else:
253                         print "not retrying job."
254                         self.failed_jobs.append(self.active_job)
255                         self.active_job = None
256                         self.kick()
257
258 # some examples:
259 #class PartitionExistsPostcondition:
260 #       def __init__(self, device):
261 #               self.device = device
262 #
263 #       def check(self, task):
264 #               import os
265 #               return os.access(self.device + "part1", os.F_OK)
266 #
267 #class CreatePartitionTask(Task):
268 #       def __init__(self, device):
269 #               Task.__init__(self, _("Create Partition"))
270 #               self.device = device
271 #               self.setTool("/sbin/sfdisk")
272 #               self.args += ["-f", self.device + "disc"]
273 #               self.initial_input = "0,\n;\n;\n;\ny\n"
274 #               self.postconditions.append(PartitionExistsPostcondition(self.device))
275 #
276 #class CreateFilesystemTask(Task):
277 #       def __init__(self, device, partition = 1, largefile = True):
278 #               Task.__init__(self, _("Create Filesystem"))
279 #               self.setTool("/sbin/mkfs.ext")
280 #               if largefile:
281 #                       self.args += ["-T", "largefile"]
282 #               self.args.append("-m0")
283 #               self.args.append(device + "part%d" % partition)
284 #
285 #class FilesystemMountTask(Task):
286 #       def __init__(self, device, partition = 1, filesystem = "ext3"):
287 #               Task.__init__(self, _("Mounting Filesystem"))
288 #               self.setTool("/bin/mount")
289 #               if filesystem is not None:
290 #                       self.args += ["-t", filesystem]
291 #               self.args.append(device + "part%d" % partition)
292
293 class Condition:
294         RECOVERABLE = False
295
296         def getErrorMessage(self, task):
297                 return _("An error has occured. (%s)") % (self.__class__.__name__)
298
299 class WorkspaceExistsPrecondition(Condition):
300         def check(self, task):
301                 return os.access(task.job.workspace, os.W_OK)
302
303 class DiskspacePrecondition(Condition):
304         def __init__(self, diskspace_required):
305                 self.diskspace_required = diskspace_required
306                 self.diskspace_available = None
307
308         def check(self, task):
309                 import os
310                 try:
311                         s = os.statvfs(task.job.workspace)
312                         self.diskspace_available = s.f_bsize * s.f_bavail 
313                         return self.diskspace_available >= self.diskspace_required
314                 except OSError:
315                         return False
316
317         def getErrorMessage(self, task):
318                 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
319
320 class ToolExistsPrecondition(Condition):
321         def check(self, task):
322                 import os
323                 if task.cmd[0]=='/':
324                         realpath = task.cmd
325                 else:
326                         realpath = self.cwd + '/' + self.cmd
327                 self.realpath = realpath
328                 return os.access(realpath, os.X_OK)
329
330         def getErrorMessage(self, task):
331                 return _("A required tool (%s) was not found.") % (self.realpath)
332
333 class AbortedPostcondition(Condition):
334         pass
335
336 class ReturncodePostcondition(Condition):
337         def check(self, task):
338                 return task.returncode == 0
339
340 #class HDDInitJob(Job):
341 #       def __init__(self, device):
342 #               Job.__init__(self, _("Initialize Harddisk"))
343 #               self.device = device
344 #               self.fromDescription(self.createDescription())
345 #
346 #       def fromDescription(self, description):
347 #               self.device = description["device"]
348 #               self.addTask(CreatePartitionTask(self.device))
349 #               self.addTask(CreateFilesystemTask(self.device))
350 #               self.addTask(FilesystemMountTask(self.device))
351 #
352 #       def createDescription(self):
353 #               return {"device": self.device}
354
355 job_manager = JobManager()