資料流是具有某些額外特性的函式:其具有強類型、可串流、本機和遠端呼叫,且可完整觀測。Firebase Genkit 提供 CLI 和開發人員 UI 工具,以處理流程 (執行、偵錯等)。
定義流程
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
使用 zod
可以定義流程的輸入和輸出結構定義。
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
指定結構定義後,Genkit 就會驗證輸入和輸出的結構定義。
執行中的流程
使用 runFlow
函式執行資料流:
const response = await runFlow(menuSuggestionFlow, 'French');
此外,您也可以使用 CLI 執行流程:
genkit flow:run menuSuggestionFlow '"French"'
直播結束
以下的簡單範例可從資料流串流值:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
請注意,streamingCallback
可以未定義。只有在叫用用戶端要求串流回應時,才需定義此項目。
如要在串流模式下叫用資料流,請使用 streamFlow
函式:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
如果資料流未實作串流,streamFlow
的行為會與 runFlow
相同。
此外,您也可以使用 CLI 來串流資料流:
genkit flow:run menuSuggestionFlow '"French"' -s
部署流程
如果您想透過 HTTP 存取流程,請先部署流程。Genkit 可整合 Cloud Functions for Firebase 和 Express.js 主機 (例如 Cloud Run)。
已部署的流程支援與本機流程相同的所有功能 (例如串流和觀測能力)。
Firebase 專用 Cloud 函式
如要將流程與 Cloud Functions for Firebase 搭配使用,請使用 firebase
外掛程式,請將 defineFlow
替換為 onFlow
並加入 authPolicy
。
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Express.js
如要使用 Cloud Run 和類似服務部署流程,請使用 defineFlow
定義資料流,然後呼叫 startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
根據預設,startFlowsServer
會提供您在程式碼集定義的所有資料流,做為 HTTP 端點 (例如 http://localhost:3400/menuSuggestionFlow
)。
您可以選擇要透過流程伺服器公開的流程。您可以指定自訂通訊埠 (如果設定,系統會使用 PORT
環境變數)。你也可以配置 CORS 設定。
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
流程觀測能力
有時候,使用沒有檢測功能檢測的第三方 SDK 時,您可能會想在開發人員 UI 中將這些 SDK 顯示為獨立的追蹤步驟。只需將程式碼納入 run
函式即可。
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);