app-python-azdoworkitemcsv/reorder_requirements.py

142 lines
5.8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Re-orders a hierarchical CSV and rewrites it for Azure DevOps Boards import.
Adds:
Rich INFO-level logging of every major step
A sanity-check ensuring the output is
1) complete - same number of rows, no extras or losses
2) correct - every non-hierarchy cell identical to the input
3) functional - has required columns for ADO import, in the right order
"""
from __future__ import annotations
import argparse
import hashlib
import logging
from collections import defaultdict
import pandas as pd
def _hash_row(row: pd.Series) -> str:
"""Stable hash of a Series - used to compare multi-column equality regardless of order."""
txt = "||".join(str(v) for v in row.tolist())
return hashlib.md5(txt.encode("utf-8")).hexdigest()
def build_level_map(df: pd.DataFrame) -> dict[int, int]:
log = logging.getLogger("hierarchy")
level_cache: dict[int, int] = {}
def level_of(_id: int) -> int:
if _id in level_cache:
return level_cache[_id]
parent = df.loc[df["ID"] == _id, "Parent"].iloc[0]
if pd.isna(parent):
level_cache[_id] = 1
else:
level_cache[_id] = 1 + level_of(int(parent))
return level_cache[_id]
for _id in df["ID"]:
level_of(int(_id))
log.info("Calculated depth for %d items", len(level_cache))
return level_cache
def depth_first_order(df: pd.DataFrame) -> list[int]:
children: defaultdict[int, list[int]] = defaultdict(list)
for _, row in df.iterrows():
if not pd.isna(row["Parent"]):
children[int(row["Parent"])].append(int(row["ID"]))
ordered: list[int] = []
def visit(node_id: int) -> None:
ordered.append(node_id)
for child_id in children.get(node_id, []):
visit(child_id)
for root_id in df[pd.isna(df["Parent"])]["ID"]:
visit(int(root_id))
logging.getLogger("hierarchy").info("Produced depth-first order of %d IDs", len(ordered))
return ordered
def restructure(df: pd.DataFrame) -> pd.DataFrame:
level_map = build_level_map(df)
ordered_ids = depth_first_order(df)
df_sorted = df.set_index("ID").loc[ordered_ids].reset_index()
df_sorted["_level"] = df_sorted["ID"].map(level_map)
# New ADO-style columns
df_sorted.insert(0, "ID_new", "")
df_sorted["Title 1"] = df_sorted.apply(lambda r: r["Title"] if r["_level"] == 1 else "", axis=1)
df_sorted["Title 2"] = df_sorted.apply(lambda r: r["Title"] if r["_level"] == 2 else "", axis=1)
df_sorted["Title 3"] = df_sorted.apply(lambda r: r["Title"] if r["_level"] >= 3 else "", axis=1)
cols_to_drop = {"ID", "Parent", "Title", "_level"}
other_cols = [c for c in df_sorted.columns if c not in cols_to_drop | {"ID_new", "Title 1", "Title 2", "Title 3"}]
final_df = df_sorted[["ID_new", "Title 1", "Title 2", "Title 3", *other_cols]]
final_df = final_df.rename(columns={"ID_new": "ID"})
logging.getLogger("transform").info("Restructured to %d columns (%s)", len(final_df.columns), ", ".join(final_df.columns))
return final_df, other_cols
def sanity_check(df_in: pd.DataFrame, df_out: pd.DataFrame, other_cols: list[str]) -> None:
log = logging.getLogger("check")
# 1) complete ──────────────────────────────────────────────────────────────
if len(df_in) != len(df_out):
raise ValueError(f"Row count mismatch input:{len(df_in)} vs output:{len(df_out)}")
log.info("Completeness ✔ %d rows", len(df_in))
# If there are no extra data-bearing columns, skip the correctness test
if not other_cols:
log.info("Correctness ✔ (no non-hierarchy columns present)")
else:
# 2) correct ───────────────────────────────────────────────────────────
in_hashes = df_in[other_cols].apply(_hash_row, axis=1).value_counts().sort_index()
out_hashes = df_out[other_cols].apply(_hash_row, axis=1).value_counts().sort_index()
if not in_hashes.equals(out_hashes):
diff = (in_hashes - out_hashes).replace(0, pd.NA).dropna()
raise ValueError(f"Data mismatch detected in {len(diff)} row(s) hashes differ")
log.info("Correctness ✔ all %d non-hierarchy cells identical", len(df_in) * len(other_cols))
# 3) functional ───────────────────────────────────────────────────────────
required_cols = ["ID", "Title 1", "Title 2", "Title 3"]
if df_out.columns.tolist()[:4] != required_cols:
raise ValueError("Required ADO columns missing or out of order")
if not df_out["ID"].eq("").all():
raise ValueError("The first column 'ID' must be empty for ADO import")
log.info("Functional ✔ output format matches Azure DevOps Boards import requirements")
def main() -> None:
argp = argparse.ArgumentParser(description="Re-order hierarchical CSV for ADO Boards")
argp.add_argument("input_csv")
argp.add_argument("output_csv")
argp.add_argument("-v", "--verbose", action="store_true", help="log INFO messages")
args = argp.parse_args()
logging.basicConfig(
level=logging.INFO if args.verbose else logging.WARNING,
format="%(levelname)s %(name)s: %(message)s",
)
logging.info("Reading %s", args.input_csv)
df_in = pd.read_csv(args.input_csv)
df_out, other_cols = restructure(df_in)
sanity_check(df_in, df_out, other_cols)
df_out.to_csv(args.output_csv, index=False)
logging.info("Success - wrote %d rows to %s", len(df_out), args.output_csv)
if __name__ == "__main__":
main()