流程

Flow 是封装函数,与直接函数相比,具有一些其他特性 电话类型:强类型、可流式传输、支持本地和远程呼叫; 完全可观察。 Firebase Genkit 提供用于运行和调试流程的 CLI 和开发者界面工具。

定义数据流

最简单的形式是,Flow 只封装一个函数:

Go

menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (string, error) {
      suggestion := makeMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  })

这样,您就可以从 Genkit CLI 和开发者界面运行该函数, Genkit 的许多功能,包括部署和 可观测性。

与直接调用模型 API 相比,Genkit 流程的一个重要优势是 输入和输出的类型安全:

Go

数据流的参数和结果类型可以是简单值或结构化值。 Genkit 会使用 invopop/jsonschema.

以下数据流将 string 作为输入并输出 struct

type MenuSuggestion struct {
  ItemName    string `json:"item_name"`
  Description string `json:"description"`
  Calories    int    `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
      suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  },
)

正在运行的数据流

如需在代码中运行 Flow,请执行以下操作:

Go

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

您也可以使用 CLI 运行流:

genkit flow:run menuSuggestionFlow '"French"'

直播

下面是一个可流式传输值的数据流的简单示例:

Go

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
  "menuSuggestionFlow",
  func(
      ctx context.Context,
      restaurantTheme InputType,
      callback func(context.Context, StreamType) error,
  ) (OutputType, error) {
      var menu strings.Builder
      menuChunks := make(chan StreamType)
      go makeFullMenuSuggestion(restaurantTheme, menuChunks)
      for {
          chunk, ok := <-menuChunks
          if !ok {
              break
          }
          if callback != nil {
              callback(context.Background(), chunk)
          }
          menu.WriteString(string(chunk))
      }
      return OutputType(menu.String()), nil
  },
)

请注意,流式传输回调可以未定义。只有在 正在请求流式传输的响应。

如需在流处理模式下调用流,请执行以下操作:

Go

menuSuggestionFlow.Stream(
  context.Background(),
  "French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
  if !sfv.Done {
      fmt.Print(sfv.Output)
      return true
  } else {
      return false
  }
})

如果数据流未实现流式传输,则 StreamFlow() 的行为方式与 RunFlow().

您也可以使用 CLI 对流进行流式传输:

genkit flow:run menuSuggestionFlow '"French"' -s

部署流

如果您希望能够通过 HTTP 访问您的数据流,则需要部署该数据流 。

Go

如需使用 Cloud Run 和类似服务部署流,请定义您的流,以及 然后调用 Init()

func main() {
  genkit.DefineFlow(
      "menuSuggestionFlow",
      func(ctx context.Context, restaurantTheme string) (string, error) {
          // ...
          return "", nil
      },
  )
  if err := genkit.Init(context.Background(), nil); err != nil {
      log.Fatal(err)
  }
}

Init 会启动 net/http 服务器,以 HTTP 的形式公开您的流 endpoints(例如 http://localhost:3400/menuSuggestionFlow)。

第二个参数是可选 Options,用于指定以下内容:

  • FlowAddr:要监听的地址和端口。如果未指定, 服务器监听 PORT 环境变量指定的端口; 如果此项为空,则系统会使用默认的 3400 端口
  • Flows:要处理的数据流。如果未指定,则 Init 会投放所有 定义的流。

如果您想在与其他端点相同的主机和端口上处理流, 可以将 FlowAddr 设置为 -,并改为调用 NewFlowServeMux() 来获取处理程序 Genkit 流程,您可以将其与其他路由处理程序进行多路复用:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

数据流可观测性

有时,在使用未针对可观测性进行插桩的第三方 SDK 时, 您可能希望在开发者界面中将它们视为单独的跟踪步骤。全部 只需将代码封装在 run 函数中即可。

genkit.DefineFlow(
    "menuSuggestionFlow",
    func(ctx context.Context, restaurantTheme string) (string, error) {
        themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
            // ...
            return "", nil
        })

        // ...
        return themes, err
    })