Kinesis, DynamoDB Streams and .NET


In this article I’m going to look at how AWS streaming services (Kinesis and DynamoDB Streams) compare to SQS. I’m also going to look at the level of API support for .NET Core, specifically on the consumption side.

DynamoDB Streams very much shares the Kinesis model, so the biggest leap is in comparing SQS to Kinesis, and in how streams differ from traditional message queues.

Comparing Kinesis to SQS

I’ve nabbed some great diagrams from the following blogs/slides which are worth a look:

Simple Queue Service

Amazon SQS offers a reliable, highly scalable hosted queue for storing messages as they travel between computers. By using Amazon SQS, you can simply move data between distributed application components performing different tasks, without losing messages or requiring each component to be always available.

[Diagram: SQS message flow between producers and consumers]

Kinesis

Amazon Kinesis is a service for real-time processing of streaming big data. You can push data from many producers, rapidly and continuously, and have it delivered to multiple processing applications. Furthermore, all data is stored in the stream for 24 hours, so you can replay old data back into your applications.

[Diagram: a Kinesis stream with multiple producers and multiple consuming applications]

Service Limitations

You can get an idea of some of the use cases applicable to each service by looking at their respective limitations:

[Table: SQS and Kinesis service limits compared]

This Amazon page helps describe some of the use cases for Kinesis:

We recommend Amazon Kinesis Streams for use cases with requirements that are similar to the following:

  • Routing related records to the same record processor (as in streaming MapReduce). For example, counting and aggregation are simpler when all records for a given key are routed to the same record processor.
  • Ordering of records. For example, you want to transfer log data from the application host to the processing/archival host while maintaining the order of log statements.
  • Ability for multiple applications to consume the same stream concurrently. For example, you have one application that updates a real-time dashboard and another that archives data to Amazon Redshift. You want both applications to consume data from the same stream concurrently and independently.
  • Ability to consume records in the same order a few hours later. For example, you have a billing application and an audit application that runs a few hours behind the billing application. Because Amazon Kinesis Streams stores data for up to 7 days, you can run the audit application up to 7 days behind the billing application.

Consumer APIs

I’ve taken some diagrams for this section from the following slides:

Kinesis Streams API

As with all AWS services, Amazon provides an SDK for Kinesis in many languages, including .NET Core. When consuming from the low-level Kinesis Streams API, you must indicate a position in the stream from which to start iterating:

  • TRIM_HORIZON—start from the oldest record still stored in the shard.
  • LATEST—start just after the most recent record, so that only new data is read.
  • AT_SEQUENCE_NUMBER—start reading exactly at the specified sequence number.
  • AFTER_SEQUENCE_NUMBER—start reading just after the specified sequence number.
  • AT_TIMESTAMP—start reading from the specified point in time.
After you have retrieved the initial shard iterator you can keep making calls to GetRecords (up to 10,000 records at a time). Each GetRecordsResponse includes the NextShardIterator that can be used in the subsequent request.

Below is a sample of .NET code using this API:

[Screenshot: sample .NET code calling GetShardIterator and GetRecords]
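For reference, a comparable sketch using the async API of the AWS SDK for .NET (AWSSDK.Kinesis); the stream name and shard ID here are hypothetical:

```csharp
using System;
using System.Threading.Tasks;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

class KinesisConsumerSketch
{
    static async Task Main()
    {
        var client = new AmazonKinesisClient();

        // Get an iterator positioned at the oldest record still in the shard.
        var iteratorResponse = await client.GetShardIteratorAsync(new GetShardIteratorRequest
        {
            StreamName = "my-stream",            // hypothetical stream name
            ShardId = "shardId-000000000000",    // hypothetical shard ID
            ShardIteratorType = ShardIteratorType.TRIM_HORIZON
        });

        var shardIterator = iteratorResponse.ShardIterator;

        // Keep polling: each response hands back the iterator for the next call.
        while (shardIterator != null)
        {
            var recordsResponse = await client.GetRecordsAsync(new GetRecordsRequest
            {
                ShardIterator = shardIterator,
                Limit = 100
            });

            foreach (var record in recordsResponse.Records)
            {
                Console.WriteLine($"Seq {record.SequenceNumber}: {record.Data.Length} bytes");
            }

            shardIterator = recordsResponse.NextShardIterator;
            await Task.Delay(1000); // stay under the per-shard read limits
        }
    }
}
```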

The above example, however, is fairly naive: although the number of shards is configured when the stream is created, it can change over time. As the volume of data grows you may need to split shards so that data is partitioned across more of them, and conversely merge shards if the volume drops away.

The way Kinesis models the splitting and merging of shards is shown below:

[Diagram: splitting a shard into two child shards, and merging two adjacent shards into one]
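With the low-level API, resharding is driven by the SplitShard and MergeShards operations. A sketch using the AWS SDK for .NET, with hypothetical stream and shard IDs:

```csharp
using System.Threading.Tasks;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

class ReshardingSketch
{
    static async Task Reshard(IAmazonKinesis client)
    {
        // Split one shard into two by choosing a new starting hash key
        // (here the midpoint of the 128-bit MD5 hash key space).
        await client.SplitShardAsync(new SplitShardRequest
        {
            StreamName = "my-stream",                  // hypothetical
            ShardToSplit = "shardId-000000000000",
            NewStartingHashKey = "170141183460469231731687303715884105728"
        });

        // Merge two adjacent shards back into one.
        await client.MergeShardsAsync(new MergeShardsRequest
        {
            StreamName = "my-stream",
            ShardToMerge = "shardId-000000000001",
            AdjacentShardToMerge = "shardId-000000000002"
        });
    }
}
```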

Kinesis Client Library

While the low level Kinesis Streams API helps you manage creating streams, resharding, putting and getting records, the KCL provides a layer of abstraction specifically for processing data in a consumer role.

As described in this Amazon page, the KCL performs the following tasks:

  • Connects to the stream
  • Enumerates the shards
  • Coordinates shard associations with other workers (if any)
  • Instantiates a record processor for every shard it manages
  • Pulls data records from the stream
  • Pushes the records to the corresponding record processor
  • Checkpoints processed records
  • Balances shard-worker associations when the worker instance count changes
  • Balances shard-worker associations when shards are split or merged

The balancing of shard-worker associations is managed by maintaining a table of leases in DynamoDB. Within a single KCL instance, each worker runs on its own thread (one per shard).

To illustrate this, if you have a Kinesis stream with two shards, and you run a single instance of a KCL app, two workers in separate threads will each register their leases in DynamoDB.

If you started running a second instance of the KCL app (there’s a config setting to indicate which ‘app’ is being run), it will (over a number of steps) de-register one of the original workers and re-assign that shard to a worker in the second app. Each KCL instance would then have one thread/worker each.

If you were to run 3 instances in this scenario, one of the instances would sit around waiting for a Worker on one of the other instances to lose its lease.

[Diagram: KCL workers holding shard leases in a DynamoDB table]

The KCL and .NET

Unfortunately for .NET developers, the KCL is a Java application.


However, Amazon have thought of this and included an interface class in the Java KCL called the MultiLangDaemon. This abstracts all of the management code away from the record processing, allowing the record-processing code to be written in any language.

The way it does this is to use standard input/output and the following protocol:

[Diagram: the MultiLangDaemon communicating with a child process over standard input/output]
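The protocol is line-oriented JSON over stdin/stdout. An illustrative exchange (field values are hypothetical, and records are base64-encoded):

```json
{"action": "initialize", "shardId": "shardId-000000000000"}
{"action": "status", "responseFor": "initialize"}
{"action": "processRecords", "records": [{"data": "SGVsbG8=", "partitionKey": "pk-1", "sequenceNumber": "49568123456789"}]}
{"action": "checkpoint", "checkpoint": "49568123456789"}
{"action": "checkpoint", "checkpoint": "49568123456789", "error": null}
{"action": "status", "responseFor": "processRecords"}
```

Lines alternate between the daemon and the processor: the daemon sends initialize, processRecords and shutdown actions; the processor replies with a status line after handling each one, and may itself send a checkpoint action, which the daemon acknowledges before the processor returns its status.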

Despite separating the management of workers and shards from record processing, there’s still a bit of boilerplate code needed to implement the above protocol. So Amazon have also provided sample code in .NET that implements it and exposes an interface defining where the processing code needs to be implemented:

[Screenshot: the .NET record processor interface from Amazon’s sample code]
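A sketch of what a processor built on that sample looks like; the type and member names follow Amazon’s amazon-kinesis-client-net sample and may differ between versions:

```csharp
using System;
using Amazon.Kinesis.ClientLibrary;

// A minimal record processor: the MultiLangDaemon drives these callbacks
// over standard input/output.
class SampleRecordProcessor : IRecordProcessor
{
    public void Initialize(InitializationInput input)
    {
        Console.Error.WriteLine($"Initialized for shard {input.ShardId}");
    }

    public void ProcessRecords(ProcessRecordsInput input)
    {
        foreach (var record in input.Records)
        {
            // Application-specific processing goes here.
        }
        // Record progress so a restarted worker resumes from here.
        input.Checkpointer.Checkpoint();
    }

    public void Shutdown(ShutdownInput input)
    {
        // Checkpoint on a clean shutdown so the lease can be handed over.
        input.Checkpointer.Checkpoint();
    }
}

class Program
{
    static void Main()
    {
        // Wires the processor up to the MultiLangDaemon protocol.
        KclProcess.Create(new SampleRecordProcessor()).Run();
    }
}
```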

DynamoDB Streams

DynamoDB Streams is a feature of DynamoDB (not a separate service) that makes use of the Kinesis model to capture a time-ordered sequence of item-level modifications in any DynamoDB table. Applications can access this log and view the data items as they appeared before and after they were modified, in near real time.

Configuring DynamoDB Streams is much simpler than with Kinesis – since the number of shards (and the merging and splitting of them) is handled behind the scenes – you only have the following options:

  • StreamEnabled—specifies whether a stream is enabled.
  • StreamViewType—specifies the information that will be written to the stream:
    • KEYS_ONLY—only the key attributes of the modified item.
    • NEW_IMAGE—the entire item, as it appears after it was modified.
    • OLD_IMAGE—the entire item, as it appeared before it was modified.
    • NEW_AND_OLD_IMAGES—both the new and the old images of the item.
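Using the AWS SDK for .NET, enabling a stream on an existing table is a single UpdateTable call. A sketch, with a hypothetical table name:

```csharp
using System.Threading.Tasks;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

class EnableStreamSketch
{
    static async Task EnableStream(IAmazonDynamoDB client)
    {
        // Turn on streaming for an existing table, capturing both the
        // old and new item images for every modification.
        await client.UpdateTableAsync(new UpdateTableRequest
        {
            TableName = "my-table", // hypothetical
            StreamSpecification = new StreamSpecification
            {
                StreamEnabled = true,
                StreamViewType = StreamViewType.NEW_AND_OLD_IMAGES
            }
        });
    }
}
```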

Although the Kinesis and DynamoDB Streams low-level APIs are very similar, they are not 100% identical. However, the DynamoDB Streams Kinesis Adapter implements the Kinesis Streams interface, so the KCL can be used for consuming and processing records from DynamoDB Streams, as the diagram below illustrates:

[Diagram: the DynamoDB Streams Kinesis Adapter sitting between the KCL and DynamoDB Streams]

Managing Java dependencies in Visual Studio

Maven

“Apache Maven is a software project management and comprehension tool. Based on the concept of a project object model (POM), Maven can manage a project’s build, reporting and documentation from a central piece of information. Maven comes with a mechanism that your project’s clients can use to download any JARs required for building your project from a central JAR repository.”

[Screenshot: pom.xml declaring the dynamodb-streams-kinesis-adapter dependency]
The example above declares a dependency on dynamodb-streams-kinesis-adapter; for vanilla Kinesis we can use amazon-kinesis-client. Note that the DynamoDB Streams adapter itself uses v1.7.5 of this.
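For illustration, such a pom.xml dependency declaration looks like the following (the version number is illustrative):

```xml
<dependencies>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>1.2.0</version>
  </dependency>
</dependencies>
```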

Visual Studio

So after installing Maven we can use the following command as a post-build step in Visual Studio:
  • mvn dependency:copy-dependencies -DoutputDirectory=bin/Debug/jars
Then to run the Java application (which executes our .NET code as a sub-process) you can use the following command:
  • java -cp jars/*:. com.amazonaws.services.dynamodbv2.streamsadapter.StreamsMultiLangDaemon kcl.properties

The -cp option sets the Class Path to include all of the jar files in that directory. Note that on Windows the separator is ‘;’ instead of ‘:’.
The next parameter indicates which class should act as the entry point or main class.
The last parameter is a path to a config file for the KCL.
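To make this concrete, a minimal kcl.properties for this setup might look like the following sketch (the keys come from the KCL’s multi-language sample properties; all values here are hypothetical):

```properties
# The command the MultiLangDaemon spawns as a sub-process (our .NET app)
executableName = dotnet MyRecordProcessor.dll

# The stream to consume and the application name (also used for the lease table)
streamName = my-stream
applicationName = MyKclApp

# How AWS credentials are resolved
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Where to start reading when no checkpoint exists yet
initialPositionInStream = TRIM_HORIZON
```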

The kcl.properties file contains many configuration settings required by the KCL, including the executable that it must run as a sub-process. Additionally, Java looks by default for a commons-logging.properties file, and uses default settings if none exists. You can specify the name and location of this file with the following java command line option:

  • -Djava.util.logging.config.file=kcl-logging.properties

The kcl-logging.properties file contains a number of options including which library to use and all the other expected formatting options.

And that’s it for now.

There’s a lot more to consider when writing a KCL application, but hopefully this post gives you an idea of how you could make use of Kinesis from a .NET Core context.

 