Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions examples/get_bom_component_vulnerability_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
parser = argparse.ArgumentParser("Retreive BOM component vulnerability information for the given project and version")
parser.add_argument("project_name")
parser.add_argument("version")
parser.add_argument("-u", "--include_updated_vulns",
action='store_true',
help="If set, will also retrieve vulnerabilities whose update date/time is later than the newer_than date/time")
parser.add_argument("-n", "--newer_than",
default=None,
type=str,
Expand Down Expand Up @@ -57,17 +60,24 @@

version = hub.get_version_by_name(project, args.version)

import pdb; pdb.set_trace()

vulnerable_bom_components_info = hub.get_vulnerable_bom_components(version)

vulnerable_bom_components = vulnerable_bom_components_info.get('items', [])

if vulnerable_bom_components:
vulnerable_bom_components = sorted(
vulnerable_bom_components,
key = lambda k: k['vulnerabilityWithRemediation']['vulnerabilityPublishedDate'])
if newer_than:
vulnerable_bom_components = [v for v in vulnerable_bom_components
if timestring.Date(v['vulnerabilityWithRemediation']['vulnerabilityPublishedDate']) > newer_than ]
vulnerable_bom_components = sorted(
vulnerable_bom_components,
key = lambda k: k['vulnerabilityWithRemediation']['vulnerabilityPublishedDate'])
if newer_than:
if args.include_updated_vulns:
vulnerable_bom_components = [v for v in vulnerable_bom_components
if timestring.Date(v['vulnerabilityWithRemediation']['vulnerabilityPublishedDate']) > newer_than
or timestring.Date(v['vulnerabilityWithRemediation']['vulnerabilityUpdatedDate']) > newer_than ]
else:
vulnerable_bom_components = [v for v in vulnerable_bom_components
if timestring.Date(v['vulnerabilityWithRemediation']['vulnerabilityPublishedDate']) > newer_than ]
else:
logging.debug("Did not find any vulnerable BOM components in project {}, version {}".format(args.project_name, args.version))

Expand Down
109 changes: 109 additions & 0 deletions examples/get_vulnerability_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python

'''
Created on Mar 29, 2019

@author: gsnyder

Retrieve vulnerability notifications

Note: The user account you run this under will determine the scope (i.e. projects, versions) of
the notifications that can be received.

'''

import argparse
from datetime import datetime
import json
import logging
import pytz
import sys
import timestring

from blackduck.HubRestApi import HubInstance, object_id

#
# Example usage:
#
# To get all the vulnerability notices,
# python examples/get_vulnerability_notifications.py > all_vuln_notifications.json
#
# To get all the vulnerability notices and save the date/time of the last run,
# python examples/get_vulnerability_notifications.py -s > all_vuln_notifications.json
#
# To get all the vulnerability notices since the last run,
# python examples/get_vulnerability_notifications.py -n `cat .last_run` > all_vuln_notifications.json
#
# To get all the vulnerability notices since a date/time,
# python examples/get_vulnerability_notifications.py -n "March 29, 2019 12:00" > since_mar_29_at_noon_vuln_notifications.json
#
# To get all the vulnerability notices for a given project,
# python examples/get_vulnerability_notifications.py -p my-project > all_vuln_notifications_for_my_project.json
#
# To get all the vulnerability notices for a given project and version,
# python examples/get_vulnerability_notifications.py -p my-project -v 1.0 > all_vuln_notifications_for_my_project_v1.0.json
#
#


parser = argparse.ArgumentParser("Retreive vulnerability notifications")
parser.add_argument("-p", "--project", help="If supplied, filter the notifications to this project")
parser.add_argument("-v", "--version", help="If supplied, filter the notifications to this version (requires a project)")
parser.add_argument("-n", "--newer_than",
default=None,
type=str,
help="Set this option to see all vulnerability notifications published since the given date/time.")
parser.add_argument("-s", "--save_dt",
action='store_true',
help="If set, the date/time will be saved to a file named '.last_run' in the current directory which can be used later with the -n option to see vulnerabilities published since the last run.")
parser.add_argument("-l", "--limit", default=100000, help="To change the limit on the number of notifications to retrieve")

args = parser.parse_args()

if args.newer_than:
newer_than = timestring.Date(args.newer_than).date
# adjust to UTC so the comparison is normalized
newer_than = newer_than.astimezone(pytz.utc)
else:
newer_than = None

if args.save_dt:
with open(".last_run", "w") as f:
f.write(datetime.now().isoformat())

logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', stream=sys.stderr, level=logging.DEBUG)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

hub = HubInstance()

current_user = hub.get_current_user()

user_notifications_url = hub.get_link(current_user, "notifications")
user_notifications_url = "{}?limit={}".format(user_notifications_url, args.limit)

# notifications_url = "{}/api/notifications?limit={}".format(hub.get_urlbase(), args.limit)

vulnerability_notifications = []

response = hub.execute_get(user_notifications_url)
if response.status_code == 200:
notifications = response.json()
notifications = notifications.get('items', [])

vulnerability_notifications = list(filter(lambda n: n['type'] == "VULNERABILITY", notifications))
if newer_than:
vulnerability_notifications = list(
filter(lambda n: timestring.Date(n['createdAt']) > newer_than, vulnerability_notifications))
if args.project:
vulnerability_notifications = list(
filter(lambda n: args.project in [apv['projectName'] for apv in n['content']['affectedProjectVersions']],
vulnerability_notifications))
if args.version:
vulnerability_notifications = list(
filter(lambda n: args.version in [apv['projectVersionName'] for apv in n['content']['affectedProjectVersions']],
vulnerability_notifications))
else:
print("Failed to retrieve notifications for user {}".format(current_user))

print(json.dumps(vulnerability_notifications))