remove non working wrapper class to add a (unneeded) .get() call to
[enigma2.git] / lib / python / Components / Task.py
1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
3
4 from Tools.CList import CList
5
6 class Job(object):
7         NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8         def __init__(self, name):
9                 self.tasks = [ ]
10                 self.resident_tasks = [ ]
11                 self.workspace = "/tmp"
12                 self.current_task = 0
13                 self.callback = None
14                 self.name = name
15                 self.finished = False
16                 self.end = 100
17                 self.__progress = 0
18                 self.weightScale = 1
19
20                 self.state_changed = CList()
21
22                 self.status = self.NOT_STARTED
23
24         # description is a dict
25         def fromDescription(self, description):
26                 pass
27
28         def createDescription(self):
29                 return None
30
31         def getProgress(self):
32                 if self.current_task == len(self.tasks):
33                         return self.end
34                 t = self.tasks[self.current_task]
35                 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
36                 return int(jobprogress*self.weightScale)
37
38         progress = property(getProgress)
39
40         def getStatustext(self):
41                 return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status]
42
43         def task_progress_changed_CB(self):
44                 self.state_changed()
45
46         def addTask(self, task):
47                 task.job = self
48                 self.tasks.append(task)
49
50         def start(self, callback):
51                 assert self.callback is None
52                 self.callback = callback
53                 self.restart()
54
55         def restart(self):
56                 self.status = self.IN_PROGRESS
57                 self.state_changed()
58                 self.runNext()
59                 sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
60                 self.weightScale = self.end / float(sumTaskWeightings)
61
62         def runNext(self):
63                 if self.current_task == len(self.tasks):
64                         if len(self.resident_tasks) == 0:
65                                 cb = self.callback
66                                 self.callback = None
67                                 self.status = self.FINISHED
68                                 self.state_changed()
69                                 cb(self, None, [])
70                         else:
71                                 print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
72                 else:
73                         self.tasks[self.current_task].run(self.taskCallback, self.task_progress_changed_CB)
74                         self.state_changed()
75
76         def taskCallback(self, task, res, stay_resident = False):
77                 cb_idx = self.tasks.index(task)
78                 if stay_resident:
79                         if cb_idx not in self.resident_tasks:
80                                 self.resident_tasks.append(self.current_task)
81                                 print "task going resident:", task
82                         else:
83                                 print "task keeps staying resident:", task
84                                 return
85                 if len(res):
86                         print ">>> Error:", res
87                         self.status = self.FAILED
88                         self.state_changed()
89                         self.callback(self, task, res)
90                 if cb_idx != self.current_task:
91                         if cb_idx in self.resident_tasks:
92                                 print "resident task finished:", task
93                                 self.resident_tasks.remove(cb_idx)
94                 if res == []:
95                         self.state_changed()
96                         self.current_task += 1
97                         self.runNext()
98
99         def retry(self):
100                 assert self.status == self.FAILED
101                 self.restart()
102
103         def abort(self):
104                 if self.current_task < len(self.tasks):
105                         self.tasks[self.current_task].abort()
106                 for i in self.resident_tasks:
107                         self.tasks[i].abort()
108
109         def cancel(self):
110                 # some Jobs might have a better idea of how to cancel a job
111                 self.abort()
112
113 class Task(object):
114         def __init__(self, job, name):
115                 self.name = name
116                 self.immediate_preconditions = [ ]
117                 self.global_preconditions = [ ]
118                 self.postconditions = [ ]
119                 self.returncode = None
120                 self.initial_input = None
121                 self.job = None
122
123                 self.end = 100
124                 self.weighting = 100
125                 self.__progress = 0
126                 self.cmd = None
127                 self.cwd = "/tmp"
128                 self.args = [ ]
129                 self.task_progress_changed = None
130                 self.output_line = ""
131                 job.addTask(self)
132
133         def setCommandline(self, cmd, args):
134                 self.cmd = cmd
135                 self.args = args
136
137         def setTool(self, tool):
138                 self.cmd = tool
139                 self.args = [tool]
140                 self.global_preconditions.append(ToolExistsPrecondition())
141                 self.postconditions.append(ReturncodePostcondition())
142
143         def checkPreconditions(self, immediate = False):
144                 not_met = [ ]
145                 if immediate:
146                         preconditions = self.immediate_preconditions
147                 else:
148                         preconditions = self.global_preconditions
149                 for precondition in preconditions:
150                         if not precondition.check(self):
151                                 not_met.append(precondition)
152                 return not_met
153
154         def run(self, callback, task_progress_changed):
155                 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
156                 if len(failed_preconditions):
157                         callback(self, failed_preconditions)
158                         return
159                 self.prepare()
160
161                 self.callback = callback
162                 self.task_progress_changed = task_progress_changed
163                 from enigma import eConsoleAppContainer
164                 self.container = eConsoleAppContainer()
165                 self.container.appClosed.append(self.processFinished)
166                 self.container.stdoutAvail.append(self.processStdout)
167                 self.container.stderrAvail.append(self.processStderr)
168
169                 assert self.cmd is not None
170                 assert len(self.args) >= 1
171
172                 if self.cwd is not None:
173                         self.container.setCWD(self.cwd)
174
175                 execstr = self.cmd + " ".join(self.args)
176                 print "execute:", self.container.execute(execstr), execstr
177                 if self.initial_input:
178                         self.writeInput(self.initial_input)
179
180         def prepare(self):
181                 pass
182
183         def cleanup(self, failed):
184                 pass
185         
186         def processStdout(self, data):
187                 self.processOutput(data)
188                 
189         def processStderr(self, data):
190                 self.processOutput(data)
191
192         def processOutput(self, data):
193                 self.output_line += data
194                 while True:
195                         i = self.output_line.find('\n')
196                         if i == -1:
197                                 break
198                         self.processOutputLine(self.output_line[:i+1])
199                         self.output_line = self.output_line[i+1:]
200
201         def processOutputLine(self, line):
202                 pass
203
204         def processFinished(self, returncode):
205                 self.returncode = returncode
206                 self.finish()
207
208         def abort(self):
209                 self.container.kill()
210                 self.finish(aborted = True)
211
212         def finish(self, aborted = False):
213                 self.afterRun()
214                 not_met = [ ]
215                 if aborted:
216                         not_met.append(AbortedPostcondition())
217                 else:
218                         for postcondition in self.postconditions:
219                                 if not postcondition.check(self):
220                                         not_met.append(postcondition)
221                 self.cleanup(not_met)
222                 self.callback(self, not_met)
223
224         def afterRun(self):
225                 pass
226
227         def writeInput(self, input):
228                 self.container.write(input)
229
230         def getProgress(self):
231                 return self.__progress
232
233         def setProgress(self, progress):
234                 if progress > self.end:
235                         progress = self.end
236                 if progress < 0:
237                         progress = 0
238                 self.__progress = progress
239                 self.task_progress_changed()
240
241         progress = property(getProgress, setProgress)
242
243 # The jobmanager will execute multiple jobs, each after another.
244 # later, it will also support suspending jobs (and continuing them after reboot etc)
245 # It also supports a notification when some error occured, and possibly a retry.
246 class JobManager:
247         def __init__(self):
248                 self.active_jobs = [ ]
249                 self.failed_jobs = [ ]
250                 self.job_classes = [ ]
251                 self.in_background = False
252                 self.active_job = None
253
254         def AddJob(self, job):
255                 self.active_jobs.append(job)
256                 self.kick()
257
258         def kick(self):
259                 if self.active_job is None:
260                         if len(self.active_jobs):
261                                 self.active_job = self.active_jobs.pop(0)
262                                 self.active_job.start(self.jobDone)
263
264         def jobDone(self, job, task, problems):
265                 print "job", job, "completed with", problems, "in", task
266                 from Tools import Notifications
267                 if self.in_background:
268                         from Screens.TaskView import JobView
269                         Notifications.AddNotification(JobView, self.active_job)
270                 if problems:
271                         from Screens.MessageBox import MessageBox
272                         if problems[0].RECOVERABLE:
273                                 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
274                         else:
275                                 Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
276                                 self.errorCB(False)
277                         return
278                         #self.failed_jobs.append(self.active_job)
279
280                 self.active_job = None
281                 self.kick()
282
283         def errorCB(self, answer):
284                 if answer:
285                         print "retrying job"
286                         self.active_job.retry()
287                 else:
288                         print "not retrying job."
289                         self.failed_jobs.append(self.active_job)
290                         self.active_job = None
291                         self.kick()
292
293         def getPendingJobs(self):
294                 list = [ ]
295                 if self.active_job:
296                         list.append(self.active_job)
297                 list += self.active_jobs
298                 return list
299 # some examples:
300 #class PartitionExistsPostcondition:
301 #       def __init__(self, device):
302 #               self.device = device
303 #
304 #       def check(self, task):
305 #               import os
306 #               return os.access(self.device + "part1", os.F_OK)
307 #
308 #class CreatePartitionTask(Task):
309 #       def __init__(self, device):
310 #               Task.__init__(self, _("Create Partition"))
311 #               self.device = device
312 #               self.setTool("/sbin/sfdisk")
313 #               self.args += ["-f", self.device + "disc"]
314 #               self.initial_input = "0,\n;\n;\n;\ny\n"
315 #               self.postconditions.append(PartitionExistsPostcondition(self.device))
316 #
317 #class CreateFilesystemTask(Task):
318 #       def __init__(self, device, partition = 1, largefile = True):
319 #               Task.__init__(self, _("Create Filesystem"))
320 #               self.setTool("/sbin/mkfs.ext")
321 #               if largefile:
322 #                       self.args += ["-T", "largefile"]
323 #               self.args.append("-m0")
324 #               self.args.append(device + "part%d" % partition)
325 #
326 #class FilesystemMountTask(Task):
327 #       def __init__(self, device, partition = 1, filesystem = "ext3"):
328 #               Task.__init__(self, _("Mounting Filesystem"))
329 #               self.setTool("/bin/mount")
330 #               if filesystem is not None:
331 #                       self.args += ["-t", filesystem]
332 #               self.args.append(device + "part%d" % partition)
333
334 class Condition:
335         RECOVERABLE = False
336
337         def getErrorMessage(self, task):
338                 return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__)
339
340 class WorkspaceExistsPrecondition(Condition):
341         def check(self, task):
342                 return os.access(task.job.workspace, os.W_OK)
343
344 class DiskspacePrecondition(Condition):
345         def __init__(self, diskspace_required):
346                 self.diskspace_required = diskspace_required
347                 self.diskspace_available = 0
348
349         def check(self, task):
350                 import os
351                 try:
352                         s = os.statvfs(task.job.workspace)
353                         self.diskspace_available = s.f_bsize * s.f_bavail
354                         return self.diskspace_available >= self.diskspace_required
355                 except OSError:
356                         return False
357
358         def getErrorMessage(self, task):
359                 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
360
361 class ToolExistsPrecondition(Condition):
362         def check(self, task):
363                 import os
364                 if task.cmd[0]=='/':
365                         realpath = task.cmd
366                 else:
367                         realpath = task.cwd + '/' + task.cmd
368                 self.realpath = realpath
369                 return os.access(realpath, os.X_OK)
370
371         def getErrorMessage(self, task):
372                 return _("A required tool (%s) was not found.") % (self.realpath)
373
374 class AbortedPostcondition(Condition):
375         pass
376
377 class ReturncodePostcondition(Condition):
378         def check(self, task):
379                 return task.returncode == 0
380
381 #class HDDInitJob(Job):
382 #       def __init__(self, device):
383 #               Job.__init__(self, _("Initialize Harddisk"))
384 #               self.device = device
385 #               self.fromDescription(self.createDescription())
386 #
387 #       def fromDescription(self, description):
388 #               self.device = description["device"]
389 #               self.addTask(CreatePartitionTask(self.device))
390 #               self.addTask(CreateFilesystemTask(self.device))
391 #               self.addTask(FilesystemMountTask(self.device))
392 #
393 #       def createDescription(self):
394 #               return {"device": self.device}
395
396 job_manager = JobManager()