Przepływy

Przepływy to funkcje o zaawansowanych właściwościach, które wyróżniają je na tle wywołań bezpośrednich: są silnie typowane, mogą być przesyłane strumieniowo, można je wywoływać lokalnie i zdalnie oraz są w pełni obserwowalne. Firebase Genkit udostępnia narzędzia wiersza poleceń i interfejsu dla deweloperów do uruchamiania i debugowania przepływów.

Definiowanie przepływów

W najprostszej postaci przepływ obejmuje tylko funkcję:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

Dzięki temu możesz uruchamiać funkcję z poziomu interfejsu wiersza poleceń Genkit i interfejsu użytkownika dewelopera. Jest to wymagane w przypadku wielu funkcji Genkit, w tym wdrażania i możliwości obserwacji.

Ważną zaletą przepływów Genkit w porównaniu z bezpośrednim wywołaniem interfejsu API modelu jest ochrona przed błędami typu w przypadku zarówno danych wejściowych, jak i wyjściowych. Typy argumentów i wyników przepływu mogą być proste lub strukturalne. Genkit wygeneruje schematy JSON dla tych wartości za pomocą invopop/jsonschema.

Ten przepływ danych przyjmuje jako dane wejściowe element string, a jako dane wyjściowe zwraca element struct:

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

Uruchomione przepływy

Aby uruchomić przepływ w kodzie:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Do uruchamiania przepływów możesz też używać interfejsu wiersza poleceń:

genkit flow:run menuSuggestionFlow '"French"'

Transmisja zakończona

Oto prosty przykład przepływu, który może przesyłać strumieniowo wartości:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

Pamiętaj, że funkcja wywołania zwrotnego strumieniowego przesyłania danych może być niezdefiniowana. Jest on zdefiniowany tylko wtedy, gdy wywołujący klient żąda odpowiedzi strumieniowej.

Aby wywołać przepływ w trybie strumieniowania:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

Jeśli przepływ nie obsługuje strumieniowego przesyłania danych, element StreamFlow() działa tak samo jak element RunFlow().

Do strumieniowego przesyłania przepływów możesz też użyć interfejsu wiersza poleceń:

genkit flow:run menuSuggestionFlow '"French"' -s

Wdrażanie przepływów

Jeśli chcesz uzyskać dostęp do przepływu przez HTTP, musisz go najpierw wdrożyć. Aby wdrożyć przepływy za pomocą Cloud Run i podobnych usług, zdefiniuj przepływy, a następnie wywołaj Init():

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init uruchamia serwer net/http, który ujawnia przepływy jako punkty końcowe HTTP (np. http://localhost:3400/menuSuggestionFlow).

Drugi parametr to opcjonalny parametr Options, który określa:

  • FlowAddr: adres i port, na których ma być nasłuchiwany ruch. Jeśli nie jest ona określona, serwer nasłuchuje na porcie określonym przez zmienną środowiskową PORT. Jeśli jest ona pusta, serwer używa domyślnego portu 3400.
  • Flows: które przepływy mają być wyświetlane. Jeśli go nie podasz, Init obsługuje wszystkie zdefiniowane przepływy.

Jeśli chcesz obsługiwać przepływy na tym samym hoście i porcie co inne punkty końcowe, możesz ustawić wartość FlowAddr na - i zamiast tego wywołać NewFlowServeMux(), aby uzyskać moduł obsługi przepływów Genkit, który możesz zmultipleksować z innymi modułami obsługi trasy:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Obserwowalność przepływu

Czasami, gdy używasz zewnętrznych pakietów SDK, które nie są wyposażone w obserwowalność, możesz chcieć zobaczyć je jako osobny krok śledzenia w interfejsie dewelopera. Wystarczy, że spróbujesz umieścić kod w funkcji run.

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})