from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType, TimestampType
from pyspark.sql.utils import AnalysisException
# Get or create the Spark session (already available as `spark` in Databricks notebooks)
spark = SparkSession.builder.getOrCreate()
# Define the new database name
new_database = "gw_claimcenter_raw_db_2"
# Step 1: Create a new database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {new_database}")
# Define the base path for FileStore
base_path = "dbfs:/FileStore/"
# Step 2: List all CSV files in FileStore whose names start with 'clm_' using dbutils
csv_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.startswith("clm_") and f.name.endswith(".csv")]
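# Note: dbutils.fs.ls returns FileInfo objects that expose .path and .name,
# which the filter above relies on.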
# Initialize a list to store detailed log information
log_data = []
# Step 3: Loop through each CSV file, infer its schema, and create a Delta table
for file_path in csv_files:
    # Extract the file name from the path and derive the table name
    file_name = file_path.split("/")[-1]
    table_name = file_name.replace(".csv", "")

    # Read the CSV file with schema inference
    try:
        df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)
        read_status = "Success"
    except Exception as e:
        read_status = f"Failed: {e}"
        log_data.append((file_name, table_name, read_status, None, None))
        print(f"File '{file_name}' read failed: {e}")
        continue  # Skip to the next file if reading fails

    # Build column definitions from the inferred schema
    column_defs = []
    column_info = []  # For detailed logging

    for field in df.schema.fields:
        field_name = field.name
        field_type = field.dataType

        # Map PySpark data types to SQL data types (CSV schema inference
        # produces IntegerType/LongType for whole numbers and DoubleType for
        # decimals, so those are mapped explicitly; anything else falls back
        # to STRING)
        if isinstance(field_type, IntegerType):
            sql_type = "INT"
        elif isinstance(field_type, LongType):
            sql_type = "BIGINT"
        elif isinstance(field_type, FloatType):
            sql_type = "FLOAT"
        elif isinstance(field_type, DoubleType):
            sql_type = "DOUBLE"
        elif isinstance(field_type, BooleanType):
            sql_type = "BOOLEAN"
        elif isinstance(field_type, TimestampType):
            sql_type = "TIMESTAMP"
        else:
            sql_type = "STRING"

        # Ensure column names are properly quoted
        column_defs.append(f"`{field_name}` {sql_type}")
        column_info.append((field_name, sql_type, "PRIMARY KEY" if field_name == "ID" else "NULL"))

    # Print inferred column information
    print(f"\nFile '{file_name}' column info: {column_info}\n")

    # Mark the ID column as the primary key in the column definitions, if present
    if "ID" in df.columns:
        column_defs = [col_def.replace("`ID` INT", "`ID` INT PRIMARY KEY") for col_def in column_defs]

    # Combine the column definitions into a single string (kept for reference and
    # logging; the table itself is created from the DataFrame schema below)
    column_defs_str = ", ".join(column_defs)

    # Create the Delta table in the new database. A single overwrite write both
    # creates the table from the DataFrame schema and loads the data, so a
    # separate empty-table step is not needed.
    try:
        df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{table_name}")
        create_status = "Success"
        print(f"Table '{table_name}' created successfully.")
    except AnalysisException as ae:
        create_status = f"Failed: {ae}"
        print(f"Table '{table_name}' creation failed: {ae}")
    except Exception as e:
        create_status = f"Failed: {e}"
        print(f"Table '{table_name}' creation failed: {e}")

    # Log the details for the file
    log_data.append((file_name, table_name, read_status, create_status, str(column_info)))
# Step 4: Build a log DataFrame from the collected details. An explicit schema is
# used so that rows containing None (failed reads) or an empty file list do not
# break schema inference.
log_schema = "FileName STRING, TableName STRING, ReadStatus STRING, CreateStatus STRING, ColumnInfo STRING"
log_df = spark.createDataFrame(log_data, schema=log_schema)
# Save the log table in the new database
log_table_name = "file_processing_log"
log_df.write.format("delta").mode("overwrite").saveAsTable(f"{new_database}.{log_table_name}")
# Display the final log information
print("\nFinal Log Table Information:")
log_df.show(truncate=False)
# Also display the log table from the database
print("\nFinal Log Table from Database:")
spark.sql(f"SELECT * FROM {new_database}.{log_table_name}").show(truncate=False)