💡 The challenge is to replace DMS as the tool for copying data from RDS to the Data Lake.
Prerequisites
- Glue with proper network routing and permissions to access RDS
- A VPC endpoint so Glue can reach Secrets Manager
- An IAM role with policies allowing access to RDS
- Adjustments to the RDS security group
- A connection configured in Glue
  - Must be in the same VPC and subnet as RDS
  - Credentials stored in Secrets Manager
- RDS access credentials, fetched at runtime (see the sketch after this list)
  - User
  - Password
  - Host
  - etc.
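Since the credentials live in Secrets Manager, the job fetches them at runtime. A minimal sketch with boto3, assuming a hypothetical secret named `rds/credentials` stored as a JSON blob; the secret id and its keys are placeholders to match however the secret was actually created:

```python
import json

import boto3


def get_rds_credentials(secret_id: str = "rds/credentials") -> dict:
    """Fetch RDS credentials from Secrets Manager.

    The secret id and the JSON keys (user, password, host, ...) are
    assumptions; adjust them to whatever was stored for the connection.
    """
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    # SecretString holds the JSON blob with the connection details.
    return json.loads(response["SecretString"])
```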
Developing the PySpark connection to RDS
- Downloaded the JDBC driver following the AWS documentation for connecting to RDS with Postgres
  - In the end, it's the regular PostgreSQL JDBC driver
- Uploaded the driver to an S3 bucket and added its path to the Glue job configuration (essentially the job's spark-submit), using the argument below inside `default_arguments` (a provisioning sketch follows the snippet):

```
"--extra-jars" = "s3://<libs-bucket>/lib/postgresql-42.7.5.jar"
```
- Creating the Spark code to connect to RDS
  - There are two ways to do this:
    - Via JDBC and a direct Spark session
    - Via a Glue connection and the Glue context within Spark
  - Using a direct Spark session is the better approach, since it lets you run SQL queries against the source and avoid loading the entire table into Spark, which isn't possible when using the Glue context. A pushdown sketch follows this list, and the full ingestor class comes after it.
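To make the pushdown point concrete, a minimal sketch reusing the placeholder host, database, and credentials from the class below; the query and column names are illustrative only:

```python
# Query pushdown over JDBC: only the rows/columns selected by the SQL
# are transferred, instead of the whole table. Host, credentials, and
# the query itself are placeholders.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host.rds.amazonaws.com:5432/db")
    .option("driver", "org.postgresql.Driver")
    .option("query", "SELECT id, updated_at FROM test_table WHERE updated_at >= '2024-01-01'")
    .option("user", "rds_user")
    .option("password", "rds_password")
    .load()
)
```

The `query` option (Spark 2.4+) replaces `dbtable` when only a subset is wanted; with `dbtable` the same effect is possible by passing a parenthesized subquery as a derived table. The full ingestor class is below.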
```python
from pyspark.sql import DataFrame


class RDSPostgresIngestor(BaseTool):  # BaseTool: the project's shared tool interface
    def __init__(self, spark_session, glue_context, tool_settings):
        super().__init__(spark_session, glue_context, tool_settings)
        # Placeholder credentials; in practice these come from Secrets Manager.
        self.rds_user = "rds_user"
        self.rds_pwd = "rds_password"
        self.rds_host = "host.rds.amazonaws.com"
        self.rds_db = "db"
        self.jdbc_url = f"jdbc:postgresql://{self.rds_host}:5432/{self.rds_db}"
        self.rds_properties = {
            "user": self.rds_user,
            "password": self.rds_pwd,
            "driver": "org.postgresql.Driver",
        }
        self.rds_connection_name = "rds_connection_v2"
        self.options = tool_settings.get("options", {})

    def validate_settings(self):
        if not self.options:
            raise ValueError("'tool_settings' must provide 'options'.")

    def execute(self) -> DataFrame:
        table = self.options.get("table", "test_table")

        # Alternative via the Glue context and the named Glue connection.
        # Kept for reference: it always loads the whole table.
        # datasource = self.glue_context.create_dynamic_frame.from_catalog(
        #     database=self.rds_db,
        #     table_name=table,
        #     connection_options={"connectionName": self.rds_connection_name},
        # )
        # return datasource.toDF()

        # Chosen path: direct Spark session over JDBC.
        # Equivalent shorthand: self.spark_session.read.jdbc(
        #     url=self.jdbc_url, table=table, properties=self.rds_properties)
        return (
            self.spark_session.read.format("jdbc")
            .option("url", self.jdbc_url)
            .option("driver", "org.postgresql.Driver")
            .option("dbtable", table)
            .option("user", self.rds_user)
            .option("password", self.rds_pwd)
            .load()
        )
```
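A usage sketch for the class, assuming it runs inside a Glue job script; the `tool_settings` shape follows what `validate_settings()` and `execute()` expect, and the output bucket/prefix is a placeholder:

```python
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)
spark = glue_context.spark_session

ingestor = RDSPostgresIngestor(
    spark, glue_context, {"options": {"table": "test_table"}}
)
ingestor.validate_settings()
df = ingestor.execute()

# Land the extract in the lake; bucket and prefix are placeholders.
df.write.mode("overwrite").parquet("s3://<lake-bucket>/raw/test_table/")
```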