|
45 | 45 | serializers, |
46 | 46 | ) |
47 | 47 | from dojo.api_v2.prefetch.prefetcher import _Prefetcher |
| 48 | +from dojo.authorization.authorization import user_has_permission_or_403 |
48 | 49 | from dojo.authorization.roles_permissions import Permissions |
49 | 50 | from dojo.celery_dispatch import dojo_dispatch_task |
50 | 51 | from dojo.cred.queries import get_authorized_cred_mappings |
@@ -351,7 +352,11 @@ def get_queryset(self): |
351 | 352 | responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, |
352 | 353 | ) |
353 | 354 | @action( |
354 | | - detail=True, methods=["post"], permission_classes=[IsAuthenticated], |
| 355 | + detail=True, methods=["post"], |
| 356 | + # IsAuthenticated only: report generation requires View permission, |
| 357 | + # enforced by the permission-filtered get_queryset(). The viewset's |
| 358 | + # permission_classes would check Edit (POST), which is too restrictive. |
| 359 | + permission_classes=[IsAuthenticated], |
355 | 360 | ) |
356 | 361 | def generate_report(self, request, pk=None): |
357 | 362 | endpoint = self.get_object() |
@@ -475,7 +480,11 @@ def reopen(self, request, pk=None): |
475 | 480 | responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, |
476 | 481 | ) |
477 | 482 | @action( |
478 | | - detail=True, methods=["post"], permission_classes=[IsAuthenticated], |
| 483 | + detail=True, methods=["post"], |
| 484 | + # IsAuthenticated only: report generation requires View permission, |
| 485 | + # enforced by the permission-filtered get_queryset(). The viewset's |
| 486 | + # permission_classes would check Edit (POST), which is too restrictive. |
| 487 | + permission_classes=[IsAuthenticated], |
479 | 488 | ) |
480 | 489 | def generate_report(self, request, pk=None): |
481 | 490 | engagement = self.get_object() |
@@ -691,7 +700,8 @@ def download_file(self, request, file_id, pk=None): |
691 | 700 | responses={status.HTTP_200_OK: serializers.EngagementUpdateJiraEpicSerializer}, |
692 | 701 | ) |
693 | 702 | @action( |
694 | | - detail=True, methods=["post"], permission_classes=[IsAuthenticated], |
| 703 | + detail=True, methods=["post"], |
| 704 | + permission_classes=(IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission), |
695 | 705 | ) |
696 | 706 | def update_jira_epic(self, request, pk=None): |
697 | 707 | engagement = self.get_object() |
@@ -1383,7 +1393,11 @@ def set_finding_as_original(self, request, pk, new_fid): |
1383 | 1393 | responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, |
1384 | 1394 | ) |
1385 | 1395 | @action( |
1386 | | - detail=False, methods=["post"], permission_classes=[IsAuthenticated], |
| 1396 | + detail=False, methods=["post"], |
| 1397 | + # IsAuthenticated only: report generation requires View permission, |
| 1398 | + # enforced by the permission-filtered get_queryset(). The viewset's |
| 1399 | + # permission_classes would check Edit (POST), which is too restrictive. |
| 1400 | + permission_classes=[IsAuthenticated], |
1387 | 1401 | ) |
1388 | 1402 | def generate_report(self, request): |
1389 | 1403 | findings = self.get_queryset() |
@@ -1728,38 +1742,55 @@ def batch(self, request, pk=None): |
1728 | 1742 | serialized_data = serializers.MetaMainSerializer(data=request.data) |
1729 | 1743 | if serialized_data.is_valid(raise_exception=True): |
1730 | 1744 | if request.method == "POST": |
1731 | | - self.process_post(request.data) |
| 1745 | + self.process_post(request) |
1732 | 1746 | status_code = status.HTTP_201_CREATED |
1733 | 1747 | if request.method == "PATCH": |
1734 | | - self.process_patch(request.data) |
| 1748 | + self.process_patch(request) |
1735 | 1749 | status_code = status.HTTP_200_OK |
1736 | 1750 |
|
1737 | 1751 | return Response(status=status_code, data=serialized_data.data) |
1738 | 1752 |
|
1739 | | - def process_post(self: object, data: dict): |
1740 | | - product = Product.objects.filter(id=data.get("product")).first() |
1741 | | - finding = Finding.objects.filter(id=data.get("finding")).first() |
1742 | | - endpoint = Endpoint.objects.filter(id=data.get("endpoint")).first() |
| 1753 | + def _fetch_and_authorize_parents(self, request, permission_map): |
| 1754 | + """Fetch parent objects and verify the user has the required permissions.""" |
| 1755 | + data = request.data |
| 1756 | + parents = {} |
| 1757 | + for field, (model, permission) in permission_map.items(): |
| 1758 | + obj = model.objects.filter(id=data.get(field)).first() |
| 1759 | + if obj: |
| 1760 | + user_has_permission_or_403(request.user, obj, permission) |
| 1761 | + parents[field] = obj |
| 1762 | + return parents |
| 1763 | + |
| 1764 | + def process_post(self, request): |
| 1765 | + data = request.data |
| 1766 | + parents = self._fetch_and_authorize_parents(request, { |
| 1767 | + "product": (Product, Permissions.Product_Edit), |
| 1768 | + "finding": (Finding, Permissions.Finding_Edit), |
| 1769 | + "endpoint": (Endpoint, Permissions.Location_Edit), |
| 1770 | + }) |
1743 | 1771 | metalist = data.get("metadata") |
1744 | 1772 | for metadata in metalist: |
1745 | 1773 | try: |
1746 | 1774 | DojoMeta.objects.create( |
1747 | | - product=product, |
1748 | | - finding=finding, |
1749 | | - endpoint=endpoint, |
| 1775 | + product=parents["product"], |
| 1776 | + finding=parents["finding"], |
| 1777 | + endpoint=parents["endpoint"], |
1750 | 1778 | name=metadata.get("name"), |
1751 | 1779 | value=metadata.get("value"), |
1752 | 1780 | ) |
1753 | 1781 | except (IntegrityError) as ex: # this should not happen as the data was validated in the batch call |
1754 | 1782 | raise ValidationError(str(ex)) |
1755 | 1783 |
|
1756 | | - def process_patch(self: object, data: dict): |
1757 | | - product = Product.objects.filter(id=data.get("product")).first() |
1758 | | - finding = Finding.objects.filter(id=data.get("finding")).first() |
1759 | | - endpoint = Endpoint.objects.filter(id=data.get("endpoint")).first() |
| 1784 | + def process_patch(self, request): |
| 1785 | + data = request.data |
| 1786 | + parents = self._fetch_and_authorize_parents(request, { |
| 1787 | + "product": (Product, Permissions.Product_Edit), |
| 1788 | + "finding": (Finding, Permissions.Finding_Edit), |
| 1789 | + "endpoint": (Endpoint, Permissions.Location_Edit), |
| 1790 | + }) |
1760 | 1791 | metalist = data.get("metadata") |
1761 | 1792 | for metadata in metalist: |
1762 | | - dojometa = DojoMeta.objects.filter(product=product, finding=finding, endpoint=endpoint, name=metadata.get("name")) |
| 1793 | + dojometa = DojoMeta.objects.filter(product=parents["product"], finding=parents["finding"], endpoint=parents["endpoint"], name=metadata.get("name")) |
1763 | 1794 | if dojometa: |
1764 | 1795 | try: |
1765 | 1796 | dojometa.update( |
@@ -1815,7 +1846,11 @@ def destroy(self, request, *args, **kwargs): |
1815 | 1846 | responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, |
1816 | 1847 | ) |
1817 | 1848 | @action( |
1818 | | - detail=True, methods=["post"], permission_classes=[IsAuthenticated], |
| 1849 | + detail=True, methods=["post"], |
| 1850 | + # IsAuthenticated only: report generation requires View permission, |
| 1851 | + # enforced by the permission-filtered get_queryset(). The viewset's |
| 1852 | + # permission_classes would check Edit (POST), which is too restrictive. |
| 1853 | + permission_classes=[IsAuthenticated], |
1819 | 1854 | ) |
1820 | 1855 | def generate_report(self, request, pk=None): |
1821 | 1856 | product = self.get_object() |
@@ -1956,7 +1991,11 @@ def destroy(self, request, *args, **kwargs): |
1956 | 1991 | responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, |
1957 | 1992 | ) |
1958 | 1993 | @action( |
1959 | | - detail=True, methods=["post"], permission_classes=[IsAuthenticated], |
| 1994 | + detail=True, methods=["post"], |
| 1995 | + # IsAuthenticated only: report generation requires View permission, |
| 1996 | + # enforced by the permission-filtered get_queryset(). The viewset's |
| 1997 | + # permission_classes would check Edit (POST), which is too restrictive. |
| 1998 | + permission_classes=[IsAuthenticated], |
1960 | 1999 | ) |
1961 | 2000 | def generate_report(self, request, pk=None): |
1962 | 2001 | product_type = self.get_object() |
@@ -2143,7 +2182,11 @@ def get_serializer_class(self): |
2143 | 2182 | responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, |
2144 | 2183 | ) |
2145 | 2184 | @action( |
2146 | | - detail=True, methods=["post"], permission_classes=[IsAuthenticated], |
| 2185 | + detail=True, methods=["post"], |
| 2186 | + # IsAuthenticated only: report generation requires View permission, |
| 2187 | + # enforced by the permission-filtered get_queryset(). The viewset's |
| 2188 | + # permission_classes would check Edit (POST), which is too restrictive. |
| 2189 | + permission_classes=[IsAuthenticated], |
2147 | 2190 | ) |
2148 | 2191 | def generate_report(self, request, pk=None): |
2149 | 2192 | test = self.get_object() |
|
0 commit comments