Sponsored by PRISE Ltd.
www.prisetools.com
How to "re-parallelize" skewed joins
Case description
Assume that we have 1M customers and 4M transactions, and our top customer produces 2.5% of all transactions. The others produce the remaining 97.5% approximately evenly. Scroll down to the bottom of the post for the sample tables and the data generator SQL.
Our task is to join the "Customer" and "Transact" tables on Customer_id.
The join
SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
GROUP BY 1;
The execution is pretty slow. In Viewpoint we see that only one AMP is working, while the others are idle.
What is the problem?
There are two separate subsets of the Transact table from a "joinability" point of view:
- "Peak" part (records of the top customer(s)):
Very few customers with very many Transact records. A product join would be cost-effective.
- "Even" part (records of the other customers):
Many customers with many Transact records in total, but each has only a few, spread evenly. A merge join would be ideal.
The execution plan looks like this:
This query is optimized using type 2 profile T2_Linux64, profileid 21.
1) First, we lock a distinct D_DB_TMP."pseudo table" for read on a
RowHash to prevent global deadlock for D_DB_TMP.t.
2) Next, we lock a distinct D_DB_TMP."pseudo table" for read on a
RowHash to prevent global deadlock for D_DB_TMP.c.
3) We lock D_DB_TMP.t for read, and we lock D_DB_TMP.c for read.
4) We do an all-AMPs RETRIEVE step from D_DB_TMP.t by way of an
all-rows scan with a condition of ("NOT (D_DB_TMP.t.Customer_ID IS
NULL)") into Spool 4 (all_amps), which is redistributed by the
hash code of (D_DB_TMP.t.Customer_ID) to all AMPs. Then we do a
SORT to order Spool 4 by row hash. The size of Spool 4 is
estimated with low confidence to be 125 rows (2,125 bytes). The
estimated time for this step is 0.01 seconds.
5) We do an all-AMPs JOIN step from Spool 4 (Last Use) by way of a
RowHash match scan, which is joined to D_DB_TMP.c by way of a
RowHash match scan. Spool 4 and D_DB_TMP.c are joined using a
merge join, with a join condition of ("D_DB_TMP.c.Customer_ID =
Customer_ID"). The result goes into Spool 3 (all_amps), which is
built locally on the AMPs. The size of Spool 3 is estimated with
index join confidence to be 125 rows (10,375 bytes). The
estimated time for this step is 0.02 seconds.
6) We do an all-AMPs SUM step to aggregate from Spool 3 (Last Use) by
way of an all-rows scan , grouping by field1 (
D_DB_TMP.c.Customer_name). Aggregate Intermediate Results are
computed globally, then placed in Spool 5. The size of Spool 5 is
estimated with no confidence to be 94 rows (14,758 bytes). The
estimated time for this step is 0.02 seconds.
7) We do an all-AMPs RETRIEVE step from Spool 5 (Last Use) by way of
an all-rows scan into Spool 1 (all_amps), which is built locally
on the AMPs. The size of Spool 1 is estimated with no confidence
to be 94 rows (8,742 bytes). The estimated time for this step is
0.02 seconds.
8) Finally, we send out an END TRANSACTION step to all AMPs involved
in processing the request.
-> The contents of Spool 1 are sent back to the user as the result of
statement 1. The total estimated time is 0.07 seconds.
How to identify
If you experience an extremely asymmetric AMP load, suspect this case.
Find highly skewed JOIN steps in DBQL (with all logging options switched on):
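select top 50
  -- hashamp()+1 = number of AMPs; Skw = busiest AMP's CPU / average AMP CPU
  a.MaxAMPCPUTime * (hashamp()+1) / nullifzero(a.CPUTime) Skw,
  a.CPUTime,
  -- CPU the whole system is tied up for while the busiest AMP finishes
  a.MaxAMPCPUTime * (hashamp()+1) CoveringCPUTime,
  b.*
from dbc.dbqlsteptbl a
join dbc.dbqlogtbl b on a.procid=b.procid and a.queryid=b.queryid
where
  StepName='JIN'      -- join steps only
  and CPUtime > 100   -- ignore small steps
  and Skw > 2         -- busiest AMP used more than 2x the average
order by CoveringCPUTime desc;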
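The two derived columns can be illustrated with a small Python sketch. It recomputes Skw and CoveringCPUTime for one step from a hypothetical list of per-AMP CPU seconds (the list and the function name are made up for illustration; the query itself works from the step's CPUTime and MaxAMPCPUTime columns).

```python
def skew_metrics(per_amp_cpu):
    """Mirror the Skw and CoveringCPUTime expressions of the DBQL query."""
    num_amps = len(per_amp_cpu)          # hashamp()+1
    total_cpu = sum(per_amp_cpu)         # CPUTime
    max_amp_cpu = max(per_amp_cpu)       # MaxAMPCPUTime
    covering = max_amp_cpu * num_amps    # CoveringCPUTime
    # nullifzero() in the SQL avoids a division by zero; None plays that role here
    skw = covering / total_cpu if total_cpu else None
    return skw, covering


print(skew_metrics([2.0] * 125))          # (1.0, 250.0): perfectly even load
print(skew_metrics([1.0] * 124 + [4.0]))  # (3.90625, 500.0): one hot AMP
```

With one AMP working 4x longer than the rest, the step ties up 500 CPU seconds of system capacity while only 128 are actually used.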
(Note: Covering CPU time is <No-of-AMPs> * <Max AMP's CPU time>. Because of the asymmetric load, this is effectively the amount of CPU the step occupies on the system.)
Or, if you suspect a specific query, check the demography of the join field(s) in the "big" table:
SELECT TOP 100 <Join_field>, count(*) Nbr
FROM <Big_table> GROUP BY 1 ORDER BY 2 DESC;
If the top occurrences are spectacularly larger than the others (or than the average), this case most likely applies.
Solution
Break the query into two parts: join the top customer(s) separately, then all the others, and finally UNION ALL the results. (Additional modification is sometimes required if the enclosing operation(s), here the GROUP BY, are not decomposable on the same key.)
First we have to identify the top customer(s):
SELECT TOP 5 Customer_id, count(*) Nbr
FROM Transact GROUP BY 1 ORDER BY 2 DESC;
Customer_id Nbr
------------------------------
345 100004
499873 4
677423 4
187236 4
23482 4
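Which of these IDs are worth listing? There is no exact cutoff; below is a hypothetical rule of thumb based on the reasoning in the Calculation part of this post. After redistribution all rows of one Customer_id land on a single AMP, so a value matters once its row count is a noticeable fraction of the average per-AMP load. The 10% threshold, the function name and the ~4.1M rows / 125 AMPs figures are assumptions for the sketch.

```python
NUM_AMPS = 125          # system size assumed in the Calculation part
TOTAL_ROWS = 4_100_000  # ~4.1M Transact rows


def peak_values(top_counts, total_rows=TOTAL_ROWS, num_amps=NUM_AMPS, threshold=0.1):
    """Return the join-key values worth handling separately.

    Hypothetical rule: a value is a "peak" if its rows alone exceed
    `threshold` x the average number of rows per AMP after redistribution.
    """
    avg_rows_per_amp = total_rows / num_amps
    return [value for value, nbr in top_counts if nbr > threshold * avg_rows_per_amp]


# Result of the TOP 5 query above: (Customer_id, Nbr)
top5 = [(345, 100004), (499873, 4), (677423, 4), (187236, 4), (23482, 4)]
print(peak_values(top5))  # [345]
```

Only customer 345 qualifies, which is the ID used in the rewritten query.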
Replace the original query with this one:
SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
where t.Customer_id in (345)
/*
ID of the top Customer(s).
If more customers are salient, list them, but max ~5
*/
GROUP BY 1
UNION ALL
SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
where t.Customer_id not in (345) -- Same customer(s)
GROUP BY 1
;
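Why does the UNION ALL return the same result as the original query? Every Transact row falls into exactly one branch, and here every Customer_name group belongs to exactly one customer, so no group is split between the branches. The Python sketch below models both queries on hypothetical in-memory data (inner join, GROUP BY Customer_name, count(*)) and also shows the decomposability caveat mentioned above: if customers in different branches shared a name, the rewrite would return that name twice.

```python
from collections import Counter


def original_query(customers, transacts):
    """Join Transact to Customer on Customer_id, GROUP BY Customer_name, count(*)."""
    return Counter(customers[cid] for _, cid in transacts if cid in customers)


def split_query(customers, transacts, top_ids):
    """The same query run as a "peak" and an "even" branch, combined with UNION ALL."""
    peak = Counter(customers[cid] for _, cid in transacts
                   if cid in customers and cid in top_ids)
    even = Counter(customers[cid] for _, cid in transacts
                   if cid in customers and cid not in top_ids)
    return list(peak.items()) + list(even.items())  # UNION ALL: groups are not merged


# Hypothetical sample data: Customer_id -> Customer_name, (Transaction_id, Customer_id)
customers = {1: "Cust-1", 2: "Cust-2", 345: "Cust-345"}
transacts = [(1, 345), (2, 345), (3, 345), (4, 1), (5, 2), (6, 2)]
same = sorted(split_query(customers, transacts, {345})) == \
    sorted(original_query(customers, transacts).items())
print(same)  # True

# Caveat: a name shared across the two branches yields two output rows
dup_names = {1: "Same", 345: "Same"}
print(split_query(dup_names, [(1, 345), (2, 1)], {345}))  # [('Same', 1), ('Same', 1)]
```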
Be sure that Customer.Customer_id, Transact.Transaction_id and Transact.Customer_id have statistics!
This query is more complex, has more steps and scans the Transact table twice, but it runs much faster; you can check it.
But why? And how do we determine which "top" customers are worth handling separately?
Read ahead.
Explanation
Calculation
Let's do some maths:
Assume that we are on a 125-AMP system.
The Customer table contains 1M records with unique IDs.
We have ~4.1M records in the Transact table: 100k for the top customer (ID=345) and 4 for each of the other customers. This roughly matches the 2.5% assumed above.
If the Transact table is redistributed on hash(Customer_id), each AMP gets ~33k records, except AMP(hash(345)), which gets ~133k (33k + 100k).
That means this AMP processes ~4x more data than the others and therefore runs ~4x longer.
In other words, for 75% of this JOIN step's time, 124 AMPs DO NOTHING for the query.
Moreover, the preparatory and subsequent steps are problematic too: the JOIN is prepared by a redistribution that produces a strongly skewed spool, and the JOIN's result is built locally on the AMPs, so it stays skewed as well.
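The numbers above can be reproduced with a rough Python model. It assumes exactly 4 rows per customer plus 100k extra rows for customer 345, and uses customer_id % 125 as a stand-in for Teradata's row hash to AMP mapping (an assumption; the real hashing differs in detail but also spreads evenly distributed keys evenly).

```python
NUM_AMPS = 125
NUM_CUSTOMERS = 1_000_000
ROWS_PER_CUSTOMER = 4
TOP_CUSTOMER, TOP_EXTRA_ROWS = 345, 100_000


def amp_of(customer_id):
    # Assumption: a plain modulo stands in for the hash-to-AMP mapping
    return customer_id % NUM_AMPS


def redistributed_load():
    """Transact rows per AMP after redistribution on hash(Customer_id)."""
    load = [0] * NUM_AMPS
    for cid in range(NUM_CUSTOMERS):
        load[amp_of(cid)] += ROWS_PER_CUSTOMER
    # All rows of the top customer end up on one single AMP
    load[amp_of(TOP_CUSTOMER)] += TOP_EXTRA_ROWS
    return load


load = redistributed_load()
avg = sum(load) / NUM_AMPS
print(min(load), max(load))                 # 32000 132000
print(round(max(load) / avg, 2))            # 4.02: the hot AMP runs ~4x longer
print(round(1 - min(load) / max(load), 2))  # 0.76: the others idle ~75% of the step
```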
Optimized version
This query consumes moderately more CPU, but the load is distributed evenly across the AMPs, utilizing Teradata's full parallel capability. It also contains a product join, but that is no problem: it joins 1 record to the ~100k selected Transact records, which is lightning fast.
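The same rough model (4 rows per customer, 100k extra rows for ID 345, modulo instead of the real row hash; all assumptions) can be applied to the broken-up query. The "even" branch is redistributed on Customer_id without the top customer, while the "peak" branch's Transact rows stay where the Transaction_id primary index put them and only the single Customer row is duplicated to every AMP.

```python
NUM_AMPS = 125
NUM_CUSTOMERS = 1_000_000
ROWS_PER_CUSTOMER = 4
TOP_CUSTOMER, TOP_EXTRA_ROWS = 345, 100_000


def split_plan_load():
    """Transact rows per AMP processed by the two join steps of the broken-up query."""
    load = [0] * NUM_AMPS
    # "Even" branch: redistributed on Customer_id (modulo stands in for the row hash)
    for cid in range(NUM_CUSTOMERS):
        if cid != TOP_CUSTOMER:
            load[cid % NUM_AMPS] += ROWS_PER_CUSTOMER
    # "Peak" branch: rows stay local, spread by the Transaction_id primary index;
    # the product join only needs the duplicated Customer row on each AMP
    peak_rows = TOP_EXTRA_ROWS + ROWS_PER_CUSTOMER
    for transaction_id in range(peak_rows):
        load[transaction_id % NUM_AMPS] += 1
    return load


load = split_plan_load()
avg = sum(load) / NUM_AMPS
print(min(load), max(load))       # 32796 32801
print(round(max(load) / avg, 3))  # 1.0: practically no skew
```

The same total number of rows is processed, but the busiest AMP now finishes at practically the same time as the others.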
Look at the execution plan of the broken-up query:
This query is optimized using type 2 profile T2_Linux64, profileid 21.
1) First, we lock a distinct D_DB_TMP."pseudo table" for read on a
RowHash to prevent global deadlock for D_DB_TMP.t.
2) Next, we lock a distinct D_DB_TMP."pseudo table" for read on a
RowHash to prevent global deadlock for D_DB_TMP.c.
3) We lock D_DB_TMP.t for read, and we lock D_DB_TMP.c for read.
4) We do a single-AMP RETRIEVE step from D_DB_TMP.c by way of the
unique primary index "D_DB_TMP.c.Customer_ID = 345" with no
residual conditions into Spool 4 (all_amps), which is duplicated
on all AMPs. The size of Spool 4 is estimated with high
confidence to be 125 rows (10,625 bytes). The estimated time for
this step is 0.01 seconds.
5) We do an all-AMPs JOIN step from Spool 4 (Last Use) by way of an
all-rows scan, which is joined to D_DB_TMP.t by way of an all-rows
scan with a condition of ("D_DB_TMP.t.Customer_ID = 345"). Spool
4 and D_DB_TMP.t are joined using a product join, with a join
condition of ("Customer_ID = D_DB_TMP.t.Customer_ID"). The result
goes into Spool 3 (all_amps), which is built locally on the AMPs.
The size of Spool 3 is estimated with low confidence to be 99,670
rows (8,272,610 bytes). The estimated time for this step is 0.09
seconds.
6) We do an all-AMPs SUM step to aggregate from Spool 3 (Last Use) by
way of an all-rows scan , grouping by field1 (
D_DB_TMP.c.Customer_name). Aggregate Intermediate Results are
computed globally, then placed in Spool 5. The size of Spool 5 is
estimated with no confidence to be 74,753 rows (11,736,221 bytes).
The estimated time for this step is 0.20 seconds.
7) We execute the following steps in parallel.
1) We do an all-AMPs RETRIEVE step from Spool 5 (Last Use) by
way of an all-rows scan into Spool 1 (all_amps), which is
built locally on the AMPs. The size of Spool 1 is estimated
with no confidence to be 74,753 rows (22,052,135 bytes). The
estimated time for this step is 0.02 seconds.
2) We do an all-AMPs RETRIEVE step from D_DB_TMP.t by way of an
all-rows scan with a condition of ("D_DB_TMP.t.Customer_ID <>
3454") into Spool 9 (all_amps), which is redistributed by the
hash code of (D_DB_TMP.t.Customer_ID) to all AMPs. The size
of Spool 9 is estimated with high confidence to be 4,294,230
rows (73,001,910 bytes). The estimated time for this step is
1.80 seconds.
8) We do an all-AMPs JOIN step from D_DB_TMP.c by way of an all-rows
scan with a condition of ("D_DB_TMP.c.Customer_ID <> 3454"), which
is joined to Spool 9 (Last Use) by way of an all-rows scan.
D_DB_TMP.c and Spool 9 are joined using a single partition hash
join, with a join condition of ("D_DB_TMP.c.Customer_ID =
Customer_ID"). The result goes into Spool 8 (all_amps), which is
built locally on the AMPs. The size of Spool 8 is estimated with
low confidence to be 4,294,230 rows (356,421,090 bytes). The
estimated time for this step is 0.72 seconds.
9) We do an all-AMPs SUM step to aggregate from Spool 8 (Last Use) by
way of an all-rows scan , grouping by field1 (
D_DB_TMP.c.Customer_name). Aggregate Intermediate Results are
computed globally, then placed in Spool 10. The size of Spool 10
is estimated with no confidence to be 3,220,673 rows (505,645,661
bytes). The estimated time for this step is 8.46 seconds.
10) We do an all-AMPs RETRIEVE step from Spool 10 (Last Use) by way of
an all-rows scan into Spool 1 (all_amps), which is built locally
on the AMPs. The size of Spool 1 is estimated with no confidence
to be 3,295,426 rows (972,150,670 bytes). The estimated time for
this step is 0.32 seconds.
11) Finally, we send out an END TRANSACTION step to all AMPs involved
in processing the request.
-> The contents of Spool 1 are sent back to the user as the result of
statement 1. The total estimated time is 11.60 seconds.
Sample structures
The table structures (simplified for the example):
CREATE TABLE Customer
(
Customer_ID INTEGER
, Customer_name VARCHAR(200)
)
UNIQUE PRIMARY INDEX (Customer_id)
;
insert into Customer values (1,'Cust-1');
Run 20x:
-- Each run doubles the table; new IDs continue from the current maximum
insert into Customer
select mx + sum(1) over (order by Customer_id rows unbounded preceding) id,
       'Cust-' || trim(id)
from Customer
cross join (select max(Customer_id) mx from Customer) x;
collect statistics using sample on customer column (Customer_id);
CREATE TABLE Transact
(
Transaction_ID INTEGER
, Customer_ID INTEGER
)
UNIQUE PRIMARY INDEX (Transaction_id)
;
insert into Transact values (1,1);
Run 22x:
-- Each run doubles the table; Customer_ID cycles through 0..999,999
insert into Transact
select mx + sum(1) over (order by Transaction_id rows unbounded preceding) id,
       id mod 1000000
from Transact
cross join (select max(Transaction_id) mx from Transact) x;
-- Add 99,999 extra transactions for the top customer (Customer_ID = 345)
insert into Transact
select mx + sum(1) over (order by Transaction_id rows unbounded preceding) id,
       345
from Transact t
cross join (select max(Transaction_id) mx from Transact) x
where t.Transaction_id < 100000;
collect statistics using sample on Transact column (Customer_id);
collect statistics using sample on Transact column (Transaction_id) ;