Microsoft has recently released workspace data collection rules into public preview. These allow basic KQL queries to be applied to incoming data before it is stored in the workspace, which provides an ideal way to drop unwanted fields and rows where filtering at the source is either impractical or impossible.
The table transformations aren’t limited to filtering data; you can also create custom columns for specific information. This should speed up queries on commonly used fields that you would otherwise have to parse out of a dynamic array at query time.
This article focuses on optimising the storage of Palo Alto CEF logs, but the same approach applies to most CEF log sources in Sentinel.
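As a rough illustration of the three ideas above (dropping rows, dropping fields and adding a custom column), a transform could look something like the sketch below. The Activity value is a made-up placeholder, the choice of ReceiptTime as the column to drop is only an example, and Rule_CF is assumed to have been added to the table already. Check each operator against Microsoft’s list of supported functionality before relying on it.

```kql
source
// Hypothetical row filter: drop one noisy Palo Alto log type (placeholder value).
| where DeviceVendor !contains "Palo" or Activity != "EXAMPLE-NOISY-TYPE"
// Custom column: copy the rule name out of the generic CEF field when the label matches.
| extend Rule_CF = iff(DeviceCustomString1Label == "Rule", DeviceCustomString1, "")
// Example of dropping a field we assume is never queried.
| project-away ReceiptTime
```

Custom columns added to a built-in table such as CommonSecurityLog carry the _CF suffix, which is why the example column is named that way.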
CEF Introduction
CEF (Common Event Format) is a log format that defines a common dictionary to map multiple different log formats into a common schema. You can see how CEF fields map into the CommonSecurityLog table in Sentinel here (https://docs.microsoft.com/en-us/azure/sentinel/cef-name-mapping). The problem we are addressing is the custom fields: CEF defines a number of custom string, integer and other fields that log vendors can choose how to map. Each custom field is stored as a pair, with a label field that names the data and a value field that holds it.
As an example, the Palo Alto rule field gets mapped into DeviceCustomString1. This results in DeviceCustomString1Label being “Rule” for nearly every log, which inflates the billed size. As these fields are usually mapped to the same or predictable values for a given vendor, we can write a transform to parse them into custom fields. This has the added benefit of making the fields more intuitive for the analysts using the logs, as each one gets a descriptive name rather than the generic DeviceCustomString1 title.
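Before writing a transform, it helps to confirm how predictable the labels really are for your source. The query below is one way to check; it is a sketch that runs in the workspace (not in the transform), and the one-day window and the choice of the first two label columns are arbitrary.

```kql
CommonSecurityLog
| where TimeGenerated > ago(1d)
| where DeviceVendor contains "Palo"
// Count how often each label combination appears; a handful of rows suggests the mapping is stable.
| summarize Events = count() by DeviceCustomString1Label, DeviceCustomString2Label
| order by Events desc
```

The same check can be repeated for the remaining DeviceCustomString, DeviceCustomNumber and FlexString label columns.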
Limitations of Transforms
Currently, the query operators supported by the transforms are limited, and in some cases they may operate differently than they typically do in the Log Analytics workspace. For example, pack requires constants, so you can’t reference other columns. This makes the solution we’ve used much more complicated than it would have been if all the operators were available. Microsoft provides a useful reference of supported functionality here (https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/data-collection-transformations-structure#kql-limitations).
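To make the pack limitation concrete, the sketch below shows the workaround pattern on a single made-up row: the label and value are concatenated into a JSON string and then parsed, instead of being passed to pack. The datatable and its values are purely illustrative, and the pattern assumes the values contain no double quotes, which would break the JSON.

```kql
// Illustrative row standing in for an incoming CEF record.
datatable(DeviceCustomString1Label:string, DeviceCustomString1:string)
[
    "Rule", "allow-web"
]
// Build {"<label>":"<value>"} by hand, then parse it into a dynamic object.
| extend f = parse_json(strcat('{"', DeviceCustomString1Label, '":"', DeviceCustomString1, '"}'))
// Look the value up by the label we expect for this vendor.
| extend Rule_CF = tostring(f["Rule"])
```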
Implementing Transform
We’re not going to go over how to implement the transform step by step, as Microsoft has its own guide and the process is likely to change while the functionality is in preview (https://docs.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-workspace-transformations-portal). However, we’ve included an example of one we’ve previously done for Palo Alto below. The way this transform has been written means that any CEF fields that aren’t mapped will fall back to the DeviceCustomX / DeviceCustomXLabel pairing.
You can use a query similar to the one below to monitor the difference it has made. For one of Bridewell’s customers, we saw a 17% reduction in ingest size, which is a considerable saving for just stripping out redundant metadata.
CommonSecurityLog
| where DeviceVendor contains "Palo"
| where TimeGenerated > ago(1h)
| extend IngestTime = ingestion_time()
| extend Diff = IngestTime - TimeGenerated
| summarize AvgSize=avg(_BilledSize), avgIngestTime=avg(Diff), MaxIngestTime=max(Diff), TotalSizeGB=sum(_BilledSize)/1024/1024/1024, LogCount=count() by bin(TimeGenerated, 5m)
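If you know roughly when the transform went live, a before/after comparison of the average record size can make the saving easier to see. The sketch below assumes a hypothetical cut-over date and a seven-day window either side; both are placeholders to replace with your own values.

```kql
// Hypothetical date the transform was enabled.
let cutover = datetime(2022-06-01);
CommonSecurityLog
| where TimeGenerated between ((cutover - 7d) .. (cutover + 7d))
| where DeviceVendor contains "Palo"
// Label each record by which side of the cut-over it falls on.
| extend Period = iff(TimeGenerated < cutover, "before", "after")
| summarize AvgSize = avg(_BilledSize), LogCount = count() by Period
```

Comparing the average size per record rather than the total avoids the result being skewed by changes in log volume between the two periods.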
For more information, please get in touch with one of our team. More information can also be found directly from Microsoft here: https://docs.microsoft.com/en-us/azure/sentinel/data-transformation
Transform Appendix
// We define the known keys that we explicitly parse into custom fields. This is used later on to "zero" the values of any of the ones we've mapped.
let known_keys = parse_json('["Rule", "Category", "Source_Zone", "Destination_Zone", "Direction", "Virtual System", "Flags", "SessionID", "Packets", "Elapsed_time_in_seconds", "Destloc", "Srcloc", "Destination Zone", "Source Zone", "Data Source Name", "Factor Type", "URL Category", "Data Source", "Data Source Type", "Factor Number", "Virtual System ID", "Timeout Threshold", "CS1", "CS2", "CS3", "CS4", "CS5", "CS6", "CN1", "CN2", "CN3", "FS1", "FS2"]');
source
// As we can't use pack, we have to make sure each dict key is unique when we manually concatenate.
| extend DeviceCustomString1Label = iff(isempty(DeviceCustomString1Label), "CS1", DeviceCustomString1Label)
| extend DeviceCustomString2Label = iff(isempty(DeviceCustomString2Label), "CS2", DeviceCustomString2Label)
| extend DeviceCustomString3Label = iff(isempty(DeviceCustomString3Label), "CS3", DeviceCustomString3Label)
| extend DeviceCustomString4Label = iff(isempty(DeviceCustomString4Label), "CS4", DeviceCustomString4Label)
| extend DeviceCustomString5Label = iff(isempty(DeviceCustomString5Label), "CS5", DeviceCustomString5Label)
| extend DeviceCustomString6Label = iff(isempty(DeviceCustomString6Label), "CS6", DeviceCustomString6Label)
| extend DeviceCustomNumber1Label = iff(isempty(DeviceCustomNumber1Label), "CN1", DeviceCustomNumber1Label)
| extend DeviceCustomNumber2Label = iff(isempty(DeviceCustomNumber2Label), "CN2", DeviceCustomNumber2Label)
| extend DeviceCustomNumber3Label = iff(isempty(DeviceCustomNumber3Label), "CN3", DeviceCustomNumber3Label)
| extend FlexString1Label = iff(isempty(FlexString1Label), "FS1", FlexString1Label)
| extend FlexString2Label = iff(isempty(FlexString2Label), "FS2", FlexString2Label)
// We create a JSON string with strcat to work around limitations with pack in this context (pack requires string constants in this runtime).
| extend f = strcat('{"',
    columnifexists("DeviceCustomString1Label", "CS1"), '":"', DeviceCustomString1, '","',
    columnifexists("DeviceCustomString2Label", "CS2"), '":"', DeviceCustomString2, '","',
    columnifexists("DeviceCustomString3Label", "CS3"), '":"', DeviceCustomString3, '","',
    columnifexists("DeviceCustomString4Label", "CS4"), '":"', DeviceCustomString4, '","',
    columnifexists("DeviceCustomString5Label", "CS5"), '":"', DeviceCustomString5, '","',
    columnifexists("DeviceCustomString6Label", "CS6"), '":"', DeviceCustomString6, '","',
    columnifexists("DeviceCustomNumber1Label", "CN1"), '":"', DeviceCustomNumber1, '","',
    columnifexists("DeviceCustomNumber2Label", "CN2"), '":"', DeviceCustomNumber2, '","',
    columnifexists("DeviceCustomNumber3Label", "CN3"), '":"', DeviceCustomNumber3, '","',
    columnifexists("FlexString1Label", "FS1"), '":"', FlexString1, '","',
    columnifexists("FlexString2Label", "FS2"), '":"', FlexString2, '"}')
// parses the json string into a dynamic object
| extend f = parse_json(f)
// Maps custom columns to the keys in the dict
| extend Rule_CF = tostring(f["Rule"])
| extend Category_CF = tostring(f["Category"])
| extend UrlCategory_CF = tostring(f["URL Category"])
| extend SourceZone_CF = strcat(f["Source_Zone"], f["Source Zone"])
| extend DestinationZone_CF = strcat(f["Destination_Zone"], f["Destination Zone"])
| extend Direction_CF = tostring(f["Direction"])
| extend VirtualSystem_CF = tostring(f["Virtual System"])
| extend Flags_CF = tostring(f["Flags"])
| extend SessionID_CF = toint(f["SessionID"])
| extend PacketCount_CF = toint(f["Packets"])
| extend Elapsed_time_in_seconds_CF = toint(f["Elapsed_time_in_seconds"])
| extend Destloc_CF = tostring(f["Destloc"])
| extend Srcloc_CF = tostring(f["Srcloc"])
| extend FactorType_CF = tostring(f["Factor Type"])
| extend DataSoureName_CF = tostring(f["Data Source Name"])
| extend DataSource_CF = tostring(f["Data Source"])
| extend DataSourceType_CF = tostring(f["Data Source Type"])
| extend FactorNumber_CF = tostring(f["Factor Number"])
| extend VirtualSystemID_CF = tostring(f["Virtual System ID"])
| extend TimeoutThreshold_CF = tostring(f["Timeout Threshold"])
| extend LogProfile_CF = tostring(f["LogProfile"])
// If we have parsed a value into its own field, zero the relevant columns. If it's a new column / not mapped, we keep the original value.
| extend DeviceCustomString1 = iff(DeviceCustomString1Label in (known_keys), "", DeviceCustomString1)
| extend DeviceCustomString2 = iff(DeviceCustomString2Label in (known_keys), "", DeviceCustomString2)
| extend DeviceCustomString3 = iff(DeviceCustomString3Label in (known_keys), "", DeviceCustomString3)
| extend DeviceCustomString4 = iff(DeviceCustomString4Label in (known_keys), "", DeviceCustomString4)
| extend DeviceCustomString5 = iff(DeviceCustomString5Label in (known_keys), "", DeviceCustomString5)
| extend DeviceCustomString6 = iff(DeviceCustomString6Label in (known_keys), "", DeviceCustomString6)
// iff requires both values to be strings. We tried creating another column, but the transform fails to save with these lines.
//| extend DeviceCustomNumber1 = iff(DeviceCustomNumber1Label in (known_keys), "", tostring(DeviceCustomNumber1))
//| extend DeviceCustomNumber2 = iff(DeviceCustomNumber2Label in (known_keys), "", tostring(DeviceCustomNumber2))
//| extend DeviceCustomNumber3 = iff(DeviceCustomNumber3Label in (known_keys), "", tostring(DeviceCustomNumber3))
| extend FlexString1 = iff(FlexString1Label in (known_keys), "", FlexString1)
| extend FlexString2 = iff(FlexString2Label in (known_keys), "", FlexString2)
| extend DeviceCustomString1Label = iff(DeviceCustomString1Label in (known_keys), "", DeviceCustomString1Label)
| extend DeviceCustomString2Label = iff(DeviceCustomString2Label in (known_keys), "", DeviceCustomString2Label)
| extend DeviceCustomString3Label = iff(DeviceCustomString3Label in (known_keys), "", DeviceCustomString3Label)
| extend DeviceCustomString4Label = iff(DeviceCustomString4Label in (known_keys), "", DeviceCustomString4Label)
| extend DeviceCustomString5Label = iff(DeviceCustomString5Label in (known_keys), "", DeviceCustomString5Label)
| extend DeviceCustomString6Label = iff(DeviceCustomString6Label in (known_keys), "", DeviceCustomString6Label)
| extend DeviceCustomNumber1Label = iff(DeviceCustomNumber1Label in (known_keys), "", tostring(DeviceCustomNumber1Label))
| extend DeviceCustomNumber2Label = iff(DeviceCustomNumber2Label in (known_keys), "", tostring(DeviceCustomNumber2Label))
| extend DeviceCustomNumber3Label = iff(DeviceCustomNumber3Label in (known_keys), "", tostring(DeviceCustomNumber3Label))
| extend FlexString1Label = iff(FlexString1Label in (known_keys), "", FlexString1Label)
| extend FlexString2Label = iff(FlexString2Label in (known_keys), "", FlexString2Label)
| project-away f
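Once a transform like the one above is in place, analysts can query the descriptive columns directly rather than matching on label and value pairs. The query below is a minimal sketch of that; it assumes the Rule_CF, SourceZone_CF and DestinationZone_CF columns are being populated, and the one-hour window is arbitrary.

```kql
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where DeviceVendor contains "Palo"
// Group by the parsed custom columns instead of DeviceCustomString1 and its label.
| summarize Events = count() by Rule_CF, SourceZone_CF, DestinationZone_CF
| order by Events desc
```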