@@ -22,7 +22,6 @@ import (
2222 "io"
2323 "net/http"
2424 "runtime"
25- "strconv"
2625 "strings"
2726 "sync"
2827 "testing"
5857 targetKsOpts = make (map [string ]string )
5958 httpClient = throttlebase .SetupHTTPClient (time .Second )
6059 sourceThrottlerAppName = throttlerapp .VStreamerName
61- targetThrottlerAppName = throttlerapp .VPlayerName
60+ targetThrottlerAppName = throttlerapp .VReplicationName
6261)
6362
6463const (
@@ -1168,7 +1167,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) {
11681167
11691168func materializeProduct (t * testing.T , useVtctldClient bool ) {
11701169 t .Run ("materializeProduct" , func (t * testing.T ) {
1171- // Materializing from "product" keyspace to "customer" keyspace.
1170+ // materializing from "product" keyspace to "customer" keyspace
11721171 workflow := "cproduct"
11731172 keyspace := "customer"
11741173 defaultCell := vc .Cells [vc .CellNames [0 ]]
@@ -1182,7 +1181,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11821181
11831182 productTablets := vc .getVttabletsInKeyspace (t , defaultCell , "product" , "primary" )
11841183 t .Run ("throttle-app-product" , func (t * testing.T ) {
1185- // Now, throttle the source side component (vstreamer), and insert some rows.
1184+ // Now, throttle the streamer on source tablets, insert some rows
11861185 for _ , tab := range productTablets {
11871186 body , err := throttleApp (tab , sourceThrottlerAppName )
11881187 assert .NoError (t , err )
@@ -1193,22 +1192,19 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11931192 waitForTabletThrottlingStatus (t , tab , targetThrottlerAppName , throttlerStatusNotThrottled )
11941193 }
11951194 insertMoreProductsForSourceThrottler (t )
1196- // To be fair to the test, we give the target time to apply the new changes. We
1197- // expect it to NOT get them in the first place, we expect the additional rows
1198- // to **not appear** in the materialized view.
1195+ // To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
1196+ // we expect the additional rows to **not appear** in the materialized view
11991197 for _ , tab := range customerTablets {
12001198 waitForRowCountInTablet (t , tab , keyspace , workflow , 5 )
1201- // Confirm that we updated the stats on the target tablets as expected.
1202- confirmVReplicationThrottling (t , tab , sourceKs , workflow , sourceThrottlerAppName )
12031199 }
12041200 })
12051201 t .Run ("unthrottle-app-product" , func (t * testing.T ) {
1206- // Unthrottle the vstreamer component , and expect the rows to show up.
1202+ // unthrottle on source tablets , and expect the rows to show up
12071203 for _ , tab := range productTablets {
12081204 body , err := unthrottleApp (tab , sourceThrottlerAppName )
12091205 assert .NoError (t , err )
12101206 assert .Contains (t , body , sourceThrottlerAppName )
1211- // Give time for unthrottling to take effect and for targets to fetch data.
1207+ // give time for unthrottling to take effect and for target to fetch data
12121208 waitForTabletThrottlingStatus (t , tab , sourceThrottlerAppName , throttlerStatusNotThrottled )
12131209 }
12141210 for _ , tab := range customerTablets {
@@ -1217,8 +1213,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12171213 })
12181214
12191215 t .Run ("throttle-app-customer" , func (t * testing.T ) {
1220- // Now, throttle vreplication on the target side (vplayer) , and insert some
1221- // more rows.
1216+ // Now, throttle vreplication (vcopier/vapplier) on target tablets , and
1217+ // insert some more rows.
12221218 for _ , tab := range customerTablets {
12231219 body , err := throttleApp (tab , targetThrottlerAppName )
12241220 assert .NoError (t , err )
@@ -1233,8 +1229,6 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12331229 // rows to **not appear** in the materialized view.
12341230 for _ , tab := range customerTablets {
12351231 waitForRowCountInTablet (t , tab , keyspace , workflow , 8 )
1236- // Confirm that we updated the stats on the target tablets as expected.
1237- confirmVReplicationThrottling (t , tab , sourceKs , workflow , targetThrottlerAppName )
12381232 }
12391233 })
12401234 t .Run ("unthrottle-app-customer" , func (t * testing.T ) {
@@ -1790,52 +1784,3 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
17901784func releaseInnoDBRowHistory (t * testing.T , dbConn * mysql.Conn ) {
17911785 execQuery (t , dbConn , "rollback" )
17921786}
1793-
1794- // confirmVReplicationThrottling confirms that the throttling related metrics reflect that
1795- // the workflow is being throttled as expected, via the expected app name, and that this
1796- // is impacting the lag as expected.
1797- // The tablet passed should be a target tablet for the given workflow while the keyspace
1798- // name provided should be the source keyspace as the target tablet stats note the stream's
1799- // source keyspace and shard.
1800- func confirmVReplicationThrottling (t * testing.T , tab * cluster.VttabletProcess , keyspace , workflow string , appname throttlerapp.Name ) {
1801- const (
1802- sleepTime = 5 * time .Second
1803- zv = int64 (0 )
1804- )
1805- time .Sleep (sleepTime ) // To be sure that we accrue some lag
1806-
1807- jsVal , err := getDebugVar (t , tab .Port , []string {"VReplicationThrottledCounts" })
1808- require .NoError (t , err )
1809- require .NotEqual (t , "{}" , jsVal )
1810- // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
1811- throttledCount := gjson .Get (jsVal , fmt .Sprintf (`%s\.*\.tablet\.%s` , workflow , appname )).Int ()
1812- require .Greater (t , throttledCount , zv , "JSON value: %s" , jsVal )
1813-
1814- val , err := getDebugVar (t , tab .Port , []string {"VReplicationThrottledCountTotal" })
1815- require .NoError (t , err )
1816- require .NotEqual (t , "" , val )
1817- throttledCountTotal , err := strconv .ParseInt (val , 10 , 64 )
1818- require .NoError (t , err )
1819- require .GreaterOrEqual (t , throttledCountTotal , throttledCount , "Value: %s" , val )
1820-
1821- // We do not calculate replication lag for the vcopier as it's not replicating
1822- // events.
1823- if appname != throttlerapp .VCopierName {
1824- jsVal , err = getDebugVar (t , tab .Port , []string {"VReplicationLagSeconds" })
1825- require .NoError (t , err )
1826- require .NotEqual (t , "{}" , jsVal )
1827- // The JSON value looks like this: {"product.0.cproduct.4": 6}
1828- vreplLagSeconds := gjson .Get (jsVal , fmt .Sprintf (`%s\.*\.%s\.*` , keyspace , workflow )).Int ()
1829- require .NoError (t , err )
1830- // Take off 1 second to deal with timing issues in the test.
1831- minLagSecs := int64 (int64 (sleepTime .Seconds ()) - 1 )
1832- require .GreaterOrEqual (t , vreplLagSeconds , minLagSecs , "JSON value: %s" , jsVal )
1833-
1834- val , err = getDebugVar (t , tab .Port , []string {"VReplicationLagSecondsMax" })
1835- require .NoError (t , err )
1836- require .NotEqual (t , "" , val )
1837- vreplLagSecondsMax , err := strconv .ParseInt (val , 10 , 64 )
1838- require .NoError (t , err )
1839- require .GreaterOrEqual (t , vreplLagSecondsMax , vreplLagSeconds , "Value: %s" , val )
1840- }
1841- }
0 commit comments