Ingesting Guidewire Self-Managed H2 Tables into Databricks Delta Tables
The published PySpark notebook loads the Guidewire CSV files, creates Delta tables with auto-detected data types, and, as a final step, records the entire load and transformation process in a separate Delta file-processing log table.
Here is the code, available as a Databricks notebook. The code logic is below.
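Before running step 1, a quick sanity check can confirm the Guidewire claim exports are visible to the cluster. This is a minimal sketch, assuming the CSV files were uploaded to dbfs:/FileStore/ with the clm_ prefix used throughout this post:

# Optional pre-flight check: list the Guidewire claim CSV exports in FileStore
# (assumes the files were uploaded to dbfs:/FileStore/ with a "clm_" prefix)
base_path = "dbfs:/FileStore/"
clm_files = [f.name for f in dbutils.fs.ls(base_path)
             if f.name.startswith("clm_") and f.name.endswith(".csv")]
print(f"Found {len(clm_files)} claim CSV files: {clm_files}")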
1. Create New Guidewire Claim Tables from CSV Files with Auto-Detected Data Types (V1.0 Template)
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType, TimestampType
from pyspark.sql.utils import AnalysisException

# Initialize Spark session (already initialized in Databricks)
spark = SparkSession.builder.getOrCreate()

# Define the new database name
new_database = "gw_claimcenter_raw_db_2"

# Step 1: Create a new database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {new_database}")

# Define the base path for FileStore
base_path = "dbfs:/FileStore/"

# List all CSV files that start with 'clm_' using dbutils
csv_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.startswith("clm_") and f.name.endswith(".csv")]

# Initialize a list to store detailed log information
log_data = []

# Loop through each CSV file and infer schema
for file_path in csv_files:
    # Extract the file name from the file path
    file_name = file_path.split("/")[-1]
    table_name = file_name.replace(".csv", "")

    # Read the CSV file with schema inference
    try:
        df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)
        read_status = "Success"
    except Exception as e:
        read_status = f"Failed: {e}"
        log_data.append((file_name, table_name, read_status, None, None))
        print(f"File '{file_name}' read failed: {e}")
        continue  # Skip to the next file if reading fails

    # Initialize column definitions for table creation
    column_defs = []
    column_info = []  # For detailed logging

    # Loop through inferred schema fields and build column definitions
    for field in df.schema.fields:
        field_name = field.name
        field_type = field.dataType

        # Map PySpark data types to SQL data types
        if isinstance(field_type, IntegerType):
            sql_type = "INT"
        elif isinstance(field_type, FloatType):
            sql_type = "FLOAT"
        elif isinstance(field_type, BooleanType):
            sql_type = "BOOLEAN"
        elif isinstance(field_type, TimestampType):
            sql_type = "TIMESTAMP"
        else:
            sql_type = "STRING"

        # Ensure column names are properly quoted
        column_defs.append(f"`{field_name}` {sql_type}")
        column_info.append((field_name, sql_type, "PRIMARY KEY" if field_name == "ID" else "NULL"))

    # Print inferred column information
    print(f"\nFile '{file_name}' column info: {column_info}\n")

    # Set ID column as primary key if it exists
    if 'ID' in df.columns:
        column_defs = [col_def.replace("`ID` INT", "`ID` INT PRIMARY KEY") for col_def in column_defs]

    # Combine column definitions into a single string for SQL
    column_defs_str = ", ".join(column_defs)

    # Create table in the new database
    try:
        # Create an empty Delta table using the DataFrame schema
        df.limit(0).write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{table_name}")

        # Insert data into the newly created Delta table
        df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{table_name}")

        # Log the success for the file
        create_status = "Success"
        print(f"Table '{table_name}' created successfully.")
    except AnalysisException as ae:
        create_status = f"Failed: {ae.desc}"
        print(f"Table '{table_name}' creation failed: {ae.desc}")
    except Exception as e:
        create_status = f"Failed: {e}"
        print(f"Table '{table_name}' creation failed: {e}")

    # Log the details for the file
    log_data.append((file_name, table_name, read_status, create_status, str(column_info)))

# Final step: Create a log DataFrame to store the detailed log information
log_schema = ["FileName", "TableName", "ReadStatus", "CreateStatus", "ColumnInfo"]
log_df = spark.createDataFrame(log_data, log_schema)

# Save the log table in the new database
log_table_name = "file_processing_log"
log_df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{log_table_name}")

# Display the final log information
print("\nFinal Log Table Information:")
log_df.show(truncate=False)

# Also display the log table from the database
print("\nFinal Log Table from Database:")
spark.sql(f"SELECT * FROM {new_database}.{log_table_name}").show(truncate=False)
2. Data Cleaning: Found NULL ID Values in Some Tables, Filter Them Out
from pyspark.sql.utils import AnalysisException

# Define the source and target database names
source_database = "gw_claimcenter_raw_db_2"
target_database = "gw_claimcenter_cleaned_db_2"

# Step 1: Create the target database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_database}")

# Fetch list of tables in the source database
tables_list = spark.catalog.listTables(source_database)

# Loop through each table and filter rows where ID is not NULL
for table in tables_list:
    table_name = table.name
    source_table_name = f"{source_database}.{table_name}"
    target_table_name = f"{target_database}.{table_name}"

    try:
        # Read the existing Delta table into a DataFrame
        df = spark.table(source_table_name)

        # Check if 'ID' column exists in the DataFrame
        if 'ID' in df.columns:
            # Filter out rows where 'ID' is NULL
            df_filtered = df.filter(df['ID'].isNotNull())

            # Overwrite the target table with the filtered DataFrame
            df_filtered.write.format("delta").mode("overwrite").saveAsTable(target_table_name)
            print(f"Delta table '{target_table_name}' overwritten successfully with non-NULL 'ID' rows.")
        else:
            print(f"Table '{table_name}' does not have an 'ID' column, skipping.")
    except AnalysisException as ae:
        print(f"Error processing table '{table_name}': {ae.desc}")
    except Exception as e:
        print(f"Error processing table '{table_name}': {e}")

# Display a final message
print("\nOverwriting of Delta tables with non-NULL 'ID' rows completed.")
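To confirm the cleanup worked, a simple row-count comparison between the raw and cleaned databases shows how many NULL-ID rows were dropped per table. This is a sketch using the database names defined above:

# Compare row counts between the raw and cleaned databases
for table in spark.catalog.listTables("gw_claimcenter_cleaned_db_2"):
    raw_count = spark.table(f"gw_claimcenter_raw_db_2.{table.name}").count()
    cleaned_count = spark.table(f"gw_claimcenter_cleaned_db_2.{table.name}").count()
    print(f"{table.name}: raw={raw_count}, cleaned={cleaned_count}, dropped={raw_count - cleaned_count}")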