update sv,fr,lt,pl language
[enigma2.git] / lib / python / Components / Task.py
1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
3
4 from Tools.CList import CList
5
6 class Job(object):
7         NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8         def __init__(self, name):
9                 self.tasks = [ ]
10                 self.resident_tasks = [ ]
11                 self.workspace = "/tmp"
12                 self.current_task = 0
13                 self.callback = None
14                 self.name = name
15                 self.finished = False
16                 self.end = 100
17                 self.__progress = 0
18                 self.weightScale = 1
19
20                 self.state_changed = CList()
21
22                 self.status = self.NOT_STARTED
23
24         # description is a dict
25         def fromDescription(self, description):
26                 pass
27
28         def createDescription(self):
29                 return None
30
31         def getProgress(self):
32                 if self.current_task == len(self.tasks):
33                         return self.end
34                 t = self.tasks[self.current_task]
35                 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
36                 return int(jobprogress*self.weightScale)
37
38         progress = property(getProgress)
39
40         def task_progress_changed_CB(self):
41                 self.state_changed()
42
43         def addTask(self, task):
44                 task.job = self
45                 self.tasks.append(task)
46
47         def start(self, callback):
48                 assert self.callback is None
49                 self.callback = callback
50                 self.restart()
51
52         def restart(self):
53                 self.status = self.IN_PROGRESS
54                 self.state_changed()
55                 self.runNext()
56                 sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
57                 self.weightScale = self.end / float(sumTaskWeightings)
58
59         def runNext(self):
60                 if self.current_task == len(self.tasks):
61                         if len(self.resident_tasks) == 0:
62                                 cb = self.callback
63                                 self.callback = None
64                                 self.status = self.FINISHED
65                                 self.state_changed()
66                                 cb(self, None, [])
67                         else:
68                                 print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
69                 else:
70                         self.tasks[self.current_task].run(self.taskCallback, self.task_progress_changed_CB)
71                         self.state_changed()
72
73         def taskCallback(self, task, res, stay_resident = False):
74                 cb_idx = self.tasks.index(task)
75                 if stay_resident:
76                         if cb_idx not in self.resident_tasks:
77                                 self.resident_tasks.append(self.current_task)
78                                 print "task going resident:", task
79                         else:
80                                 print "task keeps staying resident:", task
81                                 return
82                 if len(res):
83                         print ">>> Error:", res
84                         self.status = self.FAILED
85                         self.state_changed()
86                         self.callback(self, task, res)
87                 if cb_idx != self.current_task:
88                         if cb_idx in self.resident_tasks:
89                                 print "resident task finished:", task
90                                 self.resident_tasks.remove(cb_idx)
91                 if res == []:
92                         self.state_changed()
93                         self.current_task += 1
94                         self.runNext()
95
96         def retry(self):
97                 assert self.status == self.FAILED
98                 self.restart()
99
100         def abort(self):
101                 if self.current_task < len(self.tasks):
102                         self.tasks[self.current_task].abort()
103                 for i in self.resident_tasks:
104                         self.tasks[i].abort()
105
106         def cancel(self):
107                 # some Jobs might have a better idea of how to cancel a job
108                 self.abort()
109
110 class Task(object):
111         def __init__(self, job, name):
112                 self.name = name
113                 self.immediate_preconditions = [ ]
114                 self.global_preconditions = [ ]
115                 self.postconditions = [ ]
116                 self.returncode = None
117                 self.initial_input = None
118                 self.job = None
119
120                 self.end = 100
121                 self.weighting = 100
122                 self.__progress = 0
123                 self.cmd = None
124                 self.cwd = "/tmp"
125                 self.args = [ ]
126                 self.task_progress_changed = None
127                 self.output_line = ""
128                 job.addTask(self)
129
130         def setCommandline(self, cmd, args):
131                 self.cmd = cmd
132                 self.args = args
133
134         def setTool(self, tool):
135                 self.cmd = tool
136                 self.args = [tool]
137                 self.global_preconditions.append(ToolExistsPrecondition())
138                 self.postconditions.append(ReturncodePostcondition())
139
140         def checkPreconditions(self, immediate = False):
141                 not_met = [ ]
142                 if immediate:
143                         preconditions = self.immediate_preconditions
144                 else:
145                         preconditions = self.global_preconditions
146                 for precondition in preconditions:
147                         if not precondition.check(self):
148                                 not_met.append(precondition)
149                 return not_met
150
151         def run(self, callback, task_progress_changed):
152                 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
153                 if len(failed_preconditions):
154                         callback(self, failed_preconditions)
155                         return
156                 self.prepare()
157
158                 self.callback = callback
159                 self.task_progress_changed = task_progress_changed
160                 from enigma import eConsoleAppContainer
161                 self.container = eConsoleAppContainer()
162                 self.container.appClosed.get().append(self.processFinished)
163                 self.container.stdoutAvail.get().append(self.processStdout)
164                 self.container.stderrAvail.get().append(self.processStderr)
165
166                 assert self.cmd is not None
167                 assert len(self.args) >= 1
168
169                 if self.cwd is not None:
170                         self.container.setCWD(self.cwd)
171
172                 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, " ".join(self.args)
173                 if self.initial_input:
174                         self.writeInput(self.initial_input)
175
176         def prepare(self):
177                 pass
178
179         def cleanup(self, failed):
180                 pass
181         
182         def processStdout(self, data):
183                 self.processOutput(data)
184                 
185         def processStderr(self, data):
186                 self.processOutput(data)
187
188         def processOutput(self, data):
189                 self.output_line += data
190                 while True:
191                         i = self.output_line.find('\n')
192                         if i == -1:
193                                 break
194                         self.processOutputLine(self.output_line[:i+1])
195                         self.output_line = self.output_line[i+1:]
196
197         def processOutputLine(self, line):
198                 pass
199
200         def processFinished(self, returncode):
201                 self.returncode = returncode
202                 self.finish()
203
204         def abort(self):
205                 self.container.kill()
206                 self.finish(aborted = True)
207
208         def finish(self, aborted = False):
209                 self.afterRun()
210                 not_met = [ ]
211                 if aborted:
212                         not_met.append(AbortedPostcondition())
213                 else:
214                         for postcondition in self.postconditions:
215                                 if not postcondition.check(self):
216                                         not_met.append(postcondition)
217                 self.cleanup(not_met)
218                 self.callback(self, not_met)
219
220         def afterRun(self):
221                 pass
222
223         def writeInput(self, input):
224                 self.container.write(input)
225
226         def getProgress(self):
227                 return self.__progress
228
229         def setProgress(self, progress):
230                 if progress > self.end:
231                         progress = self.end
232                 if progress < 0:
233                         progress = 0
234                 self.__progress = progress
235                 self.task_progress_changed()
236
237         progress = property(getProgress, setProgress)
238
239 # The jobmanager will execute multiple jobs, each after another.
240 # later, it will also support suspending jobs (and continuing them after reboot etc)
241 # It also supports a notification when some error occured, and possibly a retry.
242 class JobManager:
243         def __init__(self):
244                 self.active_jobs = [ ]
245                 self.failed_jobs = [ ]
246                 self.job_classes = [ ]
247                 self.in_background = False
248                 self.active_job = None
249
250         def AddJob(self, job):
251                 self.active_jobs.append(job)
252                 self.kick()
253
254         def kick(self):
255                 if self.active_job is None:
256                         if len(self.active_jobs):
257                                 self.active_job = self.active_jobs.pop(0)
258                                 self.active_job.start(self.jobDone)
259
260         def jobDone(self, job, task, problems):
261                 print "job", job, "completed with", problems, "in", task
262                 from Tools import Notifications
263                 if self.in_background:
264                         from Screens.TaskView import JobView
265                         Notifications.AddNotification(JobView, self.active_job)
266                 if problems:
267                         from Screens.MessageBox import MessageBox
268                         if problems[0].RECOVERABLE:
269                                 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
270                         else:
271                                 Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
272                                 self.errorCB(False)
273                         return
274                         #self.failed_jobs.append(self.active_job)
275
276                 self.active_job = None
277                 self.kick()
278
279         def errorCB(self, answer):
280                 if answer:
281                         print "retrying job"
282                         self.active_job.retry()
283                 else:
284                         print "not retrying job."
285                         self.failed_jobs.append(self.active_job)
286                         self.active_job = None
287                         self.kick()
288
289         def getPendingJobs(self):
290                 list = [ ]
291                 if self.active_job:
292                         list.append(self.active_job)
293                 list += self.active_jobs
294                 return list
295 # some examples:
296 #class PartitionExistsPostcondition:
297 #       def __init__(self, device):
298 #               self.device = device
299 #
300 #       def check(self, task):
301 #               import os
302 #               return os.access(self.device + "part1", os.F_OK)
303 #
304 #class CreatePartitionTask(Task):
305 #       def __init__(self, device):
306 #               Task.__init__(self, _("Create Partition"))
307 #               self.device = device
308 #               self.setTool("/sbin/sfdisk")
309 #               self.args += ["-f", self.device + "disc"]
310 #               self.initial_input = "0,\n;\n;\n;\ny\n"
311 #               self.postconditions.append(PartitionExistsPostcondition(self.device))
312 #
313 #class CreateFilesystemTask(Task):
314 #       def __init__(self, device, partition = 1, largefile = True):
315 #               Task.__init__(self, _("Create Filesystem"))
316 #               self.setTool("/sbin/mkfs.ext")
317 #               if largefile:
318 #                       self.args += ["-T", "largefile"]
319 #               self.args.append("-m0")
320 #               self.args.append(device + "part%d" % partition)
321 #
322 #class FilesystemMountTask(Task):
323 #       def __init__(self, device, partition = 1, filesystem = "ext3"):
324 #               Task.__init__(self, _("Mounting Filesystem"))
325 #               self.setTool("/bin/mount")
326 #               if filesystem is not None:
327 #                       self.args += ["-t", filesystem]
328 #               self.args.append(device + "part%d" % partition)
329
330 class Condition:
331         RECOVERABLE = False
332
333         def getErrorMessage(self, task):
334                 return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__)
335
336 class WorkspaceExistsPrecondition(Condition):
337         def check(self, task):
338                 return os.access(task.job.workspace, os.W_OK)
339
340 class DiskspacePrecondition(Condition):
341         def __init__(self, diskspace_required):
342                 self.diskspace_required = diskspace_required
343                 self.diskspace_available = 0
344
345         def check(self, task):
346                 import os
347                 try:
348                         s = os.statvfs(task.job.workspace)
349                         self.diskspace_available = s.f_bsize * s.f_bavail
350                         return self.diskspace_available >= self.diskspace_required
351                 except OSError:
352                         return False
353
354         def getErrorMessage(self, task):
355                 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
356
357 class ToolExistsPrecondition(Condition):
358         def check(self, task):
359                 import os
360                 if task.cmd[0]=='/':
361                         realpath = task.cmd
362                 else:
363                         realpath = task.cwd + '/' + task.cmd
364                 self.realpath = realpath
365                 return os.access(realpath, os.X_OK)
366
367         def getErrorMessage(self, task):
368                 return _("A required tool (%s) was not found.") % (self.realpath)
369
370 class AbortedPostcondition(Condition):
371         pass
372
373 class ReturncodePostcondition(Condition):
374         def check(self, task):
375                 return task.returncode == 0
376
377 #class HDDInitJob(Job):
378 #       def __init__(self, device):
379 #               Job.__init__(self, _("Initialize Harddisk"))
380 #               self.device = device
381 #               self.fromDescription(self.createDescription())
382 #
383 #       def fromDescription(self, description):
384 #               self.device = description["device"]
385 #               self.addTask(CreatePartitionTask(self.device))
386 #               self.addTask(CreateFilesystemTask(self.device))
387 #               self.addTask(FilesystemMountTask(self.device))
388 #
389 #       def createDescription(self):
390 #               return {"device": self.device}
391
392 job_manager = JobManager()