aboutsummaryrefslogtreecommitdiff
path: root/src/AtomFeed.py
blob: f99d10526aa12dede451796252620fda2a603ef8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# -*- coding: utf-8 -*-
# CurlyTx Atom feed parser
# Copyright (C) 2011 Christian Weiske <cweiske@cweiske.de>
# License: GPLv3 or later

from twisted.web.client import getPage
from xml.etree.cElementTree import fromstring

import os

class AtomFeed:
    """ Simple XML parser that extracts pages from an Atom feed

    The feed is loaded either from a local file ("file://" URLs) or via
    twisted's getPage(). Every usable <entry> becomes a page dictionary
    with the keys "title" and "url". Direct children of the feed root that
    live in the CurlyTx namespace are collected as settings.
    """
    # ElementTree style namespace prefixes: Atom itself and CurlyTx settings
    ns = "{http://www.w3.org/2005/Atom}"
    nsc = "{http://ns.cweiske.de/curlytx}"
    # Callable taking one error message string; set per instance in __init__
    errorCallback = None

    def __init__(self, url, callback, errorCallback):
        """ Fetches the URL

        Parsed pages are sent back to callback by parse()

        url           -- feed location; "file://" URLs are read from disk
                         synchronously, everything else goes to getPage()
        callback      -- called as callback(pages, settings) on success
        errorCallback -- called with an error message string on failure
        """
        self.errorCallback = errorCallback
        if url.startswith('file://'):
            path = url[len('file://'):]
            if not os.path.exists(path):
                errorCallback('Settings atom feed file does not exist: ' + path)
                return

            # Read raw bytes, just like the data getPage() delivers, so the
            # XML parser itself honours the encoding declared in the feed.
            with open(path, 'rb') as f:
                self.parse(f.read(), callback)
        else:
            getPage(url).addCallback(self.parse, callback).addErrback(self.onError)

    def parse(self, data, callback):
        """ Parse atom feed data into pages list and run callback

        data     -- the XML document
        callback -- called as callback(pages, settings); pages is a list of
                    {"title": ..., "url": ...} dictionaries, settings maps
                    CurlyTx element names (namespace stripped) to their text

        When the data cannot be parsed, errorCallback("Invalid XML") is
        called instead and its result is returned.
        """
        try:
            xml = fromstring(data)
        except Exception:
            # The concrete parse error class differs between ElementTree
            # versions, so anything raised here counts as broken XML.
            return self.errorCallback("Invalid XML")

        pages = []
        for entry in xml.findall(self.ns + "entry"):
            titleE = entry.find(self.ns + "title")
            url = self.bestLink(entry.findall(self.ns + "link"))
            # An empty <title/> has text None (not ""), so test truthiness.
            if titleE is not None and titleE.text and url is not None:
                pages.append({"title": titleE.text, "url": url})

        settings = dict()
        for child in xml:
            if child.tag.startswith(self.nsc):
                settings[child.tag[len(self.nsc):]] = child.text

        callback(pages, settings)

    def bestLink(self, list):
        """ Fetch the best matching link from an atom feed entry

        list -- the entry's <link> elements (the parameter name shadows the
                builtin; it is kept so existing callers continue to work)

        Only "alternate" links are considered. A link without rel attribute
        counts as "alternate", as the Atom specification demands. Links
        without a href are ignored. Among links of the same level, the
        last one wins.

        Returns the href of the best link, or None if there is none.
        """
        foundLevel = -1
        foundHref = None
        for link in list:
            # get() returns None for a missing attribute; treat a missing
            # or empty rel as the default "alternate".
            if (link.get("rel") or "alternate") != "alternate":
                continue
            href = link.get("href")
            if not href:
                # unusable link, must not replace an earlier good one
                continue
            level = self.level(link)
            if foundLevel > level:
                continue
            foundLevel = level
            foundHref = href
        return foundHref

    def level(self, link):
        """ Determines the level of a link

        "text/plain" type links are best (3), links without type - missing
        or empty attribute - are second (2).
        All others have the lowest level 1.
        """
        linkType = link.get("type")
        if linkType == "text/plain":
            return 3
        if not linkType:
            # get() yields None when the type attribute is missing
            return 2
        return 1

    def onError(self, error):
        """ Pass the error message only

        error -- object providing getErrorMessage(); this method is
                 registered as errback on the getPage() deferred
        """
        self.errorCallback(error.getErrorMessage())