如果某个集合中包含值已依序编入索引的文档,则 Cloud Firestore 会将写入速率限制为每秒 500 次写入。本页面介绍了如何通过将文档字段分片来克服这项限制。首先,我们将定义“依序编入索引的字段”,并阐明这项限制适用的情况。
依序编入索引的字段
“依序编入索引的字段”是指包含以单调递增或递减方式编入索引的字段的任何文档集合。在许多情况下,依序编入索引的字段都是指 timestamp
字段,但任何单调递增或递减的字段值都会触发写入限制(即每秒 500 次写入)。
例如,如果应用按如下所示为字段 userid
分配值,那么包含编入索引的字段 userid
的 user
文档集合就会应用这项限制:
1281, 1282, 1283, 1284, 1285, ...
另一方面,并非所有 timestamp
字段都会触发此限制。如果某个 timestamp
字段跟踪的是随机分布值,那么这项写入限制就不适用。该字段的实际值也无关紧要,唯一紧要的是其值是单调递增或递减的情况。例如,以下两组单调递增的字段值都会触发写入限制:
100000, 100001, 100002, 100003, ...
0, 1, 2, 3, ...
将 timestamp 字段分片
假设您的应用使用单调递增的 timestamp
字段。
如果您的应用未在任何查询中使用 timestamp
字段,您可以不将 timestamp 字段编入索引,从而移除每秒 500 次写入这项限制。如果您的查询确实需要 timestamp
字段,您可以使用分片时间戳来解决此限制问题:
- 在
timestamp
字段的旁边添加shard
字段。在shard
字段中使用1..n
的不同的值。这样做可以将该集合的写入限制提高至500*n
,但是您需要对n
次查询进行聚合。 - 更新您的写入逻辑,以便为每个文档随机分配一个
shard
值。 - 更新您的查询以聚合各分片结果集。
- 对
shard
字段和timestamp
字段停用单字段索引。删除包含timestamp
字段的现有复合索引。 - 创建新的复合索引以用于支持更新后的查询。索引中各字段的顺序非常重要,其中
shard
字段必须位于timestamp
字段之前。任何包含timestamp
字段的索引都必须包含shard
字段。
分片时间戳实现应仅适用于持续写入速率超过每秒 500 次写入的使用场景。在其他使用场景中,这种实现属于过早优化。将 timestamp
字段分片可消除每秒 500 次写入这项限制,但需要对客户端查询进行聚合操作。
以下示例展示了如何将 timestamp
字段分片以及如何查询分片结果集。
示例数据模型和查询
例如,假设有一个应用可以对货币、普通股和 ETF 等金融工具进行近乎实时的分析。该应用会将文档写入 instruments
集合,具体如下所示:
Node.js
async function insertData() { const instruments = [ { symbol: 'AAA', price: { currency: 'USD', micros: 34790000 }, exchange: 'EXCHG1', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.010Z')) }, { symbol: 'BBB', price: { currency: 'JPY', micros: 64272000000 }, exchange: 'EXCHG2', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.101Z')) }, { symbol: 'Index1 ETF', price: { currency: 'USD', micros: 473000000 }, exchange: 'EXCHG1', instrumentType: 'etf', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.001Z')) } ]; const batch = fs.batch(); for (const inst of instruments) { const ref = fs.collection('instruments').doc(); batch.set(ref, inst); } await batch.commit(); }
该应用按 timestamp
字段运行以下查询和命令:
Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) { return fs.collection('instruments') .where(fieldName, fieldOperator, fieldValue) .orderBy('timestamp', 'desc') .limit(limit) .get(); } function queryCommonStock() { return createQuery('instrumentType', '==', 'commonstock'); } function queryExchange1Instruments() { return createQuery('exchange', '==', 'EXCHG1'); } function queryUSDInstruments() { return createQuery('price.currency', '==', 'USD'); }
insertData() .then(() => { const commonStock = queryCommonStock() .then( (docs) => { console.log('--- queryCommonStock: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const exchange1Instruments = queryExchange1Instruments() .then( (docs) => { console.log('--- queryExchange1Instruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const usdInstruments = queryUSDInstruments() .then( (docs) => { console.log('--- queryUSDInstruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); return Promise.all([commonStock, exchange1Instruments, usdInstruments]); });
经过一番研究之后,您可以确定该应用每秒会收到 1000 至 1500 次工具更新。这一数量远远超过了带有编入索引的 timestamp 字段的文档集合所允许的每秒 500 次写入这项限制。为提高写入吞吐量,您需要 3 个分片值(即 MAX_INSTRUMENT_UPDATES/500 = 3
)。此示例使用分片值 x
、y
和 z
。您也可以将分片值设为数字或其他字符。
添加 shard 字段
向您的文档添加一个 shard
字段。将 shard
字段的值设置为 x
、y
或 z
,以便将该集合的写入限制提高至每秒 1500 次写入。
Node.js
// Define our 'K' shard values const shards = ['x', 'y', 'z']; // Define a function to help 'chunk' our shards for use in queries. // When using the 'in' query filter there is a max number of values that can be // included in the value. If our number of shards is higher than that limit // break down the shards into the fewest possible number of chunks. function shardChunks() { const chunks = []; let start = 0; while (start < shards.length) { const elements = Math.min(MAX_IN_VALUES, shards.length - start); const end = start + elements; chunks.push(shards.slice(start, end)); start = end; } return chunks; } // Add a convenience function to select a random shard function randomShard() { return shards[Math.floor(Math.random() * Math.floor(shards.length))]; }
async function insertData() { const instruments = [ { shard: randomShard(), // add the new shard field to the document symbol: 'AAA', price: { currency: 'USD', micros: 34790000 }, exchange: 'EXCHG1', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.010Z')) }, { shard: randomShard(), // add the new shard field to the document symbol: 'BBB', price: { currency: 'JPY', micros: 64272000000 }, exchange: 'EXCHG2', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.101Z')) }, { shard: randomShard(), // add the new shard field to the document symbol: 'Index1 ETF', price: { currency: 'USD', micros: 473000000 }, exchange: 'EXCHG1', instrumentType: 'etf', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.001Z')) } ]; const batch = fs.batch(); for (const inst of instruments) { const ref = fs.collection('instruments').doc(); batch.set(ref, inst); } await batch.commit(); }
查询分片时间戳
如需添加一个 shard
字段,您需要更新查询以对分片结果进行聚合:
Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) { // For each shard value, map it to a new query which adds an additional // where clause specifying the shard value. return Promise.all(shardChunks().map(shardChunk => { return fs.collection('instruments') .where('shard', 'in', shardChunk) // new shard condition .where(fieldName, fieldOperator, fieldValue) .orderBy('timestamp', 'desc') .limit(limit) .get(); })) // Now that we have a promise of multiple possible query results, we need // to merge the results from all of the queries into a single result set. .then((snapshots) => { // Create a new container for 'all' results const docs = []; snapshots.forEach((querySnapshot) => { querySnapshot.forEach((doc) => { // append each document to the new all container docs.push(doc); }); }); if (snapshots.length === 1) { // if only a single query was returned skip manual sorting as it is // taken care of by the backend. return docs; } else { // When multiple query results are returned we need to sort the // results after they have been concatenated. // // since we're wanting the `limit` newest values, sort the array // descending and take the first `limit` values. By returning negated // values we can easily get a descending value. docs.sort((a, b) => { const aT = a.data().timestamp; const bT = b.data().timestamp; const secondsDiff = aT.seconds - bT.seconds; if (secondsDiff === 0) { return -(aT.nanoseconds - bT.nanoseconds); } else { return -secondsDiff; } }); return docs.slice(0, limit); } }); } function queryCommonStock() { return createQuery('instrumentType', '==', 'commonstock'); } function queryExchange1Instruments() { return createQuery('exchange', '==', 'EXCHG1'); } function queryUSDInstruments() { return createQuery('price.currency', '==', 'USD'); }
insertData() .then(() => { const commonStock = queryCommonStock() .then( (docs) => { console.log('--- queryCommonStock: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const exchange1Instruments = queryExchange1Instruments() .then( (docs) => { console.log('--- queryExchange1Instruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const usdInstruments = queryUSDInstruments() .then( (docs) => { console.log('--- queryUSDInstruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); return Promise.all([commonStock, exchange1Instruments, usdInstruments]); });
更新索引定义
如需移除每秒 500 次写入这项限制条件,请删除使用 timestamp
字段的现有单字段索引和复合索引。
删除复合索引定义
Firebase 控制台
打开 Firebase 控制台中的 Cloud Firestore“复合索引”页面。
对于包含
timestamp
字段的每个索引,点击 按钮,然后点击删除。
GCP Console
在 Google Cloud 控制台中,转到数据库页面。
从数据库列表中选择所需的数据库。
在导航菜单中,点击索引,然后点击复合标签页。
使用过滤条件字段搜索包含
timestamp
字段的索引定义。对于其中每个索引,点击
按钮,然后点击删除。
Firebase CLI
- 如果您尚未设置 Firebase CLI,请按照此处的说明安装 CLI 并运行
firebase init
命令。在init
命令运行期间,请务必选择Firestore: Deploy rules and create indexes for Firestore
。 - 在设置过程中,Firebase CLI 会默认将您现有的索引定义下载到一个名为
firestore.indexes.json
的文件中。 移除所有包含
timestamp
字段的索引定义,例如:{ "indexes": [ // Delete composite index definition that contain the timestamp field { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "exchange", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "instrumentType", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "price.currency", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, ] }
部署更新后的索引定义:
firebase deploy --only firestore:indexes
更新单字段索引定义
Firebase 控制台
打开 Firebase 控制台中的 Cloud Firestore“单字段索引”页面。
点击添加豁免项。
在集合 ID 中,输入
instruments
。在字段路径中,输入timestamp
。在查询范围下,同时选择集合和集合组。
点击下一步
将所有索引设置切换为已停用。点击保存。
对
shard
字段重复上述相同的步骤。
GCP Console
在 Google Cloud 控制台中,转到数据库页面。
从数据库列表中选择所需的数据库。
在导航菜单中,点击索引,然后点击单个字段标签页。
点击单字段标签页。
点击添加豁免项。
在集合 ID 中,输入
instruments
。在字段路径中,输入timestamp
。在查询范围下,同时选择集合和集合组。
点击下一步
将所有索引设置切换为已停用。点击保存。
对
shard
字段重复上述相同的步骤。
Firebase CLI
在索引定义文件的
fieldOverrides
部分中添加以下内容:{ "fieldOverrides": [ // Disable single-field indexing for the timestamp field { "collectionGroup": "instruments", "fieldPath": "timestamp", "indexes": [] }, ] }
部署更新后的索引定义:
firebase deploy --only firestore:indexes
创建新的复合索引
移除所有包含 timestamp
的旧索引之后,定义您的应用所需的新索引。任何包含 timestamp
字段的索引还必须包含 shard
字段。例如,如需支持上述查询,请添加以下索引:
集合 | 编入索引的字段 | 查询范围 |
---|---|---|
instruments | shard、 price.currency、 timestamp | 集合 |
instruments | shard、 exchange、 timestamp | 集合 |
instruments | shard、 instrumentType、 timestamp | 集合 |
错误消息
您可以通过运行更新后的查询来构建这些索引。
每个查询都会返回一条错误消息,并提供一个链接,用于在 Firebase 控制台中创建所需的索引。
Firebase CLI
将以下索引添加到索引定义文件中:
{ "indexes": [ // New indexes for sharded timestamps { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "shard", "order": "DESCENDING" }, { "fieldPath": "exchange", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "shard", "order": "DESCENDING" }, { "fieldPath": "instrumentType", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "shard", "order": "DESCENDING" }, { "fieldPath": "price.currency", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, ] }
部署更新后的索引定义:
firebase deploy --only firestore:indexes
了解依序编入索引的字段的写入限制
依序编入索引的字段之所以存在写入速率限制,是因为 Cloud Firestore 存储索引值和扩缩索引写入次数的方式。对于每次索引写入操作,Cloud Firestore 都会定义一个键值对条目,该条目用于将文档名称和每个编入索引的字段值串联起来。Cloud Firestore 会将这些索引条目整理为数据组(称为“片”)。每个 Cloud Firestore 服务器可容纳一个或多个这样的片。如果某一特定片的写入负载过高,Cloud Firestore 会将该片拆分成若干较小的片,并将这些新片分布到不同的 Cloud Firestore 服务器,从而实现横向扩容。
Cloud Firestore 会按字典顺序将各索引条目紧挨着放入同一片上。如果某个片中的各索引值之间靠得太近(例如对于 timestamp 字段),则 Cloud Firestore 无法将该片有效拆分为若干较小的片。在这种情况下,单个片会收到过多流量,从而形成热点,而对该热点执行读写操作的速度会更慢。
通过将 timestamp 字段分片,您可以让 Cloud Firestore 在多个片之间高效分配工作负载。尽管各 timestamp 字段值之间可能仍然靠得很近,但串联的分片和索引值会为各索引条目提供足够的间隔空间,从而使 Cloud Firestore 能够将各条目拆分到多个片中。
后续步骤
- 阅读可伸缩设计最佳实践
- 如果单个文档具有较高的写入速率,请参阅分布式计数器
- 请参阅 Cloud Firestore 的标准限制