
PySpark: load and transform Guidewire tables with automatic data type detection





Continued from the blog post:
Guidewire Self Managed version H2 Tables ingest into Databricks Delta Tables


The published PySpark notebook loads the Guidewire CSV files, creates Delta tables with auto-detected data types, and, as a final step, records the whole load and transformation process in a separate Delta file-processing log table.
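
Before running the full loop, a quick way to preview what inferSchema detects for a single file is a sketch like the one below (the file name is a hypothetical example; substitute any of your clm_*.csv files, and note that spark is already available in a Databricks notebook):

# Minimal sketch: preview the auto-detected schema for one CSV file
# "clm_claim.csv" is a hypothetical file name; replace it with one of your clm_*.csv files
sample_path = "dbfs:/FileStore/clm_claim.csv"

sample_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(sample_path)
)

# Print the column names and the Spark data types that inferSchema detected
sample_df.printSchema()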

Here is the code, available as a Databricks notebook.

Code logic below:

1. Create New Guidewire Claim Tables from CSV Files with Auto-Detected Data Types (V1.0 Template)


from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType, TimestampType
from pyspark.sql.utils import AnalysisException

# Initialize Spark session (already initialized in Databricks)
spark = SparkSession.builder.getOrCreate()

# Define the new database name
new_database = "gw_claimcenter_raw_db_2"

# Step 1: Create a new database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {new_database}")

# Define the base path for FileStore
base_path = "dbfs:/FileStore/"

# List all CSV files that start with 'clm_' using dbutils
csv_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.startswith("clm_") and f.name.endswith(".csv")]

# Initialize a list to store detailed log information
log_data = []

# Loop through each CSV file and infer schema
for file_path in csv_files:
    # Extract the file name from the file path
    file_name = file_path.split("/")[-1]
    table_name = file_name.replace(".csv", "")
   
    # Read the CSV file with schema inference
    try:
        df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)
        read_status = "Success"
    except Exception as e:
        read_status = f"Failed: {e}"
        log_data.append((file_name, table_name, read_status, None, None))
        print(f"File '{file_name}' read failed: {e}")
        continue  # Skip to the next file if reading fails

    # Initialize column definitions for table creation
    column_defs = []
    column_info = []  # For detailed logging

    # Loop through inferred schema fields and build column definitions
    for field in df.schema.fields:
        field_name = field.name
        field_type = field.dataType

        # Map PySpark data types to SQL data types
        # (inferSchema usually returns LongType for whole numbers and DoubleType for decimals)
        if isinstance(field_type, IntegerType):
            sql_type = "INT"
        elif isinstance(field_type, LongType):
            sql_type = "BIGINT"
        elif isinstance(field_type, FloatType):
            sql_type = "FLOAT"
        elif isinstance(field_type, DoubleType):
            sql_type = "DOUBLE"
        elif isinstance(field_type, BooleanType):
            sql_type = "BOOLEAN"
        elif isinstance(field_type, TimestampType):
            sql_type = "TIMESTAMP"
        else:
            sql_type = "STRING"
       
        # Ensure column names are properly quoted
        column_defs.append(f"`{field_name}` {sql_type}")
        column_info.append((field_name, sql_type, "PRIMARY KEY" if field_name == "ID" else "NULL"))

    # Print inferred column information
    print(f"\nFile '{file_name}' column info: {column_info}\n")

    # Set ID column as primary key if it exists
    if 'ID' in df.columns:
        column_defs = [col_def.replace("`ID` INT", "`ID` INT PRIMARY KEY") for col_def in column_defs]

    # Combine column definitions into a single string (kept for reference/logging;
    # the Delta table below is created directly from the DataFrame schema)
    column_defs_str = ", ".join(column_defs)

    # Create table in the new database
    try:
        # Write the DataFrame as a Delta table; overwrite mode creates the table
        # if it does not exist, so a separate empty-table step is not needed
        df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{table_name}")

        # Log the success for the file
        create_status = "Success"
        print(f"Table '{table_name}' created successfully.")
    except AnalysisException as ae:
        create_status = f"Failed: {ae.desc}"
        print(f"Table '{table_name}' creation failed: {ae.desc}")
    except Exception as e:
        create_status = f"Failed: {e}"
        print(f"Table '{table_name}' creation failed: {e}")

    # Log the details for the file
    log_data.append((file_name, table_name, read_status, create_status, str(column_info)))

# Final step: Create a log DataFrame to store the detailed log information
log_schema = ["FileName", "TableName", "ReadStatus", "CreateStatus", "ColumnInfo"]
log_df = spark.createDataFrame(log_data, log_schema)

# Save the log table in the new database
log_table_name = "file_processing_log"
log_df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{log_table_name}")

# Display the final log information
print("\nFinal Log Table Information:")
log_df.show(truncate=False)

# Also display the log table from the database
print("\nFinal Log Table from Database:")
spark.sql(f"SELECT * FROM {new_database}.{log_table_name}").show(truncate=False)


2. Data Cleaning: Remove Rows Where ID Is NULL in Each Table


from pyspark.sql.utils import AnalysisException

# Define the source and target database names
source_database = "gw_claimcenter_raw_db_2"
target_database = "gw_claimcenter_cleaned_db_2"

# Step 1: Create the target database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_database}")

# Fetch list of tables in the source database
tables_list = spark.catalog.listTables(source_database)

# Loop through each table and filter rows where ID is not NULL
for table in tables_list:
    table_name = table.name
    source_table_name = f"{source_database}.{table_name}"
    target_table_name = f"{target_database}.{table_name}"

    try:
        # Read the existing Delta table into a DataFrame
        df = spark.table(source_table_name)

        # Check if 'ID' column exists in the DataFrame
        if 'ID' in df.columns:
            # Filter out rows where 'ID' is NULL
            df_filtered = df.filter(df['ID'].isNotNull())

            # Overwrite the target table with the filtered DataFrame
            df_filtered.write.format("delta").mode("overwrite").saveAsTable(target_table_name)

            print(f"Delta table '{target_table_name}' overwritten successfully with non-NULL 'ID' rows.")
        else:
            print(f"Table '{table_name}' does not have an 'ID' column, skipping.")
   
    except AnalysisException as ae:
        print(f"Error processing table '{table_name}': {ae.desc}")
    except Exception as e:
        print(f"Error processing table '{table_name}': {e}")

# Display a final message
print("\nOverwriting of Delta tables with non-NULL 'ID' rows completed.")

 

 
