|  | 
| 9 | 9 | """ | 
| 10 | 10 | 
 | 
| 11 | 11 | import pytest | 
| 12 |  | -from datetime import datetime, date, time | 
|  | 12 | +from datetime import datetime, date, time, timedelta, timezone | 
| 13 | 13 | import time as time_module | 
| 14 | 14 | import decimal | 
| 15 | 15 | from contextlib import closing | 
| @@ -6472,7 +6472,7 @@ def test_only_null_and_empty_binary(cursor, db_connection): | 
| 6472 | 6472 |  finally: | 
| 6473 | 6473 |  drop_table_if_exists(cursor, "#pytest_null_empty_binary") | 
| 6474 | 6474 |  db_connection.commit() | 
| 6475 |  | -  | 
|  | 6475 | + | 
| 6476 | 6476 | # ---------------------- VARCHAR(MAX) ---------------------- | 
| 6477 | 6477 | 
 | 
| 6478 | 6478 | def test_varcharmax_short_fetch(cursor, db_connection): | 
| @@ -7560,6 +7560,169 @@ def test_decimal_separator_calculations(cursor, db_connection): | 
| 7560 | 7560 |  cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") | 
| 7561 | 7561 |  db_connection.commit() | 
| 7562 | 7562 | 
 | 
|  | 7563 | +def test_datetimeoffset_read_write(cursor, db_connection): | 
|  | 7564 | + """Test reading and writing timezone-aware DATETIMEOFFSET values.""" | 
|  | 7565 | + try: | 
|  | 7566 | + test_cases = [ | 
|  | 7567 | + # Valid timezone-aware datetimes | 
|  | 7568 | + datetime(2023, 10, 26, 10, 30, 0, tzinfo=timezone(timedelta(hours=5, minutes=30))), | 
|  | 7569 | + datetime(2023, 10, 27, 15, 45, 10, 123456, tzinfo=timezone(timedelta(hours=-8))), | 
|  | 7570 | + datetime(2023, 10, 28, 20, 0, 5, 987654, tzinfo=timezone.utc) | 
|  | 7571 | + ] | 
|  | 7572 | + | 
|  | 7573 | + cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") | 
|  | 7574 | + db_connection.commit() | 
|  | 7575 | + | 
|  | 7576 | + insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" | 
|  | 7577 | + for i, dt in enumerate(test_cases): | 
|  | 7578 | + cursor.execute(insert_stmt, i, dt) | 
|  | 7579 | + db_connection.commit() | 
|  | 7580 | + | 
|  | 7581 | + cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;") | 
|  | 7582 | + for i, dt in enumerate(test_cases): | 
|  | 7583 | + row = cursor.fetchone() | 
|  | 7584 | + assert row is not None | 
|  | 7585 | + fetched_id, fetched_dt = row | 
|  | 7586 | + assert fetched_dt.tzinfo is not None | 
|  | 7587 | + expected_utc = dt.astimezone(timezone.utc) | 
|  | 7588 | + fetched_utc = fetched_dt.astimezone(timezone.utc) | 
|  | 7589 | + # Ignore sub-microsecond differences | 
|  | 7590 | + expected_utc = expected_utc.replace(microsecond=int(expected_utc.microsecond / 1000) * 1000) | 
|  | 7591 | + fetched_utc = fetched_utc.replace(microsecond=int(fetched_utc.microsecond / 1000) * 1000) | 
|  | 7592 | + assert fetched_utc == expected_utc | 
|  | 7593 | + finally: | 
|  | 7594 | + cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") | 
|  | 7595 | + db_connection.commit() | 
|  | 7596 | + | 
|  | 7597 | +def test_datetimeoffset_max_min_offsets(cursor, db_connection): | 
|  | 7598 | + """ | 
|  | 7599 | + Test inserting and retrieving DATETIMEOFFSET with maximum and minimum allowed offsets (+14:00 and -14:00). | 
|  | 7600 | + Uses fetchone() for retrieval. | 
|  | 7601 | + """ | 
|  | 7602 | + try: | 
|  | 7603 | + cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") | 
|  | 7604 | + db_connection.commit() | 
|  | 7605 | + | 
|  | 7606 | + test_cases = [ | 
|  | 7607 | + (1, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=14)))), # max offset | 
|  | 7608 | + (2, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=-14)))), # min offset | 
|  | 7609 | + ] | 
|  | 7610 | + | 
|  | 7611 | + insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" | 
|  | 7612 | + for row_id, dt in test_cases: | 
|  | 7613 | + cursor.execute(insert_stmt, row_id, dt) | 
|  | 7614 | + db_connection.commit() | 
|  | 7615 | + | 
|  | 7616 | + cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;") | 
|  | 7617 | + | 
|  | 7618 | + for expected_id, expected_dt in test_cases: | 
|  | 7619 | + row = cursor.fetchone() | 
|  | 7620 | + assert row is not None, f"No row fetched for id {expected_id}." | 
|  | 7621 | + fetched_id, fetched_dt = row | 
|  | 7622 | + | 
|  | 7623 | + assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" | 
|  | 7624 | + assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}" | 
|  | 7625 | + | 
|  | 7626 | + # Compare in UTC to avoid offset differences | 
|  | 7627 | + expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) | 
|  | 7628 | + fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) | 
|  | 7629 | + assert fetched_utc == expected_utc, ( | 
|  | 7630 | + f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" | 
|  | 7631 | + ) | 
|  | 7632 | + | 
|  | 7633 | + finally: | 
|  | 7634 | + cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") | 
|  | 7635 | + db_connection.commit() | 
|  | 7636 | + | 
|  | 7637 | +def test_datetimeoffset_invalid_offsets(cursor, db_connection): | 
|  | 7638 | + """Verify driver rejects offsets beyond ±14 hours.""" | 
|  | 7639 | + try: | 
|  | 7640 | + cursor.execute("CREATE TABLE #pytest_datetimeoffset_invalid_offsets (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") | 
|  | 7641 | + db_connection.commit() | 
|  | 7642 | +  | 
|  | 7643 | + with pytest.raises(Exception): | 
|  | 7644 | + cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", | 
|  | 7645 | + 1, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=15)))) | 
|  | 7646 | +  | 
|  | 7647 | + with pytest.raises(Exception): | 
|  | 7648 | + cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", | 
|  | 7649 | + 2, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=-15)))) | 
|  | 7650 | + finally: | 
|  | 7651 | + cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_invalid_offsets;") | 
|  | 7652 | + db_connection.commit() | 
|  | 7653 | + | 
|  | 7654 | +def test_datetimeoffset_dst_transitions(cursor, db_connection): | 
|  | 7655 | + """ | 
|  | 7656 | + Test inserting and retrieving DATETIMEOFFSET values around DST transitions. | 
|  | 7657 | + Ensures that driver handles DST correctly and does not crash. | 
|  | 7658 | + """ | 
|  | 7659 | + try: | 
|  | 7660 | + cursor.execute("CREATE TABLE #pytest_datetimeoffset_dst_transitions (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") | 
|  | 7661 | + db_connection.commit() | 
|  | 7662 | + | 
|  | 7663 | + # Example DST transition dates (replace with actual region offset if needed) | 
|  | 7664 | + dst_test_cases = [ | 
|  | 7665 | + (1, datetime(2025, 3, 9, 1, 59, 59, tzinfo=timezone(timedelta(hours=-5)))), # Just before spring forward | 
|  | 7666 | + (2, datetime(2025, 3, 9, 3, 0, 0, tzinfo=timezone(timedelta(hours=-4)))), # Just after spring forward | 
|  | 7667 | + (3, datetime(2025, 11, 2, 1, 59, 59, tzinfo=timezone(timedelta(hours=-4)))), # Just before fall back | 
|  | 7668 | + (4, datetime(2025, 11, 2, 1, 0, 0, tzinfo=timezone(timedelta(hours=-5)))), # Just after fall back | 
|  | 7669 | + ] | 
|  | 7670 | + | 
|  | 7671 | + insert_stmt = "INSERT INTO #pytest_datetimeoffset_dst_transitions (id, dto_column) VALUES (?, ?);" | 
|  | 7672 | + for row_id, dt in dst_test_cases: | 
|  | 7673 | + cursor.execute(insert_stmt, row_id, dt) | 
|  | 7674 | + db_connection.commit() | 
|  | 7675 | + | 
|  | 7676 | + cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_dst_transitions ORDER BY id;") | 
|  | 7677 | + | 
|  | 7678 | + for expected_id, expected_dt in dst_test_cases: | 
|  | 7679 | + row = cursor.fetchone() | 
|  | 7680 | + assert row is not None, f"No row fetched for id {expected_id}." | 
|  | 7681 | + fetched_id, fetched_dt = row | 
|  | 7682 | + | 
|  | 7683 | + assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" | 
|  | 7684 | + assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}" | 
|  | 7685 | + | 
|  | 7686 | + # Compare UTC time to avoid issues due to offsets changing in DST | 
|  | 7687 | + expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) | 
|  | 7688 | + fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) | 
|  | 7689 | + assert fetched_utc == expected_utc, ( | 
|  | 7690 | + f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" | 
|  | 7691 | + ) | 
|  | 7692 | + | 
|  | 7693 | + finally: | 
|  | 7694 | + cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_dst_transitions;") | 
|  | 7695 | + db_connection.commit() | 
|  | 7696 | + | 
|  | 7697 | +def test_datetimeoffset_leap_second(cursor, db_connection): | 
|  | 7698 | + """Ensure driver handles leap-second-like microsecond edge cases without crashing.""" | 
|  | 7699 | + try: | 
|  | 7700 | + cursor.execute("CREATE TABLE #pytest_datetimeoffset_leap_second (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") | 
|  | 7701 | + db_connection.commit() | 
|  | 7702 | +  | 
|  | 7703 | + leap_second_sim = datetime(2023, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) | 
|  | 7704 | + cursor.execute("INSERT INTO #pytest_datetimeoffset_leap_second (id, dto_column) VALUES (?, ?);", 1, leap_second_sim) | 
|  | 7705 | + db_connection.commit() | 
|  | 7706 | + | 
|  | 7707 | + row = cursor.execute("SELECT dto_column FROM #pytest_datetimeoffset_leap_second;").fetchone() | 
|  | 7708 | + assert row[0].tzinfo is not None | 
|  | 7709 | + finally: | 
|  | 7710 | + cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_leap_second;") | 
|  | 7711 | + db_connection.commit() | 
|  | 7712 | + | 
|  | 7713 | +def test_datetimeoffset_malformed_input(cursor, db_connection): | 
|  | 7714 | + """Verify driver raises error for invalid datetimeoffset strings.""" | 
|  | 7715 | + try: | 
|  | 7716 | + cursor.execute("CREATE TABLE #pytest_datetimeoffset_malformed_input (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") | 
|  | 7717 | + db_connection.commit() | 
|  | 7718 | +  | 
|  | 7719 | + with pytest.raises(Exception): | 
|  | 7720 | + cursor.execute("INSERT INTO #pytest_datetimeoffset_malformed_input (id, dto_column) VALUES (?, ?);", | 
|  | 7721 | + 1, "2023-13-45 25:61:00 +99:99") # invalid string | 
|  | 7722 | + finally: | 
|  | 7723 | + cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_malformed_input;") | 
|  | 7724 | + db_connection.commit() | 
|  | 7725 | + | 
| 7563 | 7726 | def test_lowercase_attribute(cursor, db_connection): | 
| 7564 | 7727 |  """Test that the lowercase attribute properly converts column names to lowercase""" | 
| 7565 | 7728 | 
 | 
|  | 
0 commit comments