Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Update linux kernel to be a importer
Add a test for linux kernel Signed-off-by: ziad hany <ziadhany2016@gmail.com>
  • Loading branch information
ziadhany committed Oct 26, 2025
commit 313cb15e8b6ee2d5872efd6f874a739a810af9d0
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
from vulnerabilities.pipelines.v2_importers import linux_kernel_importer as linux_kernel_importer_v2
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2
from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2
Expand Down Expand Up @@ -81,6 +82,7 @@
mozilla_importer_v2.MozillaImporterPipeline,
github_osv_importer_v2.GithubOSVImporterPipeline,
redhat_importer_v2.RedHatImporterPipeline,
linux_kernel_importer_v2.LinuxKernelPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
4 changes: 0 additions & 4 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
from vulnerabilities.pipelines import flag_ghost_packages
from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline
from vulnerabilities.pipelines import remove_duplicate_advisories
from vulnerabilities.pipelines.v2_improvers import (
collect_linux_kernel_cves_commits as collect_linux_kernel_cves_commits_v2,
)
from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2
from vulnerabilities.pipelines.v2_improvers import compute_package_risk as compute_package_risk_v2
from vulnerabilities.pipelines.v2_improvers import (
Expand Down Expand Up @@ -71,6 +68,5 @@
compute_version_rank_v2.ComputeVersionRankPipeline,
compute_advisory_todo_v2.ComputeToDo,
compute_advisory_todo.ComputeToDo,
collect_linux_kernel_cves_commits_v2.CollectFixCommitLinuxKernelPipeline,
]
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,48 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#
import re
import shutil
from pathlib import Path

from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.models import AdvisoryV2
from vulnerabilities.models import CodeFixV2
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import cve_regex
from vulnerabilities.utils import get_advisory_url


class CollectFixCommitLinuxKernelPipeline(VulnerableCodePipeline):
class LinuxKernelPipeline(VulnerableCodeBaseImporterPipelineV2):
"""
Pipeline to collect fix commits from Linux Kernel:
Pipeline to collect Linux Kernel Pipeline:
"""

pipeline_id = "linux_kernel_cves_fix_commits"
spdx_license_expression = "Apache-2.0"
license_url = "https://github.com/nluedtke/linux_kernel_cves/blob/master/LICENSE"
importer_name = "linux_kernel_cves_fix_commits"
qualified_name = "linux_kernel_cves_fix_commits"
repo_url = "git+https://github.com/nluedtke/linux_kernel_cves"

@classmethod
def steps(cls):
return (
cls.clone,
cls.collect_fix_commits,
cls.collect_and_store_advisories,
cls.clean_downloads,
)

def advisories_count(self):
root = Path(self.vcs_response.dest_dir)
return sum(1 for _ in root.rglob("data/*.txt"))

def clone(self):
self.repo_url = "git+https://github.com/nluedtke/linux_kernel_cves"
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)

def collect_fix_commits(self):
self.log(f"Processing aosp_dataset fix commits.")
def collect_advisories(self):
self.log(f"Processing linux kernel fix commits.")
base_path = Path(self.vcs_response.dest_dir) / "data"
for file_path in base_path.rglob("*.txt"):
if "_CVEs.txt" in file_path.name:
Expand All @@ -58,27 +65,25 @@ def collect_fix_commits(self):
if not (vulnerability_id and commit_hash):
continue

try:
advisories = AdvisoryV2.objects.filter(
advisory_id__iendswith=vulnerability_id
references = []
for kernel_url in kernel_urls:
ref = ReferenceV2(
reference_type="commit",
url=kernel_url,
)
except AdvisoryV2.DoesNotExist:
self.log(f"Can't find vulnerability_id: {vulnerability_id}")
continue
references.append(ref)

for advisory in advisories:
for impact in advisory.impacted_packages.all():
for package in impact.affecting_packages.all():
code_fix, created = CodeFixV2.objects.get_or_create(
commits=[kernel_urls],
advisory=advisory,
affected_package=package,
)
advisory_url = get_advisory_url(
file=file_path,
base_path=self.vcs_response.dest_dir,
url="https://github.com/nluedtke/linux_kernel_cves/blob/master/",
)

if created:
self.log(
f"Created CodeFix entry for vulnerability_id: {vulnerability_id} with VCS URL {kernel_urls}"
)
yield AdvisoryData(
advisory_id=vulnerability_id,
references_v2=references,
url=advisory_url,
)

def parse_commits_file(self, file_path):
sha1_pattern = re.compile(r"\b[a-f0-9]{40}\b")
Expand All @@ -90,16 +95,21 @@ def parse_commits_file(self, file_path):
continue

cve_match = cve_regex.search(line)
cve = cve_match.group(1) if cve_match else None
if not cve_match:
continue

cve = cve_match.group(0)

sha1_match = sha1_pattern.search(line)
commit_hash = sha1_match.group(0) if sha1_match else None
yield cve, commit_hash

def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()
"""Cleanup any temporary repository data."""
self.log("Cleaning up local repository resources.")
if hasattr(self, "repo") and self.repo.working_dir:
shutil.rmtree(path=self.repo.working_dir)

def on_failure(self):
"""Ensure cleanup is always performed on failure."""
self.clean_downloads()
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from pathlib import Path
from unittest.mock import Mock

import pytest

from vulnerabilities.pipelines.v2_importers.linux_kernel_importer import LinuxKernelPipeline
from vulnerabilities.tests import util_tests

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "linux_kernel"


@pytest.mark.django_db
def test_linux_kernel_advisories():
expected_file = os.path.join(TEST_DATA, "expected-linux-kernel-advisory.json")
pipeline = LinuxKernelPipeline()
pipeline.vcs_response = Mock(dest_dir=TEST_DATA)
result = [adv.to_dict() for adv in pipeline.collect_advisories()]
util_tests.check_results_against_json(result, expected_file)
Loading