2rss/main.py

171 lines
4.7 KiB
Python
Raw Normal View History

2017-12-29 12:10:25 +00:00
import json
import dateutil
2017-12-29 14:45:37 +00:00
import configparser
2017-12-29 12:10:25 +00:00
from datetime import datetime, timezone
from flask import Flask, request, abort
from feedgen.feed import FeedGenerator
from pony import orm
from pony.orm import Required, Optional, Set, desc
2017-12-29 12:10:25 +00:00
2017-12-29 14:45:37 +00:00
# Load runtime settings from ``settings.ini`` (path is relative to the current
# working directory).  ConfigParser.read() skips files it cannot open, so a
# missing file surfaces as a KeyError on the first section lookup below.
config = configparser.ConfigParser()
config.read('settings.ini')

# Base URL under which feeds are published; always ends in 'feeds/'.
site_url = config['general']['site_url']
# Add a separator only when the configured URL does not already end with one.
# Testing the last character (rather than the slice ``[:-1]``, which is
# everything *except* the last character) avoids producing '...//feeds/'.
if not site_url.endswith('/'):
    site_url += '/'
site_url += 'feeds/'

# WSGI application object used by the route decorators in this module.
app = Flask(__name__)

# Pony database object; entities are declared against it and it is bound to a
# concrete backend further down in this module.
db = orm.Database()
class Feed(db.Entity):
    """A feed that can be listed, read as RSS and appended to."""

    # Unique identifier used in the '/feeds/<slug>' route.
    slug = orm.Required(str, unique=True)
    # Secret that a client must present to post new items to this feed.
    admin_token = orm.Required(str)
    title = orm.Required(str)
    # Used both as the feed id and as its link when the RSS is generated.
    atom_id = orm.Required(str)
    description = orm.Required(str)
    # Optional HTTP basic-auth credentials; feeds with an empty password are
    # the ones shown in the public listing.
    login = orm.Optional(str)
    password = orm.Optional(str)
    # Reverse side of Item.feed.
    items = orm.Set('Item')
class Item(db.Entity):
    """A single entry belonging to exactly one Feed."""

    # Owning feed (reverse side of Feed.items).
    feed = orm.Required(Feed)
    # Entry id; by_feed_name builds it from the feed's atom_id plus an index.
    atom_id = orm.Required(str)
    title = orm.Required(str)
    content = orm.Required(str)
    # Creation time; entries are ordered by this when the feed is rendered.
    date = orm.Required(datetime)
2017-12-29 14:45:37 +00:00
# Bind the database to the backend selected by ``adapter`` in the [db] section
# of the configuration, then create any missing tables.
db_config = config['db']
db_adapter = db_config['adapter']
if db_adapter == 'sqlite':
    filename = db_config['filename']
    # create_db=True: the SQLite file is created if it does not exist yet.
    db.bind(provider='sqlite', filename=filename, create_db=True)
elif db_adapter == 'postgres':
    user = db_config['user']
    password = db_config['password']
    host = db_config['host']
    database = db_config['database']
    db.bind(provider='postgres', user=user, password=password, host=host,
            database=database)
else:
    # A bad configuration value is a ValueError; it is still an Exception
    # subclass, so any existing broad handler keeps working.
    raise ValueError('unhandled db adapter: {}'.format(db_adapter))

# Map the entities declared above onto tables, creating them when missing.
db.generate_mapping(create_tables=True)
def does_slug_exist(slug):
    """Return True when a feed with the given slug is stored, else False.

    The lookup runs inside its own ``db_session`` so the function can also be
    called from code that has not opened one.
    """
    with orm.db_session:
        # Entity.exists() asks the database directly instead of selecting a
        # row and measuring the length of the result.
        return Feed.exists(slug=slug)
@app.route('/feeds', methods=['GET', 'POST'])
@orm.db_session
def add_feed():
    """List the public feeds (GET) or register a new feed (POST).

    GET returns a small HTML page with one link per feed whose password is
    empty.

    POST expects a JSON object containing ``slug``, ``title``,
    ``description`` and ``admin_token``.  ``login`` and ``password`` are
    optional and are only stored when both are supplied.  Returns ``"OK"`` on
    success and aborts with 400 when the body is not a JSON object, a
    required key is missing, or the slug is already taken.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    if request.method == 'GET':
        # Use '==': 'is' tests object identity, which is not a dependable way
        # to compare against a string literal.
        feeds = orm.select(f for f in Feed if f.password == '')
        # Slug and title come from clients, so escape them and quote the
        # href attribute before embedding them in markup.
        links = [
            '<li><a href="{}">{}</a></li>'.format(
                escape(site_url + f.slug), escape(f.title))
            for f in feeds
        ]
        return '\n<body>\n<ul>\n{}\n</ul>\n</body>\n'.format('\n'.join(links))

    data = request.get_json(silent=True)
    # With silent=True a missing or malformed body yields None instead of an
    # error; reject anything that is not a JSON object up front.
    if not isinstance(data, dict):
        abort(400, 'missing or invalid JSON body')

    if 'slug' not in data:
        abort(400, 'missing slug')
    slug = data['slug']
    if does_slug_exist(slug):
        abort(400, "feed with this slug already exists")

    if 'title' not in data or 'description' not in data or 'admin_token' not in data:
        abort(400, "missing title or description or admin_token")

    # The feed's public URL doubles as its atom id.
    url = site_url + slug
    feed = Feed(slug=slug,
                admin_token=data['admin_token'],
                title=data['title'],
                atom_id=url,
                description=data['description'])

    # Credentials are only meaningful as a pair.
    if 'login' in data and 'password' in data:
        feed.login = data['login']
        feed.password = data['password']

    return "OK"
2017-12-29 12:10:25 +00:00
@app.route('/feeds/<slug>', methods=['GET', 'POST'])
@orm.db_session
def by_feed_name(slug):
    """Render a feed as RSS (GET) or append a new item to it (POST).

    Aborts with 404 when no feed has the given slug.  When the feed has a
    password, both methods require matching HTTP basic-auth credentials and
    abort with 401 otherwise.

    POST expects a JSON object containing ``title``, ``content`` and
    ``admin_token``; it aborts with 400 when the body is not a JSON object or
    a key is missing, with 401 when the token does not match, and returns
    ``"OK"`` after storing the item.

    GET returns the RSS document (as text) built from the 20 most recent
    items.
    """
    # One lookup is enough: get() returns the matching feed or None.
    feed = Feed.get(slug=slug)
    if feed is None:
        abort(404, "unknown slug")

    # Truthiness check instead of an identity comparison with '' -- 'is not'
    # on strings depends on interning and is not an equality test.
    if feed.password:
        auth = request.authorization
        if not auth or auth.username != feed.login or auth.password != feed.password:
            abort(401, "missing or incorrect password")

    if request.method == 'POST':
        data = request.get_json(silent=True)
        # With silent=True a missing or malformed body yields None.
        if not isinstance(data, dict):
            abort(400, "missing or invalid JSON body")
        if 'title' not in data or 'content' not in data or 'admin_token' not in data:
            abort(400, "missing title or content or admin_token")
        if data['admin_token'] != feed.admin_token:
            abort(401, "incorrect admin_token")

        # Item ids are the feed id followed by the item's index in the feed.
        item_id = feed.atom_id + '/' + str(len(feed.items))
        # timezone.utc comes from the datetime import already used by this
        # module, so no dateutil submodule has to be loaded.
        date = datetime.now(timezone.utc)
        # The decorator has already opened a db_session; creating the entity
        # is enough for it to be stored when the session ends.
        Item(feed=feed,
             atom_id=item_id,
             title=data['title'],
             content=data['content'],
             date=date)
        return "OK"

    gen = FeedGenerator()
    gen.title(feed.title)
    gen.id(feed.atom_id)
    gen.link(href=feed.atom_id)
    gen.description(feed.description)

    # Take the items by reversed date order. When adding them in the RSS feed,
    # order will be reversed again (first in array will end up last in the
    # feed), so explicitly reverse one more time.
    latest = feed.items.order_by(desc(Item.date))[:20]
    for item in reversed(latest):
        fe = gen.add_entry()
        fe.id(item.atom_id)
        fe.title(item.title)
        fe.content(item.content)
        # Attach UTC explicitly before handing the date to the generator.
        fe.published(item.date.replace(tzinfo=timezone.utc))

    return gen.rss_str(pretty=True).decode('utf8')
if __name__ == '__main__':
    # Parse the optional ``debug`` setting as a real boolean.  getboolean()
    # accepts 1/yes/true/on and 0/no/false/off in any letter case and raises
    # ValueError for anything else, so values such as 'no' or '0' no longer
    # switch debugging on by accident.
    debug = config['general'].getboolean('debug', fallback=False)
    orm.set_sql_debug(debug)
    # NOTE(review): the server listens on all interfaces; enabling debug here
    # also enables Flask's debug mode on that public address -- confirm this
    # is only ever used on trusted networks.
    app.run(debug=debug, host='0.0.0.0')