1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
4 from Tools.CList import CList
7 NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8 def __init__(self, name):
10 self.workspace = "/tmp"
19 self.state_changed = CList()
21 self.status = self.NOT_STARTED
23 # description is a dict
24 def fromDescription(self, description):
27 def createDescription(self):
30 def getProgress(self):
31 if self.current_task == len(self.tasks):
33 t = self.tasks[self.current_task]
34 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
35 return int(jobprogress*self.weightScale)
37 progress = property(getProgress)
39 def task_progress_changed_CB(self):
42 def addTask(self, task):
44 self.tasks.append(task)
46 def start(self, callback):
47 assert self.callback is None
48 self.callback = callback
52 self.status = self.IN_PROGRESS
55 sumTaskWeightings = sum([t.weighting for t in self.tasks])
56 self.weightScale = (self.end+1) / float(sumTaskWeightings)
59 if self.current_task == len(self.tasks):
62 self.status = self.FINISHED
66 self.tasks[self.current_task].run(self.taskCallback, self.task_progress_changed_CB)
69 def taskCallback(self, task, res):
71 print ">>> Error:", res
72 self.status = self.FAILED
74 self.callback(self, task, res)
77 self.current_task += 1
81 assert self.status == self.FAILED
85 if self.current_task < len(self.tasks):
86 self.tasks[self.current_task].abort()
89 # some Jobs might have a better idea of how to cancel a job
93 def __init__(self, job, name):
95 self.immediate_preconditions = [ ]
96 self.global_preconditions = [ ]
97 self.postconditions = [ ]
98 self.returncode = None
99 self.initial_input = None
108 self.task_progress_changed = None
109 self.output_line = ""
112 def setCommandline(self, cmd, args):
116 def setTool(self, tool):
119 self.global_preconditions.append(ToolExistsPrecondition())
120 self.postconditions.append(ReturncodePostcondition())
122 def checkPreconditions(self, immediate = False):
125 preconditions = self.immediate_preconditions
127 preconditions = self.global_preconditions
128 for precondition in preconditions:
129 if not precondition.check(self):
130 not_met.append(precondition)
133 def run(self, callback, task_progress_changed):
134 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
135 if len(failed_preconditions):
136 callback(self, failed_preconditions)
140 self.callback = callback
141 self.task_progress_changed = task_progress_changed
142 from enigma import eConsoleAppContainer
143 self.container = eConsoleAppContainer()
144 self.container.appClosed.get().append(self.processFinished)
145 self.container.dataAvail.get().append(self.processOutput)
147 assert self.cmd is not None
148 assert len(self.args) >= 1
150 if self.cwd is not None:
151 self.container.setCWD(self.cwd)
153 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, self.args
154 if self.initial_input:
155 self.writeInput(self.initial_input)
160 def cleanup(self, failed):
163 def processOutput(self, data):
164 self.output_line += data
166 i = self.output_line.find('\n')
169 self.processOutputLine(self.output_line[:i+1])
170 self.output_line = self.output_line[i+1:]
172 def processOutputLine(self, line):
175 def processFinished(self, returncode):
176 self.returncode = returncode
180 self.container.kill()
181 self.finish(aborted = True)
183 def finish(self, aborted = False):
187 not_met.append(AbortedPostcondition())
189 for postcondition in self.postconditions:
190 if not postcondition.check(self):
191 not_met.append(postcondition)
193 self.cleanup(not_met)
194 self.callback(self, not_met)
199 def writeInput(self, input):
200 self.container.write(input)
202 def getProgress(self):
203 return self.__progress
205 def setProgress(self, progress):
206 if progress > self.end:
210 print "progress now", progress
211 self.__progress = progress
212 self.task_progress_changed()
214 progress = property(getProgress, setProgress)
216 # The jobmanager will execute multiple jobs, each after another.
217 # later, it will also support suspending jobs (and continuing them after reboot etc)
218 # It also supports a notification when some error occured, and possibly a retry.
221 self.active_jobs = [ ]
222 self.failed_jobs = [ ]
223 self.job_classes = [ ]
224 self.active_job = None
226 def AddJob(self, job):
227 self.active_jobs.append(job)
231 if self.active_job is None:
232 if len(self.active_jobs):
233 self.active_job = self.active_jobs.pop(0)
234 self.active_job.start(self.jobDone)
236 def jobDone(self, job, task, problems):
237 print "job", job, "completed with", problems, "in", task
239 from Tools import Notifications
240 from Screens.MessageBox import MessageBox
241 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
243 #self.failed_jobs.append(self.active_job)
245 self.active_job = None
248 def errorCB(self, answer):
251 self.active_job.retry()
253 print "not retrying job."
254 self.failed_jobs.append(self.active_job)
255 self.active_job = None
259 #class PartitionExistsPostcondition:
260 # def __init__(self, device):
261 # self.device = device
263 # def check(self, task):
265 # return os.access(self.device + "part1", os.F_OK)
267 #class CreatePartitionTask(Task):
268 # def __init__(self, device):
269 # Task.__init__(self, _("Create Partition"))
270 # self.device = device
271 # self.setTool("/sbin/sfdisk")
272 # self.args += ["-f", self.device + "disc"]
273 # self.initial_input = "0,\n;\n;\n;\ny\n"
274 # self.postconditions.append(PartitionExistsPostcondition(self.device))
276 #class CreateFilesystemTask(Task):
277 # def __init__(self, device, partition = 1, largefile = True):
278 # Task.__init__(self, _("Create Filesystem"))
279 # self.setTool("/sbin/mkfs.ext")
281 # self.args += ["-T", "largefile"]
282 # self.args.append("-m0")
283 # self.args.append(device + "part%d" % partition)
285 #class FilesystemMountTask(Task):
286 # def __init__(self, device, partition = 1, filesystem = "ext3"):
287 # Task.__init__(self, _("Mounting Filesystem"))
288 # self.setTool("/bin/mount")
289 # if filesystem is not None:
290 # self.args += ["-t", filesystem]
291 # self.args.append(device + "part%d" % partition)
296 def getErrorMessage(self, task):
297 return _("An error has occured. (%s)") % (self.__class__.__name__)
299 class WorkspaceExistsPrecondition(Condition):
300 def check(self, task):
301 return os.access(task.job.workspace, os.W_OK)
303 class DiskspacePrecondition(Condition):
304 def __init__(self, diskspace_required):
305 self.diskspace_required = diskspace_required
306 self.diskspace_available = None
308 def check(self, task):
311 s = os.statvfs(task.job.workspace)
312 self.diskspace_available = s.f_bsize * s.f_bavail
313 return self.diskspace_available >= self.diskspace_required
317 def getErrorMessage(self, task):
318 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
320 class ToolExistsPrecondition(Condition):
321 def check(self, task):
326 realpath = self.cwd + '/' + self.cmd
327 self.realpath = realpath
328 return os.access(realpath, os.X_OK)
330 def getErrorMessage(self, task):
331 return _("A required tool (%s) was not found.") % (self.realpath)
333 class AbortedPostcondition(Condition):
336 class ReturncodePostcondition(Condition):
337 def check(self, task):
338 return task.returncode == 0
340 #class HDDInitJob(Job):
341 # def __init__(self, device):
342 # Job.__init__(self, _("Initialize Harddisk"))
343 # self.device = device
344 # self.fromDescription(self.createDescription())
346 # def fromDescription(self, description):
347 # self.device = description["device"]
348 # self.addTask(CreatePartitionTask(self.device))
349 # self.addTask(CreateFilesystemTask(self.device))
350 # self.addTask(FilesystemMountTask(self.device))
352 # def createDescription(self):
353 # return {"device": self.device}
355 job_manager = JobManager()