Datastore-based Method

Overview

Usage records can be collected through Paigo's deep integration with data storage systems. Paigo supports many popular systems, such as cloud object/blob stores (AWS S3, Azure Blob Storage), queue-based datastores (Kafka, AWS Kinesis), and file-based datastores (log files, archives). Paigo can connect directly to a wide range of data stores to subscribe to real-time events from SaaS applications, extract usage records through server-side transformations, and save usage data for billing and analytical purposes. Among all usage measurement and collection methods, the datastore-based method has several advantages:

  • Change-nothing architecture: The SaaS business does not need to allocate additional infrastructure to store or send data; it can leverage the existing architecture of the SaaS application.

  • Native support: Datastore-based integration is well suited to batch usage ingestion, as datastores can be used to sink data dumps.

  • Performance and scalability: The ingestion of usage records can harness the power and features of the underlying datastore, such as the asynchronous processing and high throughput of queue-based systems.

For a comparison with other usage measurement and collection methods, see the Meter Usage Data at Scale chapter for the complete documentation.

How It Works

Within Paigo's Usage Measurement and Collection engine, there is a component called the Datastore Connector. The Datastore Connector subscribes to a wide range of storage systems for updates and extracts usage based on a predefined data schema.

As an example, consider a typical architecture pattern in SaaS applications: event-driven architecture. In the backend of a SaaS application, there is usually a central event bus hosted on a popular event streaming platform such as Kafka. Assume this is an API SaaS application whose customers are billed based on millions of API calls.

  1. As part of the datastore-based measurement, Paigo subscribes to topics in Kafka with read-only permissions.

  2. Whenever a customer makes an API call, an event is emitted on the central event bus for the API call containing metadata such as the API caller and timestamp.

  3. As a subscriber to the Kafka topic, Paigo listens to the event as it is emitted, parses the event body, extracts usage data, updates the counter of API calls (usage count), and persists usage record in the backend journal.

Millions of events can be emitted to the central event bus and taken in by Paigo without concerns about throughput or performance.
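
For illustration, a hypothetical API-call event on the central event bus might look like the following. The field names here are assumptions for this example, not a schema required by Paigo:

{"eventType":"api.call","caller":"customer-123","endpoint":"/v1/orders","timestamp":"2023-01-01T13:41:56Z"}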

Different data stores work slightly differently with Paigo, but this control flow demonstrates the high-level idea of the datastore-based method.

Data Record Schema and Metadata

Usage Data in S3 (Measurement Template)

AWS S3 is an object store for data persistence. Paigo collects usage data in S3 with the following flow.

  1. At a predefined interval, the Paigo Usage Measurement and Collection Engine discovers all new objects saved in S3 buckets hosted by Paigo and downloads those objects along with their metadata (such as prefix, path, etc.).

  2. Paigo takes in objects as raw byte streams and decodes them as plain text in newline delimited JSON (NDJSON).

  3. Paigo parses each JSON object in the NDJSON file and validates it against the data record schema as defined in Data Record Schema and Metadata.

  4. For validated usage records, Paigo ingests the record into the backend usage journal. For invalid or unprocessed usage records, Paigo will drop a message to the Dead Letter Queue (DLQ) bucket hosted by Paigo.

Setup Measurement

Navigate to the Metering tab and click New Measurement to view the Measurement Template Table. Select Usage Data in S3 to create a new measurement. In the New Measurement form, some fields are prefilled for the template. Provide a value for Measurement Name. Lastly, in the Account ID field, provide the AWS account number that will assume an AWS IAM role with access to the S3 bucket for data ingestion. For context on assuming an IAM role, see the AWS documentation on Creating a role to delegate permissions to an IAM user. Click Submit to create the measurement.

Once the measurement is successfully created, click on the measurement in the Measurement Table. The following information is provided for ingesting usage data using S3:

  • IAM Role Arn: The IAM role that can be assumed by the AWS account as specified in the measurement configuration.

  • External ID: An additional field required when assuming the IAM role provided above. Code examples for assuming the IAM role are provided below. As a security best practice, Paigo generates a new external ID every time a measurement is created.

  • Region: The AWS region to initialize the S3 client with.

  • Ingestion: The S3 bucket path to ingest usage data.

  • DLQ: The S3 bucket path to read messages in Dead Letter Queue.

The S3 bucket for data ingestion is hosted and fully managed by Paigo. The SaaS business will be granted access to ingest data into the S3 bucket.

The same set of operations to set up and get measurements can also be performed via the Paigo API.

Ingest Data into S3

Usage records must be written as newline-delimited JSON (NDJSON), one record per line, following the data record schema:

{"timestamp":"string","customerId":"string","dimensionId":"string","recordValue":"0","metadata":{}}
{"timestamp":"string","customerId":"string","dimensionId":"string","recordValue":"0","metadata":{}}
{"timestamp":"string","customerId":"string","dimensionId":"string","recordValue":"0","metadata":{}}

Note that pretty-formatted, multiple-line JSON is not a valid NDJSON format.
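
As a minimal sketch (the customerId and dimensionId values below are placeholders), an NDJSON payload can be assembled in Java by keeping each record on a single line and joining the lines with newline characters:

// Requires java.util.List and java.nio.charset.StandardCharsets.
// Each entry is a complete JSON usage record on a single line.
List<String> records = List.of(
    "{\"timestamp\":\"2023-01-01T13:41:56Z\",\"customerId\":\"customerA\",\"dimensionId\":\"dim-api-calls\",\"recordValue\":\"1\",\"metadata\":{}}",
    "{\"timestamp\":\"2023-01-01T13:42:03Z\",\"customerId\":\"customerA\",\"dimensionId\":\"dim-api-calls\",\"recordValue\":\"1\",\"metadata\":{}}");

String ndjson = String.join("\n", records);
byte[] payload = ndjson.getBytes(StandardCharsets.UTF_8); // ready to upload as a single S3 object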

Below are examples of using the role and access information provided by Paigo to write data into the S3 bucket. For more information on the topic, see the AWS documentation Uploading objects.

Below is a code example of assuming a role:

AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
    .roleArn(iamRoleArn)              // IAM Role Arn provided in the measurement
    .externalId(externalId)           // External ID provided in the measurement
    .roleSessionName(roleSessionName) // Any string, such as "mySession"
    .build();

AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
Credentials myCreds = roleResponse.credentials();

See AWS Official Documentation for full code examples of the above snippet.
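
Once the role has been assumed, the temporary credentials can be used to build an S3 client. Below is a minimal sketch assuming the AWS SDK for Java v2, where region holds the Region value provided in the measurement:

// Build an S3 client from the temporary credentials returned by AssumeRole.
AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create(
    myCreds.accessKeyId(),
    myCreds.secretAccessKey(),
    myCreds.sessionToken());

S3Client s3 = S3Client.builder()
    .region(Region.of(region)) // Region provided in the measurement
    .credentialsProvider(StaticCredentialsProvider.create(sessionCredentials))
    .build();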

Below is a code example of uploading a file object to the S3 bucket:

public static String putS3Object(S3Client s3, String bucketName, String objectKey, String objectPath) {
    try {
        // Optional object metadata attached to the upload.
        Map<String, String> metadata = new HashMap<>();
        metadata.put("x-amz-meta-myVal", "test");

        PutObjectRequest putOb = PutObjectRequest.builder()
            .bucket(bucketName)
            .key(objectKey)
            .metadata(metadata)
            .build();

        // Upload the local file at objectPath as the object body.
        PutObjectResponse response = s3.putObject(putOb, RequestBody.fromFile(Paths.get(objectPath)));
        return response.eTag();

    } catch (S3Exception e) {
        System.err.println(e.awsErrorDetails().errorMessage());
        System.exit(1);
    }

    return "";
}

See AWS Official GitHub Repository for a full code example of the above snippet.
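
For instance, with the S3 client built from the assumed-role credentials above, the helper can be called with the Ingestion bucket from the measurement; the object key and local file path below are hypothetical:

String eTag = putS3Object(s3, ingestionBucket, "customerA/2023/01/01/13/41/56/8fh923f.txt", "/tmp/usage.ndjson");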

The SaaS business has full control over the object structure within the granted S3 bucket; the Paigo Datastore Connector is unaware of the hierarchy of folders/prefixes. The SaaS business also has full control to read, write, and manage the objects within the given S3 bucket.

Please note that the same object is never read or parsed twice by the Paigo Datastore Connector. Each object that appears in the S3 bucket is treated as an emitted event.

Here is an example of a prefix schema that can be used to organize objects in buckets:

/customerId/yyyy/mm/dd/hh/MM/ss/uniqueId
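
For example, an object uploaded under this schema could have the key:

/customerA/2023/01/01/13/41/56/8fh923f.txt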

Manage Measurement

Measurements for S3 usage data ingestion can be updated with a different account ID. The permission (IAM role policy) will be updated accordingly.

Measurements for S3 usage data ingestion can be deleted. Access to the S3 bucket will be revoked upon deletion.

Error Handling and Dead Letter Queue

For unprocessed or invalid usage records, Paigo puts a message in a Dead Letter Queue (DLQ) S3 bucket hosted and fully managed by Paigo. The DLQ bucket information is also returned as part of the measurement information. The same IAM Role Arn and External ID can be used to gain access and read from the DLQ bucket.

Paigo puts an object decodable as plain text into the DLQ bucket for every S3 file that cannot be fully processed. Note that Paigo ingests as many usage records as possible on a best-effort basis: if the original S3 NDJSON file contains more than one usage record and only one of them is malformed, Paigo ingests all the others and discards the malformed one, and a DLQ message is written for the malformed usage record.

As an example, suppose an input file with path /customerA/2023/01/01/13/41/56/8fh923f.txt contains three JSON lines as NDJSON, and one of them is malformed. Paigo ingests two usage records into the backend usage journal and puts a new message file in the DLQ bucket with the prefix /customerA/2023/01/01/13/41/56/8fh923f.txt.9a1eaab.message.txt. The message contains the following information:

  • Timestamp of processing usage record, in UTC time

  • Which usage record is invalid

  • Reason the processing failed

  • Process result: ingested, transformed, discarded, or some other action taken

Below are examples of using the role and access information provided by Paigo to read messages from the DLQ bucket. For more information on the topic, see the AWS documentation Downloading an object.

Below is a code example of assuming a role:

AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
    .roleArn(iamRoleArn)              // IAM Role Arn provided in the measurement
    .externalId(externalId)           // External ID provided in the measurement
    .roleSessionName(roleSessionName) // Any string, such as "mySession"
    .build();

AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
Credentials myCreds = roleResponse.credentials();

See AWS Official Documentation for full code examples of the above snippet. Below is a code example of reading an object from the S3 bucket:

public static void getObjectBytes (S3Client s3, String bucketName, String keyName, String path) {
    try {
        GetObjectRequest objectRequest = GetObjectRequest
            .builder()
            .key(keyName)
            .bucket(bucketName)
            .build();
 
        ResponseBytes<GetObjectResponse> objectBytes = s3.getObjectAsBytes(objectRequest);
        byte[] data = objectBytes.asByteArray();
 
        // Write the data to a local file.
        File myFile = new File(path);
        OutputStream os = new FileOutputStream(myFile);
        os.write(data);
        System.out.println("Successfully obtained bytes from an S3 object");
        os.close();

    } catch (IOException ex) {
        ex.printStackTrace();
    } catch (S3Exception e) {
        System.err.println(e.awsErrorDetails().errorMessage());
        System.exit(1);
    }
}

See AWS Official GitHub Repository for a full code example of the above snippet.
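
For instance, with the same assumed-role S3 client, a DLQ message can be downloaded using the DLQ bucket from the measurement; the message key and local path below are hypothetical:

getObjectBytes(s3, dlqBucket, "customerA/2023/01/01/13/41/56/8fh923f.txt.9a1eaab.message.txt", "/tmp/dlq-message.txt");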

Usage Data in Kafka

Kafka is a popular solution for event-based architectures, enabling extremely scalable, high-throughput systems. Paigo can be given access to a Kafka topic and pull measurement data for billing and usage tracking with the following flow.

  1. The Paigo consumer registers with the host via the SASL/SSL username and password granted through the measurement on the Paigo backend (see Setup Measurement). The consumer ID takes the format paigo-${measurementId}.

  2. Paigo begins to pull usage data from its assigned topic and processes it. Paigo takes in each event as a byte stream and expects each to be a valid JSON object.

  3. Usage data is loaded into the backend journal and queryable via the API or in the UI data explorer.

Setup Measurement

Navigate to the Metering tab and click New Measurement to view the Measurement Template Table. Select Usage Data in Kafka to create a new measurement. In the New Measurement form, some fields are prefilled for the template. Provide a value for Measurement Name. Provide a Username and Password for your specific topic; in Confluent, the Username is the API key and the Password is the secret for a given cluster. Fill in the Bootstrap Server and the Topic / DLQ Topic names respectively. All of these should be created ahead of time; otherwise the measurement will fail to initialize. In the Confluent UI, you can find your Bootstrap Server under the Cluster Settings tab, and Topics are listed separately on the left side of the UI. See the Confluent documentation for more information.

Initial Testing

After successfully creating a measurement in Paigo, please wait 15 minutes for Paigo to initialize the consumer; afterwards you should see a new consumer register with your cluster. Once initialized, load JSON data onto the topic with the following format:

{"timestamp":"string","customerId":"string","dimensionId":"string","recordValue":"0","metadata":{}}

If successful, you should see the usage data populate for your customer in the Paigo UI or via an API call to GET /customers/:customerId/usage.
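
A minimal producer sketch for this test, assuming the Apache Kafka Java client and SASL/SSL with the PLAIN mechanism (as used with Confluent API keys), is shown below. The bootstrapServer, username, password, and topic variables hold the values from your measurement and cluster:

// Requires org.apache.kafka.clients.producer.* and java.util.Properties.
// Producer configuration for a SASL/SSL-secured cluster.
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer); // Bootstrap Server from the measurement
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username
        + "\" password=\"" + password + "\";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// A single usage record in the expected JSON format (illustrative values).
String record = "{\"timestamp\":\"2023-01-01T13:41:56Z\",\"customerId\":\"customerA\","
    + "\"dimensionId\":\"dim-api-calls\",\"recordValue\":\"1\",\"metadata\":{}}";

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>(topic, record)); // Topic from the measurement
    producer.flush();
}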

Manage Kafka Measurement

To deregister the consumer, simply delete the measurement by navigating back to the Metering tab, selecting the measurement, and clicking Delete.

Note that deletion will not happen instantaneously; the consumer will deregister over the course of the next fifteen minutes. You can verify the completion of the deregistration in your Kafka cluster by noting that the Paigo consumer no longer appears. It is recommended that you halt adding records to your topic before deregistering in order to be sure that the last record was successfully processed.

Error Handling and Topic DLQ

As part of the measurement creation, you must expose a DLQ topic for Paigo to handle cases where a record fails to be processed. Paigo requires that the DLQ topic and the ingestion topic are part of the same cluster, meaning they share the same bootstrap host. The API credentials Paigo uses need read and write permissions: specifically, read for the ingestion topic and write for the DLQ topic. Errors written to the DLQ topic contain the following information and are a valid JSON object:

  • Which usage record is invalid

  • Timestamp of processing usage record, in UTC time

  • Reason the processing failed

  • Process result: ingested, transformed, discarded, or some other action taken
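
For illustration only, a DLQ message carrying this information might look like the following; the field names are an assumption for this example, not a guaranteed schema:

{"timestamp":"2023-01-01T13:41:56Z","invalidRecord":"{\"customerId\":\"customerA\", ...}","reason":"recordValue is not a number","result":"discarded"}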

Downstream of the DLQ, it is recommended to have an automated consumer responsible for parsing each message and taking the correct action, such as storing the event, notifying relevant team members, or reprocessing the event with any required corrections (for example, adding a missing customerId to the usage record).
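
A minimal sketch of such a downstream DLQ consumer, assuming the Apache Kafka Java client and the same SASL/SSL settings as in the producer sketch above (the handling logic is a placeholder):

// Requires org.apache.kafka.clients.consumer.*, java.util.Properties,
// java.util.Collections, and java.time.Duration.
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username
        + "\" password=\"" + password + "\";");
props.put("group.id", "dlq-handler"); // any consumer group owned by your team
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(dlqTopic)); // DLQ Topic from the measurement
    while (true) {
        ConsumerRecords<String, String> messages = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> message : messages) {
            // Placeholder: parse the JSON error message, alert the team, or correct
            // and re-publish the usage record to the ingestion topic.
            System.out.println("DLQ message: " + message.value());
        }
    }
}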
