171 lines
4.7 KiB
Python
171 lines
4.7 KiB
Python
import configparser
import html
import json
from datetime import datetime, timezone

import dateutil
from feedgen.feed import FeedGenerator
from flask import Flask, request, abort
from pony import orm
from pony.orm import Required, Optional, Set, desc
|
|
|
|
# Settings are read from ``settings.ini``, resolved relative to the current
# working directory.
config = configparser.ConfigParser()
config.read('settings.ini')

# Base URL under which every feed is published: "<site_url>/feeds/".
# Only add the separating slash when the configured value does not already
# end with one. The previous test looked at ``site_url[:-1]`` (everything but
# the last character), so it appended a slash even when one was present and
# produced URLs such as "http://host//feeds/".
site_url = config['general']['site_url']
if not site_url.endswith('/'):
    site_url += '/'
site_url += 'feeds/'

# Flask application and Pony database object shared by the rest of the module.
app = Flask(__name__)
db = orm.Database()
|
|
|
|
class Feed(db.Entity):
    """A feed, looked up by its unique ``slug``."""

    slug = orm.Required(str, unique=True)
    # Secret that a POST to the feed must present before an item is added.
    admin_token = orm.Required(str)
    title = orm.Required(str)
    # Used both as the feed id and as its link when the RSS is generated.
    atom_id = orm.Required(str)
    description = orm.Required(str)
    # Credentials checked against the request's HTTP authorization whenever
    # ``password`` is not the empty string.
    login = orm.Optional(str)
    password = orm.Optional(str)
    items = orm.Set('Item')
|
|
|
|
class Item(db.Entity):
    """A single entry belonging to a :class:`Feed`."""

    # Owning feed; the reverse side of ``Feed.items``.
    feed = orm.Required(Feed)
    atom_id = orm.Required(str)
    title = orm.Required(str)
    content = orm.Required(str)
    # Creation time; used to order entries when the feed is rendered.
    date = orm.Required(datetime)
|
|
|
|
# Bind the database to the backend named by the ``adapter`` key of the
# ``[db]`` settings section, then create any missing tables.
db_config = config['db']
db_adapter = db_config['adapter']

# Reject anything other than the two supported adapters up front.
if db_adapter not in ('sqlite', 'postgres'):
    raise Exception('unhandled db adapter: {}'.format(db_adapter))

if db_adapter == 'sqlite':
    db.bind(provider='sqlite', filename=db_config['filename'], create_db=True)
else:
    # Connection parameters are read in the same order as the keyword
    # arguments they feed: user, password, host, database.
    db.bind(
        provider='postgres',
        **{
            key: db_config[key]
            for key in ('user', 'password', 'host', 'database')
        }
    )

db.generate_mapping(create_tables=True)
|
|
|
|
def does_slug_exist(slug):
    """Return 1 when a feed with ``slug`` is stored, otherwise 0.

    The value is the length of a query result capped at one row, so callers
    can use it directly as a truth value.
    """
    with orm.db_session:
        matching_feeds = orm.select(feed for feed in Feed if feed.slug == slug)
        at_most_one = matching_feeds[:1]
        return len(at_most_one)
|
|
|
|
|
|
@app.route('/feeds', methods=['GET', 'POST'])
@orm.db_session
def add_feed():
    """List the public feeds (GET) or register a new feed (POST).

    GET returns a small HTML page with one link per feed whose password is
    the empty string.

    POST expects a JSON object with the keys ``slug``, ``title``,
    ``description`` and ``admin_token``. When both ``login`` and ``password``
    are present they are stored on the feed as well. Returns ``"OK"`` on
    success.

    Aborts with HTTP 400 when the body is not a JSON object, when a required
    key is missing, or when a feed with the same slug already exists.
    """
    if request.method == 'GET':
        # Compare by value: ``is ''`` tests object identity, which is not a
        # reliable way to compare strings.
        public_feeds = orm.select(f for f in Feed if f.password == '')

        # Slugs and titles are client-supplied, so escape them before they
        # are placed into markup, and quote the href attribute.
        links = [
            '<li><a href="{}">{}</a></li>'.format(
                html.escape(site_url + f.slug, quote=True),
                html.escape(f.title),
            )
            for f in public_feeds
        ]

        return """
        <body>
            <ul>
                {}
            </ul>
        </body>
        """.format('\n'.join(links))

    # get_json(silent=True) yields None for a missing or malformed body;
    # reject that (and non-object JSON) instead of failing on the ``in``
    # checks below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        abort(400, 'missing or invalid JSON body')

    if 'slug' not in data:
        abort(400, 'missing slug')

    slug = data['slug']

    if does_slug_exist(slug):
        abort(400, "feed with this slug already exists")

    if 'title' not in data or 'description' not in data or 'admin_token' not in data:
        abort(400, "missing title or description or admin_token")

    # The feed's public URL doubles as its id.
    url = site_url + slug

    feed = Feed(slug=slug,
                admin_token=data['admin_token'],
                title=data['title'],
                atom_id=url,
                description=data['description'])

    # Credentials are only stored when both halves are supplied.
    if 'login' in data and 'password' in data:
        feed.login = data['login']
        feed.password = data['password']

    return "OK"
|
|
|
|
|
|
@app.route('/feeds/<slug>', methods=['GET', 'POST'])
@orm.db_session
def by_feed_name(slug):
    """Render a feed as RSS (GET) or append an item to it (POST).

    For feeds with a non-empty password, both methods require HTTP
    authorization whose username and password match the feed's ``login`` and
    ``password``.

    POST expects a JSON object with ``title``, ``content`` and
    ``admin_token`` and returns ``"OK"`` once the item is created. GET
    returns the RSS document built from the 20 most recent items.

    Aborts with 404 for an unknown slug, 401 for failed authorization or a
    wrong ``admin_token``, and 400 for a missing/invalid JSON body or missing
    keys.
    """
    # ``slug`` is declared unique, so a single keyed lookup both tests for
    # existence and fetches the feed.
    feed = Feed.get(slug=slug)
    if feed is None:
        abort(404, "unknown slug")

    # Compare by value: ``is not ''`` tests object identity, which is not a
    # reliable way to compare strings.
    if feed.password != '':
        auth = request.authorization
        if not auth or auth.username != feed.login or auth.password != feed.password:
            abort(401, "missing or incorrect password")

    if request.method == 'POST':
        # get_json(silent=True) yields None for a missing or malformed body;
        # reject that instead of failing on the ``in`` checks below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            abort(400, "missing or invalid JSON body")

        if 'title' not in data or 'content' not in data or 'admin_token' not in data:
            abort(400, "missing title or content or admin_token")

        if data['admin_token'] != feed.admin_token:
            abort(401, "incorrect admin_token")

        # Item ids are "<feed id>/<number of items already in the feed>".
        # count() asks the database for the number instead of loading every
        # item just to measure the collection.
        item_id = feed.atom_id + '/' + str(feed.items.count())

        # The enclosing db_session already covers this insert, so no nested
        # session is needed.
        Item(feed=feed,
             atom_id=item_id,
             title=data['title'],
             content=data['content'],
             date=datetime.now(timezone.utc))

        return "OK"

    gen = FeedGenerator()
    gen.title(feed.title)
    gen.id(feed.atom_id)
    gen.link(href=feed.atom_id)
    gen.description(feed.description)

    # Take the items by reversed date order. When adding them in the RSS feed,
    # order will be reversed again (first in array will end up last in the
    # feed), so explicitly reverse one more time.
    items = reversed(feed.items.order_by(desc(Item.date))[:20])
    for item in items:
        fe = gen.add_entry()
        fe.id(item.atom_id)
        fe.title(item.title)
        fe.content(item.content)

        # Attach UTC to the stored timestamp before handing it to the
        # generator.
        published = item.date.replace(tzinfo=timezone.utc)
        fe.published(published)

    return gen.rss_str(pretty=True).decode('utf8')
|
|
|
|
|
|
if __name__ == '__main__':
    # The optional "debug" setting defaults to 'False'; every value other
    # than 'False' or 'false' switches debugging on.
    debug_str = config['general'].get('debug', 'False')
    debug = debug_str not in ('False', 'false')

    # The same flag drives Pony's SQL logging and Flask's debug mode.
    orm.set_sql_debug(debug)
    app.run(debug=debug, host='0.0.0.0')
|