Tech and Media Labs
This site uses cookies to improve the user experience.


Stream Ops StreamStorageFS

Jakob Jenkov
Last update: 2019-07-21

The Stream Ops StreamStorageFS class provides disk storage for a data stream. More specifically, you can write an ordered sequence of records to disk, and later read the records from disk again in the exact same order.

The StreamStorageFS will break the records up into one or more files, depending on how big an amount of bytes the written records require. You can specify the maximum file size for each block of records on the StreamStorageFS instance. There is a default maximum file size, but we still need to do a bit of experimentation to find the optimal default size. So far we have used 64MB as default limit, but it depends on the use case what seems to fit best.

A data streams's files are stored in their own directory. This stream root directory does not contain files from any other streams. If you create a StreamStorageFS via a StreamStorageFSRoot instance, then the root directory of the stream will be a subdirectory of the directory passed to the StreamStorageFSRoot

StreamStorageFS Write Example

Before explaining each part of the StreamStorageFS functionality, here is a quick example of writing RION records to a stream via a StreamStorageFS instance:

byte[] rionBytesRecord1 = new byte[]{0x01, 0x08, 0,1,2,3,4,5,6,7};
byte[] rionBytesRecord2 = new byte[]{0x01, 0x08, 7,6,5,4,3,2,1,0};
byte[] rionBytesRecord3 = new byte[]{0x01, 0x08, 0,1,2,3,3,2,1,0};

String streamId      = "stream-1";
String streamRootDir = "/streams/stream-1";

StreamStorageFS streamStorageFS =
    new StreamStorageFS(streamId, streamRootDir);

streamStorageFS.openForAppend();
streamStorageFS.appendRecord(rionBytesRecord1, 0, rionBytesRecord1.length);
streamStorageFS.appendRecord(rionBytesRecord2, 0, rionBytesRecord2.length);
streamStorageFS.appendRecord(rionBytesRecord3, 0, rionBytesRecord3.length);
streamStorageFS.closeForAppend();

Create a StreamStorageFS Instance

To use the StreamStorageFS class you must first create a StreamStorageFS instance. You can create a StreamStorageFS instance directly via its constructor, like this:

String streamId      = "stream-1";
String streamRootDir = "/streams/stream-1";

StreamStorageFS streamStorageFS =
    new StreamStorageFS(streamId, streamRootDir);

You can also create a StreamStorageFS instance via a StreamStorageRootFS instance. Here is how you create a StreamStorageFS instance from a StreamStorageRootFS instance:

String streamsRootDir = "/streams";
StreamStorageRootFS streamStorageRootFS = new StreamStorageRootFS(streamsRootDir);

String streamId = "stream-1";
StreamStorageFS streamStorageFS = streamStorageRootFS.createStreamStorage(streamId);

Appending Records to a StreamStorageFS

Appending records to a StreamStorageFS instance follows these steps:

  • Open the StreamStorageFS for appending.
  • Append one or more records to the StreamStorageFS.
  • Close the StreamStorageFS for appending again.

Each of these steps will be explained in the following sections.

Open for Appending

To write records to a StreamStorageFS you must first open it for appending. You do so by calling the openForAppend() method. Here is an example of opening a StreamStorageFS instance for appending:

streamStorageFS.openForAppend();

Append Records

Once a StreamStorageFS instance is open for appending records, you can append a record to the stream via the append() method. The appended record will be written to the corresponding corresponding file. Here is an example of appending a record to a StreamStorageFS:

byte[] rionRecordData = ... // get a RION formatted record from somewhere

streamStorageFS.append(rionRecordDate, 0, rionRecord.length);

The record must be formatted as a single RION field!. RION is a binary data format we have created in Nanosai. RION will be covered elsewhere (link coming here soon). This doesn't mean that all data written to a stream needs to be RION. It just means, that the data has to be embedded in a RION record. It is possible to embed e.g. a JSON, XML, CSV, JPEG, MPEG etc. record inside a RION record, with just a few bytes overhead.

Close for Append When Finished

Once you have appended the records you want to write, you should close the StreamStorageFS instance again. The reason for that is to make sure that all data is flushed to the corresponding file, and to avoid having too many files open, in case you have hundreds or thousands of streams open at the same time. Of course, if you only have a single stream open, the situation is different.

You close the StreamStorageFS instance by calling the closeForAppend() method. Here is an example of closing a StreamStorageFS instance:

streamStorageFS.closeForAppend();

Reopening a Stream for Appending

You can always re-open a stream for appending again later, even if it has been closed for appending. You just call openForAppend() again. Just remember to close it again when you are finished.

Reading Records From a StreamStorageFS

At some point you will want to read the records you have written to the stream storage. To read the records you need to figure out what block file the records you want to read are stored in. There are several ways to do so. One method is to simply iterate through all the stream's block files to find the records you are looking for. Another method is to create an index that helps you narrow in on what file the records you are looking for, are stored in. Stream Ops offers solutions for both of these methods.

Stream Block Files

You can obtain a list of StreamStorageBlockFS instances from a StreamStorageFS by calling getStorageBlocks(). Here is an example of obtaining the StreamStorageBlockFS objects representing the storage block files from a StreamStorageFS object:

List<StreamStorageBlockFS> lists = streamStorageFS.getStorageBlocks();

Read From a Block File

The StreamStorageFS class contains a method to read bytes from a stream block file. This method is called readBytes(). Here is an example of reading bytes from a stream block file:

StreamStorageBlockFS storageBlock = streamStorageFS.getStorageBlocks().get(0);
long   fromByte  = 0;

byte[] dest      = new byte[1024 * 1024];
int    destIndex = 0;
int    length    = dest.length;

int bytesRead1 = streamStorageFS.readBytes(storageBlock, fromByte, dest, destIndex, length);

The readBytes() method reads bytes from the file represented by the given StreamStorageBlockFS instance, into the byte[] array passed as parameter. The readBytes() method will start fromByte bytes into the block file. The read bytes will be read into the destination byte[] array from index destIndex, and it will read at most length bytes. If there are fewer bytes in the file from fromByte to the end of the block file than the provided length, then only the bytes until the end of the block file are read. The readBytes() method does not read across storage block files.

Jakob Jenkov




Copyright  Jenkov Aps
Close TOC