finite_state_sdk
1import json 2from enum import Enum 3 4import requests 5import time 6from warnings import warn 7import finite_state_sdk.queries as queries 8from tenacity import retry, retry_if_exception, stop_after_attempt, wait_fixed 9from finite_state_sdk.utils import ( 10 BreakoutException, 11 is_mutation, 12 is_not_breakout_exception, 13) 14 15API_URL = 'https://platform.finitestate.io/api/v1/graphql' 16AUDIENCE = "https://platform.finitestate.io/api/v1/graphql" 17TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token" 18 19""" 20DEFAULT CHUNK SIZE: 1000 MiB 21""" 22DEFAULT_CHUNK_SIZE = 1024**2 * 1000 23""" 24MAX CHUNK SIZE: 2 GiB 25""" 26MAX_CHUNK_SIZE = 1024**2 * 2000 27""" 28MIN CHUNK SIZE: 5 MiB 29""" 30MIN_CHUNK_SIZE = 1024**2 * 5 31 32 33class UploadMethod(Enum): 34 """ 35 Enumeration class representing different upload methods. 36 37 Attributes: 38 WEB_APP_UI: Upload method via web application UI. 39 API: Upload method via API. 40 GITHUB_INTEGRATION: Upload method via GitHub integration. 41 AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration. 42 43 To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI 44 """ 45 WEB_APP_UI = "WEB_APP_UI" 46 API = "API" 47 GITHUB_INTEGRATION = "GITHUB_INTEGRATION" 48 AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION" 49 50 51def create_artifact( 52 token, 53 organization_context, 54 business_unit_id=None, 55 created_by_user_id=None, 56 asset_version_id=None, 57 artifact_name=None, 58 product_id=None, 59): 60 """ 61 Create a new Artifact. 62 This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. 
63 Please see the examples in the Github repository for more information: 64 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py 65 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py 66 67 Args: 68 token (str): 69 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 70 organization_context (str): 71 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 72 business_unit_id (str, required): 73 Business Unit ID to associate the artifact with. 74 created_by_user_id (str, required): 75 User ID of the user creating the artifact. 76 asset_version_id (str, required): 77 Asset Version ID to associate the artifact with. 78 artifact_name (str, required): 79 The name of the Artifact being created. 80 product_id (str, optional): 81 Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product. 82 83 Raises: 84 ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided. 85 Exception: Raised if the query fails. 86 87 Returns: 88 dict: createArtifact Object 89 """ 90 if not business_unit_id: 91 raise ValueError("Business unit ID is required") 92 if not created_by_user_id: 93 raise ValueError("Created by user ID is required") 94 if not asset_version_id: 95 raise ValueError("Asset version ID is required") 96 if not artifact_name: 97 raise ValueError("Artifact name is required") 98 99 graphql_query = """ 100 mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) 
{ 101 createArtifact(input: $input) { 102 id 103 name 104 assetVersion { 105 id 106 name 107 asset { 108 id 109 name 110 } 111 } 112 createdBy { 113 id 114 email 115 } 116 ctx { 117 asset 118 products 119 businessUnits 120 } 121 } 122 } 123 """ 124 125 # Asset name, business unit context, and creating user are required 126 variables = { 127 "input": { 128 "name": artifact_name, 129 "createdBy": created_by_user_id, 130 "assetVersion": asset_version_id, 131 "ctx": { 132 "asset": asset_version_id, 133 "businessUnits": [business_unit_id] 134 } 135 } 136 } 137 138 if product_id is not None: 139 variables["input"]["ctx"]["products"] = product_id 140 141 response = send_graphql_query(token, organization_context, graphql_query, variables) 142 return response['data'] 143 144 145def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None): 146 """ 147 Create a new Asset. 148 149 Args: 150 token (str): 151 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 152 organization_context (str): 153 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 154 business_unit_id (str, required): 155 Business Unit ID to associate the asset with. 156 created_by_user_id (str, required): 157 User ID of the user creating the asset. 158 asset_name (str, required): 159 The name of the Asset being created. 160 product_id (str, optional): 161 Product ID to associate the asset with. If not specified, the asset will not be associated with a product. 162 163 Raises: 164 ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided. 165 Exception: Raised if the query fails. 
166 167 Returns: 168 dict: createAsset Object 169 """ 170 if not business_unit_id: 171 raise ValueError("Business unit ID is required") 172 if not created_by_user_id: 173 raise ValueError("Created by user ID is required") 174 if not asset_name: 175 raise ValueError("Asset name is required") 176 177 graphql_query = """ 178 mutation CreateAssetMutation_SDK($input: CreateAssetInput!) { 179 createAsset(input: $input) { 180 id 181 name 182 dependentProducts { 183 id 184 name 185 } 186 group { 187 id 188 name 189 } 190 createdBy { 191 id 192 email 193 } 194 ctx { 195 asset 196 products 197 businessUnits 198 } 199 } 200 } 201 """ 202 203 # Asset name, business unit context, and creating user are required 204 variables = { 205 "input": { 206 "name": asset_name, 207 "group": business_unit_id, 208 "createdBy": created_by_user_id, 209 "ctx": { 210 "businessUnits": [business_unit_id] 211 } 212 } 213 } 214 215 if product_id is not None: 216 variables["input"]["ctx"]["products"] = product_id 217 218 response = send_graphql_query(token, organization_context, graphql_query, variables) 219 return response['data'] 220 221 222def create_asset_version( 223 token, 224 organization_context, 225 business_unit_id=None, 226 created_by_user_id=None, 227 asset_id=None, 228 asset_version_name=None, 229 product_id=None, 230): 231 """ 232 Create a new Asset Version. 233 234 Args: 235 token (str): 236 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 237 organization_context (str): 238 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 239 business_unit_id (str, required): 240 Business Unit ID to associate the asset version with. 241 created_by_user_id (str, required): 242 User ID of the user creating the asset version. 243 asset_id (str, required): 244 Asset ID to associate the asset version with. 
245 asset_version_name (str, required): 246 The name of the Asset Version being created. 247 product_id (str, optional): 248 Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product. 249 250 Raises: 251 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided. 252 Exception: Raised if the query fails. 253 254 Returns: 255 dict: createAssetVersion Object 256 257 deprecated:: 0.1.7. Use create_asset_version_on_asset instead. 258 """ 259 warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2) 260 if not business_unit_id: 261 raise ValueError("Business unit ID is required") 262 if not created_by_user_id: 263 raise ValueError("Created by user ID is required") 264 if not asset_id: 265 raise ValueError("Asset ID is required") 266 if not asset_version_name: 267 raise ValueError("Asset version name is required") 268 269 graphql_query = """ 270 mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) 
{ 271 createAssetVersion(input: $input) { 272 id 273 name 274 asset { 275 id 276 name 277 } 278 createdBy { 279 id 280 email 281 } 282 ctx { 283 asset 284 products 285 businessUnits 286 } 287 } 288 } 289 """ 290 291 # Asset name, business unit context, and creating user are required 292 variables = { 293 "input": { 294 "name": asset_version_name, 295 "createdBy": created_by_user_id, 296 "asset": asset_id, 297 "ctx": { 298 "asset": asset_id, 299 "businessUnits": [business_unit_id] 300 } 301 } 302 } 303 304 if product_id is not None: 305 variables["input"]["ctx"]["products"] = product_id 306 307 response = send_graphql_query(token, organization_context, graphql_query, variables) 308 return response['data'] 309 310 311def create_asset_version_on_asset( 312 token, 313 organization_context, 314 created_by_user_id=None, 315 asset_id=None, 316 asset_version_name=None, 317 product_id=None, 318): 319 """ 320 Create a new Asset Version. 321 322 Args: 323 token (str): 324 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 325 organization_context (str): 326 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 327 created_by_user_id (str, optional): 328 User ID of the user creating the asset version. 329 asset_id (str, required): 330 Asset ID to associate the asset version with. 331 asset_version_name (str, required): 332 The name of the Asset Version being created. 333 334 Raises: 335 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided. 336 Exception: Raised if the query fails. 
337 338 Returns: 339 dict: createAssetVersion Object 340 """ 341 if not asset_id: 342 raise ValueError("Asset ID is required") 343 if not asset_version_name: 344 raise ValueError("Asset version name is required") 345 346 graphql_query = """ 347 mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) { 348 createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) { 349 id 350 assetVersion { 351 id 352 } 353 } 354 } 355 """ 356 357 # Asset name, business unit context, and creating user are required 358 variables = {"assetVersionName": asset_version_name, "assetId": asset_id} 359 360 if created_by_user_id: 361 variables["createdByUserId"] = created_by_user_id 362 363 if product_id: 364 variables["productId"] = product_id 365 366 response = send_graphql_query(token, organization_context, graphql_query, variables) 367 return response['data'] 368 369 370def create_new_asset_version_artifact_and_test_for_upload( 371 token, 372 organization_context, 373 business_unit_id=None, 374 created_by_user_id=None, 375 asset_id=None, 376 version=None, 377 product_id=None, 378 test_type=None, 379 artifact_description=None, 380 upload_method: UploadMethod = UploadMethod.API, 381): 382 """ 383 Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. 384 This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases. 385 386 Args: 387 token (str): 388 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 389 organization_context (str): 390 Organization context. This is provided by the Finite State API management. 
It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 391 business_unit_id (str, optional): 392 Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used. 393 created_by_user_id (str, optional): 394 User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used. 395 asset_id (str, required): 396 Asset ID to create the asset version for. If not provided, the default asset will be used. 397 version (str, required): 398 Version to create the asset version for. 399 product_id (str, optional): 400 Product ID to create the entities for. If not provided, the default product will be used. 401 test_type (str, required): 402 Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation. 403 artifact_description (str, optional): 404 Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used. 405 upload_method (UploadMethod, optional): 406 The method of uploading the test results. Default is UploadMethod.API. 407 408 409 Raises: 410 ValueError: Raised if asset_id or version are not provided. 411 Exception: Raised if the query fails. 412 413 Returns: 414 str: The Test ID of the newly created test that is used for uploading the file. 
415 """ 416 if not asset_id: 417 raise ValueError("Asset ID is required") 418 if not version: 419 raise ValueError("Version is required") 420 421 assets = get_all_assets(token, organization_context, asset_id=asset_id) 422 if not assets: 423 raise ValueError("No assets found with the provided asset ID") 424 asset = assets[0] 425 426 # get the asset name 427 asset_name = asset['name'] 428 429 # get the existing asset product IDs 430 asset_product_ids = asset['ctx']['products'] 431 432 # get the asset product ID 433 if product_id and product_id not in asset_product_ids: 434 asset_product_ids.append(product_id) 435 436 # if business_unit_id or created_by_user_id are not provided, get the existing asset 437 if not business_unit_id or not created_by_user_id: 438 if not business_unit_id: 439 business_unit_id = asset['group']['id'] 440 if not created_by_user_id: 441 created_by_user_id = asset['createdBy']['id'] 442 443 if not business_unit_id: 444 raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset") 445 if not created_by_user_id: 446 raise ValueError("Created By User ID is required and could not be retrieved from the existing asset") 447 448 # create the asset version 449 response = create_asset_version_on_asset( 450 token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id 451 ) 452 # get the asset version ID 453 asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id'] 454 455 # create the test 456 if test_type == "finite_state_binary_analysis": 457 # create the artifact 458 if not artifact_description: 459 artifact_description = "Binary" 460 binary_artifact_name = f"{asset_name} {version} - {artifact_description}" 461 response = create_artifact(token, organization_context, business_unit_id=business_unit_id, 462 created_by_user_id=created_by_user_id, asset_version_id=asset_version_id, 463 artifact_name=binary_artifact_name, 
product_id=asset_product_ids) 464 465 # get the artifact ID 466 binary_artifact_id = response['createArtifact']['id'] 467 468 # create the test 469 test_name = f"{asset_name} {version} - Finite State Binary Analysis" 470 response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id, 471 created_by_user_id=created_by_user_id, asset_id=asset_id, 472 artifact_id=binary_artifact_id, product_id=asset_product_ids, 473 test_name=test_name, upload_method=upload_method) 474 test_id = response['createTest']['id'] 475 return test_id 476 477 else: 478 # create the artifact 479 if not artifact_description: 480 artifact_description = "Unspecified Artifact" 481 artifact_name = f"{asset_name} {version} - {artifact_description}" 482 response = create_artifact(token, organization_context, business_unit_id=business_unit_id, 483 created_by_user_id=created_by_user_id, asset_version_id=asset_version_id, 484 artifact_name=artifact_name, product_id=asset_product_ids) 485 486 # get the artifact ID 487 binary_artifact_id = response['createArtifact']['id'] 488 489 # create the test 490 test_name = f"{asset_name} {version} - {test_type}" 491 response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id, 492 created_by_user_id=created_by_user_id, asset_id=asset_id, 493 artifact_id=binary_artifact_id, product_id=asset_product_ids, 494 test_name=test_name, test_type=test_type, 495 upload_method=upload_method) 496 test_id = response['createTest']['id'] 497 return test_id 498 499 500def create_new_asset_version_and_upload_binary( 501 token, 502 organization_context, 503 business_unit_id=None, 504 created_by_user_id=None, 505 asset_id=None, 506 version=None, 507 file_path=None, 508 product_id=None, 509 artifact_description=None, 510 quick_scan=False, 511 upload_method: UploadMethod = UploadMethod.API, 512 enable_bandit_scan: bool = False, 513): 514 """ 515 Creates a new Asset Version for an existing asset, and 
uploads a binary file for Finite State Binary Analysis. 516 By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them. 517 518 Args: 519 token (str): 520 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 521 organization_context (str): 522 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 523 business_unit_id (str, optional): 524 Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used. 525 created_by_user_id (str, optional): 526 Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used. 527 asset_id (str, required): 528 Asset ID to create the asset version for. 529 version (str, required): 530 Version to create the asset version for. 531 file_path (str, required): 532 Local path to the file to upload. 533 product_id (str, optional): 534 Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists. 535 artifact_description (str, optional): 536 Description of the artifact. If not provided, the default is "Firmware Binary". 537 quick_scan (bool, optional): 538 If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation. 539 upload_method (UploadMethod, optional): 540 The method of uploading the test results. Default is UploadMethod.API. 541 enable_bandit_scan (bool, optional): 542 If True, will create an additional bandit scan in addition to the default binary analysis scan. 543 544 Raises: 545 ValueError: Raised if asset_id, version, or file_path are not provided. 546 Exception: Raised if any of the queries fail. 
547 548 Returns: 549 dict: The response from the GraphQL query, a createAssetVersion Object. 550 """ 551 if not asset_id: 552 raise ValueError("Asset ID is required") 553 if not version: 554 raise ValueError("Version is required") 555 if not file_path: 556 raise ValueError("File path is required") 557 558 # create the asset version and binary test 559 if not artifact_description: 560 artifact_description = "Firmware Binary" 561 binary_test_id = create_new_asset_version_artifact_and_test_for_upload( 562 token, 563 organization_context, 564 business_unit_id=business_unit_id, 565 created_by_user_id=created_by_user_id, 566 asset_id=asset_id, 567 version=version, 568 product_id=product_id, 569 test_type="finite_state_binary_analysis", 570 artifact_description=artifact_description, 571 upload_method=upload_method, 572 ) 573 574 # upload file for binary test 575 response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path, 576 quick_scan=quick_scan, enable_bandit_scan=enable_bandit_scan) 577 return response 578 579 580def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None, 581 created_by_user_id=None, asset_id=None, version=None, 582 file_path=None, product_id=None, test_type=None, 583 artifact_description="", upload_method: UploadMethod = UploadMethod.API): 584 """ 585 Creates a new Asset Version for an existing asset, and uploads test results for that asset version. 586 By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them. 587 588 Args: 589 token (str): 590 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 591 organization_context (str): 592 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 
593 business_unit_id (str, optional): 594 Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used. 595 created_by_user_id (str, optional): 596 Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used. 597 asset_id (str, required): 598 Asset ID to create the asset version for. 599 version (str, required): 600 Version to create the asset version for. 601 file_path (str, required): 602 Path to the test results file to upload. 603 product_id (str, optional): 604 Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used. 605 test_type (str, required): 606 Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation. 607 artifact_description (str, optional): 608 Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used. 609 upload_method (UploadMethod, optional): 610 The method of uploading the test results. Default is UploadMethod.API. 611 612 Raises: 613 ValueError: If the asset_id, version, or file_path are not provided. 614 Exception: If the test_type is not a supported third party scanner type, or if the query fails. 615 616 Returns: 617 dict: The response from the GraphQL query, a createAssetVersion Object. 
618 """ 619 if not asset_id: 620 raise ValueError("Asset ID is required") 621 if not version: 622 raise ValueError("Version is required") 623 if not file_path: 624 raise ValueError("File path is required") 625 if not test_type: 626 raise ValueError("Test type is required") 627 628 # create the asset version and test 629 test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context, 630 business_unit_id=business_unit_id, 631 created_by_user_id=created_by_user_id, 632 asset_id=asset_id, version=version, 633 product_id=product_id, test_type=test_type, 634 artifact_description=artifact_description, 635 upload_method=upload_method) 636 637 # upload test results file 638 response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path) 639 return response 640 641 642def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None, 643 product_description=None, vendor_id=None, vendor_name=None): 644 """ 645 Create a new Product. 646 647 Args: 648 token (str): 649 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 650 organization_context (str): 651 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 652 business_unit_id (str, required): 653 Business Unit ID to associate the product with. 654 created_by_user_id (str, required): 655 User ID of the user creating the product. 656 product_name (str, required): 657 The name of the Product being created. 658 product_description (str, optional): 659 The description of the Product being created. 660 vendor_id (str, optional): 661 Vendor ID to associate the product with. If not specified, vendor_name must be provided. 662 vendor_name (str, optional): 663 Vendor name to associate the product with. 
This is used to create the Vendor if the vendor does not currently exist. 664 665 Raises: 666 ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided. 667 Exception: Raised if the query fails. 668 669 Returns: 670 dict: createProduct Object 671 """ 672 673 if not business_unit_id: 674 raise ValueError("Business unit ID is required") 675 if not created_by_user_id: 676 raise ValueError("Created by user ID is required") 677 if not product_name: 678 raise ValueError("Product name is required") 679 680 graphql_query = """ 681 mutation CreateProductMutation_SDK($input: CreateProductInput!) { 682 createProduct(input: $input) { 683 id 684 name 685 vendor { 686 name 687 } 688 group { 689 id 690 name 691 } 692 createdBy { 693 id 694 email 695 } 696 ctx { 697 businessUnit 698 } 699 } 700 } 701 """ 702 703 # Product name, business unit context, and creating user are required 704 variables = { 705 "input": { 706 "name": product_name, 707 "group": business_unit_id, 708 "createdBy": created_by_user_id, 709 "ctx": { 710 "businessUnit": business_unit_id 711 } 712 } 713 } 714 715 if product_description is not None: 716 variables["input"]["description"] = product_description 717 718 # If the vendor ID is specified, this will link the new product to the existing vendor 719 if vendor_id is not None: 720 variables["input"]["vendor"] = { 721 "id": vendor_id 722 } 723 724 # If the vendor name is specified, this will create a new vendor and link it to the new product 725 if vendor_name is not None: 726 variables["input"]["createVendor"] = { 727 "name": vendor_name 728 } 729 730 response = send_graphql_query(token, organization_context, graphql_query, variables) 731 732 return response['data'] 733 734 735def create_test( 736 token, 737 organization_context, 738 business_unit_id=None, 739 created_by_user_id=None, 740 asset_id=None, 741 artifact_id=None, 742 test_name=None, 743 product_id=None, 744 test_type=None, 745 tools=[], 746 upload_method: 
UploadMethod = UploadMethod.API, 747): 748 """ 749 Create a new Test object for uploading files. 750 This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. 751 Please see the examples in the Github repository for more information: 752 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py 753 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py 754 755 Args: 756 token (str): 757 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 758 organization_context (str): 759 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 760 business_unit_id (str, required): 761 Business Unit ID to associate the Test with. 762 created_by_user_id (str, required): 763 User ID of the user creating the Test. 764 asset_id (str, required): 765 Asset ID to associate the Test with. 766 artifact_id (str, required): 767 Artifact ID to associate the Test with. 768 test_name (str, required): 769 The name of the Test being created. 770 product_id (str, optional): 771 Product ID to associate the Test with. If not specified, the Test will not be associated with a product. 772 test_type (str, required): 773 The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis". 774 tools (list, optional): 775 List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test. 776 upload_method (UploadMethod, required): 777 The method of uploading the test results. 
778 779 Raises: 780 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided. 781 Exception: Raised if the query fails. 782 783 Returns: 784 dict: createTest Object 785 """ 786 if not business_unit_id: 787 raise ValueError("Business unit ID is required") 788 if not created_by_user_id: 789 raise ValueError("Created by user ID is required") 790 if not asset_id: 791 raise ValueError("Asset ID is required") 792 if not artifact_id: 793 raise ValueError("Artifact ID is required") 794 if not test_name: 795 raise ValueError("Test name is required") 796 if not test_type: 797 raise ValueError("Test type is required") 798 799 graphql_query = """ 800 mutation CreateTestMutation_SDK($input: CreateTestInput!) { 801 createTest(input: $input) { 802 id 803 name 804 artifactUnderTest { 805 id 806 name 807 assetVersion { 808 id 809 name 810 asset { 811 id 812 name 813 dependentProducts { 814 id 815 name 816 } 817 } 818 } 819 } 820 createdBy { 821 id 822 email 823 } 824 ctx { 825 asset 826 products 827 businessUnits 828 } 829 uploadMethod 830 } 831 } 832 """ 833 834 # Asset name, business unit context, and creating user are required 835 variables = { 836 "input": { 837 "name": test_name, 838 "createdBy": created_by_user_id, 839 "artifactUnderTest": artifact_id, 840 "testResultFileFormat": test_type, 841 "ctx": { 842 "asset": asset_id, 843 "businessUnits": [business_unit_id] 844 }, 845 "tools": tools, 846 "uploadMethod": upload_method.value 847 } 848 } 849 850 if product_id is not None: 851 variables["input"]["ctx"]["products"] = product_id 852 853 response = send_graphql_query(token, organization_context, graphql_query, variables) 854 return response['data'] 855 856 857def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None, 858 asset_id=None, artifact_id=None, test_name=None, product_id=None, 859 upload_method: UploadMethod = UploadMethod.API): 860 """ 861 Create 
def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
                                   upload_method: UploadMethod = UploadMethod.API):
    """
    Create a Test that will receive files for Finite State Binary Analysis.

    Thin wrapper around create_test() that pins the test type to "finite_state_binary_analysis" and attaches
    the Finite State Binary Analysis tool description.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test is not associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test() if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    binary_analysis_tool = {
        "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
        "name": "Finite State Binary Analysis"
    }
    test_kwargs = dict(
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        tools=[binary_analysis_tool],
        upload_method=upload_method,
    )
    return create_test(token, organization_context, **test_kwargs)
def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a Test that will receive CycloneDX files.

    Thin wrapper around create_test() that pins the test type to "cyclonedx".

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test is not associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test() if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    test_kwargs = dict(
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="cyclonedx",
        upload_method=upload_method,
    )
    return create_test(token, organization_context, **test_kwargs)
def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
                                       upload_method: UploadMethod = UploadMethod.API):
    """
    Create a Test that will receive output files from a third party scanner.

    Thin wrapper around create_test() that forwards the caller-supplied test_type unchanged.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test is not associated with a product.
        test_type (str, required):
            Test type of the scanner, which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test() if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    test_kwargs = dict(
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type=test_type,
        upload_method=upload_method,
    )
    return create_test(token, organization_context, **test_kwargs)
def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
                                  report_subtype=None, output_filename=None, verbose=False):
    """
    Download a report for a specific asset version and save it to a local file.

    This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            The Asset Version ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download, based on the reports available for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved to "report.csv" or
            "report.pdf" in the current directory, based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised by generate_report_download_url() if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default name. generate_report_download_url() has already rejected any
    # report_type other than "CSV" / "PDF", so lower-casing it yields a valid extension.
    if not output_filename:
        output_filename = f"report.{report_type.lower()}"

    # Send an HTTP GET request to the pre-signed URL
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Binary mode: PDF reports are not text, and CSV bytes are written exactly as received
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file.

    This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download, based on the reports available for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved to a file named
            "report.<type>" (for example "report.csv") in the current directory, based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised by generate_report_download_url() if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default name. generate_report_download_url() has already rejected any
    # unsupported report_type, so lower-casing it yields a valid extension.
    if not output_filename:
        output_filename = f"report.{report_type.lower()}"

    # Send an HTTP GET request to the pre-signed URL
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Binary mode so the bytes are written exactly as received
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Download an SBOM for an Asset Version and save it to a local file.

    This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, required):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        Exception: Raised if the download does not return HTTP 200. Errors raised by
            generate_sbom_download_url() (e.g. for missing required parameters) propagate unchanged.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type,
                                              sbom_subtype=sbom_subtype, asset_version_id=asset_version_id,
                                              verbose=verbose)

    # Fetch the SBOM from the generated URL
    sbom_response = requests.get(download_url)

    # Anything other than 200 is treated as a failed download
    if sbom_response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {sbom_response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Binary mode so the bytes are written exactly as received
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(sbom_response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Helper generator that reads a file in binary mode, one chunk at a time.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.

    Yields:
        bytes: The next chunk of the file.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as source:
        # Two-argument iter() keeps calling read() until it returns the empty-bytes sentinel (end of file)
        yield from iter(lambda: source.read(chunk_size), b'')


def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Get all artifacts in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifacts_spec = queries.ALL_ARTIFACTS
    graphql_query = artifacts_spec['query']
    query_variables = artifacts_spec['variables'](artifact_id, business_unit_id)
    # The artifacts query's results are read from the 'allAssets' field of the response
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allAssets')
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Gets all assets in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_spec = queries.ALL_ASSETS
    graphql_query = assets_spec['query']
    query_variables = assets_spec['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allAssets')


def get_all_asset_versions(token, organization_context):
    """
    Get all asset versions in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    query_variables = queries.ALL_ASSET_VERSIONS['variables']
    # get_asset_versions() invokes this entry as a factory taking filter keywords. Passing the factory
    # itself would fail as soon as get_all_paginated_results() subscripts it, so build the unfiltered
    # variables here. The callable() guard keeps this working if the entry is a plain dict.
    if callable(query_variables):
        query_variables = query_variables(asset_version_id=None, asset_id=None, business_unit_id=None)
    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
                                     query_variables, 'allAssetVersions')
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Get all asset versions for a product. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: The entries of the 'allProducts' response field, as collected by get_all_paginated_results()
    """
    product_spec = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    graphql_query = product_spec['query']
    query_variables = product_spec['variables'](product_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')


def get_all_business_units(token, organization_context):
    """
    Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_units_spec = queries.ALL_BUSINESS_UNITS
    graphql_query = business_units_spec['query']
    query_variables = business_units_spec['variables']
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allGroups')
def get_all_organizations(token, organization_context):
    """
    Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    organizations_spec = queries.ALL_ORGANIZATIONS
    graphql_query = organizations_spec['query']
    query_variables = organizations_spec['variables']
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables,
                                     'allOrganizations')
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query.

    Pages are fetched with send_graphql_query(). After each page, the `_cursor` of the last entry is sent
    as the `after` variable for the next request, until an empty page is returned or `limit` is reached.
    The caller's `variables` dict is never modified.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required):
            Variables to be used in the GraphQL query. Must contain "first" (the page size), between 1 and 1000.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit must be between 1 and 1000.

    Raises:
        Exception: If field, limit, or variables["first"] are missing or out of range, if the query fails,
            or if the field is not in the response JSON

    Returns:
        list: List of results (at most `limit` entries when a limit is given)
    """
    if not field:
        raise Exception("Error: field is required")
    # Compare against None so that limit=0 is rejected instead of silently meaning "no limit"
    if limit is not None:
        if limit > 1000:
            raise Exception("Error: limit cannot be greater than 1000")
        if limit < 1:
            raise Exception("Error: limit cannot be less than 1")
    # Covers variables=None, a missing "first" key, and a falsy "first" value
    if not variables or not variables.get("first"):
        raise Exception("Error: first is required")
    if variables["first"] < 1:
        raise Exception("Error: first cannot be less than 1")
    if variables["first"] > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # Work on a copy: several callers pass shared module-level dicts from `queries`, and writing the
    # cursor into those would make every later call resume from a stale position.
    page_variables = dict(variables)

    results = []
    while True:
        response_data = send_graphql_query(token, organization_context, query, page_variables)

        # no response at all: return whatever has been collected (an empty list on the first page)
        if not response_data:
            break

        if field not in response_data['data']:
            raise Exception(f"Error: {field} not in response JSON")

        page = response_data['data'][field]
        results.extend(page)

        # an empty page means there is nothing left to fetch
        if not page:
            break

        # '>=' rather than '==': the page size need not divide the limit evenly
        if limit is not None and len(results) >= limit:
            break

        # the cursor of the last entry marks where the next page starts
        cursor = page[-1]['_cursor']
        if not cursor:
            break
        page_variables['after'] = cursor

    # trim any overshoot from the final page
    return results if limit is None else results[:limit]
def get_all_products(token, organization_context):
    """
    Get all products in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function rather than to this line
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)
    products_spec = queries.ALL_PRODUCTS
    graphql_query = products_spec['query']
    query_variables = products_spec['variables']
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')


def get_all_users(token, organization_context):
    """
    Get all users in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    users_spec = queries.ALL_USERS
    graphql_query = users_spec['query']
    query_variables = users_spec['variables']
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allUsers')
def get_artifact_context(token, organization_context, artifact_id):
    """
    Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact whose context is requested.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no results for artifact_id.

    Returns:
        dict: Artifact Context Object
    """
    artifacts_spec = queries.ALL_ARTIFACTS
    graphql_query = artifacts_spec['query']
    # No business unit filter: look the artifact up by ID only
    query_variables = artifacts_spec['variables'](artifact_id, None)
    matches = get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allAssets')

    first_match = matches[0]
    return first_match['ctx']
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Gets assets in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_spec = queries.ALL_ASSETS
    graphql_query = assets_spec['query']
    query_variables = assets_spec['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allAssets')
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Gets asset versions in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
        asset_id (str, optional):
            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    asset_versions_spec = queries.ALL_ASSET_VERSIONS
    graphql_query = asset_versions_spec['query']
    query_variables = asset_versions_spec['variables'](asset_version_id=asset_version_id,
                                                       asset_id=asset_id,
                                                       business_unit_id=business_unit_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables,
                                     'allAssetVersions')


def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL to request the token from, by default TOKEN_URL
        audience (str, optional):
            Audience to request the token for, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # honor the caller-supplied audience instead of always sending the module default
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    # honor the caller-supplied token URL instead of always posting to the module default
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
def get_findings(
    token,
    organization_context,
    asset_version_id=None,
    finding_id=None,
    category=None,
    status=None,
    severity=None,
    count=False,
    limit=None,
):
    """
    Gets all the Findings for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Finding Objects. When `count` is True, the `_allFindingsMeta` value of the count
            query response is returned instead.
    """
    # Compare against None so that limit=0 is rejected rather than silently treated as "no limit"
    if limit is not None:
        if limit > 1000:
            raise Exception("Error: limit must be less than or equal to 1000")
        if limit < 1:
            raise Exception("Error: limit must be greater than 0")

    # Both the count query and the listing query take the same filter arguments
    filter_kwargs = dict(asset_version_id=asset_version_id, finding_id=finding_id, category=category,
                         status=status, severity=severity, limit=limit)

    if count:
        response = send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
                                      queries.GET_FINDINGS_COUNT['variables'](**filter_kwargs))
        return response["data"]["_allFindingsMeta"]

    return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
                                     queries.GET_FINDINGS['variables'](**filter_kwargs), 'allFindings',
                                     limit=limit)
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Gets all the asset versions for a product.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            Product ID to get asset versions for. Although the parameter defaults to None, a value must be supplied.

    Raises:
        Exception: Raised if product_id is not provided or the query fails.

    Returns:
        list: The entries of the 'allProducts' response field, as collected by get_all_paginated_results()
    """
    if not product_id:
        raise Exception("Product ID is required")

    product_spec = queries.GET_PRODUCT_ASSET_VERSIONS
    graphql_query = product_spec['query']
    query_variables = product_spec['variables'](product_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')


def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Gets all the products for the specified business unit.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Product Objects
    """
    products_spec = queries.GET_PRODUCTS
    graphql_query = products_spec['query']
    query_variables = products_spec['variables'](product_id=product_id, business_unit_id=business_unit_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` is required.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` is required.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, if neither asset_version_id nor
            product_id is given, or if both are given.
        Exception: Raised if report_type or report_subtype is not supported, if the launch response carries
            no export job ID, or if a query fails.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not asset_version_id and not product_id:
        raise ValueError("Asset Version ID or Product ID is required")

    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    # Per report type: the accepted subtypes and the response field that carries the
    # export job, depending on whether an asset version or a product was targeted.
    if report_type == "CSV":
        valid_subtypes = ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]
        artifact_field, product_field = 'launchArtifactCSVExport', 'launchProductCSVExport'
    elif report_type == "PDF":
        valid_subtypes = ["RISK_SUMMARY"]
        artifact_field, product_field = 'launchArtifactPdfExport', 'launchProductPdfExport'
    else:
        raise Exception(f"Report Type {report_type} not supported")

    if report_subtype not in valid_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
                                                        report_type=report_type, report_subtype=report_subtype)
    variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
                                                          report_type=report_type, report_subtype=report_subtype)

    response_data = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(response_data, indent=4)}')

    # Exactly one of asset_version_id / product_id is set (validated above), so no third case exists.
    response_field = artifact_field if asset_version_id else product_field
    export_job_id = response_data['data'][response_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    # NOTE(review): there is no timeout and no handling of a non-COMPLETED terminal status;
    # the loop only exits once status is COMPLETED with a non-empty downloadLink.
    sleep_time = 10
    total_time = 0
    if verbose:
        print(f'Polling every {sleep_time} seconds for export job to complete')

    while True:
        time.sleep(sleep_time)
        total_time += sleep_time
        if verbose:
            print(f'Total time elapsed: {total_time} seconds')

        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        response_data = send_graphql_query(token, organization_context, query, variables)

        if verbose:
            print(f'Response Data: {json.dumps(response_data, indent=4)}')

        export_state = response_data['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] == 'COMPLETED' and export_state['downloadLink']:
            if verbose:
                print(f'Export Job Complete. Download URL: {export_state["downloadLink"]}')
            return export_state['downloadLink']
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launch an SBOM export for an asset version and wait for its pre-signed download URL.
    This may take several minutes to complete, depending on the size of SBOM.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if the type or subtype is not supported, no export job ID is returned, or the query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    required_arguments = (
        (sbom_type, "SBOM Type is required"),
        (sbom_subtype, "SBOM Subtype is required"),
        (asset_version_id, "Asset Version ID is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    # Pick the accepted subtypes and the response field for the requested SBOM flavour.
    if sbom_type == "CYCLONEDX":
        accepted_subtypes = ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]
        result_field = 'launchCycloneDxExport'
    elif sbom_type == "SPDX":
        accepted_subtypes = ["SBOM_ONLY"]
        result_field = 'launchSpdxExport'
    else:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    if sbom_subtype not in accepted_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    if sbom_type == "CYCLONEDX":
        export_definition = queries.LAUNCH_CYCLONEDX_EXPORT
    else:
        export_definition = queries.LAUNCH_SPDX_EXPORT

    launch_mutation = export_definition['mutation']
    launch_variables = export_definition['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    export_job_id = launch_response['data'][result_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Keep asking for the pre-signed URL until the export job reports COMPLETED with a link.
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        presign_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        presign_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, presign_query, presign_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        export_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != "COMPLETED":
            continue
        if not export_state['downloadLink']:
            continue

        if verbose:
            print(f'Export Job Complete. Download URL: {export_state["downloadLink"]}')
        return export_state['downloadLink']
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Gets all the Software Components for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for. Although the parameter defaults to None, a falsy value is rejected.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        ValueError: Raised if asset_version_id is not provided.
        Exception: Raised if the query fails.

    Returns:
        list: List of Software Component Objects, gathered from the 'allSoftwareComponentInstances' field.
    """
    if not asset_version_id:
        # ValueError is a subclass of Exception, so callers catching Exception keep working;
        # it matches the required-argument checks used by the other functions in this module.
        raise ValueError("Asset Version ID is required")

    return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'],
                                     queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id,
                                                                                  type=type),
                                     'allSoftwareComponentInstances')
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Defaults to False.
            Only consulted for EXACT name matching; it does not change the version filter or CONTAINS searches.

    Raises:
        ValueError: Raised if name is not provided or search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.

    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    if not name:
        raise ValueError("Name is required")
    if search_method not in ('EXACT', 'CONTAINS'):
        # Previously an unknown method silently dropped the name filter and matched every component.
        raise ValueError(f"Search method {search_method} not supported")

    if asset_version_id:
        # scoped to one asset version: also return the original components
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
"""
    else:
        # gets the asset version info that contains the software component
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
"""

    variables = {
        "filter": {
            "mergedComponentRefId": None
        },
        "after": None,
        "first": 100
    }
    search_filter = variables["filter"]

    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # case-insensitive exact matching is expressed through the name_like filter
        search_filter["name" if case_sensitive else "name_like"] = name
        if version:
            search_filter["version"] = version
    else:  # 'CONTAINS'
        search_filter["name_contains"] = name
        if version:
            search_filter["version_contains"] = version

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), retry=retry_if_exception(is_not_breakout_exception))
def send_graphql_query(token, organization_context, query, variables=None):
    """
    POST a GraphQL document to API_URL and return the decoded JSON response.

    The retry decorator allows up to 5 attempts with a fixed 5 second wait, retrying only when
    is_not_breakout_exception accepts the raised exception (presumably BreakoutException ends
    retrying immediately; confirm against finite_state_sdk.utils).

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        BreakoutException: If a 200 response contains an "errors" member, or if a non-200 response
            is received for a query that is_mutation() classifies as a mutation.
        Exception: If a non-200 response is received for any other query.

    Returns:
        dict: Response JSON
    """
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
        "Organization-Context": organization_context,
    }
    payload = {"query": query, "variables": variables}

    api_response = requests.post(API_URL, headers=request_headers, json=payload)

    if api_response.status_code != 200:
        # Failed mutations raise BreakoutException instead of a plain Exception.
        failed_mutation = is_mutation(query)
        failure = f"Error: {api_response.status_code} - {api_response.text}"
        if failed_mutation:
            raise BreakoutException(failure)
        raise Exception(failure)

    body = api_response.json()
    if "errors" in body:
        # Raise a BreakoutException for GraphQL errors
        raise BreakoutException(f"Error: {body['errors']}")

    return body
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Set a new status on one or more findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID to update the status for.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if user_id, finding_ids, or status are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    required_arguments = (
        (user_id, "User Id is required"),
        (finding_ids, "Finding Ids is required"),
        (status, "Status is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    status_update = queries.UPDATE_FINDING_STATUSES
    mutation_text = status_update['mutation']
    mutation_variables = status_update['variables'](user_id=user_id, finding_ids=finding_ids, status=status,
                                                    justification=justification, response=response,
                                                    comment=comment)

    return send_graphql_query(token, organization_context, mutation_text, mutation_variables)
def upload_file_for_binary_analysis(
    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
):
    """
    Upload a file for Binary Analysis as a multipart upload, one part per chunk read from the file.
    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

    Steps performed: startMultipartUploadV2, then for every chunk generateUploadPartUrlV2 followed by a
    PUT of the chunk to the returned URL, then completeMultipartUploadV2, and finally
    launchBinaryUploadProcessing.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE. Must be at least MIN_CHUNK_SIZE and below MAX_CHUNK_SIZE.
        quick_scan (bool, optional):
            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
        enable_bandit_scan (bool, optional):
            If True, will create an additional bandit scan in addition to the default binary analysis scan.

    Raises:
        ValueError: Raised if test_id or file_path are not provided, or chunk_size is out of range.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' member of the launchBinaryUploadProcessing response.
    """
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")
    if chunk_size < MIN_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
    if chunk_size >= MAX_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")

    # Step 1: open the multipart upload
    start_mutation = """
    mutation Start_SDK($testId: ID!) {
        startMultipartUploadV2(testId: $testId) {
            uploadId
            key
        }
    }
    """
    started = send_graphql_query(token, organization_context, start_mutation, {"testId": test_id})
    upload_id = started['data']['startMultipartUploadV2']['uploadId']
    upload_key = started['data']['startMultipartUploadV2']['key']

    # Step 2: request an upload URL for each chunk (even if there is only one) and PUT the chunk there
    part_url_mutation = """
        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
                key
                uploadUrl
            }
        }
        """
    uploaded_parts = []
    for part_number, chunk in enumerate(file_chunks(file_path, chunk_size), start=1):
        part_variables = {
            "partNumber": part_number,
            "uploadId": upload_id,
            "uploadKey": upload_key
        }
        part_url_response = send_graphql_query(token, organization_context, part_url_mutation, part_variables)
        part_url = part_url_response['data']['generateUploadPartUrlV2']['uploadUrl']

        put_response = upload_bytes_to_url(part_url, chunk)

        # the ETag header of each PUT response is handed back when completing the upload
        uploaded_parts.append({
            "ETag": put_response.headers['ETag'],
            "PartNumber": part_number
        })

    # Step 3: close the multipart upload
    complete_mutation = """
    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
            key
        }
    }
    """
    complete_variables = {
        "partData": uploaded_parts,
        "uploadId": upload_id,
        "uploadKey": upload_key
    }
    completed = send_graphql_query(token, organization_context, complete_mutation, complete_variables)
    completed_key = completed['data']['completeMultipartUploadV2']['key']

    # Step 4: launch processing; configuration options are only declared and sent when one is requested
    launch_variables = {
        "key": completed_key,
        "testId": test_id
    }

    configuration_options = []
    if quick_scan:
        configuration_options.append("QUICK_SCAN")
    if enable_bandit_scan:
        configuration_options.append("ENABLE_BANDIT_SCAN")

    if configuration_options:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
                key
                newBanditScanId
            }
        }
        """
        launch_variables["configurationOptions"] = configuration_options
    else:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
            launchBinaryUploadProcessing(key: $key, testId: $testId) {
                key
                newBanditScanId
            }
        }
        """

    launched = send_graphql_query(token, organization_context, launch_mutation, launch_variables)

    return launched['data']
def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
    """
    Upload a test results file to the test identified by test_id and launch its processing.
    NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' member of the launchTestResultProcessing response.
    """
    for supplied, complaint in ((test_id, "Test Id is required"), (file_path, "File Path is required")):
        if not supplied:
            raise ValueError(complaint)

    # Generate the test result upload URL
    upload_url_mutation = """
    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
        generateSinglePartUploadUrl(testId: $testId) {
            uploadUrl
            key
        }
    }
    """
    url_response = send_graphql_query(token, organization_context, upload_url_mutation, {"testId": test_id})

    upload_target = url_response['data']['generateSinglePartUploadUrl']
    upload_url = upload_target['uploadUrl']
    upload_key = upload_target['key']

    # send the file contents to the returned URL
    upload_file_to_url(upload_url, file_path)

    # tell the platform the upload is finished so processing can start
    launch_mutation = """
    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
        launchTestResultProcessing(key: $key, testId: $testId) {
            key
        }
    }
    """
    launch_variables = {
        "testId": test_id,
        "key": upload_key
    }

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    return launch_response['data']
def upload_bytes_to_url(url, bytes):
    """
    PUT a block of bytes to a URL; used for uploading to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        bytes (bytes):
            Bytes to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    put_response = requests.put(url, data=bytes)

    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response
def upload_file_to_url(url, file_path):
    """
    PUT the contents of a local file to a URL; used for uploading to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        file_path (str):
            Local path to file to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    # the open file handle is passed as the request body; it is closed before the status is checked
    with open(file_path, 'rb') as source_file:
        put_response = requests.put(url, data=source_file)

    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response
DEFAULT CHUNK SIZE: 1000 MiB
MAX CHUNK SIZE: 2000 MiB (just under 2 GiB)
MIN CHUNK SIZE: 5 MiB
class UploadMethod(Enum):
    """
    The ways an upload can be submitted.

    Attributes:
        WEB_APP_UI: Upload method via web application UI.
        API: Upload method via API.
        GITHUB_INTEGRATION: Upload method via GitHub integration.
        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

    Each member's value is the same string as its name.
    Reference members as UploadMethod.<attribute>, e.g. finite_state_sdk.UploadMethod.WEB_APP_UI
    """
    # uploaded through the web application UI
    WEB_APP_UI = "WEB_APP_UI"
    # uploaded through the API
    API = "API"
    # uploaded through the GitHub integration
    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
    # uploaded through the Azure DevOps integration
    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
Enumeration class representing different upload methods.
Attributes:
- WEB_APP_UI: Upload method via web application UI.
- API: Upload method via API.
- GITHUB_INTEGRATION: Upload method via GitHub integration.
- AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
To use any value from this enumeration, use UploadMethod.<attribute>, i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
Inherited Members
- enum.Enum
- name
- value
def create_artifact(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_version_id=None,
    artifact_name=None,
    product_id=None,
):
    """
    Create a new Artifact attached to an asset version.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the artifact with.
        created_by_user_id (str, required):
            User ID of the user creating the artifact.
        asset_version_id (str, required):
            Asset Version ID to associate the artifact with.
        artifact_name (str, required):
            The name of the Artifact being created.
        product_id (str, optional):
            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createArtifact Object (the 'data' member of the mutation response)
    """
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_version_id, "Asset version ID is required"),
        (artifact_name, "Artifact name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    create_mutation = """
    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
        createArtifact(input: $input) {
            id
            name
            assetVersion {
                id
                name
                asset {
                    id
                    name
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # The context always names the asset version and business unit; the product is added only when given.
    # NOTE(review): businessUnits is sent as a list while products is sent exactly as passed in -
    # confirm the API accepts that shape.
    context = {
        "asset": asset_version_id,
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        context["products"] = product_id

    mutation_variables = {
        "input": {
            "name": artifact_name,
            "createdBy": created_by_user_id,
            "assetVersion": asset_version_id,
            "ctx": context
        }
    }

    created = send_graphql_query(token, organization_context, create_mutation, mutation_variables)
    return created['data']
Create a new Artifact. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the artifact with.
- created_by_user_id (str, required): User ID of the user creating the artifact.
- asset_version_id (str, required): Asset Version ID to associate the artifact with.
- artifact_name (str, required): The name of the Artifact being created.
- product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createArtifact Object
def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
    """
    Create a new Asset.

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass the bare token without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID used both as the asset's group and in its context.
        created_by_user_id (str, required):
            User ID recorded as the creator of the asset.
        asset_name (str, required):
            Name of the Asset being created.
        product_id (str, optional):
            Product ID(s) placed in the asset's context. When None, no products entry is sent.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response (documented by the SDK as the createAsset object).
    """
    # Validate in a fixed order so the first missing argument is the one reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_name, "Asset name is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    graphql_query = """
        mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
            createAsset(input: $input) {
                id
                name
                dependentProducts {
                    id
                    name
                }
                group {
                    id
                    name
                }
                createdBy {
                    id
                    email
                }
                ctx {
                    asset
                    products
                    businessUnits
                }
            }
        }
    """

    # The business unit is always part of the context; products are optional.
    context = {"businessUnits": [business_unit_id]}
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": asset_name,
            "group": business_unit_id,
            "createdBy": created_by_user_id,
            "ctx": context,
        }
    }

    result = send_graphql_query(token, organization_context, graphql_query, variables)
    return result['data']
Create a new Asset.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the asset with.
- created_by_user_id (str, required): User ID of the user creating the asset.
- asset_name (str, required): The name of the Asset being created.
- product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAsset Object
def create_asset_version(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset version with.
        created_by_user_id (str, required):
            User ID of the user creating the asset version.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response (documented by the SDK as the createAssetVersion object).

    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
    """
    # The warning is emitted before validation so that every call site is flagged,
    # and stacklevel=2 attributes it to the caller rather than to this function.
    warn(
        '`create_asset_version` is deprecated. Use `create_asset_version_on_asset` instead.',
        DeprecationWarning,
        stacklevel=2,
    )
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not asset_version_name:
        raise ValueError("Asset version name is required")

    graphql_query = """
        mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
            createAssetVersion(input: $input) {
                id
                name
                asset {
                    id
                    name
                }
                createdBy {
                    id
                    email
                }
                ctx {
                    asset
                    products
                    businessUnits
                }
            }
        }
    """

    # Asset version name, parent asset, business unit context, and creating user are required
    variables = {
        "input": {
            "name": asset_version_name,
            "createdBy": created_by_user_id,
            "asset": asset_id,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            }
        }
    }

    # Products are only added to the context when the caller supplied them
    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']
Create a new Asset Version.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the asset version with.
- created_by_user_id (str, required): User ID of the user creating the asset version.
- asset_id (str, required): Asset ID to associate the asset version with.
- asset_version_name (str, required): The name of the Asset Version being created.
- product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAssetVersion Object
deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
def create_asset_version_on_asset(
    token,
    organization_context,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass the bare token without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        created_by_user_id (str, optional):
            User ID of the user creating the asset version. Only sent when truthy.
            NOTE(review): the mutation declares $createdByUserId as "ID!", so the server may reject
            requests that omit it - confirm against the API schema.
        asset_id (str, required):
            Asset ID to create the asset version on.
        asset_version_name (str, required):
            Name of the Asset Version being created.
        product_id (str, optional):
            Product ID passed to the mutation. Only sent when truthy.

    Raises:
        ValueError: Raised if asset_id or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response, which holds the createNewAssetVersionOnAsset object.
    """
    # Validate in a fixed order so the first missing argument is the one reported.
    required_arguments = (
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    graphql_query = """
        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
                id
                assetVersion {
                    id
                }
            }
        }
    """

    # Name and asset are always sent; the remaining variables are included only when truthy.
    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
    optional_variables = {
        "createdByUserId": created_by_user_id,
        "productId": product_id,
    }
    variables.update({key: value for key, value in optional_variables.items() if value})

    result = send_graphql_query(token, organization_context, graphql_query, variables)
    return result['data']
Create a new Asset Version.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- created_by_user_id (str, optional): User ID of the user creating the asset version.
- asset_id (str, required): Asset ID to associate the asset version with.
- asset_version_name (str, required): The name of the Asset Version being created.
Raises:
- ValueError: Raised if asset_id or asset_version_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAssetVersion Object
def create_new_asset_version_artifact_and_test_for_upload(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    product_id=None,
    test_type=None,
    artifact_description=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the entities for. If not provided, the Business Unit (group) of the existing Asset is used.
        created_by_user_id (str, optional):
            User ID that will be the creator of the entities. If not specified, the creator of the existing Asset is used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        product_id (str, optional):
            Product ID to create the entities for. It is passed to the asset version mutation and merged with the Asset's existing product IDs for the artifact and test.
        test_type (str, required):
            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documentation.
        artifact_description (str, optional):
            Description used as the suffix of the artifact name. Examples include "Firmware", "Source Code Repository". If none is provided, "Binary" is used for binary analysis and "Unspecified Artifact" otherwise.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or test_type are not provided, if no asset matches asset_id, or if the business unit or creating user cannot be determined.
        Exception: Raised if the query fails.

    Returns:
        str: The Test ID of the newly created test that is used for uploading the file.
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    # Fail before anything is created remotely; otherwise a missing test type
    # would only surface after the asset version and artifact already exist.
    if not test_type:
        raise ValueError("Test type is required")

    assets = get_all_assets(token, organization_context, asset_id=asset_id)
    if not assets:
        raise ValueError("No assets found with the provided asset ID")
    asset = assets[0]
    asset_name = asset['name']

    # Work on a copy so the fetched asset record is not mutated, and tolerate an
    # asset whose context has no product list when a product_id is supplied.
    existing_product_ids = asset['ctx']['products']
    asset_product_ids = None if existing_product_ids is None else list(existing_product_ids)
    if product_id and (asset_product_ids is None or product_id not in asset_product_ids):
        asset_product_ids = (asset_product_ids or []) + [product_id]

    # Fall back to the existing asset's business unit and creator when not supplied
    if not business_unit_id:
        business_unit_id = asset['group']['id']
    if not created_by_user_id:
        created_by_user_id = asset['createdBy']['id']

    if not business_unit_id:
        raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
    if not created_by_user_id:
        raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")

    # create the asset version
    response = create_asset_version_on_asset(
        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id
    )
    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']

    is_binary_analysis = test_type == "finite_state_binary_analysis"

    # create the artifact; only the default description differs between the two test kinds
    if not artifact_description:
        artifact_description = "Binary" if is_binary_analysis else "Unspecified Artifact"
    artifact_name = f"{asset_name} {version} - {artifact_description}"
    response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
                               created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
                               artifact_name=artifact_name, product_id=asset_product_ids)
    artifact_id = response['createArtifact']['id']

    # create the test
    if is_binary_analysis:
        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                  artifact_id=artifact_id, product_id=asset_product_ids,
                                                  test_name=test_name, upload_method=upload_method)
    else:
        test_name = f"{asset_name} {version} - {test_type}"
        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                      artifact_id=artifact_id, product_id=asset_product_ids,
                                                      test_name=test_name, test_type=test_type,
                                                      upload_method=upload_method)
    return response['createTest']['id']
Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the Business Unit of the existing Asset will be used.
- created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for. A ValueError is raised if it is not provided or if no matching asset is found.
- version (str, required): Version to create the asset version for.
- product_id (str, optional): Product ID to create the entities for. If not provided, the default product will be used.
- test_type (str, required): Test type to create the test for. Must be either "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation.
- artifact_description (str, optional): Description to use for the artifact. Examples include "Firmware", "Source Code Repository". This is appended to the asset name and version to form the Artifact name. If none is provided, a default Artifact description will be used.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if asset_id or version are not provided.
- Exception: Raised if the query fails.
Returns:
str: The Test ID of the newly created test that is used for uploading the file.
def create_new_asset_version_and_upload_binary(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    artifact_description=None,
    quick_scan=False,
    upload_method: UploadMethod = UploadMethod.API,
    enable_bandit_scan: bool = False,
):
    """
    Create a new Asset Version on an existing Asset and upload a binary file to it for Finite State Binary Analysis.
    The Asset's existing Business Unit and Created By User are used unless their IDs are passed explicitly.

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass the bare token without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Local path to the file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
        artifact_description (str, optional):
            Description of the artifact. Falls back to "Firmware Binary" when empty or not provided.
        quick_scan (bool, optional):
            If True, the file is uploaded for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.
        enable_bandit_scan (bool, optional):
            If True, an additional bandit scan is created alongside the default binary analysis scan.

    Raises:
        ValueError: Raised if asset_id, version, or file_path are not provided.
        Exception: Raised if any of the queries fail.

    Returns:
        dict: The value returned by upload_file_for_binary_analysis for the newly created test.
    """
    # Validate in a fixed order so the first missing argument is the one reported.
    required_arguments = (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    # Create the asset version, artifact, and binary analysis test in one step.
    test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        artifact_description=artifact_description or "Firmware Binary",
        upload_method=upload_method,
    )

    # Upload the file against the test that was just created.
    return upload_file_for_binary_analysis(
        token,
        organization_context,
        test_id=test_id,
        file_path=file_path,
        quick_scan=quick_scan,
        enable_bandit_scan=enable_bandit_scan,
    )
Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
- created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for.
- version (str, required): Version to create the asset version for.
- file_path (str, required): Local path to the file to upload.
- product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
- artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
- quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
- enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
- ValueError: Raised if asset_id, version, or file_path are not provided.
- Exception: Raised if any of the queries fail.
Returns:
dict: The response from the GraphQL query, a createAssetVersion Object.
def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
                                                     created_by_user_id=None, asset_id=None, version=None,
                                                     file_path=None, product_id=None, test_type=None,
                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Asset Version on an existing Asset and upload third party test results for it.
    The Asset's existing Business Unit and Created By User are used unless their IDs are passed explicitly.

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass the bare token without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Path to the test results file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
        test_type (str, required):
            Test type. This must be one of the supported third party scanner types; see the Finite State API documentation for the full list.
        artifact_description (str, optional):
            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: If the asset_id, version, file_path, or test_type are not provided.
        Exception: If the test_type is not a supported third party scanner type, or if the query fails.

    Returns:
        dict: The value returned by upload_test_results_file for the newly created test.
    """
    # Validate in a fixed order so the first missing argument is the one reported.
    required_arguments = (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
        (test_type, "Test type is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    # Create the asset version, artifact, and test in one step.
    new_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type=test_type,
        artifact_description=artifact_description,
        upload_method=upload_method,
    )

    # Upload the results file against the test that was just created.
    return upload_test_results_file(token, organization_context, test_id=new_test_id, file_path=file_path)
Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
- created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for.
- version (str, required): Version to create the asset version for.
- file_path (str, required): Path to the test results file to upload.
- product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
- test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
- artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: If the asset_id, version, file_path, or test_type are not provided.
- Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:
dict: The response from the GraphQL query, a createAssetVersion Object.
def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
                   product_description=None, vendor_id=None, vendor_name=None):
    """
    Create a new Product.

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass the bare token without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID used both as the product's group and in its context.
        created_by_user_id (str, required):
            User ID recorded as the creator of the product.
        product_name (str, required):
            Name of the Product being created.
        product_description (str, optional):
            Description of the Product being created. Sent only when not None.
        vendor_id (str, optional):
            ID of an existing Vendor to link the product to. Sent only when not None.
        vendor_name (str, optional):
            Name of a Vendor to create and link to the product. Sent only when not None.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response (documented by the SDK as the createProduct object).
    """
    # Validate in a fixed order so the first missing argument is the one reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (product_name, "Product name is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    graphql_query = """
        mutation CreateProductMutation_SDK($input: CreateProductInput!) {
            createProduct(input: $input) {
                id
                name
                vendor {
                    name
                }
                group {
                    id
                    name
                }
                createdBy {
                    id
                    email
                }
                ctx {
                    businessUnit
                }
            }
        }
    """

    # Mandatory part of the mutation input.
    product_input = {
        "name": product_name,
        "group": business_unit_id,
        "createdBy": created_by_user_id,
        "ctx": {
            "businessUnit": business_unit_id,
        },
    }

    # Optional fields are appended in this order, each only when the caller supplied it:
    # a description, a link to an existing vendor, and a request to create a new vendor.
    if product_description is not None:
        product_input["description"] = product_description
    if vendor_id is not None:
        product_input["vendor"] = {"id": vendor_id}
    if vendor_name is not None:
        product_input["createVendor"] = {"name": vendor_name}

    result = send_graphql_query(token, organization_context, graphql_query, {"input": product_input})
    return result['data']
Create a new Product.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the product with.
- created_by_user_id (str, required): User ID of the user creating the product.
- product_name (str, required): The name of the Product being created.
- product_description (str, optional): The description of the Product being created.
- vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
- vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createProduct Object
def create_test(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    tools=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading files.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
        tools (list, optional):
            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field.
            This is used to describe the actual scanner that was used to perform the test. Defaults to an empty list.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Defaults to UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' member of the GraphQL response (the createTest object).
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not artifact_id:
        raise ValueError("Artifact ID is required")
    if not test_name:
        raise ValueError("Test name is required")
    if not test_type:
        raise ValueError("Test type is required")

    # A fresh list per call: a `tools=[]` default would be one shared object across
    # every call, so any downstream mutation would leak into later requests.
    if tools is None:
        tools = []

    graphql_query = """
    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
        createTest(input: $input) {
            id
            name
            artifactUnderTest {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                        dependentProducts {
                            id
                            name
                        }
                    }
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
            uploadMethod
        }
    }
    """

    # Test name, artifact under test, result format, context, and creating user are required
    variables = {
        "input": {
            "name": test_name,
            "createdBy": created_by_user_id,
            "artifactUnderTest": artifact_id,
            "testResultFileFormat": test_type,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            },
            "tools": tools,
            # The API receives the enum's string value, not the Enum member itself.
            "uploadMethod": upload_method.value
        }
    }

    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']
Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
- tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
- upload_method (UploadMethod, optional): The method of uploading the test results. Defaults to UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
                                   upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Test object for uploading files for Finite State Binary Analysis.

    Delegates to create_test with test_type "finite_state_binary_analysis" and a single
    tool entry describing Finite State Binary Analysis.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Fixed description of the scanner that produces binary-analysis results.
    binary_analysis_tool = {
        "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
        "name": "Finite State Binary Analysis"
    }
    return create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        tools=[binary_analysis_tool],
        upload_method=upload_method,
    )
Create a new Test object for uploading files for Finite State Binary Analysis.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading CycloneDX files.

    Delegates to create_test with test_type fixed to "cyclonedx"; no tools list is passed.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Everything is forwarded unchanged; only the result format is pinned here.
    forwarded_arguments = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": "cyclonedx",
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **forwarded_arguments)
Create a new Test object for uploading CycloneDX files.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
                                       upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Test object for uploading Third Party Scanner files.

    Delegates to create_test, forwarding the caller-supplied test_type; no tools list is passed.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Pure pass-through: the scanner's output format arrives from the caller as test_type.
    forwarded_arguments = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": test_type,
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **forwarded_arguments)
Create a new Test object for uploading Third Party Scanner files.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
                                  report_subtype=None, output_filename=None, verbose=False):
    """
    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            The Asset Version ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided (propagated from generate_report_download_url).
        Exception: Raised if the query fails, or if the download does not return HTTP status 200.

    Returns:
        None
    """
    # Resolve the URL first so that any argument validation done by the helper
    # (presumably including report_type - confirm) fails before a filename is derived.
    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default. Previously a missing output_filename reached
    # open(None, 'wb') and failed with TypeError after the download had completed.
    if output_filename is None:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Binary mode: the payload may be a PDF
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, required): The Asset Version ID to download the report for.
- report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified report_type. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
- output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
None
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        Exception: Raised if the download does not return HTTP status 200. Errors raised by
            generate_report_download_url also propagate unchanged.

    Returns:
        None
    """
    # Resolve the URL first so that any argument validation done by the helper
    # (presumably including report_type - confirm) fails before a filename is derived.
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default. Previously a missing output_filename reached
    # open(None, 'wb') and failed with TypeError after the download had completed.
    if output_filename is None:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Binary mode: the response body is written exactly as received
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, required): The Product ID to download the report for.
- report_type (str, required): The file type of the report to download. Valid values are "CSV".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified report_type. Valid values for CSV are "ALL_FINDINGS".
- output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, optional):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, optional):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, optional):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails, or if the download does not return HTTP status 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type,
                                              sbom_subtype=sbom_subtype, asset_version_id=asset_version_id,
                                              verbose=verbose)

    # Fetch the SBOM from the generated URL
    http_response = requests.get(download_url)

    # Bail out early on anything other than a 200 response
    if http_response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {http_response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Persist the raw response bytes to the requested path
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(http_response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- sbom_type (str, optional): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
- sbom_subtype (str, optional): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
- asset_version_id (str, required): The Asset Version ID to download the SBOM for.
- output_filename (str, optional): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
None
def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Helper generator that reads a file in binary mode, one chunk at a time.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            The number of bytes to request per read. Defaults to DEFAULT_CHUNK_SIZE.

    Yields:
        bytes: The next chunk of the file. The final chunk may be shorter than chunk_size.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as source:
        # Two-argument iter() keeps calling read() until it returns the b'' sentinel,
        # which is what a binary read returns at end of file.
        for chunk in iter(lambda: source.read(chunk_size), b''):
            yield chunk
Helper method to read a file in chunks.
Arguments:
- file_path (str): Local path to the file to read.
- chunk_size (int, optional): The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
Yields:
bytes: The next chunk of the file.
Raises:
- FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Get all artifacts in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifact_query = queries.ALL_ARTIFACTS['query']
    # The 'variables' entry is a callable that builds the variables from the two filters.
    artifact_variables = queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id)
    # NOTE(review): the result field passed here is 'allAssets', the same one get_all_assets uses -
    # confirm it matches the root field of queries.ALL_ARTIFACTS.
    return get_all_paginated_results(token, organization_context, artifact_query, artifact_variables, 'allAssets')
Get all artifacts in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
- business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Artifact Objects
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Gets all assets in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    asset_query = queries.ALL_ASSETS['query']
    # The 'variables' entry is a callable that builds the variables from the two filters.
    asset_variables = queries.ALL_ASSETS['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, asset_query, asset_variables, 'allAssets')
Gets all assets in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Asset Objects
def get_all_asset_versions(token, organization_context):
    """
    Get all asset versions in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    variables = queries.ALL_ASSET_VERSIONS['variables']
    # get_asset_versions() invokes this same entry as a factory with keyword filters.
    # Passing the factory itself to the paginator would fail on variables["first"],
    # so build the unfiltered variables here when the entry is callable.
    if callable(variables):
        variables = variables(asset_version_id=None, asset_id=None, business_unit_id=None)

    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
                                     variables, 'allAssetVersions')
Get all asset versions in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Fetch the asset versions belonging to one product, paging through every result.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: Entries collected from the 'allProducts' field of the response.
            NOTE(review): documented upstream as AssetVersion Objects - confirm the element shape against the query.
    """
    spec = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    graphql_query = spec['query']
    query_variables = spec['variables'](product_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')
Get all asset versions for a product. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str): The Product ID to get asset versions for
Returns:
list: List of AssetVersion Objects
def get_all_business_units(token, organization_context):
    """
    Fetch every business unit in the organization, paging through every result.
    NOTE: The return type here is Group.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    spec = queries.ALL_BUSINESS_UNITS
    return get_all_paginated_results(
        token,
        organization_context,
        query=spec['query'],
        variables=spec['variables'],
        field='allGroups',
    )
Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Group Objects
def get_all_organizations(token, organization_context):
    """
    Fetch every organization available to the user, paging through every result.
    For most users there is only one organization.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    spec = queries.ALL_ORGANIZATIONS
    return get_all_paginated_results(
        token,
        organization_context,
        query=spec['query'],
        variables=spec['variables'],
        field='allOrganizations',
    )
Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Organization Objects
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required in practice):
            Variables to be used in the GraphQL query. Must contain a "first" page size between 1 and 1000.
            The dict is copied before the "after" cursor is written, so the caller's object is never modified.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.

    Raises:
        Exception: If field, limit or variables["first"] are missing or out of range, if the query fails,
            or if the field is not in the response JSON

    Returns:
        list: List of results (at most `limit` entries when a limit is given)
    """
    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")

    # `variables` defaults to None and may lack "first"; report both as the same
    # validation error instead of leaking a TypeError / KeyError to the caller.
    page_size = variables.get("first") if variables else None
    if not page_size:
        raise Exception("Error: first is required")
    if page_size < 1:
        raise Exception("Error: first cannot be less than 1")
    if page_size > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # Work on a shallow copy: the 'after' cursor written below must not leak into
    # the caller's dict, which may be a shared object reused by later calls.
    variables = dict(variables)

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = []
    results.extend(page)

    # the cursor of the last entry on a page is the starting point of the next page
    cursor = page[-1]['_cursor'] if page else None

    while cursor:
        # `>=` rather than `==`: a page may step over the limit without landing on it
        if limit and len(results) >= limit:
            break

        variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, variables)
        page = response_data['data'][field]
        results.extend(page)

        # an empty page means there is nothing left to fetch
        cursor = page[-1]['_cursor'] if page else None

    if limit:
        # never hand back more than the caller asked for
        return results[:limit]
    return results
Get all results from a paginated GraphQL query
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- query (str): The GraphQL query string
- variables (dict, optional): Variables to be used in the GraphQL query, by default None
- field (str, required): The field in the response JSON that contains the results
- limit (int, Optional): The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
Raises:
- Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:
list: List of results
def get_all_products(token, organization_context):
    """
    Fetch every product in the organization, paging through every result.

    Emits a DeprecationWarning on every call.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller rather than to this wrapper
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)
    spec = queries.ALL_PRODUCTS
    return get_all_paginated_results(
        token,
        organization_context,
        query=spec['query'],
        variables=spec['variables'],
        field='allProducts',
    )
Get all products in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Product Objects
Deprecated since version 0.1.4. Use get_products instead.
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, paging through every result.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    spec = queries.ALL_USERS
    return get_all_paginated_results(
        token,
        organization_context,
        query=spec['query'],
        variables=spec['variables'],
        field='allUsers',
    )
Get all users in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of User Objects
def get_artifact_context(token, organization_context, artifact_id):
    """
    Return the context ('ctx') of a single artifact. This is typically used for querying for existing context,
    which is used for role based access control. This is not used for creating new artifacts.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact whose context is requested.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no matching entry.

    Returns:
        dict: Artifact Context Object
    """
    spec = queries.ALL_ARTIFACTS
    graphql_query = spec['query']
    query_variables = spec['variables'](artifact_id, None)
    # NOTE(review): results are read from the 'allAssets' field although the query is
    # ALL_ARTIFACTS - confirm this matches the query's top-level field.
    matches = get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allAssets')

    first_match = matches[0]
    return first_match['ctx']
Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- artifact_id (str): ID of the artifact whose context is requested.
Raises:
- Exception: Raised if the query fails.
Returns:
dict: Artifact Context Object
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch Assets in the organization, paging through every result.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Restrict the result to the Asset with this ID. Defaults to None (no ID filter).
        business_unit_id (str, optional):
            Restrict the result to Assets in this Business Unit. Defaults to None (no Business Unit filter).

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    spec = queries.ALL_ASSETS
    graphql_query = spec['query']
    query_variables = spec['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(
        token,
        organization_context,
        query=graphql_query,
        variables=query_variables,
        field='allAssets',
    )
Gets assets in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Asset Objects
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Fetch asset versions in the organization, paging through every result.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Restrict the result to the Asset Version with this ID. Defaults to None (no ID filter).
        asset_id (str, optional):
            Restrict the result to Asset Versions of this Asset. Defaults to None (no Asset filter).
        business_unit_id (str, optional):
            Restrict the result to Asset Versions in this Business Unit. Defaults to None (no Business Unit filter).

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    spec = queries.ALL_ASSET_VERSIONS
    graphql_query = spec['query']
    query_variables = spec['variables'](
        asset_version_id=asset_version_id,
        asset_id=asset_id,
        business_unit_id=business_unit_id,
    )
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allAssetVersions')
Gets asset versions in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
- asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL the credentials are posted to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # use the caller-supplied audience; it was previously ignored in favor of the module constant
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    # post to the caller-supplied token URL; it was previously ignored in favor of the module constant
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
Arguments:
- client_id (str): CLIENT_ID as specified in the API documentation
- client_secret (str): CLIENT_SECRET as specified in the API documentation
- token_url (str, optional): Token URL, by default TOKEN_URL
- audience (str, optional): Audience, by default AUDIENCE
Raises:
- Exception: If the response status code is not 200
Returns:
str: Auth token. Use this token as the Authorization header in subsequent API calls.
def get_findings(
    token,
    organization_context,
    asset_version_id=None,
    finding_id=None,
    category=None,
    status=None,
    severity=None,
    count=False,
    limit=None,
):
    """
    Fetch the Findings for an Asset Version, or only their count. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.

    Raises:
        Exception: Raised if the query fails or if limit is outside the accepted range.

    Returns:
        list: List of Finding Objects. When count is True, the "_allFindingsMeta" value of the response is returned instead.
    """
    if limit:
        if limit > 1000:
            raise Exception("Error: limit must be less than 1000")
        if limit < 1:
            raise Exception("Error: limit must be greater than 0")

    # the same filter set feeds both the count query and the listing query
    filters = dict(
        asset_version_id=asset_version_id,
        finding_id=finding_id,
        category=category,
        status=status,
        severity=severity,
        limit=limit,
    )

    if count:
        count_spec = queries.GET_FINDINGS_COUNT
        count_query = count_spec['query']
        response = send_graphql_query(token, organization_context, count_query, count_spec['variables'](**filters))
        return response["data"]["_allFindingsMeta"]

    list_spec = queries.GET_FINDINGS
    list_query = list_spec['query']
    return get_all_paginated_results(token, organization_context, list_query, list_spec['variables'](**filters),
                                     'allFindings', limit=limit)
Gets all the Findings for an Asset Version. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
- finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
- category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
- status (str, optional): The status of Findings to return.
- severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
- count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
- limit (int, optional): The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Finding Objects. When count is True, the "_allFindingsMeta" value of the response is returned instead.
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Fetch the asset versions for a product.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            Product ID to get asset versions for. Although the parameter defaults to None, a missing value is rejected.

    Raises:
        Exception: Raised if product_id is not provided or if the query fails.

    Returns:
        list: Entries collected from the 'allProducts' field of the response.
            NOTE(review): documented upstream as AssetVersion Objects - confirm the element shape against the query.
    """
    if product_id:
        spec = queries.GET_PRODUCT_ASSET_VERSIONS
        graphql_query = spec['query']
        query_variables = spec['variables'](product_id)
        return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')

    raise Exception("Product ID is required")
Gets all the asset versions for a product.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, required): Product ID to get asset versions for. An exception is raised if it is not provided.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of AssetVersion Objects
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Fetch products, optionally narrowed to one product or one business unit.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects
    """
    spec = queries.GET_PRODUCTS
    graphql_query = spec['query']
    query_variables = spec['variables'](product_id=product_id, business_unit_id=business_unit_id)
    return get_all_paginated_results(token, organization_context, graphql_query, query_variables, 'allProducts')
Gets all the products for the specified business unit.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
- business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Product Objects
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: launches a report export, then polls until a pre-signed download URL is available.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token itself; do not include "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management, in the form "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, or if not exactly one of
            asset_version_id / product_id is provided.
        Exception: Raised if report_type or report_subtype is unsupported, if no export job ID is returned,
            or if a query fails.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not (asset_version_id or product_id):
        raise ValueError("Asset Version ID or Product ID is required")
    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    if report_type not in ("CSV", "PDF"):
        raise Exception(f"Report Type {report_type} not supported")

    # per file type: accepted subtypes plus the response keys of the artifact / product launch mutations
    if report_type == "CSV":
        allowed_subtypes = ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]
        artifact_key, product_key = "launchArtifactCSVExport", "launchProductCSVExport"
    else:
        allowed_subtypes = ["RISK_SUMMARY"]
        artifact_key, product_key = "launchArtifactPdfExport", "launchProductPdfExport"

    if report_subtype not in allowed_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    launch_spec = queries.LAUNCH_REPORT_EXPORT
    launch_args = dict(asset_version_id=asset_version_id, product_id=product_id,
                       report_type=report_type, report_subtype=report_subtype)
    mutation = launch_spec['mutation'](**launch_args)
    variables = launch_spec['variables'](**launch_args)

    response_data = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(response_data, indent=4)}')

    # exactly one of the two IDs is set here (validated above), so this choice is exhaustive
    launch_key = artifact_key if asset_version_id else product_key
    export_job_id = response_data['data'][launch_key]['exportJobId']

    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    # NOTE(review): the loop only exits on status 'COMPLETED' with a non-empty download link; there is no timeout.
    sleep_time = 10
    total_time = 0
    if verbose:
        print(f'Polling every {sleep_time} seconds for export job to complete')

    poll_spec = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL
    while True:
        time.sleep(sleep_time)
        total_time += sleep_time
        if verbose:
            print(f'Total time elapsed: {total_time} seconds')

        query = poll_spec['query']
        variables = poll_spec['variables'](export_job_id)

        response_data = send_graphql_query(token, organization_context, query, variables)

        if verbose:
            print(f'Response Data: {json.dumps(response_data, indent=4)}')

        export_state = response_data['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != 'COMPLETED':
            continue

        download_link = export_state['downloadLink']
        if not download_link:
            continue

        if verbose:
            print(
                f'Export Job Complete. Download URL: {download_link}')
        return download_link
Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to download the report for. Either asset_version_id or product_id is required (they are mutually exclusive).
- product_id (str, optional): Product ID to download the report for. Either asset_version_id or product_id is required (they are mutually exclusive).
- report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
- report_subtype (str, required): The type of report to download, based on the available reports for the report_type specified. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
- verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launches an SBOM export for the asset_version_id, then polls until a pre-signed download URL is ready.
    The export job is polled every 10 seconds; there is no upper bound on the total wait.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if sbom_type or sbom_subtype is not a supported value, if no export job ID comes back, or if a query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    # Required-argument checks come first, in the same order callers have always seen.
    if not sbom_type:
        raise ValueError("SBOM Type is required")
    if not sbom_subtype:
        raise ValueError("SBOM Subtype is required")
    if not asset_version_id:
        raise ValueError("Asset Version ID is required")

    if sbom_type not in ["CYCLONEDX", "SPDX"]:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    # Validate the subtype before touching the query definitions.
    is_cyclonedx = sbom_type == "CYCLONEDX"
    if is_cyclonedx:
        allowed_subtypes = ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]
    else:
        allowed_subtypes = ["SBOM_ONLY"]
    if sbom_subtype not in allowed_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    # Pick the launch mutation and the response field that carries the job ID.
    if is_cyclonedx:
        launcher = queries.LAUNCH_CYCLONEDX_EXPORT
        result_field = 'launchCycloneDxExport'
    else:
        launcher = queries.LAUNCH_SPDX_EXPORT
        result_field = 'launchSpdxExport'

    launch_mutation = launcher['mutation']
    launch_variables = launcher['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    export_job_id = launch_response['data'][result_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll until the job reports COMPLETED and a download link is present.
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        presign = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL
        poll_query = presign['query']
        poll_variables = presign['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        job = poll_response['data']['generateExportDownloadPresignedUrl']
        if job['status'] != "COMPLETED":
            continue

        download_link = job['downloadLink']
        if not download_link:
            continue

        if verbose:
            print(f'Export Job Complete. Download URL: {download_link}')
        return download_link
Blocking call: Initiates generation of an SBOM for the asset_version_id, and returns a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of the SBOM.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
- sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
- asset_version_id (str, required): Asset Version ID to download the SBOM for.
- verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
- Exception: Raised if the query fails.
Returns:
str: URL to download the SBOM from.
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Fetch every Software Component of an Asset Version, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        Exception: Raised if asset_version_id is not provided or if the query fails.

    Returns:
        list: List of Software Component Objects
    """
    if not asset_version_id:
        raise Exception("Asset Version ID is required")

    component_query = queries.GET_SOFTWARE_COMPONENTS['query']
    component_variables = queries.GET_SOFTWARE_COMPONENTS['variables'](
        asset_version_id=asset_version_id,
        type=type,
    )
    result_field = 'allSoftwareComponentInstances'

    return get_all_paginated_results(token, organization_context, component_query, component_variables, result_field)
Gets all the Software Components for an Asset Version. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, required): Asset Version ID to get software components for.
- type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Software Component Objects
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Only consulted for EXACT name matching. Defaults to False.

    Raises:
        ValueError: Raised if name is not provided, or if search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.

    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # The documented contract requires a name; without one the filter would match nothing meaningful.
    if not name:
        raise ValueError("Name is required")
    # An unknown search method used to skip the name filter entirely and return every component.
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f"Search method {search_method} not supported. Valid values are EXACT and CONTAINS")

    if asset_version_id:
        # Scoped to one asset version: include the original components behind each instance.
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
"""
    else:
        # Organization-wide: include the asset version (and asset) that contains each component.
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
"""

    search_filter = {"mergedComponentRefId": None}
    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # NOTE(review): name_like is presumably the case-insensitive match - confirm against the API schema.
        name_key = "name" if case_sensitive else "name_like"
        version_key = "version"
    else:
        name_key = "name_contains"
        version_key = "version_contains"

    search_filter[name_key] = name
    if version:
        search_filter[version_key] = version

    variables = {
        "filter": search_filter,
        "after": None,
        "first": 100
    }

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")
Searches the SBOM of a specific asset version or the entire organization for matching software components. Search Methods: EXACT or CONTAINS An exact match will return only the software component whose name matches the name exactly. A contains match will return all software components whose name contains the search string.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- name (str, required): Name of the software component to search for.
- version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
- asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
- search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
- case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
- ValueError: Raised if name is not provided.
- Exception: Raised if the query fails.
Returns:
list: List of SoftwareComponentInstance Objects
@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), retry=retry_if_exception(is_not_breakout_exception))
def send_graphql_query(token, organization_context, query, variables=None):
    """
    POST a GraphQL query or mutation to the API and return the decoded JSON response.

    The call is wrapped in a retry policy: up to 5 attempts, 5 seconds apart, retrying only
    while is_not_breakout_exception accepts the raised exception.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        BreakoutException: If the response contains GraphQL errors, or if a mutation gets a non-200 status code.
        Exception: If a non-mutation request gets a non-200 status code.

    Returns:
        dict: Response JSON
    """
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
        "Organization-Context": organization_context,
    }
    payload = {"query": query, "variables": variables}

    response = requests.post(API_URL, headers=request_headers, json=payload)

    if response.status_code != 200:
        # Failed mutations raise BreakoutException; other failures raise a plain Exception.
        error_type = BreakoutException if is_mutation(query) else Exception
        raise error_type(f"Error: {response.status_code} - {response.text}")

    body = response.json()
    if "errors" in body:
        # GraphQL-level errors always raise BreakoutException.
        raise BreakoutException(f"Error: {body['errors']}")

    return body
Send a GraphQL query to the API
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- query (str): The GraphQL query string
- variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
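- Exception: If the response status code is not 200 (raised as a BreakoutException when the query is a mutation). A BreakoutException is also raised if the response body contains GraphQL errors.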
Returns:
dict: Response JSON
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Update the status of one finding or several findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID(s) to update the status for.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT_AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if user_id, finding_ids, or status are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    # Checked in this order so the first missing argument decides the error message.
    required_arguments = (
        (user_id, "User Id is required"),
        (finding_ids, "Finding Ids is required"),
        (status, "Status is required"),
    )
    for supplied, message in required_arguments:
        if not supplied:
            raise ValueError(message)

    status_mutation = queries.UPDATE_FINDING_STATUSES['mutation']
    status_variables = queries.UPDATE_FINDING_STATUSES['variables'](
        user_id=user_id,
        finding_ids=finding_ids,
        status=status,
        justification=justification,
        response=response,
        comment=comment,
    )

    return send_graphql_query(token, organization_context, status_mutation, status_variables)
Updates the status of a finding or multiple findings. This is a blocking call.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- user_id (str, required): User ID to update the finding status for.
- finding_ids (str, required): Finding ID to update the status for.
- status (str, required): Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
- justification (str, optional): Optional justification that applies to status of "NOT_AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
- response (str, optional): Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
- comment (str, optional): Optional comment to add to the finding status update.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
def upload_file_for_binary_analysis(
    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
):
    """
    Upload a file for Binary Analysis using a multi-part upload, then launch processing of the uploaded binary.
    The file is read in chunks of chunk_size and each chunk is uploaded as its own part.
    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE. Must be at least MIN_CHUNK_SIZE and below MAX_CHUNK_SIZE.
        quick_scan (bool, optional):
            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
        enable_bandit_scan (bool, optional):
            If True, will create an additional bandit scan in addition to the default binary analysis scan.

    Raises:
        ValueError: Raised if test_id or file_path are not provided, or if chunk_size is outside the allowed range.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" portion of the response to the launchBinaryUploadProcessing mutation.
    """
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")
    if chunk_size < MIN_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
    if chunk_size >= MAX_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")

    start_upload_mutation = """
    mutation Start_SDK($testId: ID!) {
        startMultipartUploadV2(testId: $testId) {
            uploadId
            key
        }
    }
    """

    part_url_mutation = """
        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
                key
                uploadUrl
            }
        }
        """

    complete_upload_mutation = """
    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
            key
        }
    }
    """

    # Step 1: open the multi-part upload and remember its identifiers.
    start_response = send_graphql_query(token, organization_context, start_upload_mutation, {"testId": test_id})
    upload_info = start_response['data']['startMultipartUploadV2']
    upload_id = upload_info['uploadId']
    upload_key = upload_info['key']

    # Step 2: request an upload URL for every chunk (part numbers start at 1) and
    # PUT the chunk to it, recording the ETag needed to complete the upload.
    uploaded_parts = []
    for part_number, chunk in enumerate(file_chunks(file_path, chunk_size), start=1):
        part_variables = {
            "partNumber": part_number,
            "uploadId": upload_id,
            "uploadKey": upload_key
        }
        part_response = send_graphql_query(token, organization_context, part_url_mutation, part_variables)
        part_upload_url = part_response['data']['generateUploadPartUrlV2']['uploadUrl']

        put_response = upload_bytes_to_url(part_upload_url, chunk)
        uploaded_parts.append({
            "ETag": put_response.headers['ETag'],
            "PartNumber": part_number
        })

    # Step 3: tell the API that every part has been uploaded.
    complete_variables = {
        "partData": uploaded_parts,
        "uploadId": upload_id,
        "uploadKey": upload_key
    }
    complete_response = send_graphql_query(token, organization_context, complete_upload_mutation, complete_variables)
    uploaded_key = complete_response['data']['completeMultipartUploadV2']['key']

    # Step 4: launch processing. Configuration options are only sent when at least one is requested.
    launch_variables = {
        "key": uploaded_key,
        "testId": test_id
    }

    configuration_options = []
    if quick_scan:
        configuration_options.append("QUICK_SCAN")
    if enable_bandit_scan:
        configuration_options.append("ENABLE_BANDIT_SCAN")

    if configuration_options:
        launch_variables["configurationOptions"] = configuration_options
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
                key
                newBanditScanId
            }
        }
        """
    else:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
            launchBinaryUploadProcessing(key: $key, testId: $testId) {
                key
                newBanditScanId
            }
        }
        """

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)

    return launch_response['data']
Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- test_id (str, required): Test ID to upload the file for.
- file_path (str, required): Local path to the file to upload.
- chunk_size (int, optional): The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB.
- quick_scan (bool, optional): If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
- enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
- ValueError: Raised if test_id or file_path are not provided, or if chunk_size is smaller than MIN_CHUNK_SIZE or not smaller than MAX_CHUNK_SIZE.
- Exception: Raised if the query fails.
Returns:
dict: The "data" portion of the response from the final GraphQL call, the launchBinaryUploadProcessing mutation.
def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
    """
    Upload a test results file to the test identified by test_id, then launch processing of the uploaded results.
    NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" portion of the response to the launchTestResultProcessing mutation.
    """
    for supplied, message in ((test_id, "Test Id is required"), (file_path, "File Path is required")):
        if not supplied:
            raise ValueError(message)

    generate_url_mutation = """
    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
        generateSinglePartUploadUrl(testId: $testId) {
            uploadUrl
            key
        }
    }
    """

    launch_processing_mutation = """
    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
        launchTestResultProcessing(key: $key, testId: $testId) {
            key
        }
    }
    """

    # Ask the API for a single-part upload URL and the key that names the upload.
    url_response = send_graphql_query(token, organization_context, generate_url_mutation, {"testId": test_id})
    upload_target = url_response['data']['generateSinglePartUploadUrl']
    upload_url = upload_target['uploadUrl']
    uploaded_key = upload_target['key']

    # Send the file contents to the pre-signed URL.
    upload_file_to_url(upload_url, file_path)

    # Launch processing of the uploaded results.
    launch_variables = {
        "testId": test_id,
        "key": uploaded_key
    }
    launch_response = send_graphql_query(token, organization_context, launch_processing_mutation, launch_variables)
    return launch_response['data']
Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- test_id (str, required): Test ID to upload the file for.
- file_path (str, required): Local path to the file to upload.
Raises:
- ValueError: Raised if test_id or file_path are not provided.
- Exception: Raised if the query fails.
Returns:
dict: The response from the GraphQL query, a completeTestResultUpload Object.
def upload_bytes_to_url(url, bytes):
    """
    PUT a block of bytes to a URL; used for uploading file chunks to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        bytes (bytes):
            Bytes to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    put_response = requests.put(url, data=bytes)

    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response
Used for uploading a file to a pre-signed S3 URL
Arguments:
- url (str): (Pre-signed S3) URL
- bytes (bytes): Bytes to upload
Raises:
- Exception: If the response status code is not 200
Returns:
requests.Response: Response object
def upload_file_to_url(url, file_path):
    """
    PUT the contents of a local file to a URL; used for uploading a file to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        file_path (str):
            Local path to file to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    # The open file handle is passed as the request body; it is closed when the with-block exits.
    with open(file_path, 'rb') as upload_source:
        put_response = requests.put(url, data=upload_source)

    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response
Used for uploading a file to a pre-signed S3 URL
Arguments:
- url (str): (Pre-signed S3) URL
- file_path (str): Local path to file to upload
Raises:
- Exception: If the response status code is not 200
Returns:
requests.Response: Response object