流程

数据流是一些具备一些附加特征的函数: 类型化、可流式传输、本地和远程调用以及完全可观察。 Firebase Genkit 提供用于处理流程的 CLI 和开发者界面工具 (运行、调试等)。

定义 flow

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

可以使用 zod 定义流的输入和输出架构。

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

指定架构后,Genkit 会验证输入和输出的架构。

运行 flow

使用 runFlow 函数运行该数据流:

const response = await runFlow(menuSuggestionFlow, 'French');

您还可以使用 CLI 运行 flow:

genkit flow:run menuSuggestionFlow '"French"'

流式

下面是一个简单的数据流示例,该数据流可从数据流中流式传输值:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

请注意,可以未定义 streamingCallback。只有在调用客户端请求流式回答时才会定义。

如需在流处理模式下调用数据流,请使用 streamFlow 函数:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

如果数据流未实现流式处理,streamFlow 的行为将与 runFlow 相同。

您还可以使用 CLI 对 flow 进行流式处理:

genkit flow:run menuSuggestionFlow '"French"' -s

部署 flow

如果您希望能够通过 HTTP 访问您的数据流,则需要部署该数据流 。Genkit 与 Cloud Functions for Firebase 和 Express.js 主机(例如 Cloud Run)。

已部署的流支持与本地流相同的所有功能(例如流式传输和 可观测性)。

Cloud Functions for Firebase

如需将数据流与 Cloud Functions for Firebase 搭配使用,请使用 firebase 插件,将 defineFlow 替换为 onFlow 并包含 authPolicy

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

如需使用 Cloud Run 和类似服务部署数据流,请使用 defineFlow 定义数据流,然后调用 startFlowsServer()

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

默认情况下,startFlowsServer 会将您在代码库中定义的所有数据流作为 HTTP 端点(例如 http://localhost:3400/menuSuggestionFlow)进行处理。

您可以选择通过流服务器公开哪些流。您可以指定自定义端口(如已设置,该端口将使用 PORT 环境变量)。您还可以设定 CORS 设置。

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Flow 可观测性

有时,在使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将它们视为单独的跟踪步骤。您只需将代码封装在 run 函数中即可。

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);