数据流是一些具备一些附加特征的函数: 类型化、可流式传输、本地和远程调用以及完全可观察。 Firebase Genkit 提供用于处理流程的 CLI 和开发者界面工具 (运行、调试等)。
定义 flow
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
可以使用 zod
定义流的输入和输出架构。
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
指定架构后,Genkit 会验证输入和输出的架构。
运行 flow
使用 runFlow
函数运行该数据流:
const response = await runFlow(menuSuggestionFlow, 'French');
您还可以使用 CLI 运行 flow:
genkit flow:run menuSuggestionFlow '"French"'
流式
下面是一个简单的数据流示例,该数据流可从数据流中流式传输值:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
请注意,可以未定义 streamingCallback
。只有在调用客户端请求流式回答时才会定义。
如需在流处理模式下调用数据流,请使用 streamFlow
函数:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
如果数据流未实现流式处理,streamFlow
的行为将与 runFlow
相同。
您还可以使用 CLI 对 flow 进行流式处理:
genkit flow:run menuSuggestionFlow '"French"' -s
部署 flow
如果您希望能够通过 HTTP 访问您的数据流,则需要部署该数据流 。Genkit 与 Cloud Functions for Firebase 和 Express.js 主机(例如 Cloud Run)。
已部署的流支持与本地流相同的所有功能(例如流式传输和 可观测性)。
Cloud Functions for Firebase
如需将数据流与 Cloud Functions for Firebase 搭配使用,请使用 firebase
插件,将 defineFlow
替换为 onFlow
并包含 authPolicy
。
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Express.js
如需使用 Cloud Run 和类似服务部署数据流,请使用 defineFlow
定义数据流,然后调用 startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
默认情况下,startFlowsServer
会将您在代码库中定义的所有数据流作为 HTTP 端点(例如 http://localhost:3400/menuSuggestionFlow
)进行处理。
您可以选择通过流服务器公开哪些流。您可以指定自定义端口(如已设置,该端口将使用 PORT
环境变量)。您还可以设定 CORS 设置。
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
Flow 可观测性
有时,在使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将它们视为单独的跟踪步骤。您只需将代码封装在 run
函数中即可。
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);