ETL stages
Four stages in sequence. Each is a Python module with a clear input/output contract.
Fetcher
File: src/fetcher.py
Purpose: Pull unread "Metaweave Forms:" emails from a shared Outlook mailbox via Microsoft Graph.
Auth
OAuth2 client credentials flow via MSAL. The app needs the Mail.Read and Mail.ReadWrite application permissions on the target mailbox, consented by an Azure AD admin; Mail.ReadWrite is what allows the fetcher to mark messages as read.
get_access_token() calls MSAL:
```python
import msal

app = msal.ConfidentialClientApplication(
    client_id=AZURE_CLIENT_ID,
    client_credential=AZURE_CLIENT_SECRET,
    authority=f"https://login.microsoftonline.com/{AZURE_TENANT_ID}",
)
# On success, result["access_token"] holds the bearer token for Graph calls
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
```
Query
Filters emails server-side via OData:
```
GET https://graph.microsoft.com/v1.0/users/{OUTLOOK_USER_EMAIL}/messages
    ?$filter=isRead eq false and contains(subject, 'Metaweave Forms')
    &$select=id,subject,body,receivedDateTime
    &$orderby=receivedDateTime asc
    &$top=50
```
$top=50 caps each call at 50 messages; extend with pagination if your fleet can accumulate more than 50 unread reports between runs.
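Graph reports further pages through an `@odata.nextLink` URL in each response body. A minimal sketch of a pagination loop, assuming a caller-supplied `get_json(url)` that performs the authenticated GET (e.g. with the MSAL bearer token) and returns the decoded JSON; `iter_unread_messages` and `get_json` are hypothetical names, not part of src/fetcher.py:

```python
from typing import Callable, Iterator


def iter_unread_messages(
    first_url: str, get_json: Callable[[str], dict]
) -> Iterator[dict]:
    """Yield every message across all pages of a Graph list response.

    Hypothetical helper: get_json(url) is assumed to do the authenticated
    GET and return the parsed JSON body.
    """
    url = first_url
    while url:
        page = get_json(url)
        # Each page carries its messages under "value"
        yield from page.get("value", [])
        # Graph includes @odata.nextLink only while more results remain;
        # the link already carries the original query parameters.
        url = page.get("@odata.nextLink")
```

Because the filter is `isRead eq false`, marking messages as read while still paging could shift later pages; draining all pages first (e.g. `list(iter_unread_messages(...))`) and then processing is one way to sidestep that.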
Subject parsing
Each subject is parsed against a strict regex (config.py):
```python
import re

SUBJECT_PATTERN = re.compile(
    r"Metaweave Forms:\s*(.+?)\s*-\s*(.+?)\s*-\s*(\d{2}\.\d{2}\.\d{4})"
)
```
Captures (vessel_name, report_type_raw, report_date), with the date in DD.MM.YYYY format. Subjects that don't match are skipped; this is the gate that excludes stray emails from the same mailbox.
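A minimal sketch of turning a subject into the three captured values, assuming `re.search` (so a "FW:" prefix is tolerated) and a `strptime` conversion of the date; `parse_subject` is a hypothetical name. One property of the pattern worth knowing: the lazy first group stops at the first hyphen, so `Metaweave Forms: Sea-Star - Noon Report - 13.04.2026` yields vessel `Sea` and report type `Star - Noon Report`.

```python
import re
from datetime import date, datetime
from typing import Optional, Tuple

SUBJECT_PATTERN = re.compile(
    r"Metaweave Forms:\s*(.+?)\s*-\s*(.+?)\s*-\s*(\d{2}\.\d{2}\.\d{4})"
)


def parse_subject(subject: str) -> Optional[Tuple[str, str, date]]:
    """Return (vessel_name, report_type_raw, report_date), or None to skip."""
    m = SUBJECT_PATTERN.search(subject)
    if m is None:
        return None  # not a Metaweave Forms email: leave it alone
    vessel_name, report_type_raw, date_str = m.groups()
    try:
        report_date = datetime.strptime(date_str, "%d.%m.%Y").date()
    except ValueError:
        return None  # e.g. "31.02.2026" passes the regex but is not a real date
    return vessel_name, report_type_raw, report_date
```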
Body extraction
If the body's contentType is html, the fetcher strips the HTML tags before passing the text to the parser. This handles plain-text forms that Outlook wrapped in HTML when they were forwarded.
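How the tags are stripped isn't specified here. A minimal stdlib sketch, assuming `html.parser` (the real fetcher may use a different library): block-level tags become line breaks, since a naive tag strip could run the BEGIN/END marker lines and the base64 text together. `html_to_text` and the tag sets are hypothetical.

```python
from html.parser import HTMLParser

# Tags rendered as line breaks, so marker lines and base64 rows stay on their own lines.
BLOCK_TAGS = {"br", "p", "div", "tr", "li"}
SKIP_TAGS = {"style", "script"}


class _TextExtractor(HTMLParser):
    def __init__(self):
        super().__init__(convert_charrefs=True)  # &nbsp; etc. arrive already decoded
        self.parts = []
        self.skip_depth = 0  # > 0 while inside <style> or <script>

    def handle_starttag(self, tag, attrs):
        if tag in SKIP_TAGS:
            self.skip_depth += 1
        elif tag in BLOCK_TAGS:
            self.parts.append("\n")

    def handle_endtag(self, tag):
        if tag in SKIP_TAGS:
            self.skip_depth = max(0, self.skip_depth - 1)
        elif tag in BLOCK_TAGS:
            self.parts.append("\n")

    def handle_data(self, data):
        if self.skip_depth == 0:
            self.parts.append(data)


def html_to_text(body_html: str) -> str:
    """Strip tags from an Outlook HTML body, keeping one text line per block."""
    parser = _TextExtractor()
    parser.feed(body_html)
    parser.close()
    text = "".join(parser.parts).replace("\xa0", " ")  # non-breaking spaces to spaces
    lines = (line.strip() for line in text.splitlines())
    return "\n".join(line for line in lines if line)  # drop empty lines
```

Dropping blank lines is a choice made for this sketch; keep them if anything downstream relies on paragraph breaks.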
Mark as read
PATCH /messages/{id} with {"isRead": true} after successful processing. Failed messages stay unread for retry on the next run.
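The retry rule amounts to: mark a message read only after it has been processed. A minimal sketch, assuming the PATCH is done by a caller-supplied `mark_read(message_id)` (for instance `requests.patch(url, json={"isRead": True}, headers={"Authorization": f"Bearer {token}"})`); `run_batch`, `mark_read_url` and `GRAPH_BASE` are hypothetical names:

```python
import logging
from typing import Callable, Iterable, List

log = logging.getLogger(__name__)

GRAPH_BASE = "https://graph.microsoft.com/v1.0"


def mark_read_url(user_email: str, message_id: str) -> str:
    """URL for PATCH /users/{user}/messages/{id}; send the body {"isRead": true}."""
    return f"{GRAPH_BASE}/users/{user_email}/messages/{message_id}"


def run_batch(
    messages: Iterable[dict],
    process: Callable[[dict], None],
    mark_read: Callable[[str], None],
) -> List[str]:
    """Process each message and mark it read only after processing succeeds.

    Returns the ids left unread (processing or the PATCH failed) for the next run.
    """
    left_unread = []
    for msg in messages:
        try:
            process(msg)
        except Exception:
            log.exception("processing failed for %s; leaving it unread", msg["id"])
            left_unread.append(msg["id"])
            continue
        try:
            mark_read(msg["id"])
        except Exception:
            # Already written; the writer's delete-then-insert replacement
            # should make reprocessing it on the next run harmless.
            log.exception("could not mark %s as read", msg["id"])
            left_unread.append(msg["id"])
    return left_unread
```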
Output
A list of FetchedEmail dataclasses:
```python
from dataclasses import dataclass
from datetime import date, datetime


@dataclass
class FetchedEmail:
    message_id: str
    subject: str
    body_text: str
    received_datetime: datetime
    vessel_name: str
    report_type_raw: str  # "Noon Report", "Arrival Notice", etc.
    report_date: date
```
Parser
File: src/parser.py
Purpose: Extract the encrypted payload from the email body and decrypt it into a JSON dict.
Header extraction
Pulls report_type_raw and form_version from a header line in the body:
```python
m = re.search(r"Report:\s*(.+?)\s+(v[\d.]+)", body)
# m.groups() == ("Noon Report", "v00.01.02")
```
Marker extraction
Locates the encrypted block:
```
---------- BEGIN MW FORM DATA ---------------
<base64 ciphertext>
------------- END MW FORM DATA ----------------
```
Markers are configured in config.py:
```python
MARKER_BEGIN = "BEGIN MW FORM DATA"
MARKER_END = "END MW FORM DATA"
```
Decryption
```python
import base64
import json

from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad


def decrypt_payload(b64_ciphertext: str, key: str) -> dict:
    key_bytes = key.encode()  # must be exactly 16 bytes for AES-128
    raw = base64.b64decode(b64_ciphertext)
    cipher = AES.new(key_bytes, AES.MODE_CBC, iv=key_bytes)  # IV = key
    padded = cipher.decrypt(raw)
    plain = unpad(padded, AES.block_size)  # strip PKCS7 padding
    return json.loads(plain.decode())
```
- AES-128-CBC (16-byte key)
- IV = key (matching the form's CryptoJS encryption)
- PKCS7 padding
- Library: pycryptodome
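decrypt_payload expects the bare base64 string, so the parser first has to cut it out from between the markers. A minimal sketch, assuming each marker sits on its own line surrounded by dash runs, as in the block shown under Marker extraction; `_BLOCK_RE` and `extract_ciphertext` are hypothetical names:

```python
import re
from typing import Optional

MARKER_BEGIN = "BEGIN MW FORM DATA"  # same values as config.py
MARKER_END = "END MW FORM DATA"

# Everything between the line holding MARKER_BEGIN and the line holding
# MARKER_END, tolerating the dash runs around each marker.
_BLOCK_RE = re.compile(
    re.escape(MARKER_BEGIN) + r"[^\n]*\n(.*?)\n[^\n]*" + re.escape(MARKER_END),
    re.DOTALL,
)


def extract_ciphertext(body_text: str) -> Optional[str]:
    """Return the base64 text between the markers, or None if they are missing."""
    m = _BLOCK_RE.search(body_text)
    if m is None:
        return None  # caller switches to the text fallback
    # Remove line wraps and stray spaces that mail clients insert
    return "".join(m.group(1).split())
```

Returning None instead of raising lets the caller move straight to the text fallback.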
Text fallback
If the markers are missing (e.g. the crew hand-edited the email and broke the block), the parser falls back to a regex-based key-value extractor that reads section headers (---Section---) and key: value lines, flattening them into a dict with keys like "Section::Key". Coverage is partial: most fields arrive, but rich nested arrays don't.
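A minimal sketch of such a fallback, assuming section headers look like `---Name---` on their own line and that the first colon separates key from value (so times like `12:00:00` stay intact in the value); `fallback_extract`, `SECTION_RE` and `KV_RE` are hypothetical, not the parser's actual code:

```python
import re

SECTION_RE = re.compile(r"^-{3,}\s*(.+?)\s*-{3,}$")
KV_RE = re.compile(r"^([^:]+?)\s*:\s*(.*)$")


def fallback_extract(body_text: str) -> dict:
    """Flatten '---Section---' headers and 'key: value' lines to {"Section::Key": value}."""
    data = {}
    section = None
    for raw in body_text.splitlines():
        line = raw.strip()
        m = SECTION_RE.match(line)
        if m:
            section = m.group(1)
            continue
        m = KV_RE.match(line)
        # Lines before the first section header (e.g. the Report: header) are ignored
        if m and section is not None:
            data[f"{section}::{m.group(1)}"] = m.group(2)
    return data
```

Values stay as strings; the mapper's helpers do the type conversion either way.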
Output
```python
from dataclasses import dataclass


@dataclass
class ParseResult:
    form_data: dict       # the decrypted (or fallback) payload
    report_type_raw: str  # from the header line
    form_version: str
```
Mapper
File: src/mapper.py
Purpose: Translate the form’s JSON payload into SQLAlchemy model instances ready for the writer.
What it produces
```python
{
    "vessel_info": {"imo": ..., "name": ..., "code": ...},
    "voyage_number": "V31",
    "report": Report(...),                                       # 92 scalar fields
    "events": [ReportEvent(...), ...],                           # at-sea + in-port, with nested fuel breakdown
    "bunker_rob": [ReportBunkerRob(...), ...],                   # per fuel type, per context
    "upcoming_ports": [ReportUpcomingPort(...), ...],
    "fowe_periods": [ReportFowePeriod(...), ...],
    "scrubber_breakdowns": [ReportScrubberBreakdown(...), ...],
    "bunker_deliveries": [BunkerDelivery(...), ...],
    "bunker_biofuels": [BunkerBiofuel(...), ...],
    "sof_activities": [SofActivity(...), ...],
    "cargo_details": [ReportCargo(...), ...],
    "month_end_bunker": [MonthEndBunkerReport(...), ...],
    "berthing": [BerthingDetails(...), ...],
}
```
Key transformations
| From form | To DB | Helper |
|---|---|---|
| "6 1' 54\" N" | 6.0317 (decimal degrees) | dms_to_decimal() |
| "13.04.2026 12:00:00 +03:00" | datetime(2026, 4, 13, 9, 0, tzinfo=UTC) | parse_report_datetime() |
| "Yes" / "No" / "True" / "1" | True / False | parse_bool() |
| "123.45" (string) | Decimal("123.45") | safe_decimal() (returns None on failure) |
| "42" (string) | 42 | safe_int() |
All helpers in src/utils/datetime_utils.py and src/utils/coordinates.py return None on failure rather than raising — bad data becomes NULL, the run continues.
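The helper bodies aren't shown on this page. A minimal sketch of the same contract (parse, or return None, never raise), assuming the input formats in the table above; treating S/W as negative and the exact accepted boolean spellings are assumptions, and these are illustrative versions, not the code in src/utils/:

```python
import re
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import Optional

_DMS_RE = re.compile(r"""(\d+(?:\.\d+)?)\s+(\d+(?:\.\d+)?)'\s*(\d+(?:\.\d+)?)"?\s*([NSEW])""")
_TRUE = {"yes", "true", "1"}
_FALSE = {"no", "false", "0"}


def dms_to_decimal(text: Optional[str]) -> Optional[float]:
    """'6 1\\' 54" N' -> 6.0316...; South and West become negative."""
    m = _DMS_RE.search(text or "")
    if m is None:
        return None
    deg, minutes, seconds, hemi = m.groups()
    value = float(deg) + float(minutes) / 60 + float(seconds) / 3600
    return -value if hemi in "SW" else value


def parse_report_datetime(text: Optional[str]) -> Optional[datetime]:
    """'13.04.2026 12:00:00 +03:00' -> aware datetime in UTC."""
    try:
        local = datetime.strptime(text.strip(), "%d.%m.%Y %H:%M:%S %z")
    except (AttributeError, ValueError):  # None or wrong format
        return None
    return local.astimezone(timezone.utc)


def parse_bool(text: object) -> Optional[bool]:
    key = str(text).strip().lower()
    if key in _TRUE:
        return True
    if key in _FALSE:
        return False
    return None


def safe_decimal(text: object) -> Optional[Decimal]:
    try:
        value = Decimal(str(text).strip())
    except InvalidOperation:
        return None
    return value if value.is_finite() else None  # reject NaN / Infinity


def safe_int(text: object) -> Optional[int]:
    try:
        return int(str(text).strip())
    except ValueError:
        return None
```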
Per-event nested fuel
Each event has a nested fuel array with 12 consumption categories per fuel type:
propulsion · maneuver · generator · loaddischarge · deballast · igs · boiler · incinerator · cargoheating · tankcleaning · others · flushing
Plus subtotals: main_engine_consumption, aux_engine_consumption, total_consumption. These map to EventFuelConsumption rows attached to each ReportEvent.
Context routing
The mapper reads a different set of arrays depending on the report's location (At Sea / In Port):
| Location | Array names |
|---|---|
| At Sea | atseaeventrobdetails, atseabunkerrobdetails, gsatseaeventtypes |
| In Port | inporteventrobdetails, inportbunkerrobdetails, gsinporteventtypes |
Both flow into the same DB tables; a context column tags each row.
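A minimal sketch of that routing, assuming the payload's rows are plain dicts and the location string is already known (where it comes from in the payload isn't specified here); `select_context_arrays`, `CONTEXT_ARRAYS` and the `at_sea`/`in_port` tag values are hypothetical:

```python
# Location -> (context tag, events array, bunker ROB array, event-type array).
# Array names come from the routing table; the tag values are assumptions.
CONTEXT_ARRAYS = {
    "At Sea": ("at_sea", "atseaeventrobdetails", "atseabunkerrobdetails", "gsatseaeventtypes"),
    "In Port": ("in_port", "inporteventrobdetails", "inportbunkerrobdetails", "gsinporteventtypes"),
}


def select_context_arrays(form: dict, location: str) -> dict:
    """Pick the arrays for this location and tag each row with its context."""
    context, events_key, bunker_key, types_key = CONTEXT_ARRAYS[location]

    def tagged(rows):
        # Missing or null arrays become empty lists
        return [dict(row, context=context) for row in rows or []]

    return {
        "events": tagged(form.get(events_key)),
        "bunker_rob": tagged(form.get(bunker_key)),
        "event_types": form.get(types_key) or [],  # passed through unchanged
    }
```

Because both locations produce the same output shape, the downstream model construction doesn't need to branch on location.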
Writer
File: src/writer.py
Purpose: Upsert into PostgreSQL.
Upsert sequence
```python
from datetime import datetime

# Report, upsert_vessel, upsert_voyage and CHILD_KEYS are defined elsewhere in the project.


def write_report(session, mapped, email_message_id):
    vessel = upsert_vessel(session, mapped["vessel_info"])  # by imo_number
    voyage = upsert_voyage(session, vessel.vessel_id, mapped["voyage_number"])
    report = mapped["report"]  # needed before the lookup below

    # Replacement: same (vessel, type, datetime) → delete old, insert new
    existing = session.query(Report).filter_by(
        vessel_id=vessel.vessel_id,
        report_type=report.report_type,
        report_datetime_utc=report.report_datetime_utc,
    ).one_or_none()
    if existing is not None:
        session.delete(existing)  # ON DELETE CASCADE drops the 11 child arrays
        session.flush()

    report.vessel_id = vessel.vessel_id
    report.voyage_id = voyage.voyage_id
    report.email_message_id = email_message_id
    report.received_at = datetime.utcnow()
    session.add(report)
    session.flush()  # assigns report.report_id

    # Attach all 11 child arrays with the report_id FK
    for key, items in mapped.items():
        if key in CHILD_KEYS:
            for item in items:
                item.report_id = report.report_id
                session.add(item)

    return report
```
CASCADE delete
All 11 child tables declare:
```python
ForeignKey("metaweave_report.report_id", ondelete="CASCADE")
```
So DELETE FROM metaweave_report WHERE report_id=… removes the entire family in one statement. This is what makes corrections clean.
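The cascade behaviour can be checked in isolation. A minimal sketch using Python's built-in sqlite3 rather than PostgreSQL (SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`); `report_event` is a hypothetical child table standing in for any of the 11:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite enforces foreign keys (and so ON DELETE CASCADE) only with this pragma;
# PostgreSQL always enforces them.
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript("""
CREATE TABLE metaweave_report (
    report_id INTEGER PRIMARY KEY
);
CREATE TABLE report_event (          -- hypothetical child table
    event_id  INTEGER PRIMARY KEY,
    report_id INTEGER NOT NULL
        REFERENCES metaweave_report(report_id) ON DELETE CASCADE
);
INSERT INTO metaweave_report (report_id) VALUES (1);
INSERT INTO report_event (report_id) VALUES (1), (1), (1);
""")

conn.execute("DELETE FROM metaweave_report WHERE report_id = 1")
remaining = conn.execute("SELECT COUNT(*) FROM report_event").fetchone()[0]
print(remaining)  # 0: the child rows went with the parent
```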
Why delete-then-insert (not UPDATE)
Reports have variable-length child arrays. Updating in place would mean diffing every child array: which rows to add, which to update, which to delete. Delete-then-insert avoids that bookkeeping and stays cheap at typical sizes (~5 events, ~4 bunker ROBs per report). Atomicity is preserved by the surrounding transaction: session.commit() covers both the delete and the insert.
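The atomicity claim can be demonstrated outside the ORM. A minimal sketch with stdlib sqlite3 standing in for PostgreSQL and SQLAlchemy's session.commit(); the schema and `replace_report` are hypothetical. A failure midway through the insert rolls back the delete too, so the old report survives:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript("""
CREATE TABLE metaweave_report (
    report_id   INTEGER PRIMARY KEY,
    report_type TEXT NOT NULL
);
CREATE TABLE report_event (
    event_id  INTEGER PRIMARY KEY,
    report_id INTEGER NOT NULL REFERENCES metaweave_report(report_id) ON DELETE CASCADE,
    name      TEXT NOT NULL
);
INSERT INTO metaweave_report VALUES (1, 'Noon Report');
INSERT INTO report_event (report_id, name) VALUES (1, 'Noon');
""")


def replace_report(conn, events):
    # The connection context manager commits on success and rolls back on any
    # exception, so the DELETE and the INSERTs land together or not at all.
    with conn:
        conn.execute("DELETE FROM metaweave_report WHERE report_id = 1")
        conn.execute("INSERT INTO metaweave_report VALUES (2, 'Noon Report')")
        conn.executemany(
            "INSERT INTO report_event (report_id, name) VALUES (2, ?)",
            [(e,) for e in events],
        )


try:
    replace_report(conn, ["Noon", None])  # NULL name violates NOT NULL mid-insert
except sqlite3.IntegrityError:
    pass

print(conn.execute("SELECT report_id FROM metaweave_report").fetchall())  # [(1,)]
```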
See also
- Data model — full table list and relationships
- Configuration — env vars
- Running — operational invocation