Los flujos son funciones unidas con algunas características adicionales con respecto a las llamadas directas: se aplican tipos estrictos, se pueden transmitir, se pueden llamar de manera local y remota, y son totalmente observables. Firebase Genkit proporciona herramientas de CLI y de IU para desarrolladores para ejecutar y depurar flujos.
Define flujos
En su forma más sencilla, un flujo solo une una función:
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
Hacerlo te permite ejecutar la función desde la CLI de Genkit y la IU para desarrolladores, y es un requisito para muchas de las funciones de Genkit, incluidas las funciones de observabilidad.
Una ventaja importante que tienen los flujos de Genkit en comparación con llamar directamente a una API de modelo es la seguridad de tipos de entrada y salida.
Los tipos de argumento y resultado de un flujo pueden ser valores simples o estructurados.
Genkit producirá esquemas JSON para estos valores con invopop/jsonschema
.
El siguiente flujo toma un string
como entrada y genera un struct
como resultado:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
Flujos en ejecución
Para ejecutar un flujo en tu código, haz lo siguiente:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
También puedes usar la CLI para ejecutar flujos:
genkit flow:run menuSuggestionFlow '"French"'
Transmitido
El siguiente es un ejemplo sencillo de un flujo que puede transmitir valores:
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
Ten en cuenta que la devolución de llamada de transmisión puede ser indefinida. Solo se define si el cliente que invoca solicita una respuesta transmitida.
Para invocar un flujo en modo de transmisión, sigue estos pasos:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
Si el flujo no implementa la transmisión, StreamFlow()
se comporta de manera idéntica a RunFlow()
También puedes usar la CLI para transmitir flujos:
genkit flow:run menuSuggestionFlow '"French"' -s
Implementa flujos
Si quieres acceder a tu flujo a través de HTTP, deberás implementarlo antes de empezar.
Para implementar flujos con Cloud Run y servicios similares, define tus flujos y, luego, llama a Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
inicia un servidor net/http
que expone tus flujos como extremos HTTP (por ejemplo, http://localhost:3400/menuSuggestionFlow
).
El segundo parámetro es un Options
opcional que especifica lo siguiente:
FlowAddr
: Es la dirección y el puerto que se escucharán. Si no se especifica, el servidor escucha en el puerto especificado por la variable de entorno PORT; si está vacío, usa el puerto predeterminado 3400.Flows
: Qué flujos se deben entregar. Si no se especifica,Init
publica todos los flujos definidos.
Si quieres entregar flujos en el mismo host y puerto que otros extremos, puedes establecer FlowAddr
en -
y, en su lugar, llamar a NewFlowServeMux()
para obtener un controlador para tus flujos de Genkit, que puedes multiplexar con tus otros controladores de rutas:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Observabilidad del flujo
A veces, cuando se usan SDKs de terceros que no están instrumentados para la observabilidad, es posible que quieras verlos como un paso de registro separado en la IU para desarrolladores. Todo lo que debes hacer es unir el código en la función run
.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})