This topic describes common scenarios where you can optimize SQL statements to achieve better performance and provides optimization examples.
Optimize the concurrency
The degree of parallelism is a measure of parallel computing. For example, in an execution plan, if a task with the ID M1 uses 1,000 instances, its degree of parallelism is 1000. You can improve task efficiency by setting and adjusting the degree of parallelism.
This section describes the scenarios where the concurrency can be optimized.
Force an instance to execute
In some operations, the system forcefully calls only one instance to execute tasks. The following operations are used as examples.
You perform an aggregation without specifying a group by clause, or you use a constant in the group by clause.
You specify a constant for partition by in the over clause of a window function.
You specify a constant for distribute by or cluster by in a SQL statement.
Solution: Check whether the operations based on a constant are necessary. We recommend that you remove these operations so that the system is not forced to call only one instance to execute tasks.
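The following sketch contrasts constant-based operations with column-based ones. The orders table and its columns (order_id, customer_id, amount) are hypothetical and serve only as an illustration. Replacing a constant with a column changes the query result, so the rewrite applies only when the column-based result is what you actually need.

```sql
-- Hypothetical table: orders(order_id, customer_id, amount).

-- Constant in PARTITION BY: all rows fall into the same window partition,
-- so one instance has to process the whole table.
SELECT order_id,
       SUM(amount) OVER (PARTITION BY 1) AS total_amount
FROM orders;

-- Partitioning by a column allows the rows to be spread across instances.
-- This returns a per-customer total, not a table-wide total.
SELECT order_id,
       SUM(amount) OVER (PARTITION BY customer_id) AS customer_amount
FROM orders;

-- Constant in DISTRIBUTE BY: all rows are sent to one instance.
SELECT order_id, amount FROM orders DISTRIBUTE BY 1;

-- Distributing by a column spreads the rows across instances.
SELECT order_id, amount FROM orders DISTRIBUTE BY customer_id;
```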
An excessively large or small number of instances are called
The execution performance does not always become better as the concurrency increases. If you call an excessively large number of instances for a job, the execution speed may decrease due to the following reasons:
An excessively large number of instances leads to longer waits for resources and more time spent in queues.
It takes time to initialize each instance. The higher the concurrency, the longer the total time taken by the initialization, and the lower the percentage of valid execution time.
In the following scenarios, the system forcefully calls an excessively large number of instances.
The system must read data from many small-sized partitions. For example, if you execute an SQL statement to read data from 10,000 partitions, the system forcefully calls 10,000 instances.
Solution: Optimize your SQL statements to reduce the number of partitions from which you want to read data. For example, you can prune the partitions that do not need to be read or split a large job into multiple small jobs.
Each instance reads only 256 MB of data by default, which is too little for some jobs and leads to short instance execution times. If the total input data is large, this results in an excessively high degree of parallelism, and instances spend most of their time queuing for resources.
Solution: Run the following commands to adjust the split size and to decrease the maximum number of instances that can be concurrently called for a reduce task. This increases the amount of data that is processed on each instance.
SET odps.stage.mapper.split.size=<256>;
SET odps.stage.reducer.num=<Maximum number of concurrent instances>;
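As a rough sketch of both solutions above, the following script fills in sample values and reads a limited partition range. The values 1024 and 200, the logs table, and its partition column dt are assumptions chosen for illustration. The instance counts in the comments are simple divisions of input size by split size, and the actual numbers depend on how the data is stored.

```sql
-- Illustrative values only; tune them for your own data volume.
-- Let each mapper read up to 1024 MB instead of 256 MB. For about 1 TB of input
-- (1,048,576 MB), this is roughly 1,024 mapper instances instead of roughly 4,096.
SET odps.stage.mapper.split.size=1024;
-- Limit the number of instances that are called for reduce tasks.
SET odps.stage.reducer.num=200;

-- Hypothetical table logs, partitioned by dt (STRING).
-- Filtering on the partition column restricts the read to the matching partitions.
SELECT dt, COUNT(*) AS cnt
FROM logs
WHERE dt >= '20150301' AND dt <= '20150307'
GROUP BY dt;
```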
Configure the number of instances
Tasks that involve table reading
Method 1: Adjust the concurrency by configuring parameters.
-- Configure the maximum amount of input data of a mapper. Unit: MB.
-- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
SET odps.sql.mapper.split.size=<value>;
Method 2: Use a split size hint provided by MaxCompute to adjust the concurrency of read operations on a single table.
-- Set the split size to 1 MB. When data in the src table is read, the task is split into subtasks of 1 MB each.
SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
Method 3: Split table data based on the data amount or number of rows or by specifying the concurrency.
In Method 1, the odps.sql.mapper.split.size parameter supports only a global setting for the Mapper stage, with a minimum value of 1 MB. If needed, you can adjust the degree of parallelism at the table level. This is especially useful when each row contains little data but the subsequent computation is heavy. In this case, you can reduce the number of rows that each instance processes to increase the degree of parallelism for the task.
You can run one of the following commands to adjust the concurrency.
Configure the size of a single shard for concurrent processing in tables.
SET odps.sql.split.size = {"table1": 1024, "table2": 512};
Configure the number of rows for concurrent processing in tables.
SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
Configure the concurrency for tables.
SET odps.sql.split.dop = {"table1": 1, "table2": 5};
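The row-count setting could be applied to a table with many short rows as sketched below. The table small_rows_tbl, its size of about 1,000,000 rows, the column payload, and the function heavy_udf are all hypothetical. The estimate in the comment is a plain division and is not a guaranteed instance count.

```sql
-- Hypothetical internal, non-transactional, non-clustered table small_rows_tbl
-- with about 1,000,000 short rows.
-- 10,000 rows per split gives on the order of 100 concurrent read instances
-- (1,000,000 / 10,000), even though the table is small in bytes.
SET odps.sql.split.row.count = {"small_rows_tbl": 10000};

-- heavy_udf stands for any expensive per-row computation.
SELECT heavy_udf(payload) FROM small_rows_tbl;
```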
Note: The odps.sql.split.row.count and odps.sql.split.dop parameters can be used only for internal tables, non-transactional tables, and non-clustered tables.
Tasks that do not involve table reading
You can adjust the concurrency using one of the following methods:
Method 1: Adjust the odps.stage.reducer.num value. Use the following command to set the degree of parallelism for the Reducer. This setting affects all related tasks.
-- Configure the number of instances that are called to execute reducer tasks.
-- Valid values: [1,99999].
SET odps.stage.reducer.num=<value>;
Method 2: Adjust the odps.stage.joiner.num value. Use the following command to set the degree of parallelism for the Joiner. This setting affects all related tasks.
-- Configure the number of instances that are called to execute joiner tasks.
-- Valid values: [1,99999].
SET odps.stage.joiner.num=<value>;
Method 3: Adjust the odps.sql.mapper.split.size value.
The concurrency of tasks that do not involve table reading is affected by the concurrency of tasks that involve table reading. You can therefore adjust the former by adjusting the latter.
Optimize window functions
If window functions are used in SQL statements, a reduce task is assigned to each window function. A large number of window functions therefore consumes a large amount of resources. You can optimize the window functions that meet both of the following conditions:
The OVER clause which defines how to partition and sort rows in a table must be the same.
Multiple window functions must be executed at the same level of nesting in an SQL statement.
The window functions that meet the preceding conditions are merged to be executed by one reduce task. The following SQL statement provides an example:
SELECT RANK() OVER (PARTITION BY A ORDER BY B DESC) AS RANK,
       ROW_NUMBER() OVER (PARTITION BY A ORDER BY B DESC) AS row_num
FROM MyTable;
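For contrast, the following sketch shows two queries that, going by the two conditions above, would not qualify for merging. Column C, the alias rnk, and the nested form are hypothetical and are used only for illustration.

```sql
-- The OVER clauses differ (ORDER BY B DESC versus ORDER BY C),
-- so the first condition is not met.
SELECT RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rnk,
       ROW_NUMBER() OVER (PARTITION BY A ORDER BY C) AS row_num
FROM MyTable;

-- The OVER clauses are identical, but the window functions sit at
-- different nesting levels, so the second condition is not met.
SELECT t.A, t.B, t.rnk,
       ROW_NUMBER() OVER (PARTITION BY t.A ORDER BY t.B DESC) AS row_num
FROM (
    SELECT A, B,
           RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rnk
    FROM MyTable
) t;
```

If the logic allows it, moving both functions to the same query level with the same OVER clause brings the statement back to the mergeable form.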
Optimize subqueries
The following statement contains a subquery:
SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);
If the subquery on the table_b table returns more than 9,999 values for the col1 column, the system reports the following error: records returned from subquery exceeded limit of 9999. In this case, use a JOIN statement instead, as shown in the following example.
SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
If the DISTINCT keyword is not used, the subquery result table c may contain duplicate values in the col1 column. In this case, the join returns more rows from table_a than the original IN query does.
If the DISTINCT keyword is used, only one worker is assigned to perform the subquery. If the subquery involves a large amount of data, the whole query slows down.
If you are sure that the values that meet the subquery conditions in the col1 column are unique, you can delete the DISTINCT keyword to improve the query performance.
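One way to check whether the values are unique before you delete the DISTINCT keyword is to compare the row count with the distinct-value count. This is a minimal sketch: the filter b.status = 'active' is a hypothetical stand-in for the subquery condition, and the check itself scans the filtered data, so it is meant as a one-off verification and not as part of the production query.

```sql
-- If total_rows equals distinct_values, col1 contains no duplicates
-- among the rows that meet the condition.
-- Note: COUNT(col1) skips NULL values, so NULLs are ignored in both counts.
SELECT COUNT(col1)          AS total_rows,
       COUNT(DISTINCT col1) AS distinct_values
FROM table_b b
WHERE b.status = 'active';  -- hypothetical condition
```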
Optimize joins
When you join two tables, we recommend that you use the WHERE clause based on the following rules:
Specify the partition limits of the primary table in the WHERE clause. We recommend that you define a subquery for the primary table to obtain the required data first.
Write the WHERE clause of the primary table at the end of the statement.
Specify the partition limits of the secondary table in the ON clause or a subquery, instead of in the WHERE clause.
The following code provides an example:
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id WHERE A.dt=20150301;
-- The following statement is not recommended. The system performs the JOIN operation before it performs partition pruning. This increases the amount of data and causes the query performance to deteriorate.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;
SELECT * FROM (SELECT * FROM A WHERE dt=20150301) A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id;
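The rules above also allow the partition limit of the secondary table to be placed in the ON clause. A minimal sketch of that variant, reusing the tables A and B from the preceding statements, might look as follows.

```sql
-- Partition limit of the secondary table B in the ON clause,
-- partition limit of the primary table A in the WHERE clause at the end.
SELECT *
FROM A
JOIN B
  ON B.id = A.id AND B.dt = 20150301
WHERE A.dt = 20150301;
```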
Optimize aggregate functions
To optimize aggregate functions, you can replace the collect_list function with the wm_concat function. The following example shows how to do this.
-- Use the collect_list function to concatenate sorted values.
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- Use the wm_concat function instead for better performance.
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;
-- Use the collect_list function to concatenate values.
SELECT array_join(collect_list(key), ',') FROM src;
-- Use the wm_concat function instead for better performance.
SELECT wm_concat(',', key) FROM src;