Потоки

Потоки — это обернутые функции с некоторыми дополнительными характеристиками по сравнению с прямыми вызовами: они строго типизированы, потокоемки, локально и удаленно вызываются и полностью наблюдаемы. Firebase Genkit предоставляет инструменты CLI и пользовательского интерфейса разработчика для запуска и отладки потоков.

Определение потоков

В своей простейшей форме поток просто оборачивает функцию:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

Это позволит вам запускать функцию из интерфейса командной строки Genkit и пользовательского интерфейса разработчика, а также является обязательным требованием для многих функций Genkit, включая развертывание и возможность наблюдения.

Важным преимуществом потоков Genkit перед прямым вызовом API модели является безопасность типов как входных, так и выходных данных. Типы аргументов и результатов потока могут быть простыми или структурированными значениями. Genkit создаст схемы JSON для этих значений, используя invopop/jsonschema .

Следующий поток принимает string в качестве входных данных и выводит struct :

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

Запуск потоков

Чтобы запустить поток в вашем коде:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Вы также можете использовать CLI для запуска потоков:

genkit flow:run menuSuggestionFlow '"French"'

Потоковое

Вот простой пример потока, который может передавать значения:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

Обратите внимание, что обратный вызов потоковой передачи может быть неопределенным. Он определяется только в том случае, если вызывающий клиент запрашивает потоковый ответ.

Чтобы вызвать поток в потоковом режиме:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

Если поток не реализует потоковую передачу, StreamFlow() ведет себя идентично RunFlow() .

Вы также можете использовать CLI для потоковой передачи потоков:

genkit flow:run menuSuggestionFlow '"French"' -s

Развертывание потоков

Если вы хотите иметь доступ к своему потоку через HTTP, вам необходимо сначала его развернуть. Чтобы развернуть потоки с помощью Cloud Run и подобных сервисов, определите свои потоки, а затем вызовите Init() :

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init запускает сервер net/http , который предоставляет ваши потоки как конечные точки HTTP (например, http://localhost:3400/menuSuggestionFlow ).

Второй параметр — это необязательные Options , которые определяют следующее:

  • FlowAddr : адрес и порт для прослушивания. Если не указано, сервер прослушивает порт, указанный в переменной среды PORT; если он пуст, используется порт 3400 по умолчанию.
  • Flows : какие потоки обслуживать. Если не указано, Init обслуживает все определенные вами потоки.

Если вы хотите обслуживать потоки на том же хосте и порту, что и другие конечные точки, вы можете установить для FlowAddr значение - и вместо этого вызвать NewFlowServeMux() , чтобы получить обработчик для ваших потоков Genkit, который вы можете мультиплексировать с другими обработчиками маршрутов:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Наблюдаемость потока

Иногда при использовании сторонних SDK, которые не поддерживают возможность наблюдения, вам может потребоваться увидеть их как отдельный этап трассировки в пользовательском интерфейсе разработчика. Все, что вам нужно сделать, это обернуть код в функцию run .

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})