Flow 是具有一些额外特性的函数:它们是强类型、可流式传输、可在本地和远程调用,并且可完全观测。Firebase Genkit 提供用于处理流程的 CLI 和开发者界面工具 (运行、调试等)。
定义 flow
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
可以使用 zod
定义流的输入和输出架构。
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
指定架构后,Genkit 会验证输入和输出的架构。
运行 flow
使用 runFlow
函数运行流程:
const response = await runFlow(menuSuggestionFlow, 'French');
您还可以使用 CLI 运行 flow:
genkit flow:run menuSuggestionFlow '"French"'
流式
下面是一个简单的数据流示例,该数据流可从数据流中流式传输值:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
请注意,可以未定义 streamingCallback
。只有在调用客户端请求流式回答时才会定义。
如需在流处理模式下调用数据流,请使用 streamFlow
函数:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
如果流未实现流式处理,则 streamFlow
的行为与 runFlow
相同。
您还可以使用 CLI 对 flow 进行流式处理:
genkit flow:run menuSuggestionFlow '"French"' -s
部署 flow
如果您希望能够通过 HTTP 访问 flow,则需要先部署 flow。Genkit 与 Cloud Functions for Firebase 和 Express.js 主机(例如 Cloud Run)。
已部署的流支持与本地流相同的所有功能(例如流式传输和 可观测性)。
Cloud Functions for Firebase
如需将流程与 Cloud Functions for Firebase 搭配使用,请使用 firebase
插件,将 defineFlow
替换为 onFlow
,并添加 authPolicy
。
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Express.js
如需使用 Cloud Run 和类似服务部署 flow,请使用 defineFlow
定义 flow,然后调用 startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
默认情况下,startFlowsServer
会处理您在代码库中定义为 HTTP 端点(例如 http://localhost:3400/menuSuggestionFlow
)的所有流程。您可以通过 POST 请求调用流程,如下所示:
curl -X POST "http://localhost:3400/menuSuggestionFlow" -H "Content-Type: application/json" -d '{"data": "banana"}'
如果需要,您可以自定义流服务器以提供特定的流列表,如下所示。您还可以指定自定义端口(如果设置了 PORT
环境变量,则会使用该变量)或指定 CORS 设置。
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
Flow 可观测性
有时,在使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将它们视为单独的跟踪步骤。您只需将代码封装在 run
函数中即可。
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);