Detección de presencia por ARP, o cómo saber si tu hijo está en casa (II): Ahora con AsyncIO

En Detección de presencia por ARP, o cómo saber si tu hijo está en casa publiqué un programa Python que notifica por Telegram cuando un dispositivo concreto se conecta o desconecta de tu red. Antes de seguir leyendo, te recomiendo que repases ese artículo.

El código me ha servido bien, pero tiene algunos detalles que no me acaban de gustar:

  • Una vez por minuto se lanzan cientos de procesos Ping para sondear la red. Aunque la Raspberry PI donde está funcionando este sensor va sobrada de recursos y no he tenido problemas con esas avalanchas, resulta algo poco elegante.
  • Lanzo un proceso externo para leer la tabla ARP.

Publico aquí una nueva versión del código Python, esta vez usando la biblioteca asyncio de Python 3:

arp-20180927.py (Código fuente)

#!/usr/bin/env python3

# (c) 2018 Jesús Cea Avión - jcea@jcea.es - https://www.jcea.es/
# This code is licensed under AGPLv3.

import time
import socket
import threading
import asyncio
import random

import telegram.ext


MAC = 'XX:XX:XX:XX:XX:XX'
TELEGRAM_TOKEN = '000000000:XXXXXXXXXXXXXXXXXXXX'
network = '192.168.1.0'
# Usuarios autorizados
# jcea y XXX
usuarios = (000000, 000000)


# Esperamos a tener red porque no queremos que se pierda la
# primera notificación de arranque.
# Además, no queremos un falso negativo cuando aún no tenemos red.
while True:
    try:
        socket.gethostbyname('api.telegram.org')
        break
    except socket.gaierror:
        time.sleep(1)

updater = telegram.ext.updater.Updater(TELEGRAM_TOKEN)
updater.start_polling(timeout=60)
job_queue = updater.job_queue

current_message = ''


def callback(bot, update):
    usuario = update.effective_user['id']
    if usuario in usuarios:
        bot.send_message(usuario, text=current_message, parse_mode='HTML')


updater.dispatcher.add_handler(
        telegram.ext.MessageHandler(
            filters=(telegram.ext.filters.Filters.user(user_id=usuarios) &
                     telegram.ext.filters.Filters.text),
            callback=callback,
            ))


def msg(bot, job):
    texto = job.context['msg']
    for i in usuarios:
        bot.send_message(i, text=texto, parse_mode='HTML')


# Usa TCP
async def conexion2(ip, loop):
    try:
        with socket.socket() as sock:
            sock.setblocking(False)
            fut = loop.sock_connect(sock, (ip, 12345))
            try:
                await asyncio.wait_for(fut, timeout=1)
            except asyncio.TimeoutError:
                pass
    except Exception:
        pass


async def conexion(ip, loop):
    sleep_time = 59.5 * random.random()
    while True:
        await conexion2(ip, loop)
        await asyncio.sleep(sleep_time + random.random(), loop=loop)
        sleep_time = 59.5


async def background_async(first_pass, loop):
    net = '.'.join(network.split('.')[:3])
    futures = []
    for i in range(1, 255):
        ip = net + '.' + str(i)
        futures.append(conexion2(ip, loop))

    await asyncio.wait(futures, loop=loop)
    first_pass.set()

    futures = []
    for i in range(1, 255):
        ip = net + '.' + str(i)
        futures.append(conexion(ip, loop))

    # Esto no termina nunca
    await asyncio.wait(futures, loop=loop)


def background(first_pass):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task = loop.create_task(background_async(first_pass, loop))
    return loop.run_until_complete(task)


# import subprocess
# def presente():
#     r = subprocess.run(
#             ['/usr/sbin/arp', '-n'],
#             universal_newlines=True,
#             stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
#             timeout=60, check=True)
#     return r.stdout.find(MAC) != -1


# Más eficiente pero no portable
def presente():
    with open('/proc/net/arp') as f:
        tabla_arp = f.read()
    return tabla_arp.find(MAC) != -1


def main():
    global current_message

    presencia = None
    prefix = ' <b>Arranque del sistema:</b>'

    first_pass = threading.Event()
    t = threading.Thread(target=background, args=(first_pass,), daemon=True)
    t.start()

    first_pass.wait()

    while True:
        presencia2 = presente()
        if presencia != presencia2:
            if presencia2:
                current_message = ('%s:%s La luz está encendida'
                                   % (time.ctime(), prefix))
            else:
                current_message = ('%s:%s La luz <b>ESTÁ APAGADA</b>'
                                   % (time.ctime(), prefix))
            print(current_message)

            job_queue.run_once(msg, when=0, context={'msg': current_message})

            presencia = presencia2
            prefix = ''

        time.sleep(60)


if __name__ == '__main__':
    main()

En programa se configura en las líneas 15-20. En las líneas 26-31 el programa espera hasta tener red y luego inicializa la parte Telegram (líneas 33-35).

Seguidamente pasamos al final del código. En las líneas 131-133 lanzamos un hilo separado para no interferir con la parte de Telegram. Ese hilo será quien haga la mayor parte de trabajo. El hilo principal en sí esperará en la línea 135 a que haya una primera pasada de búsqueda y luego entra en un bucle infinito en el que, cada minuto, comprueba el estado de presencia y envía los mensajes correspondientes por Telegram: arranque del sistema, que el dispositivo ha salido de la red y que el dispositivo ha entrado en la red.

Para ello debe acceder a la tabla ARP del sistema operativo (línea 138), utilizando la rutina en las líneas 119-122. Es de señalar que esa rutina no es portable y, tal y como esta escrita, solo está disponible en Linux. Otros sistemas operativos tienen mecanismos similares. En Solaris, por ejemplo, puedes probar con man -s7p arp. He mantenido el código antiguo en forma de comentarios en las líneas 108-115.

La parte interesante se ha lanzado en un hilo separado en la línea 133. Las líneas 101-105 crean un bucle de eventos explícito nuevo para AsyncIO y lanzan una tarea que, salvo errores serios, se va a ejecutar eternamente.

Esa tarea asíncrona empieza haciendo una sondeo inicial a toda la red (líneas 83-87). El sondeo de cada dirección IP de la red local se hace en un future [1] separado, aprovechando AsyncIO. Se espera a que terminen todas (línea 89) y se notifica al hilo principal que ya tenemos un resultado (línea 90). A continuación se lanzan nuevos futures para sondear cada una de las direcciones IP de la red. Dichos futures (líneas 74-79) realizan un sondeo por minuto, aproximadamente, intentando que los sondeos se repartan uniformemente a lo largo del minuto. Estos futures no terminan nunca.

La parte de sondeo en sí se realiza en las líneas 61-71, usando también AsyncIO. Intentamos conectar por TCP, pero solo le damos un segundo de tiempo y no nos importa en realidad si realmente ocurrió una conexión TCP o no. Lo que nos interesa es el intento de resolución ARP. Sería más ligero utilizar UDP, pero la integración con AsyncIO es un poquito más complicada y, en realidad, solo habrá tráfico TCP para direcciones IP que realmente existen en la red local. La inmensa mayoría de los intentos de conectar a direcciones IP inexistentes se abortan al no poder completar la resolución ARP. No obstante, se trata de una mejora futura.

En resumen, este programa utiliza un par de hilos, uno para gestionar Telegram y otro para gestionar toda la parte AsyncIO, que mantiene unos 254 futures activos concurrentemente. No lanza procesos externos y el consumo de recursos es bajo y estable con el tiempo, sin picos como ocurría en la versión anterior.

La parte Telegram (líneas 40-57) se explicó ya en Detección de presencia por ARP, o cómo saber si tu hijo está en casa.

[1] Estoy usando aquí el término future como genérico, no exactamente lo que la biblioteca AsyncIO entiende por future. El código tampoco es muy consistente con la nomenclatura oficial, bastante confusa. No voy a definir ahora lo que es una corrutina :-).