144 changes: 76 additions & 68 deletions packages/core/src/tracing/openai/index.ts
@@ -4,6 +4,7 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
 import { SPAN_STATUS_ERROR } from '../../tracing';
 import { startSpan, startSpanManual } from '../../tracing/trace';
 import type { Span, SpanAttributeValue } from '../../types-hoist/span';
+import { handleCallbackErrors } from '../../utils/handleCallbackErrors';
 import {
   GEN_AI_OPERATION_NAME_ATTRIBUTE,
   GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE,
@@ -126,97 +127,104 @@ function addRequestAttributes(span: Span, params: Record<string, unknown>): void
   }
 }
 
-/**
- * Handle common error catching and reporting for streaming requests
- */
-function handleStreamingError(error: unknown, span: Span, methodPath: string): never {
-  captureException(error, {
-    mechanism: { handled: false, type: 'auto.ai.openai.stream', data: { function: methodPath } },
-  });
-
-  if (span.isRecording()) {
-    span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
-    span.end();
-  }
-  throw error;
-}
-
 /**
  * Instrument a method with Sentry spans
  * Following Sentry AI Agents Manual Instrumentation conventions
  * @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation
+ *
+ * This implementation uses Proxy and handleCallbackErrors to preserve the original
+ * return type (e.g., OpenAI's APIPromise with .withResponse() method).
  */
 function instrumentMethod<T extends unknown[], R>(
-  originalMethod: (...args: T) => Promise<R>,
+  originalMethod: (...args: T) => R | Promise<R>,
   methodPath: InstrumentedMethod,
   context: unknown,
   options: OpenAiOptions,
-): (...args: T) => Promise<R> {
-  return async function instrumentedMethod(...args: T): Promise<R> {
-    const requestAttributes = extractRequestAttributes(args, methodPath);
-    const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
-    const operationName = getOperationName(methodPath);
+): (...args: T) => R | Promise<R> {
+  return new Proxy(originalMethod, {
+    apply(target, _thisArg, args: T): R | Promise<R> {
+      const requestAttributes = extractRequestAttributes(args, methodPath);
+      const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
+      const operationName = getOperationName(methodPath);
 
-    const params = args[0] as Record<string, unknown> | undefined;
-    const isStreamRequested = params && typeof params === 'object' && params.stream === true;
+      const params = args[0] as Record<string, unknown> | undefined;
+      const isStreamRequested = params && typeof params === 'object' && params.stream === true;
 
-    if (isStreamRequested) {
-      // For streaming responses, use manual span management to properly handle the async generator lifecycle
-      return startSpanManual(
-        {
-          name: `${operationName} ${model} stream-response`,
-          op: getSpanOperation(methodPath),
-          attributes: requestAttributes as Record<string, SpanAttributeValue>,
-        },
-        async (span: Span) => {
-          try {
-            if (options.recordInputs && params) {
-              addRequestAttributes(span, params);
-            }
-
-            const result = await originalMethod.apply(context, args);
-
-            return instrumentStream(
-              result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
-              span,
-              options.recordOutputs ?? false,
-            ) as unknown as R;
-          } catch (error) {
-            return handleStreamingError(error, span, methodPath);
-          }
-        },
-      );
-    } else {
-      // Non-streaming responses
+      if (isStreamRequested) {
+        // For streaming responses, use manual span management to properly handle the async generator lifecycle
+        return startSpanManual(
+          {
+            name: `${operationName} ${model} stream-response`,
+            op: getSpanOperation(methodPath),
+            attributes: requestAttributes as Record<string, SpanAttributeValue>,
+          },
+          async (span: Span) => {
+            try {
+              if (options.recordInputs && params) {
+                addRequestAttributes(span, params);
+              }
+
+              const result = await target.apply(context, args);
+
+              return instrumentStream(
+                result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
+                span,
+                options.recordOutputs ?? false,
+              ) as unknown as R;
+            } catch (error) {
+              // For streaming requests that fail before stream creation, we still want to record
+              // them as streaming requests but end the span gracefully
+              span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
+              captureException(error, {
+                mechanism: {
+                  handled: false,
+                  type: 'auto.ai.openai.stream',
+                  data: {
+                    function: methodPath,
+                  },
+                },
+              });
+              span.end();
+              throw error;
+            }
+          },
+        );
+      }
+
+      // Non-streaming responses: use handleCallbackErrors to preserve original return type (e.g., APIPromise)
       return startSpan(
         {
           name: `${operationName} ${model}`,
           op: getSpanOperation(methodPath),
           attributes: requestAttributes as Record<string, SpanAttributeValue>,
         },
-        async (span: Span) => {
-          try {
-            if (options.recordInputs && params) {
-              addRequestAttributes(span, params);
-            }
-
-            const result = await originalMethod.apply(context, args);
-            addResponseAttributes(span, result, options.recordOutputs);
-            return result;
-          } catch (error) {
-            captureException(error, {
-              mechanism: {
-                handled: false,
-                type: 'auto.ai.openai',
-                data: {
-                  function: methodPath,
-                },
-              },
-            });
-            throw error;
-          }
-        },
-      );
-    }
-  };
+        span => {
+          if (options.recordInputs && params) {
+            addRequestAttributes(span, params);
+          }
+
+          return handleCallbackErrors(
+            () => target.apply(context, args),
+            error => {
+              captureException(error, {
+                mechanism: {
+                  handled: false,
+                  type: 'auto.ai.openai',
+                  data: {
+                    function: methodPath,
+                  },
+                },
+              });
+            },
+            () => {},
+            result => addResponseAttributes(span, result as OpenAiResponse, options.recordOutputs),
+          );
+        },
+      );
+    },
+  }) as (...args: T) => R | Promise<R>;
 }

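Why the Proxy matters: OpenAI's APIPromise is a Promise subclass that carries extra methods such as .withResponse(). The old async wrapper re-wrapped the SDK's return value in a plain Promise, which silently dropped those methods; the Proxy's apply trap forwards the call and returns whatever object the SDK produced. A minimal sketch of the difference in standalone TypeScript (ApiPromiseLike is a hypothetical stand-in, not OpenAI's actual class):

```ts
// Hypothetical stand-in for OpenAI's APIPromise: a Promise subclass with an
// extra method that callers rely on.
class ApiPromiseLike<T> extends Promise<T> {
  withResponse(): Promise<{ data: T }> {
    return this.then(data => ({ data }));
  }
}

// Stand-in for an SDK method that returns the subclass.
const create = (): ApiPromiseLike<string> => new ApiPromiseLike(resolve => resolve('ok'));

// Wrapping in an async function re-wraps the result in a plain Promise,
// so the subclass (and withResponse) is lost:
const asyncWrapped = async (): Promise<string> => create();

// A Proxy apply trap forwards the call and returns the original object untouched:
const proxied = new Proxy(create, {
  apply(target, thisArg, args) {
    return Reflect.apply(target, thisArg, args);
  },
});

void proxied().withResponse(); // still available
// (asyncWrapped() has no withResponse method at runtime.)
```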
150 changes: 147 additions & 3 deletions packages/core/test/tracing/openai-integration-functions.test.ts
@@ -2,13 +2,56 @@ import { beforeEach, describe, expect, it } from 'vitest';
 import type { OpenAiClient } from '../../src';
 import { instrumentOpenAiClient } from '../../src/tracing/openai';
 
+/**
+ * Mock APIPromise that simulates OpenAI SDK's APIPromise behavior
+ * APIPromise extends Promise but has additional methods like withResponse()
+ */
+class MockAPIPromise<T> extends Promise<T> {
+  private _response: { headers: Record<string, string> };
+
+  constructor(
+    executor: (resolve: (value: T) => void, reject: (reason?: unknown) => void) => void,
+    response?: { headers: Record<string, string> },
+  ) {
+    super(executor);
+    this._response = response || { headers: { 'x-request-id': 'test-request-id' } };
+  }
+
+  /**
+   * Simulates OpenAI's APIPromise.withResponse() method
+   * Returns both the data and the raw response
+   */
+  withResponse(): Promise<{ data: T; response: { headers: Record<string, string> } }> {
+    return this.then(data => ({
+      data,
+      response: this._response,
+    }));
+  }
+
+  // Override then to return MockAPIPromise to maintain the chain
+  // This is important for preserving the APIPromise type through .then() chains
+  override then<TResult1 = T, TResult2 = never>(
+    onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | null | undefined,
+    onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | null | undefined,
+  ): MockAPIPromise<TResult1 | TResult2> {
+    const result = super.then(onfulfilled, onrejected);
+    const apiPromise = new MockAPIPromise<TResult1 | TResult2>((resolve, reject) => {
+      result.then(resolve, reject);
+    }, this._response);
+    return apiPromise;
+  }
+}
+
 interface FullOpenAIClient {
   chat: {
     completions: {
-      create: (params: ChatCompletionParams) => Promise<ChatCompletionResponse>;
+      create: (params: ChatCompletionParams) => MockAPIPromise<ChatCompletionResponse>;
       parse: (params: ParseCompletionParams) => Promise<ParseCompletionResponse>;
     };
   };
+  embeddings: {
+    create: (params: EmbeddingsParams) => MockAPIPromise<EmbeddingsResponse>;
+  };
 }
 interface ChatCompletionParams {
   model: string;
@@ -48,6 +91,18 @@ interface ParseCompletionResponse {
   parsed: { name: string; age: number };
 }
 
+interface EmbeddingsParams {
+  model: string;
+  input: string | string[];
+}
+
+interface EmbeddingsResponse {
+  object: string;
+  model: string;
+  data: Array<{ embedding: number[]; index: number }>;
+  usage: { prompt_tokens: number; total_tokens: number };
+}
+
 /**
  * Mock OpenAI client that simulates the private field behavior
  * that causes the "Cannot read private member" error
@@ -59,9 +114,15 @@ class MockOpenAIClient implements FullOpenAIClient {
   // Simulate instrumented methods
   chat = {
     completions: {
-      create: async (params: ChatCompletionParams): Promise<ChatCompletionResponse> => {
+      create: (params: ChatCompletionParams): MockAPIPromise<ChatCompletionResponse> => {
         this.#buildURL('/chat/completions');
-        return { id: 'test', model: params.model, choices: [{ message: { content: 'Hello!' } }] };
+        return new MockAPIPromise(resolve => {
+          resolve({
+            id: 'test',
+            model: params.model,
+            choices: [{ message: { content: 'Hello!' } }],
+          });
+        });
       },
 
       // This is NOT instrumented
@@ -84,6 +145,20 @@
     },
   };
 
+  embeddings = {
+    create: (params: EmbeddingsParams): MockAPIPromise<EmbeddingsResponse> => {
+      this.#buildURL('/embeddings');
+      return new MockAPIPromise(resolve => {
+        resolve({
+          object: 'list',
+          model: params.model,
+          data: [{ embedding: [0.1, 0.2, 0.3], index: 0 }],
+          usage: { prompt_tokens: 10, total_tokens: 10 },
+        });
+      });
+    },
+  };
+
   constructor() {
     MockOpenAIClient.#privateData.set(this, {
       apiKey: 'test-key',
Expand Down Expand Up @@ -208,3 +283,72 @@ describe('OpenAI Integration Private Field Fix', () => {
expect(typeof instrumentedClient.chat.completions.parse).toBe('function');
});
});

describe('OpenAI Integration APIPromise Preservation', () => {
let mockClient: MockOpenAIClient;
let instrumentedClient: FullOpenAIClient & OpenAiClient;

beforeEach(() => {
mockClient = new MockOpenAIClient();
instrumentedClient = instrumentOpenAiClient(mockClient as unknown as OpenAiClient) as FullOpenAIClient &
OpenAiClient;
});

it('should preserve APIPromise.withResponse() method on chat.completions.create', async () => {
const apiPromise = instrumentedClient.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'test' }],
});

// The key test: withResponse() should exist and work
expect(typeof apiPromise.withResponse).toBe('function');

const { data, response } = await apiPromise.withResponse();

expect(data.model).toBe('gpt-4');
expect(data.choices[0]?.message?.content).toBe('Hello!');
expect(response.headers).toEqual({ 'x-request-id': 'test-request-id' });
});

it('should preserve APIPromise.withResponse() method on embeddings.create', async () => {
const apiPromise = instrumentedClient.embeddings.create({
model: 'text-embedding-3-small',
input: 'test input',
});

// The key test: withResponse() should exist and work
expect(typeof apiPromise.withResponse).toBe('function');

const { data, response } = await apiPromise.withResponse();

expect(data.model).toBe('text-embedding-3-small');
expect(data.data[0]?.embedding).toEqual([0.1, 0.2, 0.3]);
expect(response.headers).toEqual({ 'x-request-id': 'test-request-id' });
});

it('should still work with regular await on instrumented methods', async () => {
// Ensure the basic Promise behavior still works
const result = await instrumentedClient.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'test' }],
});

expect(result.model).toBe('gpt-4');
expect(result.choices[0]?.message?.content).toBe('Hello!');
});

it('should preserve APIPromise through .then() chains', async () => {
const apiPromise = instrumentedClient.embeddings.create({
model: 'text-embedding-3-small',
input: 'test',
});

// Chain a .then() and verify withResponse still exists
const chainedPromise = apiPromise.then(data => data);

// After .then(), withResponse should still be available (if the original type is preserved)
// Note: This depends on handleCallbackErrors returning the original Promise type
const result = await chainedPromise;
expect(result.model).toBe('text-embedding-3-small');
});
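The final test's note about .then() chains pins down what the non-streaming path relies on: handleCallbackErrors must run its callbacks without swapping the SDK's promise for a plain one. A simplified sketch of that helper's shape, inferred from the call site in the index.ts diff above (assumption: the real implementation in packages/core/src/utils/handleCallbackErrors.ts differs in detail):

```ts
// Sketch only: four callbacks matching the call site above — run fn, report
// errors, run a finally hook, and hand the resolved result to onSuccess.
function handleCallbackErrorsSketch<T>(
  fn: () => T,
  onError: (error: unknown) => void,
  onFinally: () => void = () => {},
  onSuccess: (result: unknown) => void = () => {},
): T {
  let result: T;
  try {
    result = fn();
  } catch (error) {
    onError(error);
    onFinally();
    throw error;
  }
  if (result instanceof Promise) {
    // Chain with .then() instead of awaiting: when .then() is overridden to
    // return the same subclass (as MockAPIPromise does), the caller still
    // gets an APIPromise-shaped object, so withResponse() survives.
    return result.then(
      value => {
        onSuccess(value);
        onFinally();
        return value;
      },
      error => {
        onError(error);
        onFinally();
        throw error;
      },
    ) as unknown as T;
  }
  onSuccess(result);
  onFinally();
  return result;
}
```

Had the helper instead done `return await result` inside an async function, every instrumented non-streaming call would come back as a plain Promise and the withResponse() assertions above would fail.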
});