finite_state_sdk

   1import json
   2from enum import Enum
   3
   4import requests
   5import time
   6from warnings import warn
   7import finite_state_sdk.queries as queries
   8from tenacity import retry, retry_if_exception, stop_after_attempt, wait_fixed
   9from finite_state_sdk.utils import (
  10    BreakoutException,
  11    is_mutation,
  12    is_not_breakout_exception,
  13)
  14
  15API_URL = 'https://platform.finitestate.io/api/v1/graphql'
  16AUDIENCE = "https://platform.finitestate.io/api/v1/graphql"
  17TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token"
  18
  19"""
  20DEFAULT CHUNK SIZE: 1000 MiB
  21"""
  22DEFAULT_CHUNK_SIZE = 1024**2 * 1000
  23"""
  24MAX CHUNK SIZE: 2 GiB
  25"""
  26MAX_CHUNK_SIZE = 1024**2 * 2000
  27"""
  28MIN CHUNK SIZE: 5 MiB
  29"""
  30MIN_CHUNK_SIZE = 1024**2 * 5
  31
  32
  33class UploadMethod(Enum):
  34    """
  35    Enumeration class representing different upload methods.
  36
  37    Attributes:
  38        WEB_APP_UI: Upload method via web application UI.
  39        API: Upload method via API.
  40        GITHUB_INTEGRATION: Upload method via GitHub integration.
  41        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
  42
  43    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
  44    """
  45    WEB_APP_UI = "WEB_APP_UI"
  46    API = "API"
  47    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
  48    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
  49
  50
  51def create_artifact(
  52    token,
  53    organization_context,
  54    business_unit_id=None,
  55    created_by_user_id=None,
  56    asset_version_id=None,
  57    artifact_name=None,
  58    product_id=None,
  59):
  60    """
  61    Create a new Artifact.
  62    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
  63    Please see the examples in the Github repository for more information:
  64    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  65    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
  66
  67    Args:
  68        token (str):
  69            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  70        organization_context (str):
  71            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  72        business_unit_id (str, required):
  73            Business Unit ID to associate the artifact with.
  74        created_by_user_id (str, required):
  75            User ID of the user creating the artifact.
  76        asset_version_id (str, required):
  77            Asset Version ID to associate the artifact with.
  78        artifact_name (str, required):
  79            The name of the Artifact being created.
  80        product_id (str, optional):
  81            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
  82
  83    Raises:
  84        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  85        Exception: Raised if the query fails.
  86
  87    Returns:
  88        dict: createArtifact Object
  89    """
  90    if not business_unit_id:
  91        raise ValueError("Business unit ID is required")
  92    if not created_by_user_id:
  93        raise ValueError("Created by user ID is required")
  94    if not asset_version_id:
  95        raise ValueError("Asset version ID is required")
  96    if not artifact_name:
  97        raise ValueError("Artifact name is required")
  98
  99    graphql_query = """
 100    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
 101        createArtifact(input: $input) {
 102            id
 103            name
 104            assetVersion {
 105                id
 106                name
 107                asset {
 108                    id
 109                    name
 110                }
 111            }
 112            createdBy {
 113                id
 114                email
 115            }
 116            ctx {
 117                asset
 118                products
 119                businessUnits
 120            }
 121        }
 122    }
 123    """
 124
 125    # Asset name, business unit context, and creating user are required
 126    variables = {
 127        "input": {
 128            "name": artifact_name,
 129            "createdBy": created_by_user_id,
 130            "assetVersion": asset_version_id,
 131            "ctx": {
 132                "asset": asset_version_id,
 133                "businessUnits": [business_unit_id]
 134            }
 135        }
 136    }
 137
 138    if product_id is not None:
 139        variables["input"]["ctx"]["products"] = product_id
 140
 141    response = send_graphql_query(token, organization_context, graphql_query, variables)
 142    return response['data']
 143
 144
 145def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
 146    """
 147    Create a new Asset.
 148
 149    Args:
 150        token (str):
 151            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 152        organization_context (str):
 153            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 154        business_unit_id (str, required):
 155            Business Unit ID to associate the asset with.
 156        created_by_user_id (str, required):
 157            User ID of the user creating the asset.
 158        asset_name (str, required):
 159            The name of the Asset being created.
 160        product_id (str, optional):
 161            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
 162
 163    Raises:
 164        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
 165        Exception: Raised if the query fails.
 166
 167    Returns:
 168        dict: createAsset Object
 169    """
 170    if not business_unit_id:
 171        raise ValueError("Business unit ID is required")
 172    if not created_by_user_id:
 173        raise ValueError("Created by user ID is required")
 174    if not asset_name:
 175        raise ValueError("Asset name is required")
 176
 177    graphql_query = """
 178    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
 179        createAsset(input: $input) {
 180            id
 181            name
 182            dependentProducts {
 183                id
 184                name
 185            }
 186            group {
 187                id
 188                name
 189            }
 190            createdBy {
 191                id
 192                email
 193            }
 194            ctx {
 195                asset
 196                products
 197                businessUnits
 198            }
 199        }
 200    }
 201    """
 202
 203    # Asset name, business unit context, and creating user are required
 204    variables = {
 205        "input": {
 206            "name": asset_name,
 207            "group": business_unit_id,
 208            "createdBy": created_by_user_id,
 209            "ctx": {
 210                "businessUnits": [business_unit_id]
 211            }
 212        }
 213    }
 214
 215    if product_id is not None:
 216        variables["input"]["ctx"]["products"] = product_id
 217
 218    response = send_graphql_query(token, organization_context, graphql_query, variables)
 219    return response['data']
 220
 221
 222def create_asset_version(
 223    token,
 224    organization_context,
 225    business_unit_id=None,
 226    created_by_user_id=None,
 227    asset_id=None,
 228    asset_version_name=None,
 229    product_id=None,
 230):
 231    """
 232    Create a new Asset Version.
 233
 234    Args:
 235        token (str):
 236            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 237        organization_context (str):
 238            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 239        business_unit_id (str, required):
 240            Business Unit ID to associate the asset version with.
 241        created_by_user_id (str, required):
 242            User ID of the user creating the asset version.
 243        asset_id (str, required):
 244            Asset ID to associate the asset version with.
 245        asset_version_name (str, required):
 246            The name of the Asset Version being created.
 247        product_id (str, optional):
 248            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
 249
 250    Raises:
 251        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 252        Exception: Raised if the query fails.
 253
 254    Returns:
 255        dict: createAssetVersion Object
 256
 257    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
 258    """
 259    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
 260    if not business_unit_id:
 261        raise ValueError("Business unit ID is required")
 262    if not created_by_user_id:
 263        raise ValueError("Created by user ID is required")
 264    if not asset_id:
 265        raise ValueError("Asset ID is required")
 266    if not asset_version_name:
 267        raise ValueError("Asset version name is required")
 268
 269    graphql_query = """
 270    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
 271        createAssetVersion(input: $input) {
 272            id
 273            name
 274            asset {
 275                id
 276                name
 277            }
 278            createdBy {
 279                id
 280                email
 281            }
 282            ctx {
 283                asset
 284                products
 285                businessUnits
 286            }
 287        }
 288    }
 289    """
 290
 291    # Asset name, business unit context, and creating user are required
 292    variables = {
 293        "input": {
 294            "name": asset_version_name,
 295            "createdBy": created_by_user_id,
 296            "asset": asset_id,
 297            "ctx": {
 298                "asset": asset_id,
 299                "businessUnits": [business_unit_id]
 300            }
 301        }
 302    }
 303
 304    if product_id is not None:
 305        variables["input"]["ctx"]["products"] = product_id
 306
 307    response = send_graphql_query(token, organization_context, graphql_query, variables)
 308    return response['data']
 309
 310
 311def create_asset_version_on_asset(
 312    token,
 313    organization_context,
 314    created_by_user_id=None,
 315    asset_id=None,
 316    asset_version_name=None,
 317    product_id=None,
 318):
 319    """
 320    Create a new Asset Version.
 321
 322    Args:
 323        token (str):
 324            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 325        organization_context (str):
 326            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 327        created_by_user_id (str, optional):
 328            User ID of the user creating the asset version.
 329        asset_id (str, required):
 330            Asset ID to associate the asset version with.
 331        asset_version_name (str, required):
 332            The name of the Asset Version being created.
 333
 334    Raises:
 335        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 336        Exception: Raised if the query fails.
 337
 338    Returns:
 339        dict: createAssetVersion Object
 340    """
 341    if not asset_id:
 342        raise ValueError("Asset ID is required")
 343    if not asset_version_name:
 344        raise ValueError("Asset version name is required")
 345
 346    graphql_query = """
 347        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
 348            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
 349                id
 350                assetVersion {
 351                    id
 352                }
 353            }
 354        }
 355    """
 356
 357    # Asset name, business unit context, and creating user are required
 358    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
 359
 360    if created_by_user_id:
 361        variables["createdByUserId"] = created_by_user_id
 362
 363    if product_id:
 364        variables["productId"] = product_id
 365
 366    response = send_graphql_query(token, organization_context, graphql_query, variables)
 367    return response['data']
 368
 369
 370def create_new_asset_version_artifact_and_test_for_upload(
 371    token,
 372    organization_context,
 373    business_unit_id=None,
 374    created_by_user_id=None,
 375    asset_id=None,
 376    version=None,
 377    product_id=None,
 378    test_type=None,
 379    artifact_description=None,
 380    upload_method: UploadMethod = UploadMethod.API,
 381):
 382    """
 383    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
 384    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
 385
 386    Args:
 387        token (str):
 388            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 389        organization_context (str):
 390            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 391        business_unit_id (str, optional):
 392            Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
 393        created_by_user_id (str, optional):
 394            User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
 395        asset_id (str, required):
 396            Asset ID to create the asset version for. If not provided, the default asset will be used.
 397        version (str, required):
 398            Version to create the asset version for.
 399        product_id (str, optional):
 400            Product ID to create the entities for. If not provided, the default product will be used.
 401        test_type (str, required):
 402            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
 403        artifact_description (str, optional):
 404            Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
 405        upload_method (UploadMethod, optional):
 406            The method of uploading the test results. Default is UploadMethod.API.
 407
 408
 409    Raises:
 410        ValueError: Raised if asset_id or version are not provided.
 411        Exception: Raised if the query fails.
 412
 413    Returns:
 414        str: The Test ID of the newly created test that is used for uploading the file.
 415    """
 416    if not asset_id:
 417        raise ValueError("Asset ID is required")
 418    if not version:
 419        raise ValueError("Version is required")
 420
 421    assets = get_all_assets(token, organization_context, asset_id=asset_id)
 422    if not assets:
 423        raise ValueError("No assets found with the provided asset ID")
 424    asset = assets[0]
 425
 426    # get the asset name
 427    asset_name = asset['name']
 428
 429    # get the existing asset product IDs
 430    asset_product_ids = asset['ctx']['products']
 431
 432    # get the asset product ID
 433    if product_id and product_id not in asset_product_ids:
 434        asset_product_ids.append(product_id)
 435
 436    # if business_unit_id or created_by_user_id are not provided, get the existing asset
 437    if not business_unit_id or not created_by_user_id:
 438        if not business_unit_id:
 439            business_unit_id = asset['group']['id']
 440        if not created_by_user_id:
 441            created_by_user_id = asset['createdBy']['id']
 442
 443        if not business_unit_id:
 444            raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
 445        if not created_by_user_id:
 446            raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")
 447
 448    # create the asset version
 449    response = create_asset_version_on_asset(
 450        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id
 451    )
 452    # get the asset version ID
 453    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']
 454
 455    # create the test
 456    if test_type == "finite_state_binary_analysis":
 457        # create the artifact
 458        if not artifact_description:
 459            artifact_description = "Binary"
 460        binary_artifact_name = f"{asset_name} {version} - {artifact_description}"
 461        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 462                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 463                                   artifact_name=binary_artifact_name, product_id=asset_product_ids)
 464
 465        # get the artifact ID
 466        binary_artifact_id = response['createArtifact']['id']
 467
 468        # create the test
 469        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
 470        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
 471                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
 472                                                  artifact_id=binary_artifact_id, product_id=asset_product_ids,
 473                                                  test_name=test_name, upload_method=upload_method)
 474        test_id = response['createTest']['id']
 475        return test_id
 476
 477    else:
 478        # create the artifact
 479        if not artifact_description:
 480            artifact_description = "Unspecified Artifact"
 481        artifact_name = f"{asset_name} {version} - {artifact_description}"
 482        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 483                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 484                                   artifact_name=artifact_name, product_id=asset_product_ids)
 485
 486        # get the artifact ID
 487        binary_artifact_id = response['createArtifact']['id']
 488
 489        # create the test
 490        test_name = f"{asset_name} {version} - {test_type}"
 491        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
 492                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
 493                                                      artifact_id=binary_artifact_id, product_id=asset_product_ids,
 494                                                      test_name=test_name, test_type=test_type,
 495                                                      upload_method=upload_method)
 496        test_id = response['createTest']['id']
 497        return test_id
 498
 499
 500def create_new_asset_version_and_upload_binary(
 501    token,
 502    organization_context,
 503    business_unit_id=None,
 504    created_by_user_id=None,
 505    asset_id=None,
 506    version=None,
 507    file_path=None,
 508    product_id=None,
 509    artifact_description=None,
 510    quick_scan=False,
 511    upload_method: UploadMethod = UploadMethod.API,
 512    enable_bandit_scan: bool = False,
 513):
 514    """
 515    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
 516    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 517
 518    Args:
 519        token (str):
 520            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 521        organization_context (str):
 522            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 523        business_unit_id (str, optional):
 524            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 525        created_by_user_id (str, optional):
 526            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 527        asset_id (str, required):
 528            Asset ID to create the asset version for.
 529        version (str, required):
 530            Version to create the asset version for.
 531        file_path (str, required):
 532            Local path to the file to upload.
 533        product_id (str, optional):
 534            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
 535        artifact_description (str, optional):
 536            Description of the artifact. If not provided, the default is "Firmware Binary".
 537        quick_scan (bool, optional):
 538            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
 539        upload_method (UploadMethod, optional):
 540            The method of uploading the test results. Default is UploadMethod.API.
 541        enable_bandit_scan (bool, optional):
 542            If True, will create an additional bandit scan in addition to the default binary analysis scan.
 543
 544    Raises:
 545        ValueError: Raised if asset_id, version, or file_path are not provided.
 546        Exception: Raised if any of the queries fail.
 547
 548    Returns:
 549        dict: The response from the GraphQL query, a createAssetVersion Object.
 550    """
 551    if not asset_id:
 552        raise ValueError("Asset ID is required")
 553    if not version:
 554        raise ValueError("Version is required")
 555    if not file_path:
 556        raise ValueError("File path is required")
 557
 558    # create the asset version and binary test
 559    if not artifact_description:
 560        artifact_description = "Firmware Binary"
 561    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
 562        token,
 563        organization_context,
 564        business_unit_id=business_unit_id,
 565        created_by_user_id=created_by_user_id,
 566        asset_id=asset_id,
 567        version=version,
 568        product_id=product_id,
 569        test_type="finite_state_binary_analysis",
 570        artifact_description=artifact_description,
 571        upload_method=upload_method,
 572    )
 573
 574    # upload file for binary test
 575    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
 576                                               quick_scan=quick_scan, enable_bandit_scan=enable_bandit_scan)
 577    return response
 578
 579
 580def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
 581                                                     created_by_user_id=None, asset_id=None, version=None,
 582                                                     file_path=None, product_id=None, test_type=None,
 583                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
 584    """
 585    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
 586    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 587
 588    Args:
 589        token (str):
 590            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 591        organization_context (str):
 592            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 593        business_unit_id (str, optional):
 594            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 595        created_by_user_id (str, optional):
 596            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 597        asset_id (str, required):
 598            Asset ID to create the asset version for.
 599        version (str, required):
 600            Version to create the asset version for.
 601        file_path (str, required):
 602            Path to the test results file to upload.
 603        product_id (str, optional):
 604            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
 605        test_type (str, required):
 606            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
 607        artifact_description (str, optional):
 608            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
 609        upload_method (UploadMethod, optional):
 610            The method of uploading the test results. Default is UploadMethod.API.
 611
 612    Raises:
 613        ValueError: If the asset_id, version, or file_path are not provided.
 614        Exception: If the test_type is not a supported third party scanner type, or if the query fails.
 615
 616    Returns:
 617        dict: The response from the GraphQL query, a createAssetVersion Object.
 618    """
 619    if not asset_id:
 620        raise ValueError("Asset ID is required")
 621    if not version:
 622        raise ValueError("Version is required")
 623    if not file_path:
 624        raise ValueError("File path is required")
 625    if not test_type:
 626        raise ValueError("Test type is required")
 627
 628    # create the asset version and test
 629    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
 630                                                                    business_unit_id=business_unit_id,
 631                                                                    created_by_user_id=created_by_user_id,
 632                                                                    asset_id=asset_id, version=version,
 633                                                                    product_id=product_id, test_type=test_type,
 634                                                                    artifact_description=artifact_description,
 635                                                                    upload_method=upload_method)
 636
 637    # upload test results file
 638    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
 639    return response
 640
 641
 642def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
 643                   product_description=None, vendor_id=None, vendor_name=None):
 644    """
 645    Create a new Product.
 646
 647    Args:
 648        token (str):
 649            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 650        organization_context (str):
 651            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 652        business_unit_id (str, required):
 653            Business Unit ID to associate the product with.
 654        created_by_user_id (str, required):
 655            User ID of the user creating the product.
 656        product_name (str, required):
 657            The name of the Product being created.
 658        product_description (str, optional):
 659            The description of the Product being created.
 660        vendor_id (str, optional):
 661            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
 662        vendor_name (str, optional):
 663            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
 664
 665    Raises:
 666        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
 667        Exception: Raised if the query fails.
 668
 669    Returns:
 670        dict: createProduct Object
 671    """
 672
 673    if not business_unit_id:
 674        raise ValueError("Business unit ID is required")
 675    if not created_by_user_id:
 676        raise ValueError("Created by user ID is required")
 677    if not product_name:
 678        raise ValueError("Product name is required")
 679
 680    graphql_query = """
 681    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
 682        createProduct(input: $input) {
 683            id
 684            name
 685            vendor {
 686                name
 687            }
 688            group {
 689                id
 690                name
 691            }
 692            createdBy {
 693                id
 694                email
 695            }
 696            ctx {
 697                businessUnit
 698            }
 699        }
 700    }
 701    """
 702
 703    # Product name, business unit context, and creating user are required
 704    variables = {
 705        "input": {
 706            "name": product_name,
 707            "group": business_unit_id,
 708            "createdBy": created_by_user_id,
 709            "ctx": {
 710                "businessUnit": business_unit_id
 711            }
 712        }
 713    }
 714
 715    if product_description is not None:
 716        variables["input"]["description"] = product_description
 717
 718    # If the vendor ID is specified, this will link the new product to the existing vendor
 719    if vendor_id is not None:
 720        variables["input"]["vendor"] = {
 721            "id": vendor_id
 722        }
 723
 724    # If the vendor name is specified, this will create a new vendor and link it to the new product
 725    if vendor_name is not None:
 726        variables["input"]["createVendor"] = {
 727            "name": vendor_name
 728        }
 729
 730    response = send_graphql_query(token, organization_context, graphql_query, variables)
 731
 732    return response['data']
 733
 734
 735def create_test(
 736    token,
 737    organization_context,
 738    business_unit_id=None,
 739    created_by_user_id=None,
 740    asset_id=None,
 741    artifact_id=None,
 742    test_name=None,
 743    product_id=None,
 744    test_type=None,
 745    tools=[],
 746    upload_method: UploadMethod = UploadMethod.API,
 747):
 748    """
 749    Create a new Test object for uploading files.
 750    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 751    Please see the examples in the Github repository for more information:
 752    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 753    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 754
 755    Args:
 756        token (str):
 757            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 758        organization_context (str):
 759            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 760        business_unit_id (str, required):
 761            Business Unit ID to associate the Test with.
 762        created_by_user_id (str, required):
 763            User ID of the user creating the Test.
 764        asset_id (str, required):
 765            Asset ID to associate the Test with.
 766        artifact_id (str, required):
 767            Artifact ID to associate the Test with.
 768        test_name (str, required):
 769            The name of the Test being created.
 770        product_id (str, optional):
 771            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 772        test_type (str, required):
 773            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
 774        tools (list, optional):
 775            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
 776        upload_method (UploadMethod, required):
 777            The method of uploading the test results.
 778
 779    Raises:
 780        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
 781        Exception: Raised if the query fails.
 782
 783    Returns:
 784        dict: createTest Object
 785    """
 786    if not business_unit_id:
 787        raise ValueError("Business unit ID is required")
 788    if not created_by_user_id:
 789        raise ValueError("Created by user ID is required")
 790    if not asset_id:
 791        raise ValueError("Asset ID is required")
 792    if not artifact_id:
 793        raise ValueError("Artifact ID is required")
 794    if not test_name:
 795        raise ValueError("Test name is required")
 796    if not test_type:
 797        raise ValueError("Test type is required")
 798
 799    graphql_query = """
 800    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
 801        createTest(input: $input) {
 802            id
 803            name
 804            artifactUnderTest {
 805                id
 806                name
 807                assetVersion {
 808                    id
 809                    name
 810                    asset {
 811                        id
 812                        name
 813                        dependentProducts {
 814                            id
 815                            name
 816                        }
 817                    }
 818                }
 819            }
 820            createdBy {
 821                id
 822                email
 823            }
 824            ctx {
 825                asset
 826                products
 827                businessUnits
 828            }
 829            uploadMethod
 830        }
 831    }
 832    """
 833
 834    # Asset name, business unit context, and creating user are required
 835    variables = {
 836        "input": {
 837            "name": test_name,
 838            "createdBy": created_by_user_id,
 839            "artifactUnderTest": artifact_id,
 840            "testResultFileFormat": test_type,
 841            "ctx": {
 842                "asset": asset_id,
 843                "businessUnits": [business_unit_id]
 844            },
 845            "tools": tools,
 846            "uploadMethod": upload_method.value
 847        }
 848    }
 849
 850    if product_id is not None:
 851        variables["input"]["ctx"]["products"] = product_id
 852
 853    response = send_graphql_query(token, organization_context, graphql_query, variables)
 854    return response['data']
 855
 856
 857def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
 858                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
 859                                   upload_method: UploadMethod = UploadMethod.API):
 860    """
 861    Create a new Test object for uploading files for Finite State Binary Analysis.
 862
 863    Args:
 864        token (str):
 865            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 866        organization_context (str):
 867            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 868        business_unit_id (str, required):
 869            Business Unit ID to associate the Test with.
 870        created_by_user_id (str, required):
 871            User ID of the user creating the Test.
 872        asset_id (str, required):
 873            Asset ID to associate the Test with.
 874        artifact_id (str, required):
 875            Artifact ID to associate the Test with.
 876        test_name (str, required):
 877            The name of the Test being created.
 878        product_id (str, optional):
 879            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 880        upload_method (UploadMethod, optional):
 881            The method of uploading the test results. Default is UploadMethod.API.
 882
 883    Raises:
 884        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 885        Exception: Raised if the query fails.
 886
 887    Returns:
 888        dict: createTest Object
 889    """
 890    tools = [
 891        {
 892            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
 893            "name": "Finite State Binary Analysis"
 894        }
 895    ]
 896    return create_test(token, organization_context, business_unit_id=business_unit_id,
 897                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 898                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
 899                       tools=tools, upload_method=upload_method)
 900
 901
 902def create_test_as_cyclone_dx(
 903    token,
 904    organization_context,
 905    business_unit_id=None,
 906    created_by_user_id=None,
 907    asset_id=None,
 908    artifact_id=None,
 909    test_name=None,
 910    product_id=None,
 911    upload_method: UploadMethod = UploadMethod.API,
 912):
 913    """
 914    Create a new Test object for uploading CycloneDX files.
 915
 916    Args:
 917        token (str):
 918            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 919        organization_context (str):
 920            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 921        business_unit_id (str, required):
 922            Business Unit ID to associate the Test with.
 923        created_by_user_id (str, required):
 924            User ID of the user creating the Test.
 925        asset_id (str, required):
 926            Asset ID to associate the Test with.
 927        artifact_id (str, required):
 928            Artifact ID to associate the Test with.
 929        test_name (str, required):
 930            The name of the Test being created.
 931        product_id (str, optional):
 932            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 933        upload_method (UploadMethod, optional):
 934            The method of uploading the test results. Default is UploadMethod.API.
 935
 936    Raises:
 937        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 938        Exception: Raised if the query fails.
 939
 940    Returns:
 941        dict: createTest Object
 942    """
 943    return create_test(token, organization_context, business_unit_id=business_unit_id,
 944                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 945                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)
 946
 947
 948def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
 949                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
 950                                       upload_method: UploadMethod = UploadMethod.API):
 951    """
 952    Create a new Test object for uploading Third Party Scanner files.
 953
 954    Args:
 955        token (str):
 956            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 957        organization_context (str):
 958            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 959        business_unit_id (str, required):
 960            Business Unit ID to associate the Test with.
 961        created_by_user_id (str, required):
 962            User ID of the user creating the Test.
 963        asset_id (str, required):
 964            Asset ID to associate the Test with.
 965        artifact_id (str, required):
 966            Artifact ID to associate the Test with.
 967        test_name (str, required):
 968            The name of the Test being created.
 969        product_id (str, optional):
 970            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 971        test_type (str, required):
 972            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
 973        upload_method (UploadMethod, optional):
 974            The method of uploading the test results. Default is UploadMethod.API.
 975
 976    Raises:
 977        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 978        Exception: Raised if the query fails.
 979
 980    Returns:
 981        dict: createTest Object
 982    """
 983    return create_test(token, organization_context, business_unit_id=business_unit_id,
 984                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 985                       test_name=test_name, product_id=product_id, test_type=test_type, upload_method=upload_method)
 986
 987
 988def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 989                                  report_subtype=None, output_filename=None, verbose=False):
 990    """
 991    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 992
 993    Args:
 994        token (str):
 995            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 996        organization_context (str):
 997            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 998        asset_version_id (str, required):
 999            The Asset Version ID to download the report for.
1000        report_type (str, required):
1001            The file type of the report to download. Valid values are "CSV" and "PDF".
1002        report_subtype (str, required):
1003            The type of report to download. Based on available reports for the `report_type` specified
1004            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1005            Valid values for PDF are "RISK_SUMMARY".
1006        output_filename (str, optional):
1007            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1008        verbose (bool, optional):
1009            If True, will print additional information to the console. Defaults to False.
1010
1011    Raises:
1012        ValueError: Raised if required parameters are not provided.
1013        Exception: Raised if the query fails.
1014
1015    Returns:
1016        None
1017    """
1018    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
1019                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
1020
1021    # Send an HTTP GET request to the URL
1022    response = requests.get(url)
1023
1024    # Check if the request was successful (status code 200)
1025    if response.status_code == 200:
1026        # Open a local file in binary write mode and write the content to it
1027        if verbose:
1028            print("File downloaded successfully.")
1029        with open(output_filename, 'wb') as file:
1030            file.write(response.content)
1031            if verbose:
1032                print(f'Wrote file to {output_filename}')
1033    else:
1034        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1035
1036
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided (parameter validation is not done here; presumably it happens in generate_report_download_url).
        Exception: Raised if the query fails, or if the download does not return HTTP status 200.

    Returns:
        None
    """
    # Generating the URL first lets any parameter validation fail before touching the filesystem
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default: "report.<type>" (e.g. report.csv).
    # Without this, open(None, 'wb') below would fail with a TypeError.
    if not output_filename:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Write the raw response body to the local file in binary mode
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
1076
1077
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, optional):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided (parameter validation is not done here; presumably it happens in generate_sbom_download_url).
        Exception: Raised if the query fails, or if the download does not return HTTP status 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(
        token,
        organization_context,
        sbom_type=sbom_type,
        sbom_subtype=sbom_subtype,
        asset_version_id=asset_version_id,
        verbose=verbose,
    )

    # Fetch the SBOM over plain HTTP GET from the generated URL
    http_response = requests.get(download_url)

    # Bail out early on any non-200 status
    if http_response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {http_response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Persist the response body as raw bytes
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(http_response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
1123
1124
def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Helper generator that reads a file in binary mode, one chunk at a time.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            The number of bytes requested per read. Defaults to DEFAULT_CHUNK_SIZE.

    Yields:
        bytes: The next non-empty chunk of the file.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as stream:
        # iter(callable, sentinel) keeps calling read() until it returns b'',
        # which is what a binary read yields at end of file.
        yield from iter(lambda: stream.read(chunk_size), b'')
1148
1149
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Get all artifacts in the organization. Uses pagination to get all results.

    Runs the queries.ALL_ARTIFACTS query through get_all_paginated_results and collects
    the entries found under the 'allAssets' field of each response page.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifacts_query = queries.ALL_ARTIFACTS
    query_text = artifacts_query['query']
    # The variables entry is a factory taking the two optional filter IDs
    query_variables = artifacts_query['variables'](artifact_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
1172
1173
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Gets all assets in the organization. Uses pagination to get all results.

    Runs the queries.ALL_ASSETS query through get_all_paginated_results and collects
    the entries found under the 'allAssets' field of each response page.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS
    query_text = assets_query['query']
    # The variables entry is a factory taking the two optional filter IDs
    query_variables = assets_query['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
1196
1197
def get_all_asset_versions(token, organization_context):
    """
    Get all asset versions in the organization. Uses pagination to get all results.

    Runs the queries.ALL_ASSET_VERSIONS query through get_all_paginated_results and collects
    the entries found under the 'allAssetVersions' field of each response page.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    asset_versions_query = queries.ALL_ASSET_VERSIONS
    return get_all_paginated_results(
        token,
        organization_context,
        asset_versions_query['query'],
        asset_versions_query['variables'],
        'allAssetVersions',
    )
1216
1217
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Get all asset versions for a product. Uses pagination to get all results.

    Runs the queries.ONE_PRODUCT_ALL_ASSET_VERSIONS query through get_all_paginated_results.
    The results are read from the 'allProducts' field of each response page.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: The entries returned under 'allProducts' (documented as AssetVersion Objects; NOTE(review): confirm the element shape against the query definition)
    """
    product_query = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    query_text = product_query['query']
    # The variables entry is a factory taking the product ID
    query_variables = product_query['variables'](product_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
1235
1236
def get_all_business_units(token, organization_context):
    """
    Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.

    Runs the queries.ALL_BUSINESS_UNITS query through get_all_paginated_results and collects
    the entries found under the 'allGroups' field of each response page.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_units_query = queries.ALL_BUSINESS_UNITS
    return get_all_paginated_results(
        token,
        organization_context,
        business_units_query['query'],
        business_units_query['variables'],
        'allGroups',
    )
1255
1256
def get_all_organizations(token, organization_context):
    """
    Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.

    Runs the queries.ALL_ORGANIZATIONS query through get_all_paginated_results and collects
    the entries found under the 'allOrganizations' field of each response page.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    organizations_query = queries.ALL_ORGANIZATIONS
    return get_all_paginated_results(
        token,
        organization_context,
        organizations_query['query'],
        organizations_query['variables'],
        'allOrganizations',
    )
1275
1276
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    Pages are requested one after another; after each page the `_cursor` of its last
    entry is sent as the `after` variable of the next request. Paging stops when a page
    comes back empty, when the last entry has no cursor, or when `limit` results have
    been collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required):
            Variables to be used in the GraphQL query. Must contain a "first" entry (page size) between 1 and 1000. The dict passed in is not modified; a copy is used for paging.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.

    Raises:
        Exception: If field, limit or variables["first"] are missing or out of range, or if the field is not in the response JSON. Errors from send_graphql_query propagate unchanged.

    Returns:
        list: List of results (at most `limit` entries when a limit is given)
    """

    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")

    # Work on a shallow copy: 'after' is written below, and callers commonly pass a
    # shared dict that must not keep a stale cursor between calls.
    variables = dict(variables) if variables else {}

    page_size = variables.get("first")
    if not page_size:
        raise Exception("Error: first is required")
    if page_size < 1:
        raise Exception("Error: first cannot be less than 1")
    if page_size > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # keep paging while the previous page had entries
    while page:
        # >= (not ==) so a page size that does not divide the limit still stops paging
        if limit and len(results) >= limit:
            break

        # the cursor of the last entry marks where the next page starts
        cursor = page[-1]['_cursor']
        if not cursor:
            break

        variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, variables)
        page = response_data['data'][field]
        results.extend(page)

    # trim any overshoot from the final page so at most `limit` results are returned
    if limit:
        return results[:limit]
    return results
1352
1353
def get_all_products(token, organization_context):
    """
    Get all products in the organization. Uses pagination to get all results.

    Emits a DeprecationWarning, then runs the queries.ALL_PRODUCTS query through
    get_all_paginated_results and collects the entries found under the 'allProducts' field.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)

    products_query = queries.ALL_PRODUCTS
    return get_all_paginated_results(
        token,
        organization_context,
        products_query['query'],
        products_query['variables'],
        'allProducts',
    )
1375
1376
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until all pages are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    users_query = queries.ALL_USERS['query']
    users_variables = queries.ALL_USERS['variables']
    return get_all_paginated_results(token, organization_context, users_query, users_variables, 'allUsers')
1395
1396
def get_artifact_context(token, organization_context, artifact_id):
    """
    Look up the context ('ctx') of a single artifact. This is intended for reading existing context, which is used
    for role based access control; it is not used when creating new artifacts.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact to look up. It is passed as the first argument to the ALL_ARTIFACTS variables builder.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no results.

    Returns:
        dict: Artifact Context Object
    """
    artifacts_query = queries.ALL_ARTIFACTS['query']
    artifacts_variables = queries.ALL_ARTIFACTS['variables'](artifact_id, None)

    # the ALL_ARTIFACTS query is read back through the 'allAssets' response field
    matches = get_all_paginated_results(token, organization_context, artifacts_query, artifacts_variables,
                                        'allAssets')

    # only the first result is consulted
    first_match = matches[0]
    return first_match['ctx']
1417
1418
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Retrieve assets in the organization, following pagination until all pages are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. When None, all Assets are returned; otherwise only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. When None, all Assets are returned; otherwise only the Assets in that Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS['query']
    assets_variables = queries.ALL_ASSETS['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, assets_query, assets_variables, 'allAssets')
1441
1442
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Retrieve asset versions in the organization, following pagination until all pages are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get, by default None. When None, all Asset Versions are returned; otherwise only the Asset Version with that ID.
        asset_id (str, optional):
            Asset ID to filter by, by default None. When given, only the Asset Versions for that Asset are returned.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. When given, only the Asset Versions in that Business Unit are returned.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    versions_query = queries.ALL_ASSET_VERSIONS['query']
    versions_variables = queries.ALL_ASSET_VERSIONS['variables'](
        asset_version_id=asset_version_id,
        asset_id=asset_id,
        business_unit_id=business_unit_id,
    )
    return get_all_paginated_results(token, organization_context, versions_query, versions_variables,
                                     'allAssetVersions')
1470
1471
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            URL the token request is posted to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request payload, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    # Use the caller-supplied audience and token_url; previously the module-level
    # constants were always used, so overriding either argument had no effect.
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": audience,
        "grant_type": "client_credentials",
    }

    headers = {
        'content-type': "application/json"
    }

    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
1510
1511
def get_findings(
    token,
    organization_context,
    asset_version_id=None,
    finding_id=None,
    category=None,
    status=None,
    severity=None,
    count=False,
    limit=None,
):
    """
    Gets all the Findings for an Asset Version. Uses pagination to get all results.
    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. By default, this is None (no limit). When given, limit must be between 1 and 1000 inclusive.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
            Also raised when limit is given but lies outside 1..1000.

    Returns:
        list: List of Finding Objects when count is False.
        When count is True, the "_allFindingsMeta" entry of the response data is returned instead.
    """

    # Validate against None rather than truthiness so that limit=0 is rejected
    # instead of silently being treated as "no limit".
    if limit is not None:
        if limit > 1000:
            raise Exception("Error: limit must be less than or equal to 1000")
        if limit < 1:
            raise Exception("Error: limit must be greater than 0")

    # the count query and the findings query take the same filter arguments
    filter_args = dict(
        asset_version_id=asset_version_id,
        finding_id=finding_id,
        category=category,
        status=status,
        severity=severity,
        limit=limit,
    )

    if count:
        response_data = send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
                                           queries.GET_FINDINGS_COUNT['variables'](**filter_args))
        return response_data["data"]["_allFindingsMeta"]

    return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
                                     queries.GET_FINDINGS['variables'](**filter_args), 'allFindings', limit=limit)
1570
1571
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Retrieve the asset versions belonging to a product.
    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            Product ID to get asset versions for. Although it defaults to None, a value is required.
    Raises:
        Exception: Raised if product_id is not provided or if the query fails.
    Returns:
        list: Results read from the 'allProducts' field of the GET_PRODUCT_ASSET_VERSIONS query.
    """
    if product_id:
        versions_query = queries.GET_PRODUCT_ASSET_VERSIONS['query']
        versions_variables = queries.GET_PRODUCT_ASSET_VERSIONS['variables'](product_id)
        return get_all_paginated_results(token, organization_context, versions_query, versions_variables,
                                         'allProducts')

    raise Exception("Product ID is required")
1592
1593
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Retrieve products, optionally narrowed to one product and/or one business unit.
    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.
    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
    Returns:
        list: List of Product Objects
    """
    products_query = queries.GET_PRODUCTS['query']
    products_variables = queries.GET_PRODUCTS['variables'](
        product_id=product_id,
        business_unit_id=business_unit_id,
    )
    return get_all_paginated_results(token, organization_context, products_query, products_variables, 'allProducts')
1616
1617
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: launches a report export, then polls until a pre-signed download URL is available and returns it.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, if neither ID is given, or if both IDs are given.
        Exception: Raised if report_type or report_subtype is unsupported, if no export job ID is returned, or if a query fails.

    Returns:
        str: URL to download the report from.
    """
    # required-argument checks
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not (asset_version_id or product_id):
        raise ValueError("Asset Version ID or Product ID is required")
    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    # per report type: the accepted subtypes and the response fields carrying the export job
    if report_type == "CSV":
        supported_subtypes = ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]
        artifact_field, product_field = 'launchArtifactCSVExport', 'launchProductCSVExport'
    elif report_type == "PDF":
        supported_subtypes = ["RISK_SUMMARY"]
        artifact_field, product_field = 'launchArtifactPdfExport', 'launchProductPdfExport'
    else:
        raise Exception(f"Report Type {report_type} not supported")

    if report_subtype not in supported_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    # launch the export job
    launch_args = dict(asset_version_id=asset_version_id, product_id=product_id,
                       report_type=report_type, report_subtype=report_subtype)
    mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](**launch_args)
    variables = queries.LAUNCH_REPORT_EXPORT['variables'](**launch_args)

    launch_response = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    # exactly one of the two IDs is set at this point, which decides the response field to read
    response_field = artifact_field if asset_version_id else product_field
    export_job_id = launch_response['data'][response_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll until the export reports COMPLETED with a download link; there is no timeout on this loop
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        presign_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        presign_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, presign_query, presign_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        export_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != 'COMPLETED':
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(
                    f'Export Job Complete. Download URL: {download_link}')
            return download_link
1735
1736
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launches an SBOM export for the asset_version_id, then polls until a pre-signed download URL is
    available and returns it. This may take several minutes to complete, depending on the size of SBOM.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if sbom_type or sbom_subtype is unsupported, if no export job ID is returned, or if a query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    # required-argument checks
    if not sbom_type:
        raise ValueError("SBOM Type is required")
    if not sbom_subtype:
        raise ValueError("SBOM Subtype is required")
    if not asset_version_id:
        raise ValueError("Asset Version ID is required")

    # per SBOM type: the accepted subtypes and the response field carrying the export job
    if sbom_type == "CYCLONEDX":
        supported_subtypes = ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]
        response_field = 'launchCycloneDxExport'
    elif sbom_type == "SPDX":
        supported_subtypes = ["SBOM_ONLY"]
        response_field = 'launchSpdxExport'
    else:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    if sbom_subtype not in supported_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    # pick the launch definition only after validation has passed
    if response_field == 'launchCycloneDxExport':
        launch_definition = queries.LAUNCH_CYCLONEDX_EXPORT
    else:
        launch_definition = queries.LAUNCH_SPDX_EXPORT

    mutation = launch_definition['mutation']
    variables = launch_definition['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    export_job_id = launch_response['data'][response_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll until the export reports COMPLETED with a download link; there is no timeout on this loop
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        presign_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        presign_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, presign_query, presign_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        export_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != "COMPLETED":
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(
                    f'Export Job Complete. Download URL: {download_link}')
            return download_link
1836
1837
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Retrieve the Software Components of an Asset Version, following pagination until all pages are collected.
    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself, without a "Bearer" prefix.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str):
            Asset Version ID to get software components for. Although it defaults to None, a value is required.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
    Raises:
        Exception: Raised if asset_version_id is not provided or if the query fails.
    Returns:
        list: List of Software Component Objects
    """
    if asset_version_id:
        components_query = queries.GET_SOFTWARE_COMPONENTS['query']
        components_variables = queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id,
                                                                            type=type)
        return get_all_paginated_results(token, organization_context, components_query, components_variables,
                                         'allSoftwareComponentInstances')

    raise Exception("Asset Version ID is required")
1862
1863
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Defaults to False. Only consulted when search_method is "EXACT".
    Raises:
        ValueError: Raised if name is not provided, or if search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.
    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # Enforce the documented contract up front. Without these checks a missing name or a
    # misspelled search method would send a query with no usable name filter.
    if not name:
        raise ValueError("Name is required")
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f"Search method {search_method} not supported")

    if asset_version_id:
        # scoped to one asset version: include the original components of each match
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
"""
    else:
        # gets the asset version info that contains the software component
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
"""

    # NOTE(review): the meaning of the mergedComponentRefId=None filter is not visible here - confirm against the API
    search_filter = {"mergedComponentRefId": None}

    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # case-insensitive exact matching uses the name_like filter instead of name
        search_filter["name" if case_sensitive else "name_like"] = name
        if version:
            search_filter["version"] = version
    else:
        search_filter["name_contains"] = name
        if version:
            search_filter["version_contains"] = version

    variables = {
        "filter": search_filter,
        "after": None,
        "first": 100
    }

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")
1975
1976
@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), retry=retry_if_exception(is_not_breakout_exception))
def send_graphql_query(token, organization_context, query, variables=None):
    """
    Post a GraphQL query to the API and return the decoded JSON response.

    The retry decorator allows up to 5 attempts with a fixed 5 second wait between them, using
    is_not_breakout_exception as the retry predicate.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added here.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        BreakoutException: If the response JSON contains "errors", or if a mutation gets a non-200 status code
        Exception: If a non-mutation query gets a non-200 status code

    Returns:
        dict: Response JSON
    """
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
        "Organization-Context": organization_context,
    }
    request_body = {"query": query, "variables": variables}

    response = requests.post(API_URL, headers=request_headers, json=request_body)

    if response.status_code != 200:
        # NOTE(review): mutations raise BreakoutException, presumably so the retry predicate
        # does not re-send them - confirm against is_not_breakout_exception
        error_type = BreakoutException if is_mutation(query) else Exception
        raise error_type(f"Error: {response.status_code} - {response.text}")

    response_json = response.json()
    if "errors" in response_json:
        # Raise a BreakoutException for GraphQL errors
        raise BreakoutException(f"Error: {response_json['errors']}")

    return response_json
2020
2021
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Set the status of one or more findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID(s) to update the status for.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if user_id, finding_ids, or status is missing or empty.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    # Checked in this order; the first missing value determines the error message.
    required_arguments = (
        (user_id, "User Id is required"),
        (finding_ids, "Finding Ids is required"),
        (status, "Status is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    # The mutation text and the variables builder both live in the queries module.
    mutation = queries.UPDATE_FINDING_STATUSES['mutation']
    build_variables = queries.UPDATE_FINDING_STATUSES['variables']
    variables = build_variables(
        user_id=user_id,
        finding_ids=finding_ids,
        status=status,
        justification=justification,
        response=response,
        comment=comment,
    )

    return send_graphql_query(token, organization_context, mutation, variables)
2065
2066
def upload_file_for_binary_analysis(
    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
):
    """
    Upload a file for Binary Analysis using a multipart upload, then launch processing.
    The file is read in chunks of chunk_size and each chunk is uploaded as its own part.
    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE. Must be at least MIN_CHUNK_SIZE and strictly below MAX_CHUNK_SIZE.
        quick_scan (bool, optional):
            If True, adds the "QUICK_SCAN" configuration option. Defaults to False (Full Scan). For details, please see the API documentation.
        enable_bandit_scan (bool, optional):
            If True, adds the "ENABLE_BANDIT_SCAN" configuration option, requesting a bandit scan in addition to the default binary analysis scan.

    Raises:
        ValueError: Raised if test_id or file_path are not provided, or if chunk_size is outside the allowed range.
        Exception: Raised if a query or a part upload fails.

    Returns:
        dict: The 'data' portion of the launchBinaryUploadProcessing response.
    """
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")
    if chunk_size < MIN_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
    if chunk_size >= MAX_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")

    # Step 1: open the multipart upload; the returned id/key pair identifies it in later calls.
    start_mutation = """
    mutation Start_SDK($testId: ID!) {
        startMultipartUploadV2(testId: $testId) {
            uploadId
            key
        }
    }
    """
    start_reply = send_graphql_query(token, organization_context, start_mutation, {"testId": test_id})
    upload_id = start_reply['data']['startMultipartUploadV2']['uploadId']
    upload_key = start_reply['data']['startMultipartUploadV2']['key']

    # Step 2: for every chunk (part numbers start at 1, even for a single-part file),
    # request an upload URL, PUT the bytes there, and remember the returned ETag.
    part_data = []
    for part_number, chunk in enumerate(file_chunks(file_path, chunk_size), start=1):
        part_url_mutation = """
        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
                key
                uploadUrl
            }
        }
        """
        part_variables = {
            "partNumber": part_number,
            "uploadId": upload_id,
            "uploadKey": upload_key
        }
        url_reply = send_graphql_query(token, organization_context, part_url_mutation, part_variables)
        chunk_upload_url = url_reply['data']['generateUploadPartUrlV2']['uploadUrl']

        put_reply = upload_bytes_to_url(chunk_upload_url, chunk)
        part_data.append({
            "ETag": put_reply.headers['ETag'],
            "PartNumber": part_number
        })

    # Step 3: close the multipart upload by handing back the collected part list.
    complete_mutation = """
    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
            key
        }
    }
    """
    complete_variables = {
        "partData": part_data,
        "uploadId": upload_id,
        "uploadKey": upload_key
    }
    complete_reply = send_graphql_query(token, organization_context, complete_mutation, complete_variables)
    key = complete_reply['data']['completeMultipartUploadV2']['key']

    # Step 4: launch processing. configurationOptions is only declared and sent
    # when at least one option was requested.
    config_options = []
    if quick_scan:
        config_options.append("QUICK_SCAN")
    if enable_bandit_scan:
        config_options.append("ENABLE_BANDIT_SCAN")

    launch_variables = {
        "key": key,
        "testId": test_id
    }
    if config_options:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
                key
                newBanditScanId
            }
        }
        """
        launch_variables["configurationOptions"] = config_options
    else:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
            launchBinaryUploadProcessing(key: $key, testId: $testId) {
                key
                newBanditScanId
            }
        }
        """

    launch_reply = send_graphql_query(token, organization_context, launch_mutation, launch_variables)

    return launch_reply['data']
2216
2217
def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
    """
    Upload a test results file to the test identified by test_id and launch its processing.
    NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if a query or the file upload fails.

    Returns:
        dict: The 'data' portion of the launchTestResultProcessing response.
    """
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")

    # Generate the single-part upload URL and the storage key for this test.
    url_mutation = """
    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
        generateSinglePartUploadUrl(testId: $testId) {
            uploadUrl
            key
        }
    }
    """
    url_reply = send_graphql_query(token, organization_context, url_mutation, {"testId": test_id})
    upload_url = url_reply['data']['generateSinglePartUploadUrl']['uploadUrl']
    key = url_reply['data']['generateSinglePartUploadUrl']['key']

    # Send the file contents to the returned URL.
    upload_file_to_url(upload_url, file_path)

    # Tell the API the upload is done so that processing of the results starts.
    launch_mutation = """
    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
        launchTestResultProcessing(key: $key, testId: $testId) {
            key
        }
    }
    """
    launch_variables = {
        "testId": test_id,
        "key": key
    }
    launch_reply = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    return launch_reply['data']
2283
2284
def upload_bytes_to_url(url, bytes):
    """
    PUT a block of bytes to a (pre-signed S3) URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        bytes (bytes):
            Bytes to upload. The parameter name shadows the builtin inside this function;
            it is kept because it is part of the public signature.

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    response = requests.put(url, data=bytes)

    # Anything other than HTTP 200 is reported with the status code and response body.
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")
    return response
2307
2308
def upload_file_to_url(url, file_path):
    """
    PUT the contents of a local file to a (pre-signed S3) URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        file_path (str):
            Local path to file to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    # The file is opened in binary mode and the open handle is passed as the request body.
    with open(file_path, 'rb') as source:
        response = requests.put(url, data=source)

    # Anything other than HTTP 200 is reported with the status code and response body.
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")
    return response
API_URL = 'https://platform.finitestate.io/api/v1/graphql'
AUDIENCE = 'https://platform.finitestate.io/api/v1/graphql'
TOKEN_URL = 'https://platform.finitestate.io/api/v1/auth/token'

DEFAULT CHUNK SIZE: 1000 MiB

DEFAULT_CHUNK_SIZE = 1048576000

MAX CHUNK SIZE: 2000 MiB (just under 2 GiB)

MAX_CHUNK_SIZE = 2097152000

MIN CHUNK SIZE: 5 MiB

MIN_CHUNK_SIZE = 5242880
class UploadMethod(enum.Enum):
class UploadMethod(Enum):
    """
    The ways an upload can be attributed. Every member's value is the same string as its name.

    Attributes:
        WEB_APP_UI: Upload method via web application UI.
        API: Upload method via API.
        GITHUB_INTEGRATION: Upload method via GitHub integration.
        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

    Refer to a member as UploadMethod.<attribute>, e.g. finite_state_sdk.UploadMethod.WEB_APP_UI
    """
    WEB_APP_UI = "WEB_APP_UI"
    API = "API"
    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"

Enumeration class representing different upload methods.

Attributes:
  • WEB_APP_UI: Upload method via web application UI.
  • API: Upload method via API.
  • GITHUB_INTEGRATION: Upload method via GitHub integration.
  • AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

To use any value from this enumeration, use UploadMethod.<attribute>, e.g. finite_state_sdk.UploadMethod.WEB_APP_UI

WEB_APP_UI = <UploadMethod.WEB_APP_UI: 'WEB_APP_UI'>
API = <UploadMethod.API: 'API'>
GITHUB_INTEGRATION = <UploadMethod.GITHUB_INTEGRATION: 'GITHUB_INTEGRATION'>
AZURE_DEVOPS_INTEGRATION = <UploadMethod.AZURE_DEVOPS_INTEGRATION: 'AZURE_DEVOPS_INTEGRATION'>
Inherited Members
enum.Enum
name
value
def create_artifact( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_version_id=None, artifact_name=None, product_id=None):
 52def create_artifact(
 53    token,
 54    organization_context,
 55    business_unit_id=None,
 56    created_by_user_id=None,
 57    asset_version_id=None,
 58    artifact_name=None,
 59    product_id=None,
 60):
 61    """
 62    Create a new Artifact.
 63    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 64    Please see the examples in the Github repository for more information:
 65    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 66    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 67
 68    Args:
 69        token (str):
 70            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 71        organization_context (str):
 72            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 73        business_unit_id (str, required):
 74            Business Unit ID to associate the artifact with.
 75        created_by_user_id (str, required):
 76            User ID of the user creating the artifact.
 77        asset_version_id (str, required):
 78            Asset Version ID to associate the artifact with.
 79        artifact_name (str, required):
 80            The name of the Artifact being created.
 81        product_id (str, optional):
 82            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
 83
 84    Raises:
 85        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
 86        Exception: Raised if the query fails.
 87
 88    Returns:
 89        dict: createArtifact Object
 90    """
 91    if not business_unit_id:
 92        raise ValueError("Business unit ID is required")
 93    if not created_by_user_id:
 94        raise ValueError("Created by user ID is required")
 95    if not asset_version_id:
 96        raise ValueError("Asset version ID is required")
 97    if not artifact_name:
 98        raise ValueError("Artifact name is required")
 99
100    graphql_query = """
101    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
102        createArtifact(input: $input) {
103            id
104            name
105            assetVersion {
106                id
107                name
108                asset {
109                    id
110                    name
111                }
112            }
113            createdBy {
114                id
115                email
116            }
117            ctx {
118                asset
119                products
120                businessUnits
121            }
122        }
123    }
124    """
125
126    # Asset name, business unit context, and creating user are required
127    variables = {
128        "input": {
129            "name": artifact_name,
130            "createdBy": created_by_user_id,
131            "assetVersion": asset_version_id,
132            "ctx": {
133                "asset": asset_version_id,
134                "businessUnits": [business_unit_id]
135            }
136        }
137    }
138
139    if product_id is not None:
140        variables["input"]["ctx"]["products"] = product_id
141
142    response = send_graphql_query(token, organization_context, graphql_query, variables)
143    return response['data']

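Create a new Artifact. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the GitHub repository for more information:
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py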

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the artifact with.
  • created_by_user_id (str, required): User ID of the user creating the artifact.
  • asset_version_id (str, required): Asset Version ID to associate the artifact with.
  • artifact_name (str, required): The name of the Artifact being created.
  • product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createArtifact Object

def create_asset( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
    """
    Create a new Asset in a business unit.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset with.
        created_by_user_id (str, required):
            User ID of the user creating the asset.
        asset_name (str, required):
            The name of the Asset being created.
        product_id (str, optional):
            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createAsset Object
    """
    # Checked in this order; the first missing value determines the error message.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_name, "Asset name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    graphql_query = """
    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
        createAsset(input: $input) {
            id
            name
            dependentProducts {
                id
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # The business unit is sent both as the asset's group and as a one-element list in
    # the context; "products" is added only when a product id was passed.
    context = {
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": asset_name,
            "group": business_unit_id,
            "createdBy": created_by_user_id,
            "ctx": context
        }
    }

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

Create a new Asset.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset with.
  • created_by_user_id (str, required): User ID of the user creating the asset.
  • asset_name (str, required): The name of the Asset being created.
  • product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAsset Object

def create_asset_version( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, asset_version_name=None, product_id=None):
def create_asset_version(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Every call emits a DeprecationWarning before anything else happens.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset version with.
        created_by_user_id (str, required):
            User ID of the user creating the asset version.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createAssetVersion Object

    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
    """
    # stacklevel=2 attributes the warning to the caller rather than to this function.
    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)

    # Checked in this order; the first missing value determines the error message.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    graphql_query = """
    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
        createAssetVersion(input: $input) {
            id
            name
            asset {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    """

    # The context repeats the asset id and carries the business unit as a one-element
    # list; "products" is added only when a product id was passed.
    context = {
        "asset": asset_id,
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": asset_version_name,
            "createdBy": created_by_user_id,
            "asset": asset_id,
            "ctx": context
        }
    }

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset version with.
  • created_by_user_id (str, required): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
  • product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

deprecated:: 0.1.7. Use create_asset_version_on_asset instead.

def create_asset_version_on_asset( token, organization_context, created_by_user_id=None, asset_id=None, asset_version_name=None, product_id=None):
def create_asset_version_on_asset(
    token,
    organization_context,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        created_by_user_id (str, optional):
            User ID of the user creating the asset version. Only sent when provided.
            NOTE(review): the mutation declares $createdByUserId as ID! (non-null), so omitting it may be rejected by the API - confirm.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID passed to the mutation as productId. Only sent when provided.

    Raises:
        ValueError: Raised if asset_id or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the createNewAssetVersionOnAsset response.
    """
    # Checked in this order; the first missing value determines the error message.
    required_arguments = (
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    graphql_query = """
        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
                id
                assetVersion {
                    id
                }
            }
        }
    """

    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}

    # Optional values are left out of the variables entirely when they are empty.
    optional_values = (
        ("createdByUserId", created_by_user_id),
        ("productId", product_id),
    )
    for variable_name, supplied in optional_values:
        if supplied:
            variables[variable_name] = supplied

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • created_by_user_id (str, optional): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
Raises:
  • ValueError: Raised if asset_id or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

def create_new_asset_version_artifact_and_test_for_upload( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, product_id=None, test_type=None, artifact_description=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_new_asset_version_artifact_and_test_for_upload(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    product_id=None,
    test_type=None,
    artifact_description=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
    This method is used by the create_new_asset_version_and_upload_binary and create_new_asset_version_and_upload_test_results methods, which are generally easier to use for basic use cases.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the entities for. If not provided, the Business Unit (group) of the existing Asset is used.
        created_by_user_id (str, optional):
            User ID that will be the creator of the entities. If not provided, the creator of the existing Asset is used.
        asset_id (str, required):
            ID of the existing Asset to create the asset version for.
        version (str, required):
            Name of the Asset Version to create.
        product_id (str, optional):
            Product ID to associate the new entities with. It is passed to create_asset_version_on_asset and, if it is not already one of the Asset's products, added to the list of product IDs used for the Artifact and Test.
        test_type (str, required):
            Test type to create the test for. Must be "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation.
        artifact_description (str, optional):
            Description to use for the artifact. Examples include "Firmware", "Source Code Repository". It is appended to "<asset name> <version> - " to form the Artifact name. If not provided, "Binary" is used for binary analysis and "Unspecified Artifact" for every other test type.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or test_type are not provided, if no Asset matches asset_id, or if the Business Unit ID or Created By User ID cannot be determined.
        Exception: Raised if the query fails.

    Returns:
        str: The Test ID of the newly created test that is used for uploading the file.
    """
    # Validate every required argument before any remote entity is created, so a
    # missing test_type cannot leave an orphaned Asset Version and Artifact behind.
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    if not test_type:
        raise ValueError("Test type is required")

    assets = get_all_assets(token, organization_context, asset_id=asset_id)
    if not assets:
        raise ValueError("No assets found with the provided asset ID")
    asset = assets[0]
    asset_name = asset['name']

    # Start from the products already linked to the asset; build a new list when
    # adding the requested product so the fetched asset dict is left untouched.
    asset_product_ids = asset['ctx']['products']
    if product_id and product_id not in asset_product_ids:
        asset_product_ids = [*asset_product_ids, product_id]

    # Fall back to the values recorded on the existing asset.
    if not business_unit_id:
        business_unit_id = asset['group']['id']
    if not created_by_user_id:
        created_by_user_id = asset['createdBy']['id']

    if not business_unit_id:
        raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
    if not created_by_user_id:
        raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")

    # create the asset version
    response = create_asset_version_on_asset(
        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id,
        asset_version_name=version, product_id=product_id
    )
    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']

    is_binary_analysis = test_type == "finite_state_binary_analysis"

    # create the artifact (identical for both test kinds apart from the default description)
    if not artifact_description:
        artifact_description = "Binary" if is_binary_analysis else "Unspecified Artifact"
    artifact_name = f"{asset_name} {version} - {artifact_description}"
    response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
                               created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
                               artifact_name=artifact_name, product_id=asset_product_ids)
    artifact_id = response['createArtifact']['id']

    # create the test
    if is_binary_analysis:
        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                  artifact_id=artifact_id, product_id=asset_product_ids,
                                                  test_name=test_name, upload_method=upload_method)
    else:
        test_name = f"{asset_name} {version} - {test_type}"
        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                      artifact_id=artifact_id, product_id=asset_product_ids,
                                                      test_name=test_name, test_type=test_type,
                                                      upload_method=upload_method)
    return response['createTest']['id']

Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the Business Unit of the existing Asset will be used.
  • created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
  • asset_id (str, required): ID of the existing Asset to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • product_id (str, optional): Product ID to create the entities for. If not provided, the default product will be used.
  • test_type (str, required): Test type to create the test for. Must be "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation.
  • artifact_description (str, optional): Description to use for the artifact. Examples include "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if asset_id or version are not provided.
  • Exception: Raised if the query fails.
Returns:

str: The Test ID of the newly created test that is used for uploading the file.

def create_new_asset_version_and_upload_binary( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, artifact_description=None, quick_scan=False, upload_method: UploadMethod = <UploadMethod.API: 'API'>, enable_bandit_scan: bool = False):
def create_new_asset_version_and_upload_binary(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    artifact_description=None,
    quick_scan=False,
    upload_method: UploadMethod = UploadMethod.API,
    enable_bandit_scan: bool = False,
):
    """
    Add a new Asset Version to an existing Asset and upload a binary to it for Finite State Binary Analysis.
    The Asset Version, Artifact, and Test are created first, then the file is uploaded against the new Test.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Local path to the file to upload.
        product_id (str, optional):
            Product ID to create the asset version for.
        artifact_description (str, optional):
            Description of the artifact. If not provided, the default is "Firmware Binary".
        quick_scan (bool, optional):
            Forwarded to upload_file_for_binary_analysis. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.
        enable_bandit_scan (bool, optional):
            Forwarded to upload_file_for_binary_analysis. Defaults to False.

    Raises:
        ValueError: Raised if asset_id, version, or file_path are not provided.
        Exception: Raised if any of the queries fail.

    Returns:
        The value returned by upload_file_for_binary_analysis for the uploaded file.
    """
    # Required arguments, checked in order; the first missing one is reported.
    required_arguments = (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
    )
    for supplied_value, error_message in required_arguments:
        if not supplied_value:
            raise ValueError(error_message)

    # Create the asset version, artifact, and binary analysis test in one step.
    test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        artifact_description=artifact_description or "Firmware Binary",
        upload_method=upload_method,
    )

    # Upload the file against the freshly created test and hand back the result.
    return upload_file_for_binary_analysis(
        token,
        organization_context,
        test_id=test_id,
        file_path=file_path,
        quick_scan=quick_scan,
        enable_bandit_scan=enable_bandit_scan,
    )

Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Local path to the file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
  • artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
  • quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
  • enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
  • ValueError: Raised if asset_id, version, or file_path are not provided.
  • Exception: Raised if any of the queries fail.
Returns:

dict: The response from the GraphQL query, a createAssetVersion Object.

def create_new_asset_version_and_upload_test_results( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, test_type=None, artifact_description='', upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
                                                     created_by_user_id=None, asset_id=None, version=None,
                                                     file_path=None, product_id=None, test_type=None,
                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
    """
    Add a new Asset Version to an existing Asset and upload a third party test results file to it.
    The Asset Version, Artifact, and Test are created first, then the results file is uploaded against the new Test.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Path to the test results file to upload.
        product_id (str, optional):
            Product ID to create the asset version for.
        test_type (str, required):
            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
        artifact_description (str, optional):
            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: If the asset_id, version, file_path, or test_type are not provided.
        Exception: If the test_type is not a supported third party scanner type, or if the query fails.

    Returns:
        The value returned by upload_test_results_file for the uploaded file.
    """
    # Required arguments, checked in order; the first missing one is reported.
    required_arguments = (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
        (test_type, "Test type is required"),
    )
    for supplied_value, error_message in required_arguments:
        if not supplied_value:
            raise ValueError(error_message)

    # Create the asset version, artifact, and test that the results belong to.
    new_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type=test_type,
        artifact_description=artifact_description,
        upload_method=upload_method,
    )

    # Upload the results file against the new test and hand back the result.
    return upload_test_results_file(token, organization_context, test_id=new_test_id, file_path=file_path)

Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Path to the test results file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
  • test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
  • artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: If the asset_id, version, file_path, or test_type are not provided.
  • Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:

dict: The response from the GraphQL query, a createAssetVersion Object.

def create_product( token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None, product_description=None, vendor_id=None, vendor_name=None):
def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
                   product_description=None, vendor_id=None, vendor_name=None):
    """
    Create a new Product via the createProduct GraphQL mutation.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the product with.
        created_by_user_id (str, required):
            User ID of the user creating the product.
        product_name (str, required):
            The name of the Product being created.
        product_description (str, optional):
            The description of the Product being created. Only sent when not None.
        vendor_id (str, optional):
            ID of an existing Vendor to link the product to. Only sent when not None.
        vendor_name (str, optional):
            Name of a Vendor to create and link to the product. Only sent when not None.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createProduct Object
    """
    # Required arguments, checked in order; the first missing one is reported.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (product_name, "Product name is required"),
    )
    for supplied_value, error_message in required_arguments:
        if not supplied_value:
            raise ValueError(error_message)

    graphql_query = """
    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
        createProduct(input: $input) {
            id
            name
            vendor {
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                businessUnit
            }
        }
    }
    """

    # Mandatory part of the mutation input.
    product_input = {
        "name": product_name,
        "group": business_unit_id,
        "createdBy": created_by_user_id,
        "ctx": {"businessUnit": business_unit_id},
    }

    # Optional fields are only included when the caller supplied them.
    if product_description is not None:
        product_input["description"] = product_description
    if vendor_id is not None:
        # link the new product to an existing vendor
        product_input["vendor"] = {"id": vendor_id}
    if vendor_name is not None:
        # create a new vendor and link it to the new product
        product_input["createVendor"] = {"name": vendor_name}

    result = send_graphql_query(token, organization_context, graphql_query, {"input": product_input})
    return result['data']

Create a new Product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the product with.
  • created_by_user_id (str, required): User ID of the user creating the product.
  • product_name (str, required): The name of the Product being created.
  • product_description (str, optional): The description of the Product being created.
  • vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
  • vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createProduct Object

def create_test( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, tools=[], upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    tools=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading files.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. Sent unchanged as the "products" entry of the Test context; if not specified, the Test will not be associated with a product.
        test_type (str, required):
            The type of test being created, sent as the test result file format. Valid values are "cyclonedx" and "finite_state_binary_analysis".
        tools (list, optional):
            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test. Defaults to an empty list.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not artifact_id:
        raise ValueError("Artifact ID is required")
    if not test_name:
        raise ValueError("Test name is required")
    if not test_type:
        raise ValueError("Test type is required")

    # A fresh list per call: a mutable default in the signature would be shared
    # between every call that omits the argument.
    if tools is None:
        tools = []

    graphql_query = """
    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
        createTest(input: $input) {
            id
            name
            artifactUnderTest {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                        dependentProducts {
                            id
                            name
                        }
                    }
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
            uploadMethod
        }
    }
    """

    # Test name, creating user, artifact, test type, and asset/business unit context are required
    variables = {
        "input": {
            "name": test_name,
            "createdBy": created_by_user_id,
            "artifactUnderTest": artifact_id,
            "testResultFileFormat": test_type,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            },
            "tools": tools,
            # the API receives the enum's string value, not the Enum member
            "uploadMethod": upload_method.value
        }
    }

    # The product context is only sent when the caller supplied one
    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']

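Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py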

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
  • tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_binary_analysis( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test_as_binary_analysis(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for a Finite State Binary Analysis upload.

    Delegates to create_test with test_type fixed to "finite_state_binary_analysis" and a single
    tool entry describing the Finite State Binary Analysis scanner.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Descriptor of the scanner that produces the results for this test type.
    binary_analysis_tool = {
        "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
        "name": "Finite State Binary Analysis",
    }
    return create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        tools=[binary_analysis_tool],
        upload_method=upload_method,
    )

Create a new Test object for uploading files for Finite State Binary Analysis.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_cyclone_dx( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test_as_cyclone_dx(token, organization_context, business_unit_id=None, created_by_user_id=None,
                              asset_id=None, artifact_id=None, test_name=None, product_id=None,
                              upload_method: UploadMethod = UploadMethod.API):
    """
    Create a new Test object for a CycloneDX file upload.

    Delegates to create_test with test_type fixed to "cyclonedx".

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # Collect the pass-through keyword arguments once, then hand them to create_test.
    test_options = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": "cyclonedx",
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **test_options)

Create a new Test object for uploading CycloneDX files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_third_party_scanner( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test_as_third_party_scanner(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for a Third Party Scanner file upload.

    Delegates to create_test, forwarding the caller-supplied test_type unchanged.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    created_test = create_test(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        artifact_id=artifact_id,
        test_name=test_name,
        product_id=product_id,
        test_type=test_type,
        upload_method=upload_method,
    )
    return created_test

Create a new Test object for uploading Third Party Scanner files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def download_asset_version_report( token, organization_context, asset_version_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
 989def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 990                                  report_subtype=None, output_filename=None, verbose=False):
 991    """
 992    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 993
 994    Args:
 995        token (str):
 996            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 997        organization_context (str):
 998            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 999        asset_version_id (str, required):
1000            The Asset Version ID to download the report for.
1001        report_type (str, required):
1002            The file type of the report to download. Valid values are "CSV" and "PDF".
1003        report_subtype (str, required):
1004            The type of report to download. Based on available reports for the `report_type` specified
1005            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1006            Valid values for PDF are "RISK_SUMMARY".
1007        output_filename (str, optional):
1008            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1009        verbose (bool, optional):
1010            If True, will print additional information to the console. Defaults to False.
1011
1012    Raises:
1013        ValueError: Raised if required parameters are not provided.
1014        Exception: Raised if the query fails.
1015
1016    Returns:
1017        None
1018    """
1019    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
1020                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
1021
1022    # Send an HTTP GET request to the URL
1023    response = requests.get(url)
1024
1025    # Check if the request was successful (status code 200)
1026    if response.status_code == 200:
1027        # Open a local file in binary write mode and write the content to it
1028        if verbose:
1029            print("File downloaded successfully.")
1030        with open(output_filename, 'wb') as file:
1031            file.write(response.content)
1032            if verbose:
1033                print(f'Wrote file to {output_filename}')
1034    else:
1035        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, required): The Asset Version ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download, based on the reports available for the specified report_type. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def download_product_report( token, organization_context, product_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download, based on the reports available for the specified `report_type`.
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved as "report.pdf" when report_type is "PDF" and as "report.csv" otherwise, in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        Exception: Raised if the download request does not return status code 200.
            Errors raised by generate_report_download_url propagate unchanged.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Resolve the documented default only after the URL has been generated, so any parameter
    # validation done there runs first. Previously a missing filename reached open(None, 'wb')
    # and failed with a TypeError.
    if output_filename is None:
        output_filename = "report.pdf" if str(report_type).upper() == "PDF" else "report.csv"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Write the raw response body to the local file in binary mode
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')

Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, required): The Product ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV".
  • report_subtype (str, required): The type of report to download, based on the reports available for the specified report_type. Valid values for CSV are "ALL_FINDINGS".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom( token, organization_context, sbom_type='CYCLONEDX', sbom_subtype='SBOM_ONLY', asset_version_id=None, output_filename='sbom.json', verbose=False):
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Fetch the SBOM of an Asset Version and write it to a local file. This call blocks until the download finishes, which can take minutes for a very large SBOM.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, optional):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails, or if the download request does not return status code 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(
        token,
        organization_context,
        sbom_type=sbom_type,
        sbom_subtype=sbom_subtype,
        asset_version_id=asset_version_id,
        verbose=verbose,
    )

    # Fetch the SBOM from the generated URL
    reply = requests.get(download_url)

    # Bail out early on any non-200 status
    if reply.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {reply.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Persist the raw response bytes to disk
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(reply.content)
        if verbose:
            print(f'Wrote file to {output_filename}')

Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
  • asset_version_id (str, required): The Asset Version ID to download the SBOM for.
  • output_filename (str, optional): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def file_chunks(file_path, chunk_size=1048576000):
def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Generator that reads a file in binary mode and yields it piece by piece.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            The size, in bytes, requested for each read. Defaults to DEFAULT_CHUNK_SIZE.

    Yields:
        bytes: The next non-empty chunk of the file.

    Raises:
        OSError: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as source:
        # iter(callable, sentinel) keeps calling read() until it returns b'' (end of file)
        yield from iter(lambda: source.read(chunk_size), b'')

Helper method to read a file in chunks.

Arguments:
  • file_path (str): Local path to the file to read.
  • chunk_size (int, optional): The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
Yields:

bytes: The next chunk of the file.

Raises:
  • FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Retrieve every artifact in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifacts_query = queries.ALL_ARTIFACTS
    query_text = artifacts_query['query']
    query_variables = artifacts_query['variables'](artifact_id, business_unit_id)
    # Results for this query are read from the 'allAssets' field of the response.
    return get_all_paginated_results(
        token,
        organization_context,
        query_text,
        query_variables,
        'allAssets',
    )

Get all artifacts in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
  • business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Artifact Objects

def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Retrieve every asset in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS
    query_text = assets_query['query']
    query_variables = assets_query['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(
        token,
        organization_context,
        query_text,
        query_variables,
        'allAssets',
    )

Gets all assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_all_asset_versions(token, organization_context):
def get_all_asset_versions(token, organization_context):
    """
    Retrieve every asset version in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    asset_versions_query = queries.ALL_ASSET_VERSIONS
    query_text = asset_versions_query['query']
    # The 'variables' entry is forwarded as stored; it is not called here.
    query_variables = asset_versions_query['variables']
    return get_all_paginated_results(
        token,
        organization_context,
        query_text,
        query_variables,
        'allAssetVersions',
    )

Get all asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_all_asset_versions_for_product(token, organization_context, product_id):
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Retrieve all asset versions for one product, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: List of AssetVersion Objects
    """
    product_query = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    query_text = product_query['query']
    query_variables = product_query['variables'](product_id)
    # Results for this query are read from the 'allProducts' field of the response.
    return get_all_paginated_results(
        token,
        organization_context,
        query_text,
        query_variables,
        'allProducts',
    )

Get all asset versions for a product. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str): The Product ID to get asset versions for
Returns:

list: List of AssetVersion Objects

def get_all_business_units(token, organization_context):
def get_all_business_units(token, organization_context):
    """
    Fetch every business unit in the organization, following pagination until every page is read.

    NOTE: business units are returned as Group objects.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects (the entries under the 'allGroups' field).
    """
    business_unit_query = queries.ALL_BUSINESS_UNITS
    query_text = business_unit_query['query']
    query_variables = business_unit_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allGroups')

Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Group Objects

def get_all_organizations(token, organization_context):
def get_all_organizations(token, organization_context):
    """
    Fetch every organization visible to the user, following pagination until every page is read.

    Most users belong to exactly one organization.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects (the entries under the 'allOrganizations' field).
    """
    organization_query = queries.ALL_ORGANIZATIONS
    query_text = organization_query['query']
    query_variables = organization_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allOrganizations')

Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Organization Objects

def get_all_paginated_results( token, organization_context, query, variables=None, field=None, limit=None):
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    Pages are requested one after another, using the `_cursor` of the last entry of
    each page as the `after` variable of the next request, until an empty page is
    returned or `limit` results have been collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, required):
            Variables to be used in the GraphQL query. Must contain a "first" entry (the page size) between 1 and 1000.
            The dict is copied before use, so the caller's object is never modified.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, Optional):
            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.

    Raises:
        Exception: If `field` is missing, `limit` or `variables["first"]` is out of range, the query fails,
            or the field is not in the response JSON

    Returns:
        list: List of results (at most `limit` entries when a limit is given)
    """
    if not field:
        raise Exception("Error: field is required")
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit cannot be less than 1")

    # Work on a private shallow copy: the loop below sets 'after', and callers
    # commonly pass a shared dict that must not keep a stale cursor between calls.
    variables = dict(variables) if variables else {}

    page_size = variables.get("first")
    if not page_size:
        raise Exception("Error: first is required")
    if page_size < 1:
        raise Exception("Error: first cannot be less than 1")
    if page_size > 1000:
        raise Exception("Error: first cannot be greater than 1000")

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # keep requesting pages until one comes back empty
    while page:
        # '>=' rather than '==': a page may overshoot the limit
        if limit and len(results) >= limit:
            break

        # the cursor of the last entry marks where the next page starts
        cursor = page[-1]['_cursor']
        if not cursor:
            break
        variables['after'] = cursor

        response_data = send_graphql_query(token, organization_context, query, variables)
        page = response_data['data'][field]
        results.extend(page)

    # never hand back more than the caller asked for
    if limit:
        return results[:limit]
    return results

Get all results from a paginated GraphQL query

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
  • field (str, required): The field in the response JSON that contains the results
  • limit (int, Optional): The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
Raises:
  • Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:

list: List of results

def get_all_products(token, organization_context):
def get_all_products(token, organization_context):
    """
    Fetch every product in the organization, following pagination until every page is read.

    Emits a DeprecationWarning on every call.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects (the entries under the 'allProducts' field).

    .. deprecated:: 0.1.4
        Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)

    product_query = queries.ALL_PRODUCTS
    query_text = product_query['query']
    query_variables = product_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

Get all products in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Product Objects

Deprecated since version 0.1.4. Use get_products instead.

def get_all_users(token, organization_context):
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until every page is read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects (the entries under the 'allUsers' field).
    """
    user_query = queries.ALL_USERS
    query_text = user_query['query']
    query_variables = user_query['variables']
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allUsers')

Get all users in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of User Objects

def get_artifact_context(token, organization_context, artifact_id):
def get_artifact_context(token, organization_context, artifact_id):
    """
    Look up the context ('ctx') of a single artifact.

    The context is typically read to reuse an existing artifact's role based access
    control context; this function does not create artifacts.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact whose context is requested.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no results.

    Returns:
        dict: Artifact Context Object
    """
    artifact_query = queries.ALL_ARTIFACTS
    query_text = artifact_query['query']
    query_variables = artifact_query['variables'](artifact_id, None)
    matches = get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')

    # only the first match is consulted
    first_match = matches[0]
    return first_match['ctx']

Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

dict: Artifact Context Object

def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch assets in the organization, following pagination until every page is read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Restrict the result to the Asset with this ID. Defaults to None, which applies no asset filter.
        business_unit_id (str, optional):
            Restrict the result to Assets in this Business Unit. Defaults to None, which applies no business unit filter.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects (the entries under the 'allAssets' field).
    """
    asset_query = queries.ALL_ASSETS
    query_text = asset_query['query']
    query_variables = asset_query['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')

Gets assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_asset_versions( token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Fetch asset versions in the organization, following pagination until every page is read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Restrict the result to the Asset Version with this ID. Defaults to None (no filter).
        asset_id (str, optional):
            Restrict the result to Asset Versions of this Asset. Defaults to None (no filter).
        business_unit_id (str, optional):
            Restrict the result to Asset Versions in this Business Unit. Defaults to None (no filter).

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects (the entries under the 'allAssetVersions' field).
    """
    version_query = queries.ALL_ASSET_VERSIONS
    query_text = version_query['query']
    query_variables = version_query['variables'](
        asset_version_id=asset_version_id,
        asset_id=asset_id,
        business_unit_id=business_unit_id,
    )
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssetVersions')

Gets asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
  • asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_auth_token( client_id, client_secret, token_url='https://platform.finitestate.io/api/v1/auth/token', audience='https://platform.finitestate.io/api/v1/graphql'):
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Sends a client-credentials request as a JSON body to `token_url` and returns
    the `access_token` field of the JSON response.

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL, by default TOKEN_URL
        audience (str, optional):
            Audience, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # honour the caller-supplied audience instead of the module constant
        "audience": audience,
        "grant_type": "client_credentials",
    }
    headers = {
        'content-type': "application/json"
    }

    # honour the caller-supplied token URL instead of the module constant
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']

Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

Arguments:
  • client_id (str): CLIENT_ID as specified in the API documentation
  • client_secret (str): CLIENT_SECRET as specified in the API documentation
  • token_url (str, optional): Token URL, by default TOKEN_URL
  • audience (str, optional): Audience, by default AUDIENCE
Raises:
  • Exception: If the response status code is not 200
Returns:

str: Auth token. Use this token as the Authorization header in subsequent API calls.

def get_findings( token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None, severity=None, count=False, limit=None):
def get_findings(
    token,
    organization_context,
    asset_version_id=None,
    finding_id=None,
    category=None,
    status=None,
    severity=None,
    count=False,
    limit=None,
):
    """
    Gets all the Findings for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Finding Objects when `count` is False.
        When `count` is True, the `_allFindingsMeta` entry of the response data is returned
        instead (presumably an object holding the count - confirm against the API schema).
    """
    # 1000 itself is accepted, so the message must not claim "less than 1000"
    if limit and limit > 1000:
        raise Exception("Error: limit cannot be greater than 1000")
    if limit and limit < 1:
        raise Exception("Error: limit must be greater than 0")

    # the count query and the listing query take the same filter arguments
    filters = {
        "asset_version_id": asset_version_id,
        "finding_id": finding_id,
        "category": category,
        "status": status,
        "severity": severity,
        "limit": limit,
    }

    if count:
        response = send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
                                      queries.GET_FINDINGS_COUNT['variables'](**filters))
        return response["data"]["_allFindingsMeta"]

    return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
                                     queries.GET_FINDINGS['variables'](**filters), 'allFindings', limit=limit)

Gets all the Findings for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
  • finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
  • category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
  • status (str, optional): The status of Findings to return.
  • severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
  • count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
  • limit (int, optional): The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Finding Objects

def get_product_asset_versions(token, organization_context, product_id=None):
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Fetch the asset versions of a product, following pagination until every page is read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only, without "Bearer".
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            ID of the product whose asset versions are requested. Although the parameter
            has a default of None, an Exception is raised when it is empty or omitted.

    Raises:
        Exception: Raised if `product_id` is not provided or the query fails.

    Returns:
        list: Entries found under the 'allProducts' field of the response
        (documented upstream as AssetVersion objects).
    """
    if product_id:
        product_query = queries.GET_PRODUCT_ASSET_VERSIONS
        query_text = product_query['query']
        query_variables = product_query['variables'](product_id)
        return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

    raise Exception("Product ID is required")

Gets all the asset versions for a product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, required): Product ID to get asset versions for. An Exception is raised if it is not provided.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of AssetVersion Objects

def get_products( token, organization_context, product_id=None, business_unit_id=None) -> list:
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Fetch products, optionally filtered by product ID and/or business unit, following pagination until every page is read.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the raw token only; the "Bearer" prefix is added internally.
        organization_context (str):
            Organization context supplied by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Product Objects (the entries under the 'allProducts' field).
    """
    product_query = queries.GET_PRODUCTS
    query_text = product_query['query']
    query_variables = product_query['variables'](product_id=product_id, business_unit_id=business_unit_id)
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

Gets all the products for the specified business unit.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
  • business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Product Objects

def generate_report_download_url( token, organization_context, asset_version_id=None, product_id=None, report_type=None, report_subtype=None, verbose=False) -> str:
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` is required.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` is required.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download, based on the reports available for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is not provided, if neither asset_version_id nor
            product_id is provided, or if both are provided.
        Exception: Raised if report_type or report_subtype is not supported, if no export job ID is returned,
            or if a query fails.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not asset_version_id and not product_id:
        raise ValueError("Asset Version ID or Product ID is required")

    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    if report_type not in ["CSV", "PDF"]:
        raise Exception(f"Report Type {report_type} not supported")

    # Exactly one of asset_version_id / product_id is set at this point, so the
    # response field holding the export job depends only on the report type and
    # on which of the two IDs was supplied.
    if report_type == "CSV":
        supported_subtypes = ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]
        response_key = 'launchArtifactCSVExport' if asset_version_id else 'launchProductCSVExport'
    else:
        supported_subtypes = ["RISK_SUMMARY"]
        response_key = 'launchArtifactPdfExport' if asset_version_id else 'launchProductPdfExport'

    if report_subtype not in supported_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    # The same mutation/variables builders serve both report types.
    mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
                                                        report_type=report_type, report_subtype=report_subtype)
    variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
                                                          report_type=report_type, report_subtype=report_subtype)

    response_data = send_graphql_query(token, organization_context, mutation, variables)
    if verbose:
        print(f'Response Data: {json.dumps(response_data, indent=4)}')

    # get exportJobId from the result
    export_job_id = response_data['data'][response_key]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # poll the API until the export job is complete
    # NOTE(review): this loop only returns once the status is COMPLETED and a download link is present;
    # there is no timeout and no handling of other statuses - confirm which statuses the API can report.
    sleep_time = 10
    total_time = 0
    if verbose:
        print(f'Polling every {sleep_time} seconds for export job to complete')

    query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
    variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

    while True:
        time.sleep(sleep_time)
        total_time += sleep_time
        if verbose:
            print(f'Total time elapsed: {total_time} seconds')

        response_data = send_graphql_query(token, organization_context, query, variables)

        if verbose:
            print(f'Response Data: {json.dumps(response_data, indent=4)}')

        export_state = response_data['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] == 'COMPLETED' and export_state['downloadLink']:
            if verbose:
                print(f'Export Job Complete. Download URL: {export_state["downloadLink"]}')
            return export_state['downloadLink']

Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to download the report for. Either asset_version_id or product_id are required.
  • product_id (str, optional): Product ID to download the report for. Either asset_version_id or product_id are required.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download, based on the reports available for the report_type specified. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url( token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None, verbose=False) -> str:
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: Launches an SBOM export for the given asset version, then polls until the export is complete
    and returns a pre-signed URL for downloading the SBOM.
    This may take several minutes to complete, depending on the size of the SBOM.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if sbom_type or sbom_subtype is not supported, if no export job ID is returned,
            or if the query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    # Required arguments, checked in the same order as they are reported.
    required_arguments = (
        (sbom_type, "SBOM Type is required"),
        (sbom_subtype, "SBOM Subtype is required"),
        (asset_version_id, "Asset Version ID is required"),
    )
    for supplied_value, missing_message in required_arguments:
        if not supplied_value:
            raise ValueError(missing_message)

    if sbom_type not in ["CYCLONEDX", "SPDX"]:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    is_cyclonedx = sbom_type == "CYCLONEDX"
    permitted_subtypes = ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"] if is_cyclonedx else ["SBOM_ONLY"]
    if sbom_subtype not in permitted_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    # Pick the launch mutation and the response field that carries the export job.
    if is_cyclonedx:
        launch_export = queries.LAUNCH_CYCLONEDX_EXPORT
        launch_field = 'launchCycloneDxExport'
    else:
        launch_export = queries.LAUNCH_SPDX_EXPORT
        launch_field = 'launchSpdxExport'

    launch_mutation = launch_export['mutation']
    launch_variables = launch_export['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    export_job_id = launch_response['data'][launch_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll at a fixed interval; the loop only returns once the job reports
    # COMPLETED together with a non-empty download link.
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        poll_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        poll_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        job_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if job_state['status'] != "COMPLETED":
            continue
        if not job_state['downloadLink']:
            continue

        if verbose:
            print(f'Export Job Complete. Download URL: {job_state["downloadLink"]}')
        return job_state['downloadLink']

Blocking call: Initiates generation of an SBOM for the asset_version_id, and returns a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of the SBOM.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
  • asset_version_id (str, required): Asset Version ID to download the SBOM for.
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
  • Exception: Raised if the query fails.
Returns:

str: URL to download the SBOM from.

def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Gets all the Software Components for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for. Defaults to None in the signature, but an
            Exception is raised when it is not supplied.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        Exception: Raised if asset_version_id is not provided or the query fails.

    Returns:
        list: List of Software Component Objects
    """
    if not asset_version_id:
        raise Exception("Asset Version ID is required")

    components_query = queries.GET_SOFTWARE_COMPONENTS
    query_text = components_query['query']
    query_variables = components_query['variables'](asset_version_id=asset_version_id, type=type)

    return get_all_paginated_results(
        token,
        organization_context,
        query_text,
        query_variables,
        'allSoftwareComponentInstances',
    )

Gets all the Software Components for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, required): Asset Version ID to get software components for.
  • type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Software Component Objects

def search_sbom( token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', case_sensitive=False) -> list:
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software components whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Defaults to False.
            Only consulted for "EXACT" searches; "CONTAINS" searches always use the `name_contains` filter.

    Raises:
        ValueError: Raised if name is not provided, or if search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.

    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # Validate up front: a missing name would otherwise be sent as a null filter
    # value, and an unrecognised search method would add no name filter at all,
    # returning unfiltered results instead of an error.
    if not name:
        raise ValueError("Name is required")
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f"Search Method {search_method} not supported, valid values are EXACT and CONTAINS")

    if asset_version_id:
        # scoped to one asset version: include the original components of each match
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
"""
    else:
        # gets the asset version info that contains the software component
        query = """
query GetSoftwareComponentInstances_SDK(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
"""

    variables = {
        "filter": {
            "mergedComponentRefId": None
        },
        "after": None,
        "first": 100
    }

    if asset_version_id:
        variables["filter"]["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # exact, case-insensitive matching goes through the `name_like` filter
        name_filter_key = "name" if case_sensitive else "name_like"
        version_filter_key = "version"
    else:
        name_filter_key = "name_contains"
        version_filter_key = "version_contains"

    variables["filter"][name_filter_key] = name
    if version:
        variables["filter"][version_filter_key] = version

    records = get_all_paginated_results(token, organization_context, query, variables=variables,
                                        field="allSoftwareComponentInstances")

    return records

Searches the SBOM of a specific asset version or the entire organization for matching software components. Search Methods: EXACT or CONTAINS. An exact match will return only the software components whose name matches the search string exactly. A contains match will return all software components whose name contains the search string.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • name (str, required): Name of the software component to search for.
  • version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
  • asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
  • search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
  • case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
  • ValueError: Raised if name is not provided.
  • Exception: Raised if the query fails.
Returns:

list: List of SoftwareComponentInstance Objects

@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), retry=retry_if_exception(is_not_breakout_exception))
def send_graphql_query(token, organization_context, query, variables=None):
@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), retry=retry_if_exception(is_not_breakout_exception))
def send_graphql_query(token, organization_context, query, variables=None):
    """
    Send a GraphQL query to the API

    The call is wrapped in a retry policy: up to 5 attempts, waiting 5 seconds between attempts, retrying
    only for exceptions accepted by is_not_breakout_exception.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        BreakoutException: If the response body contains GraphQL "errors", or if the response status code
            is not 200 and the operation is a mutation.
        Exception: If the response status code is not 200 and the operation is not a mutation.

    Returns:
        dict: Response JSON
    """
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
        "Organization-Context": organization_context,
    }
    payload = {"query": query, "variables": variables}

    response = requests.post(API_URL, headers=request_headers, json=payload)

    if response.status_code != 200:
        # Failed mutations raise BreakoutException, everything else a plain Exception.
        # Presumably this keeps the retry policy from re-sending a mutation - confirm in finite_state_sdk.utils.
        error_type = BreakoutException if is_mutation(query) else Exception
        raise error_type(f"Error: {response.status_code} - {response.text}")

    body = response.json()
    if "errors" in body:
        # Raise a BreakoutException for GraphQL errors
        raise BreakoutException(f"Error: {body['errors']}")

    return body

Send a GraphQL query to the API

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
  • Exception: If the response status code is not 200
Returns:

dict: Response JSON

def update_finding_statuses( token, organization_context, user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None):
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Updates the status of one or more findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID(s) to update the status for.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT_AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details, see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if user_id, finding_ids, or status are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    # Required arguments, checked in the same order as they are reported.
    required_arguments = (
        (user_id, "User Id is required"),
        (finding_ids, "Finding Ids is required"),
        (status, "Status is required"),
    )
    for supplied_value, missing_message in required_arguments:
        if not supplied_value:
            raise ValueError(missing_message)

    update_request = queries.UPDATE_FINDING_STATUSES
    mutation_text = update_request['mutation']
    mutation_variables = update_request['variables'](
        user_id=user_id,
        finding_ids=finding_ids,
        status=status,
        justification=justification,
        response=response,
        comment=comment,
    )

    return send_graphql_query(token, organization_context, mutation_text, mutation_variables)

Updates the status of one or more findings. This is a blocking call.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • user_id (str, required): User ID to update the finding status for.
  • finding_ids (str, required): Finding ID to update the status for.
  • status (str, required): Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
  • justification (str, optional): Optional justification that applies to status of "NOT_AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details, see https://docs.finitestate.io/types/finding-status-justification-enum
  • response (str, optional): Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
  • comment (str, optional): Optional comment to add to the finding status update.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response

def upload_file_for_binary_analysis( token, organization_context, test_id=None, file_path=None, chunk_size=1048576000, quick_scan=False, enable_bandit_scan: bool = False):
def upload_file_for_binary_analysis(
    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
):
    """
    Upload a local file to a test for Binary Analysis via a multipart upload, then launch processing.
    NOTE: Third party scanner results are NOT uploaded with this method. Use upload_test_results_file for those.

    The upload runs in four stages: start a multipart upload, request an upload URL for every
    chunk and PUT the chunk to it, complete the multipart upload, and finally launch the
    binary upload processing for the resulting key.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            ID of the test the file is uploaded for.
        file_path (str, required):
            Local path of the file to upload.
        chunk_size (int, optional):
            Size in bytes of each chunk read from the file. Defaults to DEFAULT_CHUNK_SIZE.
            Must be at least MIN_CHUNK_SIZE and strictly less than MAX_CHUNK_SIZE.
        quick_scan (bool, optional):
            If True, the "QUICK_SCAN" configuration option is sent when launching processing. Defaults to False (Full Scan).
        enable_bandit_scan (bool, optional):
            If True, the "ENABLE_BANDIT_SCAN" configuration option is sent when launching processing.

    Raises:
        ValueError: Raised if test_id or file_path are not provided, or if chunk_size is outside the allowed range.
        Exception: Raised if a query or a chunk upload fails.

    Returns:
        dict: The 'data' portion of the launchBinaryUploadProcessing mutation response.
    """
    # Validate arguments before any request is made.
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")
    if chunk_size < MIN_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
    if chunk_size >= MAX_CHUNK_SIZE:
        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")

    # Stage 1: open the multipart upload and remember its id and key.
    start_mutation = """
    mutation Start_SDK($testId: ID!) {
        startMultipartUploadV2(testId: $testId) {
            uploadId
            key
        }
    }
    """
    start_result = send_graphql_query(token, organization_context, start_mutation, {"testId": test_id})
    multipart = start_result['data']['startMultipartUploadV2']
    upload_id = multipart['uploadId']
    upload_key = multipart['key']

    # Stage 2: one upload URL per chunk (even when the file fits in a single part).
    # The mutation text never changes between parts, so it is defined once here;
    # its inner whitespace is kept exactly as it has always been sent.
    part_url_mutation = """
        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
                key
                uploadUrl
            }
        }
        """
    completed_parts = []
    # Part numbers are 1-based.
    for part_number, chunk in enumerate(file_chunks(file_path, chunk_size), start=1):
        url_result = send_graphql_query(
            token,
            organization_context,
            part_url_mutation,
            {
                "partNumber": part_number,
                "uploadId": upload_id,
                "uploadKey": upload_key
            },
        )
        part_url = url_result['data']['generateUploadPartUrlV2']['uploadUrl']

        # PUT the chunk and keep the ETag header; it is sent back when completing the upload.
        put_response = upload_bytes_to_url(part_url, chunk)
        completed_parts.append({
            "ETag": put_response.headers['ETag'],
            "PartNumber": part_number
        })

    # Stage 3: complete the multipart upload with the collected part data.
    complete_mutation = """
    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
            key
        }
    }
    """
    complete_result = send_graphql_query(
        token,
        organization_context,
        complete_mutation,
        {
            "partData": completed_parts,
            "uploadId": upload_id,
            "uploadKey": upload_key
        },
    )

    # Stage 4: launch processing for the key returned by the completed upload.
    launch_variables = {
        "key": complete_result['data']['completeMultipartUploadV2']['key'],
        "testId": test_id
    }

    # Collect the requested configuration options, quick scan first.
    scan_options = []
    if quick_scan:
        scan_options.append("QUICK_SCAN")
    if enable_bandit_scan:
        scan_options.append("ENABLE_BANDIT_SCAN")

    if scan_options:
        # Only declare $configurationOptions when there is something to send.
        launch_variables["configurationOptions"] = scan_options
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
                key
                newBanditScanId
            }
        }
        """
    else:
        launch_mutation = """
        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
            launchBinaryUploadProcessing(key: $key, testId: $testId) {
                key
                newBanditScanId
            }
        }
        """

    launch_result = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    return launch_result['data']

Upload a file for Binary Analysis. Will automatically split the file into chunks and upload each chunk. NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • test_id (str, required): Test ID to upload the file for.
  • file_path (str, required): Local path to the file to upload.
  • chunk_size (int, optional): The size of the chunks to read, in bytes. 1000 MiB by default. Must be at least 5 MiB and less than 2000 MiB.
  • quick_scan (bool, optional): If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
  • enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
  • ValueError: Raised if test_id or file_path are not provided, or if chunk_size is outside the allowed range.
  • Exception: Raised if the query fails.
Returns:

dict: The "data" portion of the response to the launchBinaryUploadProcessing mutation.

def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
    """
    Upload a test results file to the test identified by test_id and launch its processing.
    NOTE: Binary Analysis uploads are not handled here. Use upload_file_for_binary_analysis for those.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            ID of the test the file is uploaded for.
        file_path (str, required):
            Local path of the file to upload.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if a query or the file upload fails.

    Returns:
        dict: The 'data' portion of the launchTestResultProcessing mutation response.
    """
    if not test_id:
        raise ValueError("Test Id is required")
    if not file_path:
        raise ValueError("File Path is required")

    # Ask the API for a single-part upload URL and the key the file will be stored under.
    url_mutation = """
    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
        generateSinglePartUploadUrl(testId: $testId) {
            uploadUrl
            key
        }
    }
    """
    url_result = send_graphql_query(token, organization_context, url_mutation, {"testId": test_id})
    single_part = url_result['data']['generateSinglePartUploadUrl']
    target_url = single_part['uploadUrl']
    uploaded_key = single_part['key']

    # Send the file contents to the returned URL.
    upload_file_to_url(target_url, file_path)

    # Launch processing of the uploaded results.
    launch_mutation = """
    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
        launchTestResultProcessing(key: $key, testId: $testId) {
            key
        }
    }
    """
    launch_result = send_graphql_query(
        token,
        organization_context,
        launch_mutation,
        {
            "testId": test_id,
            "key": uploaded_key
        },
    )
    return launch_result['data']

Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • test_id (str, required): Test ID to upload the file for.
  • file_path (str, required): Local path to the file to upload.
Raises:
  • ValueError: Raised if test_id or file_path are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: The "data" portion of the response to the launchTestResultProcessing mutation.

def upload_bytes_to_url(url, bytes):
def upload_bytes_to_url(url, bytes):
    """
    PUT a block of bytes to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        bytes (bytes):
            Bytes to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    put_response = requests.put(url, data=bytes)

    # Any status other than 200 is reported as a failure, including the response body.
    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response

Used for uploading a file to a pre-signed S3 URL

Arguments:
  • url (str): (Pre-signed S3) URL
  • bytes (bytes): Bytes to upload
Raises:
  • Exception: If the response status code is not 200
Returns:

requests.Response: Response object

def upload_file_to_url(url, file_path):
def upload_file_to_url(url, file_path):
    """
    PUT the contents of a local file to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        file_path (str):
            Local path to file to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    # The open file handle is passed as the request body; it is closed once the PUT returns.
    with open(file_path, 'rb') as source:
        put_response = requests.put(url, data=source)

    # Any status other than 200 is reported as a failure, including the response body.
    if put_response.status_code != 200:
        raise Exception(f"Error: {put_response.status_code} - {put_response.text}")

    return put_response

Used for uploading a file to a pre-signed S3 URL

Arguments:
  • url (str): (Pre-signed S3) URL
  • file_path (str): Local path to file to upload
Raises:
  • Exception: If the response status code is not 200
Returns:

requests.Response: Response object