@@ -29,8 +29,11 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_input_stream,
+};
 use futures::StreamExt;
+use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::spec::{DataFile, DataFileFormat, DataFileSerde, FormatVersion};
 use iceberg::table::Table;
 use iceberg::writer::CurrentFileStatus;
@@ -39,6 +42,7 @@ use iceberg::writer::file_writer::location_generator::{
 };
 use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
 use parquet::file::properties::WriterProperties;
+use uuid::Uuid;
 
 use crate::to_datafusion_error;
 
@@ -159,14 +163,24 @@ impl ExecutionPlan for IcebergWriteExec {
             self.table.file_io().clone(),
             DefaultLocationGenerator::new(self.table.metadata().clone())
                 .map_err(to_datafusion_error)?,
-            // todo actual filename
-            DefaultFileNameGenerator::new("what".to_string(), None, DataFileFormat::Parquet),
+            // todo filename prefix/suffix should be configurable
+            DefaultFileNameGenerator::new(
+                "datafusion".to_string(),
+                Some(Uuid::now_v7().to_string()),
+                DataFileFormat::Parquet,
+            ),
         )
         .build();
 
-        // todo repartition
-        let data = self.input.execute(partition, context)?;
-        let result_schema = Arc::clone(&self.result_schema);
+        let data = execute_input_stream(
+            Arc::clone(&self.input),
+            Arc::new(
+                schema_to_arrow_schema(self.table.metadata().current_schema())
+                    .map_err(to_datafusion_error)?,
+            ),
+            partition,
+            Arc::clone(&context),
+        )?;
 
         // todo non-default partition spec?
         let spec_id = self.table.metadata().default_partition_spec_id();
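
Note: this hunk swaps the direct self.input.execute(partition, context) call for DataFusion's execute_input_stream, which wraps the input stream and checks each batch against a target Arrow schema, here derived from the table's current Iceberg schema via schema_to_arrow_schema. A minimal sketch of that conversion step in isolation; the two-field schema and the builder calls are invented for illustration and assume the iceberg crate's schema-builder API:

use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A made-up two-field schema standing in for table.metadata().current_schema().
    let iceberg_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;

    // The resulting Arrow fields carry the Iceberg field IDs in their metadata,
    // which is how the writer keeps incoming batches aligned with the table schema.
    let arrow_schema = schema_to_arrow_schema(&iceberg_schema)?;
    println!("{arrow_schema:#?}");
    Ok(())
}
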
@@ -175,7 +189,6 @@ impl ExecutionPlan for IcebergWriteExec {
 
         let stream = futures::stream::once(async move {
             let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?;
-
             let mut input_stream = data;
 
             while let Some(batch_res) = input_stream.next().await {
@@ -187,27 +200,45 @@ impl ExecutionPlan for IcebergWriteExec {
             let data_file_builders = writer.close().await.map_err(to_datafusion_error)?;
 
             // Convert builders to data files
-            let data_files = data_file_builders
+            let data_files: DFResult<Vec<DataFile>> = data_file_builders
                 .into_iter()
-                .map(|mut builder| builder.partition_spec_id(spec_id).build().unwrap())
-                .collect::<Vec<DataFile>>();
+                .map(|mut builder| {
+                    builder.partition_spec_id(spec_id).build().map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to build data file: {}", e))
+                    })
+                })
+                .collect();
+            let data_files = data_files?;
 
-            let data_files = data_files
+            let data_files: DFResult<Vec<String>> = data_files
                 .into_iter()
                 .map(|f| {
-                    let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap();
-                    let json = serde_json::to_string(&serde).unwrap();
+                    // Convert to DataFileSerde
+                    let serde =
+                        DataFileSerde::try_from(f, &partition_type, is_version_1).map_err(|e| {
+                            DataFusionError::Execution(format!(
+                                "Failed to convert to DataFileSerde: {}",
+                                e
+                            ))
+                        })?;
+
+                    // Serialize to JSON
+                    let json = serde_json::to_string(&serde).map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e))
+                    })?;
+
                     println!("Serialized data file: {}", json); // todo remove log
-                    json
+                    Ok(json)
                 })
-                .collect::<Vec<String>>();
+                .collect();
+            let data_files = data_files?;
 
             Ok(Self::make_result_batch(count, data_files)?)
         })
         .boxed();
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
-            result_schema,
+            Arc::clone(&self.result_schema),
             stream,
         )))
     }
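
Note: the last hunk drops the unwrap() calls in favor of mapping each failure into a DataFusionError::Execution and collecting into DFResult<Vec<_>> (DataFusion's Result alias), so the first error short-circuits the stream instead of panicking inside it. A minimal, self-contained sketch of that iterator pattern, using a plain String error in place of DataFusionError to stay dependency-free:

fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, String> {
    inputs
        .iter()
        // Each element maps to a Result; failures get a descriptive message.
        .map(|s| {
            s.parse::<i32>()
                .map_err(|e| format!("Failed to parse {:?}: {}", s, e))
        })
        // Collecting into Result<Vec<_>, _> stops at the first Err it sees.
        .collect()
}

fn main() {
    assert_eq!(parse_all(&["1", "2"]), Ok(vec![1, 2]));
    assert!(parse_all(&["1", "x"]).is_err());
}
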