# feedservice/feeddownloader.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
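
# Fetches podcast feeds via urlstore, parses them with feedparser and
# converts them into JSON-serializable dicts for the mygpo feed service.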
import urlstore
import youtube
from mimetype import get_mimetype, check_mimetype, get_podcast_types


def parse_feeds(feed_urls, *args, **kwargs):
    """
    Parses the feeds specified by feed_urls and returns their JSON
    objects and the latest of their modification dates. RSS redirects
    are followed automatically; both the old and the new feed are
    included in the result.
    """

    visited_urls = set()
    result = []
    last_modified = None

    for url in feed_urls:
        res, visited, new, last_mod = parse_feed(url, *args, **kwargs)

        if not res:
            continue

        visited_urls.update(visited)

        # we follow RSS redirects automatically by appending the new
        # location to the list of feeds that still have to be parsed
        if new and new not in (list(visited_urls) + feed_urls):
            feed_urls.append(new)

        if not last_modified or (last_mod and last_mod > last_modified):
            last_modified = last_mod

        result.append(res)

    return result, last_modified
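
# Illustrative call (hypothetical URL; the positional arguments correspond
# to parse_feed's inline_logo, scale_to, strip_html and modified):
#
#   podcasts, last_mod = parse_feeds(
#       ['http://example.com/feed.xml'], False, None, True, None)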


def parse_feed(feed_url, inline_logo, scale_to, strip_html, modified):
    """
    Parses a feed and returns its JSON object, a list of URLs that refer
    to this feed, an outgoing redirect and the timestamp of the last
    modification of the feed.
    """

    import feedparser
    from urls import get_redirects

    feed_url, feed_content, last_modified = urlstore.get_url(feed_url)

    if last_modified and modified and last_modified <= modified:
        return None, None, None, None

    feed = feedparser.parse(feed_content)
    feed.feed.link = feed_url

    podcast = dict()

    # (name, is_text, getter) triples; order matters: e.g. 'logo_data'
    # reads podcast['logo'], which is set by an earlier iteration
    PROPERTIES = (
        ('title', True, lambda: feed.feed.get('title', None)),
        ('link', False, lambda: feed.feed.get('link', None)),
        ('description', True, lambda: feed.feed.get('subtitle', None)),
        ('author', True, lambda: feed.feed.get('author', feed.feed.get('itunes_author', None))),
        ('language', False, lambda: feed.feed.get('language', None)),
        ('urls', False, lambda: get_redirects(feed_url)),
        ('new_location', False, lambda: get_newlocation(feed)),
        ('logo', False, lambda: get_podcast_logo(feed)),
        ('logo_data', False, lambda: get_data_uri(inline_logo, podcast.get('logo', None), scale_to, modified)),
        ('tags', False, lambda: get_feed_tags(feed.feed)),
        ('episodes', False, lambda: get_episodes(feed, strip_html)),
        ('content_types', False, lambda: get_podcast_types(podcast)),
    )

    for name, is_text, func in PROPERTIES:
        set_val(podcast, name, func, strip_html and is_text)

    return podcast, podcast.get('urls', None), podcast.get('new_location', None), last_modified


def set_val(obj, name, func, remove_tags):
    """
    Evaluates func and stores the result in obj[name], unless it is None.
    HTML tags are stripped from the value if remove_tags is set.
    """
    from utils import remove_html_tags

    val = func()
    if remove_tags:
        val = remove_html_tags(val)
    if val is not None:
        obj[name] = val


def get_newlocation(feed):
    """Returns the new feed location if the feed contains an RSS redirect"""
    if 'newlocation' in feed.feed:
        return feed.feed.newlocation
    else:
        return None


def get_podcast_logo(feed):
    cover_art = None
    image = feed.feed.get('image', None)
    if image is not None:
        for key in ('href', 'url'):
            cover_art = getattr(image, key, None)
            if cover_art:
                break

    # for YouTube feeds, prefer the channel's thumbnail over whatever
    # the feed itself declares
    yturl = youtube.get_real_cover(feed.feed.link)
    if yturl:
        cover_art = yturl

    return cover_art


def get_data_uri(inline_logo, url, size=None, modified_since=None):
    import base64
    from google.appengine.api import images

    # nothing to do if inlining is disabled or the podcast has no logo
    if not inline_logo or not url:
        return None

    url, content, last_modified = urlstore.get_url(url)

    if last_modified and modified_since and last_modified <= modified_since:
        return None

    if size:
        img = images.Image(content)
        content = images.resize(content, min(size, img.width), min(size, img.height))

    mimetype = get_mimetype(None, url)
    encoded = base64.b64encode(content)
    return 'data:%s;base64,%s' % (mimetype, encoded)
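
# The returned data URI has the form (illustrative, truncated):
#   'data:image/png;base64,iVBORw0KGgoAAAANSUhEUg...'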


def get_feed_tags(feed):
    tags = []

    for tag in feed.get('tags', []):
        if tag.get('term', None):
            tags.extend([t for t in tag['term'].split(',') if t])

        if tag.get('label', None):
            tags.append(tag['label'])

    return list(set(tags))
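
# Example: a category term 'Technology,Gadgets' with label 'Tech' yields
# ['Technology', 'Gadgets', 'Tech'] (order is undefined because of set()).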


def get_episodes(feed, strip_html):
    episodes = []
    for entry in feed.entries:
        urls = get_episode_files(entry)
        if not urls:
            continue

        e = get_episode_metadata(entry, urls, strip_html)
        episodes.append(e)
    return episodes


def get_episode_files(entry):
    """Get the download / episode URLs of a feedparser entry"""

    urls = {}

    # enclosures are the primary way of attaching files to an episode
    enclosures = getattr(entry, 'enclosures', [])
    for enclosure in enclosures:
        if 'href' in enclosure:
            mimetype = get_mimetype(enclosure.get('type', ''), enclosure['href'])
            if check_mimetype(mimetype):
                try:
                    filesize = int(enclosure.get('length', None))
                except (TypeError, ValueError):
                    filesize = None
                urls[enclosure['href']] = (mimetype, filesize)

    # Media RSS content (<media:content>) does not carry a file size
    media_content = getattr(entry, 'media_content', [])
    for media in media_content:
        if 'url' in media:
            mimetype = get_mimetype(media.get('type', ''), media['url'])
            if check_mimetype(mimetype):
                urls[media['url']] = (mimetype, None)

    # plain links are only considered if they point to a YouTube video
    links = getattr(entry, 'links', [])
    for link in links:
        if not hasattr(link, 'href'):
            continue

        if youtube.is_video_link(link['href']):
            urls[link['href']] = ('application/x-youtube', None)

    # XXX: Implement link detection as in gPodder

    return urls
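
# The returned mapping has the shape {url: (mimetype, filesize or None)},
# e.g. {'http://example.com/e1.mp3': ('audio/mpeg', 12345678)}  (illustrative)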


def get_episode_metadata(entry, files, strip_html):

    PROPERTIES = (
        ('title', True, lambda: entry.get('title', entry.get('link', None))),
        ('description', True, lambda: get_episode_summary(entry)),
        ('link', False, lambda: entry.get('link', None)),
        ('author', True, lambda: entry.get('author', entry.get('itunes_author', None))),
        ('duration', False, lambda: get_duration(entry)),
        ('language', False, lambda: entry.get('language', None)),
        ('files', False, lambda: get_files(files)),
        ('timestamp', False, lambda: get_timestamp(entry)),
    )

    episode = {}
    for name, is_text, func in PROPERTIES:
        set_val(episode, name, func, strip_html and is_text)

    return episode
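
# An episode dict produced above looks roughly like (illustrative values):
#   {'title': '...', 'timestamp': '2010-01-01T00:00:00',
#    'files': [{'url': '...', 'mimetype': 'audio/mpeg', 'filesize': 12345678}]}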


def get_episode_summary(entry):
    for key in ('summary', 'subtitle', 'link'):
        value = entry.get(key, None)
        if value:
            return value

    return None


def get_duration(entry):
    from utils import parse_time

    # don't shadow the builtin str; itunes_duration is e.g. '01:30:00'
    duration_str = entry.get('itunes_duration', '')
    try:
        return parse_time(duration_str)
    except ValueError:
        return None


def get_files(files):
    # convert the {url: (mimetype, filesize)} mapping into a list of dicts
    f = []
    for k, v in files.items():
        fdict = dict(url=k)
        if v[0]:
            fdict['mimetype'] = v[0]
        if v[1]:
            fdict['filesize'] = v[1]
        f.append(fdict)
    return f


def get_timestamp(entry):
    from datetime import datetime
    try:
        # updated_parsed is a time.struct_time; its first six fields are
        # year, month, day, hour, minute, second
        return datetime(*(entry.updated_parsed)[:6]).strftime('%Y-%m-%dT%H:%M:%S')
    except (AttributeError, TypeError, ValueError):
        return None
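

if __name__ == '__main__':
    # Minimal smoke test; a sketch only, since urlstore and the images
    # API assume the App Engine environment. The feed URL is hypothetical.
    import sys
    urls = sys.argv[1:] or ['http://example.com/feed.xml']
    podcasts, last_mod = parse_feeds(urls, False, None, True, None)
    print podcasts
    print 'last modified:', last_mod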