Mejoras a la hora de generar un email diario a partir de un feed RSS (Slashdot)

En Generar un email diario a partir de un feed RSS (Slashdot) publico un pequeño programa en Python para enviarme un correo electrónico diario con los artículos publicados en Slashdot desde la última notificación.

Este programa me ha servido bien durante años, pero había un problema desconcertante: El programa se ejecuta por cron una vez por hora y se supone que tendría que enviarme el resumen diario entre las cuatro y cinco de la tarde. Normalmente era así, pero no siempre. A veces el email se enviaba horas más tarde.

La cosa no era grave, solo molesta, y nunca tenía tiempo para investigarla. Cuando por fin me metí en harina, tardé un buen ratillo en localizar el problema:

El programa hace una petición HTTP condicional al feed RSS de Slashdot para no machacar el servidor y no trabajar nosotros tampoco en vano si no ha habido cambios. La cuestión es que, si no hay nada nuevo que procesar, el programa termina inmediatamente. Es decir, no llega a la parte de "es hora de mandar el email". En resumen, el correo electrónico se enviaba solo cuando era más tarde de las 16:00 y, además, había entrado algún artículo nuevo en el feed RSS.

Una vez determinada la causa última, la solución es simple: procesar el feed RSS aunque no haya novedades. La cosa parecía sencilla, pero al final no lo fue tanto. No guardo el feed RSS original así que tengo que hacer chanchullos para que mi programa no considere que ha perdido datos. La lógica es innecesariamente compleja, el programa exige una refactorización urgente. Durante el proceso detecté otro problema: si en un momento dado el programa detecta que se han perdido datos, pero no ha llegado aún el momento de enviar el correo electrónico, no nos enteramos. Compliqué el código una vez más para que el estado "pérdida de datos" sea persistente y solo se limpie cuando se ha enviado el correo electrónico notificándolo.

El resultado es un programa que funciona correctamente, pero innecesariamente complejo y bastante frágil. Cuando se llega a este estado urge refactorizar o, con un programa tan corto, directamente reescribirlo por completo.

Las diferencias respecto a la versión anterior son:

slashdot-20181015-ddeb027b3f1c.diff (Código fuente)

--- slashdot-20181015.py        2019-10-12 02:28:46.020075954 +0200
+++ slashdot-ddeb027b3f1c.py    2021-02-11 20:18:38.341490217 +0100
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3


-# (c) 2018 Jesús Cea Avión - jcea@jcea.es - https://www.jcea.es/
+# (c) 2018-2020 Jesús Cea Avión - jcea@jcea.es - https://www.jcea.es/
 # This code is licensed under AGPLv3.


@@ -28,6 +28,7 @@

         self._procesados = data['procesados']
         self._buckets = data['buckets']
+        self._solapamiento = data.get('solapamiento', True)
         self._procesados_nuevos = set()
         self.etag = data['etag']
         self.modified = data['modified']
@@ -41,10 +42,14 @@
             v['modified'] = self.modified
         return v

+    def notificado(self):
+        self._solapamiento = True
+        self.cambiado = True
+
     def solapamiento(self):
         if self.saved:
             return self._solapamiento
-        return not self._procesados.isdisjoint(self._procesados_nuevos)
+        return self._solapamiento and not self._procesados.isdisjoint(self._procesados_nuevos)

     def update(self, guid, ts, title, link, summary):
         self._procesados_nuevos.add(guid)
@@ -65,6 +70,10 @@

         return self.update(guid, ts, title, link, summary)

+    def feed304(self):
+        self._procesados_nuevos.update(self._procesados)
+        return self.etag, self.modified
+
     def itera_antiguos(self, horas=16):
         ts_antiguo = datetime.datetime.now()
         ts_antiguo -= datetime.timedelta(seconds=horas * 60 * 60)
@@ -99,6 +108,8 @@
         if not self.cambiado:
             return

+        self._solapamiento = self.solapamiento()
+
         procesados = self._procesados_nuevos
         if not purga:
             procesados.update(self._procesados)
@@ -106,6 +117,7 @@
         with open(path + '.NEW', 'wb') as f:
             data = {'procesados': procesados,
                     'buckets': self._buckets,
+                    'solapamiento': self._solapamiento,
                     'etag': etag, 'modified': modified,
                     }
             pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
@@ -115,7 +127,6 @@
         self.cambiado = False

         # Tras el save no deberíamos usar más este objeto
-        self._solapamiento = self.solapamiento()
         self.saved = True
         del self._procesados
         del self._procesados_nuevos
@@ -136,20 +147,21 @@
     finally:
         socket.setdefaulttimeout(timeout)

-    if feed.status == 304:
-        # print("Sin cambios")
-        return
-
-    for entry in feed.entries:
-        summary = BeautifulSoup(entry["summary"], 'html.parser')
-        summary = list(summary.children)[0]
-        entradas.new(entry['id'], entry.updated_parsed,
-                     entry['title'], entry['link'], summary)
-
-    no_solapamiento = ''
-    if not entradas.solapamiento():
-        no_solapamiento = '<font size=+2><b>HEMOS PERDIDO ENTRADAS</b></font>'
-        no_solapamiento += '<br/><br/>\n'
+    if feed.status == 200:
+        etag, modified = feed.get('etag'), feed.get('modified')
+        for entry in feed.entries:
+            summary = BeautifulSoup(entry["summary"], 'html.parser')
+            summary = list(summary.children)[0]
+            entradas.new(entry['id'], entry.updated_parsed,
+                         entry['title'], entry['link'], summary)
+    elif feed.status == 304:
+        # En el 304 no se manda ETAG ni Modified, así que coge los que comprobamos
+        # en la petición condicional.
+        etag, modified = entradas.feed304()
+    else:
+        raise RuntimeError(f'Estado: {feed.status}')
+
+    # Si es 304, procesamos lo viejo por si tiene que salir el email.

     html_desc = html_links = ''
     entradas_a_borrar = []
@@ -164,9 +176,14 @@
     # lo cierto es que queremos actualizar el etag y el modified AUNQUE
     # no haya habido cambios en el feed. Esto es algo a mejorar en el futuro.

-    entradas.save(feed.get('etag'), feed.get('modified'))
+    entradas.save(etag, modified)

     if html_desc:
+        no_solapamiento = ''
+        if not entradas.solapamiento():
+            no_solapamiento = '<font size=+2><b>HEMOS PERDIDO ENTRADAS</b></font>'
+            no_solapamiento += '<br/><br/>\n'
+
         html_links = f'<ul>\n{html_links}\n</ul>\n'
         html = f'''<html>
 <head>
@@ -196,14 +213,17 @@
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)

+        entradas.notificado()
+
     # XXX: Lo suyo sería grabar solo si hay cambios de verdad, pero
     # lo cierto es que queremos actualizar el etag y el modified AUNQUE
     # no haya habido cambios en el feed. Esto es algo a mejorar en el futuro.
     entradas = items()
+    etag, modified = entradas.feed304()  # XXX: Para evitar "no solapamiento"
     for guid, ts in entradas_a_borrar:
         entradas.borrar(guid, ts)

-    entradas.save(feed.get('etag'), feed.get('modified'), purga=False)
+    entradas.save(etag, modified, purga=False)


 if __name__ == '__main__':

El código completo es el siguiente:

slashdot-ddeb027b3f1c.py (Código fuente)

#!/usr/bin/env python3


# (c) 2018-2020 Jesús Cea Avión - jcea@jcea.es - https://www.jcea.es/
# This code is licensed under AGPLv3.


import os
import time
import subprocess
import pickle
from email.mime.text import MIMEText
import email.utils
import datetime
import socket

from bs4 import BeautifulSoup
import feedparser

path = 'items.pickle'


class items:
    def __init__(self):
        self.cambiado = False
        with open(path, 'rb') as f:
            data = pickle.load(f)

        self._procesados = data['procesados']
        self._buckets = data['buckets']
        self._solapamiento = data.get('solapamiento', True)
        self._procesados_nuevos = set()
        self.etag = data['etag']
        self.modified = data['modified']
        self.saved = False

    def condicional(self):
        v = {}
        if self.etag:
            v['etag'] = self.etag
        if self.modified:
            v['modified'] = self.modified
        return v

    def notificado(self):
        self._solapamiento = True
        self.cambiado = True

    def solapamiento(self):
        if self.saved:
            return self._solapamiento
        return self._solapamiento and not self._procesados.isdisjoint(self._procesados_nuevos)

    def update(self, guid, ts, title, link, summary):
        self._procesados_nuevos.add(guid)
        bucket = (ts.tm_year, ts.tm_mon, ts.tm_mday)
        if bucket not in self._buckets:
            self._buckets[bucket] = {}
        if (ts, guid) in self._buckets[bucket]:
            return
        self._buckets[bucket][(ts, guid)] = (title, link, summary)
        self.cambiado = True

    def new(self, guid, ts, title, link, summary):
        if guid in self._procesados:
            self._procesados_nuevos.add(guid)
            return
        if guid in self._procesados_nuevos:
            return

        return self.update(guid, ts, title, link, summary)

    def feed304(self):
        self._procesados_nuevos.update(self._procesados)
        return self.etag, self.modified

    def itera_antiguos(self, horas=16):
        ts_antiguo = datetime.datetime.now()
        ts_antiguo -= datetime.timedelta(seconds=horas * 60 * 60)
        bucket_antiguo = (ts_antiguo.year, ts_antiguo.month, ts_antiguo.day)

        entradas = []
        for bucket_name, bucket in self._buckets.items():
            if bucket_name >= bucket_antiguo:
                continue

            for (ts, guid), (title, link, summary) in bucket.items():
                entradas.append((ts, guid, title, link, summary))

        for entrada in sorted(entradas, reverse=True):
            yield entrada

    def borrar(self, guid, ts):
        bucket = (ts.tm_year, ts.tm_mon, ts.tm_mday)
        del self._buckets[bucket][(ts, guid)]
        if not self._buckets[bucket]:
            del self._buckets[bucket]
        self.cambiado = True

    def save(self, etag, modified, purga=True):
        # XXX: Esto es necesario para que se guarden las cabeceras
        # de peticiones condicionales AUNQUE no haya habido cambios
        # en el propio feed. Algo a mejorar en el futuro, por ejemplo
        # pudiendo grabar solo eso.
        if (etag != self.etag) or (modified != self.modified):
            self.cambiado = True

        if not self.cambiado:
            return

        self._solapamiento = self.solapamiento()

        procesados = self._procesados_nuevos
        if not purga:
            procesados.update(self._procesados)

        with open(path + '.NEW', 'wb') as f:
            data = {'procesados': procesados,
                    'buckets': self._buckets,
                    'solapamiento': self._solapamiento,
                    'etag': etag, 'modified': modified,
                    }
            pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
            f.flush()
            os.fsync(f.fileno())
        os.rename(path + '.NEW', path)
        self.cambiado = False

        # Tras el save no deberíamos usar más este objeto
        self.saved = True
        del self._procesados
        del self._procesados_nuevos


def main():
    entradas = items()

    condicional = entradas.condicional()

    # XXX: Este timeout no es problema porque no tenemos múltiples hilos.
    # Habría que usar mejor "request" o similar.
    timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(10)
    try:
        feed = feedparser.parse(
                'http://rss.slashdot.org/Slashdot/slashdotMain', **condicional)
    finally:
        socket.setdefaulttimeout(timeout)

    if feed.status == 200:
        etag, modified = feed.get('etag'), feed.get('modified')
        for entry in feed.entries:
            summary = BeautifulSoup(entry["summary"], 'html.parser')
            summary = list(summary.children)[0]
            entradas.new(entry['id'], entry.updated_parsed,
                         entry['title'], entry['link'], summary)
    elif feed.status == 304:
        # En el 304 no se manda ETAG ni Modified, así que coge los que comprobamos
        # en la petición condicional.
        etag, modified = entradas.feed304()
    else:
        raise RuntimeError(f'Estado: {feed.status}')

    # Si es 304, procesamos lo viejo por si tiene que salir el email.

    html_desc = html_links = ''
    entradas_a_borrar = []
    for ts, guid, title, link, summary in entradas.itera_antiguos():
        link = f'<a href="{link}">{title}</a>'
        html_links += f'<li>{link}</li>\n'
        html_desc += (f'<b>{link}</b></br>\n<font size=-2>\n'
                      f'{summary}\n</font></p>')
        entradas_a_borrar.append((guid, ts))

    # XXX: Lo suyo sería grabar solo si hay cambios de verdad, pero
    # lo cierto es que queremos actualizar el etag y el modified AUNQUE
    # no haya habido cambios en el feed. Esto es algo a mejorar en el futuro.

    entradas.save(etag, modified)

    if html_desc:
        no_solapamiento = ''
        if not entradas.solapamiento():
            no_solapamiento = '<font size=+2><b>HEMOS PERDIDO ENTRADAS</b></font>'
            no_solapamiento += '<br/><br/>\n'

        html_links = f'<ul>\n{html_links}\n</ul>\n'
        html = f'''<html>
<head>
<style>
a {{text-decoration: none;}}
</style>
</head>
<body>
{no_solapamiento}
{html_links}
{html_desc}
</body>
</html>
'''

        msg = MIMEText(html, 'html')
        fecha = f'{time.strftime("%Y-%m-%d", feed.updated_parsed)}'
        msg['Subject'] = f'Feed Slashdot {fecha}'
        msg['To'] = 'jcea@jcea.es'
        msg['Date'] = email.utils.formatdate()
        msg['List-Id'] = 'Notificaciones Slashdot'
        msg['Message-Id'] = email.utils.make_msgid(domain='P2Ppriv')
        msg = msg.as_bytes()

        subprocess.run(['mail', 'jcea@jcea.es'],
                       input=msg, check=True, timeout=60,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)

        entradas.notificado()

    # XXX: Lo suyo sería grabar solo si hay cambios de verdad, pero
    # lo cierto es que queremos actualizar el etag y el modified AUNQUE
    # no haya habido cambios en el feed. Esto es algo a mejorar en el futuro.
    entradas = items()
    etag, modified = entradas.feed304()  # XXX: Para evitar "no solapamiento"
    for guid, ts in entradas_a_borrar:
        entradas.borrar(guid, ts)

    entradas.save(etag, modified, purga=False)


if __name__ == '__main__':
    main()

Observa que la licencia de mi código es Affero General Public License v3. Por favor, respétala.

Aprendizajes y futuro

El programa funciona y hace exactamente lo que necesito. Si funciona, no lo toques, ya sabes. No obstante el código actual es innecesariamente complejo, y le vendría bien una reescritura:

  • Si me guardo una copia del feed RSS en el disco duro, se simplifican muchísimo las cosas:
    • La gestión de peticiones condicionales es trivial: Si el feed RSS no ha cambiado, podemos simplemente reprocesar la copia local. No hay casos especiales.
    • De hecho la comprobación de si hemos perdido o no datos es trivial analizando el feed RSS que nos envía el servidor y el último procesado que tenemos en local. La gestión de estado es mucho más sencilla.
  • El no poder modificar el estado una vez que lo hemos grabado (para volver a grabar la actualización después) es algo heredado que no tiene sentido ya, pero que me complica la vida sobremanera. Todo eso hay que rehacerlo.
  • Los esfuerzos heroicos por actualizar el estado persistente en disco duro solo cuando ha habido cambios, son innecesarios e injustificados.

No tocaré el programa mientras vaya cumpliendo con mis necesidades, pero debería tener la disciplina de reescribirlo si le empiezo a exigir más. Es muy posible que eso sea más rápido (y el resultado más elegante, claro) que intentar encajar otro chanchullo más encima de los que ya hay.