141 lines
5.8 KiB
Python
141 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Re-orders a hierarchical CSV and rewrites it for Azure DevOps Boards import.

Adds:
  • Rich INFO-level logging of every major step
  • A sanity check ensuring the output is
      1) complete   - same number of rows, no extras or losses
      2) correct    - every non-hierarchy cell identical to the input
      3) functional - has required columns for ADO import, in the right order
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import logging
|
||
from collections import defaultdict
|
||
import pandas as pd
|
||
|
||
def _hash_row(row: pd.Series) -> str:
    """Return an MD5 hex digest that fingerprints the values of one row.

    Every value is converted with ``str`` (in the row's own order), the
    pieces are joined with ``"||"`` and the UTF-8 bytes of that text are
    hashed. Rows holding the same values therefore get the same digest,
    which lets two frames be compared as multisets of rows irrespective of
    the order in which the rows appear.
    """
    cells = [str(value) for value in row.tolist()]
    joined = "||".join(cells)
    digest = hashlib.md5(joined.encode("utf-8"))
    return digest.hexdigest()
|
||
|
||
|
||
def build_level_map(df: pd.DataFrame) -> dict[int, int]:
    """Compute the hierarchy depth of every item in ``df``.

    ``df`` must provide an ``ID`` column and a ``Parent`` column. A row whose
    ``Parent`` is missing (NaN/None) is a root and has depth 1; every other
    row has the depth of its parent plus one.

    Args:
        df: Frame with ``ID`` and ``Parent`` columns.

    Returns:
        Mapping of ``int(ID)`` to depth (1-based).

    Raises:
        ValueError: If the ``Parent`` links form a cycle, or a ``Parent``
            value does not occur in the ``ID`` column.
    """
    log = logging.getLogger("hierarchy")

    # Build the ID -> parent lookup once instead of scanning the frame for
    # every ID. ``setdefault`` keeps the first row when an ID is repeated.
    parent_of: dict[int, int | None] = {}
    for raw_id, raw_parent in zip(df["ID"], df["Parent"]):
        parent = None if pd.isna(raw_parent) else int(raw_parent)
        parent_of.setdefault(int(raw_id), parent)

    level_cache: dict[int, int] = {}
    for start in parent_of:
        # Climb towards the root iteratively (no recursion limit), stopping
        # at a root or at the first ancestor whose depth is already known.
        chain: list[int] = []
        on_chain: set[int] = set()
        node: int | None = start
        while node is not None and node not in level_cache:
            if node in on_chain:
                raise ValueError(f"Cycle detected in Parent links at ID {node}")
            if node not in parent_of:
                raise ValueError(
                    f"Parent ID {node} (referenced by ID {chain[-1]}) "
                    "does not exist in the ID column"
                )
            chain.append(node)
            on_chain.add(node)
            node = parent_of[node]

        # Unwind: the top-most collected ancestor gets base depth + 1, etc.
        depth = 0 if node is None else level_cache[node]
        for item in reversed(chain):
            depth += 1
            level_cache[item] = depth

    log.info("Calculated depth for %d items", len(level_cache))
    return level_cache
|
||
|
||
|
||
def depth_first_order(df: pd.DataFrame) -> list[int]:
    """Return all IDs reachable from the roots in depth-first pre-order.

    Roots are the rows whose ``Parent`` is missing; they are visited in file
    order, and the children of each node are visited in file order as well,
    so every parent is immediately followed by its complete subtree.

    Args:
        df: Frame with ``ID`` and ``Parent`` columns.

    Returns:
        List of ``int`` IDs in depth-first order. IDs that are not reachable
        from any root are not included.

    Raises:
        ValueError: If an ID is reached more than once, which happens with
            duplicate IDs or cyclic ``Parent`` links.
    """
    children: defaultdict[int, list[int]] = defaultdict(list)
    roots: list[int] = []
    # Only two columns are needed, so zip them instead of building a Series
    # per row with iterrows().
    for raw_id, raw_parent in zip(df["ID"], df["Parent"]):
        if pd.isna(raw_parent):
            roots.append(int(raw_id))
        else:
            children[int(raw_parent)].append(int(raw_id))

    ordered: list[int] = []
    seen: set[int] = set()
    # Explicit stack instead of recursion so deep hierarchies cannot hit the
    # interpreter's recursion limit. Items are pushed reversed so that the
    # first root / first child is popped first.
    stack: list[int] = roots[::-1]
    while stack:
        node_id = stack.pop()
        if node_id in seen:
            # Without this guard a reachable cycle would loop forever.
            raise ValueError(
                f"ID {node_id} reached more than once - "
                "duplicate IDs or cyclic Parent links"
            )
        seen.add(node_id)
        ordered.append(node_id)
        stack.extend(reversed(children.get(node_id, [])))

    logging.getLogger("hierarchy").info("Produced depth-first order of %d IDs", len(ordered))
    return ordered
|
||
|
||
|
||
def restructure(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Re-order ``df`` depth-first and reshape it into the ADO import layout.

    The input must contain ``ID``, ``Parent`` and ``Title`` columns. Rows are
    re-ordered with ``depth_first_order``; each title is then placed in
    ``Title 1`` (depth 1), ``Title 2`` (depth 2) or ``Title 3`` (depth 3 and
    deeper) according to ``build_level_map``, with the other two title
    columns left as empty strings. The original ``ID``, ``Parent`` and
    ``Title`` columns are dropped and a new, empty-string ``ID`` column is
    put first.

    Args:
        df: Hierarchical input frame.

    Returns:
        A pair ``(final_df, other_cols)``: the restructured frame with
        columns ``ID, Title 1, Title 2, Title 3, *other_cols``, and the list
        of carried-over (non-hierarchy) column names in their original order.
    """
    level_map = build_level_map(df)
    ordered_ids = depth_first_order(df)

    # Pull rows out in depth-first order, then tag each with its depth.
    df_sorted = df.set_index("ID").loc[ordered_ids].reset_index()
    df_sorted["_level"] = df_sorted["ID"].map(level_map)

    # New ADO-style columns
    df_sorted.insert(0, "ID_new", "")
    level = df_sorted["_level"]
    title = df_sorted["Title"]
    # Vectorised: keep the title where the depth matches, "" elsewhere.
    df_sorted["Title 1"] = title.where(level == 1, "")
    df_sorted["Title 2"] = title.where(level == 2, "")
    df_sorted["Title 3"] = title.where(level >= 3, "")

    cols_to_drop = {"ID", "Parent", "Title", "_level"}
    ado_cols = ["ID_new", "Title 1", "Title 2", "Title 3"]
    excluded = cols_to_drop | set(ado_cols)
    other_cols = [c for c in df_sorted.columns if c not in excluded]

    final_df = df_sorted[[*ado_cols, *other_cols]]
    # The placeholder only existed to avoid clashing with the original "ID".
    final_df = final_df.rename(columns={"ID_new": "ID"})
    logging.getLogger("transform").info(
        "Restructured to %d columns (%s)", len(final_df.columns), ", ".join(final_df.columns)
    )
    return final_df, other_cols
|
||
|
||
|
||
def sanity_check(df_in: pd.DataFrame, df_out: pd.DataFrame, other_cols: list[str]) -> None:
    """Validate the restructured frame against the original input.

    Three checks are performed, each logged at INFO level on success:

    1. complete   - ``df_in`` and ``df_out`` have the same number of rows.
    2. correct    - restricted to ``other_cols``, both frames contain the
       same multiset of rows (compared through ``_hash_row`` digests, so row
       order does not matter). Skipped when ``other_cols`` is empty.
    3. functional - the first four output columns are ``ID``, ``Title 1``,
       ``Title 2``, ``Title 3`` and every ``ID`` cell is the empty string.

    Args:
        df_in: The frame as read from the input CSV.
        df_out: The restructured frame.
        other_cols: Names of the non-hierarchy columns present in both frames.

    Raises:
        ValueError: If any of the three checks fails.
    """
    log = logging.getLogger("check")

    # 1) complete ──────────────────────────────────────────────────────────────
    if len(df_in) != len(df_out):
        raise ValueError(f"Row count mismatch – input:{len(df_in)} vs output:{len(df_out)}")
    log.info("Completeness ✔ %d rows", len(df_in))

    # If there are no extra data-bearing columns, skip the correctness test
    if not other_cols:
        log.info("Correctness ✔ (no non-hierarchy columns present)")
    else:
        # 2) correct ───────────────────────────────────────────────────────────
        in_hashes = df_in[other_cols].apply(_hash_row, axis=1).value_counts().sort_index()
        out_hashes = df_out[other_cols].apply(_hash_row, axis=1).value_counts().sort_index()

        if not in_hashes.equals(out_hashes):
            # fill_value=0 keeps digests that occur on only one side; a plain
            # subtraction would turn them into NaN and lose exactly the rows
            # that differ. Positive entries are input rows with no match.
            delta = in_hashes.sub(out_hashes, fill_value=0)
            unmatched = int(delta[delta > 0].sum())
            raise ValueError(f"Data mismatch detected in {unmatched} row(s) – hashes differ")
        log.info("Correctness ✔ all %d non-hierarchy cells identical", len(df_in) * len(other_cols))

    # 3) functional ───────────────────────────────────────────────────────────
    required_cols = ["ID", "Title 1", "Title 2", "Title 3"]
    if df_out.columns.tolist()[:4] != required_cols:
        raise ValueError("Required ADO columns missing or out of order")
    if not df_out["ID"].eq("").all():
        raise ValueError("The first column 'ID' must be empty for ADO import")

    log.info("Functional ✔ output format matches Azure DevOps Boards import requirements")
|
||
|
||
def main() -> None:
    """Command-line entry point.

    Parses ``input_csv``, ``output_csv`` and the optional ``-v/--verbose``
    flag, configures logging (INFO when verbose, WARNING otherwise), reads
    the input CSV, restructures it, runs the sanity check and writes the
    result to the output CSV without the index column.
    """
    parser = argparse.ArgumentParser(description="Re-order hierarchical CSV for ADO Boards")
    for positional in ("input_csv", "output_csv"):
        parser.add_argument(positional)
    parser.add_argument("-v", "--verbose", action="store_true", help="log INFO messages")
    options = parser.parse_args()

    if options.verbose:
        threshold = logging.INFO
    else:
        threshold = logging.WARNING
    logging.basicConfig(level=threshold, format="%(levelname)s %(name)s: %(message)s")

    source_path = options.input_csv
    target_path = options.output_csv

    logging.info("Reading %s …", source_path)
    df_in = pd.read_csv(source_path)

    # Transform first, then verify before anything is written to disk.
    df_out, other_cols = restructure(df_in)
    sanity_check(df_in, df_out, other_cols)

    df_out.to_csv(target_path, index=False)
    logging.info("Success - wrote %d rows to %s", len(df_out), target_path)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|