Skip to content
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export(Id)
export(Postgres)
export(Redshift)
export(postgresDefault)
export(postgresExportLargeObject)
export(postgresHasDefault)
export(postgresImportLargeObject)
export(postgresIsTransacting)
Expand Down
48 changes: 47 additions & 1 deletion R/PqConnection.R
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ postgresIsTransacting <- function(conn) {

#' Imports a large object from file
#'
#' Returns an object idenfier (Oid) for the imported large object
#' Returns an object identifier (Oid) for the imported large object.
#' This function must be called within a transaction.
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
Expand Down Expand Up @@ -191,3 +192,48 @@ postgresImportLargeObject <- function(conn, filepath = NULL, oid = 0) {

connection_import_lo_from_file(conn@ptr, filepath, oid)
}

#' Exports a large object to file
#'
#' Exports a large object from the database to a file on disk. This function
#' uses PostgreSQL's `lo_export()` function which efficiently streams the data
#' directly to disk without loading it into memory, making it suitable for
#' very large objects (GB+) that would cause memory issues with `lo_get()`.
#' This function must be called within a transaction.
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
#' [DBI::dbConnect()]
#' @param oid the object identifier (Oid) of the large object to export
#' @param filepath a path where the large object should be exported
#' @return invisible NULL on success, or stops with an error
#' @examples
#' \dontrun{
#' con <- postgresDefault()
#' filepath <- 'your_image.png'
#' dbWithTransaction(con, {
#' oid <- postgresImportLargeObject(con, filepath)
#' })
#' # Later, export the large object back to a file
#' dbWithTransaction(con, {
#' postgresExportLargeObject(con, oid, 'exported_image.png')
#' })
#' }
postgresExportLargeObject <- function(conn, oid, filepath) {
if (!postgresIsTransacting(conn)) {
stopc("Cannot export a large object outside of a transaction")
}

if (is.null(oid)) {
stopc("'oid' cannot be NULL")
}
if (is.na(oid)) {
stopc("'oid' cannot be NA")
}
if (oid < 0) {
stopc("'oid' cannot be negative")
}

connection_export_lo_to_file(conn@ptr, oid, filepath)
invisible()
}
4 changes: 4 additions & 0 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ connection_import_lo_from_file <- function(con, filename, oid) {
.Call(`_RPostgres_connection_import_lo_from_file`, con, filename, oid)
}

connection_export_lo_to_file <- function(con, oid, filename) {
invisible(.Call(`_RPostgres_connection_export_lo_to_file`, con, oid, filename))
}

connection_copy_data <- function(con, sql, df) {
invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df))
}
Expand Down
1 change: 1 addition & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ reference:
- postgresHasDefault
- postgresWaitForNotify
- postgresImportLargeObject
- postgresExportLargeObject

development:
mode: auto
Expand Down
39 changes: 39 additions & 0 deletions man/postgresExportLargeObject.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion man/postgresImportLargeObject.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/DbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ Oid DbConnection::import_lo_from_file(std::string filename, Oid p_oid) {
return(lo_oid);
}

void DbConnection::export_lo_to_file(Oid p_oid, std::string filename) {
int result = lo_export(pConn_, p_oid, filename.c_str());
if (result != 1) cpp11::stop(PQerrorMessage(pConn_));
}

void DbConnection::copy_data(std::string sql, cpp11::list df) {
LOG_DEBUG << sql;

Expand Down
1 change: 1 addition & 0 deletions src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DbConnection : boost::noncopyable {
void copy_data(std::string sql, cpp11::list df);

Oid import_lo_from_file(std::string file_path, Oid p_oid);
void export_lo_to_file(Oid p_oid, std::string file_path);


void check_connection();
Expand Down
5 changes: 5 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid
return con->import_lo_from_file(filename, oid);
}

[[cpp11::register]]
void connection_export_lo_to_file(DbConnection* con, Oid oid, std::string filename) {
con->export_lo_to_file(oid, filename);
}

[[cpp11::register]]
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df) {
return con->copy_data(sql, df);
Expand Down
9 changes: 9 additions & 0 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ extern "C" SEXP _RPostgres_connection_import_lo_from_file(SEXP con, SEXP filenam
END_CPP11
}
// connection.cpp
void connection_export_lo_to_file(DbConnection* con, Oid oid, std::string filename);
extern "C" SEXP _RPostgres_connection_export_lo_to_file(SEXP con, SEXP oid, SEXP filename) {
BEGIN_CPP11
connection_export_lo_to_file(cpp11::as_cpp<cpp11::decay_t<DbConnection*>>(con), cpp11::as_cpp<cpp11::decay_t<Oid>>(oid), cpp11::as_cpp<cpp11::decay_t<std::string>>(filename));
return R_NilValue;
END_CPP11
}
// connection.cpp
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df);
extern "C" SEXP _RPostgres_connection_copy_data(SEXP con, SEXP sql, SEXP df) {
BEGIN_CPP11
Expand Down Expand Up @@ -207,6 +215,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_export_lo_to_file", (DL_FUNC) &_RPostgres_connection_export_lo_to_file, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_import_lo_from_file", (DL_FUNC) &_RPostgres_connection_import_lo_from_file, 3},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
Expand Down
101 changes: 96 additions & 5 deletions tests/testthat/test-ImportLargeObject.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
test_that("can import and read a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(), '/data/large_object.txt')
test_file_path <- file.path(test_path(), "data", "large_object.txt")
dbWithTransaction(con, {
oid <- postgresImportLargeObject(con, test_file_path)
})
Expand All @@ -11,15 +11,45 @@ test_that("can import and read a large object", {
"select lo_get($1) as lo_data",
params = list(oid)
)$lo_data[1])
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string 'postgres'
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string "postgres"
expect_equal(lo_data, large_object_txt)
})


test_that("can import and export a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- file.path(test_path(), "data", "large_object.txt")

# Import the large object
oid <- dbWithTransaction(con, {
postgresImportLargeObject(con, test_file_path)
})
expect_gt(oid, 0)

# Export to a temporary file
temp_file <- tempfile(fileext = ".txt")
on.exit(unlink(temp_file), add = TRUE)

dbWithTransaction(con, {
postgresExportLargeObject(con, oid, temp_file)
})

# Verify the exported file exists and has correct content
expect_true(file.exists(temp_file))
exported_content <- readBin(temp_file, "raw", file.size(temp_file))
original_content <- readBin(test_file_path, "raw", file.size(test_file_path))
expect_equal(exported_content, original_content)

# Clean up large object
dbExecute(con, "SELECT lo_unlink($1)", params = list(oid))
})


test_that("importing to an existing oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(), '/data/large_object.txt')
test_file_path <- file.path(test_path(), "data", "large_object.txt")
oid <- 1234
dbWithTransaction(con, {
oid <- postgresImportLargeObject(con, test_file_path, oid)
Expand All @@ -37,13 +67,74 @@ test_that("importing to an existing oid throws error", {
test_that("import from a non-existing path throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(
test_file_path <- file.path(
test_path(),
'/data/large_object_that_does_not_exist.txt'
"data",
"large_object_that_does_not_exist.txt"
)
expect_error(
dbWithTransaction(con, {
oid <- postgresImportLargeObject(con, test_file_path)
})
)
})


test_that("export outside transaction throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
postgresExportLargeObject(con, 12345, tempfile()),
"Cannot export a large object outside of a transaction"
)
})


test_that("export with NULL oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, NULL, tempfile())
}),
"'oid' cannot be NULL"
)
})


test_that("export with NA oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, NA, tempfile())
}),
"'oid' cannot be NA"
)
})


test_that("export with negative oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, -1, tempfile())
}),
"'oid' cannot be negative"
)
})


test_that("export of non-existent oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
temp_file <- tempfile()
on.exit(unlink(temp_file), add = TRUE)

expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, 999999, temp_file)
})
)
})