Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions chromadb/test/distributed/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,29 @@ def test_function_remove_nonexistent(basic_http_client: System) -> None:
# Trying to detach this function again should raise NotFoundError
with pytest.raises(NotFoundError, match="does not exist"):
attached_fn.detach(delete_output_collection=True)

def test_attach_to_output_collection_fails(basic_http_client: System) -> None:
    """Attaching a function to another function's output collection is rejected.

    An attached function is first created on a regular collection so that its
    output collection exists; a second attach that uses that output collection
    as its input must raise ``ChromaError``.
    """
    client = ClientCreator.from_system(basic_http_client)
    client.reset()

    # Seed a regular collection and attach a function that writes to
    # "output_collection". The returned handle is not needed here.
    source = client.create_collection(name="input_collection")
    source.add(ids=["id1"], documents=["test"])
    source.attach_function(
        name="test_function",
        function_id="record_counter",
        output_collection="output_collection",
        params=None,
    )

    # Fetch the collection the first attach wrote to; it is the target of the
    # second (disallowed) attach below.
    derived = client.get_collection(name="output_collection")

    # Chaining a function off an output collection must be refused.
    with pytest.raises(ChromaError, match="cannot attach function to an output collection"):
        derived.attach_function(
            name="test_function_2",
            function_id="record_counter",
            output_collection="output_collection_2",
            params=None,
        )
4 changes: 4 additions & 0 deletions go/pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ package common
const (
DefaultTenant = "default_tenant"
DefaultDatabase = "default_database"

// SourceAttachedFunctionIDKey is the metadata key used to mark output collections
// and link them to their attached function.
SourceAttachedFunctionIDKey = "chroma:source_attached_function_id"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which reads better:

  • "source attached function id"
  • "attached function id source"

)
11 changes: 6 additions & 5 deletions go/pkg/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ var (
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported")

// AttachedFunction errors
ErrAttachedFunctionAlreadyExists = errors.New("the attached function that was being created already exists for this collection")
ErrAttachedFunctionNotFound = errors.New("the requested attached function was not found")
ErrAttachedFunctionNotReady = errors.New("the requested attached function exists but is still initializing")
ErrInvalidAttachedFunctionName = errors.New("attached function name cannot start with reserved prefix '_deleted_'")
ErrHeapServiceNotEnabled = errors.New("heap service is not enabled")
ErrAttachedFunctionAlreadyExists = errors.New("the attached function that was being created already exists for this collection")
ErrAttachedFunctionNotFound = errors.New("the requested attached function was not found")
ErrAttachedFunctionNotReady = errors.New("the requested attached function exists but is still initializing")
ErrInvalidAttachedFunctionName = errors.New("attached function name cannot start with reserved prefix '_deleted_'")
ErrHeapServiceNotEnabled = errors.New("heap service is not enabled")
ErrCannotAttachToOutputCollection = errors.New("cannot attach function to an output collection")

// Function errors
ErrFunctionNotFound = errors.New("function not found")
Expand Down
62 changes: 62 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1994,6 +1994,68 @@ func (suite *APIsTestSuite) TestDeleteCollectionWithAttachedFunction() {
suite.Equal(int64(1), count)
}

// TestCannotAttachToOutputCollection checks that AttachFunction refuses an
// input collection whose metadata carries common.SourceAttachedFunctionIDKey,
// returning common.ErrCannotAttachToOutputCollection, and that the same
// request succeeds once it is pointed at a collection without that key.
func (suite *APIsTestSuite) TestCannotAttachToOutputCollection() {
	ctx := context.Background()

	// A plain collection with no metadata: a legitimate attachment target.
	plainID := types.NewUniqueID()
	_, _, err := suite.coordinator.CreateCollection(ctx, &model.CreateCollection{
		ID:           plainID,
		Name:         "test_input_collection",
		TenantID:     suite.tenantName,
		DatabaseName: suite.databaseName,
	})
	suite.NoError(err)

	// A collection tagged by hand with the marker key, standing in for a
	// real output collection without going through the attach flow.
	markedID := types.NewUniqueID()
	markerMetadata := model.NewCollectionMetadata[model.CollectionMetadataValueType]()
	markerMetadata.Add(
		common.SourceAttachedFunctionIDKey,
		&model.CollectionMetadataValueStringType{Value: "some-function-id"},
	)
	_, _, err = suite.coordinator.CreateCollection(ctx, &model.CreateCollection{
		ID:           markedID,
		Name:         "simulated_output_collection",
		TenantID:     suite.tenantName,
		DatabaseName: suite.databaseName,
		Metadata:     markerMetadata,
	})
	suite.NoError(err)

	// Insert a function row directly so the request can reference it by name.
	fnName := "test_function_for_output_test"
	fnRow := &dbmodel.Function{
		ID:            uuid.New(),
		Name:          fnName,
		IsIncremental: false,
		ReturnType:    "{}",
	}
	err = suite.db.Create(fnRow).Error
	suite.NoError(err)

	// First attempt targets the marked collection and must be rejected.
	req := &coordinatorpb.AttachFunctionRequest{
		Name:                    "test_attached_fn",
		InputCollectionId:       markedID.String(),
		OutputCollectionName:    "another_output_collection",
		FunctionName:            fnName,
		TenantId:                suite.tenantName,
		Database:                suite.databaseName,
		MinRecordsForInvocation: 100,
		Params:                  &structpb.Struct{Fields: map[string]*structpb.Value{}},
	}
	_, err = suite.coordinator.AttachFunction(ctx, req)
	suite.Error(err)
	suite.True(errors.Is(err, common.ErrCannotAttachToOutputCollection))

	// Same request, retargeted at the plain collection, must go through.
	req.InputCollectionId = plainID.String()
	_, err = suite.coordinator.AttachFunction(ctx, req)
	suite.NoError(err)
}

func TestAPIsTestSuite(t *testing.T) {
testSuite := new(APIsTestSuite)
suite.Run(t, testSuite)
Expand Down
17 changes: 16 additions & 1 deletion go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
return common.ErrCollectionNotFound
}

// Check if input collection is an output collection (prevent chaining)
inputCollection := collections[0]
for _, meta := range inputCollection.CollectionMetadata {
if meta.Key != nil && *meta.Key == common.SourceAttachedFunctionIDKey {
log.Error("AttachFunction: cannot attach function to an output collection",
zap.String("input_collection_id", req.InputCollectionId))
return common.ErrCannotAttachToOutputCollection
}
}

// Serialize params
var paramsJSON string
if req.Params != nil {
Expand Down Expand Up @@ -574,14 +584,19 @@ func (s *Coordinator) FinishCreateAttachedFunction(ctx context.Context, req *coo

// 5. Create the output collection with segments
dimension := int32(1) // Default dimension for attached function output collections

// Add metadata to mark this as an output collection
outputCollectionMetadata := model.NewCollectionMetadata[model.CollectionMetadataValueType]()
outputCollectionMetadata.Add(common.SourceAttachedFunctionIDKey, &model.CollectionMetadataValueStringType{Value: attachedFunctionID.String()})

collection := &model.CreateCollection{
ID: collectionID,
Name: attachedFunction.OutputCollectionName,
ConfigurationJsonStr: "{}", // Empty JSON object for default config
TenantID: attachedFunction.TenantID,
DatabaseName: database.Name,
Dimension: &dimension,
Metadata: nil,
Metadata: outputCollectionMetadata,
}

// Create segments for the collection (distributed setup)
Expand Down