@@ -27,23 +27,48 @@ def extract(mongodb_secret: str, collection_name: str):
2727 collection = db [collection_name ] # db.getCollection("products")
2828
2929 with open (f"/tmp/{ collection_name } .jsonl" , "w" ) as fh :
30- fh .writelines ([dumps (doc ) + "\n " for doc in collection .find ()])
30+ if collection_name == 'operations' :
31+ # Filter operations from last 6 months
32+ six_months_ago = datetime .today () - timedelta (days = 180 )
33+ cursor = collection .find ({"createdAt" : {"$gte" : six_months_ago }})
34+ else :
35+ cursor = collection .find ()
36+
37+ fh .writelines ([dumps (doc ) + "\n " for doc in cursor ])
3138
3239
3340def load (postgre_secret : str , table_name : str ):
3441 """Load data from local JSONLine file to PostgresSQL."""
42+
3543 engine = create_engine (
36- postgre_secret
44+ postgre_secret ,
45+ connect_args = {
46+ "sslmode" : "require" ,
47+ "gssencmode" : "disable" ,
48+ "connect_timeout" : 30 ,
49+ "application_name" : "etl_production"
50+ }
3751 ) # Create engine to connect to Postgre database
3852 Class = orm_classes [table_name ]
3953 connection = engine .connect () # Connect to the database
40- sql_query = sqlalchemy .text (
41- f"drop table if exists { table_name } cascade;"
42- ) # Transaction to drop the table and its dependent objects.
43- connection .execute (sql_query ) # Execute transaction
44- connection .commit () # Commit the transaction
54+
55+ # Set the statement_timeout (in milliseconds) using an SQL query
56+ statement_timeout = 300000 # 5 minutes in milliseconds
57+ query_statement_timeout = sqlalchemy .text (
58+ f"SET statement_timeout = { statement_timeout } ;"
59+ )
60+ connection .execute (query_statement_timeout ) # Set the statement_timeout
61+
62+ # Create the tables if they don't exist
4563 SQLModel .metadata .create_all (engine )
4664
65+ # Truncate the table and its dependent objects (to remove all data but keep structure)
66+ truncate_query = sqlalchemy .text (
67+ f"TRUNCATE TABLE { table_name } CASCADE;"
68+ )
69+ connection .execute (truncate_query )
70+ connection .commit ()
71+
4772 with open (f"/tmp/{ table_name } .jsonl" , "r" ) as fh :
4873 objects = [Class (id = i , ** json .loads (p )) for i , p in enumerate (fh .readlines ())]
4974
0 commit comments