Data engineering projects
Python ETL code to ingest PDF files into an Oracle table
Assume you receive PDF files containing data that you intend to ingest into an Oracle database.
The assumption is that each PDF holds tabular data structured exactly like a CSV file, which can be loaded into a database table easily.
For this scenario, we need a Python script that automates the ETL process of ingesting data from PDF files into an Oracle database.
The code follows these steps:
Sets the SFTP connection parameters for accessing the folder where the daily PDF files are dumped.
Sets the Oracle database connection parameters for connecting to the database where the data will be ingested.
Sets the local file path for storing the converted CSV files.
Connects to the SFTP server and retrieves the list of PDF files in the folder.
Loops through each PDF file in the folder and converts it to CSV using PyPDF2 and Tabula packages.
Connects to the Oracle database and creates a new table (if not exists) with the same name as the CSV file.
Inserts the data from the CSV file into the table.
Logs the completion of the ETL job.
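A note on the load step above: cx_Oracle has no PostgreSQL-style COPY command, so rows are typically bulk-inserted with cursor.executemany, which expects a list of tuples. A minimal sketch of preparing those rows from a Pandas data frame (the column names here are hypothetical placeholders):

```python
import pandas as pd

# Hypothetical data frame standing in for one converted CSV file
df = pd.DataFrame({"column1": ["a", "b"], "column2": [1, 2]})

# executemany expects one tuple per row, in column order
rows = [tuple(r) for r in df.itertuples(index=False)]

# With a live connection, the insert would then look like:
# cursor.executemany(
#     "INSERT INTO my_table (column1, column2) VALUES (:1, :2)", rows
# )
```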
The code uses several Python libraries: pysftp, os, PyPDF2, pandas, cx_Oracle, and tabula.
The code can be run automatically via a cron job at 9:00 PM each night: it checks for any new PDF files in the SFTP folder, converts them to CSV, and ingests the data into the Oracle database without human intervention.
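For example, a crontab entry along these lines would schedule the nightly run (the script and log paths are placeholders):

```
# Run the ETL script at 21:00 every night and append its output to a log file
0 21 * * * /usr/bin/python3 /path/to/etl_pdf_to_oracle.py >> /var/log/etl_pdf.log 2>&1
```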
Complete code
You can try this ETL script and improve it further:
# Importing the necessary libraries
import os
from datetime import datetime

import pysftp
import PyPDF2
import pandas as pd
import cx_Oracle
import tabula

# Setting the SFTP connection parameters
sftp_host = "sftp.example.com"
sftp_user = "username"
sftp_pass = "password"
sftp_path = "/path/to/sftp/folder/"

# Setting the Oracle database connection parameters
oracle_host = "oracle.example.com"
oracle_port = 1521
oracle_service = "ORACLE_SERVICE_NAME"
oracle_user = "username"
oracle_pass = "password"

# Setting the local file path for storing the downloaded PDFs and converted CSV files
local_path = "/path/to/local/folder/"

# Connecting to the SFTP server and getting the list of PDF files in the folder
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None  # disables host-key verification; load known_hosts in production
with pysftp.Connection(sftp_host, username=sftp_user, password=sftp_pass,
                       cnopts=cnopts) as sftp:
    sftp.cwd(sftp_path)
    pdf_files = [f for f in sftp.listdir() if f.endswith(".pdf")]

    # Looping through each PDF file in the folder and converting it to CSV
    for pdf_file in pdf_files:
        # Set the local file paths; the table is named after the file
        local_pdf = os.path.join(local_path, pdf_file)
        table_name = os.path.splitext(pdf_file)[0]
        csv_path = os.path.join(local_path, table_name + ".csv")

        # Download the PDF so PyPDF2 and tabula can read it locally
        sftp.get(pdf_file, local_pdf)

        # Getting the number of pages (PdfReader replaces the deprecated PdfFileReader)
        with open(local_pdf, "rb") as f:
            num_pages = len(PyPDF2.PdfReader(f).pages)

        # Looping through each page in the PDF file and extracting its tables
        frames = []
        for page in range(1, num_pages + 1):
            frames.extend(tabula.read_pdf(local_pdf, pages=str(page),
                                          multiple_tables=True))
        if not frames:
            continue  # no tables found in this PDF

        # Combining the page tables into one data frame and writing it to CSV
        df = pd.concat(frames, ignore_index=True)
        df.to_csv(csv_path, index=False)

        # Connecting to the Oracle database and inserting the data
        dsn = cx_Oracle.makedsn(oracle_host, oracle_port, service_name=oracle_service)
        with cx_Oracle.connect(user=oracle_user, password=oracle_pass, dsn=dsn) as conn:
            cursor = conn.cursor()

            # Oracle has no CREATE TABLE IF NOT EXISTS, so ignore ORA-00955
            # ("name is already used"); replace the columns with your own
            try:
                cursor.execute(
                    f"CREATE TABLE {table_name} (column1 VARCHAR2(50), column2 NUMBER)"
                )
            except cx_Oracle.DatabaseError as e:
                if e.args[0].code != 955:
                    raise

            # Removing any rows already loaded today (date_column is a placeholder),
            # then bulk-inserting the new rows; cx_Oracle has no copy_from, so
            # executemany with bind variables is used instead
            cursor.execute(
                f"DELETE FROM {table_name} WHERE TRUNC(date_column) = TRUNC(SYSDATE)"
            )
            rows = [tuple(r) for r in df.itertuples(index=False)]
            cursor.executemany(
                f"INSERT INTO {table_name} (column1, column2) VALUES (:1, :2)", rows
            )
            conn.commit()
            cursor.close()

        print(f"Data from {pdf_file} has been ingested into the {table_name} table.")

# Logging the completion of the ETL job
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Python ETL job completed at {current_time}.")
Note that you'll need to replace the placeholder values for the SFTP and Oracle connection parameters, as well as the local file path and the table column names, with the correct values for your environment.
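One easy improvement: rather than hardcoding credentials in the script, read the connection parameters from environment variables. A small sketch of this idea (the variable names are illustrative, not part of the original script):

```python
import os

def load_etl_config(env=os.environ):
    """Collect connection parameters from environment variables.

    Non-secret values get defaults; missing secrets raise KeyError so the
    job fails fast instead of connecting with a bad password.
    """
    return {
        "sftp_host": env.get("ETL_SFTP_HOST", "sftp.example.com"),
        "sftp_user": env["ETL_SFTP_USER"],
        "sftp_pass": env["ETL_SFTP_PASS"],
        "oracle_user": env["ETL_ORACLE_USER"],
        "oracle_pass": env["ETL_ORACLE_PASS"],
    }
```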