Wednesday, September 3, 2025

Fund

 Umbrella Fund (Global Opportunities Fund Complex)

 ├── Sub-Fund A: Equity Fund (U.S. & Offshore Master-Feeder)

 │ ├── Feeder A1: U.S. Onshore Feeder (Delaware LP)

 │ ├── Feeder A2: Offshore Feeder (Cayman Company)

 │ └── Master Fund A: Cayman Master Fund

 │ └── Portfolio of Securities (Equities, Options, etc.)

 │

 ├── Sub-Fund B: Bond Fund (Standalone)

 │ ├── Share Class B1: USD Institutional

 │ ├── Share Class B2: EUR Institutional

 │ └── Share Class B3: USD Retail

 │

 └── Sub-Fund C: Multi-Asset Fund (Umbrella with Share Classes only)

       ├── Share Class C1: Accumulating, USD

       ├── Share Class C2: Distributing, EUR

       └── Share Class C3: Distributing, GBP


Friday, June 20, 2025

CICD

# Pipeline stages in execution order ('prepare' currently has no jobs; reserved for setup).
# Fix: 'stages:' previously had a stray leading space, which is invalid YAML next to
# the column-0 top-level keys (default/variables/jobs) below.
stages:
  - prepare
  - generate_json
  - test_json
  - data_dictionary
  - metadata
  - finalize

default:
  image: python:3.11
  before_script:
    - pip install -r requirements.txt
    - mkdir -p artifacts

variables:
  EXCEL_INPUT: "data/source_metadata.xlsx"
  SAMPLE_JSON: "data/sample_data.json"
  OUTPUT_SCHEMA: "artifacts/generated_schema.json"
  DATA_DICTIONARY: "artifacts/data_dictionary.xlsx"
  STTM_OUTPUT: "artifacts/sttm.xlsx"
  CODE_COLUMNS_OUTPUT: "artifacts/code_columns.xlsx"

# Step 1: Convert Excel to JSON Schema
generate_schema:
  stage: generate_json
  script:
    - python scripts/excel_to_json_schema.py $EXCEL_INPUT $OUTPUT_SCHEMA
  artifacts:
    paths:
      - $OUTPUT_SCHEMA

# Step 2: Validate sample data against generated JSON Schema
test_schema_with_data:
  stage: test_json
  script:
    - python scripts/validate_json_sample.py $OUTPUT_SCHEMA $SAMPLE_JSON
  dependencies:
    - generate_schema

# Step 3: Generate Data Dictionary from JSON Schema
generate_data_dictionary:
  stage: data_dictionary
  script:
    - python scripts/json_to_data_dictionary.py $OUTPUT_SCHEMA $DATA_DICTIONARY
  dependencies:
    - generate_schema
  artifacts:
    paths:
      - $DATA_DICTIONARY

# Step 4: Generate STTM from Data Dictionary
generate_sttm:
  stage: metadata
  script:
    - python scripts/data_dictionary_to_sttm.py $DATA_DICTIONARY $STTM_OUTPUT
  dependencies:
    - generate_data_dictionary
  artifacts:
    paths:
      - $STTM_OUTPUT

# Step 5: Identify code-value columns
identify_code_columns:
  stage: finalize
  script:
    - python scripts/identify_code_columns.py $DATA_DICTIONARY $CODE_COLUMNS_OUTPUT
  dependencies:
    - generate_data_dictionary
  artifacts:
    paths:
      - $CODE_COLUMNS_OUTPUT


Sunday, June 15, 2025

CompareJsonSchemas

 import json

from openpyxl import Workbook

from openpyxl.styles import Font



def load_json_schema(file_path):
    """Read a JSON schema document from *file_path* and return it as a dict."""
    with open(file_path, 'r') as handle:
        return json.load(handle)



def flatten_schema_properties(schema, path=''):
    """Flatten a schema's properties into a {dot.path: constraints} map.

    Each entry records only the constraints the comparison cares about
    (type/enum/format/maxLength/minimum/maximum); nested objects are
    walked recursively and contribute their own dotted paths.
    """
    tracked = ('type', 'enum', 'format', 'maxLength', 'minimum', 'maximum')
    flattened = {}
    for name, definition in schema.get('properties', {}).items():
        dotted = f"{path}.{name}" if path else name
        flattened[dotted] = {attr: definition.get(attr) for attr in tracked}
        if definition.get('type') == 'object':
            flattened.update(flatten_schema_properties(definition, dotted))
    return flattened



def compare_required_fields(schema1, schema2):
    """Diff the top-level ``required`` lists of two schemas.

    Returns (field, change_type, old, new) tuples for fields whose
    required status was added in or removed from *schema2*.
    """
    before = set(schema1.get('required', []))
    after = set(schema2.get('required', []))

    diffs = [(name, 'REQUIRED_ADDED', '', 'required') for name in after - before]
    diffs += [(name, 'REQUIRED_REMOVED', 'required', '') for name in before - after]
    return diffs



def compare_flattened_properties(flat1, flat2):
    """Compare two flattened property maps; report ADDED/REMOVED/MODIFIED rows."""
    diffs = []
    for key in set(flat1) | set(flat2):
        left, right = flat1.get(key), flat2.get(key)
        if key not in flat1:
            diffs.append((key, 'ADDED', '', json.dumps(right)))
        elif key not in flat2:
            diffs.append((key, 'REMOVED', json.dumps(left), ''))
        elif left != right:
            diffs.append((key, 'MODIFIED', json.dumps(left), json.dumps(right)))
    return diffs



def write_differences_to_excel(differences, output_path):
    """Dump diff tuples to an .xlsx sheet with a bold header row."""
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Schema Differences"

    sheet.append(["Path", "Change Type", "Schema 1", "Schema 2"])
    bold = Font(bold=True)
    for header_cell in sheet[1]:
        header_cell.font = bold

    for record in differences:
        sheet.append(record)

    workbook.save(output_path)
    print(f"✅ Differences written to {output_path}")



if __name__ == "__main__":

    schema1_path = "schema1.json"

    schema2_path = "schema2.json"

    output_excel = "schema_differences.xlsx"


    schema1 = load_json_schema(schema1_path)

    schema2 = load_json_schema(schema2_path)


    flat1 = flatten_schema_properties(schema1)

    flat2 = flatten_schema_properties(schema2)


    prop_diffs = compare_flattened_properties(flat1, flat2)

    req_diffs = compare_required_fields(schema1, schema2)


    all_diffs = prop_diffs + req_diffs

    write_differences_to_excel(all_diffs, output_excel)


Saturday, May 31, 2025

Field match report with JSON schema

 import json

import pandas as pd

from rapidfuzz import process, fuzz


def flatten_json_schema(schema, parent_key=''):
    """Collect dot-notation leaf field names from a (possibly nested) JSON schema."""
    leaves = []
    for name, definition in schema.get('properties', {}).items():
        dotted = f"{parent_key}.{name}" if parent_key else name
        nested = definition.get('type') == 'object' and 'properties' in definition
        if nested:
            leaves.extend(flatten_json_schema(definition, dotted))
        else:
            leaves.append(dotted)
    return leaves


def match_fields(source_fields, schema_fields, threshold=85):
    """Match each source field name against the schema field list.

    Exact matches score 100; otherwise the best fuzzy match (rapidfuzz
    ratio) is accepted when its score reaches *threshold*. Returns a
    DataFrame with columns Source Field / Matched Field / Match Score /
    Match Type.
    """
    match_results = []
    for field in source_fields:
        if field in schema_fields:
            match_results.append((field, field, 100, 'Exact'))
        elif not schema_fields:
            # Fix: process.extractOne returns None for an empty candidate
            # list, which previously crashed on tuple unpacking.
            match_results.append((field, '', 0, 'Unmatched'))
        else:
            best_match, score, _ = process.extractOne(field, schema_fields, scorer=fuzz.ratio)
            if score >= threshold:
                match_results.append((field, best_match, score, 'Fuzzy'))
            else:
                match_results.append((field, '', score, 'Unmatched'))
    return pd.DataFrame(match_results, columns=['Source Field', 'Matched Field', 'Match Score', 'Match Type'])


# --- Example usage ---

def validate_fields_against_schema(schema_path, excel_path, sheet_name=None, field_column='Field Name', threshold=85):
    """Match an Excel metadata column's field names against a JSON schema.

    Reads the schema from *schema_path*, the field names from
    *field_column* of *excel_path*, and returns the match_fields report
    DataFrame.

    Raises ValueError when *field_column* is absent from the sheet.
    """
    with open(schema_path, 'r') as f:
        schema = json.load(f)

    # Fix: pd.read_excel(sheet_name=None) returns a dict of DataFrames,
    # which broke the .columns check below. Treat None as "first sheet".
    df = pd.read_excel(excel_path, sheet_name=0 if sheet_name is None else sheet_name)
    if field_column not in df.columns:
        raise ValueError(f"'{field_column}' column not found in Excel.")

    source_fields = df[field_column].dropna().astype(str).tolist()
    schema_fields = flatten_json_schema(schema)

    return match_fields(source_fields, schema_fields, threshold=threshold)


# ---- To run the script standalone ----

if __name__ == "__main__":

    # Update paths here

    schema_file = 'Output_Schema.json'

    excel_file = 'Source_Metadata.xlsx'


    result = validate_fields_against_schema(schema_file, excel_file)

    print(result)

    result.to_excel('Field_Match_Report.xlsx', index=False)


JsonSchema to Excel with Fuzzy Source

 import json

import pandas as pd

from rapidfuzz import process, fuzz


def flatten_json_schema(schema, parent_key=''):
    """Flatten nested JSON schema properties into a {dot.path: definition} map."""
    flattened = {}
    for name, definition in schema.get('properties', {}).items():
        dotted = f"{parent_key}.{name}" if parent_key else name
        if definition.get('type') == 'object' and 'properties' in definition:
            flattened.update(flatten_json_schema(definition, dotted))
        else:
            flattened[dotted] = definition
    return flattened


def find_best_match(field_name, candidates, threshold=80):
    """Return (best_candidate, score) when the fuzzy score clears *threshold*, else (None, 0)."""
    if not candidates:
        return None, 0
    best, score, _ = process.extractOne(field_name, candidates, scorer=fuzz.ratio)
    return (best, score) if score >= threshold else (None, 0)


def json_schema_to_excel_with_fuzzy_source(schema_path, source_excel_path, output_excel_path):
    """Merge JSON-schema field metadata with source Excel metadata into one sheet.

    Each flattened schema field is matched to a source row by exact Field
    Name, then fuzzy Field Name, then fuzzy Title, then fuzzy Alias; the
    output row carries the source columns (blank when unmatched) plus the
    schema-side metadata, and is written to *output_excel_path*.
    """
    # Load JSON Schema
    with open(schema_path, 'r') as f:
        schema = json.load(f)

    # Flatten nested schema into {dot.path: definition}
    flat_props = flatten_json_schema(schema)

    # Load source Excel metadata; strip whitespace from headers and string cells
    source_df = pd.read_excel(source_excel_path)
    source_df.columns = source_df.columns.str.strip()
    source_df = source_df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

    # Candidate lists for matching (empty when the column is absent)
    source_fields = source_df['Field Name'].dropna().unique().tolist() if 'Field Name' in source_df.columns else []
    source_titles = source_df['Title'].dropna().unique().tolist() if 'Title' in source_df.columns else []
    source_aliases = source_df['Alias'].dropna().unique().tolist() if 'Alias' in source_df.columns else []

    rows = []
    required_fields = schema.get('required', [])

    for json_field, definition in flat_props.items():
        # Match priority: exact Field Name > fuzzy Field Name > fuzzy Title > fuzzy Alias
        matched_row = None
        if json_field in source_fields:
            matched_row = source_df[source_df['Field Name'] == json_field].iloc[0]
        else:
            best_field, score = find_best_match(json_field, source_fields)
            if best_field:
                matched_row = source_df[source_df['Field Name'] == best_field].iloc[0]
            else:
                best_title, score = find_best_match(json_field, source_titles)
                if best_title:
                    matched_row = source_df[source_df['Title'] == best_title].iloc[0]
                else:
                    best_alias, score = find_best_match(json_field, source_aliases)
                    if best_alias:
                        matched_row = source_df[source_df['Alias'] == best_alias].iloc[0]

        def _src(col):
            # Source metadata value, or '' when no match / column missing
            # (``col in matched_row`` tests the Series index, i.e. column names).
            return matched_row[col] if matched_row is not None and col in matched_row else ''

        rows.append({
            "Source File Name": _src('Source File Name'),
            "Source Field Name": _src('Field Name'),
            "Source Data Type": _src('Source Data Type'),
            "Source Scale": _src('Source Scale'),
            "Source PK": _src('Source PK'),
            "Source Nullability": _src('Source Nullability'),
            "Source Data Sample": _src('Source Data Sample'),
            # JSON Schema metadata
            "Field Name": json_field,
            "Title": definition.get("title", ""),
            "Description": definition.get("description", ""),
            "Type": definition.get("type", ""),
            "MaxLength": definition.get("maxLength", ""),
            # Fix: stringify enum entries so numeric enums don't break join()
            "Enum": ", ".join(map(str, definition.get("enum", []))) if definition.get("enum") else "",
            "Required": json_field in required_fields
        })

    # Export to Excel
    df_out = pd.DataFrame(rows)
    df_out.to_excel(output_excel_path, index=False)
    print(f"Metadata Excel generated with fuzzy matching at: {output_excel_path}")


# Example usage

# json_schema_to_excel_with_fuzzy_source('Output_Schema.json', 'source_metadata.xlsx', 'merged_metadata.xlsx')


Excel to JSON Schema with Primary Key Indicator

 import pandas as pd

import json

from collections import defaultdict


def excel_to_json_schema_with_primary_key(excel_path, output_schema_path="Output_Schema.json"):
    """Build a JSON Schema from an Excel metadata sheet and write it to disk.

    Expected columns: "Column Name" (dot notation creates nested objects)
    and "Type", plus optional Title / Description / MaxLength / Enum /
    "Primary Key" / "Required" (the last two checked for "yes").
    """
    df = pd.read_excel(excel_path)

    # Blank cells become empty strings so .strip()/truthiness checks are safe
    df = df.fillna("")

    # Normalize column headers
    def normalize(s):
        return s.strip() if isinstance(s, str) else s

    df.columns = [normalize(col) for col in df.columns]

    root = {
        "type": "object",
        "properties": {},
        "required": []
    }
    # Note: removed unused `group` and `nested_objects` locals from the
    # original; nesting is driven purely by dotted "Column Name" values.

    for _, row in df.iterrows():
        column_name = row.get("Column Name", "").strip()
        if not column_name:
            continue

        parts = column_name.split(".")
        current = root
        props = current["properties"]

        # Traverse/create nested groups; the last part is the leaf field
        for i, part in enumerate(parts):
            if i == len(parts) - 1:
                schema_prop = {
                    "type": row.get("Type", "").strip().lower()
                }

                if row.get("Title"):
                    schema_prop["title"] = row["Title"]

                if row.get("Description"):
                    schema_prop["description"] = row["Description"]

                if row.get("MaxLength") and schema_prop["type"] == "string":
                    # int(float(...)) tolerates Excel numerics like "10.0"
                    schema_prop["maxLength"] = int(float(row["MaxLength"]))

                if row.get("Enum"):
                    schema_prop["enum"] = [e.strip() for e in str(row["Enum"]).split(",")]

                if str(row.get("Primary Key", "")).strip().lower() == "yes":
                    schema_prop["primaryKey"] = True

                # Required applies to the immediately enclosing object
                if str(row.get("Required", "")).strip().lower() == "yes":
                    current.setdefault("required", []).append(part)

                props[part] = schema_prop
            else:
                # Intermediate nested object
                if part not in props:
                    props[part] = {
                        "type": "object",
                        "properties": {}
                    }
                current = props[part]
                props = current["properties"]

    with open(output_schema_path, "w") as f:
        json.dump(root, f, indent=2)

    print(f"✅ JSON Schema written to {output_schema_path}")


if __name__ == "__main__":

    excel_to_json_schema_with_primary_key("schema_metadata.xlsx")


JSON Schema to Excel Metadata with PK info as well

 import json

import pandas as pd


def flatten_schema(schema, group="", parent_key=""):

    """Recursively flatten the schema to extract column metadata."""

    rows = []

    properties = schema.get("properties", {})

    required = schema.get("required", [])


    for prop, details in properties.items():

        full_key = f"{parent_key}.{prop}" if parent_key else prop

        entry = {

            "Column Name": full_key,

            "Type": details.get("type", ""),

            "Title": details.get("title", ""),

            "Description": details.get("description", ""),

            "MaxLength": details.get("maxLength", ""),

            "Enum": ", ".join(details.get("enum", [])) if "enum" in details else "",

            "Group": group,

            "Primary Key": "Yes" if details.get("primaryKey", False) else "No"

        }


        if details.get("type") == "object":

            nested_group = full_key

            rows += flatten_schema(details, group=nested_group, parent_key=full_key)

        elif details.get("type") == "array" and "items" in details and details["items"].get("type") == "object":

            nested_group = full_key

            rows += flatten_schema(details["items"], group=nested_group, parent_key=full_key)

        else:

            rows.append(entry)


    return rows


def json_schema_to_excel(schema_path, output_excel="schema_metadata.xlsx"):

    with open(schema_path, "r") as f:

        schema = json.load(f)


    rows = flatten_schema(schema)

    df = pd.DataFrame(rows)

    df.to_excel(output_excel, index=False)

    print(f"✅ Metadata written to Excel: {output_excel}")


if __name__ == "__main__":

    json_schema_to_excel("Output_Schema.json")


Sample Primary Key for JSON to POJO

 {

  "properties": {

    "id": {

      "type": "string",

      "primaryKey": true,

      "maxLength": 36

    }

  },

  "required": ["id"]

}


JSON to Jackson POJO with Primary Key

import json

import os


# JSON Schema primitive types -> boxed Java types (boxed so unset fields can be null).
TYPE_MAPPING = {

    "string": "String",

    "integer": "Integer",

    "number": "Double",

    "boolean": "Boolean",

}


# Bean Validation (javax) annotation imports, added to the file only when used.
BEAN_VALIDATION_IMPORTS = {

    "NotNull": "javax.validation.constraints.NotNull",

    "Size": "javax.validation.constraints.Size",

}


# JPA annotation imports for primary-key fields, added only when used.
JPA_IMPORTS = {

    "Id": "javax.persistence.Id",

    "Column": "javax.persistence.Column"

}


def capitalize(s):
    """Upper-case the first character of *s*, leaving the rest untouched."""
    if not s:
        return s
    return s[0].upper() + s[1:]


def generate_class(name, schema, indent=0):
    """Emit a Java class (as text lines) for a JSON-schema object node.

    Every field gets Jackson @JsonProperty, @NotNull when required,
    @Size/@Column when a string maxLength is known, and @Id when the
    schema marks it primaryKey; nested objects/enums become nested types.
    Returns (imports_needed, lines).
    """
    indent_str = "    " * indent
    lines = []
    imports = set()

    lines.append(f"{indent_str}public class {name} " + "{")

    properties = schema.get("properties", {})
    required = schema.get("required", [])

    for prop, prop_schema in properties.items():
        java_type, nested_imports, nested_classes = get_java_type(prop, prop_schema, indent + 1)
        imports.update(nested_imports)

        # Fix: nested type definitions must be emitted BEFORE the field's
        # annotations; previously they were inserted between the annotations
        # and the field declaration, detaching the annotations in the
        # generated Java.
        lines.extend(nested_classes)

        # Jackson annotation
        annotations = [f'@com.fasterxml.jackson.annotation.JsonProperty("{prop}")']

        # Bean Validation
        if prop in required:
            annotations.append("@NotNull")
            imports.add(BEAN_VALIDATION_IMPORTS["NotNull"])

        max_length = prop_schema.get("maxLength")
        if max_length is not None and java_type == "String":
            annotations.append(f"@Size(max = {max_length})")
            imports.add(BEAN_VALIDATION_IMPORTS["Size"])

        # JPA primary key, with @Column(length=...) when a string length is known
        if prop_schema.get("primaryKey", False):
            annotations.append("@Id")
            imports.add(JPA_IMPORTS["Id"])
            col_params = []
            if max_length is not None and java_type == "String":
                col_params.append(f"length = {max_length}")
            if col_params:
                annotations.append("@Column(" + ", ".join(col_params) + ")")
                imports.add(JPA_IMPORTS["Column"])

        for ann in annotations:
            lines.append(f"{indent_str}    {ann}")
        lines.append(f"{indent_str}    private {java_type} {prop};\n")

    # Getters and setters
    for prop, prop_schema in properties.items():
        java_type, _, _ = get_java_type(prop, prop_schema, indent + 1)
        prop_camel = capitalize(prop)

        # Getter
        lines.append(f"{indent_str}    public {java_type} get{prop_camel}() " + "{")
        lines.append(f"{indent_str}        return {prop};")
        lines.append(f"{indent_str}    }}\n")

        # Setter
        lines.append(f"{indent_str}    public void set{prop_camel}({java_type} {prop}) " + "{")
        lines.append(f"{indent_str}        this.{prop} = {prop};")
        lines.append(f"{indent_str}    }}\n")

    lines.append(f"{indent_str}}}")

    return imports, lines


def get_java_type(prop_name, prop_schema, indent):
    """Resolve a schema property to (java_type, imports, nested_class_lines)."""
    imports = set()
    nested = []

    if "enum" in prop_schema:
        # Enums become a nested Java enum named after the property.
        enum_name = capitalize(prop_name)
        nested.extend(generate_enum(enum_name, prop_schema["enum"], indent))
        return enum_name, imports, nested

    schema_type = prop_schema.get("type")

    if schema_type == "object":
        # Objects become a nested class named after the property.
        class_name = capitalize(prop_name)
        inner_imports, inner_lines = generate_class(class_name, prop_schema, indent)
        imports |= inner_imports
        nested.extend(inner_lines)
        return class_name, imports, nested

    if schema_type == "array":
        # Arrays become List<ItemType>, recursing on the item schema.
        imports.add("java.util.List")
        item_type, inner_imports, inner_nested = get_java_type(prop_name + "Item", prop_schema.get("items", {}), indent)
        imports |= inner_imports
        nested.extend(inner_nested)
        return f"List<{item_type}>", imports, nested

    return TYPE_MAPPING.get(schema_type, "Object"), imports, nested


def generate_enum(name, values, indent=1):
    """Emit a Java enum (as text lines) from a JSON-schema enum value list.

    Values are upper-cased with spaces replaced by underscores; non-string
    values are stringified first (fix: ints previously crashed .upper()).
    """
    indent_str = "    " * indent
    lines = [f"{indent_str}public enum {name} " + "{"]
    constants = [str(v).upper().replace(" ", "_") for v in values]
    lines.append(f"{indent_str}    " + ", ".join(constants) + ";")
    lines.append(f"{indent_str}}}\n")
    return lines


def json_schema_to_pojo(schema_path, output_dir="generated", package="com.example", class_name="RootPojo"):

    with open(schema_path) as f:

        schema = json.load(f)


    os.makedirs(output_dir, exist_ok=True)


    imports, class_lines = generate_class(class_name, schema)


    import_lines = []

    if imports:

        import_lines.append("import " + ";\nimport ".join(sorted(imports)) + ";")


    import_lines.append("import com.fasterxml.jackson.annotation.JsonProperty;")


    java_code = []

    java_code.append(f"package {package};\n")

    java_code.extend(import_lines)

    java_code.append("\n")

    java_code.extend(class_lines)


    java_file_path = os.path.join(output_dir, f"{class_name}.java")

    with open(java_file_path, "w") as f:

        f.write("\n".join(java_code))


    print(f"✅ Java POJO with validation and primary key generated at: {java_file_path}")


if __name__ == "__main__":

    json_schema_to_pojo(

        schema_path="Output_Schema.json",

        output_dir="generated",

        package="com.example.generated",

        class_name="MyPojo"

    )


JSON to POJO with Jackson Notations

 import json

import os


# JSON Schema primitive types -> boxed Java types (boxed so unset fields can be null).
TYPE_MAPPING = {

    "string": "String",

    "integer": "Integer",

    "number": "Double",

    "boolean": "Boolean",

}


def capitalize(s):
    """Return *s* with its first character upper-cased (empty input unchanged)."""
    return f"{s[0].upper()}{s[1:]}" if s else s


def generate_class(name, schema, indent=0):
    """Emit a Java class (as text lines) for a JSON-schema object node.

    Each property becomes a private field annotated with Jackson's
    @JsonProperty plus a getter/setter pair; nested objects and enums
    become nested types. Returns (imports_needed, lines).
    """
    indent_str = "    " * indent
    lines = []
    imports = set()

    lines.append(f"{indent_str}public class {name} " + "{")

    properties = schema.get("properties", {})
    # Note: removed the unused `required` local — this variant emits no
    # validation annotations.

    # Fields
    for prop, prop_schema in properties.items():
        java_type, nested_imports, nested_classes = get_java_type(prop, prop_schema, indent + 1)
        imports.update(nested_imports)

        # Fix: nested type definitions must be emitted BEFORE the field's
        # annotation; previously they landed between @JsonProperty and the
        # field declaration, detaching the annotation in the generated Java.
        lines.extend(nested_classes)
        lines.append(f'{indent_str}    @com.fasterxml.jackson.annotation.JsonProperty("{prop}")')
        lines.append(f"{indent_str}    private {java_type} {prop};\n")

    # Getters and setters
    for prop, prop_schema in properties.items():
        java_type, _, _ = get_java_type(prop, prop_schema, indent + 1)
        prop_camel = capitalize(prop)

        # Getter
        lines.append(f"{indent_str}    public {java_type} get{prop_camel}() " + "{")
        lines.append(f"{indent_str}        return {prop};")
        lines.append(f"{indent_str}    }}\n")

        # Setter
        lines.append(f"{indent_str}    public void set{prop_camel}({java_type} {prop}) " + "{")
        lines.append(f"{indent_str}        this.{prop} = {prop};")
        lines.append(f"{indent_str}    }}\n")

    lines.append(f"{indent_str}}}")

    return imports, lines


def get_java_type(prop_name, prop_schema, indent):
    """Resolve a schema property to (java_type, imports_set, nested_class_lines)."""
    imports = set()
    nested = []

    # Enums become a nested Java enum named after the property
    if "enum" in prop_schema:
        enum_name = capitalize(prop_name)
        nested.extend(generate_enum(enum_name, prop_schema["enum"], indent))
        return enum_name, imports, nested

    schema_type = prop_schema.get("type")

    if schema_type == "object":
        # Objects become a nested class named after the property
        class_name = capitalize(prop_name)
        inner_imports, inner_lines = generate_class(class_name, prop_schema, indent)
        imports |= inner_imports
        nested.extend(inner_lines)
        return class_name, imports, nested

    if schema_type == "array":
        # Arrays become List<ItemType>, recursing on the item schema
        imports.add("java.util.List")
        item_type, inner_imports, inner_nested = get_java_type(prop_name + "Item", prop_schema.get("items", {}), indent)
        imports |= inner_imports
        nested.extend(inner_nested)
        return f"List<{item_type}>", imports, nested

    # Simple/unknown types fall back to the primitive mapping
    return TYPE_MAPPING.get(schema_type, "Object"), imports, nested


def generate_enum(name, values, indent=1):
    """Emit a Java enum (as text lines) from a JSON-schema enum value list.

    Values are upper-cased with spaces replaced by underscores; non-string
    values are stringified first (fix: ints previously crashed .upper()).
    """
    indent_str = "    " * indent
    lines = [f"{indent_str}public enum {name} " + "{"]
    constants = [str(v).upper().replace(" ", "_") for v in values]
    lines.append(f"{indent_str}    " + ", ".join(constants) + ";")
    lines.append(f"{indent_str}}}\n")
    return lines


def json_schema_to_pojo(schema_path, output_dir="generated", package="com.example", class_name="RootPojo"):

    with open(schema_path) as f:

        schema = json.load(f)


    os.makedirs(output_dir, exist_ok=True)


    imports, class_lines = generate_class(class_name, schema)


    # Prepare import statements

    import_lines = []

    if imports:

        import_lines.append("import " + ";\nimport ".join(sorted(imports)) + ";")


    # Add Jackson imports

    import_lines.append("import com.fasterxml.jackson.annotation.JsonProperty;")


    # Compose full Java file content

    java_code = []

    java_code.append(f"package {package};\n")

    java_code.extend(import_lines)

    java_code.append("\n")

    java_code.extend(class_lines)


    java_file_path = os.path.join(output_dir, f"{class_name}.java")

    with open(java_file_path, "w") as f:

        f.write("\n".join(java_code))


    print(f"✅ Java POJO generated at: {java_file_path}")


if __name__ == "__main__":

    json_schema_to_pojo(

        schema_path="Output_Schema.json",

        output_dir="generated",

        package="com.example.generated",

        class_name="MyPojo"

    )


JSON Schema to advance POJO

 import json

import os


# JSON Schema primitive types -> boxed Java types (boxed so unset fields can be null).
TYPE_MAPPING = {

    "string": "String",

    "integer": "Integer",

    "number": "Double",

    "boolean": "Boolean",

}


def capitalize(s):
    """First letter upper-cased; empty strings pass through unchanged."""
    if s:
        return s[:1].upper() + s[1:]
    return s


def generate_class(name, schema, indent=0):
    """Emit a Java class (as text lines) for a JSON-schema object node.

    Each property becomes a private field annotated with Jackson's
    @JsonProperty plus a getter/setter pair; nested objects and enums
    become nested types. Returns (imports_needed, lines).
    """
    indent_str = "    " * indent
    lines = []
    imports = set()

    lines.append(f"{indent_str}public class {name} " + "{")

    properties = schema.get("properties", {})
    # Note: removed the unused `required` local — this variant emits no
    # validation annotations.

    # Fields
    for prop, prop_schema in properties.items():
        java_type, nested_imports, nested_classes = get_java_type(prop, prop_schema, indent + 1)
        imports.update(nested_imports)

        # Fix: nested type definitions must be emitted BEFORE the field's
        # annotation; previously they landed between @JsonProperty and the
        # field declaration, detaching the annotation in the generated Java.
        lines.extend(nested_classes)
        lines.append(f'{indent_str}    @com.fasterxml.jackson.annotation.JsonProperty("{prop}")')
        lines.append(f"{indent_str}    private {java_type} {prop};\n")

    # Getters and setters
    for prop, prop_schema in properties.items():
        java_type, _, _ = get_java_type(prop, prop_schema, indent + 1)
        prop_camel = capitalize(prop)

        # Getter
        lines.append(f"{indent_str}    public {java_type} get{prop_camel}() " + "{")
        lines.append(f"{indent_str}        return {prop};")
        lines.append(f"{indent_str}    }}\n")

        # Setter
        lines.append(f"{indent_str}    public void set{prop_camel}({java_type} {prop}) " + "{")
        lines.append(f"{indent_str}        this.{prop} = {prop};")
        lines.append(f"{indent_str}    }}\n")

    lines.append(f"{indent_str}}}")

    return imports, lines


def get_java_type(prop_name, prop_schema, indent):
    """Resolve a schema property to (java_type, imports_set, nested_class_lines)."""
    imports = set()
    nested = []

    # Enums become a nested Java enum named after the property
    if "enum" in prop_schema:
        enum_name = capitalize(prop_name)
        nested.extend(generate_enum(enum_name, prop_schema["enum"], indent))
        return enum_name, imports, nested

    schema_type = prop_schema.get("type")

    if schema_type == "object":
        # Objects become a nested class named after the property
        class_name = capitalize(prop_name)
        inner_imports, inner_lines = generate_class(class_name, prop_schema, indent)
        imports |= inner_imports
        nested.extend(inner_lines)
        return class_name, imports, nested

    if schema_type == "array":
        # Arrays become List<ItemType>, recursing on the item schema
        imports.add("java.util.List")
        item_type, inner_imports, inner_nested = get_java_type(prop_name + "Item", prop_schema.get("items", {}), indent)
        imports |= inner_imports
        nested.extend(inner_nested)
        return f"List<{item_type}>", imports, nested

    # Simple/unknown types fall back to the primitive mapping
    return TYPE_MAPPING.get(schema_type, "Object"), imports, nested


def generate_enum(name, values, indent=1):
    """Emit a Java enum (as text lines) from a JSON-schema enum value list.

    Values are upper-cased with spaces replaced by underscores; non-string
    values are stringified first (fix: ints previously crashed .upper()).
    """
    indent_str = "    " * indent
    lines = [f"{indent_str}public enum {name} " + "{"]
    constants = [str(v).upper().replace(" ", "_") for v in values]
    lines.append(f"{indent_str}    " + ", ".join(constants) + ";")
    lines.append(f"{indent_str}}}\n")
    return lines


def json_schema_to_pojo(schema_path, output_dir="generated", package="com.example", class_name="RootPojo"):

    with open(schema_path) as f:

        schema = json.load(f)


    os.makedirs(output_dir, exist_ok=True)


    imports, class_lines = generate_class(class_name, schema)


    # Prepare import statements

    import_lines = []

    if imports:

        import_lines.append("import " + ";\nimport ".join(sorted(imports)) + ";")


    # Add Jackson imports

    import_lines.append("import com.fasterxml.jackson.annotation.JsonProperty;")


    # Compose full Java file content

    java_code = []

    java_code.append(f"package {package};\n")

    java_code.extend(import_lines)

    java_code.append("\n")

    java_code.extend(class_lines)


    java_file_path = os.path.join(output_dir, f"{class_name}.java")

    with open(java_file_path, "w") as f:

        f.write("\n".join(java_code))


    print(f"✅ Java POJO generated at: {java_file_path}")


if __name__ == "__main__":

    json_schema_to_pojo(

        schema_path="Output_Schema.json",

        output_dir="generated",

        package="com.example.generated",

        class_name="MyPojo"

    )


JSON Schema to Basic POJO

 import json

import os


# Map JSON Schema types to Java types

# Map JSON Schema types to Java types (primitives here — fields cannot be null).
TYPE_MAPPING = {

    "string": "String",

    "integer": "int",

    "number": "double",

    "boolean": "boolean",

}


def generate_class(name, schema, indent=0):
    """Render a plain Java class (fields + getters/setters) for *schema*.

    Nested "object" properties become inner classes named after the
    capitalized property, emitted recursively just before their field.
    Properties with unknown or missing "type" fall back to ``Object``.

    NOTE: ``schema.get("required")`` was read but never used by the
    original implementation; the generated Java does not reflect
    required-ness, so that dead local has been removed.

    :param name:   Java class name to emit
    :param schema: JSON Schema object node (dict with "properties")
    :param indent: nesting level; each level is four spaces
    :return:       list of Java source lines
    """
    indent_str = "    " * indent
    class_lines = [f"{indent_str}public class {name} " + "{"]

    properties = schema.get("properties", {})

    def _java_type(prop, prop_schema):
        # Single source of truth for type resolution (the original
        # duplicated this logic in the field and accessor loops).
        # NOTE: capitalize() lowercases the tail, so "userName" -> "Username".
        t = prop_schema.get("type")
        if t is None:
            return "Object"
        if t == "object":
            return prop.capitalize()
        return TYPE_MAPPING.get(t, "Object")

    # Fields — a nested object emits its inner class right before the field.
    for prop, prop_schema in properties.items():
        if prop_schema.get("type") == "object":
            class_lines.extend(generate_class(prop.capitalize(), prop_schema, indent + 1))
        class_lines.append(f"{indent_str}    private {_java_type(prop, prop_schema)} {prop};")

    class_lines.append("")

    # Getters and setters.
    for prop, prop_schema in properties.items():
        java_type = _java_type(prop, prop_schema)
        prop_camel = prop[0].upper() + prop[1:]

        # Getter
        class_lines.append(f"{indent_str}    public {java_type} get{prop_camel}() " + "{")
        class_lines.append(f"{indent_str}        return {prop};")
        class_lines.append(f"{indent_str}    }}")

        # Setter
        class_lines.append(f"{indent_str}    public void set{prop_camel}({java_type} {prop}) " + "{")
        class_lines.append(f"{indent_str}        this.{prop} = {prop};")
        class_lines.append(f"{indent_str}    }}")
        class_lines.append("")

    class_lines.append(f"{indent_str}}}")
    return class_lines


def json_schema_to_pojo(schema_path, output_dir="output", class_name="Root"):
    """Read a JSON Schema file and write a basic Java POJO for it.

    The class rendering is done by ``generate_class``; this function only
    handles file I/O and output-directory creation.
    """
    with open(schema_path) as fh:
        root_schema = json.load(fh)

    os.makedirs(output_dir, exist_ok=True)

    source_lines = generate_class(class_name, root_schema)
    java_file = os.path.join(output_dir, f"{class_name}.java")

    with open(java_file, "w") as fh:
        fh.write("\n".join(source_lines))

    print(f"Java POJO class generated at: {java_file}")


if __name__ == "__main__":
    # Demo run against a local schema file.
    schema_file = "Output_Schema.json"
    json_schema_to_pojo(schema_file, output_dir="generated", class_name="MyPojo")


Excel Data to JSON Schema Validation with Report

 import pandas as pd

import json

import csv

from jsonschema import validate, ValidationError


def insert_nested_key(obj, key_path, value):
    """Set *value* in *obj* at the dotted *key_path*, creating any missing
    intermediate dictionaries along the way."""
    *parents, leaf = key_path.split(".")
    node = obj
    for parent in parents:
        node = node.setdefault(parent, {})
    node[leaf] = value


def validate_json(obj, schema):
    """Validate *obj* against *schema*.

    Returns ``(True, None)`` on success, ``(False, message)`` on the first
    validation failure.
    """
    try:
        validate(instance=obj, schema=schema)
    except ValidationError as exc:
        return False, exc.message
    return True, None


def excel_to_validated_json(
    input_excel_path,
    schema_path,
    output_json_path="validated_output.json",
    error_report_path="validation_errors.csv"
):
    """Convert Excel rows to nested JSON objects, validate each against a
    JSON Schema, write the valid objects to a JSON file and the failures
    to a CSV error report (one row per invalid Excel row).
    """
    df = pd.read_excel(input_excel_path)

    # Load JSON Schema
    with open(schema_path, "r") as fh:
        schema = json.load(fh)

    valid_records, error_records = [], []

    for index, row in df.iterrows():
        record = {}
        for column, value in row.items():
            insert_nested_key(record, column, value)

        ok, message = validate_json(record, schema)
        if ok:
            valid_records.append(record)
        else:
            # +2: DataFrame index is 0-based and Excel row 1 is the header.
            error_records.append({"Excel Row": index + 2, "Error Message": message})

    # Persist the valid objects.
    with open(output_json_path, "w") as fh:
        json.dump(valid_records, fh, indent=2)

    print(f"✅ Saved {len(valid_records)} valid JSON object(s) to: {output_json_path}")

    if not error_records:
        print("✅ No validation errors found!")
        return

    # Persist the validation error report.
    with open(error_report_path, "w", newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=["Excel Row", "Error Message"])
        writer.writeheader()
        writer.writerows(error_records)

    print(f"❌ Found {len(error_records)} invalid row(s). Errors saved to: {error_report_path}")


if __name__ == "__main__":
    # Demo run against local files.
    workbook = "your_excel_file.xlsx"
    schema_file = "Output_Schema.json"
    excel_to_validated_json(input_excel_path=workbook, schema_path=schema_file)


Excel Data to JSON with Schema Validation

 import pandas as pd

import json

from jsonschema import validate, ValidationError


def insert_nested_key(obj, key_path, value):
    """Assign *value* into *obj* under a dot-separated *key_path*,
    creating intermediate dictionaries as needed."""
    parts = key_path.split(".")
    cursor = obj
    while len(parts) > 1:
        cursor = cursor.setdefault(parts.pop(0), {})
    cursor[parts[0]] = value


def validate_json(obj, schema):
    """Check *obj* against the JSON Schema.

    Returns ``(True, None)`` when valid, otherwise ``(False, <message>)``
    with the first validation error's message.
    """
    try:
        validate(instance=obj, schema=schema)
    except ValidationError as err:
        return False, err.message
    else:
        return True, None


def excel_to_nested_json_with_validation(
    input_excel_path, schema_path, output_json_path="validated_output.json"
):
    """Build nested JSON objects from Excel rows, keep only those that
    pass schema validation, and print a summary of the failures."""
    df = pd.read_excel(input_excel_path)

    # Load JSON Schema
    with open(schema_path, "r") as fh:
        schema = json.load(fh)

    records, errors = [], []

    for position, (_, row) in enumerate(df.iterrows()):
        record = {}
        for column, value in row.items():
            insert_nested_key(record, column, value)

        ok, message = validate_json(record, schema)
        if ok:
            records.append(record)
        else:
            # +2 converts 0-based position to a 1-based Excel row
            # (row 1 is the header).
            errors.append((position + 2, message))

    # Write valid records to file
    with open(output_json_path, "w") as fh:
        json.dump(records, fh, indent=2)

    print(f"✅ {len(records)} valid records saved to {output_json_path}")

    if errors:
        print(f"❌ {len(errors)} invalid record(s) found:")
        for row_num, msg in errors:
            print(f"  - Row {row_num}: {msg}")


if __name__ == "__main__":
    # Demo invocation with local file names.
    source_xlsx, schema_json = "your_excel_file.xlsx", "Output_Schema.json"
    excel_to_nested_json_with_validation(source_xlsx, schema_json)


Excel data to nested JSON objects

 import pandas as pd

import json


def insert_nested_key(obj, key_path, value):
    """Store *value* in *obj* under the dot-delimited *key_path*,
    creating intermediate dictionaries on demand."""
    segments = key_path.split(".")
    target = obj
    for segment in segments[:-1]:
        target = target.setdefault(segment, {})
    target[segments[-1]] = value


def excel_to_nested_json(input_excel_path, output_json_path="output_data.json"):
    """Read an Excel sheet and write its rows as a JSON array of nested
    objects (dotted column names become nested keys)."""
    df = pd.read_excel(input_excel_path)

    records = []
    for _, row in df.iterrows():
        record = {}
        for column, value in row.items():
            insert_nested_key(record, column, value)
        records.append(record)

    with open(output_json_path, "w") as fh:
        json.dump(records, fh, indent=2)

    print(f"✅ Nested JSON with {len(records)} record(s) saved to: {output_json_path}")


if __name__ == "__main__":
    # Convert the local workbook using the default output path.
    source = "your_excel_file.xlsx"
    excel_to_nested_json(source)


Validate All JSON Files in a Folder

 import os

import json

import pandas as pd

from jsonschema import Draft7Validator


def load_json(filepath):
    """Parse the JSON file at *filepath* and return the resulting object."""
    with open(filepath, "r") as handle:
        return json.load(handle)


def validate_file(json_data, schema):
    """Run Draft-7 validation over *json_data* and return a list of
    ``(path, message)`` tuples, one per error ("[root]" when the error
    has no path)."""
    validator = Draft7Validator(schema)
    failures = []
    for error in validator.iter_errors(json_data):
        location = ".".join(str(part) for part in error.path) or "[root]"
        failures.append((location, error.message))
    return failures


def validate_all_jsons(folder_path, schema_path, report_path="validation_report.xlsx"):
    """Validate every ``.json`` file in *folder_path* against the schema
    and write an Excel report: one row per error, per valid file, or per
    unreadable file."""
    schema = load_json(schema_path)

    rows = []
    for file in os.listdir(folder_path):
        if not file.endswith(".json"):
            continue
        file_path = os.path.join(folder_path, file)
        try:
            errors = validate_file(load_json(file_path), schema)
        except Exception as exc:
            # Unreadable/unparsable file: record it rather than abort the run.
            rows.append({"File": file, "Status": "Error", "Path": "", "Message": str(exc)})
            continue

        if errors:
            for path, msg in errors:
                rows.append({"File": file, "Status": "Invalid", "Path": path, "Message": msg})
        else:
            rows.append({"File": file, "Status": "Valid", "Path": "", "Message": ""})

    pd.DataFrame(rows).to_excel(report_path, index=False)
    print(f"✅ Validation report saved to: {report_path}")


if __name__ == "__main__":
    # Update these paths for your environment.
    json_folder = "./data_samples"
    schema_file = "output_schema.json"
    output_report = "validation_report.xlsx"
    validate_all_jsons(json_folder, schema_file, output_report)


JsonSchemaValidator

 import json

from jsonschema import validate, Draft7Validator, ValidationError


def load_json_file(filepath):
    """Return the parsed contents of the JSON file at *filepath*."""
    with open(filepath, "r") as handle:
        return json.load(handle)


def validate_json(json_data, schema):
    """Collect every Draft-7 validation error for *json_data*, ordered by
    error path, and return them as a list."""
    all_errors = Draft7Validator(schema).iter_errors(json_data)
    return sorted(all_errors, key=lambda err: err.path)


def main(schema_file, data_file):
    """Load schema and data files, validate, and print a readable summary."""
    schema = load_json_file(schema_file)
    json_data = load_json_file(data_file)

    errors = validate_json(json_data, schema)
    if not errors:
        print("✅ JSON is valid against the schema.")
        return

    print(f"❌ Found {len(errors)} validation error(s):")
    for error in errors:
        location = ".".join([str(part) for part in error.path])
        print(f"• {location or '[root]'}: {error.message}")


if __name__ == "__main__":
    # Replace with your filenames or make it command-line-driven.
    main("output_schema.json", "sample_data.json")


JSON Schema to Excel (data dictionary)

 import json

import pandas as pd


def flatten_schema(schema, group="root", required_fields=None):
    """Flatten a JSON Schema's properties into data-dictionary rows.

    Nested object properties recurse with a dotted Group path; every leaf
    property becomes one row carrying its type/constraint metadata and a
    Required flag derived from the parent's "required" list.
    """
    required_fields = required_fields or []
    rows = []

    for prop_name, prop_schema in schema.get("properties", {}).items():
        is_nested_object = (
            prop_schema.get("type") == "object" and "properties" in prop_schema
        )
        if is_nested_object:
            child_group = prop_name if group == "root" else f"{group}.{prop_name}"
            rows += flatten_schema(prop_schema, child_group, prop_schema.get("required", []))
            continue

        rows.append({
            "Column Name": prop_name,
            "Data Type": prop_schema.get("type", ""),
            "Title": prop_schema.get("title", ""),
            "Description": prop_schema.get("description", ""),
            "Required": "Yes" if prop_name in required_fields else "No",
            "Group": group,
            "maxLength": prop_schema.get("maxLength", ""),
            "Enum": ",".join(map(str, prop_schema["enum"])) if "enum" in prop_schema else "",
            "Pattern": prop_schema.get("pattern", ""),
            "Minimum": prop_schema.get("minimum", ""),
            "Maximum": prop_schema.get("maximum", ""),
            "Default": prop_schema.get("default", "")
        })

    return rows


def json_schema_to_excel(json_file, excel_file="metadata_output.xlsx"):
    """Flatten a JSON Schema file into a metadata Excel sheet."""
    with open(json_file, "r") as fh:
        schema = json.load(fh)

    flat_rows = flatten_schema(schema, required_fields=schema.get("required", []))

    column_order = [
        "Column Name", "Data Type", "Title", "Description", "Required", "Group",
        "maxLength", "Enum", "Pattern", "Minimum", "Maximum", "Default"
    ]
    pd.DataFrame(flat_rows, columns=column_order).to_excel(excel_file, index=False)
    print(f"✅ Metadata Excel file saved as {excel_file}")


if __name__ == "__main__":
    # Generate the data dictionary with the default output name.
    schema_source = "output_schema.json"
    json_schema_to_excel(schema_source)


Excel to JSON Schema

 import pandas as pd

import json

from collections import defaultdict


# Excel → JSON Schema type mapping
# NOTE(review): "date"/"datetime" map to plain "string" with no "format"
# keyword, so date-ness is not validated — consider emitting
# "format": "date" / "date-time" downstream. Types not listed here fall
# back to "string" in build_field_schema.
TYPE_MAPPING = {
    "int": "integer",
    "integer": "integer",
    "float": "number",
    "double": "number",
    "decimal": "number",
    "varchar": "string",
    "char": "string",
    "string": "string",
    "bool": "boolean",
    "boolean": "boolean",
    "date": "string",
    "datetime": "string",
}


def build_field_schema(row):
    """Build the JSON Schema fragment for one metadata row.

    Maps the Excel "Data Type" through TYPE_MAPPING (defaulting to
    "string") and attaches title/description plus type-specific
    constraints (maxLength/pattern for strings, minimum/maximum for
    numerics, enum, default) when the corresponding cell is present.
    """
    def _present(value):
        # BUG FIX: the caller fills NaN cells with "" (fillna("")), and
        # pd.notna("") is True — so blank cells used to slip through and
        # crash on e.g. int(""). Treat NaN/None and blank/whitespace-only
        # strings as absent.
        return pd.notna(value) and str(value).strip() != ""

    dtype_raw = str(row.get("Data Type", "")).lower()
    dtype = TYPE_MAPPING.get(dtype_raw, "string")

    field_schema = {"type": dtype}

    # Title and description
    if _present(row.get("Title")):
        field_schema["title"] = row["Title"]
    if _present(row.get("Description")):
        field_schema["description"] = row["Description"]

    # String-specific constraints
    if dtype == "string":
        if _present(row.get("maxLength")):
            field_schema["maxLength"] = int(row["maxLength"])
        if _present(row.get("Pattern")):
            field_schema["pattern"] = str(row["Pattern"])

    # Enum (comma-separated list in the cell)
    if _present(row.get("Enum")):
        enum_values = [v.strip() for v in str(row["Enum"]).split(",") if v.strip()]
        if enum_values:
            field_schema["enum"] = enum_values

    # Numeric range constraints
    if dtype in ("integer", "number"):
        if _present(row.get("Minimum")):
            field_schema["minimum"] = float(row["Minimum"])
        if _present(row.get("Maximum")):
            field_schema["maximum"] = float(row["Maximum"])

    # Default value, coerced to the declared type
    if _present(row.get("Default")):
        default_val = row["Default"]
        if dtype in ("integer", "number"):
            default_val = float(default_val)
            if dtype == "integer":
                default_val = int(default_val)
        elif dtype == "boolean":
            default_val = str(default_val).lower() in ["true", "yes", "1"]
        field_schema["default"] = default_val

    return field_schema


def insert_nested(schema, group_path, prop_name, field_schema, is_required):
    """Walk/create the chain of object nodes named by the dotted
    *group_path* inside *schema*, then attach *field_schema* (and its
    required flag, when set) at the innermost level."""
    node = schema
    for segment in group_path.split("."):
        props = node.setdefault("properties", {})
        node = props.setdefault(segment, {"type": "object", "properties": {}})

    node["properties"][prop_name] = field_schema
    if is_required:
        node.setdefault("required", []).append(prop_name)


def excel_to_json_schema(excel_file, sheet_name=0):
    """Build a JSON Schema dict from a metadata Excel sheet.

    Each row describes one column; the optional "Group" column gives a
    dotted path to a nested object ("root" or blank means top level).
    Returns the schema dict; "required" is dropped when empty.
    """
    df = pd.read_excel(excel_file, sheet_name=sheet_name).fillna("")

    if 'Group' not in df.columns:
        df['Group'] = 'root'

    schema = {
        "type": "object",
        "properties": {},
        "required": []
    }

    for _, row in df.iterrows():
        column = row["Column Name"]
        # Blank Group cells come back as "" after fillna("") — treat as root.
        group = str(row.get("Group", "root")).strip() or "root"
        is_required = str(row.get("Required", "")).strip().lower() in ['yes', 'true', '1']
        field_schema = build_field_schema(row)

        if group == "root":
            # BUG FIX: root-level columns used to be routed through
            # insert_nested(), which nested them under a spurious "root"
            # object property while the top-level "required" list referenced
            # columns that did not exist at the top level. Insert them
            # directly into the schema instead.
            schema["properties"][column] = field_schema
            if is_required:
                schema["required"].append(column)
        else:
            insert_nested(schema, group, column, field_schema, is_required)

    if not schema["required"]:
        schema.pop("required")

    return schema


if __name__ == "__main__":
    # Convert the metadata workbook into a JSON Schema file.
    input_file = "table_metadata.xlsx"
    output_file = "output_schema.json"

    generated_schema = excel_to_json_schema(input_file)

    with open(output_file, "w") as fh:
        json.dump(generated_schema, fh, indent=2)

    print(f"✅ JSON schema saved to {output_file}")