Skip to content

Commit da8cb80

Browse files
authored
fix(client): add nilguards to updateBroker (#3393)
This _should_ be a rare edge case as the updateBroker func is only called from updateMetadata which does already do an up-front `client.Close()` check under read-lock before then acquiring the write lock. However, there's potentially a small window of opportunity that if client.Close() was called whilst metadata refresh was in-flight and for whatever reason the updateMetadata goroutine gets pre-empted in-between the readlock release and the write lock acquire then the client could have been closed and so `client.brokers` will be nil. I wouldn't have expected this to ever happen, but it was reported by a user in an older Sarama version in #3391 and there's no harm in adding the nilguard just in case. Signed-off-by: Dominic Evans <[email protected]>
1 parent 0628f24 commit da8cb80

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,9 +685,16 @@ func (client *client) randomizeSeedBrokers(addrs []string) {
685685
}
686686

687687
func (client *client) updateBroker(brokers []*Broker) {
688+
if client.brokers == nil {
689+
return
690+
}
691+
688692
currentBroker := make(map[int32]*Broker, len(brokers))
689693

690694
for _, broker := range brokers {
695+
if broker == nil {
696+
continue
697+
}
691698
currentBroker[broker.ID()] = broker
692699
if client.brokers[broker.ID()] == nil { // add new broker
693700
client.brokers[broker.ID()] = broker

client_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/rcrowley/go-metrics"
1616
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/require"
1718
)
1819

1920
func TestSimpleClient(t *testing.T) {
@@ -1330,3 +1331,70 @@ func TestMetricsCleanup(t *testing.T) {
13301331
t.Errorf("excepted 1 metric, found: %v", all)
13311332
}
13321333
}
1334+
1335+
func TestUpdateBroker(t *testing.T) {
1336+
t.Run("closed client doesn't panic", func(t *testing.T) {
1337+
c := &client{}
1338+
fn := func() {
1339+
c.updateBroker(nil)
1340+
c.updateBroker([]*Broker{
1341+
{
1342+
id: 0,
1343+
addr: "127.0.0.1:9092",
1344+
},
1345+
})
1346+
}
1347+
require.NotPanics(t, fn)
1348+
})
1349+
1350+
t.Run("open client adds new broker entries", func(t *testing.T) {
1351+
c := &client{
1352+
brokers: make(map[int32]*Broker),
1353+
}
1354+
fn := func() {
1355+
c.updateBroker([]*Broker{
1356+
{
1357+
id: 0,
1358+
addr: "127.0.0.1:9092",
1359+
},
1360+
})
1361+
}
1362+
require.NotPanics(t, fn)
1363+
require.Len(t, c.brokers, 1)
1364+
assert.Equal(t, 0, int(c.brokers[0].ID()))
1365+
assert.Equal(t, "127.0.0.1:9092", c.brokers[0].Addr())
1366+
})
1367+
1368+
t.Run("open client adds, updates and removes broker entries", func(t *testing.T) {
1369+
c := &client{
1370+
brokers: map[int32]*Broker{
1371+
0: {
1372+
id: 0,
1373+
addr: "127.0.0.1:9092",
1374+
},
1375+
1: {
1376+
id: 1,
1377+
addr: "127.0.0.1:9093",
1378+
},
1379+
},
1380+
}
1381+
fn := func() {
1382+
c.updateBroker([]*Broker{
1383+
{
1384+
id: 1,
1385+
addr: "127.0.0.1:19093", // new addr for existing broker
1386+
},
1387+
{
1388+
id: 2,
1389+
addr: "127.0.0.1:19094",
1390+
},
1391+
})
1392+
}
1393+
require.NotPanics(t, fn)
1394+
require.Len(t, c.brokers, 2)
1395+
assert.Equal(t, 1, int(c.brokers[1].ID()))
1396+
assert.Equal(t, "127.0.0.1:19093", c.brokers[1].Addr())
1397+
assert.Equal(t, 2, int(c.brokers[2].ID()))
1398+
assert.Equal(t, "127.0.0.1:19094", c.brokers[2].Addr())
1399+
})
1400+
}

0 commit comments

Comments
 (0)