DEV Community

ByteLedger
ByteLedger

Posted on

Validate Records in OpenSearch from a CSV (with Logging and Health Checks)

📌 Overview

When working with data pipelines or audit tasks, we often need to verify that certain records exist in our OpenSearch cluster.

This post walks through a Python script that:

  • Reads (recordId, recordDate) pairs from a CSV file
  • Checks if each record exists in OpenSearch
  • Logs results with a timestamped log file
  • Outputs a results CSV for reporting
  • Performs a fast _cluster/health check before running queries

🗂 Folder Structure

opensearch-checks/
├─ .env
├─ inputs/
│  └─ records.csv
├─ logs/       # auto-created
├─ outputs/    # auto-created
└─ check_records.py

Key features:

Strict config from .env — no defaults in code
Pre-check: GET /_cluster/health endpoint
Exact match on recordId.keyword
Date range match for the full day
Logs + CSV output for easy tracking

🚀 How to Run
pip install python-dotenv
python check_records.py

inputs/records.csv (example)

recordId,recordDate
00000000-0000-0000-0000-000000000001,2025-07-31
00000000-0000-0000-0000-000000000002,2025-07-31
Enter fullscreen mode Exit fullscreen mode

.env (required)

OPENSEARCH_URL=https://your-opensearch-host:9200
OPENSEARCH_USER=your-username
OPENSEARCH_PASS=your-password
INDEX_PATTERN=my_index_pattern_2025*/_search
INPUT_CSV=./inputs/records.csv
LOG_DIR=./logs
OUTPUT_DIR=./outputs
Enter fullscreen mode Exit fullscreen mode

The script: check_records.py

"""
OpenSearch existence checks for (recordId, recordDate) pairs with a fast
_cluster/health pre-check.

What it does
------------
1) Loads configuration strictly from a .env file (no hard-coded defaults).
2) Pre-checks OpenSearch availability via GET /_cluster/health
   (fails fast if unreachable or red).
3) Reads an input CSV with headers: recordId, recordDate (YYYY-MM-DD).
4) For each row, runs an existence query:
   - exact term match on recordId.keyword
   - range match on recordDate for the full day
     (00:00:00.000 -> 23:59:59.999)
5) Produces:
   - timestamped log file (PASS/FAIL per row)
   - timestamped results CSV (recordId, recordDate, status)

Security note
-------------
SSL verification is disabled here for convenience in internal/test setups.
For production, enable certificate verification and use a trusted CA/cert chain.
"""

from __future__ import annotations

import csv
import json
import logging
import os
import ssl
from base64 import b64encode
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen


# ---------------- Configuration ----------------

@dataclass(frozen=True)
class Config:
    """Strongly-typed configuration loaded from environment."""

    url: str
    user: str
    password: str
    index_pattern: str  # e.g., "my_index_pattern_2025*/_search"
    input_csv: str      # e.g., "./inputs/records.csv"
    log_dir: str        # e.g., "./logs"
    output_dir: str     # e.g., "./outputs"

    @staticmethod
    def from_env() -> "Config":
        """
        Build a Config from the .env file / process environment.

        Raises:
            ValueError: if any required variable is unset or empty.
        """
        # Imported here (its only use) so the client and query helpers can be
        # imported without python-dotenv being installed.
        from dotenv import load_dotenv

        load_dotenv()  # only reads .env / process env; no defaults

        names = (
            "OPENSEARCH_URL",
            "OPENSEARCH_USER",
            "OPENSEARCH_PASS",
            "INDEX_PATTERN",
            "INPUT_CSV",
            "LOG_DIR",
            "OUTPUT_DIR",
        )
        values = {name: os.getenv(name) for name in names}

        # Empty strings are treated the same as unset variables.
        missing = [name for name, value in values.items() if not value]
        if missing:
            raise ValueError(f"Missing required variables in .env: {', '.join(missing)}")

        return Config(
            url=values["OPENSEARCH_URL"],
            user=values["OPENSEARCH_USER"],
            password=values["OPENSEARCH_PASS"],
            index_pattern=values["INDEX_PATTERN"],
            input_csv=values["INPUT_CSV"],
            log_dir=values["LOG_DIR"],
            output_dir=values["OUTPUT_DIR"],
        )


# ---------------- Minimal OpenSearch client ----------------

class OpenSearchClient:
    """Tiny OpenSearch client using urllib (no external HTTP deps)."""

    def __init__(
        self,
        base_url: str,
        user: str,
        password: str,
        verify_ssl: bool = False,
        timeout: float = 30.0,
    ) -> None:
        """
        Args:
            base_url: Cluster URL; a trailing slash is stripped.
            user: Basic-auth user name.
            password: Basic-auth password.
            verify_ssl: When False, hostname and certificate checks are disabled.
            timeout: Per-request socket timeout in seconds, so a stalled
                connection cannot hang the whole run.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.auth_header = {
            "Authorization": "Basic " + b64encode(f"{user}:{password}".encode()).decode(),
            "Content-Type": "application/json",
        }
        self.ctx = ssl.create_default_context()
        if not verify_ssl:
            # check_hostname must be turned off before verify_mode can be CERT_NONE.
            self.ctx.check_hostname = False
            self.ctx.verify_mode = ssl.CERT_NONE

    def get_json(self, endpoint: str) -> Dict[str, Any]:
        """GET `endpoint` (relative to the base URL) and return the decoded JSON body."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        req = Request(url, method="GET")
        # Only Authorization header on GET (no need for Content-Type)
        req.add_header("Authorization", self.auth_header["Authorization"])
        with urlopen(req, context=self.ctx, timeout=self.timeout) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
            logging.info("GET %s -> %s", endpoint, resp.status)
            return payload

    def post_json(self, endpoint: str, body: Dict[str, Any]) -> Dict[str, Any]:
        """POST `body` as JSON to `endpoint` and return the decoded JSON response."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        req = Request(url, data=json.dumps(body).encode("utf-8"), method="POST")
        for key, value in self.auth_header.items():
            req.add_header(key, value)
        with urlopen(req, context=self.ctx, timeout=self.timeout) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
            logging.info("POST %s -> %s", endpoint, resp.status)
            return payload


# ---------------- Health pre-check ----------------

def assert_cluster_healthy(client: OpenSearchClient) -> None:
    """
    Fail fast if OpenSearch is not reachable or reports 'red' status.
    Accepts 'green' and 'yellow' as pass for read-only checks.

    Raises:
        RuntimeError: if the request fails or the status is not green/yellow.
    """
    try:
        health = client.get_json("_cluster/health")
    except OSError as e:
        # HTTPError and URLError are OSError subclasses; catching OSError also
        # covers read timeouts and TLS/socket errors raised outside urllib's wrapper.
        raise RuntimeError(f"Cluster health check failed: {e}") from e

    status = (health.get("status") or "").lower()
    if status not in {"green", "yellow"}:
        raise RuntimeError(f"Cluster health is '{status}', expected green/yellow.")
    logging.info("Cluster health OK: %s", status)


# ---------------- Existence query ----------------

def record_exists(client: OpenSearchClient, index_pattern: str, record_id: str, record_date: str) -> bool:
    """
    Return True if any document exists for:
      - recordId.keyword == record_id
      - recordDate between 'record_date 00:00:00.000' and 'record_date 23:59:59.999'

    Any failure while querying is logged and reported as False (best effort),
    so a FAIL in the results may also mean "query error" - check the log.
    """
    start = f"{record_date} 00:00:00.000"
    end = f"{record_date} 23:59:59.999"
    query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"recordId.keyword": record_id}},
                    {"range": {"recordDate": {"gte": start, "lte": end}}},
                ]
            }
        },
        "size": 0,  # only the hit count is needed, not the documents
    }
    try:
        res = client.post_json(index_pattern, query)
        total = res.get("hits", {}).get("total", 0)
        # hits.total is either an object ({"value": N, "relation": ...}) or a
        # bare integer; decide on the type *before* calling dict methods.
        count = total.get("value", 0) if isinstance(total, dict) else total
        return (count or 0) > 0
    except Exception as e:
        logging.error("Query failed for recordId=%s date=%s: %s", record_id, record_date, e)
        return False


# ---------------- Main ----------------

def main() -> None:
    """Run the health pre-check, then check every CSV row and write log + results CSV."""
    cfg = Config.from_env()

    # Prepare paths + logging
    os.makedirs(cfg.log_dir, exist_ok=True)
    os.makedirs(cfg.output_dir, exist_ok=True)
    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_file = os.path.join(cfg.log_dir, f"record_check_{ts}.log")
    out_file = os.path.join(cfg.output_dir, f"record_results_{ts}.csv")

    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    # Initialize client
    client = OpenSearchClient(cfg.url, cfg.user, cfg.password, verify_ssl=False)

    # Pre-flight: cluster health
    assert_cluster_healthy(client)

    # Process CSV -> Results CSV
    with open(cfg.input_csv, newline="", encoding="utf-8") as fin, \
            open(out_file, "w", newline="", encoding="utf-8") as fout:
        reader = csv.DictReader(fin)
        required_headers = {"recordId", "recordDate"}
        if not reader.fieldnames or required_headers - set(reader.fieldnames):
            raise ValueError(f"CSV must have headers: recordId,recordDate (got {reader.fieldnames})")

        writer = csv.writer(fout)
        writer.writerow(["recordId", "recordDate", "status"])

        for row in reader:
            rec_id = (row.get("recordId") or "").strip()
            date = (row.get("recordDate") or "").strip()
            if not rec_id or not date:
                # Incomplete rows are logged only; they get no line in the results CSV.
                logging.warning("Skipping row with missing values: %s", row)
                continue

            ok = record_exists(client, cfg.index_pattern, rec_id, date)
            status = "PASS" if ok else "FAIL"
            writer.writerow([rec_id, date, status])
            if ok:
                logging.info("%s: recordId=%s date=%s", status, rec_id, date)
            else:
                logging.warning("%s: recordId=%s date=%s", status, rec_id, date)

    print(f"Done.\nLog: {log_file}\nResults: {out_file}")


if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)