πŸ’‘ The challenge is to replace AWS DMS (Database Migration Service) as the tool for copying data from RDS to the Data Lake.

Prerequisites

  • Glue with proper network routing and permissions to access RDS

    • VPC endpoint for Glue to access Secrets Manager

    • IAM role with policies allowing access to RDS

    • Adjustments to the RDS Security Group

    • Connection configured in Glue

      • Must be in the same VPC and subnet as the RDS instance
      • Credentials stored in Secrets Manager (a fetch sketch follows this list)
  • RDS access credentials

    • User
    • Password
    • Host
    • etc.
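
Since the credentials live in Secrets Manager, the Glue job fetches them at runtime. A minimal sketch with boto3; the secret name and its JSON key layout are assumptions, so adjust them to your secret:

import json

import boto3

def get_rds_credentials(secret_id: str = "rds/datalake-credentials") -> dict:
    # Fetch and parse the RDS secret (user, password, host, port, dbname).
    # Requires the Secrets Manager VPC endpoint and the IAM role listed above.
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    return json.loads(response["SecretString"])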

Developing the PySpark connection to RDS

  1. Downloaded the JDBC driver, following the AWS documentation for connecting to RDS for PostgreSQL

    • In the end, it’s the regular PostgreSQL JDBC driver
    • Uploaded the driver to an S3 bucket and added its path to the Glue job configuration (effectively the job’s spark-submit), using the argument below inside default_arguments (a boto3 equivalent follows the snippet):
    1"--extra-jars" = "s3://<libs-bucket>/lib/postgresql-42.7.5.jar"
    
  2. Created the Spark code to connect to RDS

  • There are two ways to do this:

    • Via JDBC and direct Spark Session
    • Via Glue Connection and Glue Context within Spark
  • Using the Spark session directly is the better approach: it lets you push SQL queries down to the source instead of loading the entire table into Spark, which isn’t possible when reading through the Glue context (see the pushdown sketch after the class below).

from pyspark.sql import DataFrame

# BaseTool and tool_settings come from the project's own ingestion framework.
class RDSPostgresIngestor(BaseTool):
    def __init__(self, spark_session, glue_context, tool_settings):
        super().__init__(spark_session, glue_context, tool_settings)
        # Placeholder values; in practice these are fetched from Secrets Manager.
        self.rds_user = "rds_user"
        self.rds_pwd = "rds_password"
        self.rds_host = "host.rds.amazonaws.com"
        self.rds_db = "db"
        self.jdbc_url = f"jdbc:postgresql://{self.rds_host}:5432/{self.rds_db}"
        self.rds_properties = {
            "user": self.rds_user,
            "password": self.rds_pwd,
            "driver": "org.postgresql.Driver",
        }
        self.rds_connection_name = "rds_connection_v2"
        self.options = tool_settings.get("options", {})

    def validate_settings(self):
        if not self.options:
            raise ValueError("'tool_settings' must include a non-empty 'options' entry.")

    def execute(self) -> DataFrame:
        table = self.options.get("table", "test_table")
        # Alternative approach via the Glue connection and Glue context.
        # Kept for reference: it loads the whole table and cannot push a
        # query down to the source.
        # try:
        #     datasource = self.glue_context.create_dynamic_frame.from_catalog(
        #         database=self.rds_db,
        #         table_name=table,
        #         connection_options={"connectionName": self.rds_connection_name},
        #     )
        # except Exception as e:
        #     raise ConnectionError(f"Unable to connect to RDS: {e}")
        # return datasource.toDF()
        #
        # Equivalent shorthand for the read below:
        # return self.spark_session.read.jdbc(
        #     url=self.jdbc_url, table=table, properties=self.rds_properties
        # )
        return (
            self.spark_session.read.format("jdbc")
            .option("url", self.jdbc_url)
            .option("driver", "org.postgresql.Driver")
            .option("dbtable", table)
            .option("user", self.rds_user)
            .option("password", self.rds_pwd)
            .load()
        )
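
To illustrate the pushdown mentioned above: Spark’s JDBC reader accepts a query option in place of dbtable (the two are mutually exclusive), so only the result set leaves Postgres. A minimal usage sketch; the query, columns, and variable names are made up and assume the attributes from the class above are in scope:

# Hypothetical incremental read: only the rows matching the filter cross the wire.
df = (
    spark_session.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "org.postgresql.Driver")
    .option(
        "query",
        "SELECT id, payload, updated_at FROM test_table "
        "WHERE updated_at >= '2024-01-01'",
    )
    .option("user", rds_user)
    .option("password", rds_pwd)
    .load()
)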