分片時間戳

如果集合包含具有順序索引值的文檔,Cloud Firestore 會將寫入速率限制為每秒 500 次寫入。本頁介紹如何對文件欄位進行分片以克服此限制。首先,讓我們定義「順序索引欄位」的含義,並澄清何時適用此限制。

順序索引字段

「順序索引欄位」指包含單調遞增或遞減索引欄位的任何文件集合。在許多情況下,這意味著timestamp字段,但任何單調遞增或遞減的字段值都可能觸發每秒 500 次寫入的寫入限制。

例如,如果應用程式指派如下userid值,則該限制適用於具有索引欄位useriduser文件集合:

  • 1281, 1282, 1283, 1284, 1285, ...

另一方面,並非所有timestamp字段都會觸發此限制。如果timestamp欄位追蹤隨機分佈的值,則寫入限制不適用。欄位的實際值也不重要,重要的是欄位單調遞增或遞減。例如,以下兩組單調遞增欄位值都會觸發寫入限制:

  • 100000, 100001, 100002, 100003, ...
  • 0, 1, 2, 3, ...

對時間戳字段進行分片

假設您的應用程式使用單調遞增的timestamp欄位。如果您的應用程式不在任何查詢中使用timestamp字段,您可以透過不對時間戳字段建立索引來消除每秒 500 次寫入的限制。如果您的查詢確實需要timestamp字段,則可以使用分片時間戳來解決該限制:

  1. timestamp字段旁邊新增一個shard字段。對shard欄位使用1..n不同的值。這會將集合的寫入限制提高到500*n ,但您必須聚合n查詢。
  2. 更新您的寫入邏輯,為每個文件隨機分配一個shard值。
  3. 更新您的查詢以聚合分片結果集。
  4. 停用shard欄位和timestamp欄位的單一欄位索引。刪除包含timestamp欄位的現有複合索引。
  5. 建立新的複合索引以支援更新的查詢。索引中欄位的順序很重要, shard欄位必須位於timestamp欄位之前。任何包含timestamp欄位的索引也必須包含shard欄位。

您應該只在持續寫入速率高於每秒 500 次寫入的用例中實施分片時間戳記。否則,這是一個不成熟的優化。對timestamp欄位進行分片可以消除每秒 500 次寫入的限制,但需要進行客戶端查詢聚合的權衡。

以下範例展示如何對timestamp欄位進行分片以及如何查詢分片結果集。

範例資料模型和查詢

舉個例子,想像一個對貨幣、普通股和 ETF 等金融工具進行近即時分析的應用程式。該應用程式將文件寫入instruments集合,如下所示:

Node.js
async function insertData() {
  const instruments = [
    {
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

此應用程式按timestamp字段運行以下查詢和訂單:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  return fs.collection('instruments')
      .where(fieldName, fieldOperator, fieldValue)
      .orderBy('timestamp', 'desc')
      .limit(limit)
      .get();
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

經過一番研究後,您確定該應用程式每秒將收到 1,000 到 1,500 次儀器更新。這超過了包含帶有索引時間戳字段的文檔的集合所允許的每秒 500 次寫入。若要提高寫入吞吐量,您需要 3 個分片值, MAX_INSTRUMENT_UPDATES/500 = 3 。此範例使用分片值xyz 。您也可以使用數字或其他字元作為分片值。

新增分片字段

shard欄位新增至您的文件。將shard欄位設為值xyz ,這會將集合的寫入限制提高到每秒 1,500 次寫入。

Node.js
// Define our 'K' shard values
const shards = ['x', 'y', 'z'];
// Define a function to help 'chunk' our shards for use in queries.
// When using the 'in' query filter there is a max number of values that can be
// included in the value. If our number of shards is higher than that limit
// break down the shards into the fewest possible number of chunks.
function shardChunks() {
  const chunks = [];
  let start = 0;
  while (start < shards.length) {
    const elements = Math.min(MAX_IN_VALUES, shards.length - start);
    const end = start + elements;
    chunks.push(shards.slice(start, end));
    start = end;
  }
  return chunks;
}

// Add a convenience function to select a random shard
function randomShard() {
  return shards[Math.floor(Math.random() * Math.floor(shards.length))];
}
async function insertData() {
  const instruments = [
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

查詢分片時間戳

新增shard欄位需要您更新查詢以聚合分片結果:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  // For each shard value, map it to a new query which adds an additional
  // where clause specifying the shard value.
  return Promise.all(shardChunks().map(shardChunk => {
        return fs.collection('instruments')
            .where('shard', 'in', shardChunk)  // new shard condition
            .where(fieldName, fieldOperator, fieldValue)
            .orderBy('timestamp', 'desc')
            .limit(limit)
            .get();
      }))
      // Now that we have a promise of multiple possible query results, we need
      // to merge the results from all of the queries into a single result set.
      .then((snapshots) => {
        // Create a new container for 'all' results
        const docs = [];
        snapshots.forEach((querySnapshot) => {
          querySnapshot.forEach((doc) => {
            // append each document to the new all container
            docs.push(doc);
          });
        });
        if (snapshots.length === 1) {
          // if only a single query was returned skip manual sorting as it is
          // taken care of by the backend.
          return docs;
        } else {
          // When multiple query results are returned we need to sort the
          // results after they have been concatenated.
          // 
          // since we're wanting the `limit` newest values, sort the array
          // descending and take the first `limit` values. By returning negated
          // values we can easily get a descending value.
          docs.sort((a, b) => {
            const aT = a.data().timestamp;
            const bT = b.data().timestamp;
            const secondsDiff = aT.seconds - bT.seconds;
            if (secondsDiff === 0) {
              return -(aT.nanoseconds - bT.nanoseconds);
            } else {
              return -secondsDiff;
            }
          });
          return docs.slice(0, limit);
        }
      });
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

更新索引定義

若要刪除每秒 500 次寫入的限制,請刪除現有的使用timestamp欄位的單一欄位和複合索引。

刪除複合索引定義

Firebase 控制台

  1. 在 Firebase 控制台中開啟Cloud Firestore 複合索引頁面。

    前往綜合指數

  2. 對於包含timestamp欄位的每個索引,按一下按鈕,然後按一下刪除

GCP 控制台

  1. 在 Google Cloud Platform Console 中,前往資料庫頁面。

    前往資料庫

  2. 從資料庫清單中選擇所需的資料庫。

  3. 在導覽功能表中,按一下「索引」 ,然後按一下「複合」標籤。

  4. 使用過濾器欄位搜尋包含timestamp字段的索引定義。

  5. 對於每個索引,按一下按鈕,然後按一下刪除

Firebase CLI

  1. 如果您尚未設定 Firebase CLI,請按照以下說明安裝 CLI 並執行firebase init命令。在init指令期間,請確保選擇Firestore: Deploy rules and create indexes for Firestore
  2. 在設定過程中,Firebase CLI 會將現有索引定義下載到預設名稱為firestore.indexes.json的檔案中。
  3. 刪除包含timestamp欄位的所有索引定義,例如:

    {
    "indexes": [
      // Delete composite index definition that contain the timestamp field
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "exchange",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "instrumentType",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "price.currency",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
     ]
    }
    
  4. 部署更新的索引定義:

    firebase deploy --only firestore:indexes
    

更新單一欄位索引定義

Firebase 控制台

  1. 在 Firebase 控制台中開啟Cloud Firestore 單一欄位索引頁面。

    轉到單一欄位索引

  2. 點擊新增豁免

  3. 對於集合 ID ,輸入instruments 。對於欄位路徑,輸入timestamp

  4. 「查詢範圍」下,選擇「集合」「集合組」

  5. 點選下一步

  6. 將所有索引設定切換為「已停用」 。按一下「儲存」

  7. shard欄位重複相同的步驟。

GCP 控制台

  1. 在 Google Cloud Platform Console 中,前往資料庫頁面。

    前往資料庫

  2. 從資料庫清單中選擇所需的資料庫。

  3. 在導覽功能表中,按一下「索引」 ,然後按一下「單一欄位」標籤。

  4. 按一下“單一欄位”標籤。

  5. 點擊新增豁免

  6. 對於集合 ID ,輸入instruments 。對於欄位路徑,輸入timestamp

  7. 「查詢範圍」下,選擇「集合」「集合組」

  8. 點選下一步

  9. 將所有索引設定切換為「已停用」 。按一下「儲存」

  10. shard欄位重複相同的步驟。

Firebase CLI

  1. 將以下內容新增至索引定義檔的fieldOverrides部分:

    {
     "fieldOverrides": [
       // Disable single-field indexing for the timestamp field
       {
         "collectionGroup": "instruments",
         "fieldPath": "timestamp",
         "indexes": []
       },
     ]
    }
    
  2. 部署更新的索引定義:

    firebase deploy --only firestore:indexes
    

建立新的複合索引

刪除所有包含timestamp的先前索引後,定義您的應用程式所需的新索引。任何包含timestamp欄位的索引也必須包含shard欄位。例如,若要支援上述查詢,請新增以下索引:

收藏已索引的字段查詢範圍
儀器分片、 價格.貨幣、 時間戳收藏
儀器分片、 交換、 時間戳收藏
儀器分片、 儀器類型、 時間戳收藏

錯誤訊息

您可以透過執行更新的查詢來建立這些索引。

每個查詢都會傳回錯誤訊息,其中包含用於在 Firebase 控制台中建立所需索引的連結。

Firebase CLI

  1. 將以下索引新增至索引定義檔:

     {
       "indexes": [
       // New indexes for sharded timestamps
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "exchange",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "instrumentType",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "price.currency",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
       ]
     }
    
  2. 部署更新的索引定義:

    firebase deploy --only firestore:indexes
    

了解限制順序索引欄位的寫入

順序索引欄位寫入速率的限制來自 Cloud Firestore 儲存索引值和擴充索引寫入的方式。對於每個索引寫入,Cloud Firestore 都會定義一個鍵值條目,該條目連接文件名稱和每個索引欄位的值。 Cloud Firestore 將這些索引條目組織成稱為「tablet」的資料組。每台 Cloud Firestore 伺服器都裝有一台或多台平板電腦。當特定平板電腦的寫入負載變得過高時,Cloud Firestore 會透過將平板電腦分割為更小的平板電腦並將新平板電腦分佈在不同的 Cloud Firestore 伺服器上來進行水平擴展。

Cloud Firestore 將字典順序接近的索引條目放置在同一平板電腦上。如果tablet中的索引值太接近(例如時間戳欄位),Cloud Firestore無法有效地將tablet分割為較小的tablet。這會產生一個熱點,其中單一平板電腦接收過多流量,並且對該熱點的讀寫操作變得更慢。

透過對時間戳欄位進行分片,您可以讓 Cloud Firestore 有效地將工作負載指派到多個平板電腦上。儘管時間戳欄位的值可能保持靠近,但連接的分片和索引值為 Cloud Firestore 索引條目之間提供了足夠的空間,以便將條目拆分為多個平板電腦。

下一步是什麼