X-Git-Url: https://git.cweiske.de/enigma2.git/blobdiff_plain/216dbbfd18a36835b8a48d2f2e1e1ce2c65c8f14..f918deab76b3ebd790b64a5694805ecbf944ca66:/lib/python/Plugins/SystemPlugins/DiseqcTester/plugin.py
diff --git a/lib/python/Plugins/SystemPlugins/DiseqcTester/plugin.py b/lib/python/Plugins/SystemPlugins/DiseqcTester/plugin.py
index 57340368..bb4df74a 100644
--- a/lib/python/Plugins/SystemPlugins/DiseqcTester/plugin.py
+++ b/lib/python/Plugins/SystemPlugins/DiseqcTester/plugin.py
@@ -1,20 +1,135 @@
from Screens.Satconfig import NimSelection
from Screens.Screen import Screen
+from Screens.TextBox import TextBox
+from Screens.MessageBox import MessageBox
from Plugins.Plugin import PluginDescriptor
-from Components.ActionMap import NumberActionMap
+from Components.ActionMap import ActionMap, NumberActionMap
from Components.NimManager import nimmanager
from Components.ResourceManager import resourcemanager
from Components.Sources.FrontendStatus import FrontendStatus
from Components.TuneTest import TuneTest
+from Components.Sources.List import List
from Components.Sources.Progress import Progress
from Components.Sources.StaticText import StaticText
+from Components.ConfigList import ConfigListScreen
+from Components.config import getConfigListEntry, ConfigSelection, ConfigYesNo
+from Components.Harddisk import harddiskmanager
-class DiseqcTester(Screen, TuneTest):
+import random
+
+# Usage - always call in this order:
+# setResultType(type)
+# setResultParameter(parameter) (looks at the type set before; ignored for TYPE_ALL)
+# getTextualResult()
+class ResultParser:
+ # Mixin that turns the collected tuning results into a textual report.
+ # It reads self.results and calls self.getTextualIndexRepresentation() and
+ # self.getIndexForTransponder(), none of which are defined in this class, so
+ # the class it is mixed into has to provide them.
+ def __init__(self):
+ pass
+
+ # report selectors for setResultType()
+ TYPE_BYORBPOS = 0
+ TYPE_BYINDEX = 1
+ TYPE_ALL = 2
+ def setResultType(self, type):
+ # type: one of the TYPE_* constants above
+ self.type = type
+
+ def setResultParameter(self, parameter):
+ # Stores parameter as the orbital position (TYPE_BYORBPOS) or as the index
+ # (TYPE_BYINDEX) to report on; for any other type it is ignored.
+ # self.type has to be set by setResultType() before this is called.
+ if self.type == self.TYPE_BYORBPOS:
+ self.orbpos = parameter
+ elif self.type == self.TYPE_BYINDEX:
+ self.index = parameter
+
+ def getTextualResultForIndex(self, index, logfulltransponders = False):
+ # Returns the report text for one index of self.results.
+ # logfulltransponders: additionally write str() of the raw transponder
+ # entries for failed tuning attempts.
+ # Entries of the "failed"/"successful" lists are used as follows:
+ # [0] is shown right of "==>", [1] left of it (None is printed as
+ # "No transponder tuned"), [2] is the failure reason and [3] carries the
+ # 'real'/'expected' values printed for the "pids_failed" reason.
+ text = ""
+ text += "%s:\n" % self.getTextualIndexRepresentation(index)
+
+ failed, successful = self.results[index]["failed"], self.results[index]["successful"]
+ countfailed = len(failed)
+ countsuccessful = len(successful)
+ countall = countfailed + countsuccessful
+ # the added 0.0001 keeps the division defined when nothing was tested (countall == 0)
+ percentfailed = round(countfailed / float(countall + 0.0001) * 100)
+ percentsuccessful = round(countsuccessful / float(countall + 0.0001) * 100)
+ text += "Tested %d transponders\n%d (%d %%) transponders succeeded\n%d (%d %%) transponders failed\n" % (countall, countsuccessful, percentsuccessful, countfailed, percentfailed)
+ # failure reason -> list of the entries that failed for this reason
+ reasons = {}
+ if countfailed > 0:
+ for transponder in failed:
+ reasons[transponder[2]] = reasons.get(transponder[2], [])
+ reasons[transponder[2]].append(transponder)
+ if transponder[2] == "pids_failed":
+ # NOTE(review): looks like leftover debug output - confirm it is still wanted
+ print transponder[2], "-", transponder[3]
+
+ text += "The %d unsuccessful tuning attempts failed for the following reasons:\n" % countfailed
+
+ # first a summary line per reason ...
+ for reason in reasons.keys():
+ text += "%s: %d transponders failed\n" % (reason, len(reasons[reason]))
+
+ # ... then the individual "previous plane ==> tested plane" transitions per reason
+ for reason in reasons.keys():
+ text += "\n"
+ text += "%s previous planes:\n" % reason
+ for transponder in reasons[reason]:
+ if transponder[1] is not None:
+ text += self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[1]))
+ else:
+ text += "No transponder tuned"
+ text += " ==> " + self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[0]))
+ text += "\n"
+ if logfulltransponders:
+ text += str(transponder[1])
+ text += " ==> "
+ text += str(transponder[0])
+ text += "\n"
+ if reason == "pids_failed":
+ text += "(tsid, onid): "
+ text += str(transponder[3]['real'])
+ text += "(read from sat) != "
+ text += str(transponder[3]['expected'])
+ text += "(read from file)"
+ text += "\n"
+ text += "\n"
+ if countsuccessful > 0:
+ text += "\n"
+ text += "Successfully tuned transponders' previous planes:\n"
+ for transponder in successful:
+ if transponder[1] is not None:
+ text += self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[1]))
+ else:
+ text += "No transponder tuned"
+ text += " ==> " + self.getTextualIndexRepresentation(self.getIndexForTransponder(transponder[0]))
+ text += "\n"
+ return text
+
+ def getTextualResult(self):
+ # Returns the report selected by setResultType()/setResultParameter():
+ # one index, all indices of one orbital position, or everything grouped
+ # by orbital position. index[2] is compared with / used as the orbital position.
+ text = ""
+ if self.type == self.TYPE_BYINDEX:
+ text += self.getTextualResultForIndex(self.index)
+ elif self.type == self.TYPE_BYORBPOS:
+ for index in self.results.keys():
+ if index[2] == self.orbpos:
+ text += self.getTextualResultForIndex(index)
+ text += "\n-----------------------------------------------------\n"
+ elif self.type == self.TYPE_ALL:
+ # orbital position -> list of the indices tested on it
+ orderedResults = {}
+ for index in self.results.keys():
+ orbpos = index[2]
+ orderedResults[orbpos] = orderedResults.get(orbpos, [])
+ orderedResults[orbpos].append(index)
+ ordered_orbpos = orderedResults.keys()
+ ordered_orbpos.sort()
+ for orbpos in ordered_orbpos:
+ text += "\n*****************************************\n"
+ text += "Orbital position %s:" % str(orbpos)
+ text += "\n*****************************************\n"
+ for index in orderedResults[orbpos]:
+ # the complete report always includes the raw transponder entries
+ text += self.getTextualResultForIndex(index, logfulltransponders = True)
+ text += "\n-----------------------------------------------------\n"
+
+
+ return text
+
+class DiseqcTester(Screen, TuneTest, ResultParser):
skin = """
-
+
+
+
+ {"template": [
+ MultiContentEntryText(pos = (10, 0), size = (330, 25), flags = RT_HALIGN_LEFT, text = 1), # index 1 is the index name,
+ MultiContentEntryText(pos = (330, 0), size = (150, 25), flags = RT_HALIGN_RIGHT, text = 2) # index 2 is the status,
+ ],
+ "fonts": [gFont("Regular", 20)],
+ "itemHeight": 25
+ }
+
@@ -54,49 +180,109 @@ class DiseqcTester(Screen, TuneTest):
-
+
-
+
+
+
+
+
+
+
+
+
"""
TEST_TYPE_QUICK = 0
TEST_TYPE_RANDOM = 1
TEST_TYPE_COMPLETE = 2
- def __init__(self, session, feid, test_type = TEST_TYPE_QUICK):
+ def __init__(self, session, feid, test_type = TEST_TYPE_QUICK, loopsfailed = 3, loopssuccessful = 1, log = False):
+ # feid: frontend to test; it is handed to TuneTest and used to read the satellite list
+ # test_type: one of the TEST_TYPE_* constants
+ # loopsfailed / loopssuccessful: passed on to TuneTest as stopOnError / stopOnSuccess
+ # log: if true, the complete report is written to a file when the run has finished
Screen.__init__(self, session)
self.feid = feid
self.test_type = test_type
+ self.loopsfailed = loopsfailed
+ self.loopssuccessful = loopssuccessful
+ self.log = log
self["actions"] = NumberActionMap(["SetupActions"],
{
- "ok": self.keyGo,
+ "ok": self.select,
"cancel": self.keyCancel,
}, -2)
- TuneTest.__init__(self, feid, stopOnSuccess = True)
- self["Frontend"] = FrontendStatus(frontend_source = lambda : self.frontend, update_interval = 100)
+ TuneTest.__init__(self, feid, stopOnSuccess = self.loopssuccessful, stopOnError = self.loopsfailed)
+ #self["Frontend"] = FrontendStatus(frontend_source = lambda : self.frontend, update_interval = 100)
self["overall_progress"] = Progress()
self["sub_progress"] = Progress()
- self["failed_counter"] = StaticText("10")
- self["succeeded_counter"] = StaticText("10")
+
+ # one counter per outcome; finishedChecking() increments them
+ self["failed_counter"] = StaticText("0")
+ self["succeeded_counter"] = StaticText("0")
+ self["witherrors_counter"] = StaticText("0")
+ self["untestable_counter"] = StaticText("0")
+
+ # rows of the progress list, built by getProgressListComponent()
+ self.list = []
+ self["progress_list"] = List(self.list)
+ self["progress_list"].onSelectionChanged.append(self.selectionChanged)
+
+ self["CmdText"] = StaticText(_("Please wait while scanning is in progress..."))
self.indexlist = {}
self.readTransponderList()
+
+ # True between go() and the end of the run; select() only opens details when False
+ self.running = False
+
+ # filled by addResult(): per-index results and status -> list of indices
+ self.results = {}
+ self.resultsstatus = {}
+
+ # the test is started by go() as soon as the layout has finished
+ self.onLayoutFinish.append(self.go)
+
+ def getProgressListComponent(self, index, status):
+ return (index, self.getTextualIndexRepresentation(index), status)
+
+ def clearProgressList(self):
+ self.list = []
+ self["progress_list"].list = self.list
+
+ def addProgressListItem(self, index):
+ if index in self.indexlist:
+ for entry in self.list:
+ if entry[0] == index:
+ self.changeProgressListStatus(index, "working")
+ return
+ self.list.append(self.getProgressListComponent(index, _("working")))
+ self["progress_list"].list = self.list
+ self["progress_list"].setIndex(len(self.list) - 1)
+
+ def changeProgressListStatus(self, index, status):
+ self.newlist = []
+ count = 0
+ indexpos = 0
+ for entry in self.list:
+ if entry[0] == index:
+ self.newlist.append(self.getProgressListComponent(index, status))
+ indexpos = count
+ else:
+ self.newlist.append(entry)
+ count += 1
+ self.list = self.newlist
+ self["progress_list"].list = self.list
+ self["progress_list"].setIndex(indexpos)
def readTransponderList(self):
+ # Hands every transponder of every satellite that nimmanager lists for this
+ # frontend to self.analyseTransponder() (not defined in the visible part of the file).
for sat in nimmanager.getSatListForNim(self.feid):
for transponder in nimmanager.getTransponders(sat[0]):
#print transponder
- mytransponder = (transponder[1] / 1000, transponder[2] / 1000, transponder[3], transponder[4], transponder[5], sat[0], None, None, transponder[10], transponder[11])
+ # The first two fields are scaled down by 1000 (presumably frequency and
+ # symbol rate - confirm against nimmanager.getTransponders()); field 5 is
+ # sat[0], which getIndexForTransponder() reads as the satellite.
+ mytransponder = (transponder[1] / 1000, transponder[2] / 1000, transponder[3], transponder[4], transponder[7], sat[0], transponder[5], transponder[6], transponder[8], transponder[9], transponder[10], transponder[11])
self.analyseTransponder(mytransponder)
def getIndexForTransponder(self, transponder):
+
if transponder[0] < 11700:
band = 1 # low
else:
band = 0 # high
-
+
polarisation = transponder[2]
sat = transponder[5]
@@ -117,8 +303,7 @@ class DiseqcTester(Screen, TuneTest):
print "getTextualIndexRepresentation:", index
text = ""
- # TODO better sat representation
- text += "%s, " % index[2]
+ text += nimmanager.getSatDescription(index[2]) + ", "
if index[0] == 1:
text += "Low Band, "
@@ -157,43 +342,268 @@ class DiseqcTester(Screen, TuneTest):
if self.test_type == self.TEST_TYPE_QUICK:
self.myindex = 0
keys = self.indexlist.keys()
+ keys.sort(key = lambda a: a[2]) # sort by orbpos
self["overall_progress"].setRange(len(keys))
self["overall_progress"].setValue(self.myindex)
return keys[0]
-
+ elif self.test_type == self.TEST_TYPE_RANDOM:
+ self.randomkeys = self.indexlist.keys()
+ random.shuffle(self.randomkeys)
+ self.myindex = 0
+ self["overall_progress"].setRange(len(self.randomkeys))
+ self["overall_progress"].setValue(self.myindex)
+ return self.randomkeys[0]
+ elif self.test_type == self.TEST_TYPE_COMPLETE:
+ keys = self.indexlist.keys()
+ print "keys:", keys
+ successorindex = {}
+ for index in keys:
+ successorindex[index] = []
+ for otherindex in keys:
+ if otherindex != index:
+ successorindex[index].append(otherindex)
+ random.shuffle(successorindex[index])
+ self.keylist = []
+ stop = False
+ currindex = None
+ while not stop:
+ if currindex is None or len(successorindex[currindex]) == 0:
+ oldindex = currindex
+ for index in successorindex.keys():
+ if len(successorindex[index]) > 0:
+ currindex = index
+ self.keylist.append(currindex)
+ break
+ if currindex == oldindex:
+ stop = True
+ else:
+ currindex = successorindex[currindex].pop()
+ self.keylist.append(currindex)
+ print "self.keylist:", self.keylist
+ self.myindex = 0
+ self["overall_progress"].setRange(len(self.keylist))
+ self["overall_progress"].setValue(self.myindex)
+ return self.keylist[0]
+
+
# after each index is finished, getNextIndex is called to get the next index to scan
def getNextIndex(self):
+ # Advances self.myindex, pushes it to the "overall_progress" bar and returns
+ # the index at that position in the key list of the active test type, or
+ # None once that list is exhausted.
# TODO use other function to scan more randomly
if self.test_type == self.TEST_TYPE_QUICK:
self.myindex += 1
keys = self.indexlist.keys()
+ keys.sort(key = lambda a: a[2]) # sort by orbpos
self["overall_progress"].setValue(self.myindex)
if self.myindex < len(keys):
return keys[self.myindex]
else:
return None
-
+ elif self.test_type == self.TEST_TYPE_RANDOM:
+ # walk the shuffled order that getFirstIndex() stored in self.randomkeys
+ self.myindex += 1
+ keys = self.randomkeys
+
+ self["overall_progress"].setValue(self.myindex)
+ if self.myindex < len(keys):
+ return keys[self.myindex]
+ else:
+ return None
+ elif self.test_type == self.TEST_TYPE_COMPLETE:
+ # walk the sequence that getFirstIndex() stored in self.keylist
+ self.myindex += 1
+ keys = self.keylist
+
+ self["overall_progress"].setValue(self.myindex)
+ if self.myindex < len(keys):
+ return keys[self.myindex]
+ else:
+ return None
+
# after each index is finished and the next index is returned by getNextIndex
# the algorithm checks, if we should continue scanning
def getContinueScanning(self):
+ # True while self.myindex still points into the key list of the active test
+ # type; the complete test uses self.keylist, the other two the keys of self.indexlist.
- if self.test_type == self.TEST_TYPE_QUICK:
+ if self.test_type == self.TEST_TYPE_QUICK or self.test_type == self.TEST_TYPE_RANDOM:
return (self.myindex < len(self.indexlist.keys()))
+ elif self.test_type == self.TEST_TYPE_COMPLETE:
+ return (self.myindex < len(self.keylist))
+
+ def addResult(self, index, status, failedTune, successfullyTune):
+ # Merges the outcome of one test pass for index into self.results and
+ # registers index under status in self.resultsstatus.
+ # status: the chain below distinguishes "failed", "successful",
+ # "with_errors" and "not_tested"
+ # failedTune / successfullyTune: lists that are put in front of the
+ # "failed" / "successful" lists already stored for index
+ self.results[index] = self.results.get(index, {"failed": [], "successful": [], "status": None, "internalstatus": None})
+ self.resultsstatus[status] = self.resultsstatus.get(status, [])
+
+ # Combine the status of earlier passes (oldstatus) with the new one:
+ # mixing "successful" and "failed" gives "with_errors", "with_errors" is
+ # never left again, and a new "not_tested" never replaces an older status.
+ # An oldstatus that is not listed here leaves "status" untouched.
+ oldstatus = self.results[index]["internalstatus"]
+ if oldstatus is None:
+ self.results[index]["status"] = status
+ elif oldstatus == "successful":
+ if status == "failed":
+ self.results[index]["status"] = "with_errors"
+ elif status == "successful":
+ self.results[index]["status"] = oldstatus
+ elif status == "with_errors":
+ self.results[index]["status"] = "with_errors"
+ elif status == "not_tested":
+ self.results[index]["status"] = oldstatus
+ elif oldstatus == "failed":
+ if status == "failed":
+ self.results[index]["status"] = oldstatus
+ elif status == "successful":
+ self.results[index]["status"] = "with_errors"
+ elif status == "with_errors":
+ self.results[index]["status"] = "with_errors"
+ elif status == "not_tested":
+ self.results[index]["status"] = oldstatus
+ elif oldstatus == "with_errors":
+ if status == "failed":
+ self.results[index]["status"] = oldstatus
+ elif status == "successful":
+ self.results[index]["status"] = oldstatus
+ elif status == "with_errors":
+ self.results[index]["status"] = oldstatus
+ elif status == "not_tested":
+ self.results[index]["status"] = oldstatus
+ elif oldstatus == "not_tested":
+ self.results[index]["status"] = status
+
+ # "working" is never remembered as the internal status
+ if self.results[index]["status"] != "working":
+ self.results[index]["internalstatus"] = self.results[index]["status"]
+ self.results[index]["failed"] = failedTune + self.results[index]["failed"]
+ self.results[index]["successful"] = successfullyTune + self.results[index]["successful"]
+
+ self.resultsstatus[status].append(index)
def finishedChecking(self):
print "finishedChecking"
TuneTest.finishedChecking(self)
+
+ if not self.results.has_key(self.currentlyTestedIndex):
+ self.results[self.currentlyTestedIndex] = {"failed": [], "successful": [], "status": None, "internalstatus": None}
+
+ if len(self.failedTune) > 0 and len(self.successfullyTune) > 0:
+ self.changeProgressListStatus(self.currentlyTestedIndex, "with errors")
+ self["witherrors_counter"].setText(str(int(self["witherrors_counter"].getText()) + 1))
+ self.addResult(self.currentlyTestedIndex, "with_errors", self.failedTune, self.successfullyTune)
+ elif len(self.failedTune) == 0 and len(self.successfullyTune) == 0:
+ self.changeProgressListStatus(self.currentlyTestedIndex, "not tested")
+ self["untestable_counter"].setText(str(int(self["untestable_counter"].getText()) + 1))
+ self.addResult(self.currentlyTestedIndex, "untestable", self.failedTune, self.successfullyTune)
+ elif len(self.failedTune) > 0:
+ self.changeProgressListStatus(self.currentlyTestedIndex, "failed")
+ #self["failed_counter"].setText(str(int(self["failed_counter"].getText()) + len(self.failedTune)))
+ self["failed_counter"].setText(str(int(self["failed_counter"].getText()) + 1))
+ self.addResult(self.currentlyTestedIndex, "failed", self.failedTune, self.successfullyTune)
+ else:
+ self.changeProgressListStatus(self.currentlyTestedIndex, "successful")
+ #self["succeeded_counter"].setText(str(int(self["succeeded_counter"].getText()) + len(self.successfullyTune)))
+ self["succeeded_counter"].setText(str(int(self["succeeded_counter"].getText()) + 1))
+ self.addResult(self.currentlyTestedIndex, "successful", self.failedTune, self.successfullyTune)
+
+
+ #self["failed_counter"].setText(str(int(self["failed_counter"].getText()) + len(self.failedTune)))
+ #self["succeeded_counter"].setText(str(int(self["succeeded_counter"].getText()) + len(self.successfullyTune)))
+ #if len(self.failedTune) == 0 and len(self.successfullyTune) == 0:
+ #self["untestable_counter"].setText(str(int(self["untestable_counter"].getText()) + 1))
+
self.currentlyTestedIndex = self.getNextIndex()
+ self.addProgressListItem(self.currentlyTestedIndex)
+
if self.fillTransponderList():
self.run(checkPIDs = True)
+ else:
+ self.running = False
+ self["progress_list"].setIndex(0)
+ print "results:", self.results
+ print "resultsstatus:", self.resultsstatus
+ if self.log:
+ file = open("/media/hdd/diseqctester.log", "w")
+ self.setResultType(ResultParser.TYPE_ALL)
+ file.write(self.getTextualResult())
+ file.close()
+ self.session.open(MessageBox, text=_("The results have been written to %s.") % "/media/hdd/diseqctester.log", type = MessageBox.TYPE_INFO)
- def keyGo(self):
+ def go(self):
+ self.running = True
+ self["failed_counter"].setText("0")
+ self["succeeded_counter"].setText("0")
+ self["untestable_counter"].setText("0")
self.currentlyTestedIndex = self.getFirstIndex()
+
+ self.clearProgressList()
+ self.addProgressListItem(self.currentlyTestedIndex)
+
if self.fillTransponderList():
self.run(True)
def keyCancel(self):
+ # bound to the "cancel" action: closes this screen
self.close()
+
+ def select(self):
+ print "selectedIndex:", self["progress_list"].getCurrent()[0]
+ if not self.running:
+ index = self["progress_list"].getCurrent()[0]
+ #self.setResultType(ResultParser.TYPE_BYORBPOS)
+ #self.setResultParameter(index[2])
+ self.setResultType(ResultParser.TYPE_BYINDEX)
+ self.setResultParameter(index)
+ #self.setResultType(ResultParser.TYPE_ALL)
+ self.session.open(TextBox, self.getTextualResult())
+
+ def selectionChanged(self):
+ print "selection changed"
+ if len(self.list) > 0 and not self.running:
+ self["CmdText"].setText(_("Press OK to get further details for %s") % str(self["progress_list"].getCurrent()[1]))
+
+class DiseqcTesterTestTypeSelection(Screen, ConfigListScreen):
+ skin = """
+
+
+ """
+ def __init__(self, session, feid):
+ Screen.__init__(self, session)
+ self.feid = feid
+
+ self.list = []
+ ConfigListScreen.__init__(self, self.list)
+
+ self["actions"] = ActionMap(["SetupActions"],
+ {
+ "cancel": self.keyCancel
+ }, -2)
+
+ self.createSetup()
+
+ def createSetup(self):
+ self.testtype = ConfigSelection(choices={"quick": _("Quick"), "random": _("Random"), "complete": _("Complete")}, default = "quick")
+ self.testtypeEntry = getConfigListEntry(_("Test Type"), self.testtype)
+ self.list.append(self.testtypeEntry)
+
+ self.loopsfailed = ConfigSelection(choices={"-1": "Every known", "1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6", "7": "7", "8": "8"}, default = "3")
+ self.loopsfailedEntry = getConfigListEntry(_("Stop testing plane after # failed transponders"), self.loopsfailed)
+ self.list.append(self.loopsfailedEntry)
+
+ self.loopssuccessful = ConfigSelection(choices={"-1": "Every known", "1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6", "7": "7", "8": "8"}, default = "1")
+ self.loopssuccessfulEntry = getConfigListEntry(_("Stop testing plane after # successful transponders"), self.loopssuccessful)
+ self.list.append(self.loopssuccessfulEntry)
+
+ self.log = ConfigYesNo(False)
+ if harddiskmanager.HDDCount() > 0:
+ self.logEntry = getConfigListEntry(_("Log results to harddisk"), self.log)
+ self.list.append(self.logEntry)
+
+ self["config"].list = self.list
+ self["config"].l.setList(self.list)
+
+ def keyOK(self):
+ print self.testtype.getValue()
+ testtype = DiseqcTester.TEST_TYPE_QUICK
+ if self.testtype.getValue() == "quick":
+ testtype = DiseqcTester.TEST_TYPE_QUICK
+ elif self.testtype.getValue() == "random":
+ testtype = DiseqcTester.TEST_TYPE_RANDOM
+ elif self.testtype.getValue() == "complete":
+ testtype = DiseqcTester.TEST_TYPE_COMPLETE
+ self.session.open(DiseqcTester, feid = self.feid, test_type = testtype, loopsfailed = int(self.loopsfailed.value), loopssuccessful = int(self.loopssuccessful.value), log = self.log.value)
+
+ def keyCancel(self):
+ self.close()
class DiseqcTesterNimSelection(NimSelection):
skin = """
@@ -215,16 +625,17 @@ class DiseqcTesterNimSelection(NimSelection):
NimSelection.__init__(self, session)
def setResultClass(self):
+ # The configuration screen is now used as result class; it opens DiseqcTester
+ # itself from its keyOK(). (How self.resultclass is used is up to NimSelection,
+ # which is not part of this file.)
- self.resultclass = DiseqcTester
+ #self.resultclass = DiseqcTester
+ self.resultclass = DiseqcTesterTestTypeSelection
def showNim(self, nim):
+ # Returns False for a nim that is not DVB-S compatible or whose configMode is
+ # one of "loopthrough", "equal", "satposdepends", "nothing" (presumably
+ # True means the nim is offered for testing - confirm against NimSelection).
nimConfig = nimmanager.getNimConfig(nim.slot)
if nim.isCompatible("DVB-S"):
- if nimConfig.configMode.value in ["loopthrough", "equal", "satposdepends", "nothing"]:
+ if nimConfig.configMode.value in ("loopthrough", "equal", "satposdepends", "nothing"):
return False
if nimConfig.configMode.value == "simple":
if nimConfig.diseqcMode.value == "positioner":
- return False
+ # positioner setups used to be rejected here and are accepted now
+ return True
return True
return False
@@ -236,4 +647,4 @@ def autostart(reason, **kwargs):
def Plugins(**kwargs):
+ # two descriptors: the plugin menu entry (DiseqcTesterMain) and the autostart hook
return [ PluginDescriptor(name="DiSEqC Tester", description=_("Test DiSEqC settings"), where = PluginDescriptor.WHERE_PLUGINMENU, fnc=DiseqcTesterMain),
- PluginDescriptor(where = PluginDescriptor.WHERE_AUTOSTART, fnc = autostart)]
\ No newline at end of file
+ PluginDescriptor(where = PluginDescriptor.WHERE_AUTOSTART, fnc = autostart)]