Data engineering scenarios
Python ETL scheduled using cron (Query an Oracle table and convert the output into CSV, then send the CSV via email)
Scenario:
A customer wants you to run a query against a table in an Oracle database, convert the output of the query into a CSV file, and then automatically send the CSV to the customer by email.
To schedule a Python ETL (Extract, Transform, Load) process using cron on a Linux system, you can follow these steps:
- Write a Python script that connects to an Oracle database, queries a table, and saves the result into a CSV file. You can use the cx_Oracle module to connect to Oracle, and the csv module to write the CSV file.
Here’s an example script that queries a table called “employees” and saves the result into a file called “employees.csv”:
import cx_Oracle
import csv
# Connect to Oracle
dsn_tns = cx_Oracle.makedsn('<hostname>', '<port>', service_name='<service_name>')
conn = cx_Oracle.connect(user='<username>', password='<password>', dsn=dsn_tns)
# Query the table
cur = conn.cursor()
cur.execute("SELECT * FROM employees")
rows = cur.fetchall()
# Write the CSV file
with open('employees.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow([col[0] for col in cur.description]) # Write column headers
    for row in rows:
        writer.writerow(row)
# Close the database connection
cur.close()
conn.close()
- Save the Python script in a directory of your choice, such as /home/user/scripts.
- Make the script executable by running the command chmod +x /home/user/scripts/etl.py. For cron to run the file directly, its first line must be a shebang such as #!/usr/bin/env python3.
- Open the crontab file by running the command crontab -e.
- Add a new line at the bottom of the file to schedule the Python script to run at a specific time. For example, to run the script every day at 6am, add the following line:
0 6 * * * /home/user/scripts/etl.py
This line specifies that the script should be run at minute 0 and hour 6 every day. You can customize the schedule by changing the numbers and the asterisks to match your desired timing.
- Save the crontab file and exit.
- Test the script by running it manually using the command /home/user/scripts/etl.py. If the script runs successfully, it should create a new file called "employees.csv" in the directory you ran it from, because the script opens the file with a relative path.
- To send the CSV file via email, you can use the smtplib module to connect to an SMTP server and send an email with an attachment. Here's an example script that sends an email with the CSV file as an attachment:
import smtplib
import os
from email.message import EmailMessage
# Create an email message
msg = EmailMessage()
msg['Subject'] = 'Oracle table export'
msg['From'] = 'sender@example.com'
msg['To'] = 'recipient@example.com'
msg.set_content('Please find attached the CSV file exported from Oracle')
# Add the CSV file as an attachment
with open('employees.csv', 'rb') as f:
    file_data = f.read()
    file_name = os.path.basename(f.name)
msg.add_attachment(file_data, maintype='text', subtype='csv', filename=file_name)
# Connect to the SMTP server and send the email
smtp_server = 'smtp.example.com'
smtp_username = 'sender@example.com'
smtp_password = 'password'
with smtplib.SMTP(smtp_server, 587) as smtp:
    smtp.starttls()
    smtp.login(smtp_username, smtp_password)
    smtp.send_message(msg)
- Save the email script in the same directory as the ETL script.
- Alternatively, fold the email code into the ETL script so that a single file writes the CSV and then sends it.
Here’s an updated version of the ETL script that sends the email after writing the CSV file:
import cx_Oracle
import csv
import os
import smtplib
from email.message import EmailMessage
# Connect to Oracle
dsn_tns = cx_Oracle.makedsn('<hostname>', '<port>', service_name='<service_name>')
conn = cx_Oracle.connect(user='<username>', password='<password>', dsn=dsn_tns)
# Query the table
cur = conn.cursor()
cur.execute("SELECT * FROM employees")
rows = cur.fetchall()
# Write the CSV file
csv_file = 'employees.csv'
with open(csv_file, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow([col[0] for col in cur.description]) # Write column headers
    for row in rows:
        writer.writerow(row)
# Close the database connection
cur.close()
conn.close()
# Send email with the CSV file as an attachment
msg = EmailMessage()
msg['Subject'] = 'Oracle table export'
msg['From'] = 'sender@example.com'
msg['To'] = 'recipient@example.com'
msg.set_content('Please find attached the CSV file exported from Oracle')
with open(csv_file, 'rb') as f:
    file_data = f.read()
    file_name = os.path.basename(f.name)
msg.add_attachment(file_data, maintype='text', subtype='csv', filename=file_name)
smtp_server = 'smtp.example.com'
smtp_username = 'sender@example.com'
smtp_password = 'password'
with smtplib.SMTP(smtp_server, 587) as smtp:
    smtp.starttls()
    smtp.login(smtp_username, smtp_password)
    smtp.send_message(msg)
Make sure to replace the placeholders <hostname>, <port>, <service_name>, <username>, <password>, sender@example.com, and recipient@example.com with the actual values for your Oracle database, email server, and email addresses.
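Instead of typing the real values into the script, they could be read from environment variables. The following is a minimal sketch under that assumption; the variable names (ORACLE_HOST, SMTP_PASSWORD, and so on) and the load_settings helper are hypothetical and are not part of cx_Oracle or smtplib.

```python
import os

# Hypothetical environment variable names; choose whatever fits your setup.
REQUIRED_VARS = (
    "ORACLE_HOST", "ORACLE_PORT", "ORACLE_SERVICE",
    "ORACLE_USER", "ORACLE_PASSWORD",
    "SMTP_SERVER", "SMTP_USER", "SMTP_PASSWORD",
    "MAIL_TO",
)


def load_settings(environ=None):
    """Return the required settings as a dict, or raise if any are missing."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing settings: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

Failing early with a list of the missing names makes a misconfigured job easier to diagnose. Note that cron starts jobs with a minimal environment, so the variables would have to be defined in the crontab itself or in a wrapper script.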
Step-by-step explanation of the Python ETL (Extract, Transform, Load) script
Step 1: Install Required Python Modules
The first step is to make sure the required Python modules are available. We need the cx_Oracle and csv modules for the ETL process, and the smtplib module and the EmailMessage class for sending the email. Only cx_Oracle is a third-party package; it can be installed with pip, the Python package manager, by running pip install cx_Oracle in a terminal window. The other modules ship with Python. The script then imports them as follows:
import cx_Oracle
import csv
import os
import smtplib
from email.message import EmailMessage
- cx_Oracle is a Python module that allows us to connect to an Oracle database and perform SQL queries.
- csv is a built-in Python module that provides functionality for reading and writing CSV files.
- os is another built-in Python module that provides functionality for interacting with the operating system, such as getting the current working directory and file paths.
- smtplib is a built-in Python module for sending emails using the Simple Mail Transfer Protocol (SMTP).
- EmailMessage is a class from the email.message module that allows us to create email messages.
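Before running the ETL script, it can help to confirm that every module it imports can actually be found. The sketch below uses importlib.util.find_spec from the standard library; the missing_modules helper is a hypothetical name introduced for illustration.

```python
import importlib.util

# Modules the ETL script imports; only cx_Oracle is third-party.
MODULES = ["cx_Oracle", "csv", "os", "smtplib", "email.message"]


def missing_modules(names):
    """Return the names from `names` that cannot be found for import."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised when the parent package of a dotted name is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


print("Missing modules:", missing_modules(MODULES))
```

If cx_Oracle appears in the printed list, it still needs to be installed in the Python environment that cron will use.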
Step 2: Write the ETL Script
The next step is to write the ETL script that will extract data from the Oracle database, transform it into CSV format, and save it to disk. We’ll call this script etl.py.
Here’s the complete code:
import cx_Oracle
import csv
# Connect to Oracle
dsn_tns = cx_Oracle.makedsn('hostname', 'port', service_name='service_name')
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
# Query the table
cur = conn.cursor()
cur.execute("SELECT * FROM employees")
rows = cur.fetchall()
# Write the CSV file
with open('employees.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow([col[0] for col in cur.description]) # Write column headers
    for row in rows:
        writer.writerow(row)
# Close the database connection
cur.close()
conn.close()
In this script, we first connect to the Oracle database using the cx_Oracle module. We then execute a SELECT query on the employees table and fetch all the rows using the fetchall() method.
First, we connect to the Oracle database:
dsn_tns = cx_Oracle.makedsn('<hostname>', '<port>', service_name='<service_name>')
conn = cx_Oracle.connect(user='<username>', password='<password>', dsn=dsn_tns)
- dsn_tns is a string that specifies the Oracle database connection details, including the hostname, port number, and service name.
- cx_Oracle.makedsn is a function that creates a Data Source Name (DSN) string from the hostname, port number, and service name.
- conn is a connection object that is returned when we call the connect function of the cx_Oracle module. We pass in the database connection details, including the username and password.
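As an alternative to makedsn, the dsn argument can also be given in Oracle's Easy Connect form, host:port/service_name. The sketch below builds such a string; easy_connect_dsn is a hypothetical helper, and the example values are the ones used in the single-script example later in this article.

```python
def easy_connect_dsn(host, port, service_name):
    """Build an Easy Connect string of the form host:port/service_name."""
    port = int(port)  # accept "1521" as well as 1521
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return f"{host}:{port}/{service_name}"


dsn = easy_connect_dsn("localhost", "1521", "ORCL")
print(dsn)  # localhost:1521/ORCL
# The result could then be passed as dsn=... to cx_Oracle.connect().
```

Converting the port to an integer up front catches a leftover placeholder such as '<port>' before any connection attempt is made.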
Once we are connected to the database, we can execute a SQL query and fetch the results:
cur = conn.cursor()
cur.execute("SELECT * FROM employees")
rows = cur.fetchall()
- cur is a cursor object that is returned when we call the cursor method of the connection object. We can use this cursor to execute SQL queries and fetch the results.
- cur.execute is a method that takes an SQL query as a string and executes it on the Oracle database.
- "SELECT * FROM employees" is an SQL query that selects all columns and rows from the employees table in the database.
- cur.fetchall() is a method that retrieves all the rows returned by the SQL query and returns them as a list of tuples.
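The cursor behaviour described above can be tried without an Oracle server. The sketch below uses an in-memory SQLite database as a stand-in, on the assumption that both drivers follow the Python DB-API; the table contents are made up for the demonstration.

```python
import sqlite3

# In-memory SQLite database standing in for Oracle (assumption for the demo).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE employees (id INTEGER, name TEXT)")
cur.executemany(
    "INSERT INTO employees VALUES (?, ?)",
    [(1, "Ada"), (2, "Grace")],
)

cur.execute("SELECT * FROM employees ORDER BY id")
columns = [col[0] for col in cur.description]  # column names
rows = cur.fetchall()                          # list of tuples

print(columns)  # ['id', 'name']
print(rows)     # [(1, 'Ada'), (2, 'Grace')]

cur.close()
conn.close()
```

Because fetchall() loads every row into memory, a very large table may be better served by fetchmany(), which DB-API cursors also provide.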
Next, we need to write the results of the SQL query to a CSV file.
For example, we may write the data to a CSV file called employees.csv using the csv module. First, we create a csv.writer object and write the column headers using the description attribute of the cursor object. We then iterate over the rows and write each row to the CSV file.
csv_file = 'employees.csv'
with open(csv_file, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow([col[0] for col in cur.description]) # Write column headers
    for row in rows:
        writer.writerow(row)
- csv_file is a string that specifies the name of the CSV file that we want to create.
- open is a built-in Python function that opens a file for reading or writing.
- 'w' is a string that specifies the mode in which to open the file ('w' for write mode).
- newline='' is a parameter that turns off Python's newline translation for the file, so that the csv module controls the line endings itself; without it, extra blank lines can appear between rows on Windows.
- writer is a CSV writer object that we create by passing in the file object returned by open.
- writer.writerow is a method that writes a single row of data to the CSV file. We first write the column headers, taking the column names from cur.description (the first item of each entry), and then we write each data row in turn.
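To see what the writer produces, the same calls can be pointed at an in-memory buffer instead of a file. This is a small sketch with invented sample rows; rows_to_csv is a hypothetical helper name.

```python
import csv
import io


def rows_to_csv(columns, rows):
    """Return CSV text with a header row followed by the data rows."""
    buffer = io.StringIO(newline="")  # no newline translation, as with open(..., newline='')
    writer = csv.writer(buffer)
    writer.writerow(columns)
    writer.writerows(rows)
    return buffer.getvalue()


text = rows_to_csv(["id", "name"], [(1, "Ada"), (2, "Smith, Grace")])
print(repr(text))
# 'id,name\r\n1,Ada\r\n2,"Smith, Grace"\r\n'
```

The output shows two details of the csv module's defaults: rows end with \r\n, and a value containing a comma is wrapped in double quotes.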
If we write two separate scripts, one for the conversion to CSV and one for sending the email, the first script should close the database connection once the CSV file is written; we then work on the script that sends the CSV.
For this option, we close the cursor and the connection using their close() methods.
Here is how we close them:
cur.close()
conn.close()
cur.close() releases the cursor, and conn.close() is a method that closes the database connection.
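If the query raises an exception, the explicit close() calls are never reached. One way to guarantee cleanup is contextlib.closing from the standard library, sketched below with sqlite3 standing in for cx_Oracle; fetch_all and its conn_factory parameter are hypothetical names.

```python
import sqlite3
from contextlib import closing


def fetch_all(conn_factory, query):
    """Run `query` and return (column_names, rows), closing everything after."""
    with closing(conn_factory()) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute(query)
            return [col[0] for col in cur.description], cur.fetchall()


# sqlite3 stands in for cx_Oracle here.
columns, rows = fetch_all(lambda: sqlite3.connect(":memory:"), "SELECT 1 AS one")
print(columns, rows)  # ['one'] [(1,)]
```

closing() calls close() on the wrapped object when the with block exits, whether it exits normally or through an exception.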
But if we are working on one script, after writing the results to the CSV file, we need to send the file via email. Here’s how we can do that:
msg = EmailMessage()
msg['Subject'] = 'Employee Data'
msg['From'] = '<sender_email_address>'
msg['To'] = '<recipient_email_address>'
msg.set_content('Please find attached the latest employee data.')
with open(csv_file, 'rb') as f:
    file_data = f.read()
msg.add_attachment(file_data, maintype='text', subtype='csv', filename=csv_file)
with smtplib.SMTP('<smtp_server_address>', 587) as smtp:
    smtp.starttls() # Encrypt the connection before logging in
    smtp.login('<sender_email_address>', '<sender_email_password>')
    smtp.send_message(msg)
- msg is an EmailMessage object that we create by calling the class constructor.
- msg['Subject'], msg['From'], and msg['To'] are headers of the EmailMessage object that we set to specify the subject line, sender email address, and recipient email address, respectively.
- msg.set_content is a method that sets the body of the email message to a plain text string.
- open is a built-in Python function that opens a file for reading or writing.
- 'rb' is a string that specifies the mode in which to open the file ('rb' for read mode in binary).
- file_data is a bytes object that contains the contents of the CSV file.
- msg.add_attachment is a method that adds an attachment to the email message. We pass in the binary data for the CSV file, the MIME main type ('text'), the MIME subtype ('csv'), and the filename for the attachment.
- smtplib.SMTP is a class that represents an SMTP server connection.
- smtp.login is a method that authenticates the connection using the sender email address and password.
- smtp.send_message is a method that sends the email message to the recipient email address. We pass in the EmailMessage object that we created earlier.
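The message can be built and inspected without contacting an SMTP server, which is a convenient way to check the attachment before sending anything. The sketch below wraps the calls described above in a hypothetical build_message function and uses a few invented bytes as the CSV content.

```python
from email.message import EmailMessage


def build_message(sender, recipient, csv_bytes, filename):
    """Return an EmailMessage carrying `csv_bytes` as a text/csv attachment."""
    msg = EmailMessage()
    msg["Subject"] = "Employee Data"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("Please find attached the latest employee data.")
    msg.add_attachment(csv_bytes, maintype="text", subtype="csv", filename=filename)
    return msg


msg = build_message("sender@example.com", "recipient@example.com",
                    b"id,name\r\n1,Ada\r\n", "employees.csv")
attachment = next(msg.iter_attachments())
print(msg.get_content_type())         # multipart/mixed
print(attachment.get_content_type())  # text/csv
print(attachment.get_filename())      # employees.csv
```

Adding an attachment turns the message into a multipart/mixed container, with the plain text body as one part and the CSV file as another.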
Step 3: Test the ETL Script
Before we schedule the script to run automatically, let’s test it manually to make sure it works as expected.
Open a terminal window and navigate to the directory where you saved the etl.py script. Then, run the following command:
python etl.py
This should create a file called employees.csv in the same directory as the script. Open the file in a text editor or spreadsheet program to verify that the data was written correctly.
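The file lands next to the script here only because the command was run from the script's directory; 'employees.csv' is a relative path, so under cron it would be created in whatever directory the job starts in. A minimal sketch of one way around this, using a hypothetical output_path helper built on pathlib:

```python
from pathlib import Path


def output_path(script_file, filename="employees.csv"):
    """Return an absolute path for `filename` next to `script_file`."""
    return Path(script_file).resolve().parent / filename


# Inside etl.py this would be called as output_path(__file__).
print(output_path("/home/user/scripts/etl.py"))
```

Passing the result to open() in place of the bare file name keeps the CSV in a predictable location regardless of how the script is started.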
If we are working with two separate scripts, one that generates the CSV and one that emails it, we write the email script as an additional step.
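Step 4: Write the Email Script
The next step is to write the script that will send the CSV file as an attachment via email. We’ll call this script send_email.py rather than email.py, because a file named email.py in the script directory would shadow Python's standard email package and break the imports below. Here's the complete code: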
import smtplib
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
# Create a message object
msg = MIMEMultipart()
msg['Subject'] = 'Oracle table export'
msg['From'] = 'sender@example.com'
msg['To'] = 'recipient@example.com'
msg.attach(MIMEText('Please find attached the CSV file exported from Oracle'))
# Open the CSV file and attach it to the message
with open('employees.csv', 'rb') as f:
    csv_data = f.read()
    csv_name = os.path.basename(f.name)
csv_part = MIMEApplication(csv_data, Name=csv_name)
csv_part['Content-Disposition'] = f'attachment; filename="{csv_name}"'
msg.attach(csv_part)
# Connect to the SMTP server and send the message
smtp_server = 'smtp.example.com'
smtp_username = 'sender@example.com'
smtp_password = 'password'
with smtplib.SMTP(smtp_server, 587) as server:
    server.starttls()
    server.login(smtp_username, smtp_password)
    server.sendmail(msg['From'], msg['To'], msg.as_string())
This code creates a MIMEMultipart message object and sets the Subject, From, and To headers. It then attaches a MIMEText part that contains a message body, opens the CSV file, reads its contents, and attaches it to the message as a MIMEApplication part with a filename that matches the original file name.
The code then connects to an SMTP server using the smtplib library, logs in using a username and password, and sends the message using the sendmail method of the SMTP server object.
So the complete Python ETL script for querying an Oracle table and converting the output to CSV, then sending the CSV via email using cron would look like this:
import cx_Oracle
import csv
import os
import smtplib
from email.message import EmailMessage
# Connect to Oracle database
dsn_tns = cx_Oracle.makedsn('<hostname>', '<port>', service_name='<service_name>')
conn = cx_Oracle.connect(user='<username>', password='<password>', dsn=dsn_tns)
# Execute SQL query and fetch results
cur = conn.cursor()
cur.execute("SELECT * FROM employees")
rows = cur.fetchall()
# Write results to CSV file
csv_file = 'employees.csv'
with open(csv_file, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow([col[0] for col in cur.description]) # Write column headers
    writer.writerows(rows) # Write the data rows
# Close the database connection
cur.close()
conn.close()
The script then continues with the same email-sending code as the combined ETL script shown earlier. That code sends an email with an attached CSV file exported from Oracle. It uses the smtplib and email.message modules to create an email message and add the CSV file as an attachment.
The email message contains a subject, a sender email address, a recipient email address, and a plain text message. The CSV file is added as an attachment with the add_attachment() method.
The SMTP server details and the email account username and password are used to connect to the SMTP server and send the email. The connection is established by creating an smtplib.SMTP object, and the login credentials are authenticated using the smtp.login() method.
The starttls() method upgrades the SMTP connection to TLS encryption before the credentials are sent. The smtp.send_message() method then sends the email message with the attached CSV file.
It’s also possible to generate the CSV file and send it as an attachment using a single Python script.
Here’s example code that does this:
import smtplib
import os
from email.message import EmailMessage
import cx_Oracle
import csv
# Set up database connection details
dsn = cx_Oracle.makedsn(host='localhost', port=1521, service_name='ORCL')
conn = cx_Oracle.connect(user='yourusername', password='yourpassword', dsn=dsn)
cur = conn.cursor()
# Query the database and write results to CSV file
cur.execute('SELECT * FROM employees')
with open('employees.csv', 'w', newline='') as f:
    csv_writer = csv.writer(f)
    csv_writer.writerow([i[0] for i in cur.description]) # Write column headers
    csv_writer.writerows(cur)
# Close database connection
cur.close()
conn.close()
# Create an email message
msg = EmailMessage()
msg['Subject'] = 'Oracle table export'
msg['From'] = 'sender@example.com'
msg['To'] = 'recipient@example.com'
msg.set_content('Please find attached the CSV file exported from Oracle')
# Add the CSV file as an attachment
with open('employees.csv', 'rb') as f:
    file_data = f.read()
    file_name = os.path.basename(f.name)
msg.add_attachment(file_data, maintype='text', subtype='csv', filename=file_name)
# Connect to the SMTP server and send the email
smtp_server = 'smtp.example.com'
smtp_username = 'sender@example.com'
smtp_password = 'password'
with smtplib.SMTP(smtp_server, 587) as smtp:
    smtp.starttls()
    smtp.login(smtp_username, smtp_password)
    smtp.send_message(msg)
In this example code, we first set up the database connection details, query the database, and write the results to a CSV file named “employees.csv”. Then, we create an email message, add the CSV file as an attachment, and send the email using an SMTP server.
Note that you will need to replace the database connection details, email addresses, and SMTP server details with your own values.
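Because cron runs the script unattended, a failure in the database or email step is easy to miss. The sketch below shows one possible way to run the steps in order, log any exception, and report an exit code; run_job and the placeholder steps are hypothetical and stand in for the export and email code above.

```python
import logging


def run_job(steps, logger):
    """Run each (name, callable) step in order; return 0 on success, 1 on failure."""
    for name, step in steps:
        try:
            logger.info("starting %s", name)
            step()
        except Exception:
            # logger.exception records the message together with the traceback.
            logger.exception("step %s failed", name)
            return 1
    logger.info("all steps finished")
    return 0


logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
# Placeholder steps; the real ones would export the CSV and send the email.
steps = [("export", lambda: None), ("email", lambda: None)]
exit_code = run_job(steps, logging.getLogger("etl"))
print(exit_code)  # 0
```

In the real script the return value could be passed to sys.exit(), and the logging output could be directed to a file so that each scheduled run leaves a record.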