Przepływy to funkcje o zaawansowanych właściwościach, które wyróżniają je na tle wywołań bezpośrednich: są silnie typowane, mogą być przesyłane strumieniowo, można je wywoływać lokalnie i zdalnie oraz są w pełni obserwowalne. Firebase Genkit udostępnia narzędzia wiersza poleceń i interfejsu dla deweloperów do uruchamiania i debugowania przepływów.
Definiowanie przepływów
W najprostszej postaci przepływ obejmuje tylko funkcję:
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
Dzięki temu możesz uruchamiać funkcję z poziomu interfejsu wiersza poleceń Genkit i interfejsu użytkownika dewelopera. Jest to wymagane w przypadku wielu funkcji Genkit, w tym wdrażania i możliwości obserwacji.
Ważną zaletą przepływów Genkit w porównaniu z bezpośrednim wywołaniem interfejsu API modelu jest ochrona przed błędami typu w przypadku zarówno danych wejściowych, jak i wyjściowych.
Typy argumentów i wyników przepływu mogą być proste lub strukturalne.
Genkit wygeneruje schematy JSON dla tych wartości za pomocą invopop/jsonschema
.
Ten przepływ danych przyjmuje jako dane wejściowe element string
, a jako dane wyjściowe zwraca element struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
Uruchomione przepływy
Aby uruchomić przepływ w kodzie:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
Do uruchamiania przepływów możesz też używać interfejsu wiersza poleceń:
genkit flow:run menuSuggestionFlow '"French"'
Transmisja zakończona
Oto prosty przykład przepływu, który może przesyłać strumieniowo wartości:
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
Pamiętaj, że funkcja wywołania zwrotnego strumieniowego przesyłania danych może być niezdefiniowana. Jest on zdefiniowany tylko wtedy, gdy wywołujący klient żąda odpowiedzi strumieniowej.
Aby wywołać przepływ w trybie strumieniowania:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
Jeśli przepływ nie obsługuje strumieniowego przesyłania danych, element StreamFlow()
działa tak samo jak element RunFlow()
.
Do strumieniowego przesyłania przepływów możesz też użyć interfejsu wiersza poleceń:
genkit flow:run menuSuggestionFlow '"French"' -s
Wdrażanie przepływów
Jeśli chcesz uzyskać dostęp do przepływu przez HTTP, musisz go najpierw wdrożyć.
Aby wdrożyć przepływy za pomocą Cloud Run i podobnych usług, zdefiniuj przepływy, a następnie wywołaj Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
uruchamia serwer net/http
, który ujawnia przepływy jako punkty końcowe HTTP (np. http://localhost:3400/menuSuggestionFlow
).
Drugi parametr to opcjonalny parametr Options
, który określa:
FlowAddr
: adres i port, na których ma być nasłuchiwany ruch. Jeśli nie jest ona określona, serwer nasłuchuje na porcie określonym przez zmienną środowiskową PORT. Jeśli jest ona pusta, serwer używa domyślnego portu 3400.Flows
: które przepływy mają być wyświetlane. Jeśli go nie podasz,Init
obsługuje wszystkie zdefiniowane przepływy.
Jeśli chcesz obsługiwać przepływy na tym samym hoście i porcie co inne punkty końcowe, możesz ustawić wartość FlowAddr
na -
i zamiast tego wywołać NewFlowServeMux()
, aby uzyskać moduł obsługi przepływów Genkit, który możesz zmultipleksować z innymi modułami obsługi trasy:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Obserwowalność przepływu
Czasami, gdy używasz zewnętrznych pakietów SDK, które nie są wyposażone w obserwowalność, możesz chcieć zobaczyć je jako osobny krok śledzenia w interfejsie dewelopera. Wystarczy, że spróbujesz umieścić kod w funkcji run
.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})