mimecast: log processing stage and improve document fingerprinting #9078
Conversation
Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)
I've added some Go code I used to determine each Mimecast log type to this support ticket #01554878. I used more than one field for redundancy in case a field was missing. This code was running for 3–4 years and we didn't spot any problems. Journal logs are important, as they are for internal emails between staff, but journal logs aren't documented on the "Understanding SIEM logs" page. The code I've provided shows how to detect journal logs. Thanks
Where is this?
In support ticket #01554878, or emailed to Jamie.
Hi, just seen they have added a few more log types, which makes this more complicated. It's more work, but it does make the logs easier to use. I'll email Mimecast to ask if they will add the log type field, but they normally ignore customer requests. One mapping from my detection code: `headerFrom|SpamLimit|Error -> receipt`. I recommend full regression testing using the sample logs on this page. We use Mimecast to support Data Loss Prevention and phishing incident response processes, so we need to guarantee there's no data loss. https://integrations.mimecast.com/documentation/tutorials/understanding-siem-logs/
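For illustration, a minimal Go sketch of that mapping, using the presence of any one of several indicative fields so that a single missing field doesn't break detection (the field names are the ones given above; everything else here is hypothetical):

```go
package main

import "fmt"

// isReceipt reports whether a parsed SIEM log line looks like a receipt
// log. Any one of the indicative fields is enough, so detection
// survives any single field being absent.
func isReceipt(m map[string]string) bool {
	for _, k := range []string{"headerFrom", "SpamLimit", "Error"} {
		if _, ok := m[k]; ok {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isReceipt(map[string]string{"SpamLimit": "0", "Act": "Acc"})) // true
	fmt.Println(isReceipt(map[string]string{"Delivered": "true"}))            // false
}
```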
We were getting all the logs listed in the Understanding SIEM logs page through this endpoint: /api/audit/get-siem-logs. However, I can see Mimecast also has several other endpoints for url, impersonation and attachment protect. If using those endpoints, then setting the log type field can be done in the respective ingest pipeline. However, their SIEM logs endpoint might also send the same TTP logs; it's not clear from their documentation. https://integrations.mimecast.com/documentation/endpoint-reference/logs-and-statistics/ I've got a Python script I can use to try and work this out, and maybe provide some sample data. Might be later today.
Test case generator here: https://go.dev/play/p/U6YWZHJHKHJ
chrisberkhout left a comment
I see the log_type value is extracted from the file name in the Content-Disposition header of the response. We request compressed data and can get a zip file of JSON log files, which the HTTP JSON input will merge, losing those log file names and with them their log types. So then we use heuristics to determine the type, and store that in stage.
It would have been nice if they had a type field (and possibly a stage field) in the contents of their log messages.
If I was building this I would strongly consider not requesting zip files, so we always get the type from the file name. There could still be http compression.
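As a sketch of that alternative, assuming the log type is the leading component of the attachment file name (the real naming scheme isn't shown in this thread), extraction could look like:

```go
package main

import (
	"fmt"
	"mime"
	"path"
	"strings"
)

// logTypeFromDisposition extracts a log type from a Content-Disposition
// header. The "<type>_<date>.json" file name pattern is an assumption
// for illustration only.
func logTypeFromDisposition(header string) (string, error) {
	_, params, err := mime.ParseMediaType(header)
	if err != nil {
		return "", err
	}
	name := path.Base(params["filename"])
	typ, _, ok := strings.Cut(name, "_")
	if !ok {
		return "", fmt.Errorf("no type prefix in file name %q", name)
	}
	return typ, nil
}

func main() {
	t, err := logTypeFromDisposition(`attachment; filename="receipt_20170526.json"`)
	fmt.Println(t, err) // receipt <nil>
}
```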
Although three "stages" are discussed in the documentation, those stages can be inferred from the type, and we're setting stage to a larger number of values that seem to be types.
Should our stage field really be separate from type?
The background for the change is largely in the issue. This is where the justification for the larger number of categorisations lives.
I skimmed the issue before, but re-reading it I'm still not sure. In the following table of what I could see in the PR and the documentation [table omitted], aren't we only adding …? I just want to be clear about whether they're trying to identify the same categories or not. If they are the same thing, it might still be good to keep both, because they are populated using different methods and results may vary slightly.
Update: the table above isn't quite right.
@efd6 Mimecast have given us access to their API. If you need access to test against, happy to provide.
Just wondering if this is fixed?
This has not been merged yet.
The current ingest pipeline does not effectively distinguish documents that have come from the three stages of Mimecast email processing[1]: receipt, process and delivery. This can result in documents from different stages being given the same document fingerprint and a subsequent ingest version error. So detect the stage of the event and use it and a small set of distinguishing fields to ensure we don't collide documents. The heuristics for stage detection and the set of fields chosen for fingerprinting are based on the documentation at [1].

[1] https://integrations.mimecast.com/documentation/tutorials/understanding-siem-logs/
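To make the collision mechanics concrete, here is a minimal sketch of the fingerprinting idea, hashing the detected stage together with a few distinguishing fields (the specific fields are illustrative, not the exact set the pipeline uses):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// fingerprint combines the detected stage/log type with distinguishing
// fields so documents for the same message seen at different stages get
// different document IDs.
func fingerprint(stage string, fields ...string) string {
	h := sha256.New()
	h.Write([]byte(stage))
	for _, f := range fields {
		h.Write([]byte{0}) // separator avoids ambiguous concatenation
		h.Write([]byte(f))
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	// The same message at two stages now yields two distinct IDs.
	fmt.Println(fingerprint("receipt", "<messageId@messageId>", "2017-05-26T16:47:41+0100"))
	fmt.Println(fingerprint("delivery", "<messageId@messageId>", "2017-05-26T16:47:41+0100"))
}
```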
New test cases constructed with the following code:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"log"
	"os"
	"strconv"
	"strings"
)

func main() {
	// Obtained from https://integrations.mimecast.com/documentation/tutorials/understanding-siem-logs/
	// Omit error cases as they cannot be included in tests.
	const messages = `
datetime=2017-05-26T16:47:41+0100|aCode=7O7I7MvGP1mj8plHRDuHEA|acc=C0A0|SpamLimit=0|IP=123.123.123.123|Dir=Internal|MsgId=<messageId@messageId>|Subject=\message subject\|headerFrom=from@mimecast.com|Sender=from@mimecast.com|Rcpt=auser@mimecast.com|SpamInfo=[]|Act=Acc|TlsVer=TLSv1|Cphr=TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA|SpamProcessingDetail={"spf":{"info":"SPF_FAIL","allow":true},"dkim":{"info":"DKIM_UNKNOWN","allow":true}}|SpamScore=1
#datetime=2017-05-26T17:01:36+0100|aCode=cx9u0J0pOJGscX_KPpilkg|acc=C0A0|IP=123.123.123.123|RejType=\Invalid Recipient Address\|Error=\Failed Known address verification\|RejCode=550|Dir=Inbound|headerFrom=|Sender=from@domain.com|Rcpt=auser@mimecast.com|Act=Rej|RejInfo=\Invalid Recipient\|TlsVer=TLSv1|Cphr=TLS_DHE_RSA_WITH_AES_256_CBC_SHA
datetime=2017-05-26T19:36:48+0100|aCode=BY81J52RPjSmp7MrubnlZg|acc=C0A0|AttSize=1267|Act=Acc|AttCnt=2|AttNames=\"filename.docx", "filename2.xlsx"\|MsgSize=2116|MsgId=messageId@mssageId
datetime=2017-05-26T19:36:48+0100|aCode=BY81J52RPjSmp7MrubnlZg|acc=C0A0|AttSize=0|Act=Acc|AttCnt=0|AttNames=|MsgSize=2116|MsgId=messageId@mssageId
datetime=2017-05-26T19:24:18+0100|aCode=015vTYvNN-Wn30v7M5MzNw|acc=C0A0|Hld=Spm|AttSize=0|Act=Hld|IPNewDomain=false|IPReplyMismatch=false|AttCnt=0|IPInternalName=false|AttNames=|MsgSize=56442|MsgId=messageId@mssageId|IPThreadDict=false|IPSimilarDomain=false
datetime=2017-05-26T19:40:33+0100|aCode=9q_HeIHHPYejZTBsnipWmQ|acc=C0A0|Delivered=true|IP=123.123.123.123|AttCnt=0|Dir=Inbound|ReceiptAck=\250 2.6.0 messageId@mssageId [InternalId=25473608] Queued mail for delivery\|MsgId=messageId@mssageId|Subject=\Auto Reply\|Latency=5618|Sender=from@domain.com|Rcpt=auser@mimecast.com|AttSize=0|Attempt=1|TlsVer=TLSv1|Cphr=TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA|Snt=28237|UseTls=Yes|Route=\Mimecast Exchange Route\
#datetime=2017-05-26T19:40:06+0100|aCode=ClBDLlnTPH6-T-3KJayNew|acc=C0A0|Delivered=false|Err=\Connection timed out\|RejType=\Recipient server unavailable or busy\|AttCnt=0|Dir=Outbound|ReceiptAck=null|MsgId=messageId@mssageId|Subject=\message subject\|Latency=34848442|Sender=<>|Rcpt=auser@mimecast.com|AttSize=0|Attempt=14|Snt=0|UseTls=No
datetime=2021-03-05T16:25:17+0000|acc=C0A0|MimecastIP=false|fileName=Invoice Attached for payment|sha256=efe51c2453821310c7a34dca3054021d0f6d453b7133c381d75e3140901efd12|Size=1648832|IP=0.0.0.0|Recipient=recipient@recipientdomain.tld|SenderDomain=senderdomain.tld|fileExt=xlsm|Subject=Invoice Attached for payment|MsgId=<85485.121030516250700527@mta.uk.somewhere.tld>|Sender=8jy0xzfjymioyjfjrajc@senderdomain.tld|Virus=Anomali:Phishing|sha1=816b013c8be6e5708690645964b5d442c085041e|SenderDomainInternal=false|fileMime=application/vnd.ms-excel.sheet.macroEnabled.12|CustomerIP=true|Route=Inbound|md5=4dbe9dbfb53438d9ce410535355cd973
datetime=2021-03-05T18:18:39+0000|aCode=azYwczFKNga_v1sYBuJOvA|acc=C0A0|Sender=sender@domain.tld|SourceIP=0.0.0.0|Recipient=recipient@adomain.tld|SenderDomain=bdomain.tld|Subject=Opportunity to become VP|MsgId=<ABCDEF@domain-GHIK>|Route=Inbound|headerFrom=sender@adomain
datetime=2021-03-04T21:31:08+0000|aCode=vit87EEXMPaEyl22Lrb92A|acc=C46A75|Sender=sender@domain.tld|UrlCategory=Phishing & Fraud|ScanResultInfo=Blocked URL Category|Recipient=recipient@domain.tld|MsgId=<CWXP123MB37349110AF6F6A2BC94F702EC4979@CWXP123MB3734.GBRP123.PROD.domain.tld>|Subject=Coffee Briefing|URL=https://domain.com/login/|Route=Internal
datetime=2020-07-27T00:39:59+0100|aCode=q4qBpkoTOt-iStR7G44w3g|acc=C0A0|Sender=sender@domain|Receipient=recipient@domain|IP=0.0.0.0|Subject=Opportunity to become VP|Definition=Default Impersonation Definition|Hits=1|Action=Hold|TaggedExternal=false|TaggedMalicious=true|MsgId=<ABCDEF@domain.tld>|InternalName=true|CustomName=false|NewDomain=false|SimilarInternalDomain=false|SimilarCustomExternalDomain=false|SimilarMimecastExternalDomain=false|ReplyMismatch=false|ThreatDictionary=false|CustomThreatDictionary=false|Route=Inbound
datetime=2017-05-26T19:22:37+0100|acc=C0A0|reason=malicious|url=http://bgmtechnology.com.au|route=inbound|sourceIp=123.123.123.123|sender=from@domain.com|recipient=auser@mimecast.com|urlCategory=Blocked|senderDomain=domain.com
datetime=2017-05-23T21:45:21+0100|acc=C1A1|fileName=1XCOLUMN.PVC|sha256=8746bb4b31ab6f03eb0a3b2c62ab7497658f0f85c8e7e82f042f9af0bb876d83|Size=378368|IP=123.123.123.123|Recipient=auser@mimecast.com|SenderDomain=domain.com|fileExt=doc|sha1=a27850da9e7adfc8e1a94dabf2509fc9d65ee7e2|Sender=from@domain.com|fileMime=application/vnd.ms-office|Route=Inbound|md5=7b52770644da336a9a59141c80807f37
`
	misspelled := map[string]string{
		"Receipient": "Recipient",
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetEscapeHTML(false)
	sc := bufio.NewScanner(strings.NewReader(messages))
	for sc.Scan() {
		if len(sc.Bytes()) == 0 || bytes.HasPrefix(sc.Bytes(), []byte{'#'}) {
			continue
		}
		m := make(map[string]any)
		fields := strings.Split(sc.Text(), "|")
		for i, f := range fields {
			k, v, ok := strings.Cut(f, "=")
			if !ok {
				log.Fatalf("no kv sep: %s in %s", f, sc.Text())
			}
			corrected, ok := misspelled[k]
			if ok {
				k = corrected
			}
			switch {
			case strings.HasPrefix(v, `{`):
				if !strings.HasSuffix(v, `}`) {
					log.Fatalf("invalid object: %s in %s", v, sc.Text())
				}
				m[k] = json.RawMessage(v)
			case strings.HasPrefix(v, `\`):
				if i < len(fields)-1 && !strings.HasSuffix(v, `\`) {
					log.Fatalf("invalid quoted string: %q in %s", v, sc.Text())
				}
				v = v[1 : len(v)-1]
				m[k] = v
			default:
				// Special cases galore.
				if v == "123.123.123.123" {
					m[k] = "81.2.69.144" // Replace with allowed geo ip.
					continue
				}
				if k != "Hits" {
					n, err := strconv.ParseInt(v, 10, 64)
					if err == nil {
						m[k] = n
						continue
					}
				}
				if k == "Delivered" {
					b, err := strconv.ParseBool(v)
					if err == nil {
						m[k] = b
						continue
					}
				}
				m[k] = v
			}
		}
		enc.Encode(m)
	}
}
```

Use data tables instead of handcrafted conditionals.
I'll take a proper look at this on Thursday next week. Feel free to ignore these until I've done a proper review, but here's what I saw so far:

- use lower-case keys
- pre-establish the key set as lower-case to avoid repeated `.toLowerCase()` calls
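A minimal Go rendering of that suggestion (the pipeline itself does this in Painless; this is just to show the shape):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	doc := map[string]any{"aCode": "x", "Dir": "Inbound", "Rcpt": "a@b"}

	// Lower-case the document's keys once into a set, instead of
	// calling ToLower repeatedly inside every per-type comparison.
	keys := make(map[string]bool, len(doc))
	for k := range doc {
		keys[strings.ToLower(k)] = true
	}
	fmt.Println(keys["acode"], keys["rcpt"]) // true true
}
```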
Addressed all bar the naming consistency; given the information we have, I'm not sure what the approach should be for that.
/test
chrisberkhout left a comment
I like the way you reconciled `stage` and `log_type`.
In the spreadsheet I see that the "Information" column would be 1.0 if the type had equal numbers of present and absent fields and a lower value if it's less balanced in either direction. I'd be interested to know what exactly this measure is or what it's adapted from or inspired by.
Some changes

- The `stage` field definition can go away and the README should be regenerated.
- The proposed commit message mentions stages as something we need to distinguish between, but it should be updated to focus on `log_type`.
- The proposed commit message should mention the `sample_event.json` changes in other data streams as an additional change.
- It would be good to have a test case for the `jrnl` type.
- In the spreadsheet I didn't see right away that there are hidden columns. I think it's better to show them and let the reader hide things if necessary.
Classification code
The classification code seems like it'll work. What follows is more about my thought process and some commentary rather than problems that need to be fixed.
Reading the code for classification, it was clear immediately how the definite positives were handled. After that I could see that based on other fields there was some penalty and then some positive score, but it took a bit to see how the data and logic for these are related. I guessed that if I'd understood it correctly it could be written with more compact data and code (although a bit more computation) by using more set operations and I came up with this equivalent:
```yaml
### NOTE LOG TYPE
- script:
    lang: painless
    params:
      known_shared_keys: [acode, act, attcnt, attsize, cphr, dir, fileext, filemime, headerfrom, ip, md5, rcpt, recipient, rejcode, rejinfo, rejtype, route, senderdomain, sha1, sha256, size, sourceip, tlsver, url, urlcategory, virus]
      types:
        attachment-protect:
          unique_keys: [filename]
          shared_keys: [fileext, filemime, ip, md5, recipient, route, senderdomain, sha1, sha256, size]
        avlog:
          unique_keys: [customerip, mimecastip, senderdomaininternal]
          shared_keys: [fileext, filemime, ip, md5, recipient, route, senderdomain, sha1, sha256, size, virus]
        delivery:
          unique_keys: [attempt, delivered, err, latency, receiptack, snt, usetls]
          shared_keys: [acode, attcnt, attsize, cphr, dir, ip, rcpt, rejcode, rejinfo, rejtype, route, tlsver]
        impersonation-protect:
          unique_keys: [customname, customthreatdictionary, definition, hits, internalname, newdomain, replymismatch, similarcustomexternaldomain, similarinternaldomain, similarmimecastexternaldomain, taggedexternal, taggedmalicious, threatdictionary]
          shared_keys: [acode, ip, recipient, route]
        internal-email-protect:
          unique_keys: [scanresultinfo]
          shared_keys: [acode, recipient, route, url, urlcategory]
        jrnl:
          unique_keys: [rcptacttype]
          shared_keys: [acode, dir, rcpt]
        process:
          unique_keys: [attnames, hld, ipinternalname, ipnewdomain, ipreplymismatch, ipsimilardomain, ipthreaddict, msgsize]
          shared_keys: [acode, act, attcnt, attsize]
        receipt:
          unique_keys: [action, error, spaminfo, spamlimit, spamprocessingdetail, spamscore]
          shared_keys: [acode, act, cphr, dir, headerfrom, ip, rcpt, rejcode, rejinfo, rejtype, tlsver, virus]
        url-protect:
          unique_keys: [reason]
          shared_keys: [recipient, route, senderdomain, sourceip, url, urlcategory]
        spam:
          unique_keys: []
          shared_keys: [acode, headerfrom, recipient, route, senderdomain, sourceip]
    if: ctx.mimecast instanceof Map
    source: |
      // Canonicalise keys to lowercase. If this causes issues in future
      // because case becomes significant, this table space optimisation
      // will need to be reverted.
      def keys = new HashSet();
      for (def k: ctx.mimecast.keySet()) {
        keys.add(k.toLowerCase());
      }
      // A present unique key is a definite positive for its type.
      for (typeEntry in params.types.entrySet()) {
        def uniqueKeysPresent = typeEntry.getValue().unique_keys.clone();
        uniqueKeysPresent.retainAll(keys);
        if (uniqueKeysPresent.size() > 0) {
          ctx.mimecast.log_type = typeEntry.getKey();
          return;
        }
      }
      // Otherwise score types by shared keys present, rejecting any type
      // for which the document has excess known shared keys.
      def maxNumSharedKeysPresent = -1;
      def bestTypes = [];
      for (typeEntry in params.types.entrySet()) {
        def excessKnownSharedKeys = keys.clone();
        excessKnownSharedKeys.retainAll(params.known_shared_keys);
        excessKnownSharedKeys.removeAll(typeEntry.getValue().shared_keys);
        if (excessKnownSharedKeys.size() > 0) {
          continue;
        }
        def sharedKeysPresent = typeEntry.getValue().shared_keys.clone();
        sharedKeysPresent.retainAll(keys);
        if (sharedKeysPresent.size() == maxNumSharedKeysPresent) {
          bestTypes.add(typeEntry.getKey());
        } else if (sharedKeysPresent.size() > maxNumSharedKeysPresent) {
          maxNumSharedKeysPresent = sharedKeysPresent.size();
          bestTypes = [typeEntry.getKey()];
        }
      }
      ctx.mimecast.log_type = bestTypes;
      return;
```

Thinking about robustness, if types are changed, there would be problems with:
- adding a previously unique key to another type (both types would be identified as the first one in the types list)
- adding a shared key to another type (the altered type would be rejected because the document has an excess known shared key)
- adding a new type with new fields (I think it would match all known types)
Maybe this is an acceptable trade-off. Adding unknown keys would probably be the more common case and should not cause problems.
Alternative approaches would be:
- Matching lists of fields exactly: assuming all fields are sent for a given type, this would match perfectly. New or modified types would go unmatched rather than mismatched (see the sketch after this list).
- Calculating similarity scores once rather than having a 3-step selection process: perhaps by counting excess and missing fields compared to each type, and weighting those counts heavier for fields expected in fewer types. Below a certain minimal similarity threshold it would be considered a new type. I think it's more elegant but also more complicated and could miss some easy cases (like the definite positive cases) unless parameters are tuned correctly.
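A minimal Go sketch of the exact-match alternative; the two field sets shown are taken from the tables above, and the rest of the machinery is hypothetical:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// typeFields holds the full expected field set per type (unique plus
// shared keys). Two entries shown; a real table would cover all types.
var typeFields = map[string][]string{
	"url-protect": {"reason", "recipient", "route", "senderdomain", "sourceip", "url", "urlcategory"},
	"jrnl":        {"rcptacttype", "acode", "dir", "rcpt"},
}

// canon lower-cases and sorts a key list into a comparable signature.
func canon(keys []string) string {
	s := make([]string, len(keys))
	for i, k := range keys {
		s[i] = strings.ToLower(k)
	}
	sort.Strings(s)
	return strings.Join(s, "|")
}

// classify returns the type whose field set matches exactly; new or
// modified types return "" (unmatched) rather than being mismatched.
func classify(docKeys []string) string {
	sig := canon(docKeys)
	for typ, fields := range typeFields {
		if canon(fields) == sig {
			return typ
		}
	}
	return ""
}

func main() {
	fmt.Println(classify([]string{"aCode", "Dir", "Rcpt", "RcptActType"})) // jrnl
	fmt.Println(classify([]string{"aCode", "Dir"}))                        // "" (unknown)
}
```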
This is purely a heuristic that I was using to help me understand/direct where I should look first. It's not directly used in the rules here. I don't completely remember what I was thinking, but from the name and the shape of the formula, I imagine that I wrote something close to the entropy of the present/absent split over the field set S for each type (this is a pretty common thing for me to use when making cut decisions).
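If that guess is right, one formula with exactly the described shape (an assumption; the original expression didn't survive this thread) is the binary entropy of the fraction p of a type's fields that are present in a document:

```latex
H(p) = -p \log_2 p - (1 - p) \log_2 (1 - p)
```

This gives H(1/2) = 1 for an even present/absent split and falls toward 0 as the split becomes less balanced in either direction, matching the behaviour described for the "Information" column.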
Yep, missed this. Removed.
New proposed commit message:
I believe this is already tested, though this is done via the file path.
Yep, this was just the working state. Unhidden.
Yes, all these are intentional. The rationale is that the vendor data is intrinsically brittle, so we make the choice to retain the maximum data and be obviously mis-formed (scalar vs. array; not actually malformed, which would lose the data) while still being queryable. WRT the alternative, the approach taken is intended to balance being reasonably clear (explicit) against being onerously long. The successive approach is taken to avoid the more expensive work that's required for score calculations unless it's demonstrated to be necessary. Ideally, the vendor would just provide the log type explicitly; a customer has filed a request with them for this, so if that is accepted and implemented (frankly, it should be; requiring the user to jump through hoops to effectively use the data is unreasonably brittle), then this all becomes moot.
💚 Build Succeeded
cc @efd6
chrisberkhout left a comment
Yep, all sounds reasonable.
Package mimecast - 1.24.0 containing this change is available at https://epr.elastic.co/search?package=mimecast

100.0% Coverage on New Code
0.0% Duplication on New Code
Proposed commit message
The current ingest pipeline does not effectively distinguish documents that have come from the stages/log types of Mimecast email processing[1]: receipt, process and delivery. This can result in documents from different types being given the same document fingerprint and a subsequent ingest version error. So detect the log type of the event and use it and a small set of distinguishing fields to ensure we don't collide documents. The heuristics for log type detection and the set of fields chosen for fingerprinting are based on the documentation at [1] and the summary at [2].
[1] https://integrations.mimecast.com/documentation/tutorials/understanding-siem-logs/
[2] https://docs.google.com/spreadsheets/d/1zspKE-LjrlFztsguB3z5wCIFrN6X2yx5ZgC01mnLzuY/
Checklist

- I have added an entry to my package's `changelog.yml` file.

Author's Checklist
How to test this PR locally
Related issues

- `Message-ID` header value causes `_id` fingerprint collisions #9048

Screenshots