实现 S3 对象迁移功能,添加对象复制和元数据获取逻辑
This commit is contained in:
11
.env.example
11
.env.example
@@ -0,0 +1,11 @@
|
|||||||
|
O_S3_ACCESS_KEY_ID=you_old_access
|
||||||
|
O_S3_SECRET_ACCESS_KEY=***
|
||||||
|
O_S3_REGION=us-east-1
|
||||||
|
O_S3_BUCKET_NAME=resources
|
||||||
|
O_S3_ENDPOINT=http://localhost:9000
|
||||||
|
|
||||||
|
S3_ACCESS_KEY_ID=access_key
|
||||||
|
S3_SECRET_ACCESS_KEY=secret_key
|
||||||
|
S3_REGION=cn-shanghai
|
||||||
|
S3_BUCKET_NAME=envision
|
||||||
|
S3_ENDPOINT=https://tos-s3-cn-shanghai.volces.com
|
||||||
@@ -4,7 +4,7 @@
|
|||||||
"description": "",
|
"description": "",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "echo \"Error: no test specified\" && exit 1"
|
"migrate": "tsx src/copy.ts"
|
||||||
},
|
},
|
||||||
"keywords": [],
|
"keywords": [],
|
||||||
"author": "",
|
"author": "",
|
||||||
|
|||||||
156
src/copy.ts
156
src/copy.ts
@@ -1,4 +1,4 @@
|
|||||||
import { S3Client, ListObjectsV2Command, GetBucketMetadataConfigurationCommand, HeadObjectCommand, CopyObjectCommand } from '@aws-sdk/client-s3';
|
import { S3Client, ListObjectsV2Command, HeadObjectCommand, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
|
||||||
import dotenv from 'dotenv';
|
import dotenv from 'dotenv';
|
||||||
dotenv.config();
|
dotenv.config();
|
||||||
|
|
||||||
@@ -21,6 +21,156 @@ const newS3Client = new S3Client({
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
const copyS3Object = async () => {
|
/**
|
||||||
//
|
* Get object metadata (ETag) from a bucket
|
||||||
|
*/
|
||||||
|
const getObjectMetadata = async (client: S3Client, bucket: string, key: string): Promise<{ exists: boolean; etag?: string }> => {
|
||||||
|
try {
|
||||||
|
const response = await client.send(
|
||||||
|
new HeadObjectCommand({
|
||||||
|
Bucket: bucket,
|
||||||
|
Key: key,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
return { exists: true, etag: response.ETag?.replace(/"/g, '') };
|
||||||
|
} catch (error: any) {
|
||||||
|
if (error.name === 'NotFound' || error.$metadata?.httpStatusCode === 404) {
|
||||||
|
return { exists: false };
|
||||||
}
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy a single object from source to destination
|
||||||
|
*/
|
||||||
|
const copyObject = async (key: string): Promise<void> => {
|
||||||
|
const sourceBucket = process.env.O_S3_BUCKET_NAME!;
|
||||||
|
const destBucket = process.env.S3_BUCKET_NAME!;
|
||||||
|
|
||||||
|
console.log(`Copying: ${key}`);
|
||||||
|
|
||||||
|
// Get object from source
|
||||||
|
const getObjectResponse = await oldS3Client.send(
|
||||||
|
new GetObjectCommand({
|
||||||
|
Bucket: sourceBucket,
|
||||||
|
Key: key,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// Convert stream to buffer
|
||||||
|
const bodyBytes = await getObjectResponse.Body?.transformToByteArray();
|
||||||
|
if (!bodyBytes) {
|
||||||
|
throw new Error(`Failed to read object: ${key}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put object to destination
|
||||||
|
await newS3Client.send(
|
||||||
|
new PutObjectCommand({
|
||||||
|
Bucket: destBucket,
|
||||||
|
Key: key,
|
||||||
|
Body: bodyBytes,
|
||||||
|
ContentType: getObjectResponse.ContentType,
|
||||||
|
ContentEncoding: getObjectResponse.ContentEncoding,
|
||||||
|
Metadata: getObjectResponse.Metadata,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
console.log(`✓ Copied: ${key}`);
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all objects in source bucket
|
||||||
|
*/
|
||||||
|
const listAllObjects = async (): Promise<string[]> => {
|
||||||
|
const objects: string[] = [];
|
||||||
|
let continuationToken: string | undefined = undefined;
|
||||||
|
const sourceBucket = process.env.O_S3_BUCKET_NAME!;
|
||||||
|
|
||||||
|
do {
|
||||||
|
const response = await oldS3Client.send(
|
||||||
|
new ListObjectsV2Command({
|
||||||
|
Bucket: sourceBucket,
|
||||||
|
ContinuationToken: continuationToken,
|
||||||
|
})
|
||||||
|
) as { Contents?: Array<{ Key?: string }>; NextContinuationToken?: string };
|
||||||
|
|
||||||
|
if (response.Contents) {
|
||||||
|
for (const object of response.Contents) {
|
||||||
|
if (object.Key) {
|
||||||
|
objects.push(object.Key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
continuationToken = response.NextContinuationToken;
|
||||||
|
} while (continuationToken);
|
||||||
|
|
||||||
|
return objects;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main migration function - copy objects from source to destination
|
||||||
|
* Compares ETags: if different, copy again; if same, skip
|
||||||
|
*/
|
||||||
|
const copyS3Object = async (): Promise<void> => {
|
||||||
|
console.log('Starting S3 migration...');
|
||||||
|
console.log(`Source: ${process.env.O_S3_ENDPOINT}/${process.env.O_S3_BUCKET_NAME}`);
|
||||||
|
console.log(`Destination: ${process.env.S3_ENDPOINT}/${process.env.S3_BUCKET_NAME}`);
|
||||||
|
console.log('');
|
||||||
|
|
||||||
|
const objects = await listAllObjects();
|
||||||
|
console.log(`Found ${objects.length} objects in source bucket`);
|
||||||
|
|
||||||
|
let copiedCount = 0;
|
||||||
|
let skippedCount = 0;
|
||||||
|
let updatedCount = 0;
|
||||||
|
let errorCount = 0;
|
||||||
|
const sourceBucket = process.env.O_S3_BUCKET_NAME!;
|
||||||
|
const destBucket = process.env.S3_BUCKET_NAME!;
|
||||||
|
|
||||||
|
for (const key of objects) {
|
||||||
|
try {
|
||||||
|
const sourceMeta = await getObjectMetadata(oldS3Client, sourceBucket, key);
|
||||||
|
const destMeta = await getObjectMetadata(newS3Client, destBucket, key);
|
||||||
|
|
||||||
|
if (!destMeta.exists) {
|
||||||
|
// Destination doesn't exist, copy it
|
||||||
|
await copyObject(key);
|
||||||
|
copiedCount++;
|
||||||
|
} else if (sourceMeta.etag !== destMeta.etag) {
|
||||||
|
// ETags differ, update the file
|
||||||
|
console.log(`Updating (ETag changed): ${key}`);
|
||||||
|
console.log(` Source ETag: ${sourceMeta.etag}`);
|
||||||
|
console.log(` Dest ETag: ${destMeta.etag}`);
|
||||||
|
await copyObject(key);
|
||||||
|
updatedCount++;
|
||||||
|
} else {
|
||||||
|
// Same ETag, skip
|
||||||
|
console.log(`- Skip (same ETag): ${key}`);
|
||||||
|
skippedCount++;
|
||||||
|
}
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error(`✗ Error copying ${key}:`, error.message);
|
||||||
|
errorCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log('');
|
||||||
|
console.log('Migration complete!');
|
||||||
|
console.log(`Total objects: ${objects.length}`);
|
||||||
|
console.log(`Copied (new): ${copiedCount}`);
|
||||||
|
console.log(`Updated (ETag changed): ${updatedCount}`);
|
||||||
|
console.log(`Skipped (same): ${skippedCount}`);
|
||||||
|
console.log(`Errors: ${errorCount}`);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Run if this file is executed directly
|
||||||
|
if (require.main === module) {
|
||||||
|
copyS3Object().catch((error) => {
|
||||||
|
console.error('Migration failed:', error);
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export { copyS3Object };
|
||||||
Reference in New Issue
Block a user