Streaming Logs From Azure Event Grid to Log Analytics Workspace

Streaming Logs From Azure Event Grid to Log Analytics Workspace
Photo by Dorelys Smits / Unsplash

I use Azure Sentinel and the way that Sentinel is able to analyze logs is through a Log Analytics Workspace. Some third-party platforms allow log streaming to Event Grid. So I needed a way to get my logs from Event Grid into the Log Analytics workspace.

You might think this would be a simple task…moving data from one Azure resource to another. But it turns out there is no native way to do this. Additionally, I was not able to find much documentation or any guides for this process either, so here I am writing this up for anyone who cares.

I based this flow largely on the GitHub Scan Audit connector that is available via the Sentinel Content Hub.

Overview

In a nutshell, the process is to create an Event Subscription on the Event Grid that will trigger a Function App that will deliver logs to the Log Analytics Workspace API.

Third-Party Platform --> Event Grid --> Function App --> Log Analytics Workspace

Assumptions

This guide does not cover actually getting logs from a third-party platform to Event Grid. In most cases, the third-party vendor will have documentation for this process.

Your permission model or other proclivities of your Azure environment may make some of these steps look a little different.

Process

Log Analytics Workspace and Sentinel

If you do not already have these resources deployed, do so. The deployment of those resources is beyond the scope of this document.

Under Settings > Tables, create a new table (MMA-Based). This will require a sample log output. Hopefully you can get this via the third-part platform's documentation but you may also have to mock up an API call to get the log format. Save the format as a file and upload. Set the appropriate delimiter; in most cases "new line" will suffice. For the collection path we can use a dummy path, so select Linux and set the path to /tmp. Set an appropriate log name and create.

To get the log sample, as an alternative method, you can create the Function as described below and utilize the default code that is generated to print out logs. You can then view this output via the Portal in the Function's Monitor tab.

Function App

First you need to create a function app, this is the top lever resource that will hold the actual function that will be shipping the logs. This can be done following the process below in the Function section via the Microsoft Guide or done via the Portal.

Then, navigate to your Sentinel Log Analytics Workspace and then to the Agents menu under Settings. Click the "Log Analytics agent instructions" dropdown and note the Workspace ID and Primary Key.

Within the Function App that you created, under Settings > Configuration, create two new Application Settings WorkspaceID and WorkspaceKey and input the respective values noted previously.

Function Deployment

To deploy the actual function, utilize this guide to set up a development environment in Visual Studio Code. While this may seem onerous, this was the only way that I found that I was able to meet all the necessary requirements for deployment.

As a note for the above, be sure to pay attention to what panel you are in on the side panel in VSCode. Edit in the Editor panel, deploy in the Azure panel.

In the editor, you will need to do two things.

  1. Create a requirements.txt file in the root of the folder
  2. Add the parsing code in __init__.py. Replace the sentinel_log_type variable with your table name from the LAW minus the _CL.

Both sets of code are below.

Once the code is updated, deploy the Function via the steps in the linked guide.

Configure Event Grid Subscription

With the Function deployed, we now need to get events to the Function. Navigate to the Event Grid that is receiving logs and then to Entities > Event Subscriptions. Create a new event subscription and select Azure Function as the Endpoint Type. Click the "Select an endpoint" link and drill down to your newly created function. Create.

At this stage, all the necessary plumbing should be in place to ship your logs as desired.

Testing

Navigate to your Function App > Functions > Your Function > Monitor to see execution status and any emitted errors. Perform normal troubleshooting on your code and redeploy as needed based on the process above. Be sure to have a way to trigger events in your third-party platform if they are not frequent. Once you have successful execution, then confirm your logs are available in your Log Analytics Workspace.

Code

# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues
azure-functions
requests~=2.31.0
requirements.txt
import datetime
import requests
import logging
import os
import json
import hashlib
import hmac
import base64
import azure.functions as func
import re

sentinel_customer_id = os.environ.get('WorkspaceID')
sentinel_shared_key = os.environ.get('WorkspaceKey')
# Input the name of the table from your LAW here, minus the _CL
sentinel_log_type =  'nameFromAnalyticsWorkspace'
logging.info("Sentinel Logtype:{}".format(sentinel_log_type))

# this function app is fired based on the Event Grid
# it is used to capture all the events from Third Party Platform
def main(event: func.EventGridEvent):
    logging.info('Info: Third Party Platform Event Grid data connector started')
    logging.info("Sentinel Logtype:{}".format(sentinel_log_type))
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })
    body = json.dumps(event.get_json())
    logging.info("Info:Converted input json to dict and further to json")
    logging.info(body)
    try:
        post_data(sentinel_customer_id, sentinel_shared_key, body, sentinel_log_type)
        logging.info("Info: Third Party Platform Event Grid data connector execution completed successfully.")
    except Exception as err:
        logging.error("Something wrong. Exception error text: {}".format(err))
        logging.error( "Error: Third Party Platform Event Grid data connector execution failed with an internal server error.")
        raise
    
#####################
######Functions######
#####################
# Build the API signature
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    x_headers = 'x-ms-date:' + date
    string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
    bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
    decoded_key = base64.b64decode(shared_key)
    encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
    authorization = "SharedKey {}:{}".format(customer_id,encoded_hash)
    return authorization
    
# Build and send a request to the POST API
def post_data(customer_id, shared_key, body, log_type):
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'
    currentdate = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    content_length = len(body)
    signature = build_signature(customer_id, shared_key, currentdate, content_length, method, content_type, resource)
    uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'
    headers = {
        'content-type': content_type,
        'Authorization': signature,
        'Log-Type': log_type,
        'x-ms-date': currentdate
    }
    
    response = requests.post(uri,data=body, headers=headers)
    
    if (response.status_code >= 200 and response.status_code <= 299):
    	print('Info:Event was injected into Third Party Platform')
    	logging.info('Info:Event was injected into Third Party Platform')
    elif (response.status_code == 401):
    	logging.error("The authentication credentials are incorrect or missing. Error code: {}".format(response.status_code))
    	raise Exception("Invalid or missing credentials")
    else:
    	print("Response code: {}".format(response.status_code))
    	logging.info("Info:Response code: {}, {}".format(response.status_code, response.text))
    	raise Exception("Something went wrong.")
__init__.py