
PySpark: Load and Transform Guidewire Tables with Auto-Detected Data Types





Continued from the blog post:
Guidewire Self Managed version H2 Tables ingest into Databricks Delta Tables


The published PySpark notebook loads the Guidewire CSV files, creates Delta tables with auto-detected data types, and, as a final step, records the whole load-and-transformation process in a separate Delta file-processing log table.

Here is the code, available on Databricks.

Code logic below 

1. Create New Guidewire Claim Tables from CSV Files with Auto-Detected Data Types (V1.0 Template)


from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType, TimestampType, DateType
from pyspark.sql.utils import AnalysisException

# Initialize Spark session (already initialized in Databricks)
spark = SparkSession.builder.getOrCreate()

# Define the new database name
new_database = "gw_claimcenter_raw_db_2"

# Step 1: Create a new database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {new_database}")

# Define the base path for FileStore
base_path = "dbfs:/FileStore/"

# List all CSV files that start with 'clm_' using dbutils
csv_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.startswith("clm_") and f.name.endswith(".csv")]

# Initialize a list to store detailed log information
log_data = []

# Loop through each CSV file and infer schema
for file_path in csv_files:
    # Extract the file name from the file path
    file_name = file_path.split("/")[-1]
    table_name = file_name.replace(".csv", "")
   
    # Read the CSV file with schema inference
    try:
        df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)
        read_status = "Success"
    except Exception as e:
        read_status = f"Failed: {e}"
        log_data.append((file_name, table_name, read_status, None, None))
        print(f"File '{file_name}' read failed: {e}")
        continue  # Skip to the next file if reading fails

    # Initialize column definitions for table creation
    column_defs = []
    column_info = []  # For detailed logging

    # Loop through inferred schema fields and build column definitions
    for field in df.schema.fields:
        field_name = field.name
        field_type = field.dataType

        # Map PySpark data types to SQL data types
        # (CSV schema inference produces IntegerType/LongType for whole numbers,
        # DoubleType for decimals, and DateType/TimestampType for dates, so those
        # are mapped here as well)
        if isinstance(field_type, IntegerType):
            sql_type = "INT"
        elif isinstance(field_type, LongType):
            sql_type = "BIGINT"
        elif isinstance(field_type, FloatType):
            sql_type = "FLOAT"
        elif isinstance(field_type, DoubleType):
            sql_type = "DOUBLE"
        elif isinstance(field_type, BooleanType):
            sql_type = "BOOLEAN"
        elif isinstance(field_type, TimestampType):
            sql_type = "TIMESTAMP"
        elif isinstance(field_type, DateType):
            sql_type = "DATE"
        else:
            sql_type = "STRING"
       
        # Ensure column names are properly quoted
        column_defs.append(f"`{field_name}` {sql_type}")
        column_info.append((field_name, sql_type, "PRIMARY KEY" if field_name == "ID" else "NULL"))

    # Print inferred column information
    print(f"\nFile '{file_name}' column info: {column_info}\n")

    # Mark the ID column as primary key in the column definitions if it exists
    # (note: this only affects the logged definitions; the Delta table below is
    # created from the DataFrame schema, so no PRIMARY KEY constraint is enforced)
    if 'ID' in df.columns:
        column_defs = [col_def.replace("`ID` INT", "`ID` INT PRIMARY KEY") for col_def in column_defs]

    # Combine column definitions into a single string (kept for reference/logging)
    column_defs_str = ", ".join(column_defs)

    # Create the table in the new database
    try:
        # Write the DataFrame as a Delta table; overwrite creates the table from
        # the inferred schema and loads the data in a single step
        df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{table_name}")

        # Log the success for the file
        create_status = "Success"
        print(f"Table '{table_name}' created successfully.")
    except AnalysisException as ae:
        create_status = f"Failed: {ae.desc}"
        print(f"Table '{table_name}' creation failed: {ae.desc}")
    except Exception as e:
        create_status = f"Failed: {e}"
        print(f"Table '{table_name}' creation failed: {e}")

    # Log the details for the file
    log_data.append((file_name, table_name, read_status, create_status, str(column_info)))

# Step 2: Create a log DataFrame to store the detailed log information
log_schema = ["FileName", "TableName", "ReadStatus", "CreateStatus", "ColumnInfo"]
log_df = spark.createDataFrame(log_data, log_schema)

# Save the log table in the new database
log_table_name = "file_processing_log"
log_df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{log_table_name}")

# Display the final log information
print("\nFinal Log Table Information:")
log_df.show(truncate=False)

# Also display the log table from the database
print("\nFinal Log Table from Database:")
spark.sql(f"SELECT * FROM {new_database}.{log_table_name}").show(truncate=False)


2. Data Cleaning: The ID Column Has NULL Values in Some Tables, So Remove Those Rows


from pyspark.sql.utils import AnalysisException

# Define the source and target database names
source_database = "gw_claimcenter_raw_db_2"
target_database = "gw_claimcenter_cleaned_db_2"

# Step 1: Create the target database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_database}")

# Fetch list of tables in the source database
tables_list = spark.catalog.listTables(source_database)

# Loop through each table and filter rows where ID is not NULL
for table in tables_list:
    table_name = table.name
    source_table_name = f"{source_database}.{table_name}"
    target_table_name = f"{target_database}.{table_name}"

    try:
        # Read the existing Delta table into a DataFrame
        df = spark.table(source_table_name)

        # Check if 'ID' column exists in the DataFrame
        if 'ID' in df.columns:
            # Filter out rows where 'ID' is NULL
            df_filtered = df.filter(df['ID'].isNotNull())

            # Overwrite the target table with the filtered DataFrame
            df_filtered.write.format("delta").mode("overwrite").saveAsTable(target_table_name)

            print(f"Delta table '{target_table_name}' overwritten successfully with non-NULL 'ID' rows.")
        else:
            print(f"Table '{table_name}' does not have an 'ID' column, skipping.")
   
    except AnalysisException as ae:
        print(f"Error processing table '{table_name}': {ae.desc}")
    except Exception as e:
        print(f"Error processing table '{table_name}': {e}")

# Display a final message
print("\nOverwriting of Delta tables with non-NULL 'ID' rows completed.")

 

 
