AWS CloudWatch Metric Streams with Amazon Data Firehose
Using Amazon CloudWatch Metric Streams and Amazon Data Firehose, you can get CloudWatch metrics into Scout Collector with only a two to three minute latency. This is significantly faster than the polling approach.
Step 1: Creating an S3 Bucket
First, we'll create an S3 bucket to store the metrics.
1. Go to the S3 dashboard.
2. Click on the Create bucket button.
3. Enter the bucket name as cloudwatch-metrics-stream-bucket and leave all the other settings at their default options.
4. Scroll down and click on Create bucket.
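If you'd rather script this step, here is a minimal boto3 sketch of the same bucket creation; the region is an assumption, so adjust it to yours.
import boto3

# a minimal sketch, assuming credentials that can create buckets and us-east-1 as the region
s3 = boto3.client("s3", region_name="us-east-1")

# bucket name from the step above
s3.create_bucket(Bucket="cloudwatch-metrics-stream-bucket")

# in regions other than us-east-1 the location constraint is required, e.g.:
# s3.create_bucket(
#     Bucket="cloudwatch-metrics-stream-bucket",
#     CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
# )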
Step 2: Creating a Kinesis Firehose stream
Now, we'll create a Kinesis Firehose stream which CloudWatch can use to stream metrics.
1. Go to the Kinesis Firehose dashboard.
2. Click on the Create Firehose Stream button.
3. Set up the source and destination:
   - Select Direct PUT as the input source and S3 as the output.
   - Select the S3 bucket we created. The format is s3://<your-bucket-name>.
   - Enable New Line Delimiter and leave everything else as default settings.
   - Scroll down and click on Create Firehose Stream.
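The console steps above can also be scripted. The sketch below is a rough boto3 equivalent; the stream name and the IAM role ARN are assumptions, and the role must allow Firehose to write to the bucket.
import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="cloudwatch-metrics-stream",  # assumed name
    DeliveryStreamType="DirectPut",                  # the "Direct PUT" source
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-to-s3",   # assumed role
        "BucketARN": "arn:aws:s3:::cloudwatch-metrics-stream-bucket",
        # append a newline after each record, matching the New Line Delimiter toggle
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [{"Type": "AppendDelimiterToRecord"}],
        },
    },
)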
Step 3: Creating a Metrics Stream pipeline
Now, we'll configure CloudWatch to use the Kinesis Firehose stream to deliver metrics to S3.
1. Navigate to the CloudWatch dashboard and select Streams under Metrics.
2. Click on Create Metrics Stream
3. Configure the stream:
   - Select Custom Setup with Firehose.
   - Change the output format to OpenTelemetry 1.0.
   - Select the required metrics.
   - Give a name to the pipeline.
   - Click on Create Metrics Stream.
Good job! Now the CloudWatch metrics are streaming to the S3 bucket.
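If you prefer to set this step up from a script, the sketch below creates an equivalent metric stream via the CloudWatch API; the stream name, ARNs, and namespace filters are assumptions.
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_stream(
    Name="scout-metric-stream",  # assumed name
    FirehoseArn="arn:aws:firehose:us-east-1:123456789012:deliverystream/cloudwatch-metrics-stream",
    RoleArn="arn:aws:iam::123456789012:role/metric-stream-to-firehose",  # must allow firehose:PutRecord*
    OutputFormat="opentelemetry1.0",
    # stream only selected namespaces; omit IncludeFilters to stream everything
    IncludeFilters=[{"Namespace": "AWS/EC2"}, {"Namespace": "AWS/Lambda"}],
)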
Step 4: Creating a Lambda function
Now, let's create a Lambda function that reads the metric files from S3 and sends them to the OTel collector.
1. Create a layer with all the necessary packages:
# create the python directory that Lambda layers expect at the top level of the zip
mkdir python
# install the required packages into it
pip install --target ./python requests opentelemetry-proto protobuf
# zip the python directory under the name dependencies.zip
zip -r dependencies.zip python
2. Navigate to the AWS Lambda dashboard and click on Layers.
   - Click on the Create layer button.
3. Fill in the necessary details and upload the zip file.
4. Navigate to the Functions page and click on the Create function button.
   - Select Author from scratch.
   - Give the function a name.
   - Choose python x.x as the runtime.
   - Select x86_64 as the architecture.
5. Once the function is created, follow the steps below to configure it:
   - Click on the Configuration tab and then click on Permissions.
   - Click on the role name and grant S3 full access for the bucket created above.
   - Click on Code and scroll down to add a new layer.
   - Click on Add Layer.
   - Select Custom Layer and choose the layer that we created.
   - Navigate back to the code and click on Add trigger.
   - Select S3 as the source and select the bucket from the dropdown.
   - Click on Add.
   - Navigate to Configuration and then to Environment variables.
   - Click on Edit and add the environment variables the function reads, with the correct values (CLIENT_ID, CLIENT_SECRET, TOKEN_URL, ENDPOINT_URL). A scripted version of the trigger and environment-variable setup is sketched after this list.
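As referenced above, the trigger and environment-variable configuration can also be done with boto3 instead of the console. This is a sketch only; the function name and the placeholder values are assumptions.
import boto3

lambda_client = boto3.client("lambda")
s3 = boto3.client("s3")

function_name = "cloudwatch-metrics-forwarder"   # assumed function name
bucket_name = "cloudwatch-metrics-stream-bucket"

# set the environment variables the handler reads
lambda_client.update_function_configuration(
    FunctionName=function_name,
    Environment={
        "Variables": {
            "CLIENT_ID": "<client id>",
            "CLIENT_SECRET": "<client secret>",
            "TOKEN_URL": "https://auth.example.com/oauth/token",          # placeholder
            "ENDPOINT_URL": "https://collector.example.com/v1/metrics",   # placeholder
        }
    },
)

# allow S3 to invoke the function, then register the bucket notification
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId="AllowS3Invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{bucket_name}",
)
function_arn = lambda_client.get_function(FunctionName=function_name)["Configuration"]["FunctionArn"]
s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {"LambdaFunctionArn": function_arn, "Events": ["s3:ObjectCreated:*"]}
        ]
    },
)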
Now the actual part: copy the code below into the code source of your Lambda function.
import boto3
import requests
import os
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceRequest
from google.protobuf.internal.decoder import _DecodeVarint32

s3 = boto3.client('s3')

client_id = os.environ.get('CLIENT_ID')
client_secret = os.environ.get('CLIENT_SECRET')
token_url = os.environ.get('TOKEN_URL')
endpoint_url = os.environ.get('ENDPOINT_URL')


def lambda_handler(event, context):
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        file_key = record['s3']['object']['key']
        print(f"Processing file: {file_key}")

        file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
        buffer = file_obj['Body'].read()

        pos = 0
        buffer_len = len(buffer)
        request_count = 0
        success_count = 0

        headers = {}
        # Ensure the content-type is set
        headers["Content-Type"] = "application/x-protobuf"

        # Process all length-delimited messages in the buffer
        while pos < buffer_len:
            # Decode the varint (message length)
            msg_len, new_pos = _DecodeVarint32(buffer, pos)
            pos = new_pos

            # Extract the message bytes
            msg_buf = buffer[pos : pos + msg_len]

            # Parse the message so we can add resource attributes
            request = ExportMetricsServiceRequest()
            request.ParseFromString(msg_buf)
            for resource_metric in request.resource_metrics:
                if resource_metric.resource:
                    resource_metric.resource.attributes.add(
                        key="service.name",
                        value={"string_value": "aws-cloudwatch-stream18may"}
                    )
                    resource_metric.resource.attributes.add(
                        key="service.uname",
                        value={"string_value": "aws-cloudwatch-stream18may"}
                    )

            # Fetch an OAuth access token for the collector
            token_response = requests.post(
                token_url,
                data={
                    "grant_type": "client_credentials",
                    "audience": "b14collector",  # Optional, based on your provider
                },
                auth=(client_id, client_secret),
                verify=False,  # Optional: set to False to skip TLS cert verification (NOT RECOMMENDED for prod)
            )
            token_response.raise_for_status()
            access_token = token_response.json()["access_token"]
            headers["Authorization"] = f"Bearer {access_token}"

            # Send the re-serialized message to the collector
            response = requests.post(
                endpoint_url,
                data=request.SerializeToString(),
                headers=headers,
                verify=False,
            )
            request_count += 1
            if response.status_code == 200:
                success_count += 1
                print("Metrics successfully forwarded.")
            else:
                print(
                    f"Failed to send metrics. Status: {response.status_code}, "
                    f"Response: {response.text}"
                )

            pos += msg_len

        print(f"Forwarded {success_count} of {request_count} messages from {file_key}")
- Click on the Deploy button.
That's it, you're done!
Head back to the Scout dashboards to view all your AWS service metrics.
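If metrics don't appear, a quick script like the sketch below can help confirm each stage: that Firehose is delivering objects to the bucket and that the Lambda is logging successful forwards. The bucket name and log group name are assumptions.
import boto3

s3 = boto3.client("s3")
logs = boto3.client("logs")

# recent objects delivered by Firehose
resp = s3.list_objects_v2(Bucket="cloudwatch-metrics-stream-bucket", MaxKeys=5)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["LastModified"])

# recent log lines from the forwarder Lambda (log group name assumed)
events = logs.filter_log_events(
    logGroupName="/aws/lambda/cloudwatch-metrics-forwarder",
    filterPattern='"Metrics successfully forwarded"',
)
for e in events.get("events", []):
    print(e["message"])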