Initial checkin

Jake Bauer 2023-07-26 18:11:46 -04:00
commit e1c34dc059
8 changed files with 1281 additions and 0 deletions

8
.gitignore vendored Normal file

@@ -0,0 +1,8 @@
# Idk, whatever python does
__pycache__/
# Local testing
feeds.*
# Build artifacts
build/
dist/
fenen.spec

15
LICENSE Normal file

@@ -0,0 +1,15 @@
ISC License
Copyright (c) 2023 Jake Bauer <jbauer@paritybit.ca>
Permission to use, copy, modify, and distribute this software for any purpose
with or without fee is hereby granted, provided that the above copyright notice
and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
THIS SOFTWARE.

382
README.md Normal file

@@ -0,0 +1,382 @@
# Fenen Feed Reader
Fenen is a terminal-based RSS/Atom feed reader with a command syntax
reminiscent of mail(1)/ed(1), where messages/lines are replaced by feeds
and feed entries.
It's the feed reader that fits my needs :)
* [Installation](#installation)
* [Getting Started](#getting-started)
* [Command Reference](#command-reference)
* [Configuration](#configuration)
* [Development Roadmap](#development-roadmap)
* [Contributing](#contributing)
## Installation
The easiest way to get fenen is to just grab one of the pre-packaged
binaries for your system, which contain a Python environment and all the
dependencies. These are available on the [project
page](http://www.paritybit.ca/projects/fenen).
### From Source
If you just want to run fenen without packaging it, ensure you have
Python >=3.6 installed.
If you want to package fenen into a single executable file for yourself,
ensure you have Python >=3.7 installed and install pyinstaller via pip:
```
pip install pyinstaller
```
You can then run the following to turn fenen into a single executable
file (which will be found under the `./dist/` folder) for your current
combination of operating system and architecture:
```
pyinstaller --onefile fenen.py
```
The only external dependency fenen needs is
[feedparser](https://pypi.org/project/feedparser/), which is likely already
available in your operating system's package repositories.
#### OpenBSD
```
pkg_add py3-feedparser
```
#### Debian Linux & Derivatives (Ubuntu, Linux Mint, etc.)
```
apt install python3-feedparser
```
#### Fedora Linux & Derivatives
```
dnf install python3-feedparser
```
#### My OS/Distro Isn't Listed
Search your OS/distro's package repositories for the Python feedparser
package. Some distros provide separate packages for different versions of
Python, so I didn't bother listing them all above.
If they don't have it, your best option is probably to install it via pip
(either globally or using whichever virtual environment manager you
wish):
```
pip install feedparser
```
## Getting Started
When you launch fenen, you will be greeted with a prompt:
```
fenen (all)>
```
This is the command line that you use to interact with fenen. If you've
ever used ed(1) or mail(1) before, fenen's syntax is a bit similar to
those. The word in parentheses indicates the current context:
* all -> The list of all feeds
* feed -> The list of entries in a particular feed
* unread -> The list of unread entries across all feeds
* search -> The list of search results (can be either feeds or entries)
To get started, add some feeds to fenen:
```
fenen (all)> add http://www.paritybit.ca/feed.xml
fenen (all)> add https://www.undeadly.org/cgi?action=rss
fenen (all)> add https://rakudoweekly.blog/
```
You can use the `show` command to show all the feeds fenen knows about:
```
fenen (all)> show
1) http://www.paritybit.ca/feed.xml (0 unread)
2) https://rakudoweekly.blog/ (0 unread)
3) https://www.undeadly.org/cgi?action=rss (0 unread)
fenen (all)>
```
Although you have just added some feeds to fenen, they are not yet
populated with content. To refresh these feeds and load new content, use
the `refresh` command:
```
fenen (all)> refresh
Refreshed 3/3 feeds in 1s
fenen (all)>
```
Now when you `show` again, it should look something like:
```
fenen (all)> show
1) OpenBSD Journal (9 unread)
2) paritybit.ca (50 unread)
3) Rakudo Weekly News (10 unread)
fenen (all)>
```
To view a list of posts in a specific feed, use the `show` command with
the number of the feed:
```
fenen (all)> show 1
OpenBSD Journal - https://www.undeadly.org/cgi?action=rss
1) * 2023-06-21 [CFT] Major pfsync(4) Rewrite on the Horizon
2) * 2023-06-24 Game of Trees 0.90 released
3) * 2023-07-04 [CFT] sec(4) for Route Based IPSec VPNs
4) * 2023-07-06 Major pfsync(4) Rewrite Has Been Committed
5) * 2023-07-06 Soft updates (softdep) disabled for future VFS work
6) * 2023-07-11 Wayland on OpenBSD
7) * 2023-07-12 pkg_*: the road forward
8) * 2023-07-13 OpenBGPD 8.1 released
9) * 2023-07-14 Mandatory enforcement of indirect branch targets
fenen (feed)>
```
The latest posts are shown at the bottom of the list (closest to the
next command prompt) and unread items have a `*` between the number and
the date.
To read a specific item, use the `show` command with a number again.
This item will then be marked as read.
```
fenen (feed)> show 9
...Entry number 9 is displayed...
```
You can also manually mark an item as read or unread with the `mark` and
`unmark` commands, and you can open any item in the browser with the
`open` command, all of which also take an item number. If you were just
reading a post and you invoke `open` without a number, that post will
open in the browser.
```
fenen (feed)> open
...Browser opens with entry number 9...
fenen (feed)>
```
You'll notice that if you try using `show` without a number, it will
print the list of entries in this feed again. To get back to the list of
all feeds, you can use the `show all` command. Additionally, to view
a list of unread feed entries across all of your feeds, use the `show
unread` command:
```
fenen (all)> show unread
...A list of unread entries is printed...
fenen (unread)>
```
Finally, quit fenen with the `quit` command:
```
fenen (all)> quit
```
### Tips
Here are a few extra tidbits to make your fenen experience more
pleasant:
You don't have to type out full command names; typing just the first
part of a command is treated as the full command. For example, `s u` is
the same as `sho un`, which is the same as `show unread`.
Any command which takes an item number can also accept ranges and lists
of numbers. For example, you can use `delete 1,4,6-10` to delete feeds
1, 4, 6, 7, 8, 9, and 10 or `mark *` to mark all entries as read.
There is no space needed between a command and item numbers if you use
the single-letter short form of a command. For example, you can use
`o1-5` to open entries 1 through 5, which is equivalent to `o 1-5` and
`open 1-5`.
This is what a typical fenen session looks like once you've got all your
feeds added and are just using it to check for new entries:
```
fenen (all)> r
Refreshed 150/150 feeds in 42s
fenen (all)> s u
1) * 2023-07-16 Ken Shirriff's blog - Undocumented 8086 instructions, explained by the microcode
2) * 2023-07-16 Phoronix - Linux Mint 21.2 Released With Cinnamon Enhancements, Other Desktop Polishing
3) * 2023-07-16 Technology Connections - Longer-lasting light bulbs: it was complicated
fenen (unread)> o1,3
fenen (unread)> m*
fenen (unread)> ^D
```
First we start by refreshing the feeds to pull in content that was
published since the last refresh. Then we tell fenen to show us any
unread entries (which, in this case, is everything that is new since the
last refresh). We decide that we want to watch that Technology
Connections video and read that blog post from Ken Shirriff, so we open
them in the browser. Finally, we mark all of these posts as read and
quit by pressing Control+D.
In order to view all the available commands, type `help` and hit enter.
Most commands are self-explanatory, but here is a breakdown:
## Command Reference
`add <url>`:
Adds the given feed URL `<url>` to fenen. This must be a link to a
feed; fenen does not support finding a feed embedded in a site.
Perform a refresh to load the content from that feed.
`delete <n>`:
Delete the given feed(s) from the feed reader. All content will be
removed from fenen. This command only deletes a feed from fenen and will
do nothing if you try to delete a post. If you're doing this because you
want to shrink the size of fenen's database on disk, follow this up with
a `vacuum` command.
`change <n>`:
Change the name and/or category of the given feed(s). You will be
prompted to input a new name and category. The current feed name and
category are printed between square brackets, and if nothing is entered
for a value, that value is left unchanged.
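For instance, setting a category on one of the feeds added earlier might
look like this (the item number and current values shown are illustrative):
```
fenen (all)> change 3
Feed: https://rakudoweekly.blog/
 Name [Rakudo Weekly News]:
 Category [None]: Programming
fenen (all)>
```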
`(show|list|print) [all|unread|<n>]`:
If given the subcommand `all`, show the list of feeds. If given the
subcommand `unread`, show a list of unread entries from all feeds. If
given item number(s) instead, show those item(s): in the list of feeds
this prints a listing of the posts in the selected feed(s); in a list of
posts it displays the selected post(s). With no argument, print the
current list again.
`import [file]`:
Import feeds from the given OPML file, adding them to fenen. If no file
is given, fenen will try to import from `feeds.opml`.
`export [file]`:
Export the list of feeds in fenen to an OPML file. If no file is given,
fenen will try to export to `feeds.opml`.
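For reference, the exported file is a minimal OPML 2.0 document along
these lines (the feed names, site URLs, and categories shown here are
placeholders):
```
<opml version="2.0">
<head>
	<docs>http://opml.org/spec2.opml</docs>
</head>
<body>
	<outline type="rss" text="paritybit.ca" xmlUrl="http://www.paritybit.ca/feed.xml" htmlUrl="https://www.paritybit.ca" category="Blogs"/>
	<outline type="rss" text="OpenBSD Journal" xmlUrl="https://www.undeadly.org/cgi?action=rss" htmlUrl="https://www.undeadly.org" category="News"/>
</body>
</opml>
```
`import` reads the same `xmlUrl`, `text`, `htmlUrl`, and `category`
attributes back in, so a file exported by fenen can be re-imported as-is.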
`refresh`:
Refresh all feed content by downloading the latest version of each
site's feed. This may take a while to complete if you have a slow
connection or many feeds.
`mark <n>` and `unmark <n>`:
Mark the given item(s) as read or unread, respectively.
`open <n>`:
Open the given item(s) in the browser. If the item(s) are feeds, this
will open the website corresponding to each feed when the feed provides
that information; otherwise it will open the feed URL itself. If the
item(s) are posts, this will open those posts in the browser.
`get <n>`:
Download the given entry from the Internet. This can be used to read the full
content of a post if only a short summary is given in the feed.
`vacuum`:
Reclaim free space in the database. This simply shrinks the size of the
database file on disk if it contains space that is no longer being used
to store data. This command isn't needed under normal circumstances.
`(help|?)`:
Print out a help message with a brief explanation of each command.
`/<query>`:
Performs a search on the current list of feeds or the current list of
entries in a feed and displays the results. The query is matched as a
substring: feeds are matched against their name and URL, and entries are
matched against their title, URL, and date.
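For example, searching the feed list for anything mentioning OpenBSD
might look like this (using the feeds from the Getting Started section):
```
fenen (all)> /openbsd
1) OpenBSD Journal (9 unread)
fenen (search)>
```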
`quit`:
Exit fenen. ^D (EOF) and ^C (SIGINT) also work. Note that if you try
to exit with ^C in the middle of a refresh, fenen will finish all
currently ongoing downloads and then exit.
## Configuration
Fenen ships with a small, reasonable default configuration. A config
file is only necessary if you want to override a setting, and it only
needs to contain the specific option(s) you wish to override.
Fenen will first look for a custom configuration in
`$XDG_CONFIG_HOME/fenen.conf`, then `~/.config/fenen.conf`, and then
`~/.fenen/fenen.conf`. If none is found it will use the built-in default
settings.
The following is a sample configuration file containing the built-in
defaults:
```
# The location of the database file
db = $XDG_DATA_HOME/fenen.db
# The number of threads to use when refreshing feeds
# The sweet spot for performance seems to land between 4 and 8 threads
# depending on your hardware
threads = 4
# The program to use to open items with the `open` command
browser = firefox
# The program to use to read feed entries in fenen with the `show` command
pager = less
# The program to download full entries with the `get` command
downloader = curl -O
```
## Development Roadmap
There are a few things I want to add to fenen before I consider it
feature-complete:
* Write a manpage
* Add ability to download podcast/youtube video directly?
* Tests?
## Contributing
Send bug reports, feedback, suggestions, and patches by email to
[jbauer@paritybit.ca](mailto:jbauer@paritybit.ca).
I am especially interested in fixing accessibility issues.
Please make sure to format your code using
[Black](https://pypi.org/project/black/) (with default settings) before
submitting a patch.
I want to keep fenen as small and external-dependency-free as is
reasonable. Please keep that in mind when making suggestions or
sending patches.

44
config.py Normal file

@@ -0,0 +1,44 @@
import os
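# Built-in defaults plus a tiny "key = value" config file parser; values read
# from the config file overwrite the defaults, and unknown keys are reported.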
class Config:
def __init__(self):
self.config = {}
self._set_defaults()
def _set_defaults(self):
if "XDG_DATA_HOME" in os.environ:
datadir = os.environ["XDG_DATA_HOME"]
elif os.path.isdir(os.environ["HOME"] + "/.local/share"):
datadir = os.environ["HOME"] + "/.local/share"
else:
datadir = os.environ["HOME"] + "/.fenen"
if not os.path.isdir(datadir):
os.mkdir(datadir)
self.config["db"] = datadir + "/fenen.db"
self.config["threads"] = 4
self.config["browser"] = "firefox"
self.config["pager"] = "less"
self.config["downloader"] = "curl -O"
def parse_config(self):
if "XDG_CONFIG_HOME" in os.environ and os.path.isfile(
os.environ["XDG_CONFIG_HOME"] + "/fenen.conf"
):
configfile = os.environ["XDG_CONFIG_HOME"] + "/fenen.conf"
elif os.path.isfile(os.environ["HOME"] + "/.config/fenen.conf"):
configfile = os.environ["HOME"] + "/.config/fenen.conf"
elif os.path.isfile(os.environ["HOME"] + "/.fenen/fenen.conf"):
configfile = os.environ["HOME"] + "/.fenen/fenen.conf"
else:
return
with open(configfile, "r") as f:
for line in f:
line = line.strip()
# Skip blank lines and comments so a copy of the sample config parses cleanly
if not line or line.startswith("#"):
continue
kv = list(map(str.strip, line.split("=", 1)))
if len(kv) == 2 and kv[0] in self.config:
self.config[kv[0]] = kv[1]
else:
print(f"Unknown config option: {kv[0]}")
def get_value(self, key):
return self.config[key] if key in self.config else None

174
database.py Normal file

@@ -0,0 +1,174 @@
import sqlite3
from hashlib import sha256
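# Feed metadata lives in the "feeds" table; each feed's entries live in a
# per-feed table named "table<sha256 of the feed URL>" (see insert_feed).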
class Database:
db = None
c = None
def __init__(self, dbfile):
self.db = sqlite3.connect(dbfile, timeout=30)
self.db.row_factory = sqlite3.Row
self.c = self.db.cursor()
def init_db(self):
with self.db:
self.c.execute(
"CREATE TABLE IF NOT EXISTS feeds "
"(url TEXT PRIMARY KEY, name TEXT COLLATE NOCASE, "
"custom_name INTEGER, site_url TEXT, "
"category TEXT COLLATE NOCASE, table_name TEXT)"
)
def insert_feed(self, data, from_opml=False):
with self.db:
table_name = "table" + sha256(data[0].encode()).hexdigest()
data.append(table_name)
self.c.execute(
f"CREATE TABLE IF NOT EXISTS {table_name} "
"(id TEXT PRIMARY KEY, date DATE, title TEXT, "
"url TEXT, content TEXT, unread INTEGER)"
)
if from_opml:
self.c.execute(
"INSERT OR IGNORE INTO feeds VALUES (?, ?, ?, ?, ?, ?)",
tuple(data),
)
else:
self.c.execute(
"INSERT OR IGNORE INTO feeds VALUES (?, NULL, NULL, NULL, NULL, ?)",
tuple(data),
)
def update_feed(self, name, category, url):
with self.db:
if name:
self.c.execute(
"UPDATE feeds SET name = ?, custom_name = 1 WHERE url = ?",
(name, url),
)
if category:
self.c.execute(
"UPDATE feeds SET category = ? WHERE url = ?",
(category, url),
)
def get_all_feeds(self):
return self.c.execute(
"SELECT * FROM feeds ORDER BY category, name, url"
).fetchall()
def get_feed_by_url(self, url):
return self.c.execute("SELECT * FROM feeds WHERE url = ?", (url,)).fetchone()
def get_feed_by_index(self, index):
return self.c.execute(
f"SELECT * FROM feeds ORDER BY category, name, url LIMIT 1 OFFSET {index}"
).fetchone()
def get_all_entries(self, table):
return self.c.execute(f"SELECT * FROM {table} ORDER BY date").fetchall()
def get_entry_by_index(self, table, index):
return self.c.execute(
f"SELECT * FROM {table} ORDER BY date LIMIT 1 OFFSET {index}"
).fetchone()
def get_entry_by_id(self, table, id):
return self.c.execute(f"SELECT * FROM {table} WHERE id = ?", (id,)).fetchone()
def get_unread_count(self, table):
return self.c.execute(
f"SELECT COUNT(unread) FROM {table} WHERE unread = 1"
).fetchone()[0]
def get_unread_entries(self, table):
return self.c.execute(f"SELECT * FROM {table} WHERE unread = 1").fetchall()
def get_num_feeds(self):
return self.c.execute("SELECT COUNT(1) FROM feeds").fetchone()[0]
def get_num_entries(self, table):
return self.c.execute(f"SELECT COUNT(1) FROM {table}").fetchone()[0]
def search_feeds(self, query):
return self.c.execute(
"SELECT * FROM feeds WHERE url LIKE ? OR name LIKE ? ORDER BY category, name, url",
(query, query),
).fetchall()
def search_entries(self, table, query):
return self.c.execute(
f"SELECT * FROM {table} WHERE "
"date LIKE ? OR title LIKE ? OR url LIKE ? ORDER BY date ASC",
(query, query, query),
).fetchall()
def feed_missing_name(self, url):
if self.c.execute(
"SELECT url FROM feeds WHERE url = ? AND name IS NULL", (url,)
).fetchone():
return True
return False
def feed_missing_site_url(self, url):
if self.c.execute(
"SELECT url FROM feeds WHERE url = ? AND site_url IS NULL", (url,)
).fetchone():
return True
return False
def populate_feed_name(self, url, name):
with self.db:
self.c.execute(
"UPDATE feeds SET name = ?, custom_name = 0 WHERE url = ?", (name, url)
)
def populate_feed_site_url(self, url, site_url):
with self.db:
self.c.execute(
"UPDATE feeds SET site_url = ? WHERE url = ?", (site_url, url)
)
def remove_feeds(self, urls, tables):
urls = [(url,) for url in urls]
with self.db:
self.c.executemany("DELETE FROM feeds WHERE url = ?", urls)
for table in tables:
self.c.execute(f"DROP TABLE {table}")
def change_unread_status_of_feeds(self, feeds, unread_status):
with self.db:
for feed in feeds:
self.c.execute(
f'UPDATE {feed["table_name"]} SET unread = ?', (unread_status,)
)
def change_unread_status_of_entries(self, entries, unread_status):
with self.db:
for entry in entries:
self.c.execute(
f'UPDATE {entry["table"]} SET unread = ? WHERE id = ?',
(unread_status, entry["id"]),
)
def insert_entries(self, table, entries):
with self.db:
try:
self.c.executemany(
f"INSERT INTO {table} VALUES (?, ?, ?, ?, ?, 1)", entries
)
except sqlite3.IntegrityError:
pass # This is likely just a duplicate entry i.e. unique
# constraint failed on table<n>.id which doesn't matter
def update_entries(self, table, entries):
with self.db:
self.c.executemany(
f"UPDATE {table} SET title = ?, url = ?, content = ? WHERE id = ?",
entries,
)
def vacuum(self):
with self.db:
self.c.execute("VACUUM")

605
fenen.py Executable file

@@ -0,0 +1,605 @@
#!/usr/bin/env python3
# Fenen Feed Reader
# Copyright (C) 2023 Jake Bauer <jbauer@paritybit.ca>
# Licensed under the terms of the ISC License, see LICENSE for details.
import readline
import signal
import subprocess
import sys
import tempfile
import html
import textwrap
import re
import datetime as dt
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer
from xml.etree import ElementTree
# External dependency
import feedparser
# Internal dependencies
from config import Config
from html_converter import HTMLConverter
from database import Database
from ui_context import UIContext
def signal_handler(sig, frame):
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
# Takes a range like "1-4,8,10"; converts it to a list like [1, 2, 3, 4, 8, 10]
def interpret_range(string):
indexlist = []
if ui.context == "all":
maxIndex = db.get_num_feeds()
elif ui.context == "feed":
maxIndex = db.get_num_entries(ui.data["feed"]["table_name"])
elif ui.context in ["unread", "search"]:
maxIndex = len(ui.data["items"])
if string == "*":
return list(range(0, maxIndex))
for i in string.split(","):
try:
if "-" not in i:
if int(i) > maxIndex:
print("Item", i, "doesn't exist.")
return []
indexlist.append(int(i))
else:
low, high = map(int, i.split("-"))
if high > maxIndex:
print("Item", i, "doesn't exist.")
return []
indexlist += range(low, high + 1)
except ValueError:
print("Invalid item number or range:", i)
return []
# Transform nice-for-humans indices into nice-for-computers indices
for i in range(len(indexlist)):
indexlist[i] = indexlist[i] - 1
return indexlist
# TODO: add support for inputting a regular page and parsing out a feed link
# FIXME: be more robust about accepting non-feed URLs (check ContentType?)
def add_feed(url):
if not url.startswith("http://") and not url.startswith("https://"):
url = "http://" + url
db.insert_feed([url])
def delete_feed(indices):
if not (
ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
):
print("Can only delete feeds, not posts.")
return
indices = interpret_range(indices)
response = input(
"Are you sure you want to delete "
f'{"these feeds" if len(indices) > 1 else "this feed"}? [y/N] '
)
if not response or not "yes".startswith(response.lower()):
return
urls = []
tables = []
for i in indices:
if ui.context == "search":
feed = db.get_feed_by_url(ui.data["items"][i]["url"])
ui.data["items"].pop(i)
else:
feed = db.get_feed_by_index(i)
tables.append(feed["table_name"])
urls.append(feed["url"])
db.remove_feeds(urls, tables)
def change_feed(indices):
if not (
ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
):
print("Can only change feed information, not post information.")
return
indices = interpret_range(indices)
# Get all feeds first so changes in the database don't mess up the order
feeds = []
for i in indices:
if ui.context == "search":
feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
else:
feeds.append(db.get_feed_by_index(i))
for feed in feeds:
print(f'Feed: {feed["url"]}')
try:
name = input(f' Name [{feed["name"]}]: ')
category = input(f' Category [{feed["category"]}]: ')
except EOFError:
print("")
break
db.update_feed(name, category, feed["url"])
def import_feeds(file):
try:
with open(file) as f:
tree = ElementTree.parse(f)
except FileNotFoundError:
print("File", file, "not found.")
return
except ElementTree.ParseError as e:
print(f"Failed to parse {file}: {e}")
return
count = 0
for node in tree.findall(".//outline"):
url = node.attrib.get("xmlUrl", "")
name = html.unescape(node.attrib.get("text", ""))
custom_name = 1 if name else 0
site_url = node.attrib.get("htmlUrl", "")
category = html.unescape(node.attrib.get("category", ""))
if url and not db.get_feed_by_url(url):
db.insert_feed([url, name, custom_name, site_url, category], True)
count += 1
print(f'{count if count > 0 else "No"} new feeds imported.')
def export_feeds(file):
with open(file, "w") as f:
f.write('<opml version="2.0">\n')
f.write("<head>\n\t<docs>http://opml.org/spec2.opml</docs>\n</head>\n")
f.write("<body>\n")
for feed in db.get_all_feeds():
name = html.escape(feed["name"]) if feed["name"] else ""
# Escape URLs too: "&" is common in feed URLs and must be escaped in XML attributes
url = html.escape(feed["url"])
site_url = html.escape(feed["site_url"]) if feed["site_url"] else ""
category = html.escape(feed["category"]) if feed["category"] else ""
f.write(
f'\t<outline type="rss" text="{name}" xmlUrl="{url}" htmlUrl="{site_url}" category="{category}"/>\n'
)
f.write("</body>\n</opml>")
print("OPML file generated.")
def mark_read(indices, mark_read):
indices = interpret_range(indices)
unread_status = "0" if mark_read else "1"
feeds = []
entries = []
for i in indices:
if ui.context == "all":
feeds.append(db.get_feed_by_index(i))
elif ui.context == "feed":
# Look up the entry so we record its real id rather than its list index
entry = db.get_entry_by_index(ui.data["feed"]["table_name"], i)
entries.append({"table": ui.data["feed"]["table_name"], "id": entry["id"]})
elif ui.context == "unread":
entries.append(
{
"table": ui.data["items"][i]["table_name"],
"id": ui.data["items"][i]["id"],
}
)
elif ui.context == "search":
if ui.prev_context == "feed":
entries.append(
{
"table": ui.data["feed"]["table_name"],
"id": ui.data["items"][i]["id"],
}
)
elif ui.prev_context == "all":
feeds.append(db.get_feed_by_url(ui.data["items"][i]["url"]))
if feeds:
db.change_unread_status_of_feeds(feeds, unread_status)
else:
db.change_unread_status_of_entries(entries, unread_status)
def get_url(index):
if ui.context == "all":
feed = db.get_feed_by_index(index)
url = feed["site_url"] if "site_url" in feed.keys() else feed["url"]
elif ui.context == "feed":
entry = db.get_entry_by_index(ui.data["feed"]["table_name"], index)
url = entry["url"] if "url" in entry.keys() else None
elif ui.context == "unread":
entry = db.get_entry_by_id(
ui.data["items"][index]["table_name"], ui.data["items"][index]["id"]
)
url = entry["url"] if "url" in entry.keys() else None
elif ui.context == "search":
if ui.prev_context == "all":
feed = db.get_feed_by_url(ui.data["items"][index]["url"])
url = feed["site_url"] if "site_url" in feed.keys() else feed["url"]
elif ui.prev_context == "feed":
entry = db.get_entry_by_id(
ui.data["feed"]["table_name"], ui.data["items"][index]["id"]
)
url = entry["url"] if "url" in entry.keys() else None
return url
def open_in_browser(indices=None):
if not indices:
subprocess.Popen(
[conf.get_value("browser"), ui.data["post_url"]], stdout=subprocess.DEVNULL
)
return
indices = interpret_range(indices)
for i in indices:
url = get_url(i)
if url:
subprocess.Popen(
[conf.get_value("browser"), url], stdout=subprocess.DEVNULL
)
sleep(0.1) # Wait a bit so the browser opens URLs in the correct order
else:
print(f"Entry {i + 1} has no associated URL.")
def download_entry(indices=None):
if not indices:
subprocess.run(
f'{conf.get_value("downloader")} {ui.data["post_url"]}', shell=True
)
return
indices = interpret_range(indices)
for i in indices:
url = get_url(i)
if url:
subprocess.run(f'{conf.get_value("downloader")} {url}', shell=True)
else:
print(f"Entry {i + 1} has no associated URL.")
def display_entry(entry, index=None):
if ui.context == "unread":
feed_name = ui.data["items"][index]["feed_name"]
else:
feed_name = ui.data["feed"]["name"]
html_converter = HTMLConverter()
html_converter.feed(entry["content"])
paragraphs = html_converter.text.splitlines()
text = re.sub(r"\n\n+", "\n\n", "\n".join(textwrap.fill(p, 80) for p in paragraphs))
content = text + "\n\n" + "\n".join(html_converter.links)
output = (
f'Title: {entry["title"]}\n'
f"From: {feed_name}\n"
f'Published: {entry["date"]}\n'
f'URL: {entry["url"]}\n\n'
f"{content}\n"
)
with tempfile.NamedTemporaryFile("w") as f:
f.write(output)
f.flush()
subprocess.run(f'{conf.get_value("pager")} {f.name}', shell=True)
ui.data["post_url"] = entry["url"]
def print_feeds(feeds):
n = 1
for feed in feeds:
num_unread = db.get_unread_count(feed["table_name"])
name = feed["name"] if feed["name"] else feed["url"]
category = feed["category"] if feed["category"] else "Uncategorized"
print(f"{n}) [{category}] {name} ({num_unread} unread)")
n += 1
def print_feed_entries(feed, entries):
print(f'{feed["name"]} - {feed["url"]}')
n = 1
for entry in entries:
marker = "* " if entry["unread"] else ""
print(f'{n}) {marker}{entry["date"]} {entry["title"]}')
n += 1
def show(indices=None):
if indices:
if "all".startswith(indices):
ui.change_context("all")
indices = None
elif "unread".startswith(indices):
ui.change_context("unread")
indices = None
else:
indices = interpret_range(indices)
if not indices:
return
if indices and (
ui.context == "all" or (ui.context == "search" and ui.prev_context == "all")
):
for i in indices:
if ui.context == "search":
feed = db.get_feed_by_url(ui.data["items"][i]["url"])
else:
feed = db.get_feed_by_index(i)
entries = db.get_all_entries(feed["table_name"])
print_feed_entries(feed, entries)
ui.change_context("feed")
ui.data["feed"] = feed
elif ui.context == "all":
print_feeds(db.get_all_feeds())
elif indices and (
ui.context == "feed" or (ui.context == "search" and ui.prev_context == "feed")
):
for i in indices:
if ui.context == "search":
entry = db.get_entry_by_id(
ui.data["feed"]["table_name"], ui.data["items"][i]["id"]
)
else:
entry = db.get_entry_by_index(ui.data["feed"]["table_name"], i)
display_entry(entry)
if entry["unread"]:
mark_read(str(i + 1), True)
elif ui.context == "feed":
entries = db.get_all_entries(ui.data["feed"]["table_name"])
print_feed_entries(ui.data["feed"], entries)
elif indices and ui.context == "unread":
for i in indices:
entry = db.get_entry_by_id(
ui.data["items"][i]["table_name"], ui.data["items"][i]["id"]
)
display_entry(entry, i)
if entry["unread"]:
mark_read(str(i + 1), True)
elif ui.context == "unread":
ui.change_context("unread")
for feed in db.get_all_feeds():
items = db.get_unread_entries(feed["table_name"])
for entry in items:
ui.data["items"].append(
{
"date": entry["date"],
"title": entry["title"],
"unread": entry["unread"],
"feed_name": feed["name"],
"table_name": feed["table_name"],
"id": entry["id"],
}
)
ui.data["items"] = sorted(ui.data["items"], key=lambda entry: entry["date"])
if len(ui.data["items"]) == 0:
print("No unread entries.")
ui.revert_context()
n = 1
for entry in ui.data["items"]:
print(f'{n}) * {entry["date"]} {entry["feed_name"]} - {entry["title"]}')
n += 1
elif ui.context == "search":
search(ui.data["search_query"]) # Re-run the search
def search(query):
query = "%" + query + "%"
if ui.context == "all":
ui.change_context("search")
ui.data["search_query"] = query
results = db.search_feeds(query)
for feed in results:
ui.data["items"].append({"url": feed["url"]})
print_feeds(results)
elif ui.context == "feed":
results = db.search_entries(ui.data["feed"]["table_name"], query)
feed = ui.data["feed"] # Store because changing context erases this
ui.change_context("search")
ui.data["search_query"] = query
ui.data["feed"] = feed
for entry in results:
ui.data["items"].append({"id": entry["id"]})
print_feed_entries(feed, results)
elif ui.context == "unread":
print("Can't search unread entries.")
return
elif ui.context == "search":
# Run this search in the same context as the previous search
ui.revert_context()
search(query)
return
if len(ui.data["items"]) == 0:
print("No search results.")
return
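# Runs in a worker thread during a refresh, so it opens its own database
# connection (sqlite3 connections can't be shared across threads by default).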
def load_feed(url):
db = Database(conf.get_value("db"))
entries_to_insert = []
entries_to_update = []
try:
feed = feedparser.parse(url)
if "status" not in feed:
print(f"\nError loading feed: {url}")
print(feed.bozo_exception)
return 1
if feed.status not in [200, 301, 302, 307, 308]:
print(f"\nError loading feed: {url}: HTTP Code {feed.status}")
return 1
feed_title = feed.feed.get("title", "No Feed Title")
site_url = feed.feed.get("link", None)
if db.feed_missing_name(url):
db.populate_feed_name(url, feed_title)
if db.feed_missing_site_url(url):
db.populate_feed_site_url(url, site_url)
table_name = db.get_feed_by_url(url)["table_name"]
existing_ids = [entry["id"] for entry in db.get_all_entries(table_name)]
for entry in feed.entries:
entry_title = entry.get("title", "No Title")
entry_url = entry.get("link", "No URL")
entry_id = entry.get("id", entry_url)
if "summary" in entry:
entry_content = entry["summary"]
else:
# feedparser exposes "content" as a list of content objects; keep the first one's text
content_list = entry.get("content", [])
entry_content = content_list[0].value if content_list else ""
if "published_parsed" in entry and entry.published_parsed:
date = entry.published_parsed
entry_date = f"{date.tm_year}-{date.tm_mon:02}-{date.tm_mday:02}"
elif "updated_parsed" in entry and entry.updated_parsed:
date = entry.updated_parsed
entry_date = f"{date.tm_year}-{date.tm_mon:02}-{date.tm_mday:02}"
else:
entry_date = dt.date.today()
if entry_id in existing_ids:
entries_to_update.append(
(entry_title, entry_url, entry_content, entry_id)
)
else:
entries_to_insert.append(
(
entry_id,
entry_date,
entry_title,
entry_url,
entry_content,
)
)
db.insert_entries(table_name, entries_to_insert)
db.update_entries(table_name, entries_to_update)
return 0
except Exception as e:
print("\nUnhandled error with URL:", url, "->", type(e), e)
return 1
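# Fetch every feed concurrently; the pool size comes from the "threads" config option.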
def refresh_feeds():
feeds = db.get_all_feeds()
total_jobs = len(feeds)
finished_jobs = 0
start = default_timer()
with ThreadPoolExecutor(max_workers=int(conf.get_value("threads"))) as executor:
print(f"Refreshed 0/{total_jobs} feeds", end="", flush=True)
for result in executor.map(load_feed, [feed["url"] for feed in feeds]):
if result == 0:
finished_jobs += 1
print(
f"\rRefreshed {finished_jobs}/{total_jobs} feeds",
end="",
flush=True,
)
times = str(dt.timedelta(seconds=round(default_timer() - start))).split(":")[1:]
tot_time = (times[0].lstrip("0") + "m") if int(times[0]) > 0 else ""
tot_time += (times[1].lstrip("0") + "s") if int(times[1]) > 0 else "0s"
print(f" in {tot_time}", end="")
if finished_jobs < total_jobs:
print(f" ({total_jobs - finished_jobs} failed)")
else:
print("")
def print_help():
print(
"""
add <url> - Add a feed (<url> must point to feed directly).
delete <n> - Delete feed(s).
change <n> - Change the name/category of feed(s).
show [all|unread|<n>] - Show all feeds, unread entries, or the given item(s).
import [file] - Import feeds from [file] or feeds.opml.
export [file] - Export feeds to [file] or feeds.opml.
refresh - Refresh all feeds to check for new content.
mark <n> - Mark item(s) as read.
unmark <n> - Mark item(s) as unread.
open <n> - Open item(s) in the browser.
get <n> - Download item(s) from the Internet.
vacuum - Clean up free space in the database.
help|? - Print this message.
/<query> - Search the current items.
quit - Exit fenen.
<n> can be one or more integers or ranges of integers separated by commas. For
example, "2" or "1-4,8,10".
Short forms of commands may also be used. For example "s u" instead of "show
unread" or "d" instead of "delete". "list" and "print" are aliases of "show".
"""
)
conf = Config()
conf.parse_config()
db = Database(conf.get_value("db"))
db.init_db()
ui = UIContext()
print("Fenen Feed Reader v0.1.0/2023-07-26. Type ? for help.")
if __name__ == "__main__":
while True:
try:
command = input(f"fenen ({ui.context})> ").split()
except EOFError:
print("")
sys.exit(0)
if not command:
continue
# Special handling for "/<query>" command
if command[0][0] == "/" and len(command[0]) > 1:
command.append(command[0][1:])
command[0] = command[0][0]
# Allow ed-like syntax (e.g. s1, o2-5 in addition to s 1, o 2-5)
elif (
len(command[0]) > 1
and command[0][0].isalpha()
and (command[0][1].isdigit() or command[0][1] == "*")
):
command.append(command[0][1:])
command[0] = command[0][0]
args = command[1:]
command = command[0]
if "add".startswith(command):
add_feed(args[0]) if args else print("Usage: add <url>")
elif "delete".startswith(command):
delete_feed(args[0]) if args else print("Usage: delete <number(s)>")
elif "change".startswith(command):
change_feed(args[0]) if args else print("Usage: change <number(s)>")
elif "export".startswith(command):
export_feeds(args[0]) if args else export_feeds("feeds.opml")
elif "import".startswith(command):
import_feeds(args[0]) if args else import_feeds("feeds.opml")
elif "mark".startswith(command):
mark_read(args[0], True) if args else print("Usage: mark <number(s)>")
elif "unmark".startswith(command):
mark_read(args[0], False) if args else print("Usage: unmark <number(s)>")
elif "open".startswith(command):
open_in_browser(args[0]) if args else open_in_browser(None) if ui.data[
"post_url"
] else print("Usage: open <number(s)>")
elif "get".startswith(command):
download_entry(args[0]) if args else download_entry(None) if ui.data[
"post_url"
] else print("Usage: get <number(s)>")
elif "refresh".startswith(command):
refresh_feeds()
elif (
"show".startswith(command)
or "list".startswith(command)
or "print".startswith(command)
):
show(args[0]) if args else show()
elif "vacuum".startswith(command):
db.vacuum()
elif "help".startswith(command) or "?".startswith(command):
print_help()
elif "/".startswith(command):
search(args[0]) if args else print("Usage: /<query>")
elif "quit".startswith(command):
sys.exit(0)
else:
print(f"Unrecognized command: {command}. Try 'help' for help.")

30
html_converter.py Normal file

@@ -0,0 +1,30 @@
from html.parser import HTMLParser
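# Converts an entry's HTML content to plain text for the pager, inserting
# [IMG]/[VIDEO]/[AUDIO] markers and collecting hyperlinks as numbered references.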
class HTMLConverter(HTMLParser):
text = ""
links = []
def __init__(self):
HTMLParser.__init__(self)
self.text = ""
self.links = []
def handle_data(self, data):
self.text += data
def handle_starttag(self, tag, attrs):
if tag in ["img", "video", "audio"]:
attrs = dict((key, value) for key, value in attrs)
title = attrs["title"] + " " if "title" in attrs.keys() else ""
src = attrs["src"] if "src" in attrs.keys() else ""
alt = attrs["alt"] if "alt" in attrs.keys() else ""
self.text += f'[{tag.upper()}: {title}{src} - "{alt}"]'
elif tag == "a":
attrs = dict((key, value) for key, value in attrs)
href = attrs["href"] if "href" in attrs else ""
self.links.append(f"[{len(self.links)}] {href}")
def handle_endtag(self, tag):
if tag == "a":
self.text += f"[{len(self.links) - 1}]"

23
ui_context.py Normal file

@@ -0,0 +1,23 @@
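# Tracks what the prompt is currently operating on ("all", "feed", "unread",
# or "search") plus the data needed to interpret item numbers in that context.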
class UIContext:
context = "all"
prev_context = ""
data = {
"feed": "",
"post_url": "",
"search_query": "",
"items": [],
}
def revert_context(self):
self.context, self.prev_context = self.prev_context, self.context
def change_context(self, new_context):
if new_context not in ["all", "feed", "unread", "search"]:
raise NotImplementedError(f"There is no UI context {new_context}.")
self.prev_context = self.context
self.context = new_context
# Clear data fields so there's no lingering state from previous context
self.data["feed"] = ""
self.data["post_url"] = ""
self.data["search_query"] = ""
self.data["items"] = []