1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
4 from Tools.CList import CList
7 NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8 def __init__(self, name):
10 self.workspace = "/tmp"
19 self.state_changed = CList()
21 self.status = self.NOT_STARTED
23 # description is a dict
24 def fromDescription(self, description):
27 def createDescription(self):
30 def getProgress(self):
31 if self.current_task == len(self.tasks):
33 t = self.tasks[self.current_task]
34 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
35 return int(jobprogress*self.weightScale)
37 progress = property(getProgress)
39 def task_progress_changed_CB(self):
42 def addTask(self, task):
44 self.tasks.append(task)
46 def start(self, callback):
47 assert self.callback is None
48 self.callback = callback
49 self.status = self.IN_PROGRESS
52 sumTaskWeightings = sum([t.weighting for t in self.tasks])
53 self.weightScale = (self.end+1) / float(sumTaskWeightings)
56 if self.current_task == len(self.tasks):
57 self.callback(self, [])
58 self.status = self.FINISHED
61 self.tasks[self.current_task].run(self.taskCallback,self.task_progress_changed_CB)
64 def taskCallback(self, res):
66 print ">>> Error:", res
67 self.status = self.FAILED
69 self.callback(self, res)
72 self.current_task += 1
76 if self.current_task < len(self.tasks):
77 self.tasks[self.current_task].abort()
80 # some Jobs might have a better idea of how to cancel a job
84 def __init__(self, job, name):
86 self.immediate_preconditions = [ ]
87 self.global_preconditions = [ ]
88 self.postconditions = [ ]
89 self.returncode = None
90 self.initial_input = None
99 self.task_progress_changed = None
100 self.output_line = ""
103 def setCommandline(self, cmd, args):
107 def setTool(self, tool):
110 self.global_preconditions.append(ToolExistsPrecondition())
111 self.postconditions.append(ReturncodePostcondition())
113 def checkPreconditions(self, immediate = False):
116 preconditions = self.immediate_preconditions
118 preconditions = self.global_preconditions
119 for precondition in preconditions:
120 if not precondition.check(self):
121 not_met.append(precondition)
124 def run(self, callback, task_progress_changed):
125 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
126 if len(failed_preconditions):
127 callback(failed_preconditions)
131 self.callback = callback
132 self.task_progress_changed = task_progress_changed
133 from enigma import eConsoleAppContainer
134 self.container = eConsoleAppContainer()
135 self.container.appClosed.get().append(self.processFinished)
136 self.container.dataAvail.get().append(self.processOutput)
138 assert self.cmd is not None
139 assert len(self.args) >= 1
141 if self.cwd is not None:
142 self.container.setCWD(self.cwd)
144 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, self.args
145 if self.initial_input:
146 self.writeInput(self.initial_input)
151 def cleanup(self, failed):
154 def processOutput(self, data):
155 self.output_line += data
157 i = self.output_line.find('\n')
160 self.processOutputLine(self.output_line[:i+1])
161 self.output_line = self.output_line[i+1:]
163 def processOutputLine(self, line):
166 def processFinished(self, returncode):
167 self.returncode = returncode
171 self.container.kill()
172 self.finish(aborted = True)
174 def finish(self, aborted = False):
178 not_met.append(AbortedPostcondition())
180 for postcondition in self.postconditions:
181 if not postcondition.check(self):
182 not_met.append(postcondition)
184 self.cleanup(not_met)
185 self.callback(not_met)
190 def writeInput(self, input):
191 self.container.write(input)
193 def getProgress(self):
194 return self.__progress
196 def setProgress(self, progress):
197 if progress > self.end:
201 print "progress now", progress
202 self.__progress = progress
203 self.task_progress_changed()
205 progress = property(getProgress, setProgress)
209 self.active_jobs = [ ]
210 self.failed_jobs = [ ]
211 self.job_classes = [ ]
212 self.active_job = None
214 def AddJob(self, job):
215 self.active_jobs.append(job)
219 if self.active_job is None:
220 if len(self.active_jobs):
221 self.active_job = self.active_jobs.pop(0)
222 self.active_job.start(self.jobDone)
224 def jobDone(self, job, problems):
225 print "job", job, "completed with", problems
227 self.failed_jobs.append(self.active_job)
229 self.active_job = None
233 #class PartitionExistsPostcondition:
234 # def __init__(self, device):
235 # self.device = device
237 # def check(self, task):
239 # return os.access(self.device + "part1", os.F_OK)
241 #class CreatePartitionTask(Task):
242 # def __init__(self, device):
243 # Task.__init__(self, _("Create Partition"))
244 # self.device = device
245 # self.setTool("/sbin/sfdisk")
246 # self.args += ["-f", self.device + "disc"]
247 # self.initial_input = "0,\n;\n;\n;\ny\n"
248 # self.postconditions.append(PartitionExistsPostcondition(self.device))
250 #class CreateFilesystemTask(Task):
251 # def __init__(self, device, partition = 1, largefile = True):
252 # Task.__init__(self, _("Create Filesystem"))
253 # self.setTool("/sbin/mkfs.ext")
255 # self.args += ["-T", "largefile"]
256 # self.args.append("-m0")
257 # self.args.append(device + "part%d" % partition)
259 #class FilesystemMountTask(Task):
260 # def __init__(self, device, partition = 1, filesystem = "ext3"):
261 # Task.__init__(self, _("Mounting Filesystem"))
262 # self.setTool("/bin/mount")
263 # if filesystem is not None:
264 # self.args += ["-t", filesystem]
265 # self.args.append(device + "part%d" % partition)
267 class WorkspaceExistsPrecondition:
268 def check(self, task):
269 return os.access(task.job.workspace, os.W_OK)
271 class DiskspacePrecondition:
272 def __init__(self, diskspace_required):
273 self.diskspace_required = diskspace_required
275 def check(self, task):
278 s = os.statvfs(task.job.workspace)
279 return s.f_bsize * s.f_bavail >= self.diskspace_required
283 class ToolExistsPrecondition:
284 def check(self, task):
289 realpath = self.cwd + '/' + self.cmd
290 return os.access(realpath, os.X_OK)
292 class AbortedPostcondition:
295 class ReturncodePostcondition:
296 def check(self, task):
297 return task.returncode == 0
299 #class HDDInitJob(Job):
300 # def __init__(self, device):
301 # Job.__init__(self, _("Initialize Harddisk"))
302 # self.device = device
303 # self.fromDescription(self.createDescription())
305 # def fromDescription(self, description):
306 # self.device = description["device"]
307 # self.addTask(CreatePartitionTask(self.device))
308 # self.addTask(CreateFilesystemTask(self.device))
309 # self.addTask(FilesystemMountTask(self.device))
311 # def createDescription(self):
312 # return {"device": self.device}
314 job_manager = JobManager()