Потоки

Потоки представляют собой обернутые функции с некоторыми дополнительными характеристиками по сравнению с прямыми вызовами: они строго типизированы, потокоемки, локально и удаленно вызываются и полностью наблюдаемы. Firebase Genkit предоставляет инструменты CLI и пользовательского интерфейса разработчика для запуска и отладки потоков.

Определение потоков

В своей простейшей форме поток просто оборачивает функцию:

Идти

menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (string, error) {
      suggestion := makeMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  })

Это позволит вам запускать функцию из интерфейса командной строки Genkit и пользовательского интерфейса разработчика, а также является обязательным требованием для многих функций Genkit, включая развертывание и возможность наблюдения.

Важным преимуществом потоков Genkit перед прямым вызовом API модели является безопасность типов как входных, так и выходных данных:

Идти

Типы аргументов и результатов потока могут быть простыми или структурированными значениями. Genkit создаст схемы JSON для этих значений, используя invopop/jsonschema .

Следующий поток принимает string в качестве входных данных и выводит struct :

type MenuSuggestion struct {
  ItemName    string `json:"item_name"`
  Description string `json:"description"`
  Calories    int    `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
      suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  },
)

Запуск потоков

Чтобы запустить поток в вашем коде:

Идти

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Вы также можете использовать CLI для запуска потоков:

genkit flow:run menuSuggestionFlow '"French"'

Потоковое

Вот простой пример потока, который может передавать значения:

Идти

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
  "menuSuggestionFlow",
  func(
      ctx context.Context,
      restaurantTheme InputType,
      callback func(context.Context, StreamType) error,
  ) (OutputType, error) {
      var menu strings.Builder
      menuChunks := make(chan StreamType)
      go makeFullMenuSuggestion(restaurantTheme, menuChunks)
      for {
          chunk, ok := <-menuChunks
          if !ok {
              break
          }
          if callback != nil {
              callback(context.Background(), chunk)
          }
          menu.WriteString(string(chunk))
      }
      return OutputType(menu.String()), nil
  },
)

Обратите внимание, что обратный вызов потоковой передачи может быть неопределенным. Он определяется только в том случае, если вызывающий клиент запрашивает потоковый ответ.

Чтобы вызвать поток в потоковом режиме:

Идти

menuSuggestionFlow.Stream(
  context.Background(),
  "French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
  if !sfv.Done {
      fmt.Print(sfv.Output)
      return true
  } else {
      return false
  }
})

Если поток не реализует потоковую передачу, StreamFlow() ведет себя идентично RunFlow() .

Вы также можете использовать CLI для потоковой передачи потоков:

genkit flow:run menuSuggestionFlow '"French"' -s

Развертывание потоков

Если вы хотите иметь доступ к своему потоку через HTTP, вам необходимо сначала его развернуть.

Идти

Чтобы развернуть потоки с помощью Cloud Run и подобных сервисов, определите свои потоки, а затем вызовите Init() :

func main() {
  genkit.DefineFlow(
      "menuSuggestionFlow",
      func(ctx context.Context, restaurantTheme string) (string, error) {
          // ...
          return "", nil
      },
  )
  if err := genkit.Init(context.Background(), nil); err != nil {
      log.Fatal(err)
  }
}

Init запускает сервер net/http , который предоставляет ваши потоки как конечные точки HTTP (например, http://localhost:3400/menuSuggestionFlow ).

Второй параметр — это необязательные Options , которые определяют следующее:

  • FlowAddr : адрес и порт для прослушивания. Если не указано, сервер прослушивает порт, указанный в переменной среды PORT; если он пуст, используется порт 3400 по умолчанию.
  • Flows : какие потоки обслуживать. Если не указано, Init обслуживает все определенные вами потоки.

Если вы хотите обслуживать потоки на том же хосте и порту, что и другие конечные точки, вы можете установить для FlowAddr значение - и вместо этого вызвать NewFlowServeMux() , чтобы получить обработчик для ваших потоков Genkit, который вы можете мультиплексировать с другими обработчиками маршрутов:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Наблюдаемость потока

Иногда при использовании сторонних SDK, которые не поддерживают возможность наблюдения, вы можете захотеть увидеть их как отдельный этап трассировки в пользовательском интерфейсе разработчика. Все, что вам нужно сделать, это обернуть код в функцию run .

genkit.DefineFlow(
    "menuSuggestionFlow",
    func(ctx context.Context, restaurantTheme string) (string, error) {
        themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
            // ...
            return "", nil
        })

        // ...
        return themes, err
    })