#!/usr/bin/env python3
"""
Re-orders a hierarchical CSV and rewrites it for Azure DevOps Boards import.

Adds:
  • Rich INFO-level logging of every major step
  • A sanity-check ensuring the output is
      1) complete   - same number of rows, no extras or losses
      2) correct    - every non-hierarchy cell identical to the input
      3) functional - has required columns for ADO import, in the right order
"""
from __future__ import annotations

import argparse
import hashlib
import logging
from collections import Counter, defaultdict

import pandas as pd

# Columns of the input CSV that describe the hierarchy itself.
_HIERARCHY_COLS = ("ID", "Parent", "Title")
# Leading columns of the output, in the order checked by sanity_check().
_ADO_COLS = ["ID", "Title 1", "Title 2", "Title 3"]
# Scratch column names the transformation has historically reserved; they are
# never treated as data-bearing columns.
_RESERVED_COLS = frozenset({*_HIERARCHY_COLS, *_ADO_COLS, "_level", "ID_new"})


def _hash_row(row: pd.Series) -> str:
    """Return a stable hex digest of the stringified values of *row*.

    The digest depends on the values and on their order within the row. It is
    used to compare rows of two frames as multisets, i.e. independently of the
    order in which the rows appear in each frame.
    """
    # repr() of the tuple quotes and escapes every value, so cell boundaries
    # stay unambiguous even when a cell contains the separator characters
    # (a plain "||".join would hash ("a||b", "c") and ("a", "b||c") equally).
    txt = repr(tuple(str(v) for v in row.tolist()))
    return hashlib.md5(txt.encode("utf-8")).hexdigest()


def _row_hash_counts(df: pd.DataFrame, cols: list[str]) -> Counter:
    """Count how often each row digest occurs in ``df[cols]``."""
    return Counter(_hash_row(row) for _, row in df[cols].iterrows())


def build_level_map(df: pd.DataFrame) -> dict[int, int]:
    """Compute the depth of every item in the hierarchy.

    Args:
        df: Frame with an ``ID`` column and a ``Parent`` column; a missing
            (NaN) parent marks a root item.

    Returns:
        Mapping of item ID to depth, where roots have depth 1 and every other
        item has the depth of its parent plus one.

    Raises:
        ValueError: If a ``Parent`` value does not occur in ``ID`` or if the
            parent links form a cycle.
    """
    log = logging.getLogger("hierarchy")

    # One pass over the frame instead of a DataFrame scan per lookup.
    # setdefault keeps the first occurrence if an ID is listed more than once.
    parent_of: dict[int, object] = {}
    for item_id, parent in zip(df["ID"], df["Parent"]):
        parent_of.setdefault(int(item_id), None if pd.isna(parent) else int(parent))

    level_cache: dict[int, int] = {}
    for start in parent_of:
        # Walk up until a root or an already-resolved ancestor is reached,
        # then assign depths on the way back down. Iterative on purpose so
        # deep hierarchies cannot hit the interpreter's recursion limit.
        path: list[int] = []
        on_path: set[int] = set()
        node = start
        while node is not None and node not in level_cache:
            if node in on_path:
                raise ValueError(f"Cycle detected in Parent chain at ID {node}")
            if node not in parent_of:
                raise ValueError(f"ID {path[-1]} references unknown Parent {node}")
            on_path.add(node)
            path.append(node)
            node = parent_of[node]

        depth = 0 if node is None else level_cache[node]
        for item in reversed(path):
            depth += 1
            level_cache[item] = depth

    log.info("Calculated depth for %d items", len(level_cache))
    return level_cache


def depth_first_order(df: pd.DataFrame) -> list[int]:
    """Return item IDs in depth-first (pre-order) sequence.

    Roots are visited in the order they appear in *df*; the children of each
    item are likewise visited in their order of appearance.

    Args:
        df: Frame with ``ID`` and ``Parent`` columns; NaN parent marks a root.

    Returns:
        List of IDs reachable from the roots, each parent directly followed
        by its subtree.
    """
    children: defaultdict[int, list[int]] = defaultdict(list)
    roots: list[int] = []
    for item_id, parent in zip(df["ID"], df["Parent"]):
        if pd.isna(parent):
            roots.append(int(item_id))
        else:
            children[int(parent)].append(int(item_id))

    ordered: list[int] = []
    emitted: set[int] = set()
    # Explicit stack instead of recursion; pushing in reverse keeps the
    # left-to-right (file order) visiting sequence when popping.
    stack = list(reversed(roots))
    while stack:
        node_id = stack.pop()
        if node_id in emitted:
            # Guarantees termination even if an ID is listed more than once.
            continue
        emitted.add(node_id)
        ordered.append(node_id)
        stack.extend(reversed(children.get(node_id, ())))

    logging.getLogger("hierarchy").info("Produced depth-first order of %d IDs", len(ordered))
    return ordered


def restructure(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Re-order *df* depth-first and spread titles over per-level columns.

    The output starts with an empty ``ID`` column followed by ``Title 1``,
    ``Title 2`` and ``Title 3``; each row carries its title in the column
    matching its depth and empty strings in the others. Items deeper than
    level 3 are placed in ``Title 3``. All remaining input columns follow
    unchanged; the input ``ID``, ``Parent`` and ``Title`` columns are dropped.

    Args:
        df: Input frame with ``ID``, ``Parent`` and ``Title`` columns.

    Returns:
        A tuple ``(final_df, other_cols)`` where ``other_cols`` lists the
        data-bearing (non-hierarchy) columns carried over from the input.

    Raises:
        ValueError: If a required column is missing, an ID is duplicated, or
            the parent links are inconsistent (see ``build_level_map``).
    """
    missing = [col for col in _HIERARCHY_COLS if col not in df.columns]
    if missing:
        raise ValueError(f"Input is missing required column(s): {', '.join(missing)}")
    if df["ID"].duplicated().any():
        dupes = df.loc[df["ID"].duplicated(), "ID"].unique().tolist()
        raise ValueError(f"Duplicate ID value(s) in input: {dupes}")

    level_map = build_level_map(df)
    ordered_ids = depth_first_order(df)

    df_sorted = df.set_index("ID").loc[ordered_ids].reset_index()
    level = df_sorted["ID"].map(level_map)
    titles = df_sorted["Title"]

    other_cols = [c for c in df_sorted.columns if c not in _RESERVED_COLS]

    # New ADO-style columns; the scalar "" is broadcast over the whole index.
    header = pd.DataFrame(
        {
            "ID": "",
            "Title 1": titles.where(level == 1, ""),
            "Title 2": titles.where(level == 2, ""),
            "Title 3": titles.where(level >= 3, ""),
        },
        index=df_sorted.index,
    )
    final_df = pd.concat([header, df_sorted[other_cols]], axis=1)

    logging.getLogger("transform").info(
        "Restructured to %d columns (%s)",
        len(final_df.columns),
        ", ".join(final_df.columns),
    )
    return final_df, other_cols


def sanity_check(df_in: pd.DataFrame, df_out: pd.DataFrame, other_cols: list[str]) -> None:
    """Verify that *df_out* is a complete, correct and importable rewrite of *df_in*.

    Args:
        df_in: The frame as read from the input CSV.
        df_out: The restructured frame.
        other_cols: Data-bearing columns that must be identical in both frames.

    Raises:
        ValueError: If the row counts differ, if the rows of ``other_cols``
            differ between the frames (compared as multisets, so row order is
            irrelevant), or if the leading output columns are not the expected
            ``ID`` / ``Title 1..3`` layout with an empty ``ID`` column.
    """
    log = logging.getLogger("check")

    # 1) complete
    if len(df_in) != len(df_out):
        raise ValueError(f"Row count mismatch – input:{len(df_in)} vs output:{len(df_out)}")
    log.info("Completeness ✔ %d rows", len(df_in))

    # 2) correct - skipped when there are no extra data-bearing columns
    if not other_cols:
        log.info("Correctness ✔ (no non-hierarchy columns present)")
    else:
        in_hashes = _row_hash_counts(df_in, other_cols)
        out_hashes = _row_hash_counts(df_out, other_cols)
        if in_hashes != out_hashes:
            # Counter subtraction keeps only positive counts: the input rows
            # that have no matching row left in the output.
            unmatched = sum((in_hashes - out_hashes).values())
            raise ValueError(f"Data mismatch detected in {unmatched} row(s) – hashes differ")
        log.info(
            "Correctness ✔ all %d non-hierarchy cells identical",
            len(df_in) * len(other_cols),
        )

    # 3) functional
    if df_out.columns.tolist()[:4] != _ADO_COLS:
        raise ValueError("Required ADO columns missing or out of order")
    if not df_out["ID"].eq("").all():
        raise ValueError("The first column 'ID' must be empty for ADO import")
    log.info("Functional ✔ output format matches Azure DevOps Boards import requirements")


def main() -> None:
    """Command-line entry point: read the CSV, restructure, verify and write it."""
    argp = argparse.ArgumentParser(description="Re-order hierarchical CSV for ADO Boards")
    argp.add_argument("input_csv")
    argp.add_argument("output_csv")
    argp.add_argument("-v", "--verbose", action="store_true", help="log INFO messages")
    args = argp.parse_args()

    logging.basicConfig(
        level=logging.INFO if args.verbose else logging.WARNING,
        format="%(levelname)s %(name)s: %(message)s",
    )

    logging.info("Reading %s …", args.input_csv)
    df_in = pd.read_csv(args.input_csv)

    df_out, other_cols = restructure(df_in)
    # Verify before writing so a failed check never leaves a bad output file.
    sanity_check(df_in, df_out, other_cols)

    df_out.to_csv(args.output_csv, index=False)
    logging.info("Success - wrote %d rows to %s", len(df_out), args.output_csv)


if __name__ == "__main__":
    main()