# finite_state_sdk
import json
import time
from enum import Enum
from warnings import warn

import requests

import finite_state_sdk.queries as queries

API_URL = 'https://platform.finitestate.io/api/v1/graphql'
AUDIENCE = "https://platform.finitestate.io/api/v1/graphql"
TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token"

# Default chunk size: 1000 MiB.
DEFAULT_CHUNK_SIZE = 1024**2 * 1000
# Maximum chunk size: 2000 MiB (slightly less than 2 GiB, which would be 2048 MiB).
MAX_CHUNK_SIZE = 1024**2 * 2000
# Minimum chunk size: 5 MiB.
MIN_CHUNK_SIZE = 1024**2 * 5


class UploadMethod(Enum):
    """
    Enumeration class representing different upload methods.

    Attributes:
        WEB_APP_UI: Upload method via web application UI.
        API: Upload method via API.
        GITHUB_INTEGRATION: Upload method via GitHub integration.
        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
    """
    WEB_APP_UI = "WEB_APP_UI"
    API = "API"
    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"


def create_artifact(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_version_id=None,
    artifact_name=None,
    product_id=None,
):
    """
    Create a new Artifact.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the artifact with.
        created_by_user_id (str, required):
            User ID of the user creating the artifact.
        asset_version_id (str, required):
            Asset Version ID to associate the artifact with.
        artifact_name (str, required):
            The name of the Artifact being created.
        product_id (str or list of str, optional):
            Product ID(s) to associate the artifact with. The value is placed in the request context
            unchanged (callers in this module pass a list of product IDs). If not specified, the artifact
            will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, containing the createArtifact Object under the "createArtifact" key.
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_version_id:
        raise ValueError("Asset version ID is required")
    if not artifact_name:
        raise ValueError("Artifact name is required")

    graphql_query = """
    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
        createArtifact(input: $input) {
            id
            name
            assetVersion {
                id
                name
                asset {
                    id
                    name
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # Artifact name, creating user, asset version, and business unit context are required.
    # NOTE(review): ctx.asset is populated with the asset *version* ID here - confirm this is intended.
    variables = {
        "input": {
            "name": artifact_name,
            "createdBy": created_by_user_id,
            "assetVersion": asset_version_id,
            "ctx": {
                "asset": asset_version_id,
                "businessUnits": [business_unit_id]
            }
        }
    }

    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']


def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
    """
    Create a new Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset with.
        created_by_user_id (str, required):
            User ID of the user creating the asset.
        asset_name (str, required):
            The name of the Asset being created.
        product_id (str, optional):
            Product ID to associate the asset with. The value is placed in the request context unchanged.
            If not specified, the asset will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, containing the createAsset Object.
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_name:
        raise ValueError("Asset name is required")

    graphql_query = """
    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
        createAsset(input: $input) {
            id
            name
            dependentProducts {
                id
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # Asset name, business unit context, and creating user are required
    variables = {
        "input": {
            "name": asset_name,
            "group": business_unit_id,
            "createdBy": created_by_user_id,
            "ctx": {
                "businessUnits": [business_unit_id]
            }
        }
    }

    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']


def create_asset_version(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset version with.
        created_by_user_id (str, required):
            User ID of the user creating the asset version.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to associate the asset version with. The value is placed in the request context
            unchanged. If not specified, the asset version will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, containing the createAssetVersion Object.

    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
    """
    # stacklevel=2 attributes the warning to the caller rather than to this function.
    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not asset_version_name:
        raise ValueError("Asset version name is required")

    graphql_query = """
    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
        createAssetVersion(input: $input) {
            id
            name
            asset {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # Asset version name, creating user, asset, and business unit context are required
    variables = {
        "input": {
            "name": asset_version_name,
            "createdBy": created_by_user_id,
            "asset": asset_id,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            }
        }
    }

    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']


def create_asset_version_on_asset(
    token,
    organization_context,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        created_by_user_id (str, optional):
            User ID of the user creating the asset version. Only sent when provided.
            NOTE(review): the mutation declares $createdByUserId as non-null (ID!), so the API
            presumably rejects requests that omit it - confirm against the API schema.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to pass to the mutation. Only sent when provided.

    Raises:
        ValueError: Raised if asset_id or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response; the new asset version is found under
        ["createNewAssetVersionOnAsset"]["assetVersion"].
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not asset_version_name:
        raise ValueError("Asset version name is required")

    graphql_query = """
    mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
        createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
            id
            assetVersion {
                id
            }
        }
    }
    """

    # Asset version name and asset ID are always sent; the remaining variables only when provided
    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}

    if created_by_user_id:
        variables["createdByUserId"] = created_by_user_id

    if product_id:
        variables["productId"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']


def create_new_asset_version_artifact_and_test_for_upload(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    product_id=None,
    test_type=None,
    artifact_description=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the entities for. If not provided, the Business Unit (group) of the existing Asset is used.
        created_by_user_id (str, optional):
            User ID that will be the creator of the entities. If not specified, the creator of the related Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        product_id (str, optional):
            Product ID to create the entities for. It is added to the products already associated with the Asset when not already among them.
        test_type (str, required):
            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documentation.
        artifact_description (str, optional):
            Description to use for the artifact. Examples include "Firmware", "Source Code Repository". It is used as the suffix of the artifact name ("<asset name> <version> - <description>"). If none is provided, "Binary" is used for binary analysis and "Unspecified Artifact" otherwise.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or test_type are not provided, if no asset matches asset_id,
            or if the business unit / creating user cannot be determined.
        Exception: Raised if the query fails.

    Returns:
        str: The Test ID of the newly created test that is used for uploading the file.
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    # Validate before creating anything: create_test rejects a missing test type with this same
    # error, but only after the asset version and artifact would already have been created.
    if not test_type:
        raise ValueError("Test type is required")

    assets = get_all_assets(token, organization_context, asset_id=asset_id)
    if not assets:
        raise ValueError("No assets found with the provided asset ID")
    asset = assets[0]

    asset_name = asset['name']

    # Work on a copy so the asset data returned by get_all_assets is not modified in place.
    existing_product_ids = asset['ctx']['products']
    asset_product_ids = None if existing_product_ids is None else list(existing_product_ids)
    if product_id:
        if asset_product_ids is None:
            asset_product_ids = [product_id]
        elif product_id not in asset_product_ids:
            asset_product_ids.append(product_id)

    # Fall back to the existing asset's business unit and creator when they are not supplied
    if not business_unit_id:
        business_unit_id = asset['group']['id']
    if not created_by_user_id:
        created_by_user_id = asset['createdBy']['id']

    if not business_unit_id:
        raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
    if not created_by_user_id:
        raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")

    # create the asset version
    response = create_asset_version_on_asset(
        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id
    )
    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']

    is_binary_analysis = test_type == "finite_state_binary_analysis"

    # create the artifact
    if not artifact_description:
        artifact_description = "Binary" if is_binary_analysis else "Unspecified Artifact"
    artifact_name = f"{asset_name} {version} - {artifact_description}"
    response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
                               created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
                               artifact_name=artifact_name, product_id=asset_product_ids)
    artifact_id = response['createArtifact']['id']

    # create the test
    if is_binary_analysis:
        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                  artifact_id=artifact_id, product_id=asset_product_ids,
                                                  test_name=test_name, upload_method=upload_method)
    else:
        test_name = f"{asset_name} {version} - {test_type}"
        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                      artifact_id=artifact_id, product_id=asset_product_ids,
                                                      test_name=test_name, test_type=test_type,
                                                      upload_method=upload_method)
    return response['createTest']['id']


def create_new_asset_version_and_upload_binary(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    artifact_description=None,
    quick_scan=False,
    upload_method: UploadMethod = UploadMethod.API,
    enable_bandit_scan: bool = False,
):
    """
    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Local path to the file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
        artifact_description (str, optional):
            Description of the artifact. If not provided, the default is "Firmware Binary".
        quick_scan (bool, optional):
            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.
        enable_bandit_scan (bool, optional):
            If True, will create an additional bandit scan in addition to the default binary analysis scan.

    Raises:
        ValueError: Raised if asset_id, version, or file_path are not provided.
        Exception: Raised if any of the queries fail.

    Returns:
        The value returned by upload_file_for_binary_analysis for the newly created test.
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    if not file_path:
        raise ValueError("File path is required")

    # create the asset version and binary test
    if not artifact_description:
        artifact_description = "Firmware Binary"
    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        artifact_description=artifact_description,
        upload_method=upload_method,
    )

    # upload file for binary test
    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
                                               quick_scan=quick_scan, enable_bandit_scan=enable_bandit_scan)
    return response


def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
                                                     created_by_user_id=None, asset_id=None, version=None,
                                                     file_path=None, product_id=None, test_type=None,
                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
    """
    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Path to the test results file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
        test_type (str, required):
            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
        artifact_description (str, optional):
            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: If the asset_id, version, file_path, or test_type are not provided.
        Exception: If the test_type is not a supported third party scanner type, or if the query fails.

    Returns:
        The value returned by upload_test_results_file for the newly created test.
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    if not file_path:
        raise ValueError("File path is required")
    if not test_type:
        raise ValueError("Test type is required")

    # create the asset version and test
    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
                                                                    business_unit_id=business_unit_id,
                                                                    created_by_user_id=created_by_user_id,
                                                                    asset_id=asset_id, version=version,
                                                                    product_id=product_id, test_type=test_type,
                                                                    artifact_description=artifact_description,
                                                                    upload_method=upload_method)

    # upload test results file
    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
    return response


def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
                   product_description=None, vendor_id=None, vendor_name=None):
    """
    Create a new Product.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the product with.
        created_by_user_id (str, required):
            User ID of the user creating the product.
        product_name (str, required):
            The name of the Product being created.
        product_description (str, optional):
            The description of the Product being created.
        vendor_id (str, optional):
            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
            (This requirement is not checked by this function.)
        vendor_name (str, optional):
            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, containing the createProduct Object.
    """

    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not product_name:
        raise ValueError("Product name is required")

    graphql_query = """
    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
        createProduct(input: $input) {
            id
            name
            vendor {
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                businessUnit
            }
        }
    }
    """

    # Product name, business unit context, and creating user are required
    variables = {
        "input": {
            "name": product_name,
            "group": business_unit_id,
            "createdBy": created_by_user_id,
            "ctx": {
                "businessUnit": business_unit_id
            }
        }
    }

    if product_description is not None:
        variables["input"]["description"] = product_description

    # If the vendor ID is specified, this will link the new product to the existing vendor
    if vendor_id is not None:
        variables["input"]["vendor"] = {
            "id": vendor_id
        }

    # If the vendor name is specified, this will create a new vendor and link it to the new product
    if vendor_name is not None:
        variables["input"]["createVendor"] = {
            "name": vendor_name
        }

    response = send_graphql_query(token, organization_context, graphql_query, variables)

    return response['data']


def create_test(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    tools=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading files.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str or list of str, optional):
            Product ID(s) to associate the Test with. The value is placed in the request context unchanged
            (callers in this module pass a list of product IDs). If not specified, the Test will not be
            associated with a product.
        test_type (str, required):
            The type of test being created, sent as the test result file format. Examples are "cyclonedx" and
            "finite_state_binary_analysis"; other third party scanner types are passed through unchanged.
        tools (list, optional):
            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test. Defaults to an empty list.
        upload_method (UploadMethod, required):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, containing the createTest Object under the "createTest" key.
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not artifact_id:
        raise ValueError("Artifact ID is required")
    if not test_name:
        raise ValueError("Test name is required")
    if not test_type:
        raise ValueError("Test type is required")

    # A None default avoids sharing one list object across calls (mutable default argument).
    if tools is None:
        tools = []

    graphql_query = """
    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
        createTest(input: $input) {
            id
            name
            artifactUnderTest {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                        dependentProducts {
                            id
                            name
                        }
                    }
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
            uploadMethod
        }
    }
    """

    # Test name, creating user, artifact, test type, and asset / business unit context are required
    variables = {
        "input": {
            "name": test_name,
            "createdBy": created_by_user_id,
            "artifactUnderTest": artifact_id,
            "testResultFileFormat": test_type,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            },
            "tools": tools,
            "uploadMethod": upload_method.value
        }
    }

    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']


def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
                                   upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Test object for uploading files for Finite State Binary Analysis.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object (as returned by create_test)
    """
    # Fixed tool description identifying the Finite State scanner for this test type
    tools = [
        {
            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
            "name": "Finite State Binary Analysis"
        }
    ]
    return create_test(token, organization_context, business_unit_id=business_unit_id,
                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
                       tools=tools, upload_method=upload_method)


def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading CycloneDX files.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object (as returned by create_test)
    """
    return create_test(token, organization_context, business_unit_id=business_unit_id,
                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)


# NOTE(review): the definition of create_test_as_third_party_scanner begins here, but the visible
# source ends inside its docstring. It has deliberately not been completed or modified. The visible
# portion is preserved verbatim below (commented out so this section stays syntactically valid) and
# must be re-joined with its continuation.
#
# def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
#                                        asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
#                                        upload_method: UploadMethod = UploadMethod.API):
#     """
#     Create a new Test object for uploading Third Party Scanner files.
#
#     Args:
#         token (str):
#             Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
#         organization_context (str):
#             Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
#         business_unit_id (str, required):
#             Business Unit ID to associate the Test with.
#         created_by_user_id (str, required):
#             User ID of the user creating the Test.
#         asset_id (str, required):
#             Asset ID to associate the Test with.
#         artifact_id (str, required):
#             Artifact ID to associate the Test with.
#         test_name (str, required):
#             The name of the Test being created.
#         product_id (str, optional):
#             Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
#         test_type (str, required):
#             Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
def _save_url_to_file(url, output_filename, verbose=False):
    """
    Fetch `url` with an HTTP GET and write the response body to `output_filename`.

    Args:
        url (str):
            The URL to download from.
        output_filename (str):
            Local path of the file to write (opened in binary mode, overwriting any existing file).
        verbose (bool, optional):
            If True, print progress messages to the console. Defaults to False.

    Raises:
        Exception: Raised if the HTTP response status code is not 200.
    """
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')


def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
                                  report_subtype=None, output_filename=None, verbose=False):
    """
    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            The Asset Version ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved to "report.csv" or "report.pdf" in the current directory based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)

    # Resolve the documented default only after the call above has validated report_type;
    # previously a missing filename reached open(None) and failed with a TypeError.
    if not output_filename:
        output_filename = f"report.{report_type.lower()}"

    _save_url_to_file(url, output_filename, verbose=verbose)


def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved to "report.<type>" (for example "report.csv") in the current directory, based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Same default-filename rule as download_asset_version_report; report_type has been
    # validated by generate_report_download_url at this point.
    if not output_filename:
        output_filename = f"report.{report_type.lower()}"

    _save_url_to_file(url, output_filename, verbose=verbose)


def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, optional):
            The local filename to save the SBOM to. If not provided (or passed as None/empty), the SBOM is saved to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype,
                                     asset_version_id=asset_version_id, verbose=verbose)

    # An explicit None/empty filename falls back to the documented default instead of failing in open().
    _save_url_to_file(url, output_filename or "sbom.json", verbose=verbose)
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Fetch every artifact in the organization, following pagination until all pages are read.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            Restrict the query to a single Artifact ID. Defaults to None.
        business_unit_id (str, optional):
            Restrict the query to artifacts of a single Business Unit. Defaults to None.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifacts_query = queries.ALL_ARTIFACTS['query']
    artifacts_variables = queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id)
    # NOTE(review): results are read from the 'allAssets' response field - confirm this matches the ALL_ARTIFACTS query.
    return get_all_paginated_results(
        token,
        organization_context,
        artifacts_query,
        artifacts_variables,
        'allAssets',
    )
def get_all_asset_versions(token, organization_context):
    """
    Get all asset versions in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    # ALL_ASSET_VERSIONS['variables'] is a factory (get_asset_versions calls it with these keyword
    # arguments). It must be called here as well: passing the factory itself made the paginator
    # fail when it tried to read variables["first"]. No filters are applied, so every asset
    # version is returned.
    variables = queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=None,
                                                        asset_id=None,
                                                        business_unit_id=None)
    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
                                     variables, 'allAssetVersions')
def get_all_business_units(token, organization_context):
    """
    Fetch every business unit in the organization, following pagination until all pages are read.
    NOTE: business units are returned as Group objects.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_units_query = queries.ALL_BUSINESS_UNITS['query']
    # NOTE(review): the 'variables' entry is passed through as-is (not called) - confirm it is a plain dict.
    business_units_variables = queries.ALL_BUSINESS_UNITS['variables']
    return get_all_paginated_results(
        token,
        organization_context,
        business_units_query,
        business_units_variables,
        'allGroups',
    )
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    Pages are requested one after another, passing the `_cursor` of the last item of each page as the
    `after` variable of the next request, until an empty page is returned or `limit` is reached.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required):
            Variables to be used in the GraphQL query. Must contain a "first" entry (the page size) between 1 and 1000.
            The dict passed in is not modified.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.

    Raises:
        Exception: If a required argument is missing or out of range, if the query fails, or if the field is not in the response JSON

    Returns:
        list: List of results
    """

    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")

    # Work on a private copy: the pagination cursor is written into the variables below, and
    # callers may pass a shared (for example module-level) dict that must not keep a stale cursor.
    variables = dict(variables) if variables else {}

    page_size = variables.get("first")
    if not page_size:
        raise Exception("Error: first is required")
    if page_size < 1:
        raise Exception("Error: first cannot be less than 1")
    if page_size > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # the cursor of the last entry on a page is the starting point of the next page
    cursor = page[-1]['_cursor'] if page else None

    # ">=" rather than "==": a page size that does not divide `limit` evenly would otherwise
    # step over the limit and keep paginating to the very end.
    while cursor and not (limit and len(results) >= limit):
        variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, variables)
        page = response_data['data'][field]
        results.extend(page)

        # an empty page means there is nothing left to fetch
        cursor = page[-1]['_cursor'] if page else None

    # never hand back more than the caller asked for
    return results[:limit] if limit else results
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until all pages are read.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    users_query = queries.ALL_USERS['query']
    # NOTE(review): the 'variables' entry is passed through as-is (not called) - confirm it is a plain dict.
    users_variables = queries.ALL_USERS['variables']
    return get_all_paginated_results(
        token,
        organization_context,
        users_query,
        users_variables,
        'allUsers',
    )
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch assets in the organization, following pagination until all pages are read.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get. Defaults to None, which returns all Assets; otherwise only the Asset with that ID is returned.
        business_unit_id (str, optional):
            Business Unit ID to filter by. Defaults to None, which returns all Assets; otherwise only Assets in that Business Unit are returned.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS['query']
    assets_variables = queries.ALL_ASSETS['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(
        token,
        organization_context,
        assets_query,
        assets_variables,
        'allAssets',
    )
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Sends a client-credentials request as a JSON body to `token_url` and returns the
    `access_token` field of the JSON response.

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL, by default TOKEN_URL
        audience (str, optional):
            Audience, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # honor the caller-supplied audience (previously the module constant was always sent)
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    # honor the caller-supplied token URL (previously the module constant was always used)
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
Just the token, do not include "Bearer" in this string. 1522 organization_context (str): 1523 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1524 asset_version_id (str, optional): 1525 Asset Version ID to get findings for. If not provided, will get all findings in the organization. 1526 finding_id (str, optional): 1527 The ID of a specific finding to get. If specified, will return only the finding with that ID. 1528 category (str, optional): 1529 The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. 1530 This can be a single string, or an array of values. 1531 status (str, optional): 1532 The status of Findings to return. 1533 severity (str, optional): 1534 The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings. 1535 count (bool, optional): 1536 If True, will return the count of findings instead of the findings themselves. Defaults to False. 1537 limit (int, optional): 1538 The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000. 1539 1540 Raises: 1541 Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible. 
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Fetch the asset versions of a product, following pagination until all pages are read.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            Product ID to get asset versions for. Although it defaults to None, a value must be supplied.

    Raises:
        Exception: Raised if product_id is not provided, or if the query fails.

    Returns:
        list: Results read from the 'allProducts' field of the query response
    """
    if product_id:
        spec = queries.GET_PRODUCT_ASSET_VERSIONS
        return get_all_paginated_results(
            token,
            organization_context,
            spec['query'],
            spec['variables'](product_id),
            'allProducts',
        )

    raise Exception("Product ID is required")


def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Fetch products, optionally filtered by product or business unit, following pagination until all pages are read.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects
    """
    products_query = queries.GET_PRODUCTS['query']
    products_variables = queries.GET_PRODUCTS['variables'](product_id=product_id,
                                                           business_unit_id=business_unit_id)
    return get_all_paginated_results(
        token,
        organization_context,
        products_query,
        products_variables,
        'allProducts',
    )
This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 1621 organization_context (str): 1622 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1623 asset_version_id (str, optional): 1624 Asset Version ID to download the report for. Either `asset_version_id` or `product_id` are required. 1625 product_id (str, optional): 1626 Product ID to download the report for. Either `asset_version_id` or `product_id` are required. 1627 report_type (str, required): 1628 The file type of the report to download. Valid values are "CSV" and "PDF". 1629 report_subtype (str, required): 1630 The type of report to download. Based on available reports for the `report_type` specified 1631 Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". 1632 Valid values for PDF are "RISK_SUMMARY". 1633 verbose (bool, optional): 1634 If True, print additional information to the console. Defaults to False. 
1635 """ 1636 if not report_type: 1637 raise ValueError("Report Type is required") 1638 if not report_subtype: 1639 raise ValueError("Report Subtype is required") 1640 if not asset_version_id and not product_id: 1641 raise ValueError("Asset Version ID or Product ID is required") 1642 1643 if asset_version_id and product_id: 1644 raise ValueError("Asset Version ID and Product ID are mutually exclusive") 1645 1646 if report_type not in ["CSV", "PDF"]: 1647 raise Exception(f"Report Type {report_type} not supported") 1648 1649 if report_type == "CSV": 1650 if report_subtype not in ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]: 1651 raise Exception(f"Report Subtype {report_subtype} not supported") 1652 1653 mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id, 1654 report_type=report_type, report_subtype=report_subtype) 1655 variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id, 1656 report_type=report_type, report_subtype=report_subtype) 1657 1658 response_data = send_graphql_query(token, organization_context, mutation, variables) 1659 if verbose: 1660 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1661 1662 # get exportJobId from the result 1663 if asset_version_id: 1664 export_job_id = response_data['data']['launchArtifactCSVExport']['exportJobId'] 1665 elif product_id: 1666 export_job_id = response_data['data']['launchProductCSVExport']['exportJobId'] 1667 else: 1668 raise Exception( 1669 "Error: Export Job ID not found - this should not happen, please contact your Finite State representative") 1670 1671 if verbose: 1672 print(f'Export Job ID: {export_job_id}') 1673 1674 if report_type == "PDF": 1675 if report_subtype not in ["RISK_SUMMARY"]: 1676 raise Exception(f"Report Subtype {report_subtype} not supported") 1677 1678 mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, 
product_id=product_id, 1679 report_type=report_type, report_subtype=report_subtype) 1680 variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id, 1681 report_type=report_type, report_subtype=report_subtype) 1682 1683 response_data = send_graphql_query(token, organization_context, mutation, variables) 1684 if verbose: 1685 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1686 1687 # get exportJobId from the result 1688 if asset_version_id: 1689 export_job_id = response_data['data']['launchArtifactPdfExport']['exportJobId'] 1690 elif product_id: 1691 export_job_id = response_data['data']['launchProductPdfExport']['exportJobId'] 1692 else: 1693 raise Exception( 1694 "Error: Export Job ID not found - this should not happen, please contact your Finite State representative") 1695 1696 if verbose: 1697 print(f'Export Job ID: {export_job_id}') 1698 1699 if not export_job_id: 1700 raise Exception( 1701 "Error: Export Job ID not found - this should not happen, please contact your Finite State representative") 1702 1703 # poll the API until the export job is complete 1704 sleep_time = 10 1705 total_time = 0 1706 if verbose: 1707 print(f'Polling every {sleep_time} seconds for export job to complete') 1708 1709 while True: 1710 time.sleep(sleep_time) 1711 total_time += sleep_time 1712 if verbose: 1713 print(f'Total time elapsed: {total_time} seconds') 1714 1715 query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query'] 1716 variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id) 1717 1718 response_data = send_graphql_query(token, organization_context, query, variables) 1719 1720 if verbose: 1721 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1722 1723 if response_data['data']['generateExportDownloadPresignedUrl']['status'] == 'COMPLETED': 1724 if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']: 1725 if verbose: 1726 print( 1727 
f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}') 1728 return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink'] 1729 1730 1731def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None, 1732 verbose=False) -> str: 1733 """ 1734 Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM. 1735 This may take several minutes to complete, depending on the size of SBOM. 1736 1737 Args: 1738 token (str): 1739 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 1740 organization_context (str): 1741 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1742 sbom_type (str, required): 1743 The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX". 1744 sbom_subtype (str, required): 1745 The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY". 1746 asset_version_id (str, required): 1747 Asset Version ID to download the SBOM for. 1748 verbose (bool, optional): 1749 If True, print additional information to the console. Defaults to False. 1750 1751 Raises: 1752 ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided. 1753 Exception: Raised if the query fails. 1754 1755 Returns: 1756 str: URL to download the SBOM from. 
1757 """ 1758 1759 if not sbom_type: 1760 raise ValueError("SBOM Type is required") 1761 if not sbom_subtype: 1762 raise ValueError("SBOM Subtype is required") 1763 if not asset_version_id: 1764 raise ValueError("Asset Version ID is required") 1765 1766 if sbom_type not in ["CYCLONEDX", "SPDX"]: 1767 raise Exception(f"SBOM Type {sbom_type} not supported") 1768 1769 if sbom_type == "CYCLONEDX": 1770 if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]: 1771 raise Exception(f"SBOM Subtype {sbom_subtype} not supported") 1772 1773 mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation'] 1774 variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id) 1775 1776 response_data = send_graphql_query(token, organization_context, mutation, variables) 1777 if verbose: 1778 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1779 1780 # get exportJobId from the result 1781 export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId'] 1782 if verbose: 1783 print(f'Export Job ID: {export_job_id}') 1784 1785 if sbom_type == "SPDX": 1786 if sbom_subtype not in ["SBOM_ONLY"]: 1787 raise Exception(f"SBOM Subtype {sbom_subtype} not supported") 1788 1789 mutation = queries.LAUNCH_SPDX_EXPORT['mutation'] 1790 variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id) 1791 1792 response_data = send_graphql_query(token, organization_context, mutation, variables) 1793 if verbose: 1794 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1795 1796 # get exportJobId from the result 1797 export_job_id = response_data['data']['launchSpdxExport']['exportJobId'] 1798 if verbose: 1799 print(f'Export Job ID: {export_job_id}') 1800 1801 if not export_job_id: 1802 raise Exception( 1803 "Error: Export Job ID not found - this should not happen, please contact your Finite State representative") 1804 1805 # poll the API until the export job is complete 1806 sleep_time = 10 1807 total_time = 0 1808 if 
verbose: 1809 print(f'Polling every {sleep_time} seconds for export job to complete') 1810 while True: 1811 time.sleep(sleep_time) 1812 total_time += sleep_time 1813 if verbose: 1814 print(f'Total time elapsed: {total_time} seconds') 1815 1816 query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query'] 1817 variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id) 1818 1819 response_data = send_graphql_query(token, organization_context, query, variables) 1820 1821 if verbose: 1822 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1823 1824 if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED": 1825 if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']: 1826 if verbose: 1827 print( 1828 f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}') 1829 return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink'] 1830 1831 1832def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list: 1833 """ 1834 Gets all the Software Components for an Asset Version. Uses pagination to get all results. 1835 Args: 1836 token (str): 1837 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string. 1838 organization_context (str): 1839 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1840 asset_version_id (str, optional): 1841 Asset Version ID to get software components for. 1842 type (str, optional): 1843 The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. 
See https://docs.finitestate.io/types/software-component-type 1844 Raises: 1845 Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible. 1846 Returns: 1847 list: List of Software Component Objects 1848 """ 1849 if not asset_version_id: 1850 raise Exception("Asset Version ID is required") 1851 1852 return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'], 1853 queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id, 1854 type=type), 1855 'allSoftwareComponentInstances') 1856 1857 1858def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', 1859 case_sensitive=False) -> list: 1860 """ 1861 Searches the SBOM of a specific asset version or the entire organization for matching software components. 1862 Search Methods: EXACT or CONTAINS 1863 An exact match will return only the software component whose name matches the name exactly. 1864 A contains match will return all software components whose name contains the search string. 1865 1866 Args: 1867 token (str): 1868 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 1869 organization_context (str): 1870 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1871 name (str, required): 1872 Name of the software component to search for. 1873 version (str, optional): 1874 Version of the software component to search for. If not specified, will search for all versions of the software component. 1875 asset_version_id (str, optional): 1876 Asset Version ID to search for software components in. If not specified, will search the entire organization. 1877 search_method (str, optional): 1878 Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT". 
1879 case_sensitive (bool, optional): 1880 Whether or not to perform a case sensitive search. Defaults to False. 1881 Raises: 1882 ValueError: Raised if name is not provided. 1883 Exception: Raised if the query fails. 1884 Returns: 1885 list: List of SoftwareComponentInstance Objects 1886 """ 1887 if asset_version_id: 1888 query = """ 1889query GetSoftwareComponentInstances_SDK( 1890 $filter: SoftwareComponentInstanceFilter 1891 $after: String 1892 $first: Int 1893) { 1894 allSoftwareComponentInstances( 1895 filter: $filter 1896 after: $after 1897 first: $first 1898 ) { 1899 _cursor 1900 id 1901 name 1902 version 1903 originalComponents { 1904 id 1905 name 1906 version 1907 } 1908 } 1909} 1910""" 1911 else: 1912 # gets the asset version info that contains the software component 1913 query = """ 1914query GetSoftwareComponentInstances_SDK( 1915 $filter: SoftwareComponentInstanceFilter 1916 $after: String 1917 $first: Int 1918) { 1919 allSoftwareComponentInstances( 1920 filter: $filter 1921 after: $after 1922 first: $first 1923 ) { 1924 _cursor 1925 id 1926 name 1927 version 1928 assetVersion { 1929 id 1930 name 1931 asset { 1932 id 1933 name 1934 } 1935 } 1936 } 1937} 1938""" 1939 1940 variables = { 1941 "filter": { 1942 "mergedComponentRefId": None 1943 }, 1944 "after": None, 1945 "first": 100 1946 } 1947 1948 if asset_version_id: 1949 variables["filter"]["assetVersionRefId"] = asset_version_id 1950 1951 if search_method == 'EXACT': 1952 if case_sensitive: 1953 variables["filter"]["name"] = name 1954 else: 1955 variables["filter"]["name_like"] = name 1956 elif search_method == 'CONTAINS': 1957 variables["filter"]["name_contains"] = name 1958 1959 if version: 1960 if search_method == 'EXACT': 1961 variables["filter"]["version"] = version 1962 elif search_method == 'CONTAINS': 1963 variables["filter"]["version_contains"] = version 1964 1965 records = get_all_paginated_results(token, organization_context, query, variables=variables, 1966 
field="allSoftwareComponentInstances") 1967 1968 return records 1969 1970 1971def send_graphql_query(token, organization_context, query, variables=None): 1972 """ 1973 Send a GraphQL query to the API 1974 1975 Args: 1976 token (str): 1977 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 1978 organization_context (str): 1979 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1980 query (str): 1981 The GraphQL query string 1982 variables (dict, optional): 1983 Variables to be used in the GraphQL query, by default None 1984 1985 Raises: 1986 Exception: If the response status code is not 200 1987 1988 Returns: 1989 dict: Response JSON 1990 """ 1991 headers = { 1992 'Content-Type': 'application/json', 1993 'Authorization': f'Bearer {token}', 1994 'Organization-Context': organization_context 1995 } 1996 data = { 1997 'query': query, 1998 'variables': variables 1999 } 2000 2001 response = requests.post(API_URL, headers=headers, json=data) 2002 2003 if response.status_code == 200: 2004 thejson = response.json() 2005 2006 if "errors" in thejson: 2007 raise Exception(f"Error: {thejson['errors']}") 2008 2009 return thejson 2010 else: 2011 raise Exception(f"Error: {response.status_code} - {response.text}") 2012 2013 2014def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None, 2015 justification=None, response=None, comment=None): 2016 """ 2017 Updates the status of a findings or multiple findings. This is a blocking call. 2018 2019 Args: 2020 token (str): 2021 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string. 2022 organization_context (str): 2023 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 
2024 user_id (str, required): 2025 User ID to update the finding status for. 2026 finding_ids (str, required): 2027 Finding ID to update the status for. 2028 status (str, required): 2029 Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option 2030 justification (str, optional): 2031 Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum 2032 response (str, optional): 2033 Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum 2034 comment (str, optional): 2035 Optional comment to add to the finding status update. 2036 2037 Raises: 2038 ValueError: Raised if required parameters are not provided. 2039 Exception: Raised if the query fails. 2040 2041 Returns: 2042 dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. 
For details see https://docs.finitestate.io/types/update-findings-statuses-response 2043 """ 2044 if not user_id: 2045 raise ValueError("User Id is required") 2046 if not finding_ids: 2047 raise ValueError("Finding Ids is required") 2048 if not status: 2049 raise ValueError("Status is required") 2050 2051 mutation = queries.UPDATE_FINDING_STATUSES['mutation'] 2052 variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status, 2053 justification=justification, response=response, 2054 comment=comment) 2055 2056 return send_graphql_query(token, organization_context, mutation, variables) 2057 2058 2059def upload_file_for_binary_analysis( 2060 token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False 2061): 2062 """ 2063 Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. 2064 NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that. 2065 2066 Args: 2067 token (str): 2068 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 2069 organization_context (str): 2070 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 2071 test_id (str, required): 2072 Test ID to upload the file for. 2073 file_path (str, required): 2074 Local path to the file to upload. 2075 chunk_size (int, optional): 2076 The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB. 2077 quick_scan (bool, optional): 2078 If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation. 2079 enable_bandit_scan (bool, optional): 2080 If True, will create an additional bandit scan in addition to the default binary analysis scan. 
2081 2082 Raises: 2083 ValueError: Raised if test_id or file_path are not provided. 2084 Exception: Raised if the query fails. 2085 2086 Returns: 2087 dict: The response from the GraphQL query, a completeMultipartUpload Object. 2088 """ 2089 # To upload a file for Binary Analysis, you must use the generateMultiplePartUploadUrl mutation 2090 if not test_id: 2091 raise ValueError("Test Id is required") 2092 if not file_path: 2093 raise ValueError("File Path is required") 2094 if chunk_size < MIN_CHUNK_SIZE: 2095 raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes") 2096 if chunk_size >= MAX_CHUNK_SIZE: 2097 raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes") 2098 2099 # Start Multi-part Upload 2100 graphql_query = """ 2101 mutation Start_SDK($testId: ID!) { 2102 startMultipartUploadV2(testId: $testId) { 2103 uploadId 2104 key 2105 } 2106 } 2107 """ 2108 2109 variables = { 2110 "testId": test_id 2111 } 2112 2113 response = send_graphql_query(token, organization_context, graphql_query, variables) 2114 2115 upload_id = response['data']['startMultipartUploadV2']['uploadId'] 2116 upload_key = response['data']['startMultipartUploadV2']['key'] 2117 2118 # if the file is greater than max chunk size (or 5 GB), split the file in chunks, 2119 # call generateUploadPartUrlV2 for each chunk of the file (even if it is a single part) 2120 # and upload the file to the returned upload URL 2121 i = 0 2122 part_data = [] 2123 for chunk in file_chunks(file_path, chunk_size): 2124 i = i + 1 2125 graphql_query = """ 2126 mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) 
{ 2127 generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) { 2128 key 2129 uploadUrl 2130 } 2131 } 2132 """ 2133 2134 variables = { 2135 "partNumber": i, 2136 "uploadId": upload_id, 2137 "uploadKey": upload_key 2138 } 2139 2140 response = send_graphql_query(token, organization_context, graphql_query, variables) 2141 2142 chunk_upload_url = response['data']['generateUploadPartUrlV2']['uploadUrl'] 2143 2144 # upload the chunk to the upload URL 2145 response = upload_bytes_to_url(chunk_upload_url, chunk) 2146 2147 part_data.append({ 2148 "ETag": response.headers['ETag'], 2149 "PartNumber": i 2150 }) 2151 2152 # call completeMultipartUploadV2 2153 graphql_query = """ 2154 mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) { 2155 completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) { 2156 key 2157 } 2158 } 2159 """ 2160 2161 variables = { 2162 "partData": part_data, 2163 "uploadId": upload_id, 2164 "uploadKey": upload_key 2165 } 2166 2167 response = send_graphql_query(token, organization_context, graphql_query, variables) 2168 2169 # get key from the result 2170 key = response['data']['completeMultipartUploadV2']['key'] 2171 2172 variables = { 2173 "key": key, 2174 "testId": test_id 2175 } 2176 2177 # call launchBinaryUploadProcessing 2178 if quick_scan or enable_bandit_scan: 2179 graphql_query = """ 2180 mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) { 2181 launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) { 2182 key 2183 newBanditScanId 2184 } 2185 } 2186 """ 2187 else: 2188 graphql_query = """ 2189 mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) 
{ 2190 launchBinaryUploadProcessing(key: $key, testId: $testId) { 2191 key 2192 newBanditScanId 2193 } 2194 } 2195 """ 2196 2197 if quick_scan: 2198 variables["configurationOptions"] = ["QUICK_SCAN"] 2199 2200 if enable_bandit_scan: 2201 config_options = variables.get("configurationOptions", []) 2202 config_options.append("ENABLE_BANDIT_SCAN") 2203 variables["configurationOptions"] = config_options 2204 2205 response = send_graphql_query(token, organization_context, graphql_query, variables) 2206 2207 return response['data'] 2208 2209 2210def upload_test_results_file(token, organization_context, test_id=None, file_path=None): 2211 """ 2212 Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that. 2213 2214 Args: 2215 token (str): 2216 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 2217 organization_context (str): 2218 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 2219 test_id (str, required): 2220 Test ID to upload the file for. 2221 file_path (str, required): 2222 Local path to the file to upload. 2223 2224 Raises: 2225 ValueError: Raised if test_id or file_path are not provided. 2226 Exception: Raised if the query fails. 2227 2228 Returns: 2229 dict: The response from the GraphQL query, a completeTestResultUpload Object. 2230 """ 2231 if not test_id: 2232 raise ValueError("Test Id is required") 2233 if not file_path: 2234 raise ValueError("File Path is required") 2235 2236 # Gerneate Test Result Upload URL 2237 graphql_query = """ 2238 mutation GenerateTestResultUploadUrl_SDK($testId: ID!) 
{ 2239 generateSinglePartUploadUrl(testId: $testId) { 2240 uploadUrl 2241 key 2242 } 2243 } 2244 """ 2245 2246 variables = { 2247 "testId": test_id 2248 } 2249 2250 response = send_graphql_query(token, organization_context, graphql_query, variables) 2251 2252 # get the upload URL and key 2253 upload_url = response['data']['generateSinglePartUploadUrl']['uploadUrl'] 2254 key = response['data']['generateSinglePartUploadUrl']['key'] 2255 2256 # upload the file 2257 upload_file_to_url(upload_url, file_path) 2258 2259 # complete the upload 2260 graphql_query = """ 2261 mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) { 2262 launchTestResultProcessing(key: $key, testId: $testId) { 2263 key 2264 } 2265 } 2266 """ 2267 2268 variables = { 2269 "testId": test_id, 2270 "key": key 2271 } 2272 2273 response = send_graphql_query(token, organization_context, graphql_query, variables) 2274 return response['data'] 2275 2276 2277def upload_bytes_to_url(url, bytes): 2278 """ 2279 Used for uploading a file to a pre-signed S3 URL 2280 2281 Args: 2282 url (str): 2283 (Pre-signed S3) URL 2284 bytes (bytes): 2285 Bytes to upload 2286 2287 Raises: 2288 Exception: If the response status code is not 200 2289 2290 Returns: 2291 requests.Response: Response object 2292 """ 2293 response = requests.put(url, data=bytes) 2294 2295 if response.status_code == 200: 2296 return response 2297 else: 2298 raise Exception(f"Error: {response.status_code} - {response.text}") 2299 2300 2301def upload_file_to_url(url, file_path): 2302 """ 2303 Used for uploading a file to a pre-signed S3 URL 2304 2305 Args: 2306 url (str): 2307 (Pre-signed S3) URL 2308 file_path (str): 2309 Local path to file to upload 2310 2311 Raises: 2312 Exception: If the response status code is not 200 2313 2314 Returns: 2315 requests.Response: Response object 2316 """ 2317 with open(file_path, 'rb') as file: 2318 response = requests.put(url, data=file) 2319 2320 if response.status_code == 200: 2321 return response 2322 
else: 2323 raise Exception(f"Error: {response.status_code} - {response.text}")
DEFAULT CHUNK SIZE: 1000 MiB
MAX CHUNK SIZE: 2000 MiB (just under 2 GiB)
MIN CHUNK SIZE: 5 MiB
class UploadMethod(Enum):
    """
    The channels through which an upload can be submitted.

    Each member's value is the same string as its name.

    Attributes:
        WEB_APP_UI: Upload method via web application UI.
        API: Upload method via API.
        GITHUB_INTEGRATION: Upload method via GitHub integration.
        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

    Refer to a member as UploadMethod.<attribute>, for example
    finite_state_sdk.UploadMethod.WEB_APP_UI.
    """
    # Submitted through the web application UI.
    WEB_APP_UI = "WEB_APP_UI"
    # Submitted through the API.
    API = "API"
    # Submitted through the GitHub integration.
    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
    # Submitted through the Azure DevOps integration.
    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
Enumeration class representing different upload methods.
Attributes:
- WEB_APP_UI: Upload method via web application UI.
- API: Upload method via API.
- GITHUB_INTEGRATION: Upload method via GitHub integration.
- AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
To use any value from this enumeration, use UploadMethod.<attribute>, e.g. finite_state_sdk.UploadMethod.WEB_APP_UI
Inherited Members
- enum.Enum
- name
- value
def create_artifact(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_version_id=None,
    artifact_name=None,
    product_id=None,
):
    """
    Create a new Artifact attached to an existing Asset Version.
    This is an advanced method - most callers want create_new_asset_version_and_upload_test_results or
    create_new_asset_version_and_upload_binary instead.
    Worked examples live in the GitHub repository:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the artifact with.
        created_by_user_id (str, required):
            User ID of the user creating the artifact.
        asset_version_id (str, required):
            Asset Version ID to associate the artifact with.
        artifact_name (str, required):
            The name of the Artifact being created.
        product_id (str, optional):
            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response for the createArtifact mutation.
    """
    # Validate in a fixed order so the first missing argument is the one reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_version_id, "Asset version ID is required"),
        (artifact_name, "Artifact name is required"),
    )
    for supplied_value, complaint in required_arguments:
        if not supplied_value:
            raise ValueError(complaint)

    mutation = """
    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
        createArtifact(input: $input) {
            id
            name
            assetVersion {
                id
                name
                asset {
                    id
                    name
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # The context always carries the asset version and the business unit;
    # the product entry is added only when the caller supplied one.
    context = {
        "asset": asset_version_id,
        "businessUnits": [business_unit_id],
    }
    if product_id is not None:
        context["products"] = product_id

    payload = {
        "input": {
            "name": artifact_name,
            "createdBy": created_by_user_id,
            "assetVersion": asset_version_id,
            "ctx": context,
        }
    }

    return send_graphql_query(token, organization_context, mutation, payload)['data']
Create a new Artifact. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the GitHub repository for more information:
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the artifact with.
- created_by_user_id (str, required): User ID of the user creating the artifact.
- asset_version_id (str, required): Asset Version ID to associate the artifact with.
- artifact_name (str, required): The name of the Artifact being created.
- product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createArtifact Object
def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
    """
    Create a new Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset with. Sent both as the asset's group and as its business unit context.
        created_by_user_id (str, required):
            User ID of the user creating the asset.
        asset_name (str, required):
            The name of the Asset being created.
        product_id (str, optional):
            Product ID to associate the asset with. If not specified, no products entry is sent in the asset context.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response; the mutation requested is createAsset.
    """
    # Checked in this order so the first missing argument is the one reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_name, "Asset name is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    mutation = """
    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
        createAsset(input: $input) {
            id
            name
            dependentProducts {
                id
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # The products entry is only present when the caller supplied a product.
    context = {"businessUnits": [business_unit_id]}
    if product_id is not None:
        context["products"] = product_id

    asset_input = {
        "name": asset_name,
        "group": business_unit_id,
        "createdBy": created_by_user_id,
        "ctx": context,
    }

    result = send_graphql_query(token, organization_context, mutation, {"input": asset_input})
    return result['data']
Create a new Asset.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the asset with.
- created_by_user_id (str, required): User ID of the user creating the asset.
- asset_name (str, required): The name of the Asset being created.
- product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAsset Object
def create_asset_version(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset version with.
        created_by_user_id (str, required):
            User ID of the user creating the asset version.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to associate the asset version with. If not specified, no products entry is sent in the asset version context.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response; the mutation requested is createAssetVersion.

    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
    """
    # The warning is emitted before validation, so it fires on every call.
    # stacklevel=2 attributes it to the caller rather than to this function.
    warn('`create_asset_version` is deprecated. Use `create_asset_version_on_asset` instead.', DeprecationWarning, stacklevel=2)
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not asset_version_name:
        raise ValueError("Asset version name is required")

    graphql_query = """
    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
        createAssetVersion(input: $input) {
            id
            name
            asset {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # Asset version name, parent asset, business unit context, and creating user are required
    variables = {
        "input": {
            "name": asset_version_name,
            "createdBy": created_by_user_id,
            "asset": asset_id,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            }
        }
    }

    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']
Create a new Asset Version.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the asset version with.
- created_by_user_id (str, required): User ID of the user creating the asset version.
- asset_id (str, required): Asset ID to associate the asset version with.
- asset_version_name (str, required): The name of the Asset Version being created.
- product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAssetVersion Object
deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
def create_asset_version_on_asset(
    token,
    organization_context,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        created_by_user_id (str, optional):
            User ID of the user creating the asset version. Only sent when provided.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID passed to the mutation. Only sent when provided.

    Raises:
        ValueError: Raised if asset_id or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response; the mutation requested is createNewAssetVersionOnAsset.
    """
    for value, message in (
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    ):
        if not value:
            raise ValueError(message)

    # NOTE(review): $createdByUserId is declared non-null (ID!) below, yet the
    # variable is only sent when created_by_user_id is truthy - confirm the API
    # accepts a request that omits it.
    mutation = """
    mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
        createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
            id
            assetVersion {
                id
            }
        }
    }
    """

    query_variables = {"assetVersionName": asset_version_name, "assetId": asset_id}

    # Optional variables are added only when the caller supplied a truthy value.
    optional_variables = (
        ("createdByUserId", created_by_user_id),
        ("productId", product_id),
    )
    for key, value in optional_variables:
        if value:
            query_variables[key] = value

    result = send_graphql_query(token, organization_context, mutation, query_variables)
    return result['data']
Create a new Asset Version.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- created_by_user_id (str, optional): User ID of the user creating the asset version.
- asset_id (str, required): Asset ID to associate the asset version with.
- asset_version_name (str, required): The name of the Asset Version being created.
Raises:
- ValueError: Raised if asset_id or asset_version_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAssetVersion Object
def create_new_asset_version_artifact_and_test_for_upload(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    product_id=None,
    test_type=None,
    artifact_description=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
    This method is used by create_new_asset_version_and_upload_binary and create_new_asset_version_and_upload_test_results, which are generally easier to use for basic use cases.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the entities for. If not provided, the group ID of the existing Asset is used.
        created_by_user_id (str, optional):
            User ID that will be the creator of the entities. If not specified, the creator of the existing Asset is used.
        asset_id (str, required):
            ID of the existing Asset to create the asset version for.
        version (str, required):
            Name of the asset version to create.
        product_id (str, optional):
            Product ID to create the entities for. It is passed to the asset version, and added to the Asset's existing product IDs for the Artifact and Test. If not provided, only the Asset's existing product IDs are used.
        test_type (str, required):
            Test type to create the test for. "finite_state_binary_analysis" creates a binary analysis test; any other value is passed through as a third party test type. For the full list of supported third party test types, see the API documentation.
        artifact_description (str, optional):
            Description used as the suffix of the Artifact name. Examples include "Firmware", "Source Code Repository". If none is provided, "Binary" is used for binary analysis and "Unspecified Artifact" otherwise.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or test_type are not provided, if no asset matches asset_id, or if the business unit or creating user cannot be determined.
        Exception: Raised if the query fails.

    Returns:
        str: The Test ID of the newly created test that is used for uploading the file.
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    # Reject a missing test type up front: otherwise it would fall into the
    # third-party branch only after an Asset Version and Artifact had already
    # been created.
    if not test_type:
        raise ValueError("Test type is required")

    assets = get_all_assets(token, organization_context, asset_id=asset_id)
    if not assets:
        raise ValueError("No assets found with the provided asset ID")
    asset = assets[0]
    asset_name = asset['name']

    # The Artifact and Test carry the asset's existing products plus the
    # requested one (if it is not already among them).
    asset_product_ids = asset['ctx']['products']
    if product_id and product_id not in asset_product_ids:
        asset_product_ids.append(product_id)

    # Fall back to the existing asset's business unit and creator.
    if not business_unit_id:
        business_unit_id = asset['group']['id']
    if not created_by_user_id:
        created_by_user_id = asset['createdBy']['id']

    if not business_unit_id:
        raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
    if not created_by_user_id:
        raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")

    # create the asset version
    response = create_asset_version_on_asset(
        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id
    )
    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']

    is_binary_analysis = test_type == "finite_state_binary_analysis"

    # create the artifact; only the default description depends on the test type
    if not artifact_description:
        artifact_description = "Binary" if is_binary_analysis else "Unspecified Artifact"
    artifact_name = f"{asset_name} {version} - {artifact_description}"
    response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
                               created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
                               artifact_name=artifact_name, product_id=asset_product_ids)
    artifact_id = response['createArtifact']['id']

    # create the test
    if is_binary_analysis:
        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                  artifact_id=artifact_id, product_id=asset_product_ids,
                                                  test_name=test_name, upload_method=upload_method)
    else:
        test_name = f"{asset_name} {version} - {test_type}"
        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                      artifact_id=artifact_id, product_id=asset_product_ids,
                                                      test_name=test_name, test_type=test_type,
                                                      upload_method=upload_method)
    return response['createTest']['id']
Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the Business Unit (group) of the existing Asset will be used.
- created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
- asset_id (str, required): Asset ID of the existing Asset to create the asset version for. A ValueError is raised if it is not provided.
- version (str, required): Version to create the asset version for.
- product_id (str, optional): Product ID to create the entities for. If provided, it is added to the existing product IDs of the Asset; if not provided, only the Asset's existing product IDs are used.
- test_type (str, required): Test type to create the test for. Must be either "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation.
- artifact_description (str, optional): Description to use for the artifact. Examples include "Firmware", "Source Code Repository". This will be appended to the Artifact name after the asset name and version. If none is provided, a default description will be used ("Binary" for binary analysis, "Unspecified Artifact" otherwise).
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if asset_id or version are not provided.
- Exception: Raised if the query fails.
Returns:
str: The Test ID of the newly created test that is used for uploading the file.
def create_new_asset_version_and_upload_binary(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    artifact_description=None,
    quick_scan=False,
    upload_method: UploadMethod = UploadMethod.API,
    enable_bandit_scan: bool = False,
):
    """
    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Local path to the file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
        artifact_description (str, optional):
            Description of the artifact. If not provided, the default is "Firmware Binary".
        quick_scan (bool, optional):
            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.
        enable_bandit_scan (bool, optional):
            If True, will create an additional bandit scan in addition to the default binary analysis scan.

    Raises:
        ValueError: Raised if asset_id, version, or file_path are not provided.
        Exception: Raised if any of the queries fail.

    Returns:
        The value returned by upload_file_for_binary_analysis for the newly created test.
    """
    # Checked in this order so the first missing argument is the one reported.
    for value, message in (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
    ):
        if not value:
            raise ValueError(message)

    # Step 1: create the asset version, artifact and binary analysis test.
    new_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        artifact_description=artifact_description or "Firmware Binary",
        upload_method=upload_method,
    )

    # Step 2: upload the file against the test that was just created.
    return upload_file_for_binary_analysis(
        token,
        organization_context,
        test_id=new_test_id,
        file_path=file_path,
        quick_scan=quick_scan,
        enable_bandit_scan=enable_bandit_scan,
    )
Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
- created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for.
- version (str, required): Version to create the asset version for.
- file_path (str, required): Local path to the file to upload.
- product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
- artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
- quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
- enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
- ValueError: Raised if asset_id, version, or file_path are not provided.
- Exception: Raised if any of the queries fail.
Returns:
dict: The response returned by upload_file_for_binary_analysis for the newly created test.
def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
                                                     created_by_user_id=None, asset_id=None, version=None,
                                                     file_path=None, product_id=None, test_type=None,
                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
    """
    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Path to the test results file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
        test_type (str, required):
            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
        artifact_description (str, optional):
            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: If the asset_id, version, file_path, or test_type are not provided.
        Exception: If the test_type is not a supported third party scanner type, or if the query fails.

    Returns:
        The value returned by upload_test_results_file for the newly created test.
    """
    # Checked in this order so the first missing argument is the one reported.
    for value, message in (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
        (test_type, "Test type is required"),
    ):
        if not value:
            raise ValueError(message)

    # Step 1: create the asset version, artifact and test.
    new_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type=test_type,
        artifact_description=artifact_description,
        upload_method=upload_method,
    )

    # Step 2: upload the results file against the test that was just created.
    return upload_test_results_file(token, organization_context, test_id=new_test_id, file_path=file_path)
Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
- created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for.
- version (str, required): Version to create the asset version for.
- file_path (str, required): Path to the test results file to upload.
- product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
- test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
- artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: If the asset_id, version, file_path, or test_type are not provided.
- Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:
dict: The response returned by upload_test_results_file for the newly created test.
def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
                   product_description=None, vendor_id=None, vendor_name=None):
    """
    Create a new Product.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the product with. Sent both as the product's group and as its business unit context.
        created_by_user_id (str, required):
            User ID of the user creating the product.
        product_name (str, required):
            The name of the Product being created.
        product_description (str, optional):
            The description of the Product being created.
        vendor_id (str, optional):
            Vendor ID of an existing vendor to link the product to.
        vendor_name (str, optional):
            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response; the mutation requested is createProduct.
    """
    # Checked in this order so the first missing argument is the one reported.
    for value, message in (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (product_name, "Product name is required"),
    ):
        if not value:
            raise ValueError(message)

    mutation = """
    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
        createProduct(input: $input) {
            id
            name
            vendor {
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                businessUnit
            }
        }
    }
    """

    # Product name, business unit context, and creating user are always sent
    product_input = {
        "name": product_name,
        "group": business_unit_id,
        "createdBy": created_by_user_id,
        "ctx": {"businessUnit": business_unit_id},
    }

    if product_description is not None:
        product_input["description"] = product_description

    # NOTE(review): neither vendor argument is enforced, and supplying both
    # sends "vendor" and "createVendor" together - confirm the API accepts that.
    if vendor_id is not None:
        # link the new product to an existing vendor
        product_input["vendor"] = {"id": vendor_id}
    if vendor_name is not None:
        # ask the API to create a vendor and link it to the new product
        product_input["createVendor"] = {"name": vendor_name}

    result = send_graphql_query(token, organization_context, mutation, {"input": product_input})

    return result['data']
Create a new Product.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the product with.
- created_by_user_id (str, required): User ID of the user creating the product.
- product_name (str, required): The name of the Product being created.
- product_description (str, optional): The description of the Product being created.
- vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
- vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createProduct Object
def create_test(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    tools=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading files.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
        tools (list, optional):
            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field.
            This is used to describe the actual scanner that was used to perform the test.
            Defaults to None, which is sent as an empty list.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Defaults to UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The "data" member of the GraphQL response (createTest Object)
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not artifact_id:
        raise ValueError("Artifact ID is required")
    if not test_name:
        raise ValueError("Test name is required")
    if not test_type:
        raise ValueError("Test type is required")

    # A None default (instead of a shared mutable [] default) keeps calls independent of each other.
    if tools is None:
        tools = []

    graphql_query = """
    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
        createTest(input: $input) {
            id
            name
            artifactUnderTest {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                        dependentProducts {
                            id
                            name
                        }
                    }
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
            uploadMethod
        }
    }
    """

    # Test name, artifact, result format, asset/business unit context, and creating user are required
    variables = {
        "input": {
            "name": test_name,
            "createdBy": created_by_user_id,
            "artifactUnderTest": artifact_id,
            "testResultFileFormat": test_type,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            },
            "tools": tools,
            "uploadMethod": upload_method.value
        }
    }

    # The product association is optional and only sent when provided.
    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']
Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
- tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
- upload_method (UploadMethod, optional): The method of uploading the test results. Defaults to UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
                                   upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Test object for uploading files for Finite State Binary Analysis.

    Delegates to create_test with test_type "finite_state_binary_analysis" and a single
    "Finite State Binary Analysis" tool entry.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Describes the scanner that produced the results for this kind of test.
    binary_analysis_tool = {
        "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
        "name": "Finite State Binary Analysis"
    }
    return create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        tools=[binary_analysis_tool],
        upload_method=upload_method,
    )
Create a new Test object for uploading files for Finite State Binary Analysis.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading CycloneDX files.

    Delegates to create_test with test_type "cyclonedx"; no tools are passed.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    return create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="cyclonedx",
        upload_method=upload_method,
    )
Create a new Test object for uploading CycloneDX files.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
                                       upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Test object for uploading Third Party Scanner files.

    Delegates to create_test, forwarding the caller-supplied test_type unchanged; no tools are passed.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    return create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type=test_type,
        upload_method=upload_method,
    )
Create a new Test object for uploading Third Party Scanner files.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
                                  report_subtype=None, output_filename=None, verbose=False):
    """
    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            The Asset Version ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved in the current directory as "report.<report_type lowercased>", i.e. "report.csv" or "report.pdf".
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Propagated from generate_report_download_url; per the original documentation, raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP status 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)

    # Previously a missing output_filename reached open(None, 'wb') and raised TypeError.
    # Apply the documented fallback name instead. This runs after URL generation, which is
    # presumed to have validated report_type -- NOTE(review): confirm against generate_report_download_url.
    if output_filename is None:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Write the response body to the local file in binary mode
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, required): The Asset Version ID to download the report for.
- report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified `report_type`. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
- output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
None
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved in the current directory as "report.<report_type lowercased>", e.g. "report.csv".
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        Exception: Raised if the download does not return HTTP status 200. Errors raised by generate_report_download_url propagate unchanged.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Previously a missing output_filename reached open(None, 'wb') and raised TypeError.
    # Apply the documented fallback name instead. This runs after URL generation, which is
    # presumed to have validated report_type -- NOTE(review): confirm against generate_report_download_url.
    if output_filename is None:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Write the response body to the local file in binary mode
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, required): The Product ID to download the report for.
- report_type (str, required): The file type of the report to download. Valid values are "CSV".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified `report_type`. Valid values for CSV are "ALL_FINDINGS".
- output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, required):
            The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP status 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type,
                                              sbom_subtype=sbom_subtype, asset_version_id=asset_version_id,
                                              verbose=verbose)

    # Fetch the SBOM from the generated URL
    http_response = requests.get(download_url)

    # Bail out early on any non-200 status
    if http_response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {http_response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Persist the response body as raw bytes
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(http_response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
- sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
- asset_version_id (str, required): The Asset Version ID to download the SBOM for.
- output_filename (str, required): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
None
def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Helper generator that reads a file in binary mode and yields it piece by piece.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            The size passed to each read call. Defaults to DEFAULT_CHUNK_SIZE.

    Yields:
        bytes: The next chunk of the file. Iteration ends at the first empty read.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as stream:
        # iter(callable, sentinel) keeps calling read() until it returns b'' (end of file).
        yield from iter(lambda: stream.read(chunk_size), b'')
Helper method to read a file in chunks.
Arguments:
- file_path (str): Local path to the file to read.
- chunk_size (int, optional): The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
Yields:
bytes: The next chunk of the file.
Raises:
- FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Get all artifacts in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects, as collected by get_all_paginated_results
    """
    artifacts_query = queries.ALL_ARTIFACTS['query']
    artifacts_variables = queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id)
    # NOTE(review): results are read from the 'allAssets' field -- confirm this matches the ALL_ARTIFACTS query.
    return get_all_paginated_results(token, organization_context, artifacts_query, artifacts_variables, 'allAssets')
Get all artifacts in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
- business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Artifact Objects
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Gets all assets in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects, as collected by get_all_paginated_results
    """
    assets_query = queries.ALL_ASSETS['query']
    # The query definition supplies a callable that builds the variables from the two filters.
    assets_variables = queries.ALL_ASSETS['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, assets_query, assets_variables, 'allAssets')
Gets all assets in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Asset Objects
def get_all_asset_versions(token, organization_context):
    """
    Get all asset versions in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    # queries.ALL_ASSET_VERSIONS['variables'] is a builder (get_asset_versions calls it with these
    # keyword arguments). It must be called here as well: get_all_paginated_results subscripts the
    # variables it receives, which fails on an uncalled function. Passing None for every filter
    # requests all asset versions.
    query_variables = queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=None,
                                                              asset_id=None,
                                                              business_unit_id=None)
    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
                                     query_variables, 'allAssetVersions')
Get all asset versions in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Retrieve the asset versions that belong to one product, paging through the API until no results remain.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: The entries of the 'allProducts' field of the response, as collected by get_all_paginated_results.
    """
    product_query = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    query_text = product_query['query']
    query_variables = product_query['variables'](product_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
Get all asset versions for a product. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str): The Product ID to get asset versions for
Returns:
list: List of AssetVersion Objects
def get_all_business_units(token, organization_context):
    """
    Retrieve every business unit in the organization, paging through the API until no results remain.
    NOTE: The return type here is Group.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_unit_query = queries.ALL_BUSINESS_UNITS
    query_text = business_unit_query['query']
    query_variables = business_unit_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allGroups')
Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Group Objects
def get_all_organizations(token, organization_context):
    """
    Retrieve every organization available to the user, paging through the API until no results remain.
    For most users there is only one organization.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    organization_query = queries.ALL_ORGANIZATIONS
    query_text = organization_query['query']
    query_variables = organization_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allOrganizations')
Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Organization Objects
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    Pages are requested with send_graphql_query; after each page the `_cursor` of the last entry is
    sent as the `after` variable of the next request, until a page comes back empty or `limit` is reached.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required):
            Variables to be used in the GraphQL query. Must contain "first" (the page size, between 1 and 1000).
            The dict passed in is not modified; the cursor is tracked on an internal copy.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.

    Raises:
        Exception: If field is missing, limit or variables["first"] is out of range, "first" is not
            provided, or the field is not in the response JSON. Errors raised by send_graphql_query propagate.

    Returns:
        list: List of results (at most `limit` entries when a limit is given)
    """
    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")
    # `variables` defaults to None and may lack "first"; report both the same way as a falsy "first"
    # instead of failing with an unrelated TypeError/KeyError.
    if not variables or not variables.get("first"):
        raise Exception("Error: first is required")
    if variables["first"] < 1:
        raise Exception("Error: first cannot be less than 1")
    if variables["first"] > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # Work on a shallow copy: the 'after' cursor is written below, and callers in this module pass
    # objects stored on the `queries` module directly (e.g. queries.ALL_USERS['variables']), so
    # writing in place would leak the last cursor into every later call.
    variables = dict(variables)

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # the cursor of the last entry marks where the next page starts; an empty page ends pagination
    cursor = page[-1]['_cursor'] if page else None

    while cursor:
        # stop as soon as enough results were collected, even if the page size does not divide the limit
        if limit and len(results) >= limit:
            break

        variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, variables)
        page = response_data['data'][field]
        results.extend(page)

        cursor = page[-1]['_cursor'] if page else None

    # never hand back more than the requested maximum
    return results[:limit] if limit else results
Get all results from a paginated GraphQL query
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- query (str): The GraphQL query string
- variables (dict, optional): Variables to be used in the GraphQL query, by default None
- field (str, required): The field in the response JSON that contains the results
- limit (int, Optional): The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
Raises:
- Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:
list: List of results
def get_all_products(token, organization_context):
    """
    Retrieve every product in the organization, paging through the API until no results remain.
    Emits a DeprecationWarning on every call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)

    product_query = queries.ALL_PRODUCTS
    query_text = product_query['query']
    query_variables = product_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
Get all products in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Product Objects
Deprecated since version 0.1.4. Use get_products instead.
def get_all_users(token, organization_context):
    """
    Retrieve every user in the organization, paging through the API until no results remain.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    user_query = queries.ALL_USERS
    query_text = user_query['query']
    query_variables = user_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allUsers')
Get all users in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of User Objects
def get_artifact_context(token, organization_context, artifact_id):
    """
    Look up the context ('ctx') of a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact whose context is requested.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no results.

    Returns:
        dict: Artifact Context Object
    """
    artifact_query = queries.ALL_ARTIFACTS
    query_text = artifact_query['query']
    query_variables = artifact_query['variables'](artifact_id, None)
    matches = get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')

    # only the first match is used
    first_match = matches[0]
    return first_match['ctx']
Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
dict: Artifact Context Object
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Retrieve Assets of the organization, optionally filtered, paging through the API until no results remain.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. When None, no ID filter is applied; otherwise only the Asset with that ID is returned.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. When None, Assets from every Business Unit are returned.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    asset_query = queries.ALL_ASSETS
    query_text = asset_query['query']
    build_variables = asset_query['variables']
    return get_all_paginated_results(token, organization_context, query_text,
                                     build_variables(asset_id, business_unit_id), 'allAssets')
Gets assets in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Asset Objects
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Retrieve asset versions of the organization, optionally filtered, paging through the API until no results remain.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get, by default None. When None, no ID filter is applied; otherwise only the Asset Version with that ID is returned.
        asset_id (str, optional):
            Asset ID to filter by, by default None. When given, only the Asset Versions of that Asset are returned.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. When given, only the Asset Versions in that Business Unit are returned.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    version_query = queries.ALL_ASSET_VERSIONS
    query_text = version_query['query']
    filters = {
        'asset_version_id': asset_version_id,
        'asset_id': asset_id,
        'business_unit_id': business_unit_id,
    }
    query_variables = version_query['variables'](**filters)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssetVersions')
Gets asset versions in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
- asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL the request is posted to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # honour the caller-supplied audience instead of always sending the module default
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    # post to the caller-supplied token URL instead of always using the module default
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
Arguments:
- client_id (str): CLIENT_ID as specified in the API documentation
- client_secret (str): CLIENT_SECRET as specified in the API documentation
- token_url (str, optional): Token URL, by default TOKEN_URL
- audience (str, optional): Audience, by default AUDIENCE
Raises:
- Exception: If the response status code is not 200
Returns:
str: Auth token. Use this token as the Authorization header in subsequent API calls.
def get_findings(
    token,
    organization_context,
    asset_version_id=None,
    finding_id=None,
    category=None,
    status=None,
    severity=None,
    count=False,
    limit=None,
):
    """
    Gets all the Findings for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.

    Raises:
        Exception: Raised if the query fails, or if limit is outside the range 1 to 1000.

    Returns:
        list: List of Finding Objects. When count is True, the "_allFindingsMeta" value of the
            response is returned instead.
    """
    # 1000 itself is accepted, so the message states the inclusive bound
    if limit and limit > 1000:
        raise Exception("Error: limit must be less than or equal to 1000")
    if limit and limit < 1:
        raise Exception("Error: limit must be greater than 0")

    # the count query and the findings query take the same filter arguments
    filters = {
        'asset_version_id': asset_version_id,
        'finding_id': finding_id,
        'category': category,
        'status': status,
        'severity': severity,
        'limit': limit,
    }

    if count:
        response_data = send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
                                           queries.GET_FINDINGS_COUNT['variables'](**filters))
        return response_data["data"]["_allFindingsMeta"]

    return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
                                     queries.GET_FINDINGS['variables'](**filters), 'allFindings', limit=limit)
Gets all the Findings for an Asset Version. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
- finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
- category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
- status (str, optional): The status of Findings to return.
- severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
- count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
- limit (int, optional): The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Finding Objects
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Retrieve the asset versions for one product, paging through the API until no results remain.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            Product ID to get asset versions for. Although the parameter defaults to None, a value must be supplied.

    Raises:
        Exception: Raised if product_id is not provided, or if the query fails.

    Returns:
        list: The entries of the 'allProducts' field of the response, as collected by get_all_paginated_results.
    """
    if product_id:
        product_query = queries.GET_PRODUCT_ASSET_VERSIONS
        query_text = product_query['query']
        query_variables = product_query['variables'](product_id)
        return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

    raise Exception("Product ID is required")
Gets all the asset versions for a product.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, required): Product ID to get asset versions for. An Exception is raised if it is not provided.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of AssetVersion Objects
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Retrieve products of the organization, optionally filtered, paging through the API until no results remain.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects
    """
    product_query = queries.GET_PRODUCTS
    query_text = product_query['query']
    filters = {
        'product_id': product_id,
        'business_unit_id': business_unit_id,
    }
    query_variables = product_query['variables'](**filters)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
Gets all the products for the specified business unit.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
- business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Product Objects
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: launches a report export, then polls until a pre-signed download URL is available and returns it.
    This may take several minutes to complete, depending on the size of the report.

    Polling repeats every 10 seconds with no upper bound; the call only returns once the export job
    reports status 'COMPLETED' together with a non-empty download link.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` and `product_id` must be given.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` and `product_id` must be given.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, if neither ID is given, or if both IDs are given.
        Exception: Raised if report_type or report_subtype is not supported, or if no export job ID is returned.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not (asset_version_id or product_id):
        raise ValueError("Asset Version ID or Product ID is required")
    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    if report_type not in ("CSV", "PDF"):
        raise Exception(f"Report Type {report_type} not supported")

    # per report type: the accepted subtypes and the response keys holding the export job
    if report_type == "CSV":
        allowed_subtypes = ("ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE")
        artifact_key, product_key = 'launchArtifactCSVExport', 'launchProductCSVExport'
    else:
        allowed_subtypes = ("RISK_SUMMARY",)
        artifact_key, product_key = 'launchArtifactPdfExport', 'launchProductPdfExport'

    if report_subtype not in allowed_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    launch_args = {
        'asset_version_id': asset_version_id,
        'product_id': product_id,
        'report_type': report_type,
        'report_subtype': report_subtype,
    }
    mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](**launch_args)
    variables = queries.LAUNCH_REPORT_EXPORT['variables'](**launch_args)

    response_data = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(response_data, indent=4)}')

    # exactly one of the two IDs is set at this point (validated above)
    launch_key = artifact_key if asset_version_id else product_key
    export_job_id = response_data['data'][launch_key]['exportJobId']

    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    sleep_time = 10
    total_time = 0
    if verbose:
        print(f'Polling every {sleep_time} seconds for export job to complete')

    while True:
        time.sleep(sleep_time)
        total_time += sleep_time
        if verbose:
            print(f'Total time elapsed: {total_time} seconds')

        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        response_data = send_graphql_query(token, organization_context, query, variables)

        if verbose:
            print(f'Response Data: {json.dumps(response_data, indent=4)}')

        export_state = response_data['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != 'COMPLETED':
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(f'Export Job Complete. Download URL: {download_link}')
            return download_link
Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to download the report for. Either `asset_version_id` or `product_id` is required, but not both.
- product_id (str, optional): Product ID to download the report for. Either `asset_version_id` or `product_id` is required, but not both.
- report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified `report_type`. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
- verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launches an SBOM export for an asset version and returns a pre-signed download URL.

    The export job is started with the CycloneDX or SPDX launch mutation, then the
    export status is polled every 10 seconds. The function only returns once the
    status is "COMPLETED" and a download link is present; there is no timeout and
    no early exit for any other status.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print the raw responses, the export job ID and polling progress. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if the type/subtype combination is not supported, if no export job ID is returned, or if a query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    # Required-argument checks, in the order callers have always seen them.
    required_arguments = (
        ("SBOM Type", sbom_type),
        ("SBOM Subtype", sbom_subtype),
        ("Asset Version ID", asset_version_id),
    )
    for label, value in required_arguments:
        if not value:
            raise ValueError(f"{label} is required")

    # Each SBOM type accepts its own set of subtypes.
    if sbom_type == "CYCLONEDX":
        permitted_subtypes = ("SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY")
    elif sbom_type == "SPDX":
        permitted_subtypes = ("SBOM_ONLY",)
    else:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    if sbom_subtype not in permitted_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    # Pick the launch mutation and the response field that carries the job ID.
    if sbom_type == "CYCLONEDX":
        launcher, result_field = queries.LAUNCH_CYCLONEDX_EXPORT, 'launchCycloneDxExport'
    else:
        launcher, result_field = queries.LAUNCH_SPDX_EXPORT, 'launchSpdxExport'

    launch_mutation = launcher['mutation']
    launch_variables = launcher['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    export_job_id = launch_response['data'][result_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll until the export reports COMPLETED together with a download link.
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        poll_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        poll_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)
        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        export_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != "COMPLETED":
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(
                    f'Export Job Complete. Download URL: {download_link}')
            return download_link
Blocking call: Initiates generation of an SBOM for the asset_version_id, and returns a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of the SBOM.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
- sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
- asset_version_id (str, required): Asset Version ID to download the SBOM for.
- verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
- Exception: Raised if the query fails.
Returns:
str: URL to download the SBOM from.
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Gets all the Software Components for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for. Although it defaults to None, an Exception is raised when it is missing.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        Exception: Raised if asset_version_id is not provided or the query fails.

    Returns:
        list: List of Software Component Objects
    """
    if not asset_version_id:
        raise Exception("Asset Version ID is required")

    component_query = queries.GET_SOFTWARE_COMPONENTS['query']
    component_variables = queries.GET_SOFTWARE_COMPONENTS['variables'](
        asset_version_id=asset_version_id,
        type=type,
    )

    # Results live under the 'allSoftwareComponentInstances' field of each page.
    return get_all_paginated_results(
        token,
        organization_context,
        component_query,
        component_variables,
        'allSoftwareComponentInstances',
    )
Gets all the Software Components for an Asset Version. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, required): Asset Version ID to get software components for. An Exception is raised if it is not provided.
- type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Software Component Objects
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Only consulted for the name filter of an "EXACT" search. Defaults to False.

    Raises:
        ValueError: Raised if name is not provided or search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.

    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # Enforce the documented contract: without a name the filter would be sent
    # with a null name and the search would not be constrained as intended.
    if not name:
        raise ValueError("Name is required")
    # An unrecognised method used to skip the name filter entirely, silently
    # returning every component in scope.
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f"Search method {search_method} not supported")

    if asset_version_id:
        # Scoped to one asset version: include the original components.
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
"""
    else:
        # Organization-wide: include the asset version (and asset) that
        # contains each matching software component.
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
"""

    search_filter = {"mergedComponentRefId": None}
    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # "name" is used for the case-sensitive match, "name_like" otherwise.
        search_filter["name" if case_sensitive else "name_like"] = name
        if version:
            search_filter["version"] = version
    else:  # 'CONTAINS'
        search_filter["name_contains"] = name
        if version:
            search_filter["version_contains"] = version

    variables = {
        "filter": search_filter,
        "after": None,
        "first": 100
    }

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")
Searches the SBOM of a specific asset version or the entire organization for matching software components. Search Methods: EXACT or CONTAINS An exact match will return only the software component whose name matches the name exactly. A contains match will return all software components whose name contains the search string.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- name (str, required): Name of the software component to search for.
- version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
- asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
- search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
- case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
- ValueError: Raised if name is not provided.
- Exception: Raised if the query fails.
Returns:
list: List of SoftwareComponentInstance Objects
def send_graphql_query(token, organization_context, query, variables=None):
    """
    POST a GraphQL query (or mutation) to API_URL and return the decoded JSON body.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        Exception: If the response status code is not 200, or if the decoded body contains an "errors" entry.

    Returns:
        dict: Response JSON
    """
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}',
        'Organization-Context': organization_context
    }
    payload = {
        'query': query,
        'variables': variables
    }

    response = requests.post(API_URL, headers=request_headers, json=payload)

    # Anything other than HTTP 200 is reported with the raw response text.
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    body = response.json()

    # A 200 response can still carry errors in its body.
    if "errors" in body:
        raise Exception(f"Error: {body['errors']}")

    return body
Send a GraphQL query to the API
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- query (str): The GraphQL query string
- variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
- Exception: If the response status code is not 200, or if the response JSON contains an "errors" entry
Returns:
dict: Response JSON
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Updates the status of one or more findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID to update the status for.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if user_id, finding_ids, or status are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    # Checked in this order so the first missing argument is the one reported.
    required_arguments = (
        ("User Id", user_id),
        ("Finding Ids", finding_ids),
        ("Status", status),
    )
    for label, value in required_arguments:
        if not value:
            raise ValueError(f"{label} is required")

    status_mutation = queries.UPDATE_FINDING_STATUSES['mutation']
    status_variables = queries.UPDATE_FINDING_STATUSES['variables'](
        user_id=user_id,
        finding_ids=finding_ids,
        status=status,
        justification=justification,
        response=response,
        comment=comment,
    )

    return send_graphql_query(token, organization_context, status_mutation, status_variables)
Updates the status of one or more findings. This is a blocking call.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- user_id (str, required): User ID to update the finding status for.
- finding_ids (str, required): Finding ID to update the status for.
- status (str, required): Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
- justification (str, optional): Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
- response (str, optional): Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
- comment (str, optional): Optional comment to add to the finding status update.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
def upload_file_for_binary_analysis(
    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
):
    """
    Upload a file for Binary Analysis using a multipart upload, then launch processing.
    The file is read in chunks of chunk_size; each chunk is uploaded to its own upload URL.
    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE. Must be at least MIN_CHUNK_SIZE and strictly less than MAX_CHUNK_SIZE.
        quick_scan (bool, optional):
            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
        enable_bandit_scan (bool, optional):
            If True, will create an additional bandit scan in addition to the default binary analysis scan.

    Raises:
        ValueError: Raised if test_id or file_path are not provided, or chunk_size is out of range.
        Exception: Raised if a query or a chunk upload fails.

    Returns:
        dict: The 'data' member of the launchBinaryUploadProcessing response.
    """
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")
    if chunk_size < MIN_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
    if chunk_size >= MAX_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")

    # Step 1: open the multipart upload and remember its id/key.
    start_mutation = """
    mutation Start_SDK($testId: ID!) {
        startMultipartUploadV2(testId: $testId) {
            uploadId
            key
        }
    }
    """
    start_response = send_graphql_query(token, organization_context, start_mutation, {"testId": test_id})

    upload_info = start_response['data']['startMultipartUploadV2']
    upload_id = upload_info['uploadId']
    upload_key = upload_info['key']

    # Step 2: request an upload URL for every chunk (even a single one) and PUT the chunk to it.
    part_url_mutation = """
        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
                key
                uploadUrl
            }
        }
        """
    part_data = []
    for part_number, chunk in enumerate(file_chunks(file_path, chunk_size), start=1):
        part_variables = {
            "partNumber": part_number,
            "uploadId": upload_id,
            "uploadKey": upload_key
        }
        url_response = send_graphql_query(token, organization_context, part_url_mutation, part_variables)
        chunk_upload_url = url_response['data']['generateUploadPartUrlV2']['uploadUrl']

        put_response = upload_bytes_to_url(chunk_upload_url, chunk)

        # The ETag of each uploaded part is needed to complete the upload.
        part_data.append({
            "ETag": put_response.headers['ETag'],
            "PartNumber": part_number
        })

    # Step 3: complete the multipart upload.
    complete_mutation = """
    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
            key
        }
    }
    """
    complete_variables = {
        "partData": part_data,
        "uploadId": upload_id,
        "uploadKey": upload_key
    }
    complete_response = send_graphql_query(token, organization_context, complete_mutation, complete_variables)
    key = complete_response['data']['completeMultipartUploadV2']['key']

    # Step 4: launch processing, passing configuration options only when any were requested.
    launch_variables = {
        "key": key,
        "testId": test_id
    }

    configuration_options = []
    if quick_scan:
        configuration_options.append("QUICK_SCAN")
    if enable_bandit_scan:
        configuration_options.append("ENABLE_BANDIT_SCAN")

    if configuration_options:
        launch_variables["configurationOptions"] = configuration_options
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
                key
                newBanditScanId
            }
        }
        """
    else:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
            launchBinaryUploadProcessing(key: $key, testId: $testId) {
                key
                newBanditScanId
            }
        }
        """

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)

    return launch_response['data']
Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- test_id (str, required): Test ID to upload the file for.
- file_path (str, required): Local path to the file to upload.
- chunk_size (int, optional): The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB.
- quick_scan (bool, optional): If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
- enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
- ValueError: Raised if test_id or file_path are not provided.
- Exception: Raised if the query fails.
Returns:
dict: The `data` portion of the response to the launchBinaryUploadProcessing mutation (which requests `key` and `newBanditScanId`).
def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
    """
    Uploads a test results file to the test specified by test_id, then launches processing of it.
    NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if a query or the file upload fails.

    Returns:
        dict: The 'data' member of the launchTestResultProcessing response.
    """
    for label, value in (("Test Id", test_id), ("File Path", file_path)):
        if not value:
            raise ValueError(f"{label} is required")

    # Ask the API for a single-part upload URL and the key identifying the upload.
    upload_url_mutation = """
    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
        generateSinglePartUploadUrl(testId: $testId) {
            uploadUrl
            key
        }
    }
    """
    url_response = send_graphql_query(token, organization_context, upload_url_mutation, {"testId": test_id})

    upload_target = url_response['data']['generateSinglePartUploadUrl']
    upload_url = upload_target['uploadUrl']
    key = upload_target['key']

    # Send the file contents to the returned URL.
    upload_file_to_url(upload_url, file_path)

    # Tell the API the upload is done so the results get processed.
    launch_mutation = """
    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
        launchTestResultProcessing(key: $key, testId: $testId) {
            key
        }
    }
    """
    launch_variables = {
        "testId": test_id,
        "key": key
    }

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    return launch_response['data']
Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- test_id (str, required): Test ID to upload the file for.
- file_path (str, required): Local path to the file to upload.
Raises:
- ValueError: Raised if test_id or file_path are not provided.
- Exception: Raised if the query fails.
Returns:
dict: The response from the GraphQL query, a completeTestResultUpload Object.
def upload_bytes_to_url(url, bytes):
    """
    PUT a chunk of bytes to a URL (used with pre-signed S3 upload URLs).

    Args:
        url (str):
            (Pre-signed S3) URL
        bytes (bytes):
            Bytes to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    put_response = requests.put(url, data=bytes)

    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response
Used for uploading a file to a pre-signed S3 URL
Arguments:
- url (str): (Pre-signed S3) URL
- bytes (bytes): Bytes to upload
Raises:
- Exception: If the response status code is not 200
Returns:
requests.Response: Response object
def upload_file_to_url(url, file_path):
    """
    PUT the contents of a local file to a URL (used with pre-signed S3 upload URLs).

    Args:
        url (str):
            (Pre-signed S3) URL
        file_path (str):
            Local path to file to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    # The open file object is handed to requests as the request body.
    with open(file_path, 'rb') as handle:
        put_response = requests.put(url, data=handle)

    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response
Used for uploading a file to a pre-signed S3 URL
Arguments:
- url (str): (Pre-signed S3) URL
- file_path (str): Local path to file to upload
Raises:
- Exception: If the response status code is not 200
Returns:
requests.Response: Response object