
AWS CloudWatch Metric Streams with Amazon Data Firehose

Using Amazon CloudWatch Metric Streams and Amazon Data Firehose, you can get CloudWatch metrics into Scout Collector with a latency of only two to three minutes, significantly faster than the polling approach.

Step 1: Creating an S3 Bucket

First, we'll create an S3 bucket to store the metrics.

1. Go to S3 Dashboard

S3 Search in Console

2. Click on the Create bucket button

S3 Dashboard ScreenShot

3. Enter the bucket name as cloudwatch-metrics-stream-bucket

Leave all the other settings at their defaults.

S3 config page screenshot

4. Scroll down and click on Create bucket
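
If you prefer to script this step, the bucket can also be created with boto3. This is a minimal sketch assuming the bucket name used in this guide and a region of us-east-1; adjust both for your account.

import boto3

# Assumed values from this guide; change them for your account/region.
BUCKET_NAME = "cloudwatch-metrics-stream-bucket"
REGION = "us-east-1"

s3 = boto3.client("s3", region_name=REGION)

# Outside us-east-1, S3 requires an explicit LocationConstraint.
if REGION == "us-east-1":
    s3.create_bucket(Bucket=BUCKET_NAME)
else:
    s3.create_bucket(
        Bucket=BUCKET_NAME,
        CreateBucketConfiguration={"LocationConstraint": REGION},
    )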

Step 2: Creating a Kinesis Firehose stream

Now, we'll create a Kinesis Firehose stream that CloudWatch can use to stream metrics.

1. Go to the Kinesis Firehose Dashboard

Amazon Kinesis Firehose Search in Console

2. Click on the Create Firehose Stream button

AWS Kinesis Firehose Dashboard

3. Set up the Sources

  • Select Direct PUT as the input source and S3 as the output.
  • Select the S3 bucket we created in Step 1.

The format is s3://<your-bucket-name>

  • Enable New Line Delimiter and leave everything else as default settings.
  • Scroll down and click on Create Firehose Stream.

Firehose source config
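
The same delivery stream can also be created with boto3. The sketch below assumes a stream name of cloudwatch-metrics-stream and a pre-existing IAM role that allows Firehose to write to the bucket; replace both with your own values.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="cloudwatch-metrics-stream",  # assumed stream name
    DeliveryStreamType="DirectPut",                  # Direct PUT source
    ExtendedS3DestinationConfiguration={
        # Placeholder role; it must allow Firehose to write to the bucket.
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-to-s3-role",
        "BucketARN": "arn:aws:s3:::cloudwatch-metrics-stream-bucket",
        # Equivalent of enabling "New Line Delimiter" in the console.
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [{"Type": "AppendDelimiterToRecord"}],
        },
    },
)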

Step 3: Creating a Metrics Stream pipeline

Now, we'll configure CloudWatch to use the Kinesis Firehose stream to deliver metrics to S3.

1. Navigate to the CloudWatch dashboard and select Streams under Metrics

cloudwatch dashboard

2. Click on Create Metrics Stream

cloudwatch metrics stream dashboard

3. Configuring the Stream

  • Select Custom Setup with Firehose.
  • Change the output format to OpenTelemetry 1.0.
  • Select the required metrics.
  • Give the pipeline a name and click on Create Metrics Stream.
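
For reference, the equivalent metric stream can also be created with boto3. The names, ARNs, and namespace filters below are placeholders, not values from this guide.

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_stream(
    Name="cloudwatch-metrics-stream-pipeline",  # assumed pipeline name
    FirehoseArn="arn:aws:firehose:us-east-1:123456789012:deliverystream/cloudwatch-metrics-stream",
    RoleArn="arn:aws:iam::123456789012:role/metric-stream-to-firehose-role",
    OutputFormat="opentelemetry1.0",            # matches the console setting above
    # Stream only the namespaces you need; omit IncludeFilters to stream everything.
    IncludeFilters=[{"Namespace": "AWS/EC2"}, {"Namespace": "AWS/Lambda"}],
)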

Good job! The CloudWatch metrics are now streaming to the S3 bucket.

Step 4: Creating a Lambda function

Now, let's create a Lambda function that reads from the S3 bucket and sends the metrics to the OTel collector.

1. Create a layer with all the necessary packages

# create the layer directory (Lambda layers expect a top-level "python/" folder)
mkdir python
# move into that directory
cd python

# install the required packages
pip install --target . requests opentelemetry-proto protobuf

# move back out and zip the folder under the name dependencies.zip
cd ..
zip -r dependencies.zip python

2. Navigate to AWS Lambda dashboard and click on Layers

lambda dashboard

  • Click on the Create layer button

3. Fill in the necessary details and upload the zip file

create lambda page
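
Alternatively, the layer can be published from the command line with boto3. The layer name and runtime below are assumptions; match the runtime to the one your function will use.

import boto3

lambda_client = boto3.client("lambda")

# Publish the zip built in the previous step as a layer version.
with open("dependencies.zip", "rb") as f:
    layer = lambda_client.publish_layer_version(
        LayerName="otel-forwarder-dependencies",  # assumed layer name
        Description="requests + opentelemetry-proto + protobuf",
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.12"],        # match your function's runtime
    )

print(layer["LayerVersionArn"])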

4. Navigate to the Functions page and click on the Create function button

lambda functions page

  • Select Author from scratch.

  • Give the function a name.

  • Choose python x.x as the runtime.

  • Select x86_64 as the Architecture.

  • Once the function is created, follow the steps below to configure it:

  • Click on the Configuration tab and then click on permissions.

  • Click on the Role name and grant S3 full access to the bucket created above.

  • Click on Code and scroll to add a new layer.

  • Click on Add Layer.

  • Select Custom Layer and choose the layer that we created.

  • Navigate back to the code and click on Add trigger.

  • Select S3 as the source and select the bucket from dropdown.

  • Click on Add.

  • Navigate to Configuration and then to Environment variables.

  • Click on Edit and add these environment variables with the correct values: CLIENT_ID, CLIENT_SECRET, TOKEN_URL, ENDPOINT_URL. They are read by the Lambda code below (see the sketch after this list).
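
The configuration steps above can also be applied with boto3. The sketch below attaches the layer and sets the environment variables in one call; the function name, layer ARN, and URLs are placeholders.

import boto3

lambda_client = boto3.client("lambda")

# Placeholder function name, layer ARN, and variable values; use your own.
lambda_client.update_function_configuration(
    FunctionName="cloudwatch-metrics-forwarder",
    Layers=["arn:aws:lambda:us-east-1:123456789012:layer:otel-forwarder-dependencies:1"],
    Environment={
        "Variables": {
            "CLIENT_ID": "<your-oauth-client-id>",
            "CLIENT_SECRET": "<your-oauth-client-secret>",
            "TOKEN_URL": "https://auth.example.com/oauth/token",
            "ENDPOINT_URL": "https://collector.example.com/v1/metrics",
        }
    },
)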

Now for the actual forwarding logic: copy the code below into the code source of your Lambda function.

import os

import boto3
import requests
from google.protobuf.internal.decoder import _DecodeVarint32
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
    ExportMetricsServiceRequest,
)

s3 = boto3.client("s3")

# OAuth and collector settings, read from the environment variables configured above.
client_id = os.environ.get("CLIENT_ID")
client_secret = os.environ.get("CLIENT_SECRET")
token_url = os.environ.get("TOKEN_URL")
endpoint_url = os.environ.get("ENDPOINT_URL")


def lambda_handler(event, context):
    for record in event["Records"]:
        bucket_name = record["s3"]["bucket"]["name"]
        file_key = record["s3"]["object"]["key"]
        print(f"Processing file: {file_key}")

        file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
        buffer = file_obj["Body"].read()
        pos = 0
        buffer_len = len(buffer)

        request_count = 0
        success_count = 0

        # Ensure the content type is set for the OTLP/HTTP protobuf endpoint.
        headers = {"Content-Type": "application/x-protobuf"}

        # The file contains length-delimited protobuf messages; process all of them.
        while pos < buffer_len:
            # Decode the varint (message length).
            msg_len, new_pos = _DecodeVarint32(buffer, pos)
            pos = new_pos

            # Extract and parse the message.
            msg_buf = buffer[pos : pos + msg_len]
            request = ExportMetricsServiceRequest()
            request.ParseFromString(msg_buf)

            # Tag every resource with a service name so the collector can identify it.
            for resource_metric in request.resource_metrics:
                if resource_metric.HasField("resource"):
                    attr = resource_metric.resource.attributes.add()
                    attr.key = "service.name"
                    attr.value.string_value = "aws-cloudwatch-stream18may"

                    attr = resource_metric.resource.attributes.add()
                    attr.key = "service.uname"
                    attr.value.string_value = "aws-cloudwatch-stream18may"

            # Fetch an access token for the collector.
            token_response = requests.post(
                token_url,
                data={
                    "grant_type": "client_credentials",
                    "audience": "b14collector",  # Optional, based on your provider
                },
                auth=(client_id, client_secret),
                verify=False,  # Optional: skips TLS cert verification (NOT RECOMMENDED for prod)
            )
            token_response.raise_for_status()
            access_token = token_response.json()["access_token"]
            headers["Authorization"] = f"Bearer {access_token}"

            # Send the re-serialized message to the collector.
            response = requests.post(
                endpoint_url,
                data=request.SerializeToString(),
                headers=headers,
                verify=False,
            )
            request_count += 1
            if response.status_code == 200:
                success_count += 1
                print("Metrics successfully forwarded.")
            else:
                print(
                    f"Failed to send metrics. Status: {response.status_code}, "
                    f"Response: {response.text}"
                )

            pos += msg_len

        print(f"Forwarded {success_count}/{request_count} requests from {file_key}")
  • Click on Deploy.
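
If you prefer to deploy from the command line instead of the console editor, a boto3 call like the following uploads the handler. It assumes the code above is saved as lambda_function.py and that the function is named cloudwatch-metrics-forwarder.

import io
import zipfile

import boto3

lambda_client = boto3.client("lambda")

# Zip the handler in memory; assumes the code above is saved as lambda_function.py.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.write("lambda_function.py")

lambda_client.update_function_code(
    FunctionName="cloudwatch-metrics-forwarder",  # assumed function name
    ZipFile=buf.getvalue(),
)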

That's it, you're done!

Head back to the Scout dashboards to view all your AWS Services metrics.