stream.ts 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import { Transform } from 'stream';
  2. export const convertStreamToBuffer = (stream: any): Promise<Buffer> => {
  3. return new Promise((resolve, reject) => {
  4. const buffer: Uint8Array[] = [];
  5. stream.on('data', (chunk: Uint8Array) => {
  6. buffer.push(chunk);
  7. });
  8. stream.on('end', () => resolve(Buffer.concat(buffer)));
  9. stream.on('error', err => reject(err));
  10. });
  11. };
  12. export const getBufferToFixedSizeTransform = (size: number): Transform => {
  13. let buffer = Buffer.alloc(size);
  14. let filledBufferSize = 0;
  15. return new Transform({
  16. transform(chunk: Buffer, encoding, callback) {
  17. let offset = 0;
  18. while (offset < chunk.length) {
  19. // The data size to add to buffer.
  20. // - If the remaining chunk size is smaller than the remaining buffer size:
  21. // - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
  22. // - If the remaining chunk size is larger than the remaining buffer size:
  23. // - Fill the buffer, and upload => dataSize is the remaining buffer size
  24. // - The remaining chunk after upload will be added to buffer in the next iteration
  25. const dataSize = Math.min(size - filledBufferSize, chunk.length - offset);
  26. // Add chunk data to buffer
  27. chunk.copy(buffer, filledBufferSize, offset, offset + dataSize);
  28. filledBufferSize += dataSize;
  29. // When buffer reaches size, push to next stream
  30. if (filledBufferSize === size) {
  31. this.push(buffer);
  32. // Reset buffer after push
  33. buffer = Buffer.alloc(size);
  34. filledBufferSize = 0;
  35. }
  36. offset += dataSize;
  37. }
  38. callback();
  39. },
  40. flush(callback) {
  41. // push the final buffer
  42. if (filledBufferSize > 0) {
  43. this.push(buffer.slice(0, filledBufferSize));
  44. }
  45. callback();
  46. },
  47. });
  48. };