@@ -121,11 +121,34 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
121121
122122 targetKeyspace := td .wd .ct .vde .thisTablet .Keyspace
123123 lockName := fmt .Sprintf ("%s/%s" , targetKeyspace , td .wd .ct .workflow )
124- log .Infof ("Locking workflow %s" , lockName )
125- ctx , unlock , lockErr := td .wd .ct .ts .LockName (ctx , lockName , "vdiff" )
126- if lockErr != nil {
127- log .Errorf ("Locking workfkow %s failed: %v" , lockName , lockErr )
128- return lockErr
124+ log .Infof ("Locking workflow %s for VDiff %s" , lockName , td .wd .ct .uuid )
125+ // Keep retrying until we acquire the lock, using an exponential backoff.
126+ var (
127+ vctx context.Context
128+ unlock func (* error )
129+ lockErr error
130+ retryDelay = 100 * time .Millisecond
131+ maxRetryDelay = topo .LockTimeout
132+ backoffFactor = 1.5
133+ )
134+ for {
135+ vctx , unlock , lockErr = td .wd .ct .ts .LockName (ctx , lockName , "vdiff" )
136+ if lockErr == nil {
137+ break
138+ }
139+ log .Warningf ("Locking workfkow %s for VDiff %s initialization (stream ID: %d) failed, will wait %v before retrying: %v" ,
140+ lockName , td .wd .ct .uuid , td .wd .ct .id , retryDelay , lockErr )
141+ select {
142+ case <- ctx .Done ():
143+ return vterrors .Errorf (vtrpcpb .Code_CANCELED , "engine is shutting down" )
144+ case <- td .wd .ct .done :
145+ return ErrVDiffStoppedByUser
146+ case <- time .After (retryDelay ):
147+ if retryDelay < maxRetryDelay {
148+ retryDelay = min (time .Duration (float64 (retryDelay )* backoffFactor ), maxRetryDelay )
149+ }
150+ continue
151+ }
129152 }
130153
131154 var err error
@@ -136,7 +159,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
136159 }
137160 }()
138161
139- if err := td .stopTargetVReplicationStreams (ctx , dbClient ); err != nil {
162+ if err := td .stopTargetVReplicationStreams (vctx , dbClient ); err != nil {
140163 return err
141164 }
142165 defer func () {
@@ -151,18 +174,18 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
151174 }
152175 }()
153176
154- td .shardStreamsCtx , td .shardStreamsCancel = context .WithCancel (ctx )
177+ td .shardStreamsCtx , td .shardStreamsCancel = context .WithCancel (vctx )
155178
156- if err := td .selectTablets (ctx ); err != nil {
179+ if err := td .selectTablets (vctx ); err != nil {
157180 return err
158181 }
159- if err := td .syncSourceStreams (ctx ); err != nil {
182+ if err := td .syncSourceStreams (vctx ); err != nil {
160183 return err
161184 }
162185 if err := td .startSourceDataStreams (td .shardStreamsCtx ); err != nil {
163186 return err
164187 }
165- if err := td .syncTargetStreams (ctx ); err != nil {
188+ if err := td .syncTargetStreams (vctx ); err != nil {
166189 return err
167190 }
168191 if err := td .startTargetDataStream (td .shardStreamsCtx ); err != nil {
0 commit comments