1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
4 from Tools.CList import CList
# Job life-cycle states; the `status` attribute is set to one of these values
# (NOT_STARTED on construction, IN_PROGRESS in start(), FINISHED / FAILED later).
7 NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
# Set up the job's default state.
# NOTE(review): only part of the initialiser is visible here; attributes read
# by the other methods (tasks, current_task, callback, end, progress,
# weightScale) are presumably initialised in the lines not shown — confirm.
8 def __init__(self, name):
# The workspace path defaults to "/tmp".
10 self.workspace = "/tmp"
# Called with no arguments whenever the job's state changes, e.g. right after
# `status` is updated in start() and taskCallback().
19 self.job_state_changed = CList()
20 #triggered when task finishes/fails
# Called with no arguments after `progress` has been recomputed in
# task_state_changed_CB().
22 self.task_state_changed = CList()
23 #triggered when external app generates output
# A new job has not been started yet.
25 self.status = self.NOT_STARTED
27 # description is a dict
28 def fromDescription(self, description):
31 def createDescription(self):
def task_state_changed_CB(self):
    """Recompute the job's progress from the running task and notify listeners.

    This method is handed to the current task's run() as its
    task_state_changed argument. The job's progress is the weighting of all
    tasks before the current one, plus the fraction of the current task's
    weighting given by its progress / end, scaled by self.weightScale.
    """
    index = self.current_task
    running = self.tasks[index]
    # Share of the running task's weighting that has been earned so far.
    partial_weight = running.weighting * running.progress / float(running.end)
    # Full weighting of every task that ran before the current one.
    completed_weight = sum(done.weighting for done in self.tasks[:index])
    self.progress = (partial_weight + completed_weight) * self.weightScale
    self.task_state_changed()
# Append `task` to this job's task list. Tasks are addressed by position:
# self.current_task indexes into self.tasks and is incremented as tasks finish.
# NOTE(review): one line of this method is not visible here.
40 def addTask(self, task):
42 self.tasks.append(task)
# Begin executing the job.
#   callback: stored on the job and later invoked as callback(job, problems),
#             where problems is a list of unmet conditions ([] on success).
# A job can only be started while no callback is registered (asserted below).
44 def start(self, callback):
45 assert self.callback is None
46 self.callback = callback
# Mark the job as running and tell the state listeners.
47 self.status = self.IN_PROGRESS
48 self.job_state_changed()
# weightScale converts summed task weightings into the job's progress range.
# NOTE(review): if the job has no tasks (or all weightings are 0),
# sumTaskWeightings is 0 and the division below raises ZeroDivisionError.
50 sumTaskWeightings = sum([t.weighting for t in self.tasks])
51 self.weightScale = (self.end+1) / float(sumTaskWeightings)
54 if self.current_task == len(self.tasks):
55 self.callback(self, [])
56 self.status = self.FINISHED
57 self.job_state_changed()
59 self.tasks[self.current_task].run(self.taskCallback,self.task_state_changed_CB)
60 self.job_state_changed()
# Completion callback handed to the current task's run().
#   res: list of conditions the task did not meet (run() passes failed
#        preconditions, processFinished() passes failed postconditions).
# NOTE(review): the test that separates the failure handling from the success
# handling is not visible here — presumably it depends on `res` being
# non-empty; confirm before relying on the grouping described below.
62 def taskCallback(self, res):
# Failure handling: log the problems, mark the job FAILED, notify the state
# listeners and pass the problems on to the callback given to start().
64 print ">>> Error:", res
65 self.status = self.FAILED
66 self.job_state_changed()
67 self.callback(self, res)
# Success handling: credit the finished task's scaled weighting to the job's
# progress and move on to the next task index.
69 self.progress = (self.progress + self.tasks[self.current_task].weighting*self.weightScale )
70 self.current_task += 1
# Set up a task's default state.
# NOTE(review): only part of the initialiser is visible here; attributes such
# as cmd, args and cwd, which run() reads, are presumably set in the lines
# not shown — confirm.
74 def __init__(self, job, name):
# Condition lists consulted by checkPreconditions(); each entry must offer
# check(task).
76 self.immediate_preconditions = [ ]
77 self.global_preconditions = [ ]
# Conditions evaluated in processFinished() once the process has ended.
78 self.postconditions = [ ]
# Stays None until processFinished() stores the value it receives.
79 self.returncode = None
# When set, run() writes this to the process right after starting it.
80 self.initial_input = None
# Replaced by the task_state_changed argument of run().
89 self.task_state_changed = None
92 def setCommandline(self, cmd, args):
# Register the standard checks for running an external tool: the tool must be
# executable before the task runs (ToolExistsPrecondition) and the process
# must finish with returncode 0 (ReturncodePostcondition).
# NOTE(review): the lines that store `tool` itself are not visible here.
96 def setTool(self, tool):
99 self.global_preconditions.append(ToolExistsPrecondition())
100 self.postconditions.append(ReturncodePostcondition())
# Evaluate one set of preconditions and collect the ones that are not met.
#   immediate: selects between self.immediate_preconditions and
#              self.global_preconditions.
# NOTE(review): the selecting test, the initialisation of `not_met` and the
# return statement are not visible here. Presumably True picks the immediate
# set and `not_met` is returned — run() concatenates the results of two calls
# with `+`, so a list is expected back; confirm.
102 def checkPreconditions(self, immediate = False):
105 preconditions = self.immediate_preconditions
107 preconditions = self.global_preconditions
# Every precondition is asked to check this task; falsy results are recorded.
108 for precondition in preconditions:
109 if not precondition.check(self):
110 not_met.append(precondition)
# Start the task's external process.
#   callback:           called with a list of unmet conditions — here with the
#                       failed preconditions, or from processFinished() with
#                       the failed postconditions.
#   task_state_changed: stored on the task as self.task_state_changed.
113 def run(self, callback, task_state_changed):
# Check both precondition sets (immediate=True first, then immediate=False).
114 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
115 if len(failed_preconditions):
116 callback(failed_preconditions)
# NOTE(review): the lines following the failure report are not visible here;
# presumably the method returns without starting the process — confirm.
120 self.callback = callback
121 self.task_state_changed = task_state_changed
# enigma is imported here, inside the method, rather than at module level.
122 from enigma import eConsoleAppContainer
123 self.container = eConsoleAppContainer()
# Route the container's signals to this task: appClosed -> processFinished,
# dataAvail -> processOutput.
124 self.container.appClosed.get().append(self.processFinished)
125 self.container.dataAvail.get().append(self.processOutput)
# A command and at least one argument must have been configured.
127 assert self.cmd is not None
128 assert len(self.args) >= 1
# Only change the working directory when one has been set.
130 if self.cwd is not None:
131 self.container.setCWD(self.cwd)
# Launch the command; the value returned by execute() is logged together
# with the command and its arguments.
133 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, self.args
# Feed any preset input to the process that was just started.
134 if self.initial_input:
135 self.writeInput(self.initial_input)
140 def cleanup(self, failed):
143 def processOutput(self, data):
# Handler appended to the container's appClosed signal in run().
#   returncode: value delivered by the signal; stored for the postconditions
#               (ReturncodePostcondition compares it with 0).
# NOTE(review): several lines of this method, including the initialisation of
# `not_met`, are not visible here.
146 def processFinished(self, returncode):
147 self.returncode = returncode
# Collect every postcondition whose check of this task is falsy.
153 for postcondition in self.postconditions:
154 if not postcondition.check(self):
155 not_met.append(postcondition)
# Report the unmet postconditions to the callback given to run().
157 self.callback(not_met)
def writeInput(self, input):
    """Write `input` to the task's process container.

    run() uses this to deliver self.initial_input right after the command
    has been started.
    """
    process = self.container
    process.write(input)
167 self.active_jobs = [ ]
168 self.failed_jobs = [ ]
169 self.job_classes = [ ]
170 self.active_job = None
# Queue `job` for execution. Jobs are taken from the front of
# self.active_jobs (pop(0)) when no job is currently active, so they are
# started in the order they were added.
# NOTE(review): the remainder of this method is not visible here.
172 def AddJob(self, job):
173 self.active_jobs.append(job)
177 if self.active_job is None:
178 if len(self.active_jobs):
179 self.active_job = self.active_jobs.pop(0)
180 self.active_job.start(self.jobDone)
# Callback passed to the active job's start(); the job invokes it as
# jobDone(job, problems) with problems == [] on success or the list of unmet
# conditions on failure.
182 def jobDone(self, job, problems):
183 print "job", job, "completed with", problems
# NOTE(review): the condition guarding this append is not visible here —
# presumably only jobs that reported problems are recorded; confirm.
185 self.failed_jobs.append(self.active_job)
# Free the slot; the next queued job is only started while active_job is None.
187 self.active_job = None
191 #class PartitionExistsPostcondition:
192 # def __init__(self, device):
193 # self.device = device
195 # def check(self, task):
197 # return os.access(self.device + "part1", os.F_OK)
199 #class CreatePartitionTask(Task):
200 # def __init__(self, device):
201 # Task.__init__(self, _("Create Partition"))
202 # self.device = device
203 # self.setTool("/sbin/sfdisk")
204 # self.args += ["-f", self.device + "disc"]
205 # self.initial_input = "0,\n;\n;\n;\ny\n"
206 # self.postconditions.append(PartitionExistsPostcondition(self.device))
208 #class CreateFilesystemTask(Task):
209 # def __init__(self, device, partition = 1, largefile = True):
210 # Task.__init__(self, _("Create Filesystem"))
211 # self.setTool("/sbin/mkfs.ext")
213 # self.args += ["-T", "largefile"]
214 # self.args.append("-m0")
215 # self.args.append(device + "part%d" % partition)
217 #class FilesystemMountTask(Task):
218 # def __init__(self, device, partition = 1, filesystem = "ext3"):
219 # Task.__init__(self, _("Mounting Filesystem"))
220 # self.setTool("/bin/mount")
221 # if filesystem is not None:
222 # self.args += ["-t", filesystem]
223 # self.args.append(device + "part%d" % partition)
225 #class DiskspacePrecondition:
226 # def __init__(self, diskspace_required):
227 # self.diskspace_required = diskspace_required
229 # def check(self, task):
230 # return getFreeDiskspace(task.workspace) >= self.diskspace_required
class ToolExistsPrecondition:
    """Precondition that is met when the task's command is executable."""

    def check(self, task):
        """Return True if `task.cmd` refers to something executable.

        A relative command is resolved against `task.cwd` when a working
        directory is set; an absolute command is used unchanged. A task
        without a command never satisfies the precondition.
        """
        # Local import keeps this check self-contained.
        import os

        cmd = task.cmd
        if not cmd:
            # Preconditions run before run() asserts that a command is set.
            return False
        # The command and working directory belong to the task under test;
        # this precondition object carries no cwd/cmd attributes of its own.
        cwd = task.cwd
        if cwd is None:
            realpath = cmd
        else:
            # os.path.join() returns `cmd` unchanged when it is absolute.
            realpath = os.path.join(cwd, cmd)
        return os.access(realpath, os.X_OK)
class ReturncodePostcondition:
    """Postcondition that is met when the task's returncode equals 0."""

    def check(self, task):
        """Return True if `task.returncode` is 0, False for any other value."""
        exit_status = task.returncode
        return exit_status == 0
245 #class HDDInitJob(Job):
246 # def __init__(self, device):
247 # Job.__init__(self, _("Initialize Harddisk"))
248 # self.device = device
249 # self.fromDescription(self.createDescription())
251 # def fromDescription(self, description):
252 # self.device = description["device"]
253 # self.addTask(CreatePartitionTask(self.device))
254 # self.addTask(CreateFilesystemTask(self.device))
255 # self.addTask(FilesystemMountTask(self.device))
257 # def createDescription(self):
258 # return {"device": self.device}
# Module-level JobManager instance, created when this module is imported.
260 job_manager = JobManager()