编写 Genkit 遥测插件

OpenTelemetry 支持收集跟踪记录、指标和日志。通过编写用于配置 Node.js SDK 的遥测插件,对 Firebase Genkit 进行扩展,可将所有遥测数据导出到任何支持 OpenTelemetry 的系统。

配置

如需控制遥测数据导出,插件的 PluginOptions 必须提供符合 Genkit 配置中的 telemetry 块的 telemetry 对象。

export interface InitializedPlugin {
  ...
  telemetry?: {
    instrumentation?: Provider<TelemetryConfig>;
    logger?: Provider<LoggerConfig>;
  };
}

此对象可以提供两种单独的配置:

  • instrumentation:为 TracesMetrics 提供 OpenTelemetry 配置。
  • logger:提供 Genkit 用于写入结构化日志数据(包括 Genkit 流的输入和输出)的底层日志记录器。

目前必须进行这种分离,因为 Node.js OpenTelemetry SDK 的日志记录功能仍处于开发中。日志记录单独提供,以便插件可以控制明确写入数据的位置。

import { genkitPlugin, Plugin } from '@genkit-ai/core';

...

export interface MyPluginOptions {
  // [Optional] Your plugin options
}

export const myPlugin: Plugin<[MyPluginOptions] | []> = genkitPlugin(
  'myPlugin',
  async (options?: MyPluginOptions) => {
    return {
      telemetry: {
        instrumentation: {
          id: 'myPlugin',
          value: myTelemetryConfig,
        },
        logger: {
          id: 'myPlugin',
          value: myLogger,
        },
      },
    };
  }
);

export default myPlugin;

有了上述代码块,插件现在将为 Genkit 提供可供开发者使用的遥测配置。

插桩

如需控制跟踪记录和指标的导出,插件必须在 telemetry 对象上提供符合 TelemetryConfig 接口的 instrumentation 属性:

interface TelemetryConfig {
  getConfig(): Partial<NodeSDKConfiguration>;
}

这会提供一个 Partial<NodeSDKConfiguration>,Genkit 框架将使用它来启动 NodeSDK。这样,插件就可以完全控制 Genkit 如何使用 OpenTelemetry 集成。

例如,以下遥测配置提供了一个简单的内存中跟踪记录和指标导出器:

import { AggregationTemporality, InMemoryMetricExporter, MetricReader, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { AlwaysOnSampler, BatchSpanProcessor, InMemorySpanExporter } from '@opentelemetry/sdk-trace-base';
import { NodeSDKConfiguration } from '@opentelemetry/sdk-node';
import { Resource } from '@opentelemetry/resources';
import { TelemetryConfig } from '@genkit-ai/core';

...

const myTelemetryConfig: TelemetryConfig = {
  getConfig(): Partial<NodeSDKConfiguration> {
    return {
      resource: new Resource({}),
      spanProcessor: new BatchSpanProcessor(new InMemorySpanExporter()),
      sampler: new AlwaysOnSampler(),
      instrumentations: myPluginInstrumentations,
      metricReader: new PeriodicExportingMetricReader({
        exporter: new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE),
      }),
    };
  },
};

Logger

如需控制 Genkit 框架用于写入结构化日志数据的日志记录器,插件必须在 telemetry 对象上提供符合 LoggerConfig 接口的 logger 属性:

interface LoggerConfig {
  getLogger(env: string): any;
}
{
  debug(...args: any);
  info(...args: any);
  warn(...args: any);
  error(...args: any);
  level: string;
}

大多数常用的日志记录框架都符合此要求。其中一种框架是 winston,它允许配置能够直接将日志数据推送到所选位置的传输器。

例如,如需提供将日志数据写入控制台的 Winston 日志记录器,您可以将插件日志记录器更新为使用以下内容:

import * as winston from 'winston';

...

const myLogger: LoggerConfig = {
  getLogger(env: string) {
    return winston.createLogger({
      transports: [new winston.transports.Console()],
      format: winston.format.printf((info): string => {
        return `[${info.level}] ${info.message}`;
      }),
    });
  }
};

关联日志和跟踪记录

通常,建议您将日志语句与插件导出的 OpenTelemetry 跟踪记录相关联。由于 OpenTelemetry 框架不会直接导出日志语句,因此不会开箱即用。幸运的是,OpenTelemetry 支持将跟踪记录和 span ID 复制到 winstonpino 等常用日志记录框架的日志语句中的插桩。通过使用 @opentelemetry/auto-instrumentations-node 软件包,您可以自动配置这些(及其他)插桩,但在某些情况下,您可能需要控制跟踪记录和 span 的字段名称和值。为此,您需要向 TelemetryConfig 提供的 NodeSDK 配置提供自定义 LogHook 插桩:

import { Instrumentation } from '@opentelemetry/instrumentation';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { WinstonInstrumentation } from '@opentelemetry/instrumentation-winston';
import { Span } from '@opentelemetry/api';

const myPluginInstrumentations: Instrumentation[] =
  getNodeAutoInstrumentations().concat([
    new WinstonInstrumentation({
      logHook: (span: Span, record: any) => {
        record['my-trace-id'] = span.spanContext().traceId;
        record['my-span-id'] = span.spanContext().spanId;
        record['is-trace-sampled'] = span.spanContext().traceFlags;
      },
    }),
  ]);

该示例为 OpenTelemetry NodeSDK 启用了所有自动插桩,然后提供了一个自定义 WinstonInstrumentation,用于将跟踪记录和 span ID 写入日志消息中的自定义字段。

Genkit 框架将保证在插件的 LoggerConfig 之前初始化插件的 TelemetryConfig,但您必须注意确保在 LoggerConfig 初始化之前不会导入底层日志记录器。例如,可以按如下方式修改上述 loggingConfig:

const myLogger: LoggerConfig = {
  async getLogger(env: string) {
    // Do not import winston before calling getLogger so that the NodeSDK
    // instrumentations can be registered first.
    const winston = await import('winston');

    return winston.createLogger({
      transports: [new winston.transports.Console()],
      format: winston.format.printf((info): string => {
        return `[${info.level}] ${info.message}`;
      }),
    });
  },
};

完整示例

以下是在上文中创建的遥测插件的完整示例。如需查看实际示例,请查看 @genkit-ai/google-cloud 插件。

import {
  genkitPlugin,
  LoggerConfig,
  Plugin,
  TelemetryConfig,
} from '@genkit-ai/core';
import { Span } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { Instrumentation } from '@opentelemetry/instrumentation';
import { WinstonInstrumentation } from '@opentelemetry/instrumentation-winston';
import { Resource } from '@opentelemetry/resources';
import {
  AggregationTemporality,
  InMemoryMetricExporter,
  PeriodicExportingMetricReader,
} from '@opentelemetry/sdk-metrics';
import { NodeSDKConfiguration } from '@opentelemetry/sdk-node';
import {
  AlwaysOnSampler,
  BatchSpanProcessor,
  InMemorySpanExporter,
} from '@opentelemetry/sdk-trace-base';

export interface MyPluginOptions {
  // [Optional] Your plugin options
}

const myPluginInstrumentations: Instrumentation[] =
  getNodeAutoInstrumentations().concat([
    new WinstonInstrumentation({
      logHook: (span: Span, record: any) => {
        record['my-trace-id'] = span.spanContext().traceId;
        record['my-span-id'] = span.spanContext().spanId;
        record['is-trace-sampled'] = span.spanContext().traceFlags;
      },
    }),
  ]);

const myTelemetryConfig: TelemetryConfig = {
  getConfig(): Partial<NodeSDKConfiguration> {
    return {
      resource: new Resource({}),
      spanProcessor: new BatchSpanProcessor(new InMemorySpanExporter()),
      sampler: new AlwaysOnSampler(),
      instrumentations: myPluginInstrumentations,
      metricReader: new PeriodicExportingMetricReader({
        exporter: new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE),
      }),
    };
  },
};

const myLogger: LoggerConfig = {
  async getLogger(env: string) {
    // Do not import winston before calling getLogger so that the NodeSDK
    // instrumentations can be registered first.
    const winston = await import('winston');

    return winston.createLogger({
      transports: [new winston.transports.Console()],
      format: winston.format.printf((info): string => {
        return `[${info.level}] ${info.message}`;
      }),
    });
  },
};

export const myPlugin: Plugin<[MyPluginOptions] | []> = genkitPlugin(
  'myPlugin',
  async (options?: MyPluginOptions) => {
    return {
      telemetry: {
        instrumentation: {
          id: 'myPlugin',
          value: myTelemetryConfig,
        },
        logger: {
          id: 'myPlugin',
          value: myLogger,
        },
      },
    };
  }
);

export default myPlugin;

问题排查

如果您无法让数据显示在预期的位置,OpenTelemetry 提供了一个有用的诊断工具,可帮助您找到问题根源。