@@ -17,227 +17,72 @@ limitations under the License.
1717package balancer
1818
1919import (
20- "context"
2120 "fmt"
22- "maps"
2321 "net/http"
24- "slices"
25- "strings"
26- "sync"
2722
2823 "github.com/cespare/xxhash/v2"
2924
3025 "vitess.io/vitess/go/vt/discovery"
3126 querypb "vitess.io/vitess/go/vt/proto/query"
32- "vitess.io/vitess/go/vt/proto/topodata"
33- "vitess.io/vitess/go/vt/srvtopo"
3427 "vitess.io/vitess/go/vt/topo/topoproto"
3528)
3629
37- // tabletTypesToWatch are the tablet types that will be included in the hash rings.
38- var tabletTypesToWatch = map [topodata.TabletType ]struct {}{topodata .TabletType_REPLICA : {}, topodata .TabletType_RDONLY : {}}
39-
4030// SessionBalancer implements the TabletBalancer interface. For a given session,
4131// it will return the same tablet for its duration, with preference to tablets in
4232// the local cell.
4333type SessionBalancer struct {
4434 // localCell is the cell the gateway is currently running in.
4535 localCell string
46-
47- // hc is the tablet health check.
48- hc discovery.HealthCheck
49-
50- mu sync.RWMutex
51-
52- // localTablets is a map of tablets in the local cell for each target.
53- localTablets map [discovery.KeyspaceShardTabletType ]TabletSet
54-
55- // externalTablets is a map of tablets external to this cell for each target.
56- externalTablets map [discovery.KeyspaceShardTabletType ]TabletSet
57-
58- // tablets keeps track of the latest state of each tablet, keyed by alias. This
59- // is used to remove tablets from old targets when their target changes (a
60- // PlannedReparentShard for example).
61- tablets TabletSet
6236}
6337
64- // TabletSet represents a set of tablets, keyed by alias.
65- type TabletSet map [string ]* discovery.TabletHealth
66-
6738// NewSessionBalancer creates a new session balancer.
68- func NewSessionBalancer (ctx context.Context , localCell string , topoServer srvtopo.Server , hc discovery.HealthCheck ) (TabletBalancer , error ) {
69- b := & SessionBalancer {
70- localCell : localCell ,
71- hc : hc ,
72- localTablets : make (map [discovery.KeyspaceShardTabletType ]TabletSet ),
73- externalTablets : make (map [discovery.KeyspaceShardTabletType ]TabletSet ),
74- tablets : make (TabletSet ),
75- }
76-
77- // Set up health check subscription
78- hcChan := b .hc .Subscribe ("SessionBalancer" )
79-
80- // Build initial hash rings
81-
82- // Find all the targets we're watching
83- targets , _ , err := srvtopo .FindAllTargetsAndKeyspaces (ctx , topoServer , b .localCell , discovery .KeyspacesToWatch , slices .Collect (maps .Keys (tabletTypesToWatch )))
84- if err != nil {
85- return nil , fmt .Errorf ("session balancer: failed to find all targets and keyspaces: %w" , err )
86- }
87-
88- // Add each tablet to the hash ring
89- for _ , target := range targets {
90- tablets := b .hc .GetHealthyTabletStats (target )
91- for _ , tablet := range tablets {
92- b .onTabletHealthChange (tablet )
93- }
94- }
95-
96- // Start watcher to keep track of tablet health
97- go b .watchHealthCheck (ctx , hcChan )
98-
99- return b , nil
100- }
101-
102- // watchHealthCheck watches the health check channel for tablet health changes and updates the set of tablets accordingly.
103- func (b * SessionBalancer ) watchHealthCheck (ctx context.Context , hcChan chan * discovery.TabletHealth ) {
104- for {
105- select {
106- case <- ctx .Done ():
107- b .hc .Unsubscribe (hcChan )
108- return
109- case tablet := <- hcChan :
110- if tablet == nil {
111- continue
112- }
113-
114- // Remove tablet from old target if it has changed
115- b .removeOldTablet (tablet )
116-
117- // Ignore tablets we aren't supposed to watch
118- if _ , ok := tabletTypesToWatch [tablet .Target .TabletType ]; ! ok {
119- continue
120- }
121-
122- b .onTabletHealthChange (tablet )
123- }
124- }
125- }
126-
127- // removeOldTablet removes the entry for a tablet in its old target if its target has changed. For example, if a
128- // reparent happens and a replica is now a primary, we need to remove it from the list of tablets for the replica
129- // target.
130- func (b * SessionBalancer ) removeOldTablet (newTablet * discovery.TabletHealth ) {
131- b .mu .Lock ()
132- defer b .mu .Unlock ()
133-
134- alias := tabletAlias (newTablet )
135- prevTablet , exists := b .tablets [alias ]
136- if ! exists {
137- return
138- }
139-
140- prevTargetKey := discovery .KeyFromTarget (prevTablet .Target )
141- currentTargetKey := discovery .KeyFromTarget (newTablet .Target )
142-
143- // If this tablet's target changed, remove it from its old target.
144- if prevTargetKey == currentTargetKey {
145- return
146- }
147-
148- b .removeTablet (b .localTablets , prevTargetKey , prevTablet )
149- b .removeTablet (b .externalTablets , prevTargetKey , prevTablet )
150- }
151-
152- // onTabletHealthChange is the handler for tablet health events. If a tablet goes into serving,
153- // it is added to the appropriate (local or external) hash ring for its target. If it goes out
154- // of serving, it is removed from the hash ring.
155- func (b * SessionBalancer ) onTabletHealthChange (newTablet * discovery.TabletHealth ) {
156- b .mu .Lock ()
157- defer b .mu .Unlock ()
158-
159- var tablets map [discovery.KeyspaceShardTabletType ]TabletSet
160- if newTablet .Tablet .Alias .Cell == b .localCell {
161- tablets = b .localTablets
162- } else {
163- tablets = b .externalTablets
164- }
165-
166- alias := tabletAlias (newTablet )
167- targetKey := discovery .KeyFromTarget (newTablet .Target )
168-
169- if newTablet .Serving {
170- b .addTablet (tablets , targetKey , newTablet )
171- b .tablets [alias ] = newTablet
172- } else {
173- b .removeTablet (tablets , targetKey , newTablet )
174- delete (b .tablets , alias )
175- }
176- }
177-
178- // addTablet adds a tablet to the target in the given list of tablets.
179- func (b * SessionBalancer ) addTablet (tablets map [discovery.KeyspaceShardTabletType ]TabletSet , targetKey discovery.KeyspaceShardTabletType , tablet * discovery.TabletHealth ) {
180- target , ok := tablets [targetKey ]
181- if ! ok {
182- // Create the set if one has not been created for this target yet
183- tablets [targetKey ] = make (TabletSet )
184- target = tablets [targetKey ]
185- }
186-
187- alias := tabletAlias (tablet )
188- target [alias ] = tablet
189- }
190-
191- // removeTablet removes the tablet from the target in the given list of tablets.
192- func (b * SessionBalancer ) removeTablet (tablets map [discovery.KeyspaceShardTabletType ]TabletSet , targetKey discovery.KeyspaceShardTabletType , tablet * discovery.TabletHealth ) {
193- alias := tabletAlias (tablet )
194- delete (tablets [targetKey ], alias )
39+ func NewSessionBalancer (localCell string ) TabletBalancer {
40+ return & SessionBalancer {localCell : localCell }
19541}
19642
19743// Pick is the main entry point to the balancer.
19844//
19945// For a given session, it will return the same tablet for its duration, with preference to tablets
20046// in the local cell.
201- func (b * SessionBalancer ) Pick (target * querypb.Target , _ []* discovery.TabletHealth , opts PickOpts ) * discovery.TabletHealth {
47+ func (b * SessionBalancer ) Pick (target * querypb.Target , tablets []* discovery.TabletHealth , opts PickOpts ) * discovery.TabletHealth {
20248 if opts .SessionUUID == "" {
20349 return nil
20450 }
20551
206- b .mu .RLock ()
207- defer b .mu .RUnlock ()
52+ // Find the highest weight local and external tablets
53+ var maxLocalWeight , maxExternalWeight uint64
54+ var maxLocalTablet , maxExternalTablet * discovery.TabletHealth
20855
209- targetKey := discovery .KeyFromTarget (target )
210-
211- // Try to find a tablet in the local cell first
212- tablet := pick (b .localTablets [targetKey ], opts )
213- if tablet != nil {
214- return tablet
215- }
56+ for _ , tablet := range tablets {
57+ alias := tabletAlias (tablet )
21658
217- // If we didn't find a tablet in the local cell, try external cells
218- tablet = pick (b .externalTablets [targetKey ], opts )
219- return tablet
220- }
221-
222- // pick picks the highest weight valid tablet from the set of tablets.
223- func pick (tablets TabletSet , opts PickOpts ) * discovery.TabletHealth {
224- var maxWeight uint64
225- var maxTablet * discovery.TabletHealth
226-
227- for alias , tablet := range tablets {
228- invalid := opts .InvalidTablets [alias ]
229- if invalid {
59+ // Ignore invalid tablets
60+ if _ , invalid := opts .InvalidTablets [alias ]; invalid {
23061 continue
23162 }
23263
23364 weight := weight (alias , opts .SessionUUID )
234- if tablet == nil || weight > maxWeight {
235- maxWeight = weight
236- maxTablet = tablet
65+
66+ if b .isLocal (tablet ) && ((maxLocalTablet == nil ) || (weight > maxLocalWeight )) {
67+ maxLocalWeight = weight
68+ maxLocalTablet = tablet
23769 }
70+
71+ // We can consider all tablets here since we'd only use this if there were no
72+ // valid local tablets (meaning we'd have only considered external tablets anyway).
73+ if (maxExternalTablet == nil ) || (weight > maxExternalWeight ) {
74+ maxExternalWeight = weight
75+ maxExternalTablet = tablet
76+ }
77+ }
78+
79+ // If we found a valid local tablet, use that
80+ if maxLocalTablet != nil {
81+ return maxLocalTablet
23882 }
23983
240- return maxTablet
84+ // Otherwise, use the max external tablet (if it exists)
85+ return maxExternalTablet
24186}
24287
24388// weight computes the weight of a tablet by hashing its alias and the session UUID together.
@@ -250,44 +95,13 @@ func tabletAlias(tablet *discovery.TabletHealth) string {
25095 return topoproto .TabletAliasString (tablet .Tablet .Alias )
25196}
25297
98+ // isLocal returns true if the tablet is in the local cell.
99+ func (b * SessionBalancer ) isLocal (tablet * discovery.TabletHealth ) bool {
100+ return tablet .Tablet .Alias .Cell == b .localCell
101+ }
102+
253103// DebugHandler provides a summary of the session balancer state.
254104func (b * SessionBalancer ) DebugHandler (w http.ResponseWriter , r * http.Request ) {
255105 w .Header ().Set ("Content-Type" , "text/plain" )
256- fmt .Fprint (w , b .print ())
257- }
258-
259- // print returns a string representation of the session balancer state for debugging.
260- func (b * SessionBalancer ) print () string {
261- b .mu .RLock ()
262- defer b .mu .RUnlock ()
263-
264- sb := strings.Builder {}
265-
266- sb .WriteString ("Session balancer\n " )
267- sb .WriteString ("================\n " )
268- sb .WriteString (fmt .Sprintf ("Local cell: %s\n \n " , b .localCell ))
269-
270- sb .WriteString ("Local tablets:\n " )
271-
272- for target , tablets := range b .localTablets {
273- if len (tablets ) == 0 {
274- continue
275- }
276-
277- sb .WriteString (fmt .Sprintf ("\t - Target: %s\n " , target ))
278- sb .WriteString (fmt .Sprintf ("\t \t Tablets: %+v\n " , slices .Collect (maps .Keys (tablets ))))
279- }
280-
281- sb .WriteString ("External tablets:\n " )
282-
283- for target , tablets := range b .externalTablets {
284- if len (tablets ) == 0 {
285- continue
286- }
287-
288- sb .WriteString (fmt .Sprintf ("\t - Target: %s\n " , target ))
289- sb .WriteString (fmt .Sprintf ("\t \t Tablets: %+v\n " , slices .Collect (maps .Keys (tablets ))))
290- }
291-
292- return sb .String ()
106+ fmt .Fprintf (w , "Local cell: %s" , b .localCell )
293107}
0 commit comments