Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
dd9abea
real time telemetry updates
huangjeff5 Oct 31, 2025
f23e5e6
Merge branch 'main' into jh-telemetry
huangjeff5 Oct 31, 2025
97160a5
updates
huangjeff5 Nov 13, 2025
bc40323
Merge branch 'main' into jh-telemetry
huangjeff5 Nov 17, 2025
9685232
Expose streamTrace endpoint
huangjeff5 Nov 18, 2025
7a8f6bf
Merge branch 'main' into jh-telemetry
huangjeff5 Nov 18, 2025
4dbcc96
expose header, fix conflcit
huangjeff5 Nov 18, 2025
5f88eba
Broadcast events instead of spans
huangjeff5 Nov 18, 2025
21d9387
sort spans before broadcasting
huangjeff5 Nov 19, 2025
d8b0420
dedupe list
huangjeff5 Nov 19, 2025
471eac4
Add cancelAction
huangjeff5 Nov 20, 2025
58166c3
Update genkit schema and cancel action added to trpc endpoitns
huangjeff5 Nov 20, 2025
ad93b2e
Merge branch 'main' into jh-telemetry
huangjeff5 Dec 2, 2025
d960f57
Fix conflicts
huangjeff5 Dec 2, 2025
489f4bb
fix bad merge
huangjeff5 Dec 3, 2025
529b182
fix
huangjeff5 Dec 3, 2025
8b9948c
format
huangjeff5 Dec 3, 2025
6e0d671
Address fedback
huangjeff5 Dec 4, 2025
aff698c
Update genkit-tools/common/src/types/trace.ts
huangjeff5 Dec 4, 2025
b9b07a7
format
huangjeff5 Dec 4, 2025
6886be5
Merge branch 'jh-telemetry' of https://github.com/firebase/genkit int…
huangjeff5 Dec 4, 2025
ae2cf07
Update genkit-tools/telemetry-server/src/file-trace-store.ts
huangjeff5 Dec 4, 2025
794ceda
Address comments
huangjeff5 Dec 4, 2025
5ffae2f
Merge branch 'jh-telemetry' of https://github.com/firebase/genkit int…
huangjeff5 Dec 4, 2025
8e435da
Format
huangjeff5 Dec 4, 2025
edc2e57
update genkit schema
huangjeff5 Dec 4, 2025
1b09d0d
remove sending Access-Control-Allow-Headers
huangjeff5 Dec 4, 2025
471b792
handle throwy cases
huangjeff5 Dec 4, 2025
10dfb55
fix
huangjeff5 Dec 5, 2025
5ac0738
fix type
huangjeff5 Dec 5, 2025
46c7897
use cors
huangjeff5 Dec 8, 2025
6a8779c
append data prefix, env var gate
huangjeff5 Dec 8, 2025
868e40e
Add realtime telemetry flag
huangjeff5 Dec 8, 2025
31ff56f
Merge branch 'main' into jh-telemetry
huangjeff5 Dec 8, 2025
a6268fd
Delete file store on cancel action
huangjeff5 Dec 9, 2025
fff4b43
fix build
huangjeff5 Dec 9, 2025
13bf7bc
feat(go): Add cancelAction and early trace ID to headers in go reflec…
huangjeff5 Dec 9, 2025
3dde26c
prevent auto instrumentation
huangjeff5 Dec 9, 2025
ade58ce
Merge branch 'jh-telemetry' of https://github.com/firebase/genkit int…
huangjeff5 Dec 9, 2025
47fa9be
revert changes to file trace store
huangjeff5 Dec 9, 2025
04e5c5c
flush headers at the right time
huangjeff5 Dec 10, 2025
d89ad26
Fix SSE parsing bug and flag
huangjeff5 Dec 10, 2025
affcf84
format
huangjeff5 Dec 10, 2025
fba76cb
return error in non streaming case
huangjeff5 Dec 10, 2025
d85c51f
handle race condition, fix test
huangjeff5 Dec 10, 2025
dde28a5
pass enableRealtimeTelemetry in RuntimeManager, control whether to se…
huangjeff5 Dec 10, 2025
2c01e41
fix
huangjeff5 Dec 12, 2025
d1a9874
rename to early trace id
huangjeff5 Dec 12, 2025
ba15152
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
71961bb
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
8b0e3b1
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
be25ff4
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
3085fa5
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
40ab1e0
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
7405bd7
Update js/testapps/flow-sample1/src/index.ts
huangjeff5 Dec 12, 2025
ed10c60
pass args directly
huangjeff5 Dec 12, 2025
4571fab
Merge branch 'jh-telemetry' of https://github.com/firebase/genkit int…
huangjeff5 Dec 12, 2025
d1125d4
format
huangjeff5 Dec 12, 2025
2208098
Merge branch 'main' into jh-telemetry
huangjeff5 Dec 12, 2025
39d131e
fix build
huangjeff5 Dec 12, 2025
3a40a88
return trace ID in runAction error response
huangjeff5 Dec 12, 2025
b0df38b
set attributes on span creation so they are exported on span start
huangjeff5 Dec 12, 2025
7ac7345
format
huangjeff5 Dec 15, 2025
da9a55c
Merge branch 'main' into jh-telemetry
huangjeff5 Dec 15, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion genkit-tools/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ interface RunOptions {
noui?: boolean;
port?: string;
open?: boolean;
experimentalRealtimeTelemetry?: boolean;
}

/** Command to run code in dev mode and/or the Dev UI. */
Expand All @@ -34,6 +35,10 @@ export const start = new Command('start')
.option('-n, --noui', 'do not start the Dev UI', false)
.option('-p, --port <port>', 'port for the Dev UI')
.option('-o, --open', 'Open the browser on UI start up')
.option(
'--experimental-realtime-telemetry',
'Enable real-time telemetry streaming (experimental)'
)
.action(async (options: RunOptions) => {
const projectRoot = await findProjectRoot();
if (projectRoot.includes('/.Trash/')) {
Expand All @@ -49,7 +54,8 @@ export const start = new Command('start')
const result = await startDevProcessManager(
projectRoot,
start.args[0],
start.args.slice(1)
start.args.slice(1),
{ enableRealtimeTelemetry: options.experimentalRealtimeTelemetry }
);
manager = result.manager;
processPromise = result.processPromise;
Expand Down
16 changes: 13 additions & 3 deletions genkit-tools/cli/src/utils/manager-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,31 @@ export async function startManager(
return manager;
}

export interface DevProcessManagerOptions {
enableRealtimeTelemetry?: boolean;
}

export async function startDevProcessManager(
projectRoot: string,
command: string,
args: string[]
args: string[],
options?: DevProcessManagerOptions
): Promise<{ manager: RuntimeManager; processPromise: Promise<void> }> {
const telemetryServerUrl = await resolveTelemetryServer(projectRoot);
const processManager = new ProcessManager(command, args, {
const envVars: Record<string, string> = {
GENKIT_TELEMETRY_SERVER: telemetryServerUrl,
GENKIT_ENV: 'dev',
});
};
if (options?.enableRealtimeTelemetry) {
envVars.GENKIT_ENABLE_REALTIME_TELEMETRY = 'true';
}
const processManager = new ProcessManager(command, args, envVars);
const manager = await RuntimeManager.create({
telemetryServerUrl,
manageHealth: true,
projectRoot,
processManager,
enableRealtimeTelemetry: options?.enableRealtimeTelemetry,
});
const processPromise = processManager.start();
return { manager, processPromise };
Expand Down
4 changes: 3 additions & 1 deletion genkit-tools/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
},
"dependencies": {
"@asteasolutions/zod-to-openapi": "^7.0.0",
"@inquirer/prompts": "^7.8.0",
"@trpc/server": "^10.45.2",
"adm-zip": "^0.5.12",
"ajv": "^8.12.0",
Expand All @@ -20,10 +21,10 @@
"colorette": "^2.0.20",
"commander": "^11.1.0",
"configstore": "^5.0.1",
"cors": "^2.8.5",
"express": "^4.21.0",
"get-port": "5.1.1",
"glob": "^10.3.12",
"@inquirer/prompts": "^7.8.0",
"js-yaml": "^4.1.0",
"json-2-csv": "^5.5.1",
"json-schema": "^0.4.0",
Expand All @@ -41,6 +42,7 @@
"@types/body-parser": "^1.19.5",
"@types/cli-color": "^2.0.6",
"@types/configstore": "^6.0.2",
"@types/cors": "^2.8.19",
"@types/express": "^4.17.21",
"@types/inquirer": "^8.1.3",
"@types/jest": "^29.5.12",
Expand Down
194 changes: 187 additions & 7 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ interface RuntimeManagerOptions {
projectRoot: string;
/** An optional process manager for the main application process. */
processManager?: ProcessManager;
/** Whether realtime telemetry streaming is enabled. */
enableRealtimeTelemetry?: boolean;
}

export class RuntimeManager {
readonly processManager?: ProcessManager;
readonly enableRealtimeTelemetry: boolean;
private filenameToRuntimeMap: Record<string, RuntimeInfo> = {};
private filenameToDevUiMap: Record<string, DevToolsInfo> = {};
private idToFileMap: Record<string, string> = {};
Expand All @@ -75,9 +78,11 @@ export class RuntimeManager {
readonly telemetryServerUrl: string | undefined,
private manageHealth: boolean,
readonly projectRoot: string,
processManager?: ProcessManager
processManager?: ProcessManager,
enableRealtimeTelemetry?: boolean
) {
this.processManager = processManager;
this.enableRealtimeTelemetry = enableRealtimeTelemetry ?? false;
}

/**
Expand All @@ -88,7 +93,8 @@ export class RuntimeManager {
options.telemetryServerUrl,
options.manageHealth ?? true,
options.projectRoot,
options.processManager
options.processManager,
options.enableRealtimeTelemetry
);
await manager.setupRuntimesWatcher();
await manager.setupDevUiWatcher();
Expand Down Expand Up @@ -197,7 +203,8 @@ export class RuntimeManager {
*/
async runAction(
input: apis.RunActionRequest,
streamingCallback?: StreamingCallback<any>
streamingCallback?: StreamingCallback<any>,
onTraceId?: (traceId: string) => void
): Promise<RunActionResponse> {
const runtime = input.runtimeId
? this.getRuntimeById(input.runtimeId)
Expand Down Expand Up @@ -226,6 +233,12 @@ export class RuntimeManager {
if (response.headers['x-genkit-version']) {
genkitVersion = response.headers['x-genkit-version'];
}

const traceId = response.headers['x-genkit-trace-id'];
if (traceId && onTraceId) {
onTraceId(traceId);
}

const stream = response.data;

let buffer = '';
Expand Down Expand Up @@ -280,20 +293,115 @@ export class RuntimeManager {
});
return promise;
} else {
// runAction should use chunked JSON streaming to send early headers
const response = await axios
.post(`${runtime.reflectionServerUrl}/api/runAction`, input, {
headers: {
'Content-Type': 'application/json',
},
responseType: 'stream', // Use stream to get early headers
})
.catch((err) =>
this.httpErrorHandler(err, `Error running action key='${input.key}'.`)
);
const resp = RunActionResponseSchema.parse(response.data);
if (response.headers['x-genkit-version']) {
resp.genkitVersion = response.headers['x-genkit-version'];

const traceId = response.headers['x-genkit-trace-id'];
if (traceId && onTraceId) {
onTraceId(traceId);
}
return resp;

return new Promise<RunActionResponse>((resolve, reject) => {
let buffer = '';

response.data.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
});

response.data.on('end', () => {
try {
const responseData = JSON.parse(buffer);

if (responseData.error) {
const err = new GenkitToolsError(
`Error running action key='${input.key}'.`
);
// massage the error into a shape dev ui expects
err.data = {
...responseData.error,
stack: (responseData.error?.details as any).stack,
data: {
genkitErrorMessage: responseData.error?.message,
genkitErrorDetails: responseData.error?.details,
},
};
reject(err);
return;
}

// Handle backward compatibility - add trace ID from header if not in body
if (!responseData.telemetry && traceId) {
responseData.telemetry = { traceId: traceId };
}

const parsed = RunActionResponseSchema.parse(responseData);
if (response.headers['x-genkit-version']) {
parsed.genkitVersion = response.headers['x-genkit-version'];
}
resolve(parsed);
} catch (err) {
reject(new GenkitToolsError(`Failed to parse response: ${err}`));
}
});

response.data.on('error', (err: Error) => {
reject(err);
});
});
}
}

/**
* Cancels an in-flight action by trace ID
*/
async cancelAction(input: {
traceId: string;
runtimeId?: string;
}): Promise<{ message: string }> {
const runtime = input.runtimeId
? this.getRuntimeById(input.runtimeId)
: this.getMostRecentRuntime();
if (!runtime) {
throw new Error(
input.runtimeId
? `No runtime found with ID ${input.runtimeId}.`
: 'No runtimes found. Make sure your app is running.'
);
}

try {
const response = await axios.post(
`${runtime.reflectionServerUrl}/api/cancelAction`,
{ traceId: input.traceId },
{
headers: {
'Content-Type': 'application/json',
},
}
);
return response.data;
} catch (err) {
const axiosError = err as AxiosError;
if (axiosError.response?.status === 404) {
const error = new GenkitToolsError(
'Action not found or already completed'
);
error.data = {
message: 'Action not found or already completed',
} as any;
(error.data as any).statusCode = 404;
throw error;
}
throw this.httpErrorHandler(axiosError);
}
}

Expand Down Expand Up @@ -347,6 +455,78 @@ export class RuntimeManager {
return response.data as TraceData;
}

/**
* Streams trace updates in real-time from the telemetry server.
* Connects to the telemetry server's SSE endpoint and forwards updates via callback.
*/
async streamTrace(
input: apis.StreamTraceRequest,
streamingCallback: StreamingCallback<any>
): Promise<void> {
const { traceId } = input;

if (!this.telemetryServerUrl) {
throw new Error(
'Telemetry server URL not configured. Cannot stream trace updates.'
);
}

const response = await axios
.get(`${this.telemetryServerUrl}/api/traces/${traceId}/stream`, {
headers: {
Accept: 'text/event-stream',
},
responseType: 'stream',
})
.catch((err) =>
this.httpErrorHandler(
err,
`Error streaming trace for traceId='${traceId}'`
)
);

const stream = response.data;
let buffer = '';

// Return a promise that resolves when the stream ends
return new Promise<void>((resolve, reject) => {
stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();

// Process complete messages (ending with \n\n)
while (buffer.includes('\n\n')) {
const messageEnd = buffer.indexOf('\n\n');
const message = buffer.substring(0, messageEnd).trim();
buffer = buffer.substring(messageEnd + 2);

// Skip empty messages
if (!message) {
continue;
}
// Parse SSE data line - strip "data: " prefix
try {
const jsonData = message.startsWith('data: ')
? message.slice(6)
: message;
const parsed = JSON.parse(jsonData);
streamingCallback(parsed);
} catch (err) {
logger.error(`Error parsing stream data: ${err}`);
}
}
});

stream.on('end', () => {
resolve();
});

stream.on('error', (err: Error) => {
logger.error(`Stream error for traceId='${traceId}': ${err}`);
reject(err);
});
});
}

/**
* Adds a trace to the trace store
*/
Expand Down
15 changes: 8 additions & 7 deletions genkit-tools/common/src/server/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { GenkitToolsError, type RuntimeInfo } from '../manager/types';
import { TraceDataSchema } from '../types';
import type { Action } from '../types/action';
import * as apis from '../types/apis';
import { CancelActionRequestSchema } from '../types/apis';
import type { EnvironmentVariable } from '../types/env';
import * as evals from '../types/eval';
import type { PromptFrontmatter } from '../types/prompt';
Expand Down Expand Up @@ -133,13 +134,6 @@ export const TOOLS_SERVER_ROUTER = (manager: RuntimeManager) =>
return manager.listActions(input);
}),

/** Runs an action. */
runAction: loggedProcedure
.input(apis.RunActionRequestSchema)
.mutation(async ({ input }) => {
return manager.runAction(input);
}),

/** Generate a .prompt file from messages and model config. */
createPrompt: loggedProcedure
.input(apis.CreatePromptRequestSchema)
Expand Down Expand Up @@ -328,6 +322,13 @@ export const TOOLS_SERVER_ROUTER = (manager: RuntimeManager) =>
await manager.processManager?.kill();
return true;
}),

/** Cancels a long-running action. */
cancelAction: loggedProcedure
.input(CancelActionRequestSchema)
.mutation(async ({ input }) => {
return manager.cancelAction(input);
}),
});

export type ToolsServerRouter = ReturnType<typeof TOOLS_SERVER_ROUTER>;
Loading
Loading