|
|
@@ -25,48 +25,81 @@ if (migrationModules.length === 0) {
|
|
|
|
|
|
/** @type {ReplaceLatestRevisions} */
|
|
|
function replaceLatestRevisions(body, migrationModules) {
|
|
|
- var replacedBody = body;
|
|
|
- migrationModules.forEach((migrationModule) => {
|
|
|
- replacedBody = migrationModule(replacedBody);
|
|
|
- });
|
|
|
- return replacedBody;
|
|
|
+ return migrationModules.reduce((replacedBody, module) => module(replacedBody), body);
|
|
|
}
|
|
|
|
|
|
-/** @type {Operations} */
|
|
|
-var operations = [];
|
|
|
-pagesCollection.find({}).forEach((/** @type {any} */ doc) => {
|
|
|
- if (doc.revision) {
|
|
|
- try {
|
|
|
- var revision = revisionsCollection.findOne({ _id: doc.revision });
|
|
|
+var pipeline = [
|
|
|
+ // Join pages with revisions
|
|
|
+ {
|
|
|
+ $lookup: {
|
|
|
+ from: 'revisions',
|
|
|
+ localField: 'revision',
|
|
|
+ foreignField: '_id',
|
|
|
+ as: 'revisionDoc',
|
|
|
+ },
|
|
|
+ },
|
|
|
+ // Unwind the revision array
|
|
|
+ {
|
|
|
+ $unwind: '$revisionDoc',
|
|
|
+ },
|
|
|
+ // Project only needed fields
|
|
|
+ {
|
|
|
+ $project: {
|
|
|
+ _id: '$revisionDoc._id',
|
|
|
+ body: '$revisionDoc.body',
|
|
|
+ },
|
|
|
+ },
|
|
|
+];
|
|
|
|
|
|
- if (revision == null || revision.body == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
- var replacedBody = replaceLatestRevisions(revision.body, [...migrationModules]);
|
|
|
- var operation = {
|
|
|
+try {
|
|
|
+ /** @type {Operations} */
|
|
|
+ var operations = [];
|
|
|
+ var processedCount = 0;
|
|
|
+
|
|
|
+ var cursor = pagesCollection.aggregate(pipeline, {
|
|
|
+ allowDiskUse: true,
|
|
|
+ cursor: { batchSize },
|
|
|
+ });
|
|
|
+
|
|
|
+ while (cursor.hasNext()) {
|
|
|
+ var doc = cursor.next();
|
|
|
+
|
|
|
+ if (doc == null || doc.body == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ var replacedBody = replaceLatestRevisions(doc.body, [...migrationModules]);
|
|
|
+
|
|
|
+ operations.push({
|
|
|
updateOne: {
|
|
|
- filter: { _id: revision._id },
|
|
|
+ filter: { _id: doc._id },
|
|
|
update: {
|
|
|
$set: { body: replacedBody },
|
|
|
},
|
|
|
},
|
|
|
- };
|
|
|
- operations.push(operation);
|
|
|
+ });
|
|
|
+
|
|
|
+ processedCount++;
|
|
|
|
|
|
- // bulkWrite per 100 revisions
|
|
|
- if (operations.length > (batchSize - 1)) {
|
|
|
+ if (operations.length >= batchSize || !cursor.hasNext()) {
|
|
|
revisionsCollection.bulkWrite(operations);
|
|
|
- // sleep time can be set from env var
|
|
|
- sleep(batchSizeInterval);
|
|
|
+ if (batchSizeInterval > 0) {
|
|
|
+ sleep(batchSizeInterval);
|
|
|
+ }
|
|
|
+
|
|
|
operations = [];
|
|
|
}
|
|
|
}
|
|
|
catch (err) {
|
|
|
- print(`Error in updating revision ${doc.revision}: ${err}`);
|
|
|
+ print(`Error processing document ${doc?._id}: ${err}`);
|
|
|
}
|
|
|
}
|
|
|
-});
|
|
|
-revisionsCollection.bulkWrite(operations);
|
|
|
|
|
|
-print('migration complete!');
|
|
|
+ print('Migration complete!');
|
|
|
+}
|
|
|
+catch (err) {
|
|
|
+ print(`Fatal error during migration: ${err}`);
|
|
|
+ throw err;
|
|
|
+}
|