Skip to content

Commit 333b4ae

Browse files
reimport: add management command to reimport sample scans (#13893)
1 parent 7c32536 commit 333b4ae

1 file changed

Lines changed: 228 additions & 0 deletions

File tree

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import json
2+
import logging
3+
import time
4+
from importlib import import_module
5+
from importlib.util import find_spec
6+
from inspect import isclass
7+
from pathlib import Path
8+
9+
from django.core.management.base import BaseCommand, CommandError
10+
from django.urls import reverse
11+
from rest_framework.authtoken.models import Token
12+
from rest_framework.test import APIClient
13+
14+
from unittests.test_dashboard import User
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
class Command(BaseCommand):
20+
21+
help = (
22+
"Reimport a specific unittest scan by filename into an existing test. "
23+
"Automatically deduces scan type from path."
24+
)
25+
26+
def add_arguments(self, parser):
27+
parser.add_argument(
28+
"test_id",
29+
type=int,
30+
help="ID of the test to reimport into",
31+
)
32+
parser.add_argument(
33+
"scan_file",
34+
type=str,
35+
help="Path to scan file relative to unittests/scans/ (e.g., 'jfrog_xray_unified/very_many_vulns.json')",
36+
)
37+
parser.add_argument(
38+
"--minimum-severity",
39+
type=str,
40+
default="Low",
41+
choices=["Critical", "High", "Medium", "Low", "Info"],
42+
help="Minimum severity to import (default: Low)",
43+
)
44+
parser.add_argument(
45+
"--active",
46+
action="store_true",
47+
default=True,
48+
help="Mark findings as active (default: True)",
49+
)
50+
parser.add_argument(
51+
"--verified",
52+
action="store_true",
53+
default=False,
54+
help="Mark findings as verified (default: False)",
55+
)
56+
parser.add_argument(
57+
"--close-old-findings",
58+
action="store_true",
59+
default=False,
60+
help="Close findings not present in the new scan (default: False)",
61+
)
62+
parser.add_argument(
63+
"--tags",
64+
action="append",
65+
default=[],
66+
help=(
67+
"Tag(s) to apply to the test (repeat --tags to add multiple). "
68+
"Example: --tags perf --tags jfrog"
69+
),
70+
)
71+
72+
def get_test_admin(self):
73+
return User.objects.get(username="admin")
74+
75+
def reimport_scan(self, payload, expected_http_status_code=201):
76+
testuser = self.get_test_admin()
77+
token = Token.objects.get(user=testuser)
78+
client = APIClient()
79+
client.credentials(HTTP_AUTHORIZATION="Token " + token.key)
80+
81+
response = client.post(reverse("reimportscan-list"), payload)
82+
if expected_http_status_code != response.status_code:
83+
msg = f"Expected HTTP status code {expected_http_status_code}, got {response.status_code}: {response.content[:1000]}"
84+
raise CommandError(msg)
85+
return json.loads(response.content)
86+
87+
def deduce_scan_type_from_path(self, scan_file_path):
88+
"""
89+
Deduce the scan type from the file path by finding the corresponding parser.
90+
91+
Args:
92+
scan_file_path: Path like 'zap/zap_sample.json' or 'jfrog_xray_unified/very_many_vulns.json'
93+
94+
Returns:
95+
tuple: (scan_type, parser_class) or raises CommandError if not found
96+
97+
"""
98+
# Extract the directory name (parser module name)
99+
path_parts = Path(scan_file_path).parts
100+
if len(path_parts) < 2:
101+
msg = f"Scan file path must include directory: {scan_file_path}"
102+
raise CommandError(msg)
103+
104+
module_name = path_parts[0]
105+
106+
# Try to find and load the parser module
107+
try:
108+
if not find_spec(f"dojo.tools.{module_name}.parser"):
109+
msg = f"No parser module found for '{module_name}'"
110+
raise CommandError(msg)
111+
112+
module = import_module(f"dojo.tools.{module_name}.parser")
113+
114+
# Find the parser class
115+
parser_class = None
116+
expected_class_name = module_name.replace("_", "") + "parser"
117+
118+
for attribute_name in dir(module):
119+
attribute = getattr(module, attribute_name)
120+
if isclass(attribute) and attribute_name.lower() == expected_class_name:
121+
parser_class = attribute
122+
break
123+
124+
if not parser_class:
125+
msg = f"No parser class found in module '{module_name}'"
126+
raise CommandError(msg)
127+
128+
# Get the scan type from the parser
129+
parser_instance = parser_class()
130+
scan_types = parser_instance.get_scan_types()
131+
132+
if not scan_types:
133+
msg = f"Parser '{module_name}' has no scan types"
134+
raise CommandError(msg)
135+
136+
return scan_types[0], parser_class
137+
138+
except ImportError as e:
139+
msg = f"Failed to import parser module '{module_name}': {e}"
140+
raise CommandError(msg)
141+
142+
def reimport_unittest_scan(self, test_id, scan_file, minimum_severity, active, verified, close_old_findings, tags):
143+
"""
144+
Reimport a specific unittest scan file into an existing test.
145+
146+
Args:
147+
test_id: ID of the test to reimport into
148+
scan_file: Path to scan file relative to unittests/scans/
149+
minimum_severity: Minimum severity level
150+
active: Whether findings should be active
151+
verified: Whether findings should be verified
152+
close_old_findings: Whether to close findings not in the new scan
153+
tags: List of tags to apply
154+
155+
"""
156+
# Validate scan file exists
157+
scan_path = Path("unittests/scans") / scan_file
158+
if not scan_path.exists():
159+
msg = f"Scan file not found: {scan_path}"
160+
raise CommandError(msg)
161+
162+
# Deduce scan type from path
163+
scan_type, _parser_class = self.deduce_scan_type_from_path(scan_file)
164+
165+
logger.info(f"Reimporting scan '{scan_file}' using scan type '{scan_type}'")
166+
logger.info(f"Target: Test ID {test_id}")
167+
168+
# Reimport the scan
169+
with scan_path.open(encoding="utf-8") as testfile:
170+
payload = {
171+
"test": test_id,
172+
"minimum_severity": minimum_severity,
173+
"scan_type": scan_type,
174+
"file": testfile,
175+
"version": "1.0.1",
176+
"active": active,
177+
"verified": verified,
178+
"close_old_findings": close_old_findings,
179+
}
180+
181+
if tags:
182+
payload["tags"] = tags
183+
184+
result = self.reimport_scan(payload)
185+
186+
logger.info(f"Successfully reimported scan. Test ID: {result.get('test')}")
187+
logger.info(f"Reimport summary: {result.get('scan_save_message', 'No summary available')}")
188+
189+
return result
190+
191+
def handle(self, *args, **options):
192+
test_id = options["test_id"]
193+
scan_file = options["scan_file"]
194+
minimum_severity = options["minimum_severity"]
195+
active = options["active"]
196+
verified = options["verified"]
197+
close_old_findings = options["close_old_findings"]
198+
tags = options["tags"]
199+
200+
start_time = time.time()
201+
202+
try:
203+
self.reimport_unittest_scan(
204+
test_id=test_id,
205+
scan_file=scan_file,
206+
minimum_severity=minimum_severity,
207+
active=active,
208+
verified=verified,
209+
close_old_findings=close_old_findings,
210+
tags=tags,
211+
)
212+
213+
end_time = time.time()
214+
duration = end_time - start_time
215+
216+
self.stdout.write(
217+
self.style.SUCCESS(
218+
f"Successfully reimported '{scan_file}' into test ID {test_id} "
219+
f"(took {duration:.2f} seconds)",
220+
),
221+
)
222+
223+
except Exception as e:
224+
end_time = time.time()
225+
duration = end_time - start_time
226+
logger.exception(f"Failed to reimport scan '{scan_file}' after {duration:.2f} seconds")
227+
msg = f"Reimport failed after {duration:.2f} seconds: {e}"
228+
raise CommandError(msg)

0 commit comments

Comments
 (0)