Skip to content

TurboWay/DorisClient

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

42 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

DorisClient

A Python client library for Apache Doris.

Install

pip install DorisClient

Use

Create Test Table

CREATE TABLE `streamload_test` ( `id` int(11) NULL COMMENT "", `shop_code` varchar(64) NULL COMMENT "", `sale_amount` decimal(18, 2) NULL COMMENT "" ) ENGINE=OLAP UNIQUE KEY(`id`) COMMENT "test" DISTRIBUTED BY HASH(`id`) BUCKETS 3 PROPERTIES ( "replication_allocation" = "tag.location.default: 3", "in_memory" = "false", "storage_format" = "V2" ); -- If you want to enable sequence streamload, make sure Doris table enable sequence load first -- ALTER TABLE streamload_test ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "bigint");

streamload

from DorisClient import DorisSession, DorisLogger, Logger

# DorisLogger.setLevel('ERROR')  # default:INFO

doris_cfg = {
    'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    'database': 'testdb',
    'user': 'test',
    'passwd': '123456',
}
doris = DorisSession(**doris_cfg)

# append
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99'},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5'},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3'},
]
doris.streamload('streamload_test', data)

# delete
data = [
    {'id': '1'},
]
doris.streamload('streamload_test', data, merge_type='DELETE')

# merge
data = [
    {'id': '10', 'shop_code': 'sdd1', 'sale_amount': '99', 'delete_flag': 0},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5', 'delete_flag': 1},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3', 'delete_flag': 1},
]
doris.streamload('streamload_test', data, merge_type='MERGE', delete='delete_flag=1')

# Sequence append
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99', 'source_sequence': 11},
    {'id': '1', 'shop_code': 'sdd2', 'sale_amount': '5', 'source_sequence': 2},
    {'id': '2', 'shop_code': 'sdd3', 'sale_amount': '3', 'source_sequence': 1},
]
doris.streamload('streamload_test', data, sequence_col='source_sequence')

# Sequence merge
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99', 'source_sequence': 100, 'delete_flag': 0},
    {'id': '1', 'shop_code': 'sdd2', 'sale_amount': '5', 'source_sequence': 120, 'delete_flag': 0},
    {'id': '2', 'shop_code': 'sdd3', 'sale_amount': '3', 'source_sequence': 100, 'delete_flag': 1},
]
doris.streamload('streamload_test', data, sequence_col='source_sequence', merge_type='MERGE', delete='delete_flag=1')

# streamload default retry config: max_retry=3, retry_diff_seconds=3
# if you don't want to retry, "_streamload" can help you
doris._streamload('streamload_test', data)

# if you want to change the retry config, the following code will work
from DorisClient import DorisSession, Retry

max_retry = 5
retry_diff_seconds = 10


class MyDoris(DorisSession):
    @Retry(max_retry=max_retry, retry_diff_seconds=retry_diff_seconds)
    def streamload(self, table, dict_array, **kwargs):
        return self._streamload(table, dict_array, **kwargs)


doris = MyDoris(**doris_cfg)
doris.streamload('streamload_test', data)

execute doris-sql

from DorisClient import DorisSession doris_cfg = { 'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'], 'database': 'testdb', 'user': 'test', 'passwd': '123456', } doris = DorisSession(**doris_cfg) sql = 'select * from streamload_test limit 1' # fetch all the rows by sql, return dict array rows = doris.read(sql) print(rows) # fetch all the rows by sql, return tuple array rows = doris.read(sql, cursors=None) print(rows) # execute sql commit doris.execute('truncate table streamload_test')

collect meta

from DorisClient import DorisMeta doris_cfg = { 'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'], 'database': 'testdb', 'user': 'test', 'passwd': '123456', } dm = DorisMeta(**doris_cfg) # auto create table for collect doris meta # 1. meta_table for saving all table meta # 2. meta_tablet for saving all tablet meta # 3. meta_partition for saving all partition meta # 4. meta_size for saving all table size meta # 5. meta_table_count for saving all table row count # 6. meta_materialized_view for saving all materialized view # 6. meta_backup for saving all backup view dm.create_tables() # collect table meta >> meta_table dm.collect_table() # collect partition meta >> meta_partition dm.collect_partition() # collect tablet meta >> meta_tablet  # deploy collect_partition dm.collect_tablet() # collect table size meta >> meta_size dm.collect_size() # collect table row count >> meta_table_count dm.collect_table_count() # collect materialized view meta >> meta_materialized_view dm.collect_materialized_view(only_insert=True) # collect backup meta >> meta_backup dm.collect_backup()

modify buckets

from DorisClient import DorisAdmin # # debug # import logging # logger = logging.getLogger() # logger.setLevel(logging.DEBUG)  doris_cfg = { 'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'], 'database': 'testdb', 'user': 'test', 'passwd': '123456', } da = DorisAdmin(**doris_cfg) # modify the number and method of buckets for the specified table da.modify(database_name='testdb', table_name='streamload_test', distribution_key='id,shop_code', buckets=1) # modify the number and method of buckets for partition da.modify(database_name='testdb', table_name='partition_tb', partition_name='p20231214', buckets=2) # only rebuild table and remove unsupport properties da.modify(database_name='testdb', table_name='streamload_test', only_rebuild=True, ignore_properties='in_memory') # only rebuild table and add properties da.modify(database_name='testdb', table_name='streamload_test', only_rebuild=True, add_properties='"enable_unique_key_merge_on_write" = "true"')

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages