|
| 1 | +import io |
1 | 2 | import json |
2 | 3 | import logging |
| 4 | +import zipfile |
3 | 5 |
|
4 | 6 | logger = logging.getLogger(__name__) |
5 | 7 |
|
| 8 | +# Zip bomb protection limits |
| 9 | +MAX_ZIP_MEMBERS = 1000 |
| 10 | +MAX_ZIP_MEMBER_SIZE = 512 * 1024 * 1024 # 512 MB per member (uncompressed) |
| 11 | +MAX_ZIP_TOTAL_SIZE = 1 * 1024 * 1024 * 1024 # 1 GB total (uncompressed) |
| 12 | +MAX_ZIP_RATIO = 100 # max compression ratio (uncompressed / compressed) |
| 13 | + |
| 14 | + |
| 15 | +def safe_open_zip(file): |
| 16 | + """ |
| 17 | + Open a zip file with protection against zip bomb attacks. |
| 18 | +
|
| 19 | + Validates member count, per-member uncompressed size, total uncompressed |
| 20 | + size, and compression ratios using the central-directory metadata before |
| 21 | + any data is extracted. |
| 22 | +
|
| 23 | + Accepts a file-like object or an io.TextIOWrapper (in which case |
| 24 | + file.name is used as the path). |
| 25 | +
|
| 26 | + Returns an open ZipFile. Use as a context manager or call .close() |
| 27 | + explicitly when done. |
| 28 | +
|
| 29 | + Raises ValueError if any limit is exceeded. |
| 30 | + """ |
| 31 | + zf = zipfile.ZipFile(file.name, "r") if isinstance(file, io.TextIOWrapper) else zipfile.ZipFile(file, "r") |
| 32 | + |
| 33 | + infos = zf.infolist() |
| 34 | + |
| 35 | + if len(infos) > MAX_ZIP_MEMBERS: |
| 36 | + zf.close() |
| 37 | + msg = f"Zip file contains {len(infos)} members, exceeding the limit of {MAX_ZIP_MEMBERS}." |
| 38 | + raise ValueError(msg) |
| 39 | + |
| 40 | + total_size = 0 |
| 41 | + for info in infos: |
| 42 | + if info.file_size > MAX_ZIP_MEMBER_SIZE: |
| 43 | + zf.close() |
| 44 | + msg = ( |
| 45 | + f"Zip member '{info.filename}' has uncompressed size {info.file_size} bytes, " |
| 46 | + f"exceeding the per-member limit of {MAX_ZIP_MEMBER_SIZE} bytes." |
| 47 | + ) |
| 48 | + raise ValueError(msg) |
| 49 | + if info.compress_size > 0 and (info.file_size / info.compress_size) > MAX_ZIP_RATIO: |
| 50 | + zf.close() |
| 51 | + ratio = info.file_size / info.compress_size |
| 52 | + msg = ( |
| 53 | + f"Zip member '{info.filename}' has a compression ratio of " |
| 54 | + f"{ratio:.1f}:1, exceeding the limit of {MAX_ZIP_RATIO}:1." |
| 55 | + ) |
| 56 | + raise ValueError(msg) |
| 57 | + total_size += info.file_size |
| 58 | + if total_size > MAX_ZIP_TOTAL_SIZE: |
| 59 | + zf.close() |
| 60 | + msg = f"Zip file total uncompressed size exceeds the limit of {MAX_ZIP_TOTAL_SIZE} bytes." |
| 61 | + raise ValueError(msg) |
| 62 | + |
| 63 | + return zf |
| 64 | + |
| 65 | + |
| 66 | +def safe_read_all_zip(file): |
| 67 | + """ |
| 68 | + Open a zip file safely and read all members into a dict {name: bytes}. |
| 69 | +
|
| 70 | + Applies the same zip bomb protections as safe_open_zip before reading |
| 71 | + any data. |
| 72 | +
|
| 73 | + Raises ValueError if any limit is exceeded. |
| 74 | + """ |
| 75 | + zf = safe_open_zip(file) |
| 76 | + try: |
| 77 | + return {name: zf.read(name) for name in zf.namelist()} |
| 78 | + finally: |
| 79 | + zf.close() |
| 80 | + |
6 | 81 |
|
7 | 82 | def get_npm_cwe(item_node): |
8 | 83 | """ |
|
0 commit comments