NATS JetStream ObjectStore with NodeJS/Typescript
- nats
- jetstream
- nodejs
- typescript
12 Jun 2024
For an overview over the full series about NATS, see NATS edge to cloud.
NATS JetStream, the persistence layer of NATS, provides not only a way to store messages, but also a way to store other data via its Object Store. This blog post focuses on storing data in NATS in a NodeJS application.
Storing big files somewhere in NodeJS can be done by loading the whole file into memory and sending it somewhere or by streaming a file from one place to another. The latter approach is the more desirable one because it saves a lot of memory when dealing with big files. Good thing is, the NATS.js library favours the usage of streams: The methods to interact with it require ReadableStream
and WritableStream
typed arguments.
You can read more about it here:
Storing a file in NATS JetStream Object Store
import fs from "fs";
import { connect } from "nats";
export const readFileIntoReadableStream = (filePath: string) => {
const readStream = fs.createReadStream(filePath);
return new ReadableStream<Uint8Array>({
start(controller) {
readStream.on("data", (chunk: Buffer) => {
controller.enqueue(chunk);
});
readStream.on("end", () => {
controller.close();
});
readStream.on("error", (error) => {
controller.error(error);
});
},
});
};
export const storeFile = async (
fileName: string,
filePath: string,
objectStoreName: string,
) => {
const natsConnection = await connect();
const jetstream = natsConnection.jetstream();
const objectStore = await jetstream.views.os(objectStoreName);
const file = await objectStore.put(
{ name: fileName },
readFileIntoReadableStream(filePath),
);
return file;
};
Retrieving a file from NATS JetStream Object Store
import fs from "fs";
import { connect } from "nats";
export const writableStreamForFile = (filePath: string) => {
const writeStream = fs.createWriteStream(filePath);
const writableStream = new WritableStream<Uint8Array>({
write(chunk) {
writeStream.write(chunk);
},
close() {
writeStream.close();
},
abort(reason) {
console.error(reason);
},
});
return writableStream;
};
export const retrieveFile = async (
natsName: string,
filePath: string,
objectStoreName: string,
) => {
const natsConnection = await connect();
const jetstream = natsConnection.jetstream();
const objectStore = await jetstream.views.os(objectStoreName);
const writeStream = writableStreamForFile(filePath);
const natsFile = await objectStore.get(natsName);
if (natsFile === null) {
console.error(`File not found: ${objectStoreName} ${natsName}`);
return
}
await natsFile.data.pipeTo(writeStream);
};