Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"

opasdktest "github.com/open-policy-agent/opa/v1/sdk/test"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -558,8 +560,7 @@ func TestAuthorizeRequestFilter(t *testing.T) {
openpolicyagent.WithEnvoyMetadataBytes(envoyMetaDataConfig),
)

opaFactory, err := openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithTracer(tracingtest.NewTracer()),
openpolicyagent.WithOpenPolicyAgentInstanceConfig(opts...))
opaFactory, err := openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithTracer(tracingtest.NewTracer()), openpolicyagent.WithOpenPolicyAgentInstanceConfig(opts...))
assert.NoError(t, err)

ftSpec := NewOpaAuthorizeRequestSpec(opaFactory)
Expand Down Expand Up @@ -606,6 +607,279 @@ func TestAuthorizeRequestFilter(t *testing.T) {
}
}

func TestAuthorizeRequestFilterWithS3DecisionLogPlugin(t *testing.T) {
for _, ti := range []struct {
msg string
filterName string
extraeskipBefore string
extraeskipAfter string
regoQuery string
requestPath string
requestMethod string
body string
contextExtensions string
expectedBody string
expectedHeaders http.Header
expectedStatus int
backendHeaders http.Header
removeHeaders http.Header
discoveryPath string
discoveryBundle string
}{
{
msg: "Allow Requests for log upload success",
filterName: "opaAuthorizeRequest",
regoQuery: "envoy/authz/allow",
requestPath: "/allow",
requestMethod: "GET",
contextExtensions: "",
expectedStatus: http.StatusOK,
expectedBody: "Welcome!",
expectedHeaders: make(http.Header),
backendHeaders: make(http.Header),
removeHeaders: make(http.Header),
discoveryPath: "logs-success",
},
{
msg: "Deny Requests for log upload success",
filterName: "opaAuthorizeRequest",
regoQuery: "envoy/authz/allow",
requestPath: "/not-allow",
requestMethod: "GET",
contextExtensions: "",
expectedStatus: http.StatusForbidden,
expectedBody: "",
expectedHeaders: make(http.Header),
backendHeaders: make(http.Header),
removeHeaders: make(http.Header),
discoveryPath: "logs-success",
},
{
msg: "Allow Requests for log upload timeout",
filterName: "opaAuthorizeRequest",
regoQuery: "envoy/authz/allow",
requestPath: "/allow",
requestMethod: "GET",
contextExtensions: "",
expectedStatus: http.StatusOK,
expectedBody: "Welcome!",
expectedHeaders: make(http.Header),
backendHeaders: make(http.Header),
removeHeaders: make(http.Header),
discoveryPath: "logs-timeout",
},
{
msg: "Allow Requests for log upload forbidden",
filterName: "opaAuthorizeRequest",
regoQuery: "envoy/authz/allow",
requestPath: "/allow",
requestMethod: "GET",
contextExtensions: "",
expectedStatus: http.StatusOK,
expectedBody: "Welcome!",
expectedHeaders: make(http.Header),
backendHeaders: make(http.Header),
removeHeaders: make(http.Header),
discoveryPath: "logs-forbidden",
},
{
msg: "Allow Requests for log upload path not found",
filterName: "opaAuthorizeRequest",
regoQuery: "envoy/authz/allow",
requestPath: "/allow",
requestMethod: "GET",
contextExtensions: "",
expectedStatus: http.StatusOK,
expectedBody: "Welcome!",
expectedHeaders: make(http.Header),
backendHeaders: make(http.Header),
removeHeaders: make(http.Header),
discoveryPath: "logs-notfound",
},
{
msg: "Allow Requests for log upload throws 5xx",
filterName: "opaAuthorizeRequest",
regoQuery: "envoy/authz/allow",
requestPath: "/allow",
requestMethod: "GET",
contextExtensions: "",
expectedStatus: http.StatusOK,
expectedBody: "Welcome!",
expectedHeaders: make(http.Header),
backendHeaders: make(http.Header),
removeHeaders: make(http.Header),
discoveryPath: "logs-5xx",
},
} {
t.Run(ti.msg, func(t *testing.T) {
t.Logf("Running test for %v", ti)
clientServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Welcome!"))
assert.True(t, isHeadersPresent(t, ti.backendHeaders, r.Header), "Enriched request header is absent.")
assert.True(t, isHeadersAbsent(t, ti.removeHeaders, r.Header), "Unwanted HTTP Headers present.")

body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, ti.body, string(body))
}))
defer clientServer.Close()

var logUploadCount int32
s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "logs-success") {
atomic.AddInt32(&logUploadCount, 1)
w.WriteHeader(http.StatusOK)
} else if strings.Contains(r.URL.Path, "logs-forbidden") {
atomic.AddInt32(&logUploadCount, 1)
w.WriteHeader(http.StatusForbidden)
} else if strings.Contains(r.URL.Path, "logs-timeout") {
atomic.AddInt32(&logUploadCount, 1)
time.Sleep(5 * time.Second)
w.WriteHeader(http.StatusOK)
} else if strings.Contains(r.URL.Path, "logs-5xx") {
atomic.AddInt32(&logUploadCount, 1)
w.WriteHeader(http.StatusInternalServerError)
} else {
atomic.AddInt32(&logUploadCount, 1)
w.WriteHeader(http.StatusNotFound)
}
}))
defer s3Server.Close()

opaControlPlane := opasdktest.MustNewServer(
opasdktest.MockBundle("/bundles/test", map[string]string{
"main.rego": `
package envoy.authz

default allow = false

allow if {
input.parsed_path == [ "allow" ]
input.parsed_query == {}
}
`,
}),
opasdktest.MockBundle("/bundles/discovery", map[string]string{
"data.json": `{
"discovery": {
"bundles": {
"bundles/test": {
"persist": false,
"resource": "bundles/test",
"service": "test"
}
}}
}`,
}),
)

config := []byte(fmt.Sprintf(`{
"services": {
"test": {
"url": %q
}
},
"discovery": {
"name": "discovery",
"resource": "/bundles/discovery",
"service": "test"
},
"labels": {
"environment": "test"
},
"decision_logs": {
"plugin": "eopa_dl"
},
"plugins": {
"envoy_ext_authz_grpc": {
"path": %q,
"dry-run": false
},
"eopa_dl": {
"buffer": {
"type": "memory",
"max_bytes": 50000000
},
"output": {
"type": "s3",
"bucket": %q,
"endpoint": %q,
"force_path": true,
"region": "eu-central-1",
"access_key_id": "myid",
"access_secret": "mysecret",
"timeout": "2s",
"batching": {
"at_bytes": 10000000,
"at_period": "1s"
}
}
}
}
}`, opaControlPlane.URL(), ti.regoQuery, ti.discoveryPath, s3Server.URL))

fr := make(filters.Registry)

envoyMetaDataConfig := []byte(`{
"filter_metadata": {
"envoy.filters.http.header_to_metadata": {
"policy_type": "ingress"
}
}
}`)

opts := make([]func(*openpolicyagent.OpenPolicyAgentInstanceConfig) error, 0)
opts = append(opts,
openpolicyagent.WithConfigTemplate(config),
openpolicyagent.WithEnvoyMetadataBytes(envoyMetaDataConfig),
)
opaFactory, err := openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithTracer(tracingtest.NewTracer()),
openpolicyagent.WithOpenPolicyAgentInstanceConfig(opts...))
assert.NoError(t, err)

ftSpec := NewOpaAuthorizeRequestSpec(opaFactory)
fr.Register(ftSpec)
ftSpec = NewOpaAuthorizeRequestWithBodySpec(opaFactory)
fr.Register(ftSpec)
fr.Register(builtin.NewSetPath())

r := eskip.MustParse(fmt.Sprintf(`* -> %s %s("%s", "%s") %s -> "%s"`, ti.extraeskipBefore, ti.filterName, "test", ti.contextExtensions, ti.extraeskipAfter, clientServer.URL))

proxy := proxytest.New(fr, r...)

var bodyReader io.Reader
if ti.body != "" {
bodyReader = strings.NewReader(ti.body)
}

req, err := http.NewRequest(ti.requestMethod, proxy.URL+ti.requestPath, bodyReader)
assert.NoError(t, err)

rsp, err := proxy.Client().Do(req)
assert.NoError(t, err)

assert.Equal(t, ti.expectedStatus, rsp.StatusCode, "HTTP status does not match")

assert.True(t, isHeadersPresent(t, ti.expectedHeaders, rsp.Header), "HTTP Headers do not match")
defer rsp.Body.Close()
body, err := io.ReadAll(rsp.Body)
assert.NoError(t, err)
assert.Equal(t, ti.expectedBody, string(body), "HTTP Body does not match")

time.Sleep(2 * time.Second) // wait for async decision log to be sent
assert.True(t, atomic.LoadInt32(&logUploadCount) >= 1, "Decision log upload was not attempted")

// Simulate a second request while decision log batching/upload is in progress
proxy.Client().Timeout = 20 * time.Millisecond
rsp, err = proxy.Client().Do(req)
assert.NoError(t, err)
assert.Equal(t, ti.expectedStatus, rsp.StatusCode, "HTTP status does not match")
})
}
}

func TestCreateFilterArguments(t *testing.T) {
opaRegistry, err := openpolicyagent.NewOpenPolicyAgentRegistry(
openpolicyagent.WithOpenPolicyAgentInstanceConfig(openpolicyagent.WithConfigTemplate([]byte(""))))
Expand Down
12 changes: 11 additions & 1 deletion filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"errors"
"fmt"
dl "github.com/open-policy-agent/eopa/pkg/plugins/decision_logs"
"github.com/open-policy-agent/opa/v1/util"
"io"
"maps"
"math/rand"
Expand Down Expand Up @@ -690,7 +692,15 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, bundleName str
return nil, err
}

discoveryPlugin, err := discovery.New(manager, discovery.Factories(map[string]plugins.Factory{envoy.PluginName: envoy.Factory{}}), discovery.Hooks(configHooks))
discoveryOpts := map[string]plugins.Factory{envoy.PluginName: envoy.Factory{}, dl.DLPluginName: dl.Factory()}

var bootConfig map[string]any
err = util.Unmarshal(configBytes, &bootConfig)
if err != nil {
return nil, err
}
Comment on lines +697 to +701
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is not exactly related to enabling the eopa_dl plugin.

It seems unless we add discovery.BootConfig function during the discovery initialization, the decision_log or plugins configuration provided in the opaconfig.yaml is not effective.
As we are planning to work with both bootstrapped and discovered configuration (Possible from opa v0.64.0 onwards) we will need this.

If I did not miss anything, This configuration we have for the clusters is not currently doing anything and will start to make an impact once this change is merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so regardless the decision log config will be overwritten by Styra DAS correct?
So to test it in integration we would need to change the discovery config?

Copy link
Collaborator Author

@wisinghe wisinghe Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current filter implementation, yes.

This code change takes away the requirement to change the discovery config, by passing the bootstrap config into the discovery initialization (at line 703)


discoveryPlugin, err := discovery.New(manager, discovery.Factories(discoveryOpts), discovery.Hooks(configHooks), discovery.BootConfig(bootConfig))
if err != nil {
return nil, err
}
Expand Down
39 changes: 39 additions & 0 deletions filters/openpolicyagent/openpolicyagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,44 @@
{"discovery":{"bundles":{"bundles/test":{"persist":false,"resource":"bundles/test","service":"test"}}}}
`,
}),
opasdktest.MockBundle("/bundles/discovery-eopa-plugin", map[string]string{
"data.json": `{
"discovery": {
"bundles": {
"bundles/test": {
"persist": false,
"resource": "bundles/test",
"service": "test"
}
},
"decision_logs": {
"plugin": "eopa_dl"
},
"plugins": {
"eopa_dl": {
"buffer": {
"type": "memory",
"max_bytes": 50000000
},
"output": {
"type": "s3",
"bucket": "logs",
"endpoint": "https://example.s3.eu-central-1.amazonaws.com/",
"force_path": true,
"region": "eu-central-1",
"access_key_id": "myid",
"access_secret": "mysecret",
"batching": {
"at_bytes": 10000000,
"at_period": "1s"
}
}
}
}
}
}
`,
}),
opasdktest.MockBundle("/bundles/discovery-with-wrong-bundle", map[string]string{
"data.json": `
{"discovery":{"bundles":{"bundles/non-existing-bundle":{"persist":false,"resource":"bundles/non-existing-bundle","service":"test"}}}}
Expand Down Expand Up @@ -1084,6 +1122,7 @@
expectedTriggerMode plugins.TriggerMode
discoveryBundle string
resourceBundle bool
enableEopaPlugins bool

Check failure on line 1125 in filters/openpolicyagent/openpolicyagent_test.go

View workflow job for this annotation

GitHub Actions / tests

field enableEopaPlugins is unused (U1000)
}

func runWithTestCases(t *testing.T, cases []opaInstanceStartupTestCase, test func(t *testing.T, tc opaInstanceStartupTestCase)) {
Expand Down
Loading
Loading