Web-Scraper mit Postgres bauen

Warum? Weil ich wissen wollte, ob es geht, aber das kennen wir ja. Für die meisten Anwendungsfälle mag das keine gute Idee sein, aber es macht auf jeden Fall Spaß.

Docker-Image und Installation der Extension

Das wichtigste Element ist die Postgres-Extension pgsql-http, die es uns erlaubt, HTTP-Aufrufe direkt aus Postgres heraus zu machen. Diese Extension ist nicht standardmäßig enthalten, daher müssen wir ein eigenes Docker-Image erstellen, das diese Abhängigkeit beinhaltet. Ein einfaches Dockerfile wie das folgende reicht aus:

FROM postgres:17
RUN apt-get update \
    && apt-get install -y curl \
    && apt-get install -y libcurl4-openssl-dev make gcc g++ \
    && apt-get install -y postgresql-server-dev-17 \
    && apt-get -y install postgresql-17-http

Hinweis: Falls postgresql-17-http nicht direkt verfügbar ist, kann die Extension auch manuell über PGXN oder durch Kompilieren installiert werden. Um die HTTP-Extension zu kompilieren, sind zusätzliche Pakete erforderlich.

Sobald du das Image gestartet hast, musst du die Extension laden. Du kannst die Tabelle pg_extension überprüfen, um sicherzustellen, dass die Extension erfolgreich installiert wurde.

CREATE SCHEMA IF NOT EXISTS scrapedb; -- Stelle sicher, dass das Schema existiert
CREATE EXTENSION http SCHEMA scrapedb;
SELECT * FROM pg_extension;

Grundlegende Tabellen und Typen

Um einen einfachen Scraper zu implementieren, benötigen wir ein paar Dinge:

  • Entrypoint: Eine URL, wo das Scraping beginnt.
  • Tasks: Eine Liste von Aufgaben, die der Scraper abarbeiten soll. Jede Aufgabe sollte einen Status haben, wie availablein_flightfinished oder failed.
  • Title: Ein Ort, um die Ergebnisse (Filmtitel) zu speichern, die wir scrapen wollen.
  • Liste von Mustern und Handlern: Damit wir wissen, wie wir jede Task-URL mit dem Status available bearbeiten.

Erstellung der Tabellen

CREATE TYPE status_enum AS ENUM ('failed', 'available', 'in_flight', 'finished');

CREATE TABLE task (
    id SERIAL PRIMARY KEY,
    url TEXT NOT NULL,
    status status_enum NOT NULL DEFAULT 'available',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE title (
    id SERIAL PRIMARY KEY,
    original_title TEXT,
    original_release_year INT,
    director TEXT,
    object_type TEXT NOT NULL
);

 

Damit Scrapes effizient durchgeführt werden können, sollten wir Indizes auf häufig verwendete Spalten setzen:

CREATE INDEX idx_task_status ON task(status);
CREATE INDEX idx_task_url ON task(url);

Scraping-Mechanismus

Scrape-Start

Wir wollen eine Funktion, um Scraping-Aufträge zu starten:

CREATE OR REPLACE FUNCTION start_scrape(url_input TEXT)
RETURNS VOID AS $$
BEGIN
    INSERT INTO task (url, status)
    VALUES (url_input, 'available');
END;
$$
 LANGUAGE plpgsql;

HTTP-Anfrage verarbeiten

CREATE OR REPLACE FUNCTION process_task(task_url TEXT)
RETURNS TABLE(content JSONB, status INT) AS $$
DECLARE
    rec RECORD;
BEGIN
    UPDATE task
    SET status = 'in_flight'
    WHERE url = task_url;

    BEGIN
        FOR rec IN
            SELECT *
            FROM http_get(task_url)
        LOOP
            IF rec.status = 200 THEN
                UPDATE task SET status = 'finished' WHERE url = task_url;
                RETURN QUERY SELECT rec.content::jsonb, rec.status;
            ELSE
                UPDATE task SET status = 'failed' WHERE url = task_url;
                RETURN QUERY SELECT '{}'::jsonb, rec.status;
            END IF;
        END LOOP;
    EXCEPTION WHEN OTHERS THEN
        UPDATE task SET status = 'failed' WHERE url = task_url;
        RETURN QUERY SELECT '{}'::jsonb, 500; -- Hier könnte eine spezifische Fehlerbehandlung eingeführt werden (z.B. für 4xx/5xx Fehlercodes).
    END;
END;
$$
 LANGUAGE plpgsql;

Automatische Verarbeitung neuer Tasks

Prozess zur Bearbeitung neuer Tasks:

CREATE OR REPLACE FUNCTION process_new_task()
RETURNS TRIGGER AS $$
DECLARE
    rec RECORD;
    payload JSONB;
BEGIN
    IF NEW.status = 'available' AND NEW.url LIKE '%/apis.example.com/titles/top_x/object_type%' THEN
        
        WITH resp AS (
            SELECT content AS resp_body, status FROM process_task(NEW.url)
        ), extract_ids AS (
            SELECT array_agg((elem->>'id')::int) AS ids
            FROM (SELECT jsonb_array_elements(resp_body) AS elem FROM resp) subquery
        ), tasks AS (
            SELECT 'https://apis.example.com/object_type/movie?id='|| unnest(ids) AS url
            FROM extract_ids
        )
        INSERT INTO task (url, status)
        SELECT url, 'available'::status_enum FROM tasks;

    ELSIF NEW.status = 'available' AND NEW.url LIKE '%/apis.example.com/object_type%' THEN
        
        FOR rec IN
            SELECT content::jsonb AS payload, status FROM process_task(NEW.url)
        LOOP
            payload := rec.payload;
            INSERT INTO title (
                original_title, original_release_year, director, object_type
            )
            SELECT
                payload->>'original_title',
                (payload->>'original_release_year')::INT,
                payload->>'director',
                payload->>'object_type'
            ON CONFLICT (id) DO NOTHING; -- Hier könnte eine Strategie zur Fehlerbehandlung bei Konflikten eingeführt werden.
        END LOOP;
    END IF;

    RETURN NEW; -- Rückgabe des neuen Datensatzes.
END;
$$
 LANGUAGE plpgsql;

Trigger zur Automatisierung

CREATE TRIGGER new_available_task_trigger
AFTER INSERT ON task
FOR EACH ROW
EXECUTE FUNCTION process_new_task();

Hinweis: Dieser Trigger kann mehrere Instanzen eines Prozesses gleichzeitig auslösen, was zu Race Conditions führen kann. Es wäre sinnvoll zu erwähnen, dass dies zu parallelen Datenbankprozessen führen kann. Eine Möglichkeit, dies zu vermeiden, wäre, FOR UPDATE in der SQL-Anweisung zu verwenden oder die Nutzung von LISTEN/NOTIFY mit einem separaten Worker in Betracht zu ziehen.

Scraping starten und Ergebnisse prüfen

SELECT start_scrape('https://apis.example.com/titles/top_x/object_type?genre=action');
SELECT * FROM title;

Zusätzliche Überlegungen

  • Concurrency: Ein Mechanismus zur Vermeidung von Race Conditions ist wichtig. Eine Möglichkeit wäre die Verwendung von FOR UPDATE in den entsprechenden SQL-Transaktionen oder das Einführen eines asynchronen Modells mit einer Warteschlange. Auch könnte eine Sperre (z.B. mit pg_advisory_lock) für kritische Ressourcen sinnvoll sein.

  • Performance: Bei hoher Last wäre es ratsam, eine asynchrone Verarbeitung oder Batch-Updates zu implementieren. Eine Verbesserung könnte darin bestehen, mehrere Tasks auf einmal zu verarbeiten, anstatt sie einzeln abzuarbeiten.

  • Fehlertoleranz: Tasks könnten eine retry_count-Spalte erhalten, um Fehlschläge zu erfassen und mehrmals zu versuchen, bevor sie endgültig als „failed“ markiert werden. Eine einfache Strategie könnte so aussehen:

ALTER TABLE task ADD COLUMN retry_count INT DEFAULT 0;

-- In der process_task-Funktion könnte man dann prüfen:
IF rec.status != 200 AND NEW.retry_count < 3 THEN
    UPDATE task SET retry_count = retry_count + 1, status = 'available' WHERE url = task_url;
ELSE
    UPDATE task SET status = 'failed' WHERE url = task_url;
END IF;

Läuft bisher stabil🚀

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert