@@ -310,9 +310,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
310310 std::unique_ptr<ColumnWriter> writer;
311311 RETURN_IF_ERROR (ColumnWriter::create (opts, &column, _file_writer, &writer));
312312 RETURN_IF_ERROR (writer->init ());
313- _column_writers.push_back (std::move (writer));
314-
315- _olap_data_convertor->add_column_data_convertor (column);
313+ _column_writers[cid] = std::move (writer);
314+ _olap_data_convertor->add_column_data_convertor_at (column, cid);
316315 return Status::OK ();
317316};
318317
@@ -322,8 +321,8 @@ Status VerticalSegmentWriter::init() {
322321 _opts.compression_type = _tablet_schema->compression_type ();
323322 }
324323 _olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
325- _olap_data_convertor->reserve (_tablet_schema->num_columns ());
326- _column_writers.reserve (_tablet_schema->columns (). size ());
324+ _olap_data_convertor->resize (_tablet_schema->num_columns ());
325+ _column_writers.resize (_tablet_schema->num_columns ());
327326 // we don't need the short key index for unique key merge on write table.
328327 if (_is_mow ()) {
329328 size_t seq_col_length = 0 ;
@@ -535,6 +534,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
535534 vectorized::IOlapColumnDataAccessor* seq_column = nullptr ;
536535 uint32_t segment_start_pos = 0 ;
537536 for (auto cid : including_cids) {
537+ RETURN_IF_ERROR (_create_column_writer (cid, _tablet_schema->column (cid), _tablet_schema));
538538 RETURN_IF_ERROR (_olap_data_convertor->set_source_content_with_specifid_columns (
539539 &full_block, data.row_pos , data.num_rows , std::vector<uint32_t > {cid}));
540540 // here we get segment column row num before append data.
@@ -639,6 +639,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
639639 // convert missing columns and send to column writer
640640 const auto & missing_cids = _opts.rowset_ctx ->partial_update_info ->missing_cids ;
641641 for (auto cid : missing_cids) {
642+ RETURN_IF_ERROR (_create_column_writer (cid, _tablet_schema->column (cid), _tablet_schema));
642643 RETURN_IF_ERROR (_olap_data_convertor->set_source_content_with_specifid_columns (
643644 &full_block, data.row_pos , data.num_rows , std::vector<uint32_t > {cid}));
644645 auto [status, column] = _olap_data_convertor->convert_column_data (cid);
@@ -697,7 +698,9 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
697698 // create full block and fill with sort key columns
698699 full_block = _tablet_schema->create_block ();
699700
700- uint32_t segment_start_pos = cast_set<uint32_t >(_column_writers.front ()->get_next_rowid ());
701+ // Use _num_rows_written instead of creating column writer 0, since all column writers
702+ // should have the same row count, which equals _num_rows_written.
703+ uint32_t segment_start_pos = cast_set<uint32_t >(_num_rows_written);
701704
702705 DCHECK (_tablet_schema->has_skip_bitmap_col ());
703706 auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx ();
@@ -714,6 +717,17 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
714717 const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs ;
715718 std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches (specified_rowsets.size ());
716719
720+ // Ensure all primary key column writers and sequence column writer are created before
721+ // aggregate_for_flexible_partial_update, because it internally calls convert_pk_columns
722+ // and convert_seq_column which need the convertors in _olap_data_convertor
723+ for (uint32_t cid = 0 ; cid < _tablet_schema->num_key_columns (); ++cid) {
724+ RETURN_IF_ERROR (_create_column_writer (cid, _tablet_schema->column (cid), _tablet_schema));
725+ }
726+ if (schema_has_sequence_col) {
727+ uint32_t cid = _tablet_schema->sequence_col_idx ();
728+ RETURN_IF_ERROR (_create_column_writer (cid, _tablet_schema->column (cid), _tablet_schema));
729+ }
730+
717731 // 1. aggregate duplicate rows in block
718732 RETURN_IF_ERROR (_block_aggregator.aggregate_for_flexible_partial_update (
719733 const_cast <vectorized::Block*>(data.block ), data.num_rows , specified_rowsets,
@@ -788,6 +802,10 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
788802
789803 // 8. encode and write all non-primary key columns(including sequence column if exists)
790804 for (auto cid = _tablet_schema->num_key_columns (); cid < _tablet_schema->num_columns (); cid++) {
805+ if (cid != _tablet_schema->sequence_col_idx ()) {
806+ RETURN_IF_ERROR (_create_column_writer (cast_set<uint32_t >(cid),
807+ _tablet_schema->column (cid), _tablet_schema));
808+ }
791809 RETURN_IF_ERROR (_olap_data_convertor->set_source_content_with_specifid_column (
792810 full_block.get_by_position (cid), data.row_pos , data.num_rows ,
793811 cast_set<uint32_t >(cid)));
@@ -953,10 +971,6 @@ Status VerticalSegmentWriter::write_batch() {
953971 !_opts.rowset_ctx ->is_transient_rowset_writer ) {
954972 bool is_flexible_partial_update =
955973 _opts.rowset_ctx ->partial_update_info ->is_flexible_partial_update ();
956- for (uint32_t cid = 0 ; cid < _tablet_schema->num_columns (); ++cid) {
957- RETURN_IF_ERROR (
958- _create_column_writer (cid, _tablet_schema->column (cid), _tablet_schema));
959- }
960974 vectorized::Block full_block;
961975 for (auto & data : _batched_blocks) {
962976 if (is_flexible_partial_update) {
0 commit comments