Web-Scraper mit Postgres bauen
Warum? Weil ich wissen wollte, ob es geht, aber das kennen wir ja. Für die meisten Anwendungsfälle mag das keine gute Idee sein, aber es macht auf jeden Fall Spaß.
Docker-Image und Installation der Extension
Das wichtigste Element ist die Postgres-Extension pgsql-http, die es uns erlaubt, HTTP-Aufrufe direkt aus Postgres heraus zu machen. Diese Extension ist nicht standardmäßig enthalten, daher müssen wir ein eigenes Docker-Image erstellen, das diese Abhängigkeit beinhaltet. Ein einfaches Dockerfile wie das folgende reicht aus:
FROM postgres:17
RUN apt-get update \
&& apt-get install -y curl \
&& apt-get install -y libcurl4-openssl-dev make gcc g++ \
&& apt-get install -y postgresql-server-dev-17 \
&& apt-get -y install postgresql-17-http
Hinweis: Falls postgresql-17-http nicht direkt verfügbar ist, kann die Extension auch manuell über PGXN oder durch Kompilieren installiert werden. Um die HTTP-Extension zu kompilieren, sind zusätzliche Pakete erforderlich.
Sobald du das Image gestartet hast, musst du die Extension laden. Du kannst die Tabelle pg_extension überprüfen, um sicherzustellen, dass die Extension erfolgreich installiert wurde.
CREATE SCHEMA IF NOT EXISTS scrapedb; -- Stelle sicher, dass das Schema existiert
CREATE EXTENSION http SCHEMA scrapedb;
SELECT * FROM pg_extension;
Grundlegende Tabellen und Typen
Um einen einfachen Scraper zu implementieren, benötigen wir ein paar Dinge:
- Entrypoint: Eine URL, wo das Scraping beginnt.
- Tasks: Eine Liste von Aufgaben, die der Scraper abarbeiten soll. Jede Aufgabe sollte einen Status haben, wie
available,in_flight,finishedoderfailed. - Title: Ein Ort, um die Ergebnisse (Filmtitel) zu speichern, die wir scrapen wollen.
- Liste von Mustern und Handlern: Damit wir wissen, wie wir jede Task-URL mit dem Status
availablebearbeiten.
Erstellung der Tabellen
CREATE TYPE status_enum AS ENUM ('failed', 'available', 'in_flight', 'finished');
CREATE TABLE task (
id SERIAL PRIMARY KEY,
url TEXT NOT NULL,
status status_enum NOT NULL DEFAULT 'available',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE title (
id SERIAL PRIMARY KEY,
original_title TEXT,
original_release_year INT,
director TEXT,
object_type TEXT NOT NULL
);
Damit Scrapes effizient durchgeführt werden können, sollten wir Indizes auf häufig verwendete Spalten setzen:
CREATE INDEX idx_task_status ON task(status);
CREATE INDEX idx_task_url ON task(url);
Scraping-Mechanismus
Scrape-Start
Wir wollen eine Funktion, um Scraping-Aufträge zu starten:
CREATE OR REPLACE FUNCTION start_scrape(url_input TEXT)
RETURNS VOID AS $$
BEGIN
INSERT INTO task (url, status)
VALUES (url_input, 'available');
END;
$$
LANGUAGE plpgsql;
HTTP-Anfrage verarbeiten
BEGIN
FOR rec IN
SELECT *
FROM http_get(task_url)
LOOP
IF rec.status = 200 THEN
UPDATE task SET status = 'finished' WHERE url = task_url;
RETURN QUERY SELECT rec.content::jsonb, rec.status;
ELSE
UPDATE task SET status = 'failed' WHERE url = task_url;
RETURN QUERY SELECT '{}'::jsonb, rec.status;
END IF;
END LOOP;
EXCEPTION WHEN OTHERS THEN
UPDATE task SET status = 'failed' WHERE url = task_url;
RETURN QUERY SELECT '{}'::jsonb, 500; -- Hier könnte eine spezifische Fehlerbehandlung eingeführt werden (z.B. für 4xx/5xx Fehlercodes).
END;
END;
$$
LANGUAGE plpgsql;
Automatische Verarbeitung neuer Tasks
Prozess zur Bearbeitung neuer Tasks:
CREATE OR REPLACE FUNCTION process_new_task()
RETURNS TRIGGER AS $$
DECLARE
rec RECORD;
payload JSONB;
BEGIN
IF NEW.status = 'available' AND NEW.url LIKE '%/apis.example.com/titles/top_x/object_type%' THEN
WITH resp AS (
SELECT content AS resp_body, status FROM process_task(NEW.url)
), extract_ids AS (
SELECT array_agg((elem->>'id')::int) AS ids
FROM (SELECT jsonb_array_elements(resp_body) AS elem FROM resp) subquery
), tasks AS (
SELECT 'https://apis.example.com/object_type/movie?id='|| unnest(ids) AS url
FROM extract_ids
)
INSERT INTO task (url, status)
SELECT url, 'available'::status_enum FROM tasks;
ELSIF NEW.status = 'available' AND NEW.url LIKE '%/apis.example.com/object_type%' THEN
FOR rec IN
SELECT content::jsonb AS payload, status FROM process_task(NEW.url)
LOOP
payload := rec.payload;
INSERT INTO title (
original_title, original_release_year, director, object_type
)
SELECT
payload->>'original_title',
(payload->>'original_release_year')::INT,
payload->>'director',
payload->>'object_type'
ON CONFLICT (id) DO NOTHING; -- Hier könnte eine Strategie zur Fehlerbehandlung bei Konflikten eingeführt werden.
END LOOP;
END IF;
RETURN NEW; -- Rückgabe des neuen Datensatzes.
END;
$$
LANGUAGE plpgsql;
Piehnat