fenen/fenen.py

609 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
# Fenen Feed Reader
# Copyright (C) 2023 Jake Bauer <jbauer@paritybit.ca>
# Licensed under the terms of the ISC License, see LICENSE for details.
import readline
import signal
import subprocess
import sys
import tempfile
import html
import textwrap
import re
import datetime as dt
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer
from xml.etree import ElementTree
# External dependency
import feedparser
# Internal dependencies
from config import Config
from html_converter import HTMLConverter
from database import Database
from ui_context import UIContext
def signal_handler(sig, frame):
    """Terminate the program with exit status 0.

    Both arguments are supplied by the signal machinery and are not used.
    """
    raise SystemExit(0)


# Route Ctrl-C (SIGINT) through the handler above so the program exits
# through SystemExit instead of a KeyboardInterrupt traceback.
signal.signal(signal.SIGINT, signal_handler)
# Takes a 1-based selection like "1-4,8,10"; converts it to the zero-based list
# [0, 1, 2, 3, 7, 9]
def interpret_range(string):
    """Convert a user-entered item selection into zero-based indices.

    The selection is a comma-separated list of item numbers and inclusive
    "low-high" ranges, numbered from 1 as shown in the listings. "*" selects
    every item in the current listing.

    The upper bound comes from the current UI context: the number of feeds
    ("all"), the number of entries in the open feed ("feed"), or the number
    of listed items ("unread" and "search").

    Returns a list of zero-based indices in the order given. If any part of
    the selection is malformed or out of range, a message is printed and an
    empty list is returned so callers can simply do nothing.
    """
    if ui.context == "all":
        max_index = db.get_num_feeds()
    elif ui.context == "feed":
        max_index = db.get_num_entries(ui.data["feed"]["table_name"])
    elif ui.context in ("unread", "search"):
        max_index = len(ui.data["items"])
    else:
        # Unknown context: nothing is selectable (avoids an unbound variable).
        max_index = 0

    if string == "*":
        return list(range(max_index))

    indexlist = []
    for part in string.split(","):
        try:
            if "-" in part:
                # Anything other than exactly two integers raises ValueError.
                low, high = map(int, part.split("-"))
            else:
                low = high = int(part)
        except ValueError:
            print("Invalid item number or range:", part)
            return []
        if low > high:
            print("Invalid item number or range:", part)
            return []
        # Item numbers start at 1; without the lower bound check "0" would
        # become index -1 and silently select the last item.
        if low < 1 or high > max_index:
            print("Item", part, "doesn't exist.")
            return []
        # Transform nice-for-humans indices into nice-for-computers indices
        indexlist.extend(range(low - 1, high))
    return indexlist
# TODO: add support for inputting a regular page and parsing out a feed link
# FIXME: be more robust about accepting non-feed URLs (check ContentType?)
def add_feed(url):
    """Store a new feed URL in the database.

    A URL given without an http:// or https:// scheme is assumed to be
    plain http.
    """
    has_scheme = url.startswith(("http://", "https://"))
    if not has_scheme:
        url = f"http://{url}"
    db.insert_feed([url])
def delete_feed(indices):
    """Delete the selected feeds after asking for confirmation.

    Only valid while a list of feeds is shown: the "all" context, or a
    search that was started from it. `indices` is a selection string as
    accepted by interpret_range().
    """
    showing_feeds = ui.context == "all" or (
        ui.context == "search" and ui.prev_context == "all"
    )
    if not showing_feeds:
        print("Can only delete feeds, not posts.")
        return

    # De-duplicate and sort so each feed is handled exactly once.
    selected = sorted(set(interpret_range(indices)))
    if not selected:
        # Invalid selection; interpret_range() already explained why.
        return

    try:
        response = input(
            "Are you sure you want to delete "
            f'{"these feeds" if len(selected) > 1 else "this feed"}? [y/N] '
        )
    except EOFError:
        # Treat Ctrl-D at the prompt as "no".
        print("")
        return
    if not response or not "yes".startswith(response.lower()):
        return

    from_search = ui.context == "search"
    # Resolve every feed before anything is removed so that positions in
    # the search results still line up with what the user saw.
    feeds = []
    for i in selected:
        if from_search:
            feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
        else:
            feeds.append(db.get_feed_by_index(i))

    if from_search:
        # Drop the deleted hits from the result list, highest position
        # first, so earlier pops do not shift the later ones.
        for i in reversed(selected):
            ui.data["items"].pop(i)

    urls = [feed["url"] for feed in feeds]
    tables = [feed["table_name"] for feed in feeds]
    db.remove_feeds(urls, tables)
def change_feed(indices):
    """Interactively edit the name and category of the selected feeds.

    Only valid while a list of feeds is shown ("all", or a search started
    from it). For each selected feed the user is prompted for a name and a
    category, with the current values shown in brackets. End-of-input at
    either prompt stops the editing and leaves the remaining feeds alone.
    """
    showing_feeds = ui.context == "all" or (
        ui.context == "search" and ui.prev_context == "all"
    )
    if not showing_feeds:
        print("Can only change feed information, not post information.")
        return

    positions = interpret_range(indices)
    from_search = ui.context == "search"
    # Get all feeds first so changes in the database don't mess up the order
    selected = [
        db.get_feed_by_url(ui.data["items"][pos]["url"])
        if from_search
        else db.get_feed_by_index(pos)
        for pos in positions
    ]

    for current in selected:
        print(f'Feed: {current["url"]}')
        try:
            name = input(f' Name [{current["name"]}]: ')
            category = input(f' Category [{current["category"]}]: ')
        except EOFError:
            print("")
            break
        db.update_feed(name, category, current["url"])
def import_feeds(file):
    """Import feeds from the OPML file at path `file`.

    Every <outline> element carrying an xmlUrl that is not already in the
    database is inserted. A missing or unparsable file is reported and
    nothing is imported. A summary line is printed at the end.
    """
    try:
        with open(file) as handle:
            tree = ElementTree.parse(handle)
    except FileNotFoundError:
        print("File", file, "not found.")
        return
    except ElementTree.ParseError as e:
        print(f"Failed to parse {file}: {e}")
        return

    imported = 0
    for outline in tree.findall(".//outline"):
        attrs = outline.attrib
        url = attrs.get("xmlUrl", "")
        name = html.unescape(attrs.get("text", ""))
        # Flag (1/0) recording whether the OPML file supplied a name.
        custom_name = int(bool(name))
        site_url = attrs.get("htmlUrl", "")
        category = html.unescape(attrs.get("category", ""))
        # Skip outlines without a feed URL and feeds that are already known.
        if not url or db.get_feed_by_url(url):
            continue
        db.insert_feed([url, name, custom_name, site_url, category], True)
        imported += 1

    print(f'{imported or "No"} new feeds imported.')
def export_feeds(file):
    """Write every feed in the database to `file` as an OPML 2.0 document.

    Each feed becomes one <outline> element. All attribute values,
    including the URLs, are escaped so that characters such as "&" in a
    query string do not produce invalid XML.
    """

    def attr(value):
        # Missing values become empty attributes. html.escape() also
        # escapes quotes, which keeps the double-quoted attribute intact.
        return html.escape(value) if value else ""

    with open(file, "w") as f:
        f.write('<opml version="2.0">\n')
        f.write("<head>\n\t<docs>http://opml.org/spec2.opml</docs>\n</head>\n")
        f.write("<body>\n")
        for feed in db.get_all_feeds():
            name = attr(feed["name"])
            url = attr(feed["url"])
            site_url = attr(feed["site_url"])
            category = attr(feed["category"])
            f.write(
                f'\t<outline type="rss" text="{name}" xmlUrl="{url}" '
                f'htmlUrl="{site_url}" category="{category}"/>\n'
            )
        f.write("</body>\n</opml>")
    print("OPML file generated.")
def mark_read(indices, mark_read):
    """Mark the selected items as read (`mark_read` true) or unread.

    `indices` is a selection string as accepted by interpret_range(). In
    feed listings ("all", or a search started from it) whole feeds are
    updated; otherwise individual entries are updated.
    """
    unread_status = "0" if mark_read else "1"
    context = ui.context
    feeds, entries = [], []

    for pos in interpret_range(indices):
        if context == "all":
            feeds.append(db.get_feed_by_index(pos))
        elif context == "feed":
            # NOTE(review): here "id" is the zero-based list position, not
            # a stored entry id as in the other branches; confirm that
            # db.change_unread_status_of_entries() expects this.
            entries.append(dict(table=ui.data["feed"]["table_name"], id=pos))
        elif context == "unread":
            item = ui.data["items"][pos]
            entries.append(dict(table=item["table_name"], id=item["id"]))
        elif context == "search" and ui.prev_context == "feed":
            entries.append(
                dict(
                    table=ui.data["feed"]["table_name"],
                    id=ui.data["items"][pos]["id"],
                )
            )
        elif context == "search" and ui.prev_context == "all":
            feeds.append(db.get_feed_by_url(ui.data["items"][pos]["url"]))

    if feeds:
        db.change_unread_status_of_feeds(feeds, unread_status)
    else:
        db.change_unread_status_of_entries(entries, unread_status)
def _row_value(row, key):
    """Return row[key], or None when the row has no such key."""
    return row[key] if key in row.keys() else None


def get_url(index):
    """Return the URL to open for the item at zero-based `index`.

    In feed listings ("all", or a search started from it) this is the
    feed's site URL, falling back to the feed URL itself when no site URL
    is stored. In entry listings it is the entry's URL. None is returned
    when no URL is available or the current context has no listing.
    """
    # Default for contexts that match none of the branches below, so the
    # final return can never hit an unbound variable.
    url = None
    if ui.context == "all":
        feed = db.get_feed_by_index(index)
        # `or` also covers a site_url that is present but empty/None, which
        # a bare key-presence test would return as-is.
        url = _row_value(feed, "site_url") or feed["url"]
    elif ui.context == "feed":
        entry = db.get_entry_by_index(ui.data["feed"]["table_name"], index)
        url = _row_value(entry, "url")
    elif ui.context == "unread":
        item = ui.data["items"][index]
        entry = db.get_entry_by_id(item["table_name"], item["id"])
        url = _row_value(entry, "url")
    elif ui.context == "search":
        if ui.prev_context == "all":
            feed = db.get_feed_by_url(ui.data["items"][index]["url"])
            url = _row_value(feed, "site_url") or feed["url"]
        elif ui.prev_context == "feed":
            entry = db.get_entry_by_id(
                ui.data["feed"]["table_name"], ui.data["items"][index]["id"]
            )
            url = _row_value(entry, "url")
    return url
def open_in_browser(indices=None):
    """Open items in the configured browser.

    With no `indices`, the URL of the most recently displayed post
    (ui.data["post_url"]) is opened. Otherwise `indices` is a selection
    string and each selected item that has a URL is opened in turn.
    A missing browser executable is reported instead of raised.
    """
    browser = conf.get_value("browser").split(" ")

    def launch(target):
        # Started without waiting; the browser's stdout is discarded.
        subprocess.Popen([*browser, target], stdout=subprocess.DEVNULL)

    try:
        if not indices:
            launch(ui.data["post_url"])
            return
        for pos in interpret_range(indices):
            target = get_url(pos)
            if not target:
                print(f"Entry {pos + 1} has no associated URL.")
                continue
            launch(target)
            sleep(0.1)  # Wait a bit so the browser opens URLs in the correct order
    except FileNotFoundError:
        print(
            f"Error opening browser: could not find the program '{' '.join(browser)}'."
        )
def download_entry(indices=None):
    """Download items with the configured downloader command.

    With no `indices`, the most recently displayed post
    (ui.data["post_url"]) is downloaded. Otherwise `indices` is a selection
    string and each selected item that has a URL is downloaded in turn.
    """
    # Imported here so this function is self-contained.
    import shlex

    downloader = conf.get_value("downloader")

    def fetch(url):
        # The downloader setting is run through the shell so it may contain
        # options, but the URL comes from feed data: quote it so characters
        # such as "&", ";" or "?" reach the downloader as part of the URL
        # instead of being interpreted by the shell. (POSIX shell quoting.)
        subprocess.run(f"{downloader} {shlex.quote(url)}", shell=True)

    if not indices:
        fetch(ui.data["post_url"])
        return

    for i in interpret_range(indices):
        url = get_url(i)
        if url:
            fetch(url)
        else:
            print(f"Entry {i + 1} has no associated URL.")
def display_entry(entry, index=None):
    """Render an entry as plain text and show it in the configured pager.

    `index` is the entry's position in the unread listing and is only used
    in the "unread" context, to look up the name of the feed it came from.
    Afterwards the entry's URL is remembered in ui.data["post_url"].
    """
    if ui.context == "unread":
        source_name = ui.data["items"][index]["feed_name"]
    else:
        source_name = ui.data["feed"]["name"]

    converter = HTMLConverter()
    converter.feed(entry["content"])

    # Wrap each line to 80 columns, then squeeze runs of blank lines.
    wrapped = "\n".join(
        textwrap.fill(line, 80) for line in converter.text.splitlines()
    )
    body = re.sub(r"\n\n+", "\n\n", wrapped)
    link_list = "\n".join(converter.links)
    content = f"{body}\n\n{link_list}"

    header = "\n".join(
        [
            f'Title: {entry["title"]}',
            f"From: {source_name}",
            f'Published: {entry["date"]}',
            f'URL: {entry["url"]}',
        ]
    )
    output = f"{header}\n\n{content}\n"

    # The pager reads from a temporary file that is removed once it exits.
    with tempfile.NamedTemporaryFile("w") as page:
        page.write(output)
        page.flush()
        subprocess.run(f'{conf.get_value("pager")} {page.name}', shell=True)
    ui.data["post_url"] = entry["url"]
def print_feeds(feeds):
    """Print a numbered list of feeds with category and unread count.

    Feeds without a name are shown by URL; feeds without a category are
    shown as "Uncategorized".
    """
    for position, feed in enumerate(feeds, start=1):
        num_unread = db.get_unread_count(feed["table_name"])
        label = feed["name"] or feed["url"]
        category = feed["category"] or "Uncategorized"
        print(f"{position}) [{category}] {label} ({num_unread} unread)")
def print_feed_entries(feed, entries):
    """Print a feed's heading followed by a numbered list of its entries.

    Unread entries are flagged with "* " in front of the date.
    """
    print(f'{feed["name"]} - {feed["url"]}')
    for position, item in enumerate(entries, start=1):
        flag = "* " if item["unread"] else ""
        print(f'{position}) {flag}{item["date"]} {item["title"]}')
def show(indices=None):
    """Show a listing, or the selected items of the current listing.

    `indices` may be:
      * None - redisplay the current listing;
      * a prefix of "all" or "unread" - switch to that listing and show it;
      * a selection string (see interpret_range) - show those items: a
        selected feed lists its entries and becomes the current feed, a
        selected entry is opened in the pager and marked as read.
    """
    if indices:
        if "all".startswith(indices):
            ui.change_context("all")
            indices = None
        elif "unread".startswith(indices):
            ui.change_context("unread")
            indices = None
        else:
            indices = interpret_range(indices)
            if not indices:
                return

    if indices and (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        # Resolve every selected feed before the context changes below:
        # once the context is "feed", positions in the search results can
        # no longer be looked up.
        from_search = ui.context == "search"
        feeds = []
        for i in indices:
            if from_search:
                feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
            else:
                feeds.append(db.get_feed_by_index(i))
        for feed in feeds:
            entries = db.get_all_entries(feed["table_name"])
            print_feed_entries(feed, entries)
            ui.change_context("feed")
            ui.data["feed"] = feed
    elif ui.context == "all":
        print_feeds(db.get_all_feeds())
    elif indices and (
        ui.context == "feed" or (ui.context == "search" and ui.prev_context == "feed")
    ):
        for i in indices:
            if ui.context == "search":
                entry = db.get_entry_by_id(
                    ui.data["feed"]["table_name"], ui.data["items"][i]["id"]
                )
            else:
                entry = db.get_entry_by_index(ui.data["feed"]["table_name"], i)
            display_entry(entry)
            if entry["unread"]:
                # mark_read() takes a 1-based selection string.
                mark_read(str(i + 1), True)
    elif ui.context == "feed":
        # Redisplay the feed that is currently open.
        current_feed = ui.data["feed"]
        entries = db.get_all_entries(current_feed["table_name"])
        print_feed_entries(current_feed, entries)
    elif indices and ui.context == "unread":
        for i in indices:
            item = ui.data["items"][i]
            entry = db.get_entry_by_id(item["table_name"], item["id"])
            display_entry(entry, i)
            if entry["unread"]:
                mark_read(str(i + 1), True)
    elif ui.context == "unread":
        ui.change_context("unread")
        for feed in db.get_all_feeds():
            for entry in db.get_unread_entries(feed["table_name"]):
                ui.data["items"].append(
                    {
                        "date": entry["date"],
                        "title": entry["title"],
                        "unread": entry["unread"],
                        "feed_name": feed["name"],
                        "table_name": feed["table_name"],
                        "id": entry["id"],
                    }
                )
        ui.data["items"] = sorted(ui.data["items"], key=lambda entry: entry["date"])
        if not ui.data["items"]:
            print("No unread entries.")
            ui.revert_context()
            # Nothing to list, and ui.data now belongs to the restored
            # context, so it must not be iterated as unread items.
            return
        for n, entry in enumerate(ui.data["items"], start=1):
            print(f'{n}) * {entry["date"]} {entry["feed_name"]} - {entry["title"]}')
    elif ui.context == "search":
        search(ui.data["search_query"])  # Re-run the search
def search(query):
    """Search the current listing for `query` and show the matches.

    In the "all" context feeds are searched; in the "feed" context the
    open feed's entries are searched. The context becomes "search", the
    matches are stored in ui.data["items"], and the query is remembered in
    ui.data["search_query"] so the search can be re-run. Searching again
    from search results repeats the search in the listing the previous
    search started from. Unread listings cannot be searched.
    """
    # Match the query anywhere in the text. The raw query, not this
    # pattern, is what gets remembered and passed on; otherwise each
    # re-run would wrap it in another pair of wildcards.
    pattern = "%" + query + "%"
    if ui.context == "all":
        ui.change_context("search")
        ui.data["search_query"] = query
        results = db.search_feeds(pattern)
        for feed in results:
            ui.data["items"].append({"url": feed["url"]})
        print_feeds(results)
    elif ui.context == "feed":
        results = db.search_entries(ui.data["feed"]["table_name"], pattern)
        feed = ui.data["feed"]  # Store because changing context erases this
        ui.change_context("search")
        ui.data["search_query"] = query
        ui.data["feed"] = feed
        for entry in results:
            ui.data["items"].append({"id": entry["id"]})
        print_feed_entries(feed, results)
    elif ui.context == "unread":
        print("Can't search unread entries.")
        return
    elif ui.context == "search":
        # Run this search in the same context as the previous search
        ui.revert_context()
        search(query)
        return
    if not ui.data["items"]:
        print("No search results.")
def _entry_content(entry):
    """Return the text to store for a parsed feed entry.

    The summary is preferred. Otherwise the entry's full content is used;
    feedparser exposes that as a list of dictionaries, so the first
    non-empty "value" is taken rather than storing the list itself.
    """
    if "summary" in entry:
        return entry["summary"]
    content = entry.get("content", "")
    if isinstance(content, (list, tuple)):
        for part in content:
            value = part.get("value", "")
            if value:
                return value
        return ""
    return content


def _entry_date(entry):
    """Return the entry's date as a YYYY-MM-DD string.

    The published date is preferred, then the updated date, then today.
    """
    for key in ("published_parsed", "updated_parsed"):
        parsed = entry.get(key)
        if parsed:
            return f"{parsed.tm_year}-{parsed.tm_mon:02}-{parsed.tm_mday:02}"
    # Always a string, like the parsed branches above, so stored dates do
    # not depend on how the database layer adapts date objects.
    return dt.date.today().isoformat()


def load_feed(url):
    """Fetch the feed at `url` and store its entries in the database.

    The feed's name and site URL are filled in if they are still missing.
    Entries whose id is already stored are updated; all others are
    inserted. Returns 0 on success and 1 on any failure (after printing a
    message), so it can be used as a job for refresh_feeds().
    """
    # A separate Database object rather than the module-level one: this
    # function runs on worker threads (see refresh_feeds).
    # NOTE(review): presumably the connection cannot be shared across
    # threads - confirm against Database.
    db = Database(conf.get_value("db"))
    entries_to_insert = []
    entries_to_update = []
    try:
        feed = feedparser.parse(url)
        if "status" not in feed:
            print(f"\nError loading feed: {url}")
            print(feed.get("bozo_exception", "Unknown error"))
            return 1
        if feed.status not in [200, 301, 302, 307, 308]:
            print(f"\nError loading feed: {url}: HTTP Code {feed.status}")
            return 1
        feed_title = feed.feed.get("title", "No Feed Title")
        site_url = feed.feed.get("link", None)
        if db.feed_missing_name(url):
            db.populate_feed_name(url, feed_title)
        if db.feed_missing_site_url(url):
            db.populate_feed_site_url(url, site_url)
        table_name = db.get_feed_by_url(url)["table_name"]
        # A set keeps the per-entry membership test below cheap.
        existing_ids = {entry["id"] for entry in db.get_all_entries(table_name)}
        for entry in feed.entries:
            entry_title = entry.get("title", "No Title")
            entry_url = entry.get("link", "No URL")
            # Entries without an id are identified by their URL.
            entry_id = entry.get("id", entry_url)
            entry_content = _entry_content(entry)
            if entry_id in existing_ids:
                # The stored date of a known entry is left untouched.
                entries_to_update.append(
                    (entry_title, entry_url, entry_content, entry_id)
                )
            else:
                entries_to_insert.append(
                    (
                        entry_id,
                        _entry_date(entry),
                        entry_title,
                        entry_url,
                        entry_content,
                    )
                )
        db.insert_entries(table_name, entries_to_insert)
        db.update_entries(table_name, entries_to_update)
        return 0
    except Exception as e:
        # Worker boundary: one broken feed must not abort the whole
        # refresh, so report it and signal failure to the caller.
        print("\nUnhandled error with URL:", url, "->", type(e), e)
        return 1
def _format_elapsed(total_seconds):
    """Format a whole number of seconds as e.g. "5s", "1m0s" or "75m12s"."""
    minutes, seconds = divmod(total_seconds, 60)
    # Minutes are not capped at 59, so a refresh longer than an hour does
    # not silently lose its hours.
    return (f"{minutes}m" if minutes else "") + f"{seconds}s"


def refresh_feeds():
    """Reload every feed in parallel and report progress and duration.

    Feeds are loaded with load_feed() on a thread pool sized by the
    "threads" setting. A progress counter is rewritten in place as results
    arrive; afterwards the elapsed time and the number of failed feeds (if
    any) are printed.
    """
    feeds = db.get_all_feeds()
    total_jobs = len(feeds)
    finished_jobs = 0
    start = default_timer()
    with ThreadPoolExecutor(max_workers=conf.get_value("threads")) as executor:
        print(f"Refreshed 0/{total_jobs} feeds", end="\r", flush=True)
        for result in executor.map(load_feed, [feed["url"] for feed in feeds]):
            # load_feed() returns 0 on success.
            if result == 0:
                finished_jobs += 1
                print(
                    f"Refreshed {finished_jobs}/{total_jobs} feeds",
                    end="\r",
                    flush=True,
                )
    elapsed = round(default_timer() - start)
    print(f"\nFinished in {_format_elapsed(elapsed)} ", end="")
    failed_jobs = total_jobs - finished_jobs
    if failed_jobs > 0:
        print(f"({failed_jobs} failed)")
    else:
        print("")
def print_help():
    """Print the list of interactive commands and their syntax."""
    print(
        """
add <url> - Add a feed (<url> must point to feed directly).
delete <n> - Delete feed(s).
change <n> - Change the name/category of feed(s).
show [all|unread|<n>] - Show all feeds, unread entries, or the given item(s).
import [file] - Import feeds from [file] or feeds.opml.
export [file] - Export feeds to [file] or feeds.opml.
refresh - Refresh all feeds to check for new content.
mark <n> - Mark item(s) as read.
unmark <n> - Mark item(s) as unread.
open <n> - Open item(s) in the browser.
get <n> - Download item(s) from the Internet.
vacuum - Clean up free space in the database.
help|? - Print this message.
/<query> - Search the current items.
quit - Exit fenen.
<n> can be one or more integers or ranges of integers separated by commas. For
example, "2" or "1-4,8,10".
Short forms of commands may also be used. For example "s u" instead of "show
unread" or "d" instead of "delete". "list" and "print" are aliases of "show".
"""
    )
# Module-level state shared by every command function above. This runs on
# import as well as when executed as a script.
conf = Config()
conf.parse_config()
# Database handle used by the interactive commands; load_feed() opens its
# own handle instead of using this one.
db = Database(conf.get_value("db"))
db.init_db()
# Tracks the current listing context and its associated data.
ui = UIContext()
print("Fenen Feed Reader v0.3.0/2023-11-27. Type ? for help.")
if __name__ == "__main__":
    # Interactive loop: read a line, split it into a command word and its
    # arguments, and dispatch. Commands may be abbreviated to any prefix;
    # where prefixes are ambiguous the first match in the chain below wins.
    while True:
        try:
            command = input(f"fenen ({ui.context})> ").split()
        except EOFError:
            print("")
            sys.exit(0)
        if not command:
            continue

        first = command[0]
        # "/<query>" and ed-like forms such as "s1" or "o2-5" glue the
        # argument onto the command character. Split them apart in place so
        # the argument stays directly after the command, ahead of any
        # further words on the line.
        is_search = first[0] == "/" and len(first) > 1
        is_ed_like = (
            len(first) > 1
            and first[0].isalpha()
            and (first[1].isdigit() or first[1] == "*")
        )
        if is_search or is_ed_like:
            command[0:1] = [first[0], first[1:]]

        args = command[1:]
        command = command[0]

        if "add".startswith(command):
            if args:
                add_feed(args[0])
            else:
                print("Usage: add <url>")
        elif "delete".startswith(command):
            if args:
                delete_feed(args[0])
            else:
                print("Usage: delete <number(s)>")
        elif "change".startswith(command):
            if args:
                change_feed(args[0])
            else:
                print("Usage: change <number(s)>")
        elif "export".startswith(command):
            export_feeds(args[0] if args else "feeds.opml")
        elif "import".startswith(command):
            import_feeds(args[0] if args else "feeds.opml")
        elif "mark".startswith(command):
            if args:
                mark_read(args[0], True)
            else:
                print("Usage: mark <number(s)>")
        elif "unmark".startswith(command):
            if args:
                mark_read(args[0], False)
            else:
                print("Usage: unmark <number(s)>")
        elif "open".startswith(command):
            # Without a selection, fall back to the last displayed post.
            if args:
                open_in_browser(args[0])
            elif ui.data["post_url"]:
                open_in_browser(None)
            else:
                print("Usage: open <number(s)>")
        elif "get".startswith(command):
            if args:
                download_entry(args[0])
            elif ui.data["post_url"]:
                download_entry(None)
            else:
                print("Usage: get <number(s)>")
        elif "refresh".startswith(command):
            refresh_feeds()
        elif (
            "show".startswith(command)
            or "list".startswith(command)
            or "print".startswith(command)
        ):
            if args:
                show(args[0])
            else:
                show()
        elif "vacuum".startswith(command):
            db.vacuum()
        elif "help".startswith(command) or "?".startswith(command):
            print_help()
        elif "/".startswith(command):
            if args:
                # A query may consist of several words.
                search(" ".join(args))
            else:
                print("Usage: /<query>")
        elif "quit".startswith(command):
            sys.exit(0)
        else:
            print(f"Unrecognized command: {command}. Try 'help' for help.")