#!/usr/bin/env python3
# Use the proper idiom in the main module ...
# NOTE: See https://docs.python.org/3.11/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
if __name__ == "__main__":
# Import standard modules ...
import email
import email.message
import html
import json
import mimetypes
import shutil
import subprocess
import sys
import time
# Import special modules ...
try:
import lxml
import lxml.etree
except:
raise Exception("\"lxml\" is not installed; run \"pip install --user lxml\"") from None
# Import my modules ...
try:
import pyguymer3
except:
raise Exception("\"pyguymer3\" is not installed; you need to have the Python module from https://github.com/Guymer/PyGuymer3 located somewhere in your $PYTHONPATH") from None
# Check that "ssmtp" is installed ...
if shutil.which("ssmtp") is None:
raise Exception("\"ssmtp\" is not installed") from None
# Define settings ...
path = "/path/to/rss_checker.json"
# Define function ...
def construct_email(emailIn, feedTitleIn, postTitleIn, dateIn, linkIn, contentIn, thumbnailIn, sessIn, /):
    """Construct an email message announcing a new post in a feed.

    Parameters
    ----------
    emailIn : str
        the destination email address
    feedTitleIn : lxml.etree._Element or None
        the feed's title element (must have a ".text" attribute)
    postTitleIn : lxml.etree._Element or None
        the post's title element (must have a ".text" attribute)
    dateIn : lxml.etree._Element or None
        the post's date element (must have a ".text" attribute)
    linkIn : str
        the URL of the post
    contentIn : lxml.etree._Element or None
        the post's description element, if any
    thumbnailIn : lxml.etree._Element or None
        the post's Media RSS thumbnail element, if any
    sessIn
        the session to use for downloading the thumbnail

    Returns
    -------
    message : email.message.EmailMessage or bool
        the constructed message, or False if a required element is missing
    """

    # Check inputs (feeds have a habit of missing required tags) ...
    if feedTitleIn is None:
        print("WARNING: \"feedTitleIn\" is None")
        return False
    if postTitleIn is None:
        print("WARNING: \"postTitleIn\" is None")
        return False
    if dateIn is None:
        print("WARNING: \"dateIn\" is None")
        return False

    # Create email ...
    message = email.message.EmailMessage()
    message["To"] = emailIn
    message["Subject"] = f"New post in \"{feedTitleIn.text.strip()}\" feed"
    message["From"] = "you@example.com"

    # Create content ...
    contentOut = f"Post Title: {postTitleIn.text.strip()}\n"
    contentOut += f"Post Date: {dateIn.text.strip()}\n"
    contentOut += f"Post Link: {linkIn}\n"

    # Check if there is an article description ...
    if contentIn is not None:
        # Add the article description (unescaping HTML entities so that the
        # plain-text body is readable) ...
        contentOut += f"Post Description:\n{html.unescape(contentIn.text.strip())}\n"

    # Set content ...
    message.set_content(contentOut)

    # Check if there is an article thumbnail ...
    if thumbnailIn is not None:
        # Obtain the thumbnail URL ...
        url = thumbnailIn.attrib.get("url", "ERROR")

        # Check that there is a thumbnail URL ...
        if url != "ERROR":
            # Download the thumbnail ...
            cont = pyguymer3.download_stream(sessIn, url)

            # BUG FIX: "download_stream()" returns False on failure (the main
            #          loop already guards against this for the feed itself);
            #          attaching False would raise a TypeError, so skip the
            #          attachment instead and still send the email ...
            if cont is False:
                print("WARNING: Failed to download the thumbnail.")
            else:
                # Determine MIME type ...
                ftype = mimetypes.guess_type(url, strict = True)[0]
                if ftype is None:
                    # BUG FIX: "image/jpg" is not a registered MIME type; the
                    #          correct fallback is "image/jpeg" ...
                    ftype = "image/jpeg"

                # Create short-hands ...
                maintype, subtype = ftype.split("/", maxsplit = 1)

                # Add the article thumbnail ...
                message.add_attachment(
                    cont,
                    maintype = maintype,
                     subtype = subtype,
                    filename = f"thumbnail.{subtype}",
                )

    # Return the answer ...
    return message
# Load the data file (a JSON object mapping each feed URL to its destination
# email address and the list of post URLs which have already been emailed) ...
with open(path, "rt", encoding = "utf-8") as fObj:
    data = json.load(fObj)
# Initialize counter and set limit (so that a single run of this script cannot
# spam the mail server with too many emails) ...
n = 0 # [#]
nlim = 30 # [#]
# Start session ...
with pyguymer3.start_session() as sess:
    # Loop over feeds ...
    for feed in data:
        print(f"Processing \"{feed}\" ...")
        # Download Atom/RSS (as a byte stream), skipping this feed if the
        # download fails or returns nothing ...
        src = pyguymer3.download_stream(sess, feed)
        if src is False:
            print("WARNING: Failed to download the Atom/RSS feed.")
            continue
        if len(src) == 0:
            print("WARNING: The Atom/RSS feed is empty.")
            continue
        # Parse Atom/RSS as XML with error handling ...
        # NOTE: Atom/RSS feeds have a habit of being illegal XML. For
        #       example, a feed once contained a badly-escaped title along
        #       the lines of (exact original lost to file mangling - the gist
        #       was a stray "&" / entity that is illegal in XML):
        #           Cinthie&amp;#039;s 'Soul, Strings & Samples' Mini Mix
        #       ... which should have been:
        #           Cinthie's 'Soul, Strings &amp; Samples' Mini Mix
        #       ... therefore, I no longer use "xml.etree.ElementTree" but
        #       rather "lxml.etree" as it supports recovery of illegally
        #       specified characters.
        root = lxml.etree.fromstring(src, parser = lxml.etree.XMLParser(recover = True))
        # Determine the feed format ...
        if root.tag == "{http://www.w3.org/2005/Atom}feed":
            print(" It is an Atom feed")
            # Loop over all entry tags in the feed ...
            for entry in root.findall("{http://www.w3.org/2005/Atom}entry"):
                # Find the link to the article, preferring the <id> tag but
                # falling back on the <link> tag when <id> is not a URL ...
                post = entry.find("{http://www.w3.org/2005/Atom}id").text.strip()
                if not post.startswith("http"):
                    post = entry.find("{http://www.w3.org/2005/Atom}link").get("href").strip()
                if not post.startswith("http"):
                    raise Exception("cannot find a post that starts with \"http\"") from None
                # Correct for common bugs (so that the same post cannot be
                # emailed twice under two different spellings of its URL) ...
                post = post.replace("www.FreeBSD.org", "www.freebsd.org")
                post = post.replace("www.freebsd.org//", "www.freebsd.org/")
                # Skip this article if it has already been emailed ...
                if post in data[feed]["posts"]:
                    continue
                # Construct email (which returns False if the feed is missing
                # a required tag; skip such articles) ...
                inp = construct_email(
                    data[feed]["email"],
                    root.find("{http://www.w3.org/2005/Atom}title"),
                    entry.find("{http://www.w3.org/2005/Atom}title"),
                    entry.find("{http://www.w3.org/2005/Atom}updated"),
                    post,
                    entry.find("{http://www.w3.org/2005/Atom}summary"),
                    entry.find("{http://search.yahoo.com/mrss/}thumbnail"),
                    sess,
                )
                if inp is False:
                    continue
                # Send email (by piping the flattened message to "ssmtp" on
                # its stdin) and increment counter ...
                subprocess.run(
                    ["ssmtp", data[feed]["email"]],
                       check = True,
                    encoding = "utf-8",
                       input = inp.as_string(),
                     timeout = 60.0,
                )
                n += 1 # [#]
                print(f" Sent email about \"{post}\"")
                # Save article so that it is not sent again (rewriting the
                # data file immediately after every email so that a crash
                # part-way through a run cannot cause duplicates) ...
                data[feed]["posts"] = sorted(list(set(data[feed]["posts"] + [post])))
                with open(path, "wt", encoding = "utf-8") as fObj:
                    json.dump(
                        data,
                        fObj,
                        ensure_ascii = False,
                              indent = 4,
                           sort_keys = True,
                    )
                # Stop sending emails or wait so that this script does not
                # spam the server ...
                if n >= nlim:
                    print("Finishing cleanly; sent too many emails.")
                    sys.exit()
                time.sleep(2.0)
        elif root.tag == "rss":
            print(" It is an RSS feed")
            # Loop over all item tags in the first channel tag of the feed ...
            for item in root.find("channel").findall("item"):
                # Find the link to the article ...
                post = item.find("link").text.strip()
                if not post.startswith("http"):
                    raise Exception("cannot find a post that starts with \"http\"") from None
                # Correct for common bugs (so that the same post cannot be
                # emailed twice under two different spellings of its URL) ...
                post = post.replace("www.FreeBSD.org", "www.freebsd.org")
                post = post.replace("www.freebsd.org//", "www.freebsd.org/")
                # Skip this article if it has already been emailed ...
                if post in data[feed]["posts"]:
                    continue
                # Construct email (which returns False if the feed is missing
                # a required tag; skip such articles) ...
                inp = construct_email(
                    data[feed]["email"],
                    root.find("channel").find("title"),
                    item.find("title"),
                    item.find("pubDate"),
                    post,
                    item.find("description"),
                    item.find("{http://search.yahoo.com/mrss/}thumbnail"),
                    sess,
                )
                if inp is False:
                    continue
                # Send email (by piping the flattened message to "ssmtp" on
                # its stdin) and increment counter ...
                subprocess.run(
                    ["ssmtp", data[feed]["email"]],
                       check = True,
                    encoding = "utf-8",
                       input = inp.as_string(),
                     timeout = 60.0,
                )
                n += 1 # [#]
                print(f" Sent email about \"{post}\"")
                # Save article so that it is not sent again (rewriting the
                # data file immediately after every email so that a crash
                # part-way through a run cannot cause duplicates) ...
                data[feed]["posts"] = sorted(list(set(data[feed]["posts"] + [post])))
                with open(path, "wt", encoding = "utf-8") as fObj:
                    json.dump(
                        data,
                        fObj,
                        ensure_ascii = False,
                              indent = 4,
                           sort_keys = True,
                    )
                # Stop sending emails or wait so that this script does not
                # spam the server ...
                if n >= nlim:
                    print("Finishing cleanly; sent too many emails.")
                    sys.exit()
                time.sleep(2.0)
        else:
            raise Exception(f"\"{root.tag}\" is an unrecognized feed format") from None