#!/usr/bin/env python3

# Fenen Feed Reader
# Copyright (C) 2023 Jake Bauer
# Licensed under the terms of the ISC License, see LICENSE for details.

import readline  # noqa: F401 -- imported for its input() line-editing side effect
import signal
import subprocess
import sys
import tempfile
import html
import textwrap
import re
import datetime as dt
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed
from timeit import default_timer
from xml.etree import ElementTree

# External dependency
import feedparser

# Internal dependencies
from config import Config
from html_converter import HTMLConverter
from database import Database
from ui_context import UIContext


def signal_handler(sig, frame):
    """Exit cleanly on SIGINT (Ctrl-C) instead of printing a traceback."""
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


# Takes a range like "1-4,8,10"; converts it to a list like [1, 2, 3, 4, 8, 10]
def interpret_range(string):
    """Parse a human, 1-based selection string into 0-based indices.

    Accepts comma-separated integers and inclusive ranges ("1-4,8,10") or
    "*" for everything in the current UI context.  Returns a list of
    0-based indices, or an empty list (after printing a message) when any
    part of the selection is invalid or out of bounds.
    """
    # The upper bound depends on what the user is currently looking at.
    if ui.context == "all":
        max_index = db.get_num_feeds()
    elif ui.context == "feed":
        max_index = db.get_num_entries(ui.data["feed"]["table_name"])
    elif ui.context in ("unread", "search"):
        max_index = len(ui.data["items"])
    else:
        # Unknown context: the original left max_index unbound here and
        # would raise NameError; treat it as "nothing selectable".
        return []

    if string == "*":
        return list(range(max_index))

    indexlist = []
    for part in string.split(","):
        try:
            if "-" not in part:
                index = int(part)
                # Reject indices below 1 too -- the original only checked
                # the upper bound, so "0" silently selected the last item
                # after the 0-based conversion below.
                if index < 1 or index > max_index:
                    print("Item", part, "doesn't exist.")
                    return []
                indexlist.append(index)
            else:
                low, high = map(int, part.split("-"))
                if low < 1 or high > max_index:
                    print("Item", part, "doesn't exist.")
                    return []
                indexlist += range(low, high + 1)
        except ValueError:
            print("Invalid item number or range:", part)
            return []

    # Transform nice-for-humans indices into nice-for-computers indices
    return [i - 1 for i in indexlist]


# TODO: add support for inputting a regular page and parsing out a feed link
# FIXME: be more robust about accepting non-feed URLs (check ContentType?)
def add_feed(url):
    """Add a feed by URL; bare host names get an http:// scheme prepended."""
    if not url.startswith("http://") and not url.startswith("https://"):
        url = "http://" + url
    db.insert_feed([url])


def delete_feed(indices):
    """Delete the selected feeds (and their entry tables) after confirming.

    Only valid in the "all" context or a search over it; posts cannot be
    deleted individually.
    """
    if not (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        print("Can only delete feeds, not posts.")
        return
    indices = interpret_range(indices)
    if not indices:
        return
    response = input(
        "Are you sure you want to delete "
        f'{"these feeds" if len(indices) > 1 else "this feed"}? [y/N] '
    )
    if not response or not "yes".startswith(response.lower()):
        return
    urls = []
    tables = []
    # Process indices in descending order: in the search context we pop
    # items out of ui.data["items"] as we go, and popping while walking
    # ascending indices would shift every later index (bug in the
    # original, which iterated in ascending order).
    for i in sorted(indices, reverse=True):
        if ui.context == "search":
            feed = db.get_feed_by_url(ui.data["items"][i]["url"])
            ui.data["items"].pop(i)
        else:
            feed = db.get_feed_by_index(i)
        tables.append(feed["table_name"])
        urls.append(feed["url"])
    db.remove_feeds(urls, tables)


def change_feed(indices):
    """Interactively edit the name/category of the selected feeds."""
    if not (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        print("Can only change feed information, not post information.")
        return
    indices = interpret_range(indices)
    # Get all feeds first so changes in the database don't mess up the order
    feeds = []
    for i in indices:
        if ui.context == "search":
            feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
        else:
            feeds.append(db.get_feed_by_index(i))
    for feed in feeds:
        print(f'Feed: {feed["url"]}')
        try:
            name = input(f' Name [{feed["name"]}]: ')
            category = input(f' Category [{feed["category"]}]: ')
        except EOFError:
            # Ctrl-D aborts the remaining edits without saving this one.
            print("")
            break
        db.update_feed(name, category, feed["url"])


def import_feeds(file):
    """Import feeds from an OPML file, skipping URLs already subscribed."""
    try:
        with open(file) as f:
            tree = ElementTree.parse(f)
    except FileNotFoundError:
        print("File", file, "not found.")
        return
    except ElementTree.ParseError as e:
        print(f"Failed to parse {file}: {e}")
        return
    count = 0
    for node in tree.findall(".//outline"):
        url = node.attrib.get("xmlUrl", "")
        name = html.unescape(node.attrib.get("text", ""))
        # A name from the OPML file counts as user-chosen, so refresh
        # won't overwrite it with the feed's self-reported title.
        custom_name = 1 if name else 0
        site_url = node.attrib.get("htmlUrl", "")
        category = html.unescape(node.attrib.get("category", ""))
        if url and not db.get_feed_by_url(url):
            db.insert_feed([url, name, custom_name, site_url, category], True)
            count += 1
    print(f'{count if count > 0 else "No"} new feeds imported.')


def export_feeds(file):
    """Export all subscribed feeds to an OPML 2.0 file.

    NOTE(review): the XML markup below was garbled in the source this was
    recovered from (angle-bracket text stripped); it has been reconstructed
    from the surviving literals and the OPML 2.0 spec -- verify the output
    round-trips through import_feeds().
    """
    with open(file, "w") as f:
        f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        f.write('<opml version="2.0">\n\t<head>\n\t\t<docs>http://opml.org/spec2.opml</docs>\n\t</head>\n')
        f.write("\t<body>\n")
        for feed in db.get_all_feeds():
            # Escape only the human-entered fields, as the importer unescapes.
            name = html.escape(feed["name"]) if feed["name"] else ""
            url = feed["url"]
            site_url = feed["site_url"] if feed["site_url"] else ""
            category = html.escape(feed["category"]) if feed["category"] else ""
            f.write(
                f'\t\t<outline type="rss" text="{name}" xmlUrl="{url}" '
                f'htmlUrl="{site_url}" category="{category}"/>\n'
            )
        f.write("\t</body>\n</opml>\n")
    print("OPML file generated.")


def mark_read(indices, mark_read):
    """Mark the selected items read (mark_read=True) or unread (False).

    In the "all" context whole feeds are toggled; elsewhere individual
    entries are.  (The second parameter intentionally shadows this
    function's name; kept for caller compatibility.)
    """
    indices = interpret_range(indices)
    unread_status = "0" if mark_read else "1"
    feeds = []
    entries = []
    for i in indices:
        if ui.context == "all":
            feeds.append(db.get_feed_by_index(i))
        elif ui.context == "feed":
            # In the feed context the display index doubles as the row id.
            entries.append({"table": ui.data["feed"]["table_name"], "id": i})
        elif ui.context == "unread":
            entries.append(
                {
                    "table": ui.data["items"][i]["table_name"],
                    "id": ui.data["items"][i]["id"],
                }
            )
        elif ui.context == "search":
            if ui.prev_context == "feed":
                entries.append(
                    {
                        "table": ui.data["feed"]["table_name"],
                        "id": ui.data["items"][i]["id"],
                    }
                )
            elif ui.prev_context == "all":
                feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
    # A single call only ever fills one of the two lists.
    if feeds:
        db.change_unread_status_of_feeds(feeds, unread_status)
    else:
        db.change_unread_status_of_entries(entries, unread_status)


def get_url(index):
    """Return the URL associated with the item at 0-based *index*, or None."""
    url = None  # default so an unexpected context can't leave url unbound
    if ui.context == "all":
        feed = db.get_feed_by_index(index)
        url = feed["site_url"] if "site_url" in feed.keys() else feed["url"]
    elif ui.context == "feed":
        entry = db.get_entry_by_index(ui.data["feed"]["table_name"], index)
        url = entry["url"] if "url" in entry.keys() else None
    elif ui.context == "unread":
        entry = db.get_entry_by_id(
            ui.data["items"][index]["table_name"], ui.data["items"][index]["id"]
        )
        url = entry["url"] if "url" in entry.keys() else None
    elif ui.context == "search":
        if ui.prev_context == "all":
            feed = db.get_feed_by_url(ui.data["items"][index]["url"])
            url = feed["site_url"] if "site_url" in feed.keys() else feed["url"]
        elif ui.prev_context == "feed":
            entry = db.get_entry_by_id(
                ui.data["feed"]["table_name"], ui.data["items"][index]["id"]
            )
            url = entry["url"] if "url" in entry.keys() else None
    return url


def open_in_browser(indices=None):
    """Open the selected items (or the last-viewed post) in the browser."""
    browser = conf.get_value("browser").split(" ")
    try:
        if not indices:
            # No selection: re-open the post most recently shown by display_entry.
            subprocess.Popen(browser + [ui.data["post_url"]], stdout=subprocess.DEVNULL)
            return
        indices = interpret_range(indices)
        for i in indices:
            url = get_url(i)
            if url:
                subprocess.Popen(browser + [url], stdout=subprocess.DEVNULL)
                sleep(0.1)  # Wait a bit so the browser opens URLs in the correct order
            else:
                print(f"Entry {i + 1} has no associated URL.")
    except FileNotFoundError:
        print(
            f"Error opening browser: could not find the program '{' '.join(browser)}'."
        )
        return


def download_entry(indices=None):
    """Download the selected items with the user-configured downloader.

    NOTE(review): the downloader command line runs through the shell with
    the URL interpolated; the URL comes from feeds the user subscribed to,
    but a malicious feed could inject shell metacharacters -- consider a
    list-argument subprocess.run instead.
    """
    if not indices:
        subprocess.run(
            f'{conf.get_value("downloader")} {ui.data["post_url"]}', shell=True
        )
        return
    indices = interpret_range(indices)
    for i in indices:
        url = get_url(i)
        if url:
            subprocess.run(f'{conf.get_value("downloader")} {url}', shell=True)
        else:
            print(f"Entry {i + 1} has no associated URL.")


def display_entry(entry, index=None):
    """Render an entry to text and show it in the configured pager.

    *index* is only needed in the unread context, to find the feed name of
    the mixed-feed item list.  Records the entry URL in ui.data["post_url"]
    so a bare "open"/"get" can reuse it.
    """
    if ui.context == "unread":
        feed_name = ui.data["items"][index]["feed_name"]
    else:
        feed_name = ui.data["feed"]["name"]
    html_converter = HTMLConverter()
    html_converter.feed(entry["content"])
    # Wrap each paragraph to 80 columns, then collapse runs of blank lines.
    paragraphs = html_converter.text.splitlines()
    text = re.sub(r"\n\n+", "\n\n", "\n".join(textwrap.fill(p, 80) for p in paragraphs))
    content = text + "\n\n" + "\n".join(html_converter.links)
    output = (
        f'Title: {entry["title"]}\n'
        f"From: {feed_name}\n"
        f'Published: {entry["date"]}\n'
        f'URL: {entry["url"]}\n\n'
        f"{content}\n"
    )
    with tempfile.NamedTemporaryFile("w") as f:
        f.write(output)
        f.flush()  # pager runs in a child process; make sure it sees the text
        subprocess.run(f'{conf.get_value("pager")} {f.name}', shell=True)
    ui.data["post_url"] = entry["url"]


def print_feeds(feeds):
    """Print a numbered feed list with category and unread count."""
    for n, feed in enumerate(feeds, start=1):
        num_unread = db.get_unread_count(feed["table_name"])
        name = feed["name"] if feed["name"] else feed["url"]
        category = feed["category"] if feed["category"] else "Uncategorized"
        print(f"{n}) [{category}] {name} ({num_unread} unread)")


def print_feed_entries(feed, entries):
    """Print a numbered entry list for one feed; '*' marks unread entries."""
    print(f'{feed["name"]} - {feed["url"]}')
    for n, entry in enumerate(entries, start=1):
        marker = "* " if entry["unread"] else ""
        print(f'{n}) {marker}{entry["date"]} {entry["title"]}')


def show(indices=None):
    """Show feeds, entries, or entry contents depending on context/args.

    "show all" / "show unread" switch context; "show <index>" drills into
    the selected item(s); a bare "show" re-lists the current context.
    """
    if indices:
        if "all".startswith(indices):
            ui.change_context("all")
            indices = None
        elif "unread".startswith(indices):
            ui.change_context("unread")
            indices = None
        else:
            indices = interpret_range(indices)
            if not indices:
                return
    if indices and (
        ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
    ):
        # Selected one or more feeds: list their entries and drill in.
        for i in indices:
            if ui.context == "search":
                feed = db.get_feed_by_url(ui.data["items"][i]["url"])
            else:
                feed = db.get_feed_by_index(i)
            entries = db.get_all_entries(feed["table_name"])
            print_feed_entries(feed, entries)
            ui.change_context("feed")
            ui.data["feed"] = feed
    elif ui.context == "all":
        print_feeds(db.get_all_feeds())
    elif indices and (
        ui.context == "feed" or (ui.context == "search" and ui.prev_context == "feed")
    ):
        # Selected one or more entries: display them and mark them read.
        for i in indices:
            if ui.context == "search":
                entry = db.get_entry_by_id(
                    ui.data["feed"]["table_name"], ui.data["items"][i]["id"]
                )
            else:
                entry = db.get_entry_by_index(ui.data["feed"]["table_name"], i)
            display_entry(entry)
            if entry["unread"]:
                mark_read(str(i + 1), True)
    elif ui.context == "feed":
        # Bug fix: the original referenced an undefined local "feed" here,
        # so a bare "show" in the feed context raised NameError.
        entries = db.get_all_entries(ui.data["feed"]["table_name"])
        print_feed_entries(ui.data["feed"], entries)
    elif indices and ui.context == "unread":
        for i in indices:
            entry = db.get_entry_by_id(
                ui.data["items"][i]["table_name"], ui.data["items"][i]["id"]
            )
            display_entry(entry, i)
            if entry["unread"]:
                mark_read(str(i + 1), True)
    elif ui.context == "unread":
        # Rebuild the cross-feed unread list from scratch.
        ui.change_context("unread")
        for feed in db.get_all_feeds():
            items = db.get_unread_entries(feed["table_name"])
            for entry in items:
                ui.data["items"].append(
                    {
                        "date": entry["date"],
                        "title": entry["title"],
                        "unread": entry["unread"],
                        "feed_name": feed["name"],
                        "table_name": feed["table_name"],
                        "id": entry["id"],
                    }
                )
        ui.data["items"] = sorted(ui.data["items"], key=lambda entry: entry["date"])
        if len(ui.data["items"]) == 0:
            print("No unread entries.")
            ui.revert_context()
        for n, entry in enumerate(ui.data["items"], start=1):
            print(f'{n}) * {entry["date"]} {entry["feed_name"]} - {entry["title"]}')
    elif ui.context == "search":
        search(ui.data["search_query"])  # Re-run the search


def search(query):
    """Search feed names (in "all") or entry titles (in "feed") for *query*."""
    query = "%" + query + "%"  # SQL LIKE wildcards around the user's text
    if ui.context == "all":
        ui.change_context("search")
        ui.data["search_query"] = query
        results = db.search_feeds(query)
        for feed in results:
            ui.data["items"].append({"url": feed["url"]})
        print_feeds(results)
    elif ui.context == "feed":
        results = db.search_entries(ui.data["feed"]["table_name"], query)
        feed = ui.data["feed"]  # Store because changing context erases this
        ui.change_context("search")
        ui.data["search_query"] = query
        ui.data["feed"] = feed
        for entry in results:
            ui.data["items"].append({"id": entry["id"]})
        print_feed_entries(feed, results)
    elif ui.context == "unread":
        print("Can't search unread entries.")
        return
    elif ui.context == "search":
        # Run this search in the same context as the previous search
        ui.revert_context()
        search(query)
        return
    if len(ui.data["items"]) == 0:
        print("No search results.")
        return


def load_feed(url):
    """Fetch one feed and insert/update its entries; 0 on success, 1 on error.

    Runs on a worker thread, so it opens its own Database connection
    (sqlite connections are not shareable across threads).
    """
    db = Database(conf.get_value("db"))
    entries_to_insert = []
    entries_to_update = []
    try:
        feed = feedparser.parse(url)
        if "status" not in feed:
            print(f"\nError loading feed: {url}")
            print(feed.bozo_exception)
            return 1
        if feed.status not in [200, 301, 302, 307, 308]:
            print(f"\nError loading feed: {url}: HTTP Code {feed.status}")
            return 1
        feed_title = feed.feed.get("title", "No Feed Title")
        site_url = feed.feed.get("link", None)
        # Fill in metadata the user hasn't customized yet.
        if db.feed_missing_name(url):
            db.populate_feed_name(url, feed_title)
        if db.feed_missing_site_url(url):
            db.populate_feed_site_url(url, site_url)
        table_name = db.get_feed_by_url(url)["table_name"]
        existing_ids = [entry["id"] for entry in db.get_all_entries(table_name)]
        for entry in feed.entries:
            entry_title = entry.get("title", "No Title")
            entry_url = entry.get("link", "No URL")
            entry_id = entry.get("id", entry_url)
            if "summary" in entry:
                entry_content = entry["summary"]
            elif "content" in entry and entry["content"]:
                # Bug fix: feedparser exposes "content" as a *list* of
                # content objects; the original stored the raw list.
                # Store the first alternative's markup instead.
                entry_content = entry["content"][0].get("value", "")
            else:
                entry_content = ""
            if "published_parsed" in entry and entry.published_parsed:
                date = entry.published_parsed
                entry_date = f"{date.tm_year}-{date.tm_mon:02}-{date.tm_mday:02}"
            elif "updated_parsed" in entry and entry.updated_parsed:
                date = entry.updated_parsed
                entry_date = f"{date.tm_year}-{date.tm_mon:02}-{date.tm_mday:02}"
            else:
                # Consistency fix: other branches store YYYY-MM-DD strings;
                # the original stored a datetime.date object here, which
                # breaks string-based date sorting of the unread list.
                entry_date = dt.date.today().isoformat()
            if entry_id in existing_ids:
                entries_to_update.append(
                    (entry_title, entry_url, entry_content, entry_id)
                )
            else:
                entries_to_insert.append(
                    (
                        entry_id,
                        entry_date,
                        entry_title,
                        entry_url,
                        entry_content,
                    )
                )
        db.insert_entries(table_name, entries_to_insert)
        db.update_entries(table_name, entries_to_update)
        return 0
    except Exception as e:
        # Last-resort catch so one bad feed can't kill the refresh pool.
        print("\nUnhandled error with URL:", url, "->", type(e), e)
        return 1


def refresh_feeds():
    """Refresh every feed concurrently, showing progress and elapsed time."""
    feeds = db.get_all_feeds()
    total_jobs = len(feeds)
    finished_jobs = 0
    start = default_timer()
    with ThreadPoolExecutor(max_workers=int(conf.get_value("threads"))) as executor:
        print(f"Refreshed 0/{total_jobs} feeds", end="\r", flush=True)
        futures = {executor.submit(load_feed, feed["url"]): feed for feed in feeds}
        for future in as_completed(futures):
            result = future.result()
            if result == 0:
                finished_jobs += 1
            print(
                f"Refreshed {finished_jobs}/{total_jobs} feeds",
                end="\r",
                flush=True,
            )
    # Format elapsed time as e.g. "1m30s".  (Fixes the original, which
    # sliced str(timedelta) and silently dropped the hours for runs > 1h.)
    minutes, seconds = divmod(round(default_timer() - start), 60)
    tot_time = (f"{minutes}m" if minutes > 0 else "") + (
        f"{seconds}s" if seconds > 0 else "0s"
    )
    print(f"\nFinished in {tot_time} ", end="")
    if finished_jobs < total_jobs:
        print(f"({total_jobs - finished_jobs} failed)")
    else:
        print("")


def print_help():
    """Print the command reference.

    NOTE(review): the <url>/<index> placeholders were stripped from the
    recovered source; reconstructed from the surviving wording.
    """
    print(
        """
add <url>              - Add a feed (<url> must point to feed directly).
delete <index>         - Delete feed(s).
change <index>         - Change the name/category of feed(s).
show [all|unread|<index>] - Show all feeds, unread entries, or the given item(s).
import [file]          - Import feeds from [file] or feeds.opml.
export [file]          - Export feeds to [file] or feeds.opml.
refresh                - Refresh all feeds to check for new content.
mark <index>           - Mark item(s) as read.
unmark <index>         - Mark item(s) as unread.
open <index>           - Open item(s) in the browser.
get <index>            - Download item(s) from the Internet.
vacuum                 - Clean up free space in the database.
help|?                 - Print this message.
/<query>               - Search the current items.
quit                   - Exit fenen.

<index> can be one or more integers or ranges of integers separated by
commas. For example, "2" or "1-4,8,10".

Short forms of commands may also be used. For example "s u" instead of
"show unread" or "d" instead of "delete". "list" and "print" are aliases
of "show".
"""
    )


# Module-level initialization: config, database, and UI context are shared
# globals used by every command handler above.
conf = Config()
conf.parse_config()
db = Database(conf.get_value("db"))
db.init_db()
ui = UIContext()
print("Fenen Feed Reader v0.3.0/2023-11-27. Type ? for help.")

if __name__ == "__main__":
    while True:
        try:
            command = input(f"fenen ({ui.context})> ").split()
        except EOFError:
            print("")
            sys.exit(0)
        if not command:
            continue
        # Special handling for "/" command: "/foo" -> ["/", "foo"]
        if command[0][0] == "/" and len(command[0]) > 1:
            command.append(command[0][1:])
            command[0] = command[0][0]
        # Allow ed-like syntax (e.g. s1, o2-5 in addition to s 1, o 2-5)
        elif (
            len(command[0]) > 1
            and command[0][0].isalpha()
            and (command[0][1].isdigit() or command[0][1] == "*")
        ):
            command.append(command[0][1:])
            command[0] = command[0][0]
        args = command[1:]
        command = command[0]
        # Commands may be abbreviated to any unambiguous prefix.
        if "add".startswith(command):
            add_feed(args[0]) if args else print("Usage: add <url>")
        elif "delete".startswith(command):
            delete_feed(args[0]) if args else print("Usage: delete <index>")
        elif "change".startswith(command):
            change_feed(args[0]) if args else print("Usage: change <index>")
        elif "export".startswith(command):
            export_feeds(args[0]) if args else export_feeds("feeds.opml")
        elif "import".startswith(command):
            import_feeds(args[0]) if args else import_feeds("feeds.opml")
        elif "mark".startswith(command):
            mark_read(args[0], True) if args else print("Usage: mark <index>")
        elif "unmark".startswith(command):
            mark_read(args[0], False) if args else print("Usage: unmark <index>")
        elif "open".startswith(command):
            if args:
                open_in_browser(args[0])
            elif ui.data["post_url"]:
                # No argument: re-open the last-displayed post.
                open_in_browser(None)
            else:
                print("Usage: open <index>")
        elif "get".startswith(command):
            if args:
                download_entry(args[0])
            elif ui.data["post_url"]:
                download_entry(None)
            else:
                print("Usage: get <index>")
        elif "refresh".startswith(command):
            refresh_feeds()
        elif (
            "show".startswith(command)
            or "list".startswith(command)
            or "print".startswith(command)
        ):
            show(args[0]) if args else show()
        elif "vacuum".startswith(command):
            db.vacuum()
        elif "help".startswith(command) or "?".startswith(command):
            print_help()
        elif "/".startswith(command):
            search(args[0]) if args else print("Usage: /<query>")
        elif "quit".startswith(command) or "exit".startswith(command):
            sys.exit(0)
        else:
            print(f"Unrecognized command: {command}. Try 'help' for help.")