import contextlib
from datetime import datetime
from dojo.models import Finding
from dojo.tools.trivy_operator.uniform_vulnid import UniformTrivyVulnID
# Header of every finding description: the Trivy title on the first line,
# the fixed version on the second. Per-resource context is appended later.
DESCRIPTION_TEMPLATE = "{title}\n**Fixed version:** {fixed_version}\n"
# Trivy severity labels (upper case, as emitted by trivy-operator) mapped to
# DefectDojo severity names. Trivy's UNKNOWN becomes the lowest bucket, Info.
TRIVY_SEVERITIES = dict(
    CRITICAL="Critical",
    HIGH="High",
    MEDIUM="Medium",
    LOW="Low",
    UNKNOWN="Info",
)
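class TrivyVulnerabilityHandler:
    """Convert trivy-operator VulnerabilityReport entries into DefectDojo Findings."""

    # Trivy result classes whose ``packagePath`` / ``target`` describe a file
    # location worth recording on the finding.
    _FILE_PATH_CLASSES = frozenset({"os-pkgs", "lang-pkgs"})

    def handle_vulns(self, labels, vulnerabilities, test):
        """Build one unsaved Finding per Trivy vulnerability entry.

        Args:
            labels: Kubernetes labels of the report. The
                ``trivy-operator.resource.namespace`` / ``.kind`` / ``.name`` and
                ``trivy-operator.container.name`` keys are read; missing ones
                default to "".
            vulnerabilities: iterable of Trivy vulnerability dicts (keys such as
                ``vulnerabilityID``, ``severity``, ``fixedVersion``, ``resource``,
                ``installedVersion``, ``publishedDate``, ``class``, ``packagePath``,
                ``target``).
            test: the DefectDojo Test the findings are attached to.

        Returns:
            list of Finding objects with ``unsaved_tags`` set and, when a
            vulnerability id is present, ``unsaved_vulnerability_ids`` set.
        """
        resource_namespace = labels.get("trivy-operator.resource.namespace", "")
        resource_kind = labels.get("trivy-operator.resource.kind", "")
        resource_name = labels.get("trivy-operator.resource.name", "")
        container_name = labels.get("trivy-operator.container.name", "")
        # The resource context is the same for every entry of the report, so it
        # is rendered once and appended to each description.
        resource_context = (
            f"\n**container.name:** {container_name}"
            f"\n**resource.kind:** {resource_kind}"
            f"\n**resource.name:** {resource_name}"
            f"\n**resource.namespace:** {resource_namespace}"
        )

        findings = []
        for vulnerability in vulnerabilities:
            vuln_id = vulnerability.get("vulnerabilityID", "0")
            # Report content is scanner output, not a trusted schema: an absent
            # or unexpected severity must not abort the whole import (the
            # previous direct dict lookup raised KeyError for the entire report).
            severity = self._map_severity(vulnerability.get("severity"))
            mitigation = vulnerability.get("fixedVersion")
            package_name = vulnerability.get("resource")
            package_version = vulnerability.get("installedVersion")
            publish_date = self._parse_publish_date(vulnerability.get("publishedDate"))

            package_type = vulnerability.get("packageType")
            target_class = vulnerability.get("class")
            # Namespace first, then package type and class; empty values are
            # dropped when the tags are assigned below.
            finding_tags = [resource_namespace, package_type, target_class]

            file_path = self._resolve_file_path(
                target_class,
                vulnerability.get("packagePath"),
                vulnerability.get("target"),
            )

            description = DESCRIPTION_TEMPLATE.format(
                title=vulnerability.get("title"), fixed_version=mitigation,
            ) + resource_context

            finding = Finding(
                test=test,
                title=f"{vuln_id} {package_name} {package_version}",
                severity=severity,
                references=vulnerability.get("primaryLink"),
                mitigation=mitigation,
                component_name=package_name,
                component_version=package_version,
                cvssv3_score=vulnerability.get("score"),
                description=description,
                static_finding=True,
                dynamic_finding=False,
                file_path=file_path,
                publish_date=publish_date,
                # An absent or empty fixedVersion means no fixed release is known.
                fix_available=bool(mitigation),
            )
            finding.unsaved_tags = [tag for tag in finding_tags if tag]
            # NOTE(review): the "0" default above is a non-empty string, so an
            # entry without a vulnerabilityID is still recorded under the id "0";
            # confirm that is intended rather than skipping the id.
            if vuln_id:
                finding.unsaved_vulnerability_ids = [
                    UniformTrivyVulnID().return_uniformed_vulnid(vuln_id),
                ]
            findings.append(finding)
        return findings

    @staticmethod
    def _map_severity(raw_severity):
        """Map a Trivy severity label to a DefectDojo severity name.

        A missing or unrecognised label maps to "Info", the bucket Trivy's own
        UNKNOWN uses, instead of raising KeyError.
        """
        return TRIVY_SEVERITIES.get(raw_severity, "Info")

    @staticmethod
    def _parse_publish_date(published_date):
        """Return the date part of a Trivy ``publishedDate`` string.

        The value is expected in ``%Y-%m-%dT%H:%M:%SZ`` form; None is returned
        when it is absent or does not parse.
        """
        if not published_date:
            return None
        with contextlib.suppress(ValueError):
            return datetime.strptime(published_date, "%Y-%m-%dT%H:%M:%SZ").date()
        return None

    @classmethod
    def _resolve_file_path(cls, target_class, package_path, target):
        """Pick the file path to record for a package vulnerability.

        Only ``os-pkgs`` / ``lang-pkgs`` entries carry a meaningful location;
        ``packagePath`` is preferred over the scan ``target``. Returns None for
        any other class or when neither value is present.
        """
        if target_class not in cls._FILE_PATH_CLASSES:
            return None
        return package_path or target or None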