feat(api): migrate S3 client with structured logs
This commit is contained in:
@@ -1,49 +1,113 @@
|
|||||||
import { S3 } from 'aws-sdk';
|
import {
|
||||||
import { Readable } from 'stream';
|
S3Client as AwsS3Client,
|
||||||
|
HeadObjectCommand,
|
||||||
|
GetObjectCommand,
|
||||||
|
} from "@aws-sdk/client-s3";
|
||||||
|
import { Readable } from "stream";
|
||||||
|
import { Logger } from "pino";
|
||||||
|
|
||||||
export interface GetObjectStreamResult {
|
export interface GetObjectStreamResult {
|
||||||
stream: Readable;
|
stream: Readable;
|
||||||
contentLength: number; // length of the returned body
|
contentLength: number; // length of the returned body
|
||||||
totalLength: number; // full object size
|
contentType?: string;
|
||||||
contentType?: string;
|
contentRange?: string;
|
||||||
contentRange?: string;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export default class S3Client {
|
export default class S3Client {
|
||||||
private s3: S3;
|
private s3: AwsS3Client;
|
||||||
|
private logger: Logger;
|
||||||
|
|
||||||
constructor() {
|
constructor(logger: Logger) {
|
||||||
this.s3 = new S3({
|
this.logger = logger;
|
||||||
region: process.env.AWS_REGION,
|
|
||||||
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
|
const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
|
||||||
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
|
const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
|
||||||
s3ForcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true',
|
|
||||||
endpoint: process.env.S3_ENDPOINT || undefined,
|
if (!accessKeyId || !secretAccessKey) {
|
||||||
});
|
throw new Error(
|
||||||
|
"AWS credentials not configured. Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async headObject(bucket: string, key: string) {
|
const config = {
|
||||||
return this.s3.headObject({ Bucket: bucket, Key: key }).promise();
|
region: process.env.AWS_REGION,
|
||||||
|
credentials: {
|
||||||
|
accessKeyId,
|
||||||
|
secretAccessKey,
|
||||||
|
},
|
||||||
|
forcePathStyle: process.env.S3_FORCE_PATH_STYLE === "true",
|
||||||
|
endpoint: process.env.S3_ENDPOINT || undefined,
|
||||||
|
};
|
||||||
|
|
||||||
|
this.logger.info(
|
||||||
|
{
|
||||||
|
region: config.region,
|
||||||
|
endpoint: config.endpoint,
|
||||||
|
forcePathStyle: config.forcePathStyle,
|
||||||
|
},
|
||||||
|
"S3Client: Initializing with config",
|
||||||
|
);
|
||||||
|
|
||||||
|
this.s3 = new AwsS3Client(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
async headObject(bucket: string, key: string) {
|
||||||
|
this.logger.info({ bucket, key }, `S3Client: headObject`);
|
||||||
|
try {
|
||||||
|
const command = new HeadObjectCommand({ Bucket: bucket, Key: key });
|
||||||
|
const result = await this.s3.send(command);
|
||||||
|
this.logger.info(
|
||||||
|
{ size: result.ContentLength, type: result.ContentType },
|
||||||
|
`S3Client: headObject success`,
|
||||||
|
);
|
||||||
|
return result;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error({ error, bucket, key }, `S3Client: headObject error`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getObjectStream(
|
||||||
|
bucket: string,
|
||||||
|
key: string,
|
||||||
|
totalLength: number,
|
||||||
|
contentType: string | undefined,
|
||||||
|
range?: { start: number; end: number },
|
||||||
|
): Promise<GetObjectStreamResult> {
|
||||||
|
this.logger.info(
|
||||||
|
{ bucket, key, range: range ? `${range.start}-${range.end}` : "none" },
|
||||||
|
`S3Client: getObjectStream`,
|
||||||
|
);
|
||||||
|
|
||||||
|
const params = {
|
||||||
|
Bucket: bucket,
|
||||||
|
Key: key,
|
||||||
|
Range: undefined as string | undefined,
|
||||||
|
};
|
||||||
|
let contentRange: string | undefined;
|
||||||
|
if (range && totalLength > 0) {
|
||||||
|
params.Range = `bytes=${range.start}-${range.end}`;
|
||||||
|
contentRange = `bytes ${range.start}-${range.end}/${totalLength}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
async getObjectStream(bucket: string, key: string, range?: { start: number; end: number }): Promise<GetObjectStreamResult> {
|
try {
|
||||||
const head = await this.headObject(bucket, key);
|
const command = new GetObjectCommand(params);
|
||||||
const totalLength = head.ContentLength || 0;
|
const result = await this.s3.send(command);
|
||||||
const contentType = head.ContentType;
|
const stream = result.Body as Readable;
|
||||||
|
|
||||||
const params: S3.GetObjectRequest = { Bucket: bucket, Key: key };
|
const contentLength = result.ContentLength || 0;
|
||||||
let contentRange: string | undefined;
|
|
||||||
if (range && totalLength > 0) {
|
|
||||||
params.Range = `bytes=${range.start}-${range.end}`;
|
|
||||||
contentRange = `bytes ${range.start}-${range.end}/${totalLength}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
const req = this.s3.getObject(params);
|
this.logger.info(
|
||||||
const stream = req.createReadStream();
|
{ contentLength, totalLength },
|
||||||
|
`S3Client: getObjectStream success`,
|
||||||
// When a range is requested, S3 returns partial content length (end-start+1)
|
);
|
||||||
const contentLength = range && totalLength > 0 ? range.end - range.start + 1 : totalLength;
|
return { stream, contentLength, contentType, contentRange };
|
||||||
|
} catch (error) {
|
||||||
return { stream, contentLength, totalLength, contentType, contentRange };
|
this.logger.error(
|
||||||
|
{ error, bucket, key },
|
||||||
|
`S3Client: getObjectStream error`,
|
||||||
|
);
|
||||||
|
throw error;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -1,57 +1,101 @@
|
|||||||
import { Readable } from 'stream';
|
import { Readable } from "stream";
|
||||||
import S3Client, { GetObjectStreamResult } from './s3Client';
|
import S3Client, { GetObjectStreamResult } from "./s3Client";
|
||||||
import { rangeParser } from '../utils/rangeParser';
|
import { rangeParser } from "../utils/rangeParser";
|
||||||
|
import { Logger } from "pino";
|
||||||
|
|
||||||
export interface StreamResult {
|
export interface StreamResult {
|
||||||
status: number;
|
status: number;
|
||||||
headers: Record<string, string>;
|
headers: Record<string, string>;
|
||||||
body: Readable;
|
body: Readable;
|
||||||
}
|
}
|
||||||
|
|
||||||
export default class StreamService {
|
export default class StreamService {
|
||||||
private s3: S3Client;
|
private s3: S3Client;
|
||||||
private bucket: string;
|
private bucket: string;
|
||||||
private prefix: string;
|
private prefix: string;
|
||||||
|
private logger: Logger;
|
||||||
|
|
||||||
constructor(bucket = process.env.S3_BUCKET || '', prefix = process.env.S3_PREFIX || '') {
|
constructor(
|
||||||
this.s3 = new S3Client();
|
logger: Logger,
|
||||||
this.bucket = bucket;
|
bucket = process.env.S3_BUCKET || "",
|
||||||
this.prefix = (prefix || '').replace(/^\/+|\/+$/g, ''); // trim leading/trailing '/'
|
prefix = process.env.S3_PREFIX || "",
|
||||||
|
) {
|
||||||
|
this.s3 = new S3Client(logger);
|
||||||
|
this.bucket = bucket;
|
||||||
|
this.prefix = (prefix || "").replace(/^\/+|\/+$/g, ""); // trim leading/trailing '/'
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
async streamFromS3({
|
||||||
|
key,
|
||||||
|
range,
|
||||||
|
}: {
|
||||||
|
key: string;
|
||||||
|
range?: string;
|
||||||
|
}): Promise<StreamResult> {
|
||||||
|
if (!this.bucket) throw new Error("S3_BUCKET not configured");
|
||||||
|
|
||||||
|
const actualKey = this.buildKey(key);
|
||||||
|
this.logger.info(
|
||||||
|
`StreamService: Attempting to stream from bucket: ${this.bucket}, key: ${actualKey}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Determine byte range
|
||||||
|
let parsedRange: { start: number; end: number } | undefined;
|
||||||
|
// We need object size to parse range properly, so head first
|
||||||
|
this.logger.info(`StreamService: Getting head object for ${actualKey}`);
|
||||||
|
const head = await this.s3.headObject(this.bucket, actualKey);
|
||||||
|
const total = head.ContentLength || 0;
|
||||||
|
const contentType = head.ContentType;
|
||||||
|
this.logger.info(
|
||||||
|
`StreamService: Object size: ${total}, content-type: ${contentType}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (range) {
|
||||||
|
const r = rangeParser(range, total);
|
||||||
|
if (r) parsedRange = r;
|
||||||
|
this.logger.info(
|
||||||
|
`StreamService: Parsed range: ${r ? `${r.start}-${r.end}` : "invalid"}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(`StreamService: Getting object stream for ${actualKey}`);
|
||||||
|
const { stream, contentLength, contentRange }: GetObjectStreamResult =
|
||||||
|
await this.s3.getObjectStream(
|
||||||
|
this.bucket,
|
||||||
|
actualKey,
|
||||||
|
total,
|
||||||
|
contentType,
|
||||||
|
parsedRange,
|
||||||
|
);
|
||||||
|
|
||||||
|
const isPartial = Boolean(parsedRange);
|
||||||
|
const status = isPartial ? 206 : 200;
|
||||||
|
const headers: Record<string, string> = {
|
||||||
|
"Content-Type": contentType || "audio/mp4",
|
||||||
|
"Accept-Ranges": "bytes",
|
||||||
|
"Content-Length": String(contentLength),
|
||||||
|
};
|
||||||
|
if (isPartial && contentRange) headers["Content-Range"] = contentRange;
|
||||||
|
|
||||||
|
this.logger.info(
|
||||||
|
{ headers, status },
|
||||||
|
`StreamService: Successfully prepared stream`,
|
||||||
|
);
|
||||||
|
return { status, headers, body: stream };
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(
|
||||||
|
{ error, actualKey },
|
||||||
|
`StreamService: Error in streamFromS3`,
|
||||||
|
);
|
||||||
|
throw error;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async streamFromS3({ key, range }: { key: string; range?: string }): Promise<StreamResult> {
|
private buildKey(key: string): string {
|
||||||
if (!this.bucket) throw new Error('S3_BUCKET not configured');
|
const cleanKey = (key || "").replace(/^\/+/, "");
|
||||||
|
if (!this.prefix) return cleanKey;
|
||||||
const actualKey = this.buildKey(key);
|
return `${this.prefix}/${cleanKey}`;
|
||||||
|
}
|
||||||
// Determine byte range
|
|
||||||
let parsedRange: { start: number; end: number } | undefined;
|
|
||||||
// We need object size to parse range properly, so head first
|
|
||||||
const head = await this.s3.headObject(this.bucket, actualKey);
|
|
||||||
const total = head.ContentLength || 0;
|
|
||||||
if (range) {
|
|
||||||
const r = rangeParser(range, total);
|
|
||||||
if (r) parsedRange = r;
|
|
||||||
}
|
|
||||||
|
|
||||||
const { stream, contentLength, totalLength, contentType, contentRange }: GetObjectStreamResult =
|
|
||||||
await this.s3.getObjectStream(this.bucket, actualKey, parsedRange);
|
|
||||||
|
|
||||||
const isPartial = Boolean(parsedRange);
|
|
||||||
const status = isPartial ? 206 : 200;
|
|
||||||
const headers: Record<string, string> = {
|
|
||||||
'Content-Type': contentType || 'audio/mp4',
|
|
||||||
'Accept-Ranges': 'bytes',
|
|
||||||
'Content-Length': String(contentLength),
|
|
||||||
};
|
|
||||||
if (isPartial && contentRange) headers['Content-Range'] = contentRange;
|
|
||||||
|
|
||||||
return { status, headers, body: stream };
|
|
||||||
}
|
|
||||||
|
|
||||||
private buildKey(key: string): string {
|
|
||||||
const cleanKey = (key || '').replace(/^\/+/, '');
|
|
||||||
if (!this.prefix) return cleanKey;
|
|
||||||
return `${this.prefix}/${cleanKey}`;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user