from Screens.Satconfig import NimSelection from Screens.Screen import Screen from Screens.TextBox import TextBox from Screens.MessageBox import MessageBox from Plugins.Plugin import PluginDescriptor from Components.ActionMap import ActionMap, NumberActionMap from Components.NimManager import nimmanager from Components.ResourceManager import resourcemanager from Components.Sources.FrontendStatus import FrontendStatus from Components.TuneTest import TuneTest from Components.Sources.List import List from Components.Sources.Progress import Progress from Components.Sources.StaticText import StaticText from Components.ConfigList import ConfigListScreen from Components.config import getConfigListEntry, ConfigSelection, ConfigYesNo from Components.Harddisk import harddiskmanager import random # always use: # setResultType(type) # setResultParameter(parameter) # getTextualResult() class ResultParser: def __init__(self): pass TYPE_BYORBPOS = 0 TYPE_BYINDEX = 1 TYPE_ALL = 2 def setResultType(self, type): self.type = type def setResultParameter(self, parameter): if self.type == self.TYPE_BYORBPOS: self.orbpos = parameter elif self.type == self.TYPE_BYINDEX: self.index = parameter def getTextualResultForIndex(self, index, logfulltransponders = False): text = "" text += "%s:\n" % self.getTextualIndexRepresentation(index) failed, successful = self.results[index]["failed"], self.results[index]["successful"] countfailed = len(failed) countsuccessful = len(successful) countall = countfailed + countsuccessful percentfailed = round(countfailed / float(countall + 0.0001) * 100) percentsuccessful = round(countsuccessful / float(countall + 0.0001) * 100) text += "Tested %d transponders\n%d (%d %%) transponders succeeded\n%d (%d %%) transponders failed\n" % (countall, countsuccessful, percentsuccessful, countfailed, percentfailed) reasons = {} completelist = [] if countfailed > 0: for transponder in failed: completelist.append({"transponder": transponder[0], "fedata": transponder[-1]}) reasons[transponder[2]] = reasons.get(transponder[2], []) reasons[transponder[2]].append(transponder) if transponder[2] == "pids_failed": print transponder[2], "-", transponder[3] text += "The %d unsuccessful tuning attempts failed for the following reasons:\n" % countfailed for reason in reasons.keys(): text += "%s: %d transponders failed\n" % (reason, len(reasons[reason])) for reason in reasons.keys(): text += "\n" text += "%s previous planes:\n" % reason for transponder in reasons[reason]: if transponder[1] is not None: text += self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[1])) else: text += "No transponder tuned" text += " ==> " + self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[0])) text += "\n" if logfulltransponders: text += str(transponder[1]) text += " ==> " text += str(transponder[0]) text += "\n" if reason == "pids_failed": text += "(tsid, onid): " text += str(transponder[3]['real']) text += "(read from sat) != " text += str(transponder[3]['expected']) text += "(read from file)" text += "\n" text += "\n" if countsuccessful > 0: text += "\n" text += "Successfully tuned transponders' previous planes:\n" for transponder in successful: completelist.append({"transponder": transponder[0], "fedata": transponder[-1]}) if transponder[1] is not None: text += self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[1])) else: text += "No transponder tuned" text += " ==> " + self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[0])) text += "\n" text += "------------------------------------------------\n" text += "complete transponderlist:\n" for entry in completelist: text += str(entry["transponder"]) + " -- " + str(entry["fedata"]) + "\n" return text def getTextualResult(self): text = "" if self.type == self.TYPE_BYINDEX: text += self.getTextualResultForIndex(self.index) elif self.type == self.TYPE_BYORBPOS: for index in self.results.keys(): if index[2] == self.orbpos: text += self.getTextualResultForIndex(index) text += "\n-----------------------------------------------------\n" elif self.type == self.TYPE_ALL: orderedResults = {} for index in self.results.keys(): orbpos = index[2] orderedResults[orbpos] = orderedResults.get(orbpos, []) orderedResults[orbpos].append(index) ordered_orbpos = orderedResults.keys() ordered_orbpos.sort() for orbpos in ordered_orbpos: text += "\n*****************************************\n" text += "Orbital position %s:" % str(orbpos) text += "\n*****************************************\n" for index in orderedResults[orbpos]: text += self.getTextualResultForIndex(index, logfulltransponders = True) text += "\n-----------------------------------------------------\n" return text class DiseqcTester(Screen, TuneTest, ResultParser): skin = """ {"template": [ MultiContentEntryText(pos = (10, 0), size = (330, 25), flags = RT_HALIGN_LEFT, text = 1), # index 1 is the index name, MultiContentEntryText(pos = (330, 0), size = (150, 25), flags = RT_HALIGN_RIGHT, text = 2) # index 2 is the status, ], "fonts": [gFont("Regular", 20)], "itemHeight": 25 } """ TEST_TYPE_QUICK = 0 TEST_TYPE_RANDOM = 1 TEST_TYPE_COMPLETE = 2 def __init__(self, session, feid, test_type = TEST_TYPE_QUICK, loopsfailed = 3, loopssuccessful = 1, log = False): Screen.__init__(self, session) self.feid = feid self.test_type = test_type self.loopsfailed = loopsfailed self.loopssuccessful = loopssuccessful self.log = log self["actions"] = NumberActionMap(["SetupActions"], { "ok": self.select, "cancel": self.keyCancel, }, -2) TuneTest.__init__(self, feid, stopOnSuccess = self.loopssuccessful, stopOnError = self.loopsfailed) #self["Frontend"] = FrontendStatus(frontend_source = lambda : self.frontend, update_interval = 100) self["overall_progress"] = Progress() self["sub_progress"] = Progress() self["failed_counter"] = StaticText("0") self["succeeded_counter"] = StaticText("0") self["witherrors_counter"] = StaticText("0") self["untestable_counter"] = StaticText("0") self.list = [] self["progress_list"] = List(self.list) self["progress_list"].onSelectionChanged.append(self.selectionChanged) self["CmdText"] = StaticText(_("Please wait while scanning is in progress...")) self.indexlist = {} self.readTransponderList() self.running = False self.results = {} self.resultsstatus = {} self.onLayoutFinish.append(self.go) def getProgressListComponent(self, index, status): return (index, self.getTextualIndexRepresentation(index), status) def clearProgressList(self): self.list = [] self["progress_list"].list = self.list def addProgressListItem(self, index): if index in self.indexlist: for entry in self.list: if entry[0] == index: self.changeProgressListStatus(index, "working") return self.list.append(self.getProgressListComponent(index, _("working"))) self["progress_list"].list = self.list self["progress_list"].setIndex(len(self.list) - 1) def changeProgressListStatus(self, index, status): self.newlist = [] count = 0 indexpos = 0 for entry in self.list: if entry[0] == index: self.newlist.append(self.getProgressListComponent(index, status)) indexpos = count else: self.newlist.append(entry) count += 1 self.list = self.newlist self["progress_list"].list = self.list self["progress_list"].setIndex(indexpos) def readTransponderList(self): for sat in nimmanager.getSatListForNim(self.feid): for transponder in nimmanager.getTransponders(sat[0]): #print transponder mytransponder = (transponder[1] / 1000, transponder[2] / 1000, transponder[3], transponder[4], transponder[7], sat[0], transponder[5], transponder[6], transponder[8], transponder[9], transponder[10], transponder[11]) self.analyseTransponder(mytransponder) def getIndexForTransponder(self, transponder): if transponder[0] < 11700: band = 1 # low else: band = 0 # high polarisation = transponder[2] sat = transponder[5] index = (band, polarisation, sat) return index # sort the transponder into self.transponderlist def analyseTransponder(self, transponder): index = self.getIndexForTransponder(transponder) if index not in self.indexlist: self.indexlist[index] = [] self.indexlist[index].append(transponder) #print "self.indexlist:", self.indexlist # returns a string for the user representing a human readable output for index def getTextualIndexRepresentation(self, index): print "getTextualIndexRepresentation:", index text = "" text += nimmanager.getSatDescription(index[2]) + ", " if index[0] == 1: text += "Low Band, " else: text += "High Band, " if index[1] == 0: text += "H" else: text += "V" return text def fillTransponderList(self): self.clearTransponder() print "----------- fillTransponderList" print "index:", self.currentlyTestedIndex keys = self.indexlist.keys() if self.getContinueScanning(): print "index:", self.getTextualIndexRepresentation(self.currentlyTestedIndex) for transponder in self.indexlist[self.currentlyTestedIndex]: self.addTransponder(transponder) print "transponderList:", self.transponderlist return True else: return False def progressCallback(self, progress): if progress[0] != self["sub_progress"].getRange(): self["sub_progress"].setRange(progress[0]) self["sub_progress"].setValue(progress[1]) # logic for scanning order of transponders # on go getFirstIndex is called def getFirstIndex(self): # TODO use other function to scan more randomly if self.test_type == self.TEST_TYPE_QUICK: self.myindex = 0 keys = self.indexlist.keys() keys.sort(key = lambda a: a[2]) # sort by orbpos self["overall_progress"].setRange(len(keys)) self["overall_progress"].setValue(self.myindex) return keys[0] elif self.test_type == self.TEST_TYPE_RANDOM: self.randomkeys = self.indexlist.keys() random.shuffle(self.randomkeys) self.myindex = 0 self["overall_progress"].setRange(len(self.randomkeys)) self["overall_progress"].setValue(self.myindex) return self.randomkeys[0] elif self.test_type == self.TEST_TYPE_COMPLETE: keys = self.indexlist.keys() print "keys:", keys successorindex = {} for index in keys: successorindex[index] = [] for otherindex in keys: if otherindex != index: successorindex[index].append(otherindex) random.shuffle(successorindex[index]) self.keylist = [] stop = False currindex = None while not stop: if currindex is None or len(successorindex[currindex]) == 0: oldindex = currindex for index in successorindex.keys(): if len(successorindex[index]) > 0: currindex = index self.keylist.append(currindex) break if currindex == oldindex: stop = True else: currindex = successorindex[currindex].pop() self.keylist.append(currindex) print "self.keylist:", self.keylist self.myindex = 0 self["overall_progress"].setRange(len(self.keylist)) self["overall_progress"].setValue(self.myindex) return self.keylist[0] # after each index is finished, getNextIndex is called to get the next index to scan def getNextIndex(self): # TODO use other function to scan more randomly if self.test_type == self.TEST_TYPE_QUICK: self.myindex += 1 keys = self.indexlist.keys() keys.sort(key = lambda a: a[2]) # sort by orbpos self["overall_progress"].setValue(self.myindex) if self.myindex < len(keys): return keys[self.myindex] else: return None elif self.test_type == self.TEST_TYPE_RANDOM: self.myindex += 1 keys = self.randomkeys self["overall_progress"].setValue(self.myindex) if self.myindex < len(keys): return keys[self.myindex] else: return None elif self.test_type == self.TEST_TYPE_COMPLETE: self.myindex += 1 keys = self.keylist self["overall_progress"].setValue(self.myindex) if self.myindex < len(keys): return keys[self.myindex] else: return None # after each index is finished and the next index is returned by getNextIndex # the algorithm checks, if we should continue scanning def getContinueScanning(self): if self.test_type == self.TEST_TYPE_QUICK or self.test_type == self.TEST_TYPE_RANDOM: return (self.myindex < len(self.indexlist.keys())) elif self.test_type == self.TEST_TYPE_COMPLETE: return (self.myindex < len(self.keylist)) def addResult(self, index, status, failedTune, successfullyTune): self.results[index] = self.results.get(index, {"failed": [], "successful": [], "status": None, "internalstatus": None}) self.resultsstatus[status] = self.resultsstatus.get(status, []) oldstatus = self.results[index]["internalstatus"] if oldstatus is None: self.results[index]["status"] = status elif oldstatus == "successful": if status == "failed": self.results[index]["status"] = "with_errors" elif status == "successful": self.results[index]["status"] = oldstatus elif status == "with_errors": self.results[index]["status"] = "with_errors" elif status == "not_tested": self.results[index]["status"] = oldstatus elif oldstatus == "failed": if status == "failed": self.results[index]["status"] = oldstatus elif status == "successful": self.results[index]["status"] = "with_errors" elif status == "with_errors": self.results[index]["status"] = "with_errors" elif status == "not_tested": self.results[index]["status"] = oldstatus elif oldstatus == "with_errors": if status == "failed": self.results[index]["status"] = oldstatus elif status == "successful": self.results[index]["status"] = oldstatus elif status == "with_errors": self.results[index]["status"] = oldstatus elif status == "not_tested": self.results[index]["status"] = oldstatus elif oldstatus == "not_tested": self.results[index]["status"] = status if self.results[index]["status"] != "working": self.results[index]["internalstatus"] = self.results[index]["status"] self.results[index]["failed"] = failedTune + self.results[index]["failed"] self.results[index]["successful"] = successfullyTune + self.results[index]["successful"] self.resultsstatus[status].append(index) def finishedChecking(self): print "finishedChecking" TuneTest.finishedChecking(self) if not self.results.has_key(self.currentlyTestedIndex): self.results[self.currentlyTestedIndex] = {"failed": [], "successful": [], "status": None, "internalstatus": None} if len(self.failedTune) > 0 and len(self.successfullyTune) > 0: self.changeProgressListStatus(self.currentlyTestedIndex, "with errors") self["witherrors_counter"].setText(str(int(self["witherrors_counter"].getText()) + 1)) self.addResult(self.currentlyTestedIndex, "with_errors", self.failedTune, self.successfullyTune) elif len(self.failedTune) == 0 and len(self.successfullyTune) == 0: self.changeProgressListStatus(self.currentlyTestedIndex, "not tested") self["untestable_counter"].setText(str(int(self["untestable_counter"].getText()) + 1)) self.addResult(self.currentlyTestedIndex, "untestable", self.failedTune, self.successfullyTune) elif len(self.failedTune) > 0: self.changeProgressListStatus(self.currentlyTestedIndex, "failed") #self["failed_counter"].setText(str(int(self["failed_counter"].getText()) + len(self.failedTune))) self["failed_counter"].setText(str(int(self["failed_counter"].getText()) + 1)) self.addResult(self.currentlyTestedIndex, "failed", self.failedTune, self.successfullyTune) else: self.changeProgressListStatus(self.currentlyTestedIndex, "successful") #self["succeeded_counter"].setText(str(int(self["succeeded_counter"].getText()) + len(self.successfullyTune))) self["succeeded_counter"].setText(str(int(self["succeeded_counter"].getText()) + 1)) self.addResult(self.currentlyTestedIndex, "successful", self.failedTune, self.successfullyTune) #self["failed_counter"].setText(str(int(self["failed_counter"].getText()) + len(self.failedTune))) #self["succeeded_counter"].setText(str(int(self["succeeded_counter"].getText()) + len(self.successfullyTune))) #if len(self.failedTune) == 0 and len(self.successfullyTune) == 0: #self["untestable_counter"].setText(str(int(self["untestable_counter"].getText()) + 1)) self.currentlyTestedIndex = self.getNextIndex() self.addProgressListItem(self.currentlyTestedIndex) if self.fillTransponderList(): self.run(checkPIDs = True) else: self.running = False self["progress_list"].setIndex(0) print "results:", self.results print "resultsstatus:", self.resultsstatus if self.log: file = open("/media/hdd/diseqctester.log", "w") self.setResultType(ResultParser.TYPE_ALL) file.write(self.getTextualResult()) file.close() self.session.open(MessageBox, text=_("The results have been written to %s.") % "/media/hdd/diseqctester.log", type = MessageBox.TYPE_INFO) def go(self): self.running = True self["failed_counter"].setText("0") self["succeeded_counter"].setText("0") self["untestable_counter"].setText("0") self.currentlyTestedIndex = self.getFirstIndex() self.clearProgressList() self.addProgressListItem(self.currentlyTestedIndex) if self.fillTransponderList(): self.run(True) def keyCancel(self): self.close() def select(self): print "selectedIndex:", self["progress_list"].getCurrent()[0] if not self.running: index = self["progress_list"].getCurrent()[0] #self.setResultType(ResultParser.TYPE_BYORBPOS) #self.setResultParameter(index[2]) self.setResultType(ResultParser.TYPE_BYINDEX) self.setResultParameter(index) #self.setResultType(ResultParser.TYPE_ALL) self.session.open(TextBox, self.getTextualResult()) def selectionChanged(self): print "selection changed" if len(self.list) > 0 and not self.running: self["CmdText"].setText(_("Press OK to get further details for %s") % str(self["progress_list"].getCurrent()[1])) class DiseqcTesterTestTypeSelection(Screen, ConfigListScreen): def __init__(self, session, feid): Screen.__init__(self, session) # for the skin: first try MediaPlayerSettings, then Setup, this allows individual skinning self.skinName = ["DiseqcTesterTestTypeSelection", "Setup" ] self.setup_title = _("DiSEqC-Tester settings") self.onChangedEntry = [ ] self.feid = feid self.list = [] ConfigListScreen.__init__(self, self.list, session = self.session, on_change = self.changedEntry) self["actions"] = ActionMap(["SetupActions"], { "cancel": self.keyCancel, "save": self.keyOK, "ok": self.keyOK, }, -2) self["key_red"] = StaticText(_("Cancel")) self["key_green"] = StaticText(_("OK")) self.createSetup() self.onLayoutFinish.append(self.layoutFinished) def layoutFinished(self): self.setTitle(self.setup_title) def createSetup(self): self.testtype = ConfigSelection(choices={"quick": _("Quick"), "random": _("Random"), "complete": _("Complete")}, default = "quick") self.testtypeEntry = getConfigListEntry(_("Test Type"), self.testtype) self.list.append(self.testtypeEntry) self.loopsfailed = ConfigSelection(choices={"-1": "Every known", "1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6", "7": "7", "8": "8"}, default = "3") self.loopsfailedEntry = getConfigListEntry(_("Stop testing plane after # failed transponders"), self.loopsfailed) self.list.append(self.loopsfailedEntry) self.loopssuccessful = ConfigSelection(choices={"-1": "Every known", "1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6", "7": "7", "8": "8"}, default = "1") self.loopssuccessfulEntry = getConfigListEntry(_("Stop testing plane after # successful transponders"), self.loopssuccessful) self.list.append(self.loopssuccessfulEntry) self.log = ConfigYesNo(False) if harddiskmanager.HDDCount() > 0: self.logEntry = getConfigListEntry(_("Log results to harddisk"), self.log) self.list.append(self.logEntry) self["config"].list = self.list self["config"].l.setList(self.list) def keyOK(self): print self.testtype.getValue() testtype = DiseqcTester.TEST_TYPE_QUICK if self.testtype.getValue() == "quick": testtype = DiseqcTester.TEST_TYPE_QUICK elif self.testtype.getValue() == "random": testtype = DiseqcTester.TEST_TYPE_RANDOM elif self.testtype.getValue() == "complete": testtype = DiseqcTester.TEST_TYPE_COMPLETE self.session.open(DiseqcTester, feid = self.feid, test_type = testtype, loopsfailed = int(self.loopsfailed.value), loopssuccessful = int(self.loopssuccessful.value), log = self.log.value) def keyCancel(self): self.close() # for summary: def changedEntry(self): for x in self.onChangedEntry: x() def getCurrentEntry(self): return self["config"].getCurrent()[0] def getCurrentValue(self): return str(self["config"].getCurrent()[1].getText()) def createSummary(self): from Screens.Setup import SetupSummary return SetupSummary class DiseqcTesterNimSelection(NimSelection): skin = """ {"template": [ MultiContentEntryText(pos = (10, 5), size = (360, 30), flags = RT_HALIGN_LEFT, text = 1), # index 1 is the nim name, MultiContentEntryText(pos = (50, 30), size = (320, 30), font = 1, flags = RT_HALIGN_LEFT, text = 2), # index 2 is a description of the nim settings, ], "fonts": [gFont("Regular", 20), gFont("Regular", 15)], "itemHeight": 70 } """ def __init__(self, session, args = None): NimSelection.__init__(self, session) def setResultClass(self): #self.resultclass = DiseqcTester self.resultclass = DiseqcTesterTestTypeSelection def showNim(self, nim): nimConfig = nimmanager.getNimConfig(nim.slot) if nim.isCompatible("DVB-S"): if nimConfig.configMode.value in ("loopthrough", "equal", "satposdepends", "nothing"): return False if nimConfig.configMode.value == "simple": if nimConfig.diseqcMode.value == "positioner": return True return True return False def DiseqcTesterMain(session, **kwargs): session.open(DiseqcTesterNimSelection) def autostart(reason, **kwargs): resourcemanager.addResource("DiseqcTester", DiseqcTesterMain) def Plugins(**kwargs): return [ PluginDescriptor(name="DiSEqC Tester", description=_("Test DiSEqC settings"), where = PluginDescriptor.WHERE_PLUGINMENU, needsRestart = False, fnc=DiseqcTesterMain), PluginDescriptor(where = PluginDescriptor.WHERE_AUTOSTART, needsRestart = False, fnc = autostart)]