finite_state_sdk
1import json 2from enum import Enum 3 4import requests 5import time 6from warnings import warn 7import finite_state_sdk.queries as queries 8 9API_URL = 'https://platform.finitestate.io/api/v1/graphql' 10AUDIENCE = "https://platform.finitestate.io/api/v1/graphql" 11TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token" 12 13""" 14DEFAULT CHUNK SIZE: 1000 MiB 15""" 16DEFAULT_CHUNK_SIZE = 1024**2 * 1000 17""" 18MAX CHUNK SIZE: 2 GiB 19""" 20MAX_CHUNK_SIZE = 1024**2 * 2000 21""" 22MIN CHUNK SIZE: 5 MiB 23""" 24MIN_CHUNK_SIZE = 1024**2 * 5 25 26 27class UploadMethod(Enum): 28 """ 29 Enumeration class representing different upload methods. 30 31 Attributes: 32 WEB_APP_UI: Upload method via web application UI. 33 API: Upload method via API. 34 GITHUB_INTEGRATION: Upload method via GitHub integration. 35 AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration. 36 37 To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI 38 """ 39 WEB_APP_UI = "WEB_APP_UI" 40 API = "API" 41 GITHUB_INTEGRATION = "GITHUB_INTEGRATION" 42 AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION" 43 44 45def create_artifact( 46 token, 47 organization_context, 48 business_unit_id=None, 49 created_by_user_id=None, 50 asset_version_id=None, 51 artifact_name=None, 52 product_id=None, 53): 54 """ 55 Create a new Artifact. 56 This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. 57 Please see the examples in the Github repository for more information: 58 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py 59 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py 60 61 Args: 62 token (str): 63 Auth token. This is the token returned by get_auth_token(). 
Just the token, do not include "Bearer" in this string, that is handled inside the method. 64 organization_context (str): 65 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 66 business_unit_id (str, required): 67 Business Unit ID to associate the artifact with. 68 created_by_user_id (str, required): 69 User ID of the user creating the artifact. 70 asset_version_id (str, required): 71 Asset Version ID to associate the artifact with. 72 artifact_name (str, required): 73 The name of the Artifact being created. 74 product_id (str, optional): 75 Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product. 76 77 Raises: 78 ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided. 79 Exception: Raised if the query fails. 80 81 Returns: 82 dict: createArtifact Object 83 """ 84 if not business_unit_id: 85 raise ValueError("Business unit ID is required") 86 if not created_by_user_id: 87 raise ValueError("Created by user ID is required") 88 if not asset_version_id: 89 raise ValueError("Asset version ID is required") 90 if not artifact_name: 91 raise ValueError("Artifact name is required") 92 93 graphql_query = """ 94 mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) 
{ 95 createArtifact(input: $input) { 96 id 97 name 98 assetVersion { 99 id 100 name 101 asset { 102 id 103 name 104 } 105 } 106 createdBy { 107 id 108 email 109 } 110 ctx { 111 asset 112 products 113 businessUnits 114 } 115 } 116 } 117 """ 118 119 # Asset name, business unit context, and creating user are required 120 variables = { 121 "input": { 122 "name": artifact_name, 123 "createdBy": created_by_user_id, 124 "assetVersion": asset_version_id, 125 "ctx": { 126 "asset": asset_version_id, 127 "businessUnits": [business_unit_id] 128 } 129 } 130 } 131 132 if product_id is not None: 133 variables["input"]["ctx"]["products"] = product_id 134 135 response = send_graphql_query(token, organization_context, graphql_query, variables) 136 return response['data'] 137 138 139def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None): 140 """ 141 Create a new Asset. 142 143 Args: 144 token (str): 145 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 146 organization_context (str): 147 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 148 business_unit_id (str, required): 149 Business Unit ID to associate the asset with. 150 created_by_user_id (str, required): 151 User ID of the user creating the asset. 152 asset_name (str, required): 153 The name of the Asset being created. 154 product_id (str, optional): 155 Product ID to associate the asset with. If not specified, the asset will not be associated with a product. 156 157 Raises: 158 ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided. 159 Exception: Raised if the query fails. 
160 161 Returns: 162 dict: createAsset Object 163 """ 164 if not business_unit_id: 165 raise ValueError("Business unit ID is required") 166 if not created_by_user_id: 167 raise ValueError("Created by user ID is required") 168 if not asset_name: 169 raise ValueError("Asset name is required") 170 171 graphql_query = """ 172 mutation CreateAssetMutation_SDK($input: CreateAssetInput!) { 173 createAsset(input: $input) { 174 id 175 name 176 dependentProducts { 177 id 178 name 179 } 180 group { 181 id 182 name 183 } 184 createdBy { 185 id 186 email 187 } 188 ctx { 189 asset 190 products 191 businessUnits 192 } 193 } 194 } 195 """ 196 197 # Asset name, business unit context, and creating user are required 198 variables = { 199 "input": { 200 "name": asset_name, 201 "group": business_unit_id, 202 "createdBy": created_by_user_id, 203 "ctx": { 204 "businessUnits": [business_unit_id] 205 } 206 } 207 } 208 209 if product_id is not None: 210 variables["input"]["ctx"]["products"] = product_id 211 212 response = send_graphql_query(token, organization_context, graphql_query, variables) 213 return response['data'] 214 215 216def create_asset_version( 217 token, 218 organization_context, 219 business_unit_id=None, 220 created_by_user_id=None, 221 asset_id=None, 222 asset_version_name=None, 223 product_id=None, 224): 225 """ 226 Create a new Asset Version. 227 228 Args: 229 token (str): 230 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 231 organization_context (str): 232 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 233 business_unit_id (str, required): 234 Business Unit ID to associate the asset version with. 235 created_by_user_id (str, required): 236 User ID of the user creating the asset version. 237 asset_id (str, required): 238 Asset ID to associate the asset version with. 
239 asset_version_name (str, required): 240 The name of the Asset Version being created. 241 product_id (str, optional): 242 Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product. 243 244 Raises: 245 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided. 246 Exception: Raised if the query fails. 247 248 Returns: 249 dict: createAssetVersion Object 250 251 deprecated:: 0.1.7. Use create_asset_version_on_asset instead. 252 """ 253 warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2) 254 if not business_unit_id: 255 raise ValueError("Business unit ID is required") 256 if not created_by_user_id: 257 raise ValueError("Created by user ID is required") 258 if not asset_id: 259 raise ValueError("Asset ID is required") 260 if not asset_version_name: 261 raise ValueError("Asset version name is required") 262 263 graphql_query = """ 264 mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) 
{ 265 createAssetVersion(input: $input) { 266 id 267 name 268 asset { 269 id 270 name 271 } 272 createdBy { 273 id 274 email 275 } 276 ctx { 277 asset 278 products 279 businessUnits 280 } 281 } 282 } 283 """ 284 285 # Asset name, business unit context, and creating user are required 286 variables = { 287 "input": { 288 "name": asset_version_name, 289 "createdBy": created_by_user_id, 290 "asset": asset_id, 291 "ctx": { 292 "asset": asset_id, 293 "businessUnits": [business_unit_id] 294 } 295 } 296 } 297 298 if product_id is not None: 299 variables["input"]["ctx"]["products"] = product_id 300 301 response = send_graphql_query(token, organization_context, graphql_query, variables) 302 return response['data'] 303 304 305def create_asset_version_on_asset( 306 token, 307 organization_context, 308 created_by_user_id=None, 309 asset_id=None, 310 asset_version_name=None, 311): 312 """ 313 Create a new Asset Version. 314 315 Args: 316 token (str): 317 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 318 organization_context (str): 319 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 320 created_by_user_id (str, optional): 321 User ID of the user creating the asset version. 322 asset_id (str, required): 323 Asset ID to associate the asset version with. 324 asset_version_name (str, required): 325 The name of the Asset Version being created. 326 327 Raises: 328 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided. 329 Exception: Raised if the query fails. 
330 331 Returns: 332 dict: createAssetVersion Object 333 """ 334 if not asset_id: 335 raise ValueError("Asset ID is required") 336 if not asset_version_name: 337 raise ValueError("Asset version name is required") 338 339 graphql_query = """ 340 mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!) { 341 createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId) { 342 id 343 assetVersion { 344 id 345 } 346 } 347 } 348 """ 349 350 # Asset name, business unit context, and creating user are required 351 variables = {"assetVersionName": asset_version_name, "assetId": asset_id} 352 353 if created_by_user_id: 354 variables["createdByUserId"] = created_by_user_id 355 356 response = send_graphql_query(token, organization_context, graphql_query, variables) 357 return response['data'] 358 359 360def create_new_asset_version_artifact_and_test_for_upload( 361 token, 362 organization_context, 363 business_unit_id=None, 364 created_by_user_id=None, 365 asset_id=None, 366 version=None, 367 product_id=None, 368 test_type=None, 369 artifact_description=None, 370 upload_method: UploadMethod = UploadMethod.API, 371): 372 """ 373 Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. 374 This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases. 375 376 Args: 377 token (str): 378 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 379 organization_context (str): 380 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 
381 business_unit_id (str, optional): 382 Business Unit ID to create the asset version for. If not provided, the Business Unit of the existing Asset will be used. 383 created_by_user_id (str, optional): 384 User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used. 385 asset_id (str, required): 386 Asset ID of the existing Asset to create the asset version for. 387 version (str, required): 388 Version to create the asset version for. 389 product_id (str, optional): 390 Product ID to create the entities for. It is added to the Asset's existing products if it is not already among them. If not provided, only the Asset's existing products are used. 391 test_type (str, required): 392 Test type to create the test for. Must be "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation. 393 artifact_description (str, optional): 394 Description to use for the artifact. Examples include "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used. 395 upload_method (UploadMethod, optional): 396 The method of uploading the test results. Default is UploadMethod.API. 397 398 399 Raises: 400 ValueError: Raised if asset_id or version are not provided. 401 Exception: Raised if the query fails. 402 403 Returns: 404 str: The Test ID of the newly created test that is used for uploading the file. 
405 """ 406 if not asset_id: 407 raise ValueError("Asset ID is required") 408 if not version: 409 raise ValueError("Version is required") 410 411 assets = get_all_assets(token, organization_context, asset_id=asset_id) 412 asset = assets[0] 413 414 # get the asset name 415 asset_name = asset['name'] 416 417 # get the existing asset product IDs 418 asset_product_ids = asset['ctx']['products'] 419 420 # get the asset product ID 421 if product_id and product_id not in asset_product_ids: 422 asset_product_ids.append(product_id) 423 424 # if business_unit_id or created_by_user_id are not provided, get the existing asset 425 if not business_unit_id or not created_by_user_id: 426 if not business_unit_id: 427 business_unit_id = asset['group']['id'] 428 if not created_by_user_id: 429 created_by_user_id = asset['createdBy']['id'] 430 431 if not business_unit_id: 432 raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset") 433 if not created_by_user_id: 434 raise ValueError("Created By User ID is required and could not be retrieved from the existing asset") 435 436 # create the asset version 437 response = create_asset_version_on_asset( 438 token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version 439 ) 440 # get the asset version ID 441 asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id'] 442 443 # create the test 444 if test_type == "finite_state_binary_analysis": 445 # create the artifact 446 if not artifact_description: 447 artifact_description = "Binary" 448 binary_artifact_name = f"{asset_name} {version} - {artifact_description}" 449 response = create_artifact(token, organization_context, business_unit_id=business_unit_id, 450 created_by_user_id=created_by_user_id, asset_version_id=asset_version_id, 451 artifact_name=binary_artifact_name, product_id=asset_product_ids) 452 453 # get the artifact ID 454 binary_artifact_id = 
response['createArtifact']['id'] 455 456 # create the test 457 test_name = f"{asset_name} {version} - Finite State Binary Analysis" 458 response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id, 459 created_by_user_id=created_by_user_id, asset_id=asset_id, 460 artifact_id=binary_artifact_id, product_id=asset_product_ids, 461 test_name=test_name, upload_method=upload_method) 462 test_id = response['createTest']['id'] 463 return test_id 464 465 else: 466 # create the artifact 467 if not artifact_description: 468 artifact_description = "Unspecified Artifact" 469 artifact_name = f"{asset_name} {version} - {artifact_description}" 470 response = create_artifact(token, organization_context, business_unit_id=business_unit_id, 471 created_by_user_id=created_by_user_id, asset_version_id=asset_version_id, 472 artifact_name=artifact_name, product_id=asset_product_ids) 473 474 # get the artifact ID 475 binary_artifact_id = response['createArtifact']['id'] 476 477 # create the test 478 test_name = f"{asset_name} {version} - {test_type}" 479 response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id, 480 created_by_user_id=created_by_user_id, asset_id=asset_id, 481 artifact_id=binary_artifact_id, product_id=asset_product_ids, 482 test_name=test_name, test_type=test_type, 483 upload_method=upload_method) 484 test_id = response['createTest']['id'] 485 return test_id 486 487 488def create_new_asset_version_and_upload_binary( 489 token, 490 organization_context, 491 business_unit_id=None, 492 created_by_user_id=None, 493 asset_id=None, 494 version=None, 495 file_path=None, 496 product_id=None, 497 artifact_description=None, 498 quick_scan=False, 499 upload_method: UploadMethod = UploadMethod.API, 500): 501 """ 502 Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. 
503 By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them. 504 505 Args: 506 token (str): 507 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 508 organization_context (str): 509 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 510 business_unit_id (str, optional): 511 Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used. 512 created_by_user_id (str, optional): 513 Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used. 514 asset_id (str, required): 515 Asset ID to create the asset version for. 516 version (str, required): 517 Version to create the asset version for. 518 file_path (str, required): 519 Local path to the file to upload. 520 product_id (str, optional): 521 Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists. 522 artifact_description (str, optional): 523 Description of the artifact. If not provided, the default is "Firmware Binary". 524 quick_scan (bool, optional): 525 If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation. 526 upload_method (UploadMethod, optional): 527 The method of uploading the test results. Default is UploadMethod.API. 528 529 Raises: 530 ValueError: Raised if asset_id, version, or file_path are not provided. 531 Exception: Raised if any of the queries fail. 532 533 Returns: 534 dict: The response from the GraphQL query, a createAssetVersion Object. 
535 """ 536 if not asset_id: 537 raise ValueError("Asset ID is required") 538 if not version: 539 raise ValueError("Version is required") 540 if not file_path: 541 raise ValueError("File path is required") 542 543 # create the asset version and binary test 544 if not artifact_description: 545 artifact_description = "Firmware Binary" 546 binary_test_id = create_new_asset_version_artifact_and_test_for_upload( 547 token, 548 organization_context, 549 business_unit_id=business_unit_id, 550 created_by_user_id=created_by_user_id, 551 asset_id=asset_id, 552 version=version, 553 product_id=product_id, 554 test_type="finite_state_binary_analysis", 555 artifact_description=artifact_description, 556 upload_method=upload_method, 557 ) 558 559 # upload file for binary test 560 response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path, 561 quick_scan=quick_scan) 562 return response 563 564 565def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None, 566 created_by_user_id=None, asset_id=None, version=None, 567 file_path=None, product_id=None, test_type=None, 568 artifact_description="", upload_method: UploadMethod = UploadMethod.API): 569 """ 570 Creates a new Asset Version for an existing asset, and uploads test results for that asset version. 571 By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them. 572 573 Args: 574 token (str): 575 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 576 organization_context (str): 577 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 578 business_unit_id (str, optional): 579 Business Unit ID to create the asset version for. 
If not provided, the existing Business Unit for the Asset will be used. 580 created_by_user_id (str, optional): 581 Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used. 582 asset_id (str, required): 583 Asset ID to create the asset version for. 584 version (str, required): 585 Version to create the asset version for. 586 file_path (str, required): 587 Path to the test results file to upload. 588 product_id (str, optional): 589 Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used. 590 test_type (str, required): 591 Test type. This must be one of the supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation. 592 artifact_description (str, optional): 593 Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used. 594 upload_method (UploadMethod, optional): 595 The method of uploading the test results. Default is UploadMethod.API. 596 597 Raises: 598 ValueError: If asset_id, version, file_path, or test_type are not provided. 599 Exception: If the test_type is not a supported third party scanner type, or if the query fails. 600 601 Returns: 602 dict: The response from the GraphQL query, a createAssetVersion Object. 
603 """ 604 if not asset_id: 605 raise ValueError("Asset ID is required") 606 if not version: 607 raise ValueError("Version is required") 608 if not file_path: 609 raise ValueError("File path is required") 610 if not test_type: 611 raise ValueError("Test type is required") 612 613 # create the asset version and test 614 test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context, 615 business_unit_id=business_unit_id, 616 created_by_user_id=created_by_user_id, 617 asset_id=asset_id, version=version, 618 product_id=product_id, test_type=test_type, 619 artifact_description=artifact_description, 620 upload_method=upload_method) 621 622 # upload test results file 623 response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path) 624 return response 625 626 627def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None, 628 product_description=None, vendor_id=None, vendor_name=None): 629 """ 630 Create a new Product. 631 632 Args: 633 token (str): 634 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 635 organization_context (str): 636 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 637 business_unit_id (str, required): 638 Business Unit ID to associate the product with. 639 created_by_user_id (str, required): 640 User ID of the user creating the product. 641 product_name (str, required): 642 The name of the Product being created. 643 product_description (str, optional): 644 The description of the Product being created. 645 vendor_id (str, optional): 646 Vendor ID to associate the product with. If not specified, vendor_name must be provided. 647 vendor_name (str, optional): 648 Vendor name to associate the product with. 
This is used to create the Vendor if the vendor does not currently exist. 649 650 Raises: 651 ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided. 652 Exception: Raised if the query fails. 653 654 Returns: 655 dict: createProduct Object 656 """ 657 658 if not business_unit_id: 659 raise ValueError("Business unit ID is required") 660 if not created_by_user_id: 661 raise ValueError("Created by user ID is required") 662 if not product_name: 663 raise ValueError("Product name is required") 664 665 graphql_query = """ 666 mutation CreateProductMutation_SDK($input: CreateProductInput!) { 667 createProduct(input: $input) { 668 id 669 name 670 vendor { 671 name 672 } 673 group { 674 id 675 name 676 } 677 createdBy { 678 id 679 email 680 } 681 ctx { 682 businessUnit 683 } 684 } 685 } 686 """ 687 688 # Product name, business unit context, and creating user are required 689 variables = { 690 "input": { 691 "name": product_name, 692 "group": business_unit_id, 693 "createdBy": created_by_user_id, 694 "ctx": { 695 "businessUnit": business_unit_id 696 } 697 } 698 } 699 700 if product_description is not None: 701 variables["input"]["description"] = product_description 702 703 # If the vendor ID is specified, this will link the new product to the existing vendor 704 if vendor_id is not None: 705 variables["input"]["vendor"] = { 706 "id": vendor_id 707 } 708 709 # If the vendor name is specified, this will create a new vendor and link it to the new product 710 if vendor_name is not None: 711 variables["input"]["createVendor"] = { 712 "name": vendor_name 713 } 714 715 response = send_graphql_query(token, organization_context, graphql_query, variables) 716 717 return response['data'] 718 719 720def create_test( 721 token, 722 organization_context, 723 business_unit_id=None, 724 created_by_user_id=None, 725 asset_id=None, 726 artifact_id=None, 727 test_name=None, 728 product_id=None, 729 test_type=None, 730 tools=[], 731 upload_method: 
UploadMethod = UploadMethod.API, 732): 733 """ 734 Create a new Test object for uploading files. 735 This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. 736 Please see the examples in the Github repository for more information: 737 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py 738 - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py 739 740 Args: 741 token (str): 742 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 743 organization_context (str): 744 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 745 business_unit_id (str, required): 746 Business Unit ID to associate the Test with. 747 created_by_user_id (str, required): 748 User ID of the user creating the Test. 749 asset_id (str, required): 750 Asset ID to associate the Test with. 751 artifact_id (str, required): 752 Artifact ID to associate the Test with. 753 test_name (str, required): 754 The name of the Test being created. 755 product_id (str, optional): 756 Product ID to associate the Test with. If not specified, the Test will not be associated with a product. 757 test_type (str, required): 758 The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis". 759 tools (list, optional): 760 List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test. 761 upload_method (UploadMethod, required): 762 The method of uploading the test results. 
763 764 Raises: 765 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided. 766 Exception: Raised if the query fails. 767 768 Returns: 769 dict: createTest Object 770 """ 771 if not business_unit_id: 772 raise ValueError("Business unit ID is required") 773 if not created_by_user_id: 774 raise ValueError("Created by user ID is required") 775 if not asset_id: 776 raise ValueError("Asset ID is required") 777 if not artifact_id: 778 raise ValueError("Artifact ID is required") 779 if not test_name: 780 raise ValueError("Test name is required") 781 if not test_type: 782 raise ValueError("Test type is required") 783 784 graphql_query = """ 785 mutation CreateTestMutation_SDK($input: CreateTestInput!) { 786 createTest(input: $input) { 787 id 788 name 789 artifactUnderTest { 790 id 791 name 792 assetVersion { 793 id 794 name 795 asset { 796 id 797 name 798 dependentProducts { 799 id 800 name 801 } 802 } 803 } 804 } 805 createdBy { 806 id 807 email 808 } 809 ctx { 810 asset 811 products 812 businessUnits 813 } 814 uploadMethod 815 } 816 } 817 """ 818 819 # Asset name, business unit context, and creating user are required 820 variables = { 821 "input": { 822 "name": test_name, 823 "createdBy": created_by_user_id, 824 "artifactUnderTest": artifact_id, 825 "testResultFileFormat": test_type, 826 "ctx": { 827 "asset": asset_id, 828 "businessUnits": [business_unit_id] 829 }, 830 "tools": tools, 831 "uploadMethod": upload_method.value 832 } 833 } 834 835 if product_id is not None: 836 variables["input"]["ctx"]["products"] = product_id 837 838 response = send_graphql_query(token, organization_context, graphql_query, variables) 839 return response['data'] 840 841 842def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None, 843 asset_id=None, artifact_id=None, test_name=None, product_id=None, 844 upload_method: UploadMethod = UploadMethod.API): 845 """ 846 Create 
a new Test object for uploading files for Finite State Binary Analysis. 847 848 Args: 849 token (str): 850 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 851 organization_context (str): 852 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 853 business_unit_id (str, required): 854 Business Unit ID to associate the Test with. 855 created_by_user_id (str, required): 856 User ID of the user creating the Test. 857 asset_id (str, required): 858 Asset ID to associate the Test with. 859 artifact_id (str, required): 860 Artifact ID to associate the Test with. 861 test_name (str, required): 862 The name of the Test being created. 863 product_id (str, optional): 864 Product ID to associate the Test with. If not specified, the Test will not be associated with a product. 865 upload_method (UploadMethod, optional): 866 The method of uploading the test results. Default is UploadMethod.API. 867 868 Raises: 869 ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided. 870 Exception: Raised if the query fails. 
def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a Test object that CycloneDX files can be uploaded against.

    This is a convenience wrapper around create_test that fixes the test type to "cyclonedx".

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If omitted, the Test is not associated with a product.
        upload_method (UploadMethod, optional):
            How the test results are being uploaded. Defaults to UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Everything is forwarded by keyword; only the test type is fixed here.
    test_options = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": "cyclonedx",
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **test_options)
def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
                                  report_subtype=None, output_filename=None, verbose=False):
    """
    Download a report for a specific asset version and save it to a local file.

    This is a blocking call, and can take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            The Asset Version ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download, based on the reports available for `report_type`.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved as
            "report.csv" or "report.pdf" (based on the report type) in the current directory.
        verbose (bool, optional):
            If True, prints additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default filename. generate_report_download_url has already rejected
    # anything other than "CSV" / "PDF", so report_type is a usable string at this point.
    if not output_filename:
        output_filename = f"report.{report_type.lower()}"

    # Fetch the generated report from the pre-signed URL
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")
    # Binary mode: PDF reports are not text
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Generate an SBOM for an Asset Version, download it, and write it to a local file.

    This is a blocking call, and can take minutes to return if the SBOM is very large.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY".
            For SPDX the only valid value is "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, required):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, prints additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails or the download does not return HTTP 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type,
                                              sbom_subtype=sbom_subtype, asset_version_id=asset_version_id,
                                              verbose=verbose)

    # Fetch the SBOM from the pre-signed URL
    http_response = requests.get(download_url)

    # Anything other than 200 is treated as a failed download
    if http_response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {http_response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Write the raw bytes of the response body to disk
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(http_response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Fetch every artifact in the organization, following pagination until all results are retrieved.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An Artifact ID, when only a single artifact is wanted. Defaults to None.
        business_unit_id (str, optional):
            A Business Unit ID, when only artifacts of one business unit are wanted. Defaults to None.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifacts_query = queries.ALL_ARTIFACTS
    query_text = artifacts_query['query']
    query_variables = artifacts_query['variables'](artifact_id, business_unit_id)
    # The results of this query are read from the 'allAssets' field of the response
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
def get_all_asset_versions(token, organization_context):
    """
    Get all asset versions in the organization. Uses pagination to get all results.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    # ALL_ASSET_VERSIONS['variables'] is a builder function (get_asset_versions calls it with these
    # same keywords), so it must be invoked; passing the function itself makes the paginator fail
    # when it subscripts the variables. No filters are applied: every asset version is requested.
    variables = queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=None,
                                                        asset_id=None,
                                                        business_unit_id=None)
    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
                                     variables, 'allAssetVersions')
def get_all_business_units(token, organization_context):
    """
    Fetch every business unit in the organization, following pagination until all results are retrieved.

    NOTE: The objects returned are of type Group.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_units_query = queries.ALL_BUSINESS_UNITS
    query_text = business_units_query['query']
    query_variables = business_units_query['variables']
    # Business units are read from the 'allGroups' field of the response
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allGroups')
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query.

    Pages are requested one after another, using the `_cursor` of the last entry of each page as the
    `after` variable of the next request, until a page comes back empty or `limit` results were collected.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string.
        variables (dict, required):
            Variables for the GraphQL query. Must contain "first" (the page size, between 1 and 1000).
            The dict is not modified; a copy is used to carry the pagination cursor.
        field (str, required):
            The field in the response JSON that contains the results.
        limit (int, optional):
            The maximum number of results to return. Defaults to None, which returns all results.
            Limit cannot be greater than 1000.

    Raises:
        Exception: If field is missing, limit or variables["first"] are out of range, the query fails,
            or the field is not in the response JSON.

    Returns:
        list: List of results
    """
    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")

    # Tolerate variables=None (the declared default) and a missing "first" key: both are reported
    # as the same validation error rather than a TypeError / KeyError.
    page_size = variables.get("first") if variables else None
    if not page_size:
        raise Exception("Error: first is required")
    if page_size < 1:
        raise Exception("Error: first cannot be less than 1")
    if page_size > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # Work on a copy: callers may pass a shared module-level dict, and writing the 'after' cursor
    # into it would make their next call start from the end of this one.
    request_variables = dict(variables)

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, request_variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # the cursor of the last entry identifies where the next page starts; an empty page ends paging
    cursor = page[-1]['_cursor'] if page else None

    while cursor:
        # '>=' rather than '==': a page size that does not divide the limit must still stop paging
        if limit and len(results) >= limit:
            break

        request_variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, request_variables)
        page = response_data['data'][field]
        results.extend(page)

        cursor = page[-1]['_cursor'] if page else None

    # never hand back more than the caller asked for
    return results[:limit] if limit else results
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until all results are retrieved.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    users_query = queries.ALL_USERS
    query_text = users_query['query']
    query_variables = users_query['variables']
    # Users are read from the 'allUsers' field of the response
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allUsers')
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch assets in the organization, following pagination until all results are retrieved.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get. Defaults to None, which gets all Assets; otherwise only the Asset with that ID is returned.
        business_unit_id (str, optional):
            Business Unit ID to filter by. Defaults to None, which gets all Assets; otherwise only Assets in that Business Unit are returned.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS
    query_text = assets_query['query']
    # Both filters are handed positionally to the query's variable builder
    query_variables = assets_query['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET.

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            URL the token request is posted to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    # Use the caller-supplied audience and token_url; the module constants are only the defaults.
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
Just the token, do not include "Bearer" in this string. 1513 organization_context (str): 1514 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1515 asset_version_id (str, optional): 1516 Asset Version ID to get findings for. If not provided, will get all findings in the organization. 1517 finding_id (str, optional): 1518 The ID of a specific finding to get. If specified, will return only the finding with that ID. 1519 category (str, optional): 1520 The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. 1521 This can be a single string, or an array of values. 1522 status (str, optional): 1523 The status of Findings to return. 1524 severity (str, optional): 1525 The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings. 1526 count (bool, optional): 1527 If True, will return the count of findings instead of the findings themselves. Defaults to False. 1528 limit (int, optional): 1529 The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000. 1530 1531 Raises: 1532 Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible. 
1533 1534 Returns: 1535 list: List of Finding Objects 1536 """ 1537 1538 if limit and limit > 1000: 1539 raise Exception("Error: limit must be less than 1000") 1540 if limit and limit < 1: 1541 raise Exception("Error: limit must be greater than 0") 1542 1543 if count: 1544 return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'], 1545 queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id, 1546 finding_id=finding_id, category=category, 1547 status=status, severity=severity, 1548 limit=limit))["data"]["_allFindingsMeta"] 1549 else: 1550 return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'], 1551 queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id, 1552 finding_id=finding_id, category=category, 1553 status=status, severity=severity, 1554 limit=limit), 'allFindings', limit=limit) 1555 1556 1557def get_product_asset_versions(token, organization_context, product_id=None): 1558 """ 1559 Gets all the asset versions for a product. 1560 Args: 1561 token (str): 1562 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string. 1563 organization_context (str): 1564 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1565 product_id (str, optional): 1566 Product ID to get asset versions for. If not provided, will get all asset versions in the organization. 1567 Raises: 1568 Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible. 
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token returned by get_auth_token(). Pass only the token, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` is required.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` is required.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download, based on the reports available for `report_type`.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, if neither or both of
            asset_version_id and product_id are given.
        Exception: Raised if the report type or subtype is not supported, the query fails, or no
            export job ID is returned.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not asset_version_id and not product_id:
        raise ValueError("Asset Version ID or Product ID is required")

    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    if report_type not in ["CSV", "PDF"]:
        raise Exception(f"Report Type {report_type} not supported")

    supported_subtypes = {
        "CSV": ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"],
        "PDF": ["RISK_SUMMARY"],
    }
    if report_subtype not in supported_subtypes[report_type]:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    # Launch the export job. The same mutation/variables builders serve both report types.
    mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
                                                        report_type=report_type, report_subtype=report_subtype)
    variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
                                                          report_type=report_type, report_subtype=report_subtype)

    response_data = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(response_data, indent=4)}')

    # The response field depends on the target and file type:
    # launchArtifactCSVExport / launchProductCSVExport / launchArtifactPdfExport / launchProductPdfExport.
    # Exactly one of the two IDs is set (validated above), so asset_version_id alone selects the target.
    target = "Artifact" if asset_version_id else "Product"
    file_kind = {"CSV": "CSV", "PDF": "Pdf"}[report_type]
    export_job_id = response_data['data'][f'launch{target}{file_kind}Export']['exportJobId']

    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    sleep_time = 10
    total_time = 0
    if verbose:
        print(f'Polling every {sleep_time} seconds for export job to complete')

    # The polling request never changes, so build it once
    query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
    variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

    # NOTE: there is no timeout; polling continues until the job reports COMPLETED with a download link
    while True:
        time.sleep(sleep_time)
        total_time += sleep_time
        if verbose:
            print(f'Total time elapsed: {total_time} seconds')

        response_data = send_graphql_query(token, organization_context, query, variables)

        if verbose:
            print(f'Response Data: {json.dumps(response_data, indent=4)}')

        export_state = response_data['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] == 'COMPLETED' and export_state['downloadLink']:
            if verbose:
                print(f'Export Job Complete. Download URL: {export_state["downloadLink"]}')
            return export_state['downloadLink']
1748 """ 1749 1750 if not sbom_type: 1751 raise ValueError("SBOM Type is required") 1752 if not sbom_subtype: 1753 raise ValueError("SBOM Subtype is required") 1754 if not asset_version_id: 1755 raise ValueError("Asset Version ID is required") 1756 1757 if sbom_type not in ["CYCLONEDX", "SPDX"]: 1758 raise Exception(f"SBOM Type {sbom_type} not supported") 1759 1760 if sbom_type == "CYCLONEDX": 1761 if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]: 1762 raise Exception(f"SBOM Subtype {sbom_subtype} not supported") 1763 1764 mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation'] 1765 variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id) 1766 1767 response_data = send_graphql_query(token, organization_context, mutation, variables) 1768 if verbose: 1769 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1770 1771 # get exportJobId from the result 1772 export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId'] 1773 if verbose: 1774 print(f'Export Job ID: {export_job_id}') 1775 1776 if sbom_type == "SPDX": 1777 if sbom_subtype not in ["SBOM_ONLY"]: 1778 raise Exception(f"SBOM Subtype {sbom_subtype} not supported") 1779 1780 mutation = queries.LAUNCH_SPDX_EXPORT['mutation'] 1781 variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id) 1782 1783 response_data = send_graphql_query(token, organization_context, mutation, variables) 1784 if verbose: 1785 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1786 1787 # get exportJobId from the result 1788 export_job_id = response_data['data']['launchSpdxExport']['exportJobId'] 1789 if verbose: 1790 print(f'Export Job ID: {export_job_id}') 1791 1792 if not export_job_id: 1793 raise Exception( 1794 "Error: Export Job ID not found - this should not happen, please contact your Finite State representative") 1795 1796 # poll the API until the export job is complete 1797 sleep_time = 10 1798 total_time = 0 1799 if 
verbose: 1800 print(f'Polling every {sleep_time} seconds for export job to complete') 1801 while True: 1802 time.sleep(sleep_time) 1803 total_time += sleep_time 1804 if verbose: 1805 print(f'Total time elapsed: {total_time} seconds') 1806 1807 query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query'] 1808 variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id) 1809 1810 response_data = send_graphql_query(token, organization_context, query, variables) 1811 1812 if verbose: 1813 print(f'Response Data: {json.dumps(response_data, indent=4)}') 1814 1815 if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED": 1816 if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']: 1817 if verbose: 1818 print( 1819 f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}') 1820 return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink'] 1821 1822 1823def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list: 1824 """ 1825 Gets all the Software Components for an Asset Version. Uses pagination to get all results. 1826 Args: 1827 token (str): 1828 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string. 1829 organization_context (str): 1830 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1831 asset_version_id (str, optional): 1832 Asset Version ID to get software components for. 1833 type (str, optional): 1834 The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. 
See https://docs.finitestate.io/types/software-component-type 1835 Raises: 1836 Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible. 1837 Returns: 1838 list: List of Software Component Objects 1839 """ 1840 if not asset_version_id: 1841 raise Exception("Asset Version ID is required") 1842 1843 return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'], 1844 queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id, 1845 type=type), 1846 'allSoftwareComponentInstances') 1847 1848 1849def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', 1850 case_sensitive=False) -> list: 1851 """ 1852 Searches the SBOM of a specific asset version or the entire organization for matching software components. 1853 Search Methods: EXACT or CONTAINS 1854 An exact match will return only the software component whose name matches the name exactly. 1855 A contains match will return all software components whose name contains the search string. 1856 1857 Args: 1858 token (str): 1859 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 1860 organization_context (str): 1861 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1862 name (str, required): 1863 Name of the software component to search for. 1864 version (str, optional): 1865 Version of the software component to search for. If not specified, will search for all versions of the software component. 1866 asset_version_id (str, optional): 1867 Asset Version ID to search for software components in. If not specified, will search the entire organization. 1868 search_method (str, optional): 1869 Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT". 
1870 case_sensitive (bool, optional): 1871 Whether or not to perform a case sensitive search. Defaults to False. 1872 Raises: 1873 ValueError: Raised if name is not provided. 1874 Exception: Raised if the query fails. 1875 Returns: 1876 list: List of SoftwareComponentInstance Objects 1877 """ 1878 if asset_version_id: 1879 query = """ 1880query GetSoftwareComponentInstances_SDK( 1881 $filter: SoftwareComponentInstanceFilter 1882 $after: String 1883 $first: Int 1884) { 1885 allSoftwareComponentInstances( 1886 filter: $filter 1887 after: $after 1888 first: $first 1889 ) { 1890 _cursor 1891 id 1892 name 1893 version 1894 originalComponents { 1895 id 1896 name 1897 version 1898 } 1899 } 1900} 1901""" 1902 else: 1903 # gets the asset version info that contains the software component 1904 query = """ 1905query GetSoftwareComponentInstances_SDK( 1906 $filter: SoftwareComponentInstanceFilter 1907 $after: String 1908 $first: Int 1909) { 1910 allSoftwareComponentInstances( 1911 filter: $filter 1912 after: $after 1913 first: $first 1914 ) { 1915 _cursor 1916 id 1917 name 1918 version 1919 assetVersion { 1920 id 1921 name 1922 asset { 1923 id 1924 name 1925 } 1926 } 1927 } 1928} 1929""" 1930 1931 variables = { 1932 "filter": { 1933 "mergedComponentRefId": None 1934 }, 1935 "after": None, 1936 "first": 100 1937 } 1938 1939 if asset_version_id: 1940 variables["filter"]["assetVersionRefId"] = asset_version_id 1941 1942 if search_method == 'EXACT': 1943 if case_sensitive: 1944 variables["filter"]["name"] = name 1945 else: 1946 variables["filter"]["name_like"] = name 1947 elif search_method == 'CONTAINS': 1948 variables["filter"]["name_contains"] = name 1949 1950 if version: 1951 if search_method == 'EXACT': 1952 variables["filter"]["version"] = version 1953 elif search_method == 'CONTAINS': 1954 variables["filter"]["version_contains"] = version 1955 1956 records = get_all_paginated_results(token, organization_context, query, variables=variables, 1957 
field="allSoftwareComponentInstances") 1958 1959 return records 1960 1961 1962def send_graphql_query(token, organization_context, query, variables=None): 1963 """ 1964 Send a GraphQL query to the API 1965 1966 Args: 1967 token (str): 1968 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 1969 organization_context (str): 1970 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 1971 query (str): 1972 The GraphQL query string 1973 variables (dict, optional): 1974 Variables to be used in the GraphQL query, by default None 1975 1976 Raises: 1977 Exception: If the response status code is not 200 1978 1979 Returns: 1980 dict: Response JSON 1981 """ 1982 headers = { 1983 'Content-Type': 'application/json', 1984 'Authorization': f'Bearer {token}', 1985 'Organization-Context': organization_context 1986 } 1987 data = { 1988 'query': query, 1989 'variables': variables 1990 } 1991 1992 response = requests.post(API_URL, headers=headers, json=data) 1993 1994 if response.status_code == 200: 1995 thejson = response.json() 1996 1997 if "errors" in thejson: 1998 raise Exception(f"Error: {thejson['errors']}") 1999 2000 return thejson 2001 else: 2002 raise Exception(f"Error: {response.status_code} - {response.text}") 2003 2004 2005def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None, 2006 justification=None, response=None, comment=None): 2007 """ 2008 Updates the status of a findings or multiple findings. This is a blocking call. 2009 2010 Args: 2011 token (str): 2012 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string. 2013 organization_context (str): 2014 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 
2015 user_id (str, required): 2016 User ID to update the finding status for. 2017 finding_ids (str, required): 2018 Finding ID to update the status for. 2019 status (str, required): 2020 Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option 2021 justification (str, optional): 2022 Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum 2023 response (str, optional): 2024 Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum 2025 comment (str, optional): 2026 Optional comment to add to the finding status update. 2027 2028 Raises: 2029 ValueError: Raised if required parameters are not provided. 2030 Exception: Raised if the query fails. 2031 2032 Returns: 2033 dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. 
For details see https://docs.finitestate.io/types/update-findings-statuses-response 2034 """ 2035 if not user_id: 2036 raise ValueError("User Id is required") 2037 if not finding_ids: 2038 raise ValueError("Finding Ids is required") 2039 if not status: 2040 raise ValueError("Status is required") 2041 2042 mutation = queries.UPDATE_FINDING_STATUSES['mutation'] 2043 variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status, 2044 justification=justification, response=response, 2045 comment=comment) 2046 2047 return send_graphql_query(token, organization_context, mutation, variables) 2048 2049 2050def upload_file_for_binary_analysis( 2051 token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False 2052): 2053 """ 2054 Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. 2055 NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that. 2056 2057 Args: 2058 token (str): 2059 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 2060 organization_context (str): 2061 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 2062 test_id (str, required): 2063 Test ID to upload the file for. 2064 file_path (str, required): 2065 Local path to the file to upload. 2066 chunk_size (int, optional): 2067 The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB. 2068 quick_scan (bool, optional): 2069 If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation. 2070 2071 Raises: 2072 ValueError: Raised if test_id or file_path are not provided. 2073 Exception: Raised if the query fails. 
2074 2075 Returns: 2076 dict: The response from the GraphQL query, a completeMultipartUpload Object. 2077 """ 2078 # To upload a file for Binary Analysis, you must use the generateMultiplePartUploadUrl mutation 2079 if not test_id: 2080 raise ValueError("Test Id is required") 2081 if not file_path: 2082 raise ValueError("File Path is required") 2083 if chunk_size < MIN_CHUNK_SIZE: 2084 raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes") 2085 if chunk_size >= MAX_CHUNK_SIZE: 2086 raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes") 2087 2088 # Start Multi-part Upload 2089 graphql_query = """ 2090 mutation Start_SDK($testId: ID!) { 2091 startMultipartUploadV2(testId: $testId) { 2092 uploadId 2093 key 2094 } 2095 } 2096 """ 2097 2098 variables = { 2099 "testId": test_id 2100 } 2101 2102 response = send_graphql_query(token, organization_context, graphql_query, variables) 2103 2104 upload_id = response['data']['startMultipartUploadV2']['uploadId'] 2105 upload_key = response['data']['startMultipartUploadV2']['key'] 2106 2107 # if the file is greater than max chunk size (or 5 GB), split the file in chunks, 2108 # call generateUploadPartUrlV2 for each chunk of the file (even if it is a single part) 2109 # and upload the file to the returned upload URL 2110 i = 0 2111 part_data = [] 2112 for chunk in file_chunks(file_path, chunk_size): 2113 i = i + 1 2114 graphql_query = """ 2115 mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) 
{ 2116 generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) { 2117 key 2118 uploadUrl 2119 } 2120 } 2121 """ 2122 2123 variables = { 2124 "partNumber": i, 2125 "uploadId": upload_id, 2126 "uploadKey": upload_key 2127 } 2128 2129 response = send_graphql_query(token, organization_context, graphql_query, variables) 2130 2131 chunk_upload_url = response['data']['generateUploadPartUrlV2']['uploadUrl'] 2132 2133 # upload the chunk to the upload URL 2134 response = upload_bytes_to_url(chunk_upload_url, chunk) 2135 2136 part_data.append({ 2137 "ETag": response.headers['ETag'], 2138 "PartNumber": i 2139 }) 2140 2141 # call completeMultipartUploadV2 2142 graphql_query = """ 2143 mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) { 2144 completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) { 2145 key 2146 } 2147 } 2148 """ 2149 2150 variables = { 2151 "partData": part_data, 2152 "uploadId": upload_id, 2153 "uploadKey": upload_key 2154 } 2155 2156 response = send_graphql_query(token, organization_context, graphql_query, variables) 2157 2158 # get key from the result 2159 key = response['data']['completeMultipartUploadV2']['key'] 2160 2161 variables = { 2162 "key": key, 2163 "testId": test_id 2164 } 2165 2166 # call launchBinaryUploadProcessing 2167 if quick_scan: 2168 graphql_query = """ 2169 mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) { 2170 launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) { 2171 key 2172 } 2173 } 2174 """ 2175 variables["configurationOptions"] = ["QUICK_SCAN"] 2176 else: 2177 graphql_query = """ 2178 mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) 
{ 2179 launchBinaryUploadProcessing(key: $key, testId: $testId) { 2180 key 2181 } 2182 } 2183 """ 2184 2185 response = send_graphql_query(token, organization_context, graphql_query, variables) 2186 2187 return response['data'] 2188 2189 2190def upload_test_results_file(token, organization_context, test_id=None, file_path=None): 2191 """ 2192 Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that. 2193 2194 Args: 2195 token (str): 2196 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method. 2197 organization_context (str): 2198 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 2199 test_id (str, required): 2200 Test ID to upload the file for. 2201 file_path (str, required): 2202 Local path to the file to upload. 2203 2204 Raises: 2205 ValueError: Raised if test_id or file_path are not provided. 2206 Exception: Raised if the query fails. 2207 2208 Returns: 2209 dict: The response from the GraphQL query, a completeTestResultUpload Object. 2210 """ 2211 if not test_id: 2212 raise ValueError("Test Id is required") 2213 if not file_path: 2214 raise ValueError("File Path is required") 2215 2216 # Gerneate Test Result Upload URL 2217 graphql_query = """ 2218 mutation GenerateTestResultUploadUrl_SDK($testId: ID!) 
{ 2219 generateSinglePartUploadUrl(testId: $testId) { 2220 uploadUrl 2221 key 2222 } 2223 } 2224 """ 2225 2226 variables = { 2227 "testId": test_id 2228 } 2229 2230 response = send_graphql_query(token, organization_context, graphql_query, variables) 2231 2232 # get the upload URL and key 2233 upload_url = response['data']['generateSinglePartUploadUrl']['uploadUrl'] 2234 key = response['data']['generateSinglePartUploadUrl']['key'] 2235 2236 # upload the file 2237 upload_file_to_url(upload_url, file_path) 2238 2239 # complete the upload 2240 graphql_query = """ 2241 mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) { 2242 launchTestResultProcessing(key: $key, testId: $testId) { 2243 key 2244 } 2245 } 2246 """ 2247 2248 variables = { 2249 "testId": test_id, 2250 "key": key 2251 } 2252 2253 response = send_graphql_query(token, organization_context, graphql_query, variables) 2254 return response['data'] 2255 2256 2257def upload_bytes_to_url(url, bytes): 2258 """ 2259 Used for uploading a file to a pre-signed S3 URL 2260 2261 Args: 2262 url (str): 2263 (Pre-signed S3) URL 2264 bytes (bytes): 2265 Bytes to upload 2266 2267 Raises: 2268 Exception: If the response status code is not 200 2269 2270 Returns: 2271 requests.Response: Response object 2272 """ 2273 response = requests.put(url, data=bytes) 2274 2275 if response.status_code == 200: 2276 return response 2277 else: 2278 raise Exception(f"Error: {response.status_code} - {response.text}") 2279 2280 2281def upload_file_to_url(url, file_path): 2282 """ 2283 Used for uploading a file to a pre-signed S3 URL 2284 2285 Args: 2286 url (str): 2287 (Pre-signed S3) URL 2288 file_path (str): 2289 Local path to file to upload 2290 2291 Raises: 2292 Exception: If the response status code is not 200 2293 2294 Returns: 2295 requests.Response: Response object 2296 """ 2297 with open(file_path, 'rb') as file: 2298 response = requests.put(url, data=file) 2299 2300 if response.status_code == 200: 2301 return response 2302 
else: 2303 raise Exception(f"Error: {response.status_code} - {response.text}")
DEFAULT CHUNK SIZE: 1000 MiB
MAX CHUNK SIZE: 2000 MiB (just under 2 GiB)
MIN CHUNK SIZE: 5 MiB
class UploadMethod(Enum):
    """
    Enumeration of the channels through which an upload can be made.

    Attributes:
        WEB_APP_UI: Upload via the web application UI.
        API: Upload via the API.
        GITHUB_INTEGRATION: Upload via the GitHub integration.
        AZURE_DEVOPS_INTEGRATION: Upload via the Azure DevOps integration.

    Refer to a member as UploadMethod.<attribute>, e.g. finite_state_sdk.UploadMethod.WEB_APP_UI
    """
    # Each member's value is the same string as its name.
    WEB_APP_UI = "WEB_APP_UI"
    API = "API"
    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
Enumeration class representing different upload methods.
Attributes:
- WEB_APP_UI: Upload method via web application UI.
- API: Upload method via API.
- GITHUB_INTEGRATION: Upload method via GitHub integration.
- AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
To use any value from this enumeration, use UploadMethod.<attribute>, e.g. finite_state_sdk.UploadMethod.WEB_APP_UI
Inherited Members
- enum.Enum
- name
- value
def create_artifact(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_version_id=None,
    artifact_name=None,
    product_id=None,
):
    """
    Create a new Artifact attached to an existing Asset Version.
    This is an advanced method - most callers want create_new_asset_version_and_upload_test_results
    or create_new_asset_version_and_upload_binary instead.
    Examples are available in the GitHub repository:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the artifact with.
        created_by_user_id (str, required):
            User ID of the user creating the artifact.
        asset_version_id (str, required):
            Asset Version ID to associate the artifact with.
        artifact_name (str, required):
            The name of the Artifact being created.
        product_id (str, optional):
            Product ID to associate the artifact with. When omitted, no product is placed in the artifact's context.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name is missing or empty.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, which holds the createArtifact object.
    """
    # Checked in this order so the first missing argument is the one reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_version_id, "Asset version ID is required"),
        (artifact_name, "Artifact name is required"),
    )
    for supplied, message in required_arguments:
        if not supplied:
            raise ValueError(message)

    # NOTE(review): ctx.asset is populated with the asset *version* ID - confirm this is intended.
    context = {
        "asset": asset_version_id,
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        # NOTE(review): products is sent as a single value while businessUnits is a list -
        # confirm the API accepts a scalar here.
        context["products"] = product_id

    mutation = """
        mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
            createArtifact(input: $input) {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                    }
                }
                createdBy {
                    id
                    email
                }
                ctx {
                    asset
                    products
                    businessUnits
                }
            }
        }
    """

    # Artifact name, creating user, asset version and context make up the mutation input.
    payload = {
        "input": {
            "name": artifact_name,
            "createdBy": created_by_user_id,
            "assetVersion": asset_version_id,
            "ctx": context
        }
    }

    result = send_graphql_query(token, organization_context, mutation, payload)
    return result['data']
Create a new Artifact. This is an advanced method; you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. See the examples in the GitHub repository for more information:
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Pass only the token itself; do not include "Bearer" in this string, as that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the artifact with.
- created_by_user_id (str, required): User ID of the user creating the artifact.
- asset_version_id (str, required): Asset Version ID to associate the artifact with.
- artifact_name (str, required): The name of the Artifact being created.
- product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createArtifact Object
def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
    """
    Create a new Asset inside a Business Unit.

    Args:
        token (str):
            Auth token, as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset with.
        created_by_user_id (str, required):
            User ID of the user creating the asset.
        asset_name (str, required):
            The name of the Asset being created.
        product_id (str, optional):
            Product ID to associate the asset with. When omitted, no product is placed in the asset's context.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name is missing or empty.
        Exception: Raised if the query fails.

    Returns:
        dict: The `data` member of the GraphQL response, which holds the createAsset object.
    """
    # Checked in this order so the first missing argument is the one reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_name, "Asset name is required"),
    )
    for supplied, message in required_arguments:
        if not supplied:
            raise ValueError(message)

    context = {
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        # NOTE(review): products is sent as a single value while businessUnits is a list -
        # confirm the API accepts a scalar here.
        context["products"] = product_id

    mutation = """
        mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
            createAsset(input: $input) {
                id
                name
                dependentProducts {
                    id
                    name
                }
                group {
                    id
                    name
                }
                createdBy {
                    id
                    email
                }
                ctx {
                    asset
                    products
                    businessUnits
                }
            }
        }
    """

    # The business unit is used both as the asset's group and as its context.
    payload = {
        "input": {
            "name": asset_name,
            "group": business_unit_id,
            "createdBy": created_by_user_id,
            "ctx": context
        }
    }

    result = send_graphql_query(token, organization_context, mutation, payload)
    return result['data']
Create a new Asset.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Pass only the token itself; do not include "Bearer" in this string, as that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the asset with.
- created_by_user_id (str, required): User ID of the user creating the asset.
- asset_name (str, required): The name of the Asset being created.
- product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAsset Object
def create_asset_version(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset version with.
        created_by_user_id (str, required):
            User ID of the user creating the asset version.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to associate the asset version with. If not specified, no product context is sent with the request.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the GraphQL response, which holds the createAssetVersion object.

    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
    """
    # The deprecation warning is emitted before validation so it is seen on every call.
    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)

    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    )
    for supplied_value, error_message in required_arguments:
        if not supplied_value:
            raise ValueError(error_message)

    graphql_query = """
    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
        createAssetVersion(input: $input) {
            id
            name
            asset {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # The product context is only included when the caller supplied one.
    context = {"asset": asset_id, "businessUnits": [business_unit_id]}
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": asset_version_name,
            "createdBy": created_by_user_id,
            "asset": asset_id,
            "ctx": context,
        }
    }

    return send_graphql_query(token, organization_context, graphql_query, variables)['data']
Create a new Asset Version.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the asset version with.
- created_by_user_id (str, required): User ID of the user creating the asset version.
- asset_id (str, required): Asset ID to associate the asset version with.
- asset_version_name (str, required): The name of the Asset Version being created.
- product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAssetVersion Object
deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
def create_asset_version_on_asset(
    token,
    organization_context,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        created_by_user_id (str, optional):
            User ID of the user creating the asset version. Only sent with the request when provided.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.

    Raises:
        ValueError: Raised if asset_id or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the GraphQL response, which holds the createNewAssetVersionOnAsset object.
    """
    for supplied_value, error_message in (
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    ):
        if not supplied_value:
            raise ValueError(error_message)

    graphql_query = """
    mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!) {
        createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId) {
            id
            assetVersion {
                id
            }
        }
    }
    """

    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}

    # NOTE(review): the mutation declares $createdByUserId as non-null (ID!) yet the
    # variable is omitted when no user ID is given - confirm how the API handles that.
    if created_by_user_id:
        variables.update(createdByUserId=created_by_user_id)

    return send_graphql_query(token, organization_context, graphql_query, variables)['data']
Create a new Asset Version.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- created_by_user_id (str, optional): User ID of the user creating the asset version.
- asset_id (str, required): Asset ID to associate the asset version with.
- asset_version_name (str, required): The name of the Asset Version being created.
Raises:
- ValueError: Raised if asset_id or asset_version_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createAssetVersion Object
def create_new_asset_version_artifact_and_test_for_upload(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    product_id=None,
    test_type=None,
    artifact_description=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the entities for. If not provided, the Business Unit (group) of the existing Asset is used.
        created_by_user_id (str, optional):
            User ID that will be the creator of the entities. If not specified, the creator of the existing Asset is used.
        asset_id (str, required):
            ID of the existing Asset to create the asset version for.
        version (str, required):
            Version name to create the asset version with.
        product_id (str, optional):
            Product ID to associate the Artifact and Test with, in addition to the products already associated with the Asset.
        test_type (str, required):
            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documentation.
        artifact_description (str, optional):
            Description appended to the Artifact name. Examples include "Firmware", "Source Code Repository". Defaults to "Binary" for Finite State Binary Analysis and to "Unspecified Artifact" for every other test type.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or test_type are not provided, if no Asset matches asset_id, or if the Business Unit ID or Created By User ID cannot be determined.
        Exception: Raised if the query fails.

    Returns:
        str: The Test ID of the newly created test that is used for uploading the file.
    """
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    # Reject a missing test type before anything is created remotely; otherwise the
    # failure would only surface after the Asset Version and Artifact already exist.
    if not test_type:
        raise ValueError("Test type is required")

    assets = get_all_assets(token, organization_context, asset_id=asset_id)
    if not assets:
        # Report an unknown asset clearly instead of failing with an IndexError below.
        raise ValueError(f"No asset found with ID {asset_id}")
    asset = assets[0]

    asset_name = asset['name']

    # Start from the products already linked to the asset and add the requested one.
    asset_product_ids = asset['ctx']['products']
    if product_id and product_id not in asset_product_ids:
        asset_product_ids.append(product_id)

    # Fall back to the existing asset's Business Unit and creator when not supplied.
    if not business_unit_id:
        business_unit_id = asset['group']['id']
    if not created_by_user_id:
        created_by_user_id = asset['createdBy']['id']

    if not business_unit_id:
        raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
    if not created_by_user_id:
        raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")

    # create the asset version
    response = create_asset_version_on_asset(
        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version
    )
    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']

    is_binary_analysis = test_type == "finite_state_binary_analysis"

    # create the artifact; only the default description differs between the two paths
    if not artifact_description:
        artifact_description = "Binary" if is_binary_analysis else "Unspecified Artifact"
    artifact_name = f"{asset_name} {version} - {artifact_description}"
    response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
                               created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
                               artifact_name=artifact_name, product_id=asset_product_ids)
    artifact_id = response['createArtifact']['id']

    # create the test
    if is_binary_analysis:
        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                  artifact_id=artifact_id, product_id=asset_product_ids,
                                                  test_name=test_name, upload_method=upload_method)
    else:
        test_name = f"{asset_name} {version} - {test_type}"
        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                      artifact_id=artifact_id, product_id=asset_product_ids,
                                                      test_name=test_name, test_type=test_type,
                                                      upload_method=upload_method)

    return response['createTest']['id']
Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the Business Unit of the existing Asset will be used.
- created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for. A ValueError is raised if it is not provided.
- version (str, required): Version to create the asset version for.
- product_id (str, optional): Product ID to create the entities for, in addition to the products already associated with the Asset. If not provided, only the Asset's existing products are used.
- test_type (str, required): Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documentation.
- artifact_description (str, optional): Description to use for the artifact. Examples include "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
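- ValueError: Raised if asset_id or version are not provided, or if the Business Unit ID or Created By User ID is not provided and cannot be retrieved from the existing Asset.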
- Exception: Raised if the query fails.
Returns:
str: The Test ID of the newly created test that is used for uploading the file.
def create_new_asset_version_and_upload_binary(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    artifact_description=None,
    quick_scan=False,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Local path to the file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
        artifact_description (str, optional):
            Description of the artifact. If not provided, the default is "Firmware Binary".
        quick_scan (bool, optional):
            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or file_path are not provided.
        Exception: Raised if any of the queries fail.

    Returns:
        The value returned by upload_file_for_binary_analysis for the uploaded file.
    """
    for supplied_value, error_message in (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
    ):
        if not supplied_value:
            raise ValueError(error_message)

    # Create the asset version, artifact and binary analysis test in one step;
    # an empty description falls back to "Firmware Binary".
    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        artifact_description=artifact_description or "Firmware Binary",
        upload_method=upload_method,
    )

    # Upload the file against the newly created test.
    return upload_file_for_binary_analysis(
        token,
        organization_context,
        test_id=binary_test_id,
        file_path=file_path,
        quick_scan=quick_scan,
    )
Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
- created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for.
- version (str, required): Version to create the asset version for.
- file_path (str, required): Local path to the file to upload.
- product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
- artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
- quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if asset_id, version, or file_path are not provided.
- Exception: Raised if any of the queries fail.
Returns:
The response returned by upload_file_for_binary_analysis for the uploaded file.
def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
                                                     created_by_user_id=None, asset_id=None, version=None,
                                                     file_path=None, product_id=None, test_type=None,
                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
    """
    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Path to the test results file to upload.
        product_id (str, optional):
            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
        test_type (str, required):
            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
        artifact_description (str, optional):
            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: If the asset_id, version, file_path, or test_type are not provided.
        Exception: If the test_type is not a supported third party scanner type, or if the query fails.

    Returns:
        The value returned by upload_test_results_file for the uploaded file.
    """
    for supplied_value, error_message in (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
        (test_type, "Test type is required"),
    ):
        if not supplied_value:
            raise ValueError(error_message)

    # Create the asset version, artifact and test that the results will be attached to.
    test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type=test_type,
        artifact_description=artifact_description,
        upload_method=upload_method,
    )

    # Upload the results file against the newly created test.
    return upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
- created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
- asset_id (str, required): Asset ID to create the asset version for.
- version (str, required): Version to create the asset version for.
- file_path (str, required): Path to the test results file to upload.
- product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
- test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
- artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: If the asset_id, version, file_path, or test_type are not provided.
- Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:
The response returned by upload_test_results_file for the uploaded file.
def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
                   product_description=None, vendor_id=None, vendor_name=None):
    """
    Create a new Product.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the product with.
        created_by_user_id (str, required):
            User ID of the user creating the product.
        product_name (str, required):
            The name of the Product being created.
        product_description (str, optional):
            The description of the Product being created.
        vendor_id (str, optional):
            ID of an existing Vendor to link the product to.
        vendor_name (str, optional):
            Vendor name to associate the product with. Sent as a request to create the Vendor.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the GraphQL response, which holds the createProduct object.
    """
    for supplied_value, error_message in (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (product_name, "Product name is required"),
    ):
        if not supplied_value:
            raise ValueError(error_message)

    graphql_query = """
    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
        createProduct(input: $input) {
            id
            name
            vendor {
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                businessUnit
            }
        }
    }
    """

    # Mandatory part of the mutation input.
    product_input = {
        "name": product_name,
        "group": business_unit_id,
        "createdBy": created_by_user_id,
        "ctx": {"businessUnit": business_unit_id},
    }

    if product_description is not None:
        product_input["description"] = product_description

    # A vendor ID links the new product to an existing vendor.
    if vendor_id is not None:
        product_input["vendor"] = {"id": vendor_id}

    # A vendor name asks the API to create a vendor and link it to the new product.
    if vendor_name is not None:
        product_input["createVendor"] = {"name": vendor_name}

    response = send_graphql_query(token, organization_context, graphql_query, {"input": product_input})
    return response['data']
Create a new Product.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the product with.
- created_by_user_id (str, required): User ID of the user creating the product.
- product_name (str, required): The name of the Product being created.
- product_description (str, optional): The description of the Product being created.
- vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
- vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createProduct Object
def create_test(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    tools=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading files.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, no product context is sent with the request.
        test_type (str, required):
            The type of test being created, e.g. "cyclonedx" or "finite_state_binary_analysis". Sent to the API as the test result file format.
        tools (list, optional):
            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test. Defaults to an empty list.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the GraphQL response, which holds the createTest object.
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not artifact_id:
        raise ValueError("Artifact ID is required")
    if not test_name:
        raise ValueError("Test name is required")
    if not test_type:
        raise ValueError("Test type is required")

    # A None sentinel replaces the former mutable default ([]), so each call gets
    # its own fresh list instead of sharing one list object across calls.
    if tools is None:
        tools = []

    graphql_query = """
    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
        createTest(input: $input) {
            id
            name
            artifactUnderTest {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                        dependentProducts {
                            id
                            name
                        }
                    }
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
            uploadMethod
        }
    }
    """

    # Test name, creating user, artifact, file format and context are always sent
    variables = {
        "input": {
            "name": test_name,
            "createdBy": created_by_user_id,
            "artifactUnderTest": artifact_id,
            "testResultFileFormat": test_type,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            },
            "tools": tools,
            "uploadMethod": upload_method.value
        }
    }

    # The product context is only included when the caller supplied one.
    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']
Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
- https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
- tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
                                   upload_method: UploadMethod = UploadMethod.API):
    """
    Create a Test of type "finite_state_binary_analysis" for uploading files to Finite State Binary Analysis.

    Thin wrapper around create_test that fixes the test type and the tool description.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # The single tool entry describing the Finite State scanner
    binary_analysis_tool = {
        "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
        "name": "Finite State Binary Analysis"
    }

    return create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        tools=[binary_analysis_tool],
        upload_method=upload_method,
    )
Create a new Test object for uploading files for Finite State Binary Analysis.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a Test of type "cyclonedx" for uploading CycloneDX files.

    Thin wrapper around create_test that fixes the test type; no tools are passed.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    forwarded = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": "cyclonedx",
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **forwarded)
Create a new Test object for uploading CycloneDX files.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
                                       upload_method: UploadMethod = UploadMethod.API):
    """
    Create a Test for uploading Third Party Scanner files.

    Thin wrapper around create_test that forwards the caller-supplied test type; no tools are passed.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            Test type of the scanner, which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    forwarded = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": test_type,
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **forwarded)
Create a new Test object for uploading Third Party Scanner files.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- business_unit_id (str, required): Business Unit ID to associate the Test with.
- created_by_user_id (str, required): User ID of the user creating the Test.
- asset_id (str, required): Asset ID to associate the Test with.
- artifact_id (str, required): Artifact ID to associate the Test with.
- test_name (str, required): The name of the Test being created.
- product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
- test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
- upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
- ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
- Exception: Raised if the query fails.
Returns:
dict: createTest Object
def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
                                  report_subtype=None, output_filename=None, verbose=False):
    """
    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            The Asset Version ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided. No validation happens in this function itself; this is expected to come from generate_report_download_url.
        Exception: Raised if the query fails, or if the download does not return HTTP 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default name ("report.csv" / "report.pdf").
    # Done after the URL request so missing parameters are reported by that call first.
    if not output_filename:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than HTTP 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Write the payload in binary mode so CSV and PDF reports are stored unmodified
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, required): The Asset Version ID to download the report for.
- report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified `report_type`. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
- output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
None
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        Exception: Raised if the download does not return HTTP 200. Errors from generate_report_download_url propagate unchanged.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default name ("report.<type>").
    # Done after the URL request so missing parameters are reported by that call first.
    if not output_filename:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than HTTP 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Write the payload in binary mode so the report is stored unmodified
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, required): The Product ID to download the report for.
- report_type (str, required): The file type of the report to download. Valid values are "CSV".
- report_subtype (str, required): The type of report to download, based on the reports available for the specified `report_type`. Valid values for CSV are "ALL_FINDINGS".
- output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Fetch the SBOM of an Asset Version and write it to a local file.

    The call blocks until the download finishes, which can take minutes for very large SBOMs.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, optional):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, optional):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, optional):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, prints progress information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails, or if the download does not return HTTP 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(
        token,
        organization_context,
        sbom_type=sbom_type,
        sbom_subtype=sbom_subtype,
        asset_version_id=asset_version_id,
        verbose=verbose,
    )

    # Plain HTTP GET against the generated download URL
    reply = requests.get(download_url)

    # Bail out early on anything other than HTTP 200
    if reply.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {reply.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Persist the raw response bytes
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(reply.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
- sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
- asset_version_id (str, required): The Asset Version ID to download the SBOM for.
- output_filename (str, optional): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
- verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if required parameters are not provided.
- Exception: Raised if the query fails.
Returns:
None
def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Generator that reads a file in binary mode and yields it piece by piece.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            Number of bytes requested per read. Defaults to DEFAULT_CHUNK_SIZE.

    Yields:
        bytes: The next chunk of the file. Iteration stops at the first empty read.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as stream:
        # iter(callable, sentinel) keeps reading until an empty bytes object comes back
        for piece in iter(lambda: stream.read(chunk_size), b''):
            yield piece
Helper method to read a file in chunks.
Arguments:
- file_path (str): Local path to the file to read.
- chunk_size (int, optional): The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
Yields:
bytes: The next chunk of the file.
Raises:
- FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Retrieve every artifact in the organization, following pagination until all results are collected.

    Runs the queries.ALL_ARTIFACTS query and reads the results from the 'allAssets' field of the response.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifact_query = queries.ALL_ARTIFACTS['query']
    artifact_variables = queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, artifact_query, artifact_variables, 'allAssets')
Get all artifacts in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
- business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Artifact Objects
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Retrieve every asset in the organization, following pagination until all results are collected.

    Runs the queries.ALL_ASSETS query and reads the results from the 'allAssets' field of the response.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None, all Assets are returned; otherwise only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None, all Assets are returned; otherwise only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    asset_query = queries.ALL_ASSETS['query']
    asset_variables = queries.ALL_ASSETS['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, asset_query, asset_variables, 'allAssets')
Gets all assets in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Asset Objects
def get_all_asset_versions(token, organization_context):
    """
    Retrieve every asset version in the organization, following pagination until all results are collected.

    Runs the queries.ALL_ASSET_VERSIONS query and reads the results from the 'allAssetVersions' field of the response.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    spec = queries.ALL_ASSET_VERSIONS
    return get_all_paginated_results(
        token,
        organization_context,
        spec['query'],
        spec['variables'],
        'allAssetVersions',
    )
Get all asset versions in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Retrieve the asset versions belonging to one product, following pagination until all results are collected.

    Runs the queries.ONE_PRODUCT_ALL_ASSET_VERSIONS query and reads the results from the 'allProducts' field of the response.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: List of AssetVersion Objects
    """
    spec = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    product_variables = spec['variables'](product_id)
    return get_all_paginated_results(token, organization_context, spec['query'], product_variables, 'allProducts')
Get all asset versions for a product. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str): The Product ID to get asset versions for
Returns:
list: List of AssetVersion Objects
def get_all_business_units(token, organization_context):
    """
    Retrieve every business unit in the organization, following pagination until all results are collected.

    NOTE: The return type here is Group; results are read from the 'allGroups' field of the response.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    spec = queries.ALL_BUSINESS_UNITS
    return get_all_paginated_results(
        token,
        organization_context,
        spec['query'],
        spec['variables'],
        'allGroups',
    )
Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Group Objects
def get_all_organizations(token, organization_context):
    """
    Retrieve every organization available to the user, following pagination until all results are collected.

    For most users there is only one organization. Results are read from the 'allOrganizations' field of the response.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token; the "Bearer" prefix is handled inside the method.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    spec = queries.ALL_ORGANIZATIONS
    return get_all_paginated_results(
        token,
        organization_context,
        spec['query'],
        spec['variables'],
        'allOrganizations',
    )
Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Organization Objects
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query.

    Pages are requested one after another, using the `_cursor` of the last entry of each page as the
    `after` variable of the next request, until an empty page is returned or `limit` is reached.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required in practice):
            Variables to be used in the GraphQL query. Must contain a "first" entry (the page size) between 1 and 1000.
            The dict passed in is not modified.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.

    Raises:
        Exception: If `field` is missing, `limit` or `first` are out of range, the query fails, or the field is not in the response JSON

    Returns:
        list: List of results (at most `limit` entries when `limit` is given)
    """
    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")

    # Work on a private copy: the `after` cursor is written into the variables below, and some callers
    # pass a dict object that is stored in the `queries` module and reused on later calls.
    variables = dict(variables) if variables else {}

    page_size = variables.get("first")
    if not page_size:
        raise Exception("Error: first is required")
    if page_size < 1:
        raise Exception("Error: first cannot be less than 1")
    if page_size > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # the cursor of the last entry marks where the next page starts; an empty page ends pagination
    cursor = page[-1]['_cursor'] if page else None

    while cursor:
        # `>=` rather than `==`: a page may carry the total past the limit
        if limit and len(results) >= limit:
            break

        variables['after'] = cursor

        response_data = send_graphql_query(token, organization_context, query, variables)
        page = response_data['data'][field]
        results.extend(page)

        cursor = page[-1]['_cursor'] if page else None

    # never hand back more than the documented maximum
    return results[:limit] if limit else results
Get all results from a paginated GraphQL query
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- query (str): The GraphQL query string
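- variables (dict): Variables to be used in the GraphQL query. Although the signature defaults this to None, the function reads variables["first"] (the page size), so a dict containing "first" with a value between 1 and 1000 must be supplied.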
- field (str, required): The field in the response JSON that contains the results
- limit (int, Optional): The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
Raises:
- Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:
list: List of results
def get_all_products(token, organization_context):
    """
    Fetch every product in the organization, following pagination until all pages are read.
    Emits a DeprecationWarning on every call.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function rather than to this line
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)

    products_query = queries.ALL_PRODUCTS
    return get_all_paginated_results(
        token,
        organization_context,
        products_query['query'],
        products_query['variables'],
        'allProducts',
    )
Get all products in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Product Objects
Deprecated since version 0.1.4. Use get_products instead.
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until all pages are read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    users_query = queries.ALL_USERS
    return get_all_paginated_results(
        token,
        organization_context,
        users_query['query'],
        users_query['variables'],
        'allUsers',
    )
Get all users in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of User Objects
def get_artifact_context(token, organization_context, artifact_id):
    """
    Look up the context ("ctx") of a single artifact. The context is typically read to inspect existing
    role based access control information; it is not used when creating new artifacts.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact whose context is requested. It is passed as the first argument to the ALL_ARTIFACTS variables builder.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no matching artifact.

    Returns:
        dict: Artifact Context Object
    """
    artifacts_query = queries.ALL_ARTIFACTS
    query_variables = artifacts_query['variables'](artifact_id, None)

    # NOTE(review): results are read from the 'allAssets' field even though this is the artifacts query — confirm against the query definition
    matches = get_all_paginated_results(token, organization_context, artifacts_query['query'],
                                        query_variables, 'allAssets')

    first_match = matches[0]
    return first_match['ctx']
Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
- Exception: Raised if the query fails.
Returns:
dict: Artifact Context Object
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch assets in the organization, following pagination until all pages are read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. When None, all Assets are returned; otherwise only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. When None, Assets from all Business Units are returned; otherwise only those in the given Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS
    query_variables = assets_query['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, assets_query['query'], query_variables,
                                     'allAssets')
Gets assets in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of Asset Objects
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Fetch asset versions in the organization, following pagination until all pages are read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get, by default None. When None, all Asset Versions are returned; otherwise only the one with that ID.
        asset_id (str, optional):
            Asset ID to filter by, by default None. When given, only Asset Versions of that Asset are returned.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. When given, only Asset Versions in that Business Unit are returned.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    versions_query = queries.ALL_ASSET_VERSIONS
    query_variables = versions_query['variables'](
        asset_version_id=asset_version_id,
        asset_id=asset_id,
        business_unit_id=business_unit_id,
    )
    return get_all_paginated_results(token, organization_context, versions_query['query'], query_variables,
                                     'allAssetVersions')
Gets asset versions in the organization. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
- asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
- business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
- Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL the credentials are posted to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # use the caller-supplied audience; previously the module constant was always sent
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    # post to the caller-supplied token_url; previously the module constant was always used
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
Arguments:
- client_id (str): CLIENT_ID as specified in the API documentation
- client_secret (str): CLIENT_SECRET as specified in the API documentation
- token_url (str, optional): Token URL, by default TOKEN_URL
- audience (str, optional): Audience, by default AUDIENCE
Raises:
- Exception: If the response status code is not 200
Returns:
str: Auth token. Use this token as the Authorization header in subsequent API calls.
def get_findings(
    token,
    organization_context,
    asset_version_id=None,
    finding_id=None,
    category=None,
    status=None,
    severity=None,
    count=False,
    limit=None,
):
    """
    Fetch the Findings matching the given filters, or only their count. When returning findings,
    pagination is followed to collect the results.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, the "_allFindingsMeta" entry of the count query response is returned instead of the findings. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.

    Raises:
        Exception: Raised if the query fails or `limit` is out of range.

    Returns:
        list: List of Finding Objects (or the "_allFindingsMeta" value when `count` is True)
    """
    # a falsy limit (None or 0) skips the range checks entirely
    if limit:
        if limit > 1000:
            raise Exception("Error: limit must be less than 1000")
        if limit < 1:
            raise Exception("Error: limit must be greater than 0")

    # the same filter set feeds both the count query and the listing query
    filters = {
        'asset_version_id': asset_version_id,
        'finding_id': finding_id,
        'category': category,
        'status': status,
        'severity': severity,
        'limit': limit,
    }

    if not count:
        return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
                                         queries.GET_FINDINGS['variables'](**filters), 'allFindings',
                                         limit=limit)

    count_response = send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
                                        queries.GET_FINDINGS_COUNT['variables'](**filters))
    return count_response["data"]["_allFindingsMeta"]
Gets all the Findings for an Asset Version. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
- finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
- category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
- status (str, optional): The status of Findings to return.
- severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
- count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
- limit (int, optional): The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Finding Objects. When count is True, the "_allFindingsMeta" value from the count query is returned instead.
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Fetch the asset versions of one product.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            Product ID to get asset versions for. Defaults to None in the signature, but an Exception is raised when it is missing.

    Raises:
        Exception: Raised if product_id is not provided or the query fails.

    Returns:
        list: Results read from the 'allProducts' field of the response
    """
    if product_id:
        versions_query = queries.GET_PRODUCT_ASSET_VERSIONS
        return get_all_paginated_results(token, organization_context, versions_query['query'],
                                         versions_query['variables'](product_id), 'allProducts')

    raise Exception("Product ID is required")
Gets all the asset versions for a product.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, required): Product ID to get asset versions for. The signature defaults it to None, but the function raises an Exception ("Product ID is required") when it is not provided.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of AssetVersion Objects
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Fetch products, optionally narrowed to one product or one business unit.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects
    """
    products_query = queries.GET_PRODUCTS
    query_variables = products_query['variables'](product_id=product_id, business_unit_id=business_unit_id)
    return get_all_paginated_results(token, organization_context, products_query['query'], query_variables,
                                     'allProducts')
Gets all the products for the specified business unit.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
- business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Product Objects
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: launches a report export job, then polls every 10 seconds until the job is COMPLETED
    and a download link is available, and returns that pre-signed URL.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download, based on the available reports for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, or if neither or both of asset_version_id and product_id are given.
        Exception: Raised if the report type or subtype is not supported, the query fails, or no export job ID is returned.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not (asset_version_id or product_id):
        raise ValueError("Asset Version ID or Product ID is required")
    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    # per report type: the accepted subtypes and the response keys holding the launched job
    if report_type == "CSV":
        allowed_subtypes = ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]
        artifact_key, product_key = 'launchArtifactCSVExport', 'launchProductCSVExport'
    elif report_type == "PDF":
        allowed_subtypes = ["RISK_SUMMARY"]
        artifact_key, product_key = 'launchArtifactPdfExport', 'launchProductPdfExport'
    else:
        raise Exception(f"Report Type {report_type} not supported")

    if report_subtype not in allowed_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    export_args = {
        'asset_version_id': asset_version_id,
        'product_id': product_id,
        'report_type': report_type,
        'report_subtype': report_subtype,
    }
    launch_mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](**export_args)
    launch_variables = queries.LAUNCH_REPORT_EXPORT['variables'](**export_args)

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    # exactly one of the two IDs is set at this point, so it alone decides which key to read
    launch_key = artifact_key if asset_version_id else product_key
    export_job_id = launch_response['data'][launch_key]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    # NOTE(review): there is no timeout and no handling of non-COMPLETED terminal states, so this can loop indefinitely
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        status_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        status_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        status_response = send_graphql_query(token, organization_context, status_query, status_variables)

        if verbose:
            print(f'Response Data: {json.dumps(status_response, indent=4)}')

        export_state = status_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != 'COMPLETED':
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(
                    f'Export Job Complete. Download URL: {download_link}')
            return download_link
Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, optional): Asset Version ID to download the report for. Exactly one of asset_version_id or product_id is required; they are mutually exclusive.
- product_id (str, optional): Product ID to download the report for. Exactly one of asset_version_id or product_id is required; they are mutually exclusive.
- report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
- report_subtype (str, required): The type of report to download, based on the available reports for the report_type specified. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
- verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launches an SBOM export job for the asset_version_id, then polls every 10 seconds until the
    job is COMPLETED and a download link is available, and returns that pre-signed URL.
    This may take several minutes to complete, depending on the size of SBOM.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if the SBOM type or subtype is not supported, the query fails, or no export job ID is returned.

    Returns:
        str: URL to download the SBOM from.
    """
    if not sbom_type:
        raise ValueError("SBOM Type is required")
    if not sbom_subtype:
        raise ValueError("SBOM Subtype is required")
    if not asset_version_id:
        raise ValueError("Asset Version ID is required")

    # per SBOM type: the accepted subtypes and the response key holding the launched job
    if sbom_type == "CYCLONEDX":
        allowed_subtypes = ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]
        launch_key = 'launchCycloneDxExport'
    elif sbom_type == "SPDX":
        allowed_subtypes = ["SBOM_ONLY"]
        launch_key = 'launchSpdxExport'
    else:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    if sbom_subtype not in allowed_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    if sbom_type == "CYCLONEDX":
        launch_spec = queries.LAUNCH_CYCLONEDX_EXPORT
    else:
        launch_spec = queries.LAUNCH_SPDX_EXPORT
    launch_mutation = launch_spec['mutation']
    launch_variables = launch_spec['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    # get exportJobId from the result
    export_job_id = launch_response['data'][launch_key]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    # NOTE(review): there is no timeout and no handling of non-COMPLETED terminal states, so this can loop indefinitely
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        status_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        status_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        status_response = send_graphql_query(token, organization_context, status_query, status_variables)

        if verbose:
            print(f'Response Data: {json.dumps(status_response, indent=4)}')

        export_state = status_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != "COMPLETED":
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(
                    f'Export Job Complete. Download URL: {download_link}')
            return download_link
Blocking call: Initiates generation of an SBOM for the asset_version_id, and returns a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of the SBOM.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
- sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
- asset_version_id (str, required): Asset Version ID to download the SBOM for.
- verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
- ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
- Exception: Raised if the query fails.
Returns:
str: URL to download the SBOM from.
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Fetch every Software Component of an Asset Version, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for. Although it defaults to None, an Exception is raised when it is missing.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        Exception: Raised if asset_version_id is not provided or if the query fails.

    Returns:
        list: List of Software Component Objects
    """
    if not asset_version_id:
        raise Exception("Asset Version ID is required")

    component_query = queries.GET_SOFTWARE_COMPONENTS
    query_text = component_query['query']
    query_variables = component_query['variables'](asset_version_id=asset_version_id, type=type)

    return get_all_paginated_results(token, organization_context, query_text, query_variables,
                                     'allSoftwareComponentInstances')
Gets all the Software Components for an Asset Version. Uses pagination to get all results.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- asset_version_id (str, required): Asset Version ID to get software components for.
- type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
- Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Software Component Objects
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    When asset_version_id is given, each result includes its originalComponents; otherwise each result
    includes the assetVersion (and its asset) that contains the component.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Only consulted for "EXACT" searches. Defaults to False.

    Raises:
        ValueError: Raised if name is not provided or search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.

    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # Validate up front: the documented contract requires a name, and an unknown search
    # method would otherwise drop the name/version filters and run an unfiltered search.
    if not name:
        raise ValueError("Name is required")
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f"Search method {search_method} not supported, valid values are EXACT and CONTAINS")

    if asset_version_id:
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
"""
    else:
        # gets the asset version info that contains the software component
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
"""

    search_filter = {
        "mergedComponentRefId": None
    }

    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # Case-insensitive exact searches go through the name_like filter instead of name.
        search_filter["name" if case_sensitive else "name_like"] = name
        if version:
            search_filter["version"] = version
    else:
        search_filter["name_contains"] = name
        if version:
            search_filter["version_contains"] = version

    variables = {
        "filter": search_filter,
        "after": None,
        "first": 100
    }

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")
Searches the SBOM of a specific asset version or the entire organization for matching software components. Search Methods: EXACT or CONTAINS An exact match will return only the software component whose name matches the name exactly. A contains match will return all software components whose name contains the search string.
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- name (str, required): Name of the software component to search for.
- version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
- asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
- search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
- case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
- ValueError: Raised if name is not provided.
- Exception: Raised if the query fails.
Returns:
list: List of SoftwareComponentInstance Objects
def send_graphql_query(token, organization_context, query, variables=None):
    """
    POST a GraphQL query (or mutation) to the API and return the decoded JSON response.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        Exception: If the response status code is not 200, or if the response JSON contains an "errors" key

    Returns:
        dict: Response JSON
    """
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}',
        'Organization-Context': organization_context
    }
    payload = {
        'query': query,
        'variables': variables
    }

    response = requests.post(API_URL, headers=request_headers, json=payload)

    # Anything other than HTTP 200 is reported with the status code and raw body.
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    response_json = response.json()

    # A 200 response can still carry GraphQL-level errors; surface them as a failure.
    if "errors" in response_json:
        raise Exception(f"Error: {response_json['errors']}")

    return response_json
Send a GraphQL query to the API
Arguments:
- token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
- organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
- query (str): The GraphQL query string
- variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
- Exception: If the response status code is not 200
Returns:
dict: Response JSON
2006def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None, 2007 justification=None, response=None, comment=None): 2008 """ 2009 Updates the status of a findings or multiple findings. This is a blocking call. 2010 2011 Args: 2012 token (str): 2013 Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string. 2014 organization_context (str): 2015 Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx". 2016 user_id (str, required): 2017 User ID to update the finding status for. 2018 finding_ids (str, required): 2019 Finding ID to update the status for. 2020 status (str, required): 2021 Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option 2022 justification (str, optional): 2023 Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum 2024 response (str, optional): 2025 Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum 2026 comment (str, optional): 2027 Optional comment to add to the finding status update. 2028 2029 Raises: 2030 ValueError: Raised if required parameters are not provided. 2031 Exception: Raised if the query fails. 2032 2033 Returns: 2034 dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. 
For details see https://docs.finitestate.io/types/update-findings-statuses-response 2035 """ 2036 if not user_id: 2037 raise ValueError("User Id is required") 2038 if not finding_ids: 2039 raise ValueError("Finding Ids is required") 2040 if not status: 2041 raise ValueError("Status is required") 2042 2043 mutation = queries.UPDATE_FINDING_STATUSES['mutation'] 2044 variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status, 2045 justification=justification, response=response, 2046 comment=comment) 2047 2048 return send_graphql_query(token, organization_context