#!/usr/bin/env python3
|
|
|
|
# Fenen Feed Reader
|
|
# Copyright (C) 2023 Jake Bauer <jbauer@paritybit.ca>
|
|
# Licensed under the terms of the ISC License, see LICENSE for details.
|
|
|
|
import datetime as dt
import html
import re
import readline
import shlex
import signal
import subprocess
import sys
import tempfile
import textwrap
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
from timeit import default_timer
from xml.etree import ElementTree

# External dependency
import feedparser

# Internal dependencies
from config import Config
from database import Database
from html_converter import HTMLConverter
from ui_context import UIContext
|
|
|
|
|
def signal_handler(sig, frame):
    """Exit cleanly on SIGINT (Ctrl-C) instead of printing a traceback."""
    sys.exit(0)


# Install the handler so an interactive Ctrl-C quits the program quietly.
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
|
|
# Takes a range like "1-4,8,10"; converts it to a list like [1, 2, 3, 4, 8, 10]
|
|
# Takes a range like "1-4,8,10"; converts it to a list like [1, 2, 3, 4, 8, 10]
def interpret_range(string):
    """Parse a human 1-based selection string into a list of 0-based indices.

    Accepts comma-separated items, each either a single number ("8") or an
    inclusive range ("1-4"); "*" selects everything in the current context.
    Prints a message and returns [] for any invalid or out-of-range item.
    """
    # The valid upper bound depends on what the user is currently looking at.
    if ui.context == "all":
        max_index = db.get_num_feeds()
    elif ui.context == "feed":
        max_index = db.get_num_entries(ui.data["feed"]["table_name"])
    elif ui.context in ["unread", "search"]:
        max_index = len(ui.data["items"])
    else:
        # Unknown context: nothing is selectable (the original hit a
        # NameError on max_index here).
        return []
    if string == "*":
        return list(range(max_index))
    indexlist = []
    for item in string.split(","):
        try:
            if "-" in item:
                low, high = map(int, item.split("-"))
            else:
                low = high = int(item)
        except ValueError:
            print("Invalid item number or range:", item)
            return []
        # Reject out-of-range items, including 0/negative lows which the
        # original silently turned into wrong (wrapped) indices.
        if low < 1 or high > max_index:
            print("Item", item, "doesn't exist.")
            return []
        indexlist += range(low, high + 1)
    # Transform nice-for-humans indices into nice-for-computers indices
    return [i - 1 for i in indexlist]
|
|
|
|
|
|
# TODO: add support for inputting a regular page and parsing out a feed link
# FIXME: be more robust about accepting non-feed URLs (check ContentType?)
def add_feed(url):
    """Subscribe to a new feed, defaulting to http:// when no scheme is given."""
    if not url.startswith(("http://", "https://")):
        url = "http://" + url
    db.insert_feed([url])
|
|
|
|
|
|
def delete_feed(indices):
    """Delete the selected feed(s) after a confirmation prompt.

    Only valid when looking at the feed list (or a search over it).
    """
    if not (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        print("Can only delete feeds, not posts.")
        return
    indices = interpret_range(indices)
    if not indices:
        return
    response = input(
        "Are you sure you want to delete "
        f'{"these feeds" if len(indices) > 1 else "this feed"}? [y/N] '
    )
    if not response or not "yes".startswith(response.lower()):
        return
    urls = []
    tables = []
    # Resolve every index before mutating ui.data["items"]: the original
    # popped items while iterating ascending indices, which shifted later
    # positions and deleted the wrong feeds.
    for i in indices:
        if ui.context == "search":
            feed = db.get_feed_by_url(ui.data["items"][i]["url"])
        else:
            feed = db.get_feed_by_index(i)
        tables.append(feed["table_name"])
        urls.append(feed["url"])
    if ui.context == "search":
        # Pop highest indices first so remaining positions stay valid.
        for i in sorted(indices, reverse=True):
            ui.data["items"].pop(i)
    db.remove_feeds(urls, tables)
|
|
|
|
|
|
def change_feed(indices):
    """Interactively edit the name/category of the selected feed(s).

    Pressing Enter at a prompt keeps the current value shown in brackets;
    Ctrl-D (EOF) aborts editing the remaining feeds.
    """
    if not (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        print("Can only change feed information, not post information.")
        return
    indices = interpret_range(indices)
    # Get all feeds first so changes in the database don't mess up the order
    feeds = []
    for i in indices:
        if ui.context == "search":
            feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
        else:
            feeds.append(db.get_feed_by_index(i))
    for feed in feeds:
        print(f'Feed: {feed["url"]}')
        try:
            name = input(f' Name [{feed["name"]}]: ')
            category = input(f' Category [{feed["category"]}]: ')
        except EOFError:
            print("")
            break
        # Empty input keeps the current value (matching the [bracket] prompt
        # convention); the original blanked the stored name/category.
        db.update_feed(name or feed["name"], category or feed["category"], feed["url"])
|
|
|
|
|
|
def import_feeds(file):
    """Import subscriptions from an OPML file, skipping already-known URLs."""
    try:
        with open(file) as f:
            tree = ElementTree.parse(f)
    except FileNotFoundError:
        print("File", file, "not found.")
        return
    except ElementTree.ParseError as e:
        print(f"Failed to parse {file}: {e}")
        return
    count = 0
    for node in tree.findall(".//outline"):
        attrs = node.attrib
        url = attrs.get("xmlUrl", "")
        name = html.unescape(attrs.get("text", ""))
        # A name from the OPML counts as a user-chosen (custom) name.
        custom_name = 1 if name else 0
        site_url = attrs.get("htmlUrl", "")
        category = html.unescape(attrs.get("category", ""))
        if url and not db.get_feed_by_url(url):
            db.insert_feed([url, name, custom_name, site_url, category], True)
            count += 1
    print(f'{count if count > 0 else "No"} new feeds imported.')
|
|
|
|
|
|
def export_feeds(file):
    """Export all subscribed feeds as an OPML 2.0 file."""
    with open(file, "w") as f:
        f.write('<opml version="2.0">\n')
        f.write("<head>\n\t<docs>http://opml.org/spec2.opml</docs>\n</head>\n")
        f.write("<body>\n")
        for feed in db.get_all_feeds():
            # Escape every attribute value: URLs can contain '&' (query
            # strings), which the original wrote out as invalid XML.
            name = html.escape(feed["name"]) if feed["name"] else ""
            url = html.escape(feed["url"])
            site_url = html.escape(feed["site_url"]) if feed["site_url"] else ""
            category = html.escape(feed["category"]) if feed["category"] else ""
            f.write(
                f'\t<outline type="rss" text="{name}" xmlUrl="{url}" htmlUrl="{site_url}" category="{category}"/>\n'
            )
        f.write("</body>\n</opml>")
    print("OPML file generated.")
|
|
|
|
|
|
def mark_read(indices, mark_read):
    """Mark the selected items as read (mark_read=True) or unread (False).

    In the "all" context (or a search over it) whole feeds are affected;
    otherwise individual entries are.
    """
    indices = interpret_range(indices)
    unread_status = "0" if mark_read else "1"
    feeds = []
    entries = []
    for i in indices:
        if ui.context == "all":
            feeds.append(db.get_feed_by_index(i))
        elif ui.context == "feed":
            entries.append({"table": ui.data["feed"]["table_name"], "id": i})
        elif ui.context == "unread":
            item = ui.data["items"][i]
            entries.append({"table": item["table_name"], "id": item["id"]})
        elif ui.context == "search":
            item = ui.data["items"][i]
            if ui.prev_context == "feed":
                entries.append(
                    {"table": ui.data["feed"]["table_name"], "id": item["id"]}
                )
            elif ui.prev_context == "all":
                feeds.append(db.get_feed_by_url(item["url"]))
    # A selection is either all feeds or all entries, never both.
    if feeds:
        db.change_unread_status_of_feeds(feeds, unread_status)
    else:
        db.change_unread_status_of_entries(entries, unread_status)
|
|
|
|
|
|
def get_url(index):
    """Return the URL for item *index* in the current context, or None.

    Feeds prefer their site URL and fall back to the feed URL; entries
    return their own URL.
    """
    url = None  # Stay None for any unhandled context combination
    if ui.context == "all":
        feed = db.get_feed_by_index(index)
        # Truthiness, not key presence: a DB row always has the site_url
        # column, so the original's `in feed.keys()` check never fell back
        # to the feed URL when site_url was empty/NULL.
        url = feed["site_url"] or feed["url"]
    elif ui.context == "feed":
        entry = db.get_entry_by_index(ui.data["feed"]["table_name"], index)
        url = entry["url"] or None
    elif ui.context == "unread":
        entry = db.get_entry_by_id(
            ui.data["items"][index]["table_name"], ui.data["items"][index]["id"]
        )
        url = entry["url"] or None
    elif ui.context == "search":
        if ui.prev_context == "all":
            feed = db.get_feed_by_url(ui.data["items"][index]["url"])
            url = feed["site_url"] or feed["url"]
        elif ui.prev_context == "feed":
            entry = db.get_entry_by_id(
                ui.data["feed"]["table_name"], ui.data["items"][index]["id"]
            )
            url = entry["url"] or None
    return url
|
|
|
|
|
|
def open_in_browser(indices=None):
    """Open the selected item(s) — or the last-viewed post — in the browser."""
    browser_cmd = conf.get_value("browser").split(" ")
    try:
        if not indices:
            subprocess.Popen(
                browser_cmd + [ui.data["post_url"]], stdout=subprocess.DEVNULL
            )
            return
        for i in interpret_range(indices):
            url = get_url(i)
            if not url:
                print(f"Entry {i + 1} has no associated URL.")
                continue
            subprocess.Popen(browser_cmd + [url], stdout=subprocess.DEVNULL)
            sleep(0.1)  # Wait a bit so the browser opens URLs in the correct order
    except FileNotFoundError:
        print(
            f"Error opening browser: could not find the program '{' '.join(browser_cmd)}'."
        )
        return
|
|
|
|
|
|
def download_entry(indices=None):
    """Run the configured downloader on the selected item(s) or the last post.

    The downloader setting is a user-supplied shell command line, so it is
    executed through the shell; the URL is shell-quoted because it comes
    from (untrusted) feed data.
    """
    downloader = conf.get_value("downloader")
    if not indices:
        # shlex.quote stops shell metacharacters in a feed-supplied URL
        # from being interpreted as commands (shell injection).
        subprocess.run(f'{downloader} {shlex.quote(ui.data["post_url"])}', shell=True)
        return
    indices = interpret_range(indices)
    for i in indices:
        url = get_url(i)
        if url:
            subprocess.run(f"{downloader} {shlex.quote(url)}", shell=True)
        else:
            print(f"Entry {i + 1} has no associated URL.")
|
|
|
|
|
|
def display_entry(entry, index=None):
    """Render an entry as wrapped plain text and show it in the pager.

    *index* is only needed in the "unread" context, to recover the feed
    name from the unread listing. Also remembers the entry URL so `open`
    and `get` work without arguments afterwards.
    """
    if ui.context == "unread":
        feed_name = ui.data["items"][index]["feed_name"]
    else:
        feed_name = ui.data["feed"]["name"]
    converter = HTMLConverter()
    converter.feed(entry["content"])
    # Wrap each line to 80 columns, then collapse runs of blank lines.
    wrapped = "\n".join(textwrap.fill(line, 80) for line in converter.text.splitlines())
    body = re.sub(r"\n\n+", "\n\n", wrapped)
    content = body + "\n\n" + "\n".join(converter.links)
    output = (
        f'Title: {entry["title"]}\n'
        f"From: {feed_name}\n"
        f'Published: {entry["date"]}\n'
        f'URL: {entry["url"]}\n\n'
        f"{content}\n"
    )
    # The temp file lives only as long as the pager is open.
    with tempfile.NamedTemporaryFile("w") as f:
        f.write(output)
        f.flush()
        subprocess.run(f'{conf.get_value("pager")} {f.name}', shell=True)
    ui.data["post_url"] = entry["url"]
|
|
|
|
|
|
def print_feeds(feeds):
    """Print a numbered listing of feeds with category and unread count."""
    for n, feed in enumerate(feeds, start=1):
        num_unread = db.get_unread_count(feed["table_name"])
        name = feed["name"] or feed["url"]
        category = feed["category"] or "Uncategorized"
        print(f"{n}) [{category}] {name} ({num_unread} unread)")
|
|
|
|
|
|
def print_feed_entries(feed, entries):
    """Print the feed header followed by a numbered list of its entries."""
    print(f'{feed["name"]} - {feed["url"]}')
    for n, entry in enumerate(entries, start=1):
        unread_marker = "* " if entry["unread"] else ""
        print(f'{n}) {unread_marker}{entry["date"]} {entry["title"]}')
|
|
|
|
|
|
def show(indices=None):
    """The `show` command: list feeds, list a feed's entries, or display posts.

    With no argument, re-lists whatever the current context shows. With
    "all"/"unread" (or any prefix), switches context. With numbers, drills
    into the selected feed(s) or displays the selected entries, marking
    displayed entries as read.
    """
    if indices:
        if "all".startswith(indices):
            ui.change_context("all")
            indices = None
        elif "unread".startswith(indices):
            ui.change_context("unread")
            indices = None
        else:
            indices = interpret_range(indices)
            if not indices:
                return
    if indices and (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        # Drill into the selected feed(s) and list their entries.
        for i in indices:
            if ui.context == "search":
                feed = db.get_feed_by_url(ui.data["items"][i]["url"])
            else:
                feed = db.get_feed_by_index(i)
            entries = db.get_all_entries(feed["table_name"])
            print_feed_entries(feed, entries)
        ui.change_context("feed")
        ui.data["feed"] = feed
    elif ui.context == "all":
        print_feeds(db.get_all_feeds())
    elif indices and (
        ui.context == "feed" or (ui.context == "search" and ui.prev_context == "feed")
    ):
        for i in indices:
            if ui.context == "search":
                entry = db.get_entry_by_id(
                    ui.data["feed"]["table_name"], ui.data["items"][i]["id"]
                )
            else:
                entry = db.get_entry_by_index(ui.data["feed"]["table_name"], i)
            display_entry(entry)
            if entry["unread"]:
                mark_read(str(i + 1), True)
    elif ui.context == "feed":
        # Re-list the current feed. The original referenced an undefined
        # local `feed` here, crashing with NameError on a bare `show`.
        entries = db.get_all_entries(ui.data["feed"]["table_name"])
        print_feed_entries(ui.data["feed"], entries)
    elif indices and ui.context == "unread":
        for i in indices:
            entry = db.get_entry_by_id(
                ui.data["items"][i]["table_name"], ui.data["items"][i]["id"]
            )
            display_entry(entry, i)
            if entry["unread"]:
                mark_read(str(i + 1), True)
    elif ui.context == "unread":
        # (Re)build the unread listing across all feeds, sorted by date.
        ui.change_context("unread")
        for feed in db.get_all_feeds():
            for entry in db.get_unread_entries(feed["table_name"]):
                ui.data["items"].append(
                    {
                        "date": entry["date"],
                        "title": entry["title"],
                        "unread": entry["unread"],
                        "feed_name": feed["name"],
                        "table_name": feed["table_name"],
                        "id": entry["id"],
                    }
                )
        ui.data["items"].sort(key=lambda entry: entry["date"])
        if len(ui.data["items"]) == 0:
            print("No unread entries.")
            ui.revert_context()
        for n, entry in enumerate(ui.data["items"], start=1):
            print(f'{n}) * {entry["date"]} {entry["feed_name"]} - {entry["title"]}')
    elif ui.context == "search":
        search(ui.data["search_query"])  # Re-run the search
|
|
|
|
|
|
def search(query):
    """Search the current context for *query* and enter the "search" context.

    In the "all" context the feed list is searched; in the "feed" context
    that feed's entries are searched. The unread listing cannot be
    searched. Searching while already in "search" re-runs against the
    previous context.
    """
    # Wrap the query in SQL LIKE wildcards for a substring match.
    query = "%" + query + "%"
    if ui.context == "all":
        ui.change_context("search")
        ui.data["search_query"] = query
        results = db.search_feeds(query)
        for feed in results:
            # Only the URL is kept; other commands re-fetch the feed by URL.
            ui.data["items"].append({"url": feed["url"]})
        print_feeds(results)
    elif ui.context == "feed":
        results = db.search_entries(ui.data["feed"]["table_name"], query)
        feed = ui.data["feed"]  # Store because changing context erases this
        ui.change_context("search")
        ui.data["search_query"] = query
        ui.data["feed"] = feed
        for entry in results:
            ui.data["items"].append({"id": entry["id"]})
        print_feed_entries(feed, results)
    elif ui.context == "unread":
        print("Can't search unread entries.")
        return
    elif ui.context == "search":
        # Run this search in the same context as the previous search.
        # NOTE(review): `query` is already %-wrapped here, so the recursive
        # call wraps it again ("%%q%%"); harmless under LIKE semantics since
        # '%' matches zero or more characters — confirm intended.
        ui.revert_context()
        search(query)
        return
    if len(ui.data["items"]) == 0:
        print("No search results.")
        return
|
|
|
|
|
|
def load_feed(url):
    """Fetch one feed and insert/update its entries in the database.

    Runs in a worker thread, so it opens its own Database connection
    rather than sharing the module-level one. Returns 0 on success, 1 on
    any failure (the error is printed, never raised).
    """
    db = Database(conf.get_value("db"))
    entries_to_insert = []
    entries_to_update = []

    try:
        feed = feedparser.parse(url)
        if "status" not in feed:
            print(f"\nError loading feed: {url}")
            print(feed.bozo_exception)
            return 1
        if feed.status not in [200, 301, 302, 307, 308]:
            print(f"\nError loading feed: {url}: HTTP Code {feed.status}")
            return 1

        feed_title = feed.feed.get("title", "No Feed Title")
        site_url = feed.feed.get("link", None)

        # Fill in metadata the user hasn't customized yet.
        if db.feed_missing_name(url):
            db.populate_feed_name(url, feed_title)
        if db.feed_missing_site_url(url):
            db.populate_feed_site_url(url, site_url)

        table_name = db.get_feed_by_url(url)["table_name"]
        # Set, not list: membership is tested once per incoming entry.
        existing_ids = {entry["id"] for entry in db.get_all_entries(table_name)}

        for entry in feed.entries:
            entry_title = entry.get("title", "No Title")
            entry_url = entry.get("link", "No URL")
            entry_id = entry.get("id", entry_url)
            entry_content = _entry_content(entry)
            entry_date = _entry_date(entry)

            if entry_id in existing_ids:
                entries_to_update.append(
                    (entry_title, entry_url, entry_content, entry_id)
                )
            else:
                entries_to_insert.append(
                    (entry_id, entry_date, entry_title, entry_url, entry_content)
                )

        db.insert_entries(table_name, entries_to_insert)
        db.update_entries(table_name, entries_to_update)
        return 0
    except Exception as e:
        # Deliberate catch-all: one broken feed must not abort the whole
        # refresh run; report it and count it as failed.
        print("\nUnhandled error with URL:", url, "->", type(e), e)
        return 1


def _entry_content(entry):
    """Best-available HTML content for a feedparser entry ("" if none)."""
    if "summary" in entry:
        return entry["summary"]
    # feedparser exposes <content> as a *list* of dicts with a "value" key;
    # the original stored the raw list instead of the HTML string.
    content = entry.get("content")
    if content:
        return content[0].get("value", "")
    return ""


def _entry_date(entry):
    """Entry date as an ISO "YYYY-MM-DD" string, falling back to today."""
    for key in ("published_parsed", "updated_parsed"):
        if key in entry and entry[key]:
            date = entry[key]
            return f"{date.tm_year}-{date.tm_mon:02}-{date.tm_mday:02}"
    # Always return a string: the original returned a datetime.date here,
    # which would break sorting entries whose dates are strings.
    return dt.date.today().isoformat()
|
|
|
|
|
|
def _format_elapsed(seconds):
    """Format a duration in seconds as e.g. "1h2m3s", "5m2s" or "42s"."""
    minutes, secs = divmod(round(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    out = ""
    if hours:
        out += f"{hours}h"
    if minutes:
        out += f"{minutes}m"
    if secs or not out:
        out += f"{secs}s"
    return out


def refresh_feeds():
    """Refresh every feed in parallel, printing a live progress counter."""
    feeds = db.get_all_feeds()
    total_jobs = len(feeds)
    finished_jobs = 0
    start = default_timer()
    with ThreadPoolExecutor(max_workers=int(conf.get_value("threads"))) as executor:
        print(f"Refreshed 0/{total_jobs} feeds", end="\r", flush=True)
        futures = {executor.submit(load_feed, feed["url"]): feed for feed in feeds}
        for future in as_completed(futures):
            if future.result() == 0:
                finished_jobs += 1
            # \r keeps the counter on one line.
            print(
                f"Refreshed {finished_jobs}/{total_jobs} feeds",
                end="\r",
                flush=True,
            )
    # The original str(timedelta).split(":")[1:] formatting silently dropped
    # the hours component for refreshes that took over an hour.
    print(f"\nFinished in {_format_elapsed(default_timer() - start)} ", end="")
    if finished_jobs < total_jobs:
        print(f"({total_jobs - finished_jobs} failed)")
    else:
        print("")
|
|
|
|
|
|
def print_help():
    """Print the interactive command reference."""
    # Fixed two stray '>' typos ("item(s)>", "feeds.opml>") in the help text.
    print(
        """
add <url>             - Add a feed (<url> must point to feed directly).
delete <n>            - Delete feed(s).
change <n>            - Change the name/category of feed(s).
show [all|unread|<n>] - Show all feeds, unread entries, or the given item(s).
import [file]         - Import feeds from [file] or feeds.opml.
export [file]         - Export feeds to [file] or feeds.opml.
refresh               - Refresh all feeds to check for new content.
mark <n>              - Mark item(s) as read.
unmark <n>            - Mark item(s) as unread.
open <n>              - Open item(s) in the browser.
get <n>               - Download item(s) from the Internet.
vacuum                - Clean up free space in the database.
help|?                - Print this message.
/<query>              - Search the current items.
quit                  - Exit fenen.

<n> can be one or more integers or ranges of integers separated by commas. For
example, "2" or "1-4,8,10".

Short forms of commands may also be used. For example "s u" instead of "show
unread" or "d" instead of "delete". "list" and "print" are aliases of "show".
"""
    )
|
|
|
|
|
|
# --- Program setup (runs at import time) ---

# Load the user configuration.
conf = Config()
conf.parse_config()

# Open the database file named in the config and create tables if needed.
db = Database(conf.get_value("db"))
db.init_db()

# Tracks what the user is currently looking at (all/feed/unread/search).
ui = UIContext()

print("Fenen Feed Reader v0.3.0/2023-11-27. Type ? for help.")
|
|
if __name__ == "__main__":
    # Main read-eval loop: read a command line, normalize it, dispatch it.
    while True:
        try:
            command = input(f"fenen ({ui.context})> ").split()
        except EOFError:
            # Ctrl-D exits cleanly.
            print("")
            sys.exit(0)
        if not command:
            continue

        # Special handling for "/<query>" command
        if command[0][0] == "/" and len(command[0]) > 1:
            command.append(command[0][1:])
            command[0] = command[0][0]

        # Allow ed-like syntax (e.g. s1, o2-5 in addition to s 1, o 2-5)
        elif (
            len(command[0]) > 1
            and command[0][0].isalpha()
            and (command[0][1].isdigit() or command[0][1] == "*")
        ):
            command.append(command[0][1:])
            command[0] = command[0][0]

        args = command[1:]
        command = command[0]

        # Commands match on any prefix; the chain order below breaks ties
        # (e.g. "e" matches "export" before "exit").
        if "add".startswith(command):
            add_feed(args[0]) if args else print("Usage: add <url>")
        elif "delete".startswith(command):
            delete_feed(args[0]) if args else print("Usage: delete <number(s)>")
        elif "change".startswith(command):
            change_feed(args[0]) if args else print("Usage: change <number(s)>")
        elif "export".startswith(command):
            export_feeds(args[0]) if args else export_feeds("feeds.opml")
        elif "import".startswith(command):
            import_feeds(args[0]) if args else import_feeds("feeds.opml")
        elif "mark".startswith(command):
            mark_read(args[0], True) if args else print("Usage: mark <number(s)>")
        elif "unmark".startswith(command):
            mark_read(args[0], False) if args else print("Usage: unmark <number(s)>")
        elif "open".startswith(command):
            # With no args, re-open the last-displayed post if there is one.
            open_in_browser(args[0]) if args else open_in_browser(None) if ui.data[
                "post_url"
            ] else print("Usage: open <number(s)>")
        elif "get".startswith(command):
            download_entry(args[0]) if args else download_entry(None) if ui.data[
                "post_url"
            ] else print("Usage: get <number(s)>")
        elif "refresh".startswith(command):
            refresh_feeds()
        elif (
            "show".startswith(command)
            or "list".startswith(command)
            or "print".startswith(command)
        ):
            show(args[0]) if args else show()
        elif "vacuum".startswith(command):
            db.vacuum()
        elif "help".startswith(command) or "?".startswith(command):
            print_help()
        elif "/".startswith(command):
            search(args[0]) if args else print("Usage: /<query>")
        elif "quit".startswith(command) or "exit".startswith(command):
            sys.exit(0)
        else:
            print(f"Unrecognized command: {command}. Try 'help' for help.")