---
topic: sample
languages:
  - azurecli
  - json
  - sql
products:
  - azure
  - azure-container-instances
  - azure-event-hubs
  - azure-stream-analytics
statusNotificationTargets:
  - algattik@microsoft.com
---

Streaming at Scale with Azure Event Hubs and Stream Analytics

This sample uses Stream Analytics to process streaming data from Event Hubs, using another Event Hub as a sink to store the resulting JSON data. The goal is to measure the pure streaming performance of Stream Analytics: no aggregation is done, and data is passed from input to output as fast as possible. Data is only augmented with additional fields.

To support very high throughput, the template deploys two separate Event Hubs namespaces. Event Hubs can support streaming workloads from a few MB/s up to GB/s scale, so choose the appropriate SKU and capacity for your data volume when creating a namespace.
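
If you ever need to create such a namespace manually instead of through the template, a minimal Azure CLI sketch (names and capacity are placeholders to adapt) could be:

az eventhubs namespace create \
    --name <namespace-name> \
    --resource-group <resource-group> \
    --sku Standard \
    --capacity 2 \
    --enable-auto-inflate true \
    --maximum-throughput-units 10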

The provided scripts will create an end-to-end solution complete with a load test client.

Running the Scripts

Please note that the scripts have been tested on Ubuntu 18.04 LTS, so make sure to use that environment to run them, for example via Docker, WSL, or a VM.

The following tools/languages are also needed:

  • Azure CLI
    • Install: sudo apt install azure-cli
  • jq
    • Install: sudo apt install jq
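
A quick way to check that both tools are installed before running the scripts:

az --version
jq --version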

Setup Solution

Make sure you are logged into your Azure account:

az login

and list your subscriptions to check that the one you want to use is selected:

az account list
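
Since jq is among the prerequisites, you can also extract just the subscription names from the JSON output, for example:

az account list | jq -r '.[].name'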

If you want to select a specific subscription, use the following command:

az account set --subscription <subscription_name>
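
and confirm which subscription is now active:

az account show --query name -o tsv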

Once you have selected the subscription you want to use, execute the following command:

./create-solution.sh -d <solution_name>

The solution_name value will be used to create a resource group that will contain all the resources created by the script. It is also used as a prefix for all created resources, so, to help avoid name collisions that would break the script, you may want to generate a name with a unique prefix. Please also use lowercase letters and numbers only, since solution_name is also used to create a storage account, which has several constraints on character usage:

Storage Naming Conventions and Limits

To get an overview of all the supported arguments, just run:

./create-solution.sh

Note: to make sure that name collisions are unlikely, you should use a random string as your solution name. The following script will generate a random 7-lowercase-letter name for you:

./generate-solution-name.sh
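
Assuming generate-solution-name.sh prints the generated name to standard output, the two steps can be combined:

./create-solution.sh -d $(./generate-solution-name.sh)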

Created resources

The script will create the following resources:

  • Azure Container Instances to host Spark Load Test Clients: by default one client will be created, generating a load of 1000 events/second
  • Event Hubs Namespace, Hub and Consumer Group: to ingest data incoming from test clients and to store data generated by Stream Analytics
  • Stream Analytics: to process analytics on streaming data

Streamed Data

Streamed data simulates an IoT device sending the following JSON data:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 57.739726013343247,
        "moreData1": 52.230732688620829,
        "moreData2": 57.497518587807189,
        "moreData3": 81.32211656749469,
        "moreData4": 54.412361539409427,
        "moreData5": 75.36416309399911,
        "moreData6": 71.53407865773488,
        "moreData7": 45.34076957651598,
        "moreData8": 51.3068118685458,
        "moreData9": 44.44672606436184,
        [...]
    },
    "value": 49.02278128887753,
    "deviceId": "contoso-device-id-000154",
    "deviceSequenceNumber": 0,
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}
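
If you capture a batch of such events to a file, say a hypothetical events.json with one JSON event per line, jq makes it easy to project the interesting fields:

jq '{deviceId, value, createdAt}' events.json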

Duplicate event handling

The solution does not perform event deduplication. In order to illustrate the effect of this, the event simulator is configured to randomly duplicate a small fraction of the messages (0.1% on average). Those duplicate events will be present in the destination Event Hub.
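
To verify this on captured output (again assuming a hypothetical events.json with one event per line), here is a jq sketch that counts how many eventId values occur more than once:

jq -s 'group_by(.eventId) | map(select(length > 1)) | length' events.json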

Anomaly detection

By default, the solution will set up a simple data movement job. Optionally, you can configure the job to perform anomaly detection:

./create-solution.sh -a anomalydetection

The sample uses Stream Analytics to perform anomaly detection on the streaming data.

The deployed Stream Analytics solution performs spike and dip detection using the built-in AnomalyDetection_SpikeAndDip function. You can also change the anomaly detection function to AnomalyDetection_ChangePoint by editing the query in stream-analytics-job-anomalydetection-arm-template.json.
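
The actual deployed query lives in that ARM template. As a rough sketch only (outputEventHub is a hypothetical output alias, and the field names are taken from the simulated payload above), a spike-and-dip detection step can look like this:

WITH AnomalyDetectionStep AS
(
    SELECT
        EventEnqueuedUtcTime AS time,
        deviceId,
        CAST(value AS float) AS eventValue,
        AnomalyDetection_SpikeAndDip(CAST(value AS float), 95, 120, 'spikesanddips')
            OVER (PARTITION BY deviceId LIMIT DURATION(second, 120)) AS scores
    FROM inputEventHub
)
SELECT
    time,
    deviceId,
    eventValue,
    CAST(GetRecordPropertyValue(scores, 'Score') AS float) AS SpikeAndDipScore,
    CAST(GetRecordPropertyValue(scores, 'IsAnomaly') AS bigint) AS IsSpikeAndDipAnomaly
INTO outputEventHub
FROM AnomalyDetectionStep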

Data is made available in the created Event Hub output.

Solution customization

If you want to change some of the solution's settings, such as the number of load test clients, Event Hubs throughput units (TUs), and so on, you can do so right in the create-solution.sh script by changing any of these values:

export EVENTHUB_PARTITIONS=2
export EVENTHUB_CAPACITY=2
export PROC_STREAMING_UNITS=3
export SIMULATOR_INSTANCES=1

The above settings have been chosen to sustain a 1,000 msg/s stream. The script also contains settings for 5,000 msg/s and 10,000 msg/s.

Monitor performance

Use the Metrics pane in Stream Analytics: check "Input/Output Events" for throughput, and the "Watermark Delay" metric to see whether the job is keeping up with the input rate. You can also use the Event Hubs "Metrics" pane to check for any "Throttled Requests" and adjust the Throughput Units accordingly. "Watermark Delay" is one of the key metrics for understanding whether Stream Analytics is keeping up with the incoming data: if the delay is constantly increasing, check whether the destination can keep up with the write rate, or whether you need to increase the number of Streaming Units (SUs). See https://azure.microsoft.com/en-us/blog/new-metric-in-azure-stream-analytics-tracks-latency-of-your-streaming-pipeline/ for details.
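
You can also pull the same Event Hubs metrics from the command line. A minimal sketch, assuming you substitute the full resource ID of one of the deployed namespaces:

az monitor metrics list \
    --resource <eventhub-namespace-resource-id> \
    --metric IncomingMessages IncomingBytes ThrottledRequests \
    --interval PT1M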

The deployment script will also report performance, by default every minute for 30 minutes:

***** [M] Starting METRICS reporting
Event Hub capacity: 2 throughput units (this determines MAX VALUE below).
Reporting aggregate metrics per minute, offset by 2 minutes, for 30 minutes.
                        Event Hub #   IncomingMessages  IncomingBytes  OutgoingMessages   OutgoingBytes  ThrottledRequests
                        -----------   ----------------  -------------  ----------------   -------------  -----------------
              MAX VALUE                         120000      120000000            491520       240000000                  -
                        -----------   ----------------  -------------  ----------------   -------------  -----------------
    2019-10-03T07:57:00           1                  0              0                 0               0                  0
    2019-10-03T07:57:00           2                  0              0                 0               0                  0
    2019-10-03T07:58:00           1              24050       22809797             24050        22809797                  0
    2019-10-03T07:58:00           2                  0              0                 0               0                  0
    2019-10-03T07:59:01           1              60037       56940526             60037        56940526                  0
    2019-10-03T07:59:01           2                341       62393762                 0               0                  0
    2019-10-03T08:00:00           1              60090       56989878             60090        56989878                  0
    2019-10-03T08:00:00           2                375       65683281                 0               0                  0
    2019-10-03T08:01:00           1              60036       56940643             60036        56940643                  0
    2019-10-03T08:01:00           2                376       65708824                 0               0                  0

In column "Event Hub #", 1 refers to the Event Hub used as input to Stream Analytics, and 2 to the Event Hub used as output. After a few minutes of ramp-up, the metrics for Event Hub 1 will show around 60k events/min (depending on selected event rate, here 1k events/s). As Stream Analytics batches up messages when outputting to Event Hubs, the rate in events/minute on Event Hub 2 will be much lower, but you can see from the Incoming Bytes metric that the data rate on both event hubs is similar.

Stream Analytics

Note that the solution configuration has been verified with compatibility level 1.2. The deployed Stream Analytics job doesn't do any analytics or projection; it just injects an additional field using a simple JavaScript UDF:

SELECT
    *,
    UDF.GetCurrentDateTime('') AS ASAProcessedUtcTime
FROM
    inputEventHub PARTITION BY PartitionId
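
The UDF itself is defined in the deployment template. A minimal sketch of what a GetCurrentDateTime JavaScript UDF can look like (the actual implementation may differ):

// Stream Analytics JavaScript UDFs expose a single main() entry point;
// the (unused) argument is required, since a UDF takes at least one input.
function main(s) {
    return new Date().toISOString();
}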

Query Data

Data is available in the created Event Hub output. You can use the tool available in ./tools/eh-asa-perfmon to measure performance, in terms of latency between data being received and data being sent to the output.

Clean up

To remove all the created resources, you can simply delete the related resource group:

az group delete -n <resource-group-name>
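
To skip the confirmation prompt and return immediately while deletion continues in the background:

az group delete -n <resource-group-name> --yes --no-wait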