Flow 是封装函数,与直接函数相比,具有一些其他特性 电话类型:强类型、可流式传输、支持本地和远程呼叫; 完全可观察。 Firebase Genkit 提供用于运行和调试流程的 CLI 和开发者界面工具。
定义数据流
最简单的形式是,Flow 只封装一个函数:
Go
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
这样,您就可以从 Genkit CLI 和开发者界面运行该函数, Genkit 的许多功能,包括部署和 可观测性。
与直接调用模型 API 相比,Genkit 流程的一个重要优势是 输入和输出的类型安全:
Go
数据流的参数和结果类型可以是简单值或结构化值。
Genkit 会使用
invopop/jsonschema
.
以下数据流将 string
作为输入并输出 struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
正在运行的数据流
如需在代码中运行 Flow,请执行以下操作:
Go
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
您也可以使用 CLI 运行流:
genkit flow:run menuSuggestionFlow '"French"'
直播
下面是一个可流式传输值的数据流的简单示例:
Go
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
请注意,流式传输回调可以未定义。只有在 正在请求流式传输的响应。
如需在流处理模式下调用流,请执行以下操作:
Go
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if !sfv.Done {
fmt.Print(sfv.Output)
return true
} else {
return false
}
})
如果数据流未实现流式传输,则 StreamFlow()
的行为方式与
RunFlow()
.
您也可以使用 CLI 对流进行流式传输:
genkit flow:run menuSuggestionFlow '"French"' -s
部署流
如果您希望能够通过 HTTP 访问您的数据流,则需要部署该数据流 。
Go
如需使用 Cloud Run 和类似服务部署流,请定义您的流,以及
然后调用 Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
会启动 net/http
服务器,以 HTTP 的形式公开您的流
endpoints(例如 http://localhost:3400/menuSuggestionFlow
)。
第二个参数是可选 Options
,用于指定以下内容:
FlowAddr
:要监听的地址和端口。如果未指定, 服务器监听 PORT 环境变量指定的端口; 如果此项为空,则系统会使用默认的 3400 端口Flows
:要处理的数据流。如果未指定,则Init
会投放所有 定义的流。
如果您想在与其他端点相同的主机和端口上处理流,
可以将 FlowAddr
设置为 -
,并改为调用 NewFlowServeMux()
来获取处理程序
Genkit 流程,您可以将其与其他路由处理程序进行多路复用:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
数据流可观测性
有时,在使用未针对可观测性进行插桩的第三方 SDK 时,
您可能希望在开发者界面中将它们视为单独的跟踪步骤。全部
只需将代码封装在 run
函数中即可。
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})