From 5595a7b12c24048762e3893cf30da0936d430602 Mon Sep 17 00:00:00 2001
From: =?utf8?q?Stefan=20K=C3=B6gl?=
Date: Thu, 31 Mar 2011 20:10:46 +0300
Subject: [PATCH] Subscribe to Feeds that contain Pubsubhubbub-Hubs

stores such feeds with increased expiry-time
---
 feedservice/feeddownloader.py |   37 ++++++
 feedservice/main.py           |    2 +
 feedservice/pubsubhubbub.py   |  214 ++++++++++++++++++++++++++++++
 feedservice/settings.py       |    5 +
 feedservice/urlstore.py       |    6 +-
 5 files changed, 261 insertions(+), 3 deletions(-)
 create mode 100644 feedservice/pubsubhubbub.py
 create mode 100644 feedservice/settings.py

diff --git a/feedservice/feeddownloader.py b/feedservice/feeddownloader.py
index a2d7130..d0d3839 100644
--- a/feedservice/feeddownloader.py
+++ b/feedservice/feeddownloader.py
@@ -134,6 +134,8 @@ def parse_feed(feed_url, inline_logo, scale_to, logo_format, strip_html, modifie
     for name, is_text, func in PROPERTIES:
         set_val(podcast, name, func, strip_html and is_text)
 
+    subscribe_at_hub(podcast)
+
     return podcast, podcast.get('urls', None), podcast.get('new_location', None), last_modified
 
 
@@ -146,6 +148,15 @@ def set_val(obj, name, func, remove_tags=False):
         obj[name] = val
 
 
+def add_error(feed, key, msg):
+    """ Adds an error entry to the feed
+
+    Errors are collected in feed['errors'], a dict that maps the error key
+    to its message; the dict is created on first use. """
+
+    feed.setdefault('errors', {})[key] = msg
+
+
 def get_podcast_logo(feed):
     cover_art = None
     image = feed.feed.get('image', None)
@@ -400,3 +411,29 @@ def get_short_title(title, common_title):
     title = title.replace(common_title, '').strip()
     title = re.sub(r'^[\W\d]+', '', title)
     return title
+
+
+def subscribe_at_hub(feed):
+    """ Tries to subscribe to the feed if it contains a hub URL
+
+    Subscription failures are not raised; they are recorded in the feed's
+    errors under the key 'hub-subscription'. """
+
+    hub_url = feed.get('hub', None)
+    if not hub_url:
+        return
+
+    # without a known URL there is no topic to subscribe to
+    urls = feed.get('urls', None)
+    if not urls:
+        return
+
+    import pubsubhubbub
+
+    # use the last URL in the redirect chain
+    feed_url = urls[-1]
+
+    try:
+        pubsubhubbub.Subscriber.subscribe(feed_url, hub_url)
+    except pubsubhubbub.SubscriptionError, e:
+        add_error(feed, 'hub-subscription', repr(e))
diff --git a/feedservice/main.py b/feedservice/main.py
index 6fc2556..5382a32 100644
--- a/feedservice/main.py
+++ b/feedservice/main.py
@@ -2,12 +2,14 @@ from google.appengine.ext import webapp
 from google.appengine.ext.webapp.util import run_wsgi_app
 
 import feeddownloader
+import pubsubhubbub
 
 
 def main():
 
     endpoints = [
         ('/parse', feeddownloader.Parser),
+        ('/subscribe', pubsubhubbub.Subscriber),
     ]
 
     application = webapp.WSGIApplication(endpoints)
diff --git a/feedservice/pubsubhubbub.py b/feedservice/pubsubhubbub.py
new file mode 100644
index 0000000..047e89e
--- /dev/null
+++ b/feedservice/pubsubhubbub.py
@@ -0,0 +1,214 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# PubSubHubbub subscriber for mygpo-feedservice
+#
+#
+
+import urllib, urllib2, urlparse, logging
+from datetime import timedelta
+
+from google.appengine.ext import webapp, db
+
+import urlstore
+
+
+# increased expiry time for subscribed feeds
+INCREASED_EXPIRY = timedelta(days=7)
+
+
+class SubscriptionError(Exception):
+    """ Raised when a subscription could not be placed at the Hub """
+
+
+class SubscribedFeed(db.Model):
+    """ Datastore entity that tracks one subscription request """
+
+    # URL of the subscribed feed (the topic)
+    url = db.StringProperty()
+    # token that the Hub has to send back when verifying the request
+    verify_token = db.StringProperty()
+    # requested mode, e.g. 'subscribe'
+    mode = db.StringProperty()
+    # True once the Hub has verified the request via the callback
+    verified = db.BooleanProperty()
+
+
+class Subscriber(webapp.RequestHandler):
+    """ request handler for pubsubhubbub subscriptions """
+
+    def get(self):
+        """ Callback used by the Hub to verify the subscription request
+
+        Responds with the challenge if the request matches a stored
+        subscription (same mode and verify_token), with 404 otherwise. """
+
+        # received arguments: hub.mode, hub.topic, hub.challenge,
+        # hub.lease_seconds, hub.verify_token
+        mode = self.request.get('hub.mode')
+        feed_url = self.request.get('hub.topic')
+        challenge = self.request.get('hub.challenge')
+        lease_seconds = self.request.get('hub.lease_seconds')
+        verify_token = self.request.get('hub.verify_token')
+
+        # the parentheses are required: % binds tighter than +, without them
+        # only the last part of the message would be formatted
+        logging.debug(('received subscription-parameters: mode: %(mode)s, ' +
+            'topic: %(topic)s, challenge: %(challenge)s, lease_seconds: ' +
+            '%(lease_seconds)s, verify_token: %(verify_token)s') %
+            dict(mode=mode, topic=feed_url, challenge=challenge,
+                lease_seconds=lease_seconds, verify_token=verify_token))
+
+        subscription = Subscriber.get_subscription(feed_url)
+
+        if subscription is None:
+            logging.warn('subscription does not exist')
+            self.response.set_status(404)
+            return
+
+        if subscription.mode != mode:
+            logging.warn('invalid mode, %s expected' %
+                subscription.mode)
+            self.response.set_status(404)
+            return
+
+        if subscription.verify_token != verify_token:
+            logging.warn('invalid verify_token, %s expected' %
+                subscription.verify_token)
+            self.response.set_status(404)
+            return
+
+        subscription.verified = True
+        subscription.put()
+
+        logging.info('subscription confirmed')
+        self.response.set_status(200)
+        self.response.out.write(challenge)
+
+
+    def post(self):
+        """ Callback to notify about a feed update
+
+        Re-fetches the feed given in the 'url' parameter; responds with 400
+        unless a verified subscription with mode 'subscribe' exists. """
+
+        feed_url = self.request.get('url')
+
+        logging.info('received notification for %s' % feed_url)
+
+        subscription = Subscriber.get_subscription(feed_url)
+
+        if subscription is None:
+            logging.warn('no subscription for this URL')
+            self.response.set_status(400)
+            return
+
+        if subscription.mode != 'subscribe':
+            logging.warn('invalid subscription mode: %s' % subscription.mode)
+            self.response.set_status(400)
+            return
+
+        if not subscription.verified:
+            logging.warn('the subscription has not yet been verified')
+            self.response.set_status(400)
+            return
+
+        # The changed parts in the POST data are ignored -- we simply fetch the
+        # whole feed.
+        # It is stored in memcache with all the normal (unsubscribed) feeds
+        # but with increased expiry time.
+        urlstore.fetch_url(feed_url, add_expires=INCREASED_EXPIRY)
+
+        self.response.set_status(200)
+
+
+    @staticmethod
+    def get_subscription(feedurl):
+        """ Returns the stored subscription for the feed URL, or None """
+        q = SubscribedFeed.all()
+        q.filter('url =', feedurl)
+        return q.get()
+
+
+    @staticmethod
+    def subscribe(feedurl, huburl):
+        """ Subscribe to the feed at a Hub
+
+        Does nothing if a 'subscribe' subscription for the feed is already
+        stored. Raises SubscriptionError if the request can not be sent or
+        if the Hub does not answer with status 204. """
+
+        logging.info('subscribing for %(feed)s at %(hub)s' %
+            dict(feed=feedurl, hub=huburl))
+
+        verify_token = Subscriber.generate_verify_token()
+
+        mode = 'subscribe'
+        verify = 'sync'
+
+        data = {
+            "hub.callback": Subscriber.get_callback_url(feedurl),
+            "hub.mode": mode,
+            "hub.topic": feedurl,
+            "hub.verify": verify,
+            "hub.verify_token": verify_token,
+        }
+
+        subscription = Subscriber.get_subscription(feedurl)
+        if subscription is not None:
+
+            if subscription.mode == mode:
+                logging.info('subscription already exists')
+                return
+
+            else:
+                logging.info('subscription exists but has wrong mode: ' +
+                    'old: %(oldmode)s, new: %(newmode)s. Overwriting.' %
+                    dict(oldmode=subscription.mode, newmode=mode))
+
+        else:
+            subscription = SubscribedFeed()
+
+        # store the subscription before contacting the Hub, so that the
+        # verification callback can find it
+        subscription.url = feedurl
+        subscription.verify_token = verify_token
+        subscription.mode = mode
+        subscription.verified = False
+        subscription.put()
+
+        data = urllib.urlencode(data)
+        logging.debug('sending request: %s' % repr(data))
+
+        try:
+            resp = urllib2.urlopen(huburl, data)
+        except Exception as e:
+            logging.error('Could not send subscription to Hub: %s' % repr(e))
+            raise SubscriptionError(e)
+
+        status = resp.code
+        if status != 204:
+            logging.error('received incorrect status %d' % status)
+            raise SubscriptionError('Subscription has not been accepted by the Hub')
+
+
+    @staticmethod
+    def get_callback_url(feedurl):
+        """ Returns the URL the Hub should call back for the given feed """
+        import settings
+        url = urlparse.urljoin(settings.BASE_URL, 'subscribe')
+
+        param = urllib.urlencode([('url', feedurl)])
+        return '%s?%s' % (url, param)
+
+
+    @staticmethod
+    def generate_verify_token(length=32):
+        """ Returns a random token of letters and digits
+
+        Characters are drawn independently from the OS random source, so
+        the length is not limited by the size of the alphabet. """
+        import random, string
+        alphabet = string.ascii_letters + string.digits
+        rng = random.SystemRandom()
+        return "".join(rng.choice(alphabet) for _ in range(length))
diff --git a/feedservice/settings.py b/feedservice/settings.py
new file mode 100644
index 0000000..a8d7e29
--- /dev/null
+++ b/feedservice/settings.py
@@ -0,0 +1,5 @@
+
+# Public base URL of this service; the Hub callback URL is built from it.
+# NOTE(review): App Engine apps are usually served from *.appspot.com --
+# confirm that this host is reachable by the Hub.
+BASE_URL='http://mygpo-feedservice.appengine.com/'
diff --git a/feedservice/urlstore.py b/feedservice/urlstore.py
index 825c345..c952ea9 100644
--- a/feedservice/urlstore.py
+++ b/feedservice/urlstore.py
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 import time
 import urllib2
 from email import utils
@@ -50,7 +50,7 @@ def from_cache(url):
     return memcache.get(url)
 
 
-def fetch_url(url, cached=None):
+def fetch_url(url, cached=None, add_expires=timedelta()):
     """
     Fetches the given URL and stores the resulting object in the Cache
     """
@@ -70,7 +70,7 @@ def fetch_url(url, cached=None):
         r = opener.open(request)
         obj = cached or URLObject(url=url)
         obj.content = r.read()
-        obj.expires = parse_header_date(r.headers.dict.get('expires', None))
+        obj.expires = parse_header_date(r.headers.dict.get('expires', None)) + add_expires
         obj.modified = parse_header_date(r.headers.dict.get('last-modified', None))
         obj.etag = r.headers.dict.get('etag', None)
 
-- 
2.11.4.GIT