Pipeline overview
The Metaweave pipeline is a Python ETL that consumes the form’s emails, decrypts them, and upserts the results into PostgreSQL. It runs as a single CLI command, python -m src.main, with no daemon, queue, or message broker, and is designed to be triggered on a schedule.
Data flow
Vessel-side form (HTML)
   │
   ▼  Submit → AES-128-CBC encrypted JSON in email body
   │
Microsoft Outlook ─── shared mailbox ────►
   │
   ▼  fetch unread "Metaweave Forms:" emails
   │
src/fetcher.py   ← Microsoft Graph API + MSAL OAuth2
   │
   ▼  body text + parsed subject (vessel, type, date)
   │
src/parser.py    ← extract BEGIN/END markers, AES decrypt → JSON dict
   │
   ▼  ParseResult(form_data, report_type_raw, form_version)
   │
src/mapper.py    ← 92 scalars + 11 arrays → SQLAlchemy model objects
   │
   ▼  dict { vessel_info, voyage_no, report, events, bunker_rob, … }
   │
src/writer.py    ← upsert Vessel, upsert Voyage, delete-then-insert Report + children
   │
   ▼  CASCADE delete on report replacement
   │
PostgreSQL (Cloud SQL)
   │
   17 metaweave_* tables: Vessel → Voyage → Report → 11 child tables

Stages
Fetcher: Calls the Microsoft Graph API using OAuth2 client credentials, filters unread emails by subject pattern, and marks each one as read after fetch.
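As a rough illustration of the fetch step, the sketch below builds the two Graph requests it implies: listing unread messages by subject prefix, and flagging one message as read. The function names, the choice of the inbox folder, and the `top` page size are assumptions made for this example, not taken from src/fetcher.py. Token acquisition through MSAL and the HTTP call itself are left out.

```python
from urllib.parse import quote, urlencode

GRAPH_BASE = "https://graph.microsoft.com/v1.0"
SUBJECT_PREFIX = "Metaweave Forms:"  # prefix shown in the data-flow diagram


def unread_forms_url(mailbox: str, top: int = 50) -> str:
    """Build a URL that lists unread inbox messages whose subject starts with the prefix.

    Hypothetical helper: the real fetcher may build its query differently.
    """
    odata_filter = f"isRead eq false and startswith(subject,'{SUBJECT_PREFIX}')"
    # Keep "$" literal so the parameter names stay readable as $filter / $top.
    query = urlencode({"$filter": odata_filter, "$top": top}, safe="$", quote_via=quote)
    return f"{GRAPH_BASE}/users/{quote(mailbox, safe='@')}/mailFolders/inbox/messages?{query}"


def mark_read_request(mailbox: str, message_id: str) -> tuple[str, str, dict]:
    """Return (method, url, json_body) for flagging one message as read."""
    url = f"{GRAPH_BASE}/users/{quote(mailbox, safe='@')}/messages/{quote(message_id, safe='')}"
    return "PATCH", url, {"isRead": True}
```

Each request would be sent with an `Authorization: Bearer <token>` header obtained through the client-credentials flow. Marking a message as read is what keeps a later run from picking it up again.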
Parser: Extracts the encrypted block between the BEGIN/END markers and decrypts it (base64-encoded AES-128-CBC with PKCS7 padding). Falls back to regex parsing for hand-edited emails.
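The two standard-library parts of that step, locating the marked block and stripping PKCS7 padding, might look roughly like the sketch below. The marker strings here are hypothetical placeholders, since the actual markers are not given on this page. The AES-128-CBC decryption that sits between the two functions is omitted because it needs a third-party crypto library.

```python
import base64
import re

# Hypothetical marker strings; the real ones are defined by the form and src/parser.py.
BEGIN = "-----BEGIN FORM DATA-----"
END = "-----END FORM DATA-----"

_BLOCK_RE = re.compile(re.escape(BEGIN) + r"(.*?)" + re.escape(END), re.DOTALL)


def extract_ciphertext(body: str) -> bytes:
    """Return the base64-decoded bytes found between the BEGIN and END markers."""
    match = _BLOCK_RE.search(body)
    if match is None:
        raise ValueError("no BEGIN/END block found in email body")
    # Mail clients often re-wrap long lines, so drop all whitespace before decoding.
    b64_text = "".join(match.group(1).split())
    return base64.b64decode(b64_text, validate=True)


def pkcs7_unpad(padded: bytes, block_size: int = 16) -> bytes:
    """Strip PKCS7 padding from decrypted data (AES uses 16-byte blocks)."""
    if not padded or len(padded) % block_size != 0:
        raise ValueError("padded data must be a non-empty multiple of the block size")
    pad_len = padded[-1]
    if not 1 <= pad_len <= block_size or padded[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("invalid PKCS7 padding")
    return padded[:-pad_len]
```

In a full parser the output of `extract_ciphertext` would be decrypted with the shared key and IV, unpadded, and then passed to `json.loads`.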
Mapper: Converts 92 scalar fields and 11 arrays into SQLAlchemy model instances, handling DMS → decimal coordinate conversion, datetime parsing, and type coercion.
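The DMS → decimal conversion is plain arithmetic: degrees plus minutes/60 plus seconds/3600, negated for the southern and western hemispheres. A minimal sketch follows, assuming the form supplies degrees, minutes, seconds, and a hemisphere letter as separate values. The function name and signature are illustrative, not the mapper's actual API.

```python
def dms_to_decimal(degrees: float, minutes: float, seconds: float, hemisphere: str) -> float:
    """Convert a degrees/minutes/seconds coordinate to signed decimal degrees."""
    hemi = hemisphere.strip().upper()
    if hemi not in {"N", "S", "E", "W"}:
        raise ValueError(f"unknown hemisphere: {hemisphere!r}")
    if not (0 <= minutes < 60 and 0 <= seconds < 60):
        raise ValueError("minutes and seconds must be in the range [0, 60)")
    value = abs(degrees) + minutes / 60 + seconds / 3600
    # South latitudes and west longitudes are negative in decimal notation.
    return -value if hemi in {"S", "W"} else value
```

For example, `dms_to_decimal(51, 30, 0, "N")` gives 51.5 and `dms_to_decimal(1, 15, 0, "W")` gives -1.25.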
Writer: Upserts Vessel by IMO, upserts Voyage by (vessel, voyage_number), and delete-then-inserts Report by (vessel, report_type, report_datetime_utc); CASCADE removes the replaced report's child rows.
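To make the write semantics concrete, here is a small self-contained sketch that uses the standard-library sqlite3 module as a stand-in for PostgreSQL and SQLAlchemy. The table and column names are simplified assumptions, Voyage is omitted for brevity, and a single `event` table stands in for the 11 child tables. It is meant to show the pattern, not the actual schema.

```python
import sqlite3

# Simplified, hypothetical schema: one parent, one report table, one child table.
SCHEMA = """
CREATE TABLE vessel (id INTEGER PRIMARY KEY, imo TEXT UNIQUE NOT NULL, name TEXT);
CREATE TABLE report (
    id INTEGER PRIMARY KEY,
    vessel_id INTEGER NOT NULL REFERENCES vessel(id),
    report_type TEXT NOT NULL,
    report_datetime_utc TEXT NOT NULL,
    UNIQUE (vessel_id, report_type, report_datetime_utc)
);
CREATE TABLE event (
    id INTEGER PRIMARY KEY,
    report_id INTEGER NOT NULL REFERENCES report(id) ON DELETE CASCADE,
    description TEXT
);
"""


def connect() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces CASCADE only when enabled
    conn.executescript(SCHEMA)
    return conn


def upsert_vessel(conn: sqlite3.Connection, imo: str, name: str) -> int:
    """Insert the vessel, or update its name if the IMO already exists."""
    conn.execute(
        "INSERT INTO vessel (imo, name) VALUES (?, ?) "
        "ON CONFLICT (imo) DO UPDATE SET name = excluded.name",
        (imo, name),
    )
    return conn.execute("SELECT id FROM vessel WHERE imo = ?", (imo,)).fetchone()[0]


def replace_report(conn, vessel_id: int, report_type: str, report_dt: str, events: list[str]) -> int:
    """Delete any report with the same natural key, then insert the new one and its children."""
    key = (vessel_id, report_type, report_dt)
    with conn:  # one transaction: the delete and the inserts commit together
        conn.execute(
            "DELETE FROM report WHERE vessel_id = ? AND report_type = ? "
            "AND report_datetime_utc = ?",
            key,
        )  # ON DELETE CASCADE removes the old event rows as well
        cur = conn.execute(
            "INSERT INTO report (vessel_id, report_type, report_datetime_utc) VALUES (?, ?, ?)",
            key,
        )
        report_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO event (report_id, description) VALUES (?, ?)",
            [(report_id, text) for text in events],
        )
    return report_id
```

Calling `replace_report` twice with the same key leaves exactly one report row and only the second call's child rows, which is the behavior that makes corrected re-submissions safe.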
Designed for
- Scheduled invocation — typically every 5–15 minutes via cron or a serverless scheduler
- Idempotent reads — marks emails as read after fetch, so a re-run won’t re-process
- Safe re-submissions — corrections replace previous rows by design (CASCADE delete + insert)
- Single mailbox per fleet — one shared Outlook mailbox is the input; the script loops every unread email
Not designed for
- High volume (>10,000 emails/run) — sequential processing, single thread
- Multi-tenant — one Azure app registration → one mailbox → one Postgres database
- Real-time delivery — there’s a polling delay between Submit and ingest (driven by your cron cadence)
See also
- Installation: venv, dependencies, Cloud SQL setup
- Running: python -m src.main flags and modes
- ETL stages: fetcher / parser / mapper / writer in detail
- Data model: the 17 SQLAlchemy tables
- Bootstrap scripts: excel_to_history.py and generate_metaweave_test_history.py (in history-tools/)
- Configuration: environment variables