Data engineering projects

Eight CSV files stored in an Amazon S3 bucket, ingested into a PostgreSQL database

You are working at Meta as a data engineer. You receive eight CSV files stored in an Amazon S3 bucket, dumped daily by an automated process run by another service.

The eight files are:

i. “Profile_Information.CSV”: This CSV file contains basic information about your Facebook account, such as your name, email address, phone number, gender, and date of birth.

ii. “Timeline_Posts.CSV”: This file contains all the posts you have made on your Facebook timeline, including status updates, photos, videos, and shared content.

iii. “Messages.CSV”: This file contains a record of all the messages you have sent and received on Facebook Messenger.

iv. “Friends.CSV”: This file contains a list of all the friends you have on Facebook, including their names, profile pictures, and other basic information.

v. “Advertisers.CSV”: This file contains a list of all the advertisers who have shown you ads on Facebook, as well as the ads you have clicked on or interacted with.

vi. “Likes_and_Reactions.CSV”: This file contains a list of all the pages and posts you have liked or reacted to on Facebook.

vii. “Events.CSV”: This file contains a list of all the events you have created or RSVP’d to on Facebook.

viii. “Pages.CSV”: This file contains a list of all the pages you have created, managed, or liked on Facebook.

These eight CSV files are dumped into the Amazon S3 bucket daily by that automation, so each day you receive eight files containing different data sets from the service.

Task:

We need to write a Python ETL script that connects to the Amazon S3 bucket, reads the CSV files, and ingests the data into a PostgreSQL database. Before the first run, remember to create the eight tables in the PostgreSQL database.
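
As an illustration, here is a minimal sketch of pre-creating one of the eight tables, profile_information. The connection parameters are placeholders, and the column names are assumptions based on the Profile_Information description above; the remaining seven tables would be created the same way with their own columns.

import psycopg2

# Placeholder connection details -- replace with your own
conn = psycopg2.connect(
    host="your_host_name",
    database="your_database_name",
    user="your_user_name",
    password="your_password"
)
with conn, conn.cursor() as cur:
    # Assumed columns, based on the Profile_Information.CSV description above
    cur.execute("""
        CREATE TABLE IF NOT EXISTS profile_information (
            name          TEXT,
            email_address TEXT,
            phone_number  TEXT,
            gender        TEXT,
            date_of_birth TEXT
        );
    """)
conn.close()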

Each day, the script should connect to the database, read the CSVs, and ingest the data from the eight files into their respective tables.

Remember, the files arrive daily, and the ETL should handle them automatically. The ETL should be scheduled with cron, and the job should run at exactly 9 PM East Africa Time (EAT). Both the Python ETL code and the cron job should be provided.

Project Description

The project here is to create an ETL pipeline that ingests data from eight CSV files stored in an Amazon S3 bucket daily and loads it into a PostgreSQL database. The data sets contain different types of information about a Facebook account, including profile information, timeline posts, messages, friends, advertisers, likes and reactions, events, and pages. Each file contains a record of the relevant data, and we want to store this data in separate tables in the PostgreSQL database.

The ETL pipeline is designed to read the CSV files from the S3 bucket, check if there is any new data to be ingested since the last run, and load the new data into the respective tables in the PostgreSQL database. The pipeline should run daily at 9 PM East Africa Time, and the files should be deleted from the S3 bucket after they have been ingested into the database to avoid duplication.

Step-by-Step Job Task

Here are the step-by-step tasks that the Python ETL code will perform:

  1. Set up the AWS S3 client and the PostgreSQL connection.

  2. Define the file names and table names for each CSV file.

  3. Iterate through each file and ingest the data into the respective table.

  4. Check for existing data and insert only the new rows since the last run.

  5. Commit the changes to the PostgreSQL database.

  6. Delete the file from the S3 bucket after ingesting the data into the database to avoid duplication.

  7. Close the PostgreSQL connection.

The ETL pipeline will perform these steps automatically daily at 9 PM East Africa Time. The result will be a clean and up-to-date PostgreSQL database with the relevant data from the Facebook account.
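
The script below uses placeholder values for the PostgreSQL connection (host, database, user, password). As an optional sketch (not required by the task), the same connection could read those settings from environment variables instead of hardcoding them; the variable names here are assumptions:

import os
import psycopg2

# Hypothetical environment variable names -- set these in the cron environment
conn = psycopg2.connect(
    host=os.environ["PG_HOST"],
    database=os.environ["PG_DATABASE"],
    user=os.environ["PG_USER"],
    password=os.environ["PG_PASSWORD"]
)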

import boto3
import pandas as pd
import psycopg2
from psycopg2 import sql

# Set up the AWS S3 client
s3 = boto3.client('s3')
# Set up the PostgreSQL connection
conn = psycopg2.connect(
    host="your_host_name",
    database="your_database_name",
    user="your_user_name",
    password="your_password"
)
# Set up the file names and table names.
# NOTE: S3 object keys are case-sensitive; adjust the extension case here to
# match the actual keys in your bucket (the files above are described with a ".CSV" extension).
file_names = ["Profile_Information.csv", "Timeline_Posts.csv", "Messages.csv", "Friends.csv", "Advertisers.csv", "Likes_and_Reactions.csv", "Events.csv", "Pages.csv"]
table_names = [f.split('.')[0].lower() for f in file_names]
# Iterate through each file and ingest the data into the respective table
for file_name, table_name in zip(file_names, table_names):
    # Read the CSV file from S3 (boto3 returns a file-like StreamingBody)
    obj = s3.get_object(Bucket='your_s3_bucket_name', Key=file_name)
    df = pd.read_csv(obj['Body'])
    # Ingest the data into the PostgreSQL table
    with conn.cursor() as cur:
        # Create the table if it does not exist, with one TEXT column per
        # lower-cased CSV header
        cur.execute(
            sql.SQL("CREATE TABLE IF NOT EXISTS {} ({});").format(
                sql.Identifier(table_name),
                sql.SQL(', ').join(
                    [sql.Identifier(col.lower()) + sql.SQL(' TEXT') for col in df.columns]
                )
            )
        )
        # Check for existing data and insert only the rows added since the
        # last run. This assumes the CSV has a created_at column containing
        # ISO-8601 timestamps, so string comparison preserves time order;
        # files without that column are loaded in full.
        if 'created_at' in df.columns:
            cur.execute(
                sql.SQL("SELECT max(created_at) FROM {}").format(
                    sql.Identifier(table_name)
                )
            )
            latest = cur.fetchone()[0]
            if latest is not None:
                df = df[df['created_at'] > latest]
        # Insert the data into the table, converting NaN to None so that
        # missing values are stored as SQL NULLs
        for row in df.itertuples(index=False):
            values = [None if pd.isna(v) else v for v in row]
            cur.execute(
                sql.SQL("INSERT INTO {} VALUES ({})").format(
                    sql.Identifier(table_name),
                    sql.SQL(',').join(sql.Placeholder() * len(values))
                ), values
            )
        # Commit the changes
        conn.commit()
    # Delete the file from S3 after ingestion to avoid processing it again
    s3.delete_object(Bucket='your_s3_bucket_name', Key=file_name)
# Close the PostgreSQL connection
conn.close()

Note that the code includes a block at the end of the loop that deletes each file from the S3 bucket after it has been ingested into the corresponding PostgreSQL table. This way, the script can be run daily to incrementally ingest new data into the database tables without duplicating existing data or re-ingesting the same file.
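
Finally, the cron job. Cron uses the server's local time, so the entry depends on the machine's timezone: if the server clock is set to East Africa Time (EAT, UTC+3) the job runs at 21:00 local time, and if the server runs in UTC the equivalent is 18:00. The script and log paths below are placeholders.

# crontab entry -- run the ETL daily at 9 PM East Africa Time
# (server clock set to EAT, UTC+3):
0 21 * * * /usr/bin/python3 /path/to/facebook_s3_etl.py >> /var/log/facebook_s3_etl.log 2>&1
# If the server runs in UTC instead, schedule it at 18:00 UTC (= 9 PM EAT):
# 0 18 * * * /usr/bin/python3 /path/to/facebook_s3_etl.py >> /var/log/facebook_s3_etl.log 2>&1

Add the entry with crontab -e on a machine that has network access to both the S3 bucket and the PostgreSQL database.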