Warum? Weil ich wissen wollte, ob es geht, aber das kennen wir ja. Für die meisten Anwendungsfälle mag das keine gute Idee sein, aber es macht auf jeden Fall Spaß.
Docker-Image und Installation der Extension
Das wichtigste Element ist die Postgres-Extension pgsql-http
, die es uns erlaubt, HTTP-Aufrufe direkt aus Postgres heraus zu machen. Diese Extension ist nicht standardmäßig enthalten, daher müssen wir ein eigenes Docker-Image erstellen, das diese Abhängigkeit beinhaltet. Ein einfaches Dockerfile wie das folgende reicht aus:
FROM postgres:17
RUN apt-get update \
&& apt-get install -y curl \
&& apt-get install -y libcurl4-openssl-dev make gcc g++ \
&& apt-get install -y postgresql-server-dev-17 \
&& apt-get -y install postgresql-17-http
Hinweis: Falls postgresql-17-http
nicht direkt verfügbar ist, kann die Extension auch manuell über PGXN oder durch Kompilieren installiert werden. Um die HTTP-Extension zu kompilieren, sind zusätzliche Pakete erforderlich.
Sobald du das Image gestartet hast, musst du die Extension laden. Du kannst die Tabelle pg_extension
überprüfen, um sicherzustellen, dass die Extension erfolgreich installiert wurde.
CREATE SCHEMA IF NOT EXISTS scrapedb; -- Stelle sicher, dass das Schema existiert
CREATE EXTENSION http SCHEMA scrapedb;
SELECT * FROM pg_extension;
Grundlegende Tabellen und Typen
Um einen einfachen Scraper zu implementieren, benötigen wir ein paar Dinge:
- Entrypoint: Eine URL, wo das Scraping beginnt.
- Tasks: Eine Liste von Aufgaben, die der Scraper abarbeiten soll. Jede Aufgabe sollte einen Status haben, wie
available
,in_flight
,finished
oderfailed
. - Title: Ein Ort, um die Ergebnisse (Filmtitel) zu speichern, die wir scrapen wollen.
- Liste von Mustern und Handlern: Damit wir wissen, wie wir jede Task-URL mit dem Status
available
bearbeiten.
Erstellung der Tabellen
CREATE TYPE status_enum AS ENUM ('failed', 'available', 'in_flight', 'finished');
CREATE TABLE task (
id SERIAL PRIMARY KEY,
url TEXT NOT NULL,
status status_enum NOT NULL DEFAULT 'available',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE title (
id SERIAL PRIMARY KEY,
original_title TEXT,
original_release_year INT,
director TEXT,
object_type TEXT NOT NULL
);
Damit Scrapes effizient durchgeführt werden können, sollten wir Indizes auf häufig verwendete Spalten setzen:
CREATE INDEX idx_task_status ON task(status);
CREATE INDEX idx_task_url ON task(url);
Scraping-Mechanismus
Scrape-Start
Wir wollen eine Funktion, um Scraping-Aufträge zu starten:
CREATE OR REPLACE FUNCTION start_scrape(url_input TEXT)
RETURNS VOID AS $$
BEGIN
INSERT INTO task (url, status)
VALUES (url_input, 'available');
END;
$$
LANGUAGE plpgsql;
HTTP-Anfrage verarbeiten
BEGIN
FOR rec IN
SELECT *
FROM http_get(task_url)
LOOP
IF rec.status = 200 THEN
UPDATE task SET status = 'finished' WHERE url = task_url;
RETURN QUERY SELECT rec.content::jsonb, rec.status;
ELSE
UPDATE task SET status = 'failed' WHERE url = task_url;
RETURN QUERY SELECT '{}'::jsonb, rec.status;
END IF;
END LOOP;
EXCEPTION WHEN OTHERS THEN
UPDATE task SET status = 'failed' WHERE url = task_url;
RETURN QUERY SELECT '{}'::jsonb, 500; -- Hier könnte eine spezifische Fehlerbehandlung eingeführt werden (z.B. für 4xx/5xx Fehlercodes).
END;
END;
$$
LANGUAGE plpgsql;
Automatische Verarbeitung neuer Tasks
Prozess zur Bearbeitung neuer Tasks:
CREATE OR REPLACE FUNCTION process_new_task()
RETURNS TRIGGER AS $$
DECLARE
rec RECORD;
payload JSONB;
BEGIN
IF NEW.status = 'available' AND NEW.url LIKE '%/apis.example.com/titles/top_x/object_type%' THEN
WITH resp AS (
SELECT content AS resp_body, status FROM process_task(NEW.url)
), extract_ids AS (
SELECT array_agg((elem->>'id')::int) AS ids
FROM (SELECT jsonb_array_elements(resp_body) AS elem FROM resp) subquery
), tasks AS (
SELECT 'https://apis.example.com/object_type/movie?id='|| unnest(ids) AS url
FROM extract_ids
)
INSERT INTO task (url, status)
SELECT url, 'available'::status_enum FROM tasks;
ELSIF NEW.status = 'available' AND NEW.url LIKE '%/apis.example.com/object_type%' THEN
FOR rec IN
SELECT content::jsonb AS payload, status FROM process_task(NEW.url)
LOOP
payload := rec.payload;
INSERT INTO title (
original_title, original_release_year, director, object_type
)
SELECT
payload->>'original_title',
(payload->>'original_release_year')::INT,
payload->>'director',
payload->>'object_type'
ON CONFLICT (id) DO NOTHING; -- Hier könnte eine Strategie zur Fehlerbehandlung bei Konflikten eingeführt werden.
END LOOP;
END IF;
RETURN NEW; -- Rückgabe des neuen Datensatzes.
END;
$$
LANGUAGE plpgsql;