finite_state_sdk

   1import json
   2from enum import Enum
   3
   4import requests
   5import time
   6from warnings import warn
   7import finite_state_sdk.queries as queries
   8
   9API_URL = 'https://platform.finitestate.io/api/v1/graphql'
  10AUDIENCE = "https://platform.finitestate.io/api/v1/graphql"
  11TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token"
  12
  13"""
  14DEFAULT CHUNK SIZE: 1000 MiB
  15"""
  16DEFAULT_CHUNK_SIZE = 1024**2 * 1000
  17"""
  18MAX CHUNK SIZE: 2 GiB
  19"""
  20MAX_CHUNK_SIZE = 1024**2 * 2000
  21"""
  22MIN CHUNK SIZE: 5 MiB
  23"""
  24MIN_CHUNK_SIZE = 1024**2 * 5
  25
  26
  27class UploadMethod(Enum):
  28    """
  29    Enumeration class representing different upload methods.
  30
  31    Attributes:
  32        WEB_APP_UI: Upload method via web application UI.
  33        API: Upload method via API.
  34        GITHUB_INTEGRATION: Upload method via GitHub integration.
  35        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
  36
  37    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
  38    """
  39    WEB_APP_UI = "WEB_APP_UI"
  40    API = "API"
  41    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
  42    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
  43
  44
  45def create_artifact(
  46    token,
  47    organization_context,
  48    business_unit_id=None,
  49    created_by_user_id=None,
  50    asset_version_id=None,
  51    artifact_name=None,
  52    product_id=None,
  53):
  54    """
  55    Create a new Artifact.
  56    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
  57    Please see the examples in the Github repository for more information:
  58    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  59    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
  60
  61    Args:
  62        token (str):
  63            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  64        organization_context (str):
  65            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  66        business_unit_id (str, required):
  67            Business Unit ID to associate the artifact with.
  68        created_by_user_id (str, required):
  69            User ID of the user creating the artifact.
  70        asset_version_id (str, required):
  71            Asset Version ID to associate the artifact with.
  72        artifact_name (str, required):
  73            The name of the Artifact being created.
  74        product_id (str, optional):
  75            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
  76
  77    Raises:
  78        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  79        Exception: Raised if the query fails.
  80
  81    Returns:
  82        dict: createArtifact Object
  83    """
  84    if not business_unit_id:
  85        raise ValueError("Business unit ID is required")
  86    if not created_by_user_id:
  87        raise ValueError("Created by user ID is required")
  88    if not asset_version_id:
  89        raise ValueError("Asset version ID is required")
  90    if not artifact_name:
  91        raise ValueError("Artifact name is required")
  92
  93    graphql_query = """
  94    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
  95        createArtifact(input: $input) {
  96            id
  97            name
  98            assetVersion {
  99                id
 100                name
 101                asset {
 102                    id
 103                    name
 104                }
 105            }
 106            createdBy {
 107                id
 108                email
 109            }
 110            ctx {
 111                asset
 112                products
 113                businessUnits
 114            }
 115        }
 116    }
 117    """
 118
 119    # Asset name, business unit context, and creating user are required
 120    variables = {
 121        "input": {
 122            "name": artifact_name,
 123            "createdBy": created_by_user_id,
 124            "assetVersion": asset_version_id,
 125            "ctx": {
 126                "asset": asset_version_id,
 127                "businessUnits": [business_unit_id]
 128            }
 129        }
 130    }
 131
 132    if product_id is not None:
 133        variables["input"]["ctx"]["products"] = product_id
 134
 135    response = send_graphql_query(token, organization_context, graphql_query, variables)
 136    return response['data']
 137
 138
 139def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
 140    """
 141    Create a new Asset.
 142
 143    Args:
 144        token (str):
 145            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 146        organization_context (str):
 147            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 148        business_unit_id (str, required):
 149            Business Unit ID to associate the asset with.
 150        created_by_user_id (str, required):
 151            User ID of the user creating the asset.
 152        asset_name (str, required):
 153            The name of the Asset being created.
 154        product_id (str, optional):
 155            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
 156
 157    Raises:
 158        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
 159        Exception: Raised if the query fails.
 160
 161    Returns:
 162        dict: createAsset Object
 163    """
 164    if not business_unit_id:
 165        raise ValueError("Business unit ID is required")
 166    if not created_by_user_id:
 167        raise ValueError("Created by user ID is required")
 168    if not asset_name:
 169        raise ValueError("Asset name is required")
 170
 171    graphql_query = """
 172    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
 173        createAsset(input: $input) {
 174            id
 175            name
 176            dependentProducts {
 177                id
 178                name
 179            }
 180            group {
 181                id
 182                name
 183            }
 184            createdBy {
 185                id
 186                email
 187            }
 188            ctx {
 189                asset
 190                products
 191                businessUnits
 192            }
 193        }
 194    }
 195    """
 196
 197    # Asset name, business unit context, and creating user are required
 198    variables = {
 199        "input": {
 200            "name": asset_name,
 201            "group": business_unit_id,
 202            "createdBy": created_by_user_id,
 203            "ctx": {
 204                "businessUnits": [business_unit_id]
 205            }
 206        }
 207    }
 208
 209    if product_id is not None:
 210        variables["input"]["ctx"]["products"] = product_id
 211
 212    response = send_graphql_query(token, organization_context, graphql_query, variables)
 213    return response['data']
 214
 215
 216def create_asset_version(
 217    token,
 218    organization_context,
 219    business_unit_id=None,
 220    created_by_user_id=None,
 221    asset_id=None,
 222    asset_version_name=None,
 223    product_id=None,
 224):
 225    """
 226    Create a new Asset Version.
 227
 228    Args:
 229        token (str):
 230            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 231        organization_context (str):
 232            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 233        business_unit_id (str, required):
 234            Business Unit ID to associate the asset version with.
 235        created_by_user_id (str, required):
 236            User ID of the user creating the asset version.
 237        asset_id (str, required):
 238            Asset ID to associate the asset version with.
 239        asset_version_name (str, required):
 240            The name of the Asset Version being created.
 241        product_id (str, optional):
 242            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
 243
 244    Raises:
 245        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 246        Exception: Raised if the query fails.
 247
 248    Returns:
 249        dict: createAssetVersion Object
 250
 251    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
 252    """
 253    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
 254    if not business_unit_id:
 255        raise ValueError("Business unit ID is required")
 256    if not created_by_user_id:
 257        raise ValueError("Created by user ID is required")
 258    if not asset_id:
 259        raise ValueError("Asset ID is required")
 260    if not asset_version_name:
 261        raise ValueError("Asset version name is required")
 262
 263    graphql_query = """
 264    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
 265        createAssetVersion(input: $input) {
 266            id
 267            name
 268            asset {
 269                id
 270                name
 271            }
 272            createdBy {
 273                id
 274                email
 275            }
 276            ctx {
 277                asset
 278                products
 279                businessUnits
 280            }
 281        }
 282    }
 283    """
 284
 285    # Asset name, business unit context, and creating user are required
 286    variables = {
 287        "input": {
 288            "name": asset_version_name,
 289            "createdBy": created_by_user_id,
 290            "asset": asset_id,
 291            "ctx": {
 292                "asset": asset_id,
 293                "businessUnits": [business_unit_id]
 294            }
 295        }
 296    }
 297
 298    if product_id is not None:
 299        variables["input"]["ctx"]["products"] = product_id
 300
 301    response = send_graphql_query(token, organization_context, graphql_query, variables)
 302    return response['data']
 303
 304
 305def create_asset_version_on_asset(
 306    token,
 307    organization_context,
 308    created_by_user_id=None,
 309    asset_id=None,
 310    asset_version_name=None,
 311    product_id=None,
 312):
 313    """
 314    Create a new Asset Version.
 315
 316    Args:
 317        token (str):
 318            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 319        organization_context (str):
 320            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 321        created_by_user_id (str, optional):
 322            User ID of the user creating the asset version.
 323        asset_id (str, required):
 324            Asset ID to associate the asset version with.
 325        asset_version_name (str, required):
 326            The name of the Asset Version being created.
 327
 328    Raises:
 329        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 330        Exception: Raised if the query fails.
 331
 332    Returns:
 333        dict: createAssetVersion Object
 334    """
 335    if not asset_id:
 336        raise ValueError("Asset ID is required")
 337    if not asset_version_name:
 338        raise ValueError("Asset version name is required")
 339
 340    graphql_query = """
 341        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
 342            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
 343                id
 344                assetVersion {
 345                    id
 346                }
 347            }
 348        }
 349    """
 350
 351    # Asset name, business unit context, and creating user are required
 352    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
 353
 354    if created_by_user_id:
 355        variables["createdByUserId"] = created_by_user_id
 356
 357    if product_id:
 358        variables["productId"] = product_id
 359
 360    response = send_graphql_query(token, organization_context, graphql_query, variables)
 361    return response['data']
 362
 363
 364def create_new_asset_version_artifact_and_test_for_upload(
 365    token,
 366    organization_context,
 367    business_unit_id=None,
 368    created_by_user_id=None,
 369    asset_id=None,
 370    version=None,
 371    product_id=None,
 372    test_type=None,
 373    artifact_description=None,
 374    upload_method: UploadMethod = UploadMethod.API,
 375):
 376    """
 377    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
 378    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
 379
 380    Args:
 381        token (str):
 382            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 383        organization_context (str):
 384            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 385        business_unit_id (str, optional):
 386            Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
 387        created_by_user_id (str, optional):
 388            User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
 389        asset_id (str, required):
 390            Asset ID to create the asset version for. If not provided, the default asset will be used.
 391        version (str, required):
 392            Version to create the asset version for.
 393        product_id (str, optional):
 394            Product ID to create the entities for. If not provided, the default product will be used.
 395        test_type (str, required):
 396            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
 397        artifact_description (str, optional):
 398            Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
 399        upload_method (UploadMethod, optional):
 400            The method of uploading the test results. Default is UploadMethod.API.
 401
 402
 403    Raises:
 404        ValueError: Raised if asset_id or version are not provided.
 405        Exception: Raised if the query fails.
 406
 407    Returns:
 408        str: The Test ID of the newly created test that is used for uploading the file.
 409    """
 410    if not asset_id:
 411        raise ValueError("Asset ID is required")
 412    if not version:
 413        raise ValueError("Version is required")
 414
 415    assets = get_all_assets(token, organization_context, asset_id=asset_id)
 416    if not assets:
 417        raise ValueError("No assets found with the provided asset ID")
 418    asset = assets[0]
 419
 420    # get the asset name
 421    asset_name = asset['name']
 422
 423    # get the existing asset product IDs
 424    asset_product_ids = asset['ctx']['products']
 425
 426    # get the asset product ID
 427    if product_id and product_id not in asset_product_ids:
 428        asset_product_ids.append(product_id)
 429
 430    # if business_unit_id or created_by_user_id are not provided, get the existing asset
 431    if not business_unit_id or not created_by_user_id:
 432        if not business_unit_id:
 433            business_unit_id = asset['group']['id']
 434        if not created_by_user_id:
 435            created_by_user_id = asset['createdBy']['id']
 436
 437        if not business_unit_id:
 438            raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
 439        if not created_by_user_id:
 440            raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")
 441
 442    # create the asset version
 443    response = create_asset_version_on_asset(
 444        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id
 445    )
 446    # get the asset version ID
 447    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']
 448
 449    # create the test
 450    if test_type == "finite_state_binary_analysis":
 451        # create the artifact
 452        if not artifact_description:
 453            artifact_description = "Binary"
 454        binary_artifact_name = f"{asset_name} {version} - {artifact_description}"
 455        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 456                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 457                                   artifact_name=binary_artifact_name, product_id=asset_product_ids)
 458
 459        # get the artifact ID
 460        binary_artifact_id = response['createArtifact']['id']
 461
 462        # create the test
 463        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
 464        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
 465                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
 466                                                  artifact_id=binary_artifact_id, product_id=asset_product_ids,
 467                                                  test_name=test_name, upload_method=upload_method)
 468        test_id = response['createTest']['id']
 469        return test_id
 470
 471    else:
 472        # create the artifact
 473        if not artifact_description:
 474            artifact_description = "Unspecified Artifact"
 475        artifact_name = f"{asset_name} {version} - {artifact_description}"
 476        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 477                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 478                                   artifact_name=artifact_name, product_id=asset_product_ids)
 479
 480        # get the artifact ID
 481        binary_artifact_id = response['createArtifact']['id']
 482
 483        # create the test
 484        test_name = f"{asset_name} {version} - {test_type}"
 485        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
 486                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
 487                                                      artifact_id=binary_artifact_id, product_id=asset_product_ids,
 488                                                      test_name=test_name, test_type=test_type,
 489                                                      upload_method=upload_method)
 490        test_id = response['createTest']['id']
 491        return test_id
 492
 493
 494def create_new_asset_version_and_upload_binary(
 495    token,
 496    organization_context,
 497    business_unit_id=None,
 498    created_by_user_id=None,
 499    asset_id=None,
 500    version=None,
 501    file_path=None,
 502    product_id=None,
 503    artifact_description=None,
 504    quick_scan=False,
 505    upload_method: UploadMethod = UploadMethod.API,
 506    enable_bandit_scan: bool = False,
 507):
 508    """
 509    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
 510    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 511
 512    Args:
 513        token (str):
 514            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 515        organization_context (str):
 516            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 517        business_unit_id (str, optional):
 518            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 519        created_by_user_id (str, optional):
 520            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 521        asset_id (str, required):
 522            Asset ID to create the asset version for.
 523        version (str, required):
 524            Version to create the asset version for.
 525        file_path (str, required):
 526            Local path to the file to upload.
 527        product_id (str, optional):
 528            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
 529        artifact_description (str, optional):
 530            Description of the artifact. If not provided, the default is "Firmware Binary".
 531        quick_scan (bool, optional):
 532            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
 533        upload_method (UploadMethod, optional):
 534            The method of uploading the test results. Default is UploadMethod.API.
 535        enable_bandit_scan (bool, optional):
 536            If True, will create an additional bandit scan in addition to the default binary analysis scan.
 537
 538    Raises:
 539        ValueError: Raised if asset_id, version, or file_path are not provided.
 540        Exception: Raised if any of the queries fail.
 541
 542    Returns:
 543        dict: The response from the GraphQL query, a createAssetVersion Object.
 544    """
 545    if not asset_id:
 546        raise ValueError("Asset ID is required")
 547    if not version:
 548        raise ValueError("Version is required")
 549    if not file_path:
 550        raise ValueError("File path is required")
 551
 552    # create the asset version and binary test
 553    if not artifact_description:
 554        artifact_description = "Firmware Binary"
 555    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
 556        token,
 557        organization_context,
 558        business_unit_id=business_unit_id,
 559        created_by_user_id=created_by_user_id,
 560        asset_id=asset_id,
 561        version=version,
 562        product_id=product_id,
 563        test_type="finite_state_binary_analysis",
 564        artifact_description=artifact_description,
 565        upload_method=upload_method,
 566    )
 567
 568    # upload file for binary test
 569    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
 570                                               quick_scan=quick_scan, enable_bandit_scan=enable_bandit_scan)
 571    return response
 572
 573
 574def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
 575                                                     created_by_user_id=None, asset_id=None, version=None,
 576                                                     file_path=None, product_id=None, test_type=None,
 577                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
 578    """
 579    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
 580    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 581
 582    Args:
 583        token (str):
 584            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 585        organization_context (str):
 586            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 587        business_unit_id (str, optional):
 588            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 589        created_by_user_id (str, optional):
 590            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 591        asset_id (str, required):
 592            Asset ID to create the asset version for.
 593        version (str, required):
 594            Version to create the asset version for.
 595        file_path (str, required):
 596            Path to the test results file to upload.
 597        product_id (str, optional):
 598            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
 599        test_type (str, required):
 600            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
 601        artifact_description (str, optional):
 602            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
 603        upload_method (UploadMethod, optional):
 604            The method of uploading the test results. Default is UploadMethod.API.
 605
 606    Raises:
 607        ValueError: If the asset_id, version, or file_path are not provided.
 608        Exception: If the test_type is not a supported third party scanner type, or if the query fails.
 609
 610    Returns:
 611        dict: The response from the GraphQL query, a createAssetVersion Object.
 612    """
 613    if not asset_id:
 614        raise ValueError("Asset ID is required")
 615    if not version:
 616        raise ValueError("Version is required")
 617    if not file_path:
 618        raise ValueError("File path is required")
 619    if not test_type:
 620        raise ValueError("Test type is required")
 621
 622    # create the asset version and test
 623    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
 624                                                                    business_unit_id=business_unit_id,
 625                                                                    created_by_user_id=created_by_user_id,
 626                                                                    asset_id=asset_id, version=version,
 627                                                                    product_id=product_id, test_type=test_type,
 628                                                                    artifact_description=artifact_description,
 629                                                                    upload_method=upload_method)
 630
 631    # upload test results file
 632    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
 633    return response
 634
 635
 636def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
 637                   product_description=None, vendor_id=None, vendor_name=None):
 638    """
 639    Create a new Product.
 640
 641    Args:
 642        token (str):
 643            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 644        organization_context (str):
 645            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 646        business_unit_id (str, required):
 647            Business Unit ID to associate the product with.
 648        created_by_user_id (str, required):
 649            User ID of the user creating the product.
 650        product_name (str, required):
 651            The name of the Product being created.
 652        product_description (str, optional):
 653            The description of the Product being created.
 654        vendor_id (str, optional):
 655            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
 656        vendor_name (str, optional):
 657            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
 658
 659    Raises:
 660        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
 661        Exception: Raised if the query fails.
 662
 663    Returns:
 664        dict: createProduct Object
 665    """
 666
 667    if not business_unit_id:
 668        raise ValueError("Business unit ID is required")
 669    if not created_by_user_id:
 670        raise ValueError("Created by user ID is required")
 671    if not product_name:
 672        raise ValueError("Product name is required")
 673
 674    graphql_query = """
 675    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
 676        createProduct(input: $input) {
 677            id
 678            name
 679            vendor {
 680                name
 681            }
 682            group {
 683                id
 684                name
 685            }
 686            createdBy {
 687                id
 688                email
 689            }
 690            ctx {
 691                businessUnit
 692            }
 693        }
 694    }
 695    """
 696
 697    # Product name, business unit context, and creating user are required
 698    variables = {
 699        "input": {
 700            "name": product_name,
 701            "group": business_unit_id,
 702            "createdBy": created_by_user_id,
 703            "ctx": {
 704                "businessUnit": business_unit_id
 705            }
 706        }
 707    }
 708
 709    if product_description is not None:
 710        variables["input"]["description"] = product_description
 711
 712    # If the vendor ID is specified, this will link the new product to the existing vendor
 713    if vendor_id is not None:
 714        variables["input"]["vendor"] = {
 715            "id": vendor_id
 716        }
 717
 718    # If the vendor name is specified, this will create a new vendor and link it to the new product
 719    if vendor_name is not None:
 720        variables["input"]["createVendor"] = {
 721            "name": vendor_name
 722        }
 723
 724    response = send_graphql_query(token, organization_context, graphql_query, variables)
 725
 726    return response['data']
 727
 728
 729def create_test(
 730    token,
 731    organization_context,
 732    business_unit_id=None,
 733    created_by_user_id=None,
 734    asset_id=None,
 735    artifact_id=None,
 736    test_name=None,
 737    product_id=None,
 738    test_type=None,
 739    tools=[],
 740    upload_method: UploadMethod = UploadMethod.API,
 741):
 742    """
 743    Create a new Test object for uploading files.
 744    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 745    Please see the examples in the Github repository for more information:
 746    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 747    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 748
 749    Args:
 750        token (str):
 751            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 752        organization_context (str):
 753            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 754        business_unit_id (str, required):
 755            Business Unit ID to associate the Test with.
 756        created_by_user_id (str, required):
 757            User ID of the user creating the Test.
 758        asset_id (str, required):
 759            Asset ID to associate the Test with.
 760        artifact_id (str, required):
 761            Artifact ID to associate the Test with.
 762        test_name (str, required):
 763            The name of the Test being created.
 764        product_id (str, optional):
 765            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 766        test_type (str, required):
 767            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
 768        tools (list, optional):
 769            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
 770        upload_method (UploadMethod, required):
 771            The method of uploading the test results.
 772
 773    Raises:
 774        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
 775        Exception: Raised if the query fails.
 776
 777    Returns:
 778        dict: createTest Object
 779    """
 780    if not business_unit_id:
 781        raise ValueError("Business unit ID is required")
 782    if not created_by_user_id:
 783        raise ValueError("Created by user ID is required")
 784    if not asset_id:
 785        raise ValueError("Asset ID is required")
 786    if not artifact_id:
 787        raise ValueError("Artifact ID is required")
 788    if not test_name:
 789        raise ValueError("Test name is required")
 790    if not test_type:
 791        raise ValueError("Test type is required")
 792
 793    graphql_query = """
 794    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
 795        createTest(input: $input) {
 796            id
 797            name
 798            artifactUnderTest {
 799                id
 800                name
 801                assetVersion {
 802                    id
 803                    name
 804                    asset {
 805                        id
 806                        name
 807                        dependentProducts {
 808                            id
 809                            name
 810                        }
 811                    }
 812                }
 813            }
 814            createdBy {
 815                id
 816                email
 817            }
 818            ctx {
 819                asset
 820                products
 821                businessUnits
 822            }
 823            uploadMethod
 824        }
 825    }
 826    """
 827
 828    # Asset name, business unit context, and creating user are required
 829    variables = {
 830        "input": {
 831            "name": test_name,
 832            "createdBy": created_by_user_id,
 833            "artifactUnderTest": artifact_id,
 834            "testResultFileFormat": test_type,
 835            "ctx": {
 836                "asset": asset_id,
 837                "businessUnits": [business_unit_id]
 838            },
 839            "tools": tools,
 840            "uploadMethod": upload_method.value
 841        }
 842    }
 843
 844    if product_id is not None:
 845        variables["input"]["ctx"]["products"] = product_id
 846
 847    response = send_graphql_query(token, organization_context, graphql_query, variables)
 848    return response['data']
 849
 850
 851def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
 852                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
 853                                   upload_method: UploadMethod = UploadMethod.API):
 854    """
 855    Create a new Test object for uploading files for Finite State Binary Analysis.
 856
 857    Args:
 858        token (str):
 859            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 860        organization_context (str):
 861            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 862        business_unit_id (str, required):
 863            Business Unit ID to associate the Test with.
 864        created_by_user_id (str, required):
 865            User ID of the user creating the Test.
 866        asset_id (str, required):
 867            Asset ID to associate the Test with.
 868        artifact_id (str, required):
 869            Artifact ID to associate the Test with.
 870        test_name (str, required):
 871            The name of the Test being created.
 872        product_id (str, optional):
 873            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 874        upload_method (UploadMethod, optional):
 875            The method of uploading the test results. Default is UploadMethod.API.
 876
 877    Raises:
 878        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 879        Exception: Raised if the query fails.
 880
 881    Returns:
 882        dict: createTest Object
 883    """
 884    tools = [
 885        {
 886            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
 887            "name": "Finite State Binary Analysis"
 888        }
 889    ]
 890    return create_test(token, organization_context, business_unit_id=business_unit_id,
 891                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 892                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
 893                       tools=tools, upload_method=upload_method)
 894
 895
 896def create_test_as_cyclone_dx(
 897    token,
 898    organization_context,
 899    business_unit_id=None,
 900    created_by_user_id=None,
 901    asset_id=None,
 902    artifact_id=None,
 903    test_name=None,
 904    product_id=None,
 905    upload_method: UploadMethod = UploadMethod.API,
 906):
 907    """
 908    Create a new Test object for uploading CycloneDX files.
 909
 910    Args:
 911        token (str):
 912            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 913        organization_context (str):
 914            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 915        business_unit_id (str, required):
 916            Business Unit ID to associate the Test with.
 917        created_by_user_id (str, required):
 918            User ID of the user creating the Test.
 919        asset_id (str, required):
 920            Asset ID to associate the Test with.
 921        artifact_id (str, required):
 922            Artifact ID to associate the Test with.
 923        test_name (str, required):
 924            The name of the Test being created.
 925        product_id (str, optional):
 926            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 927        upload_method (UploadMethod, optional):
 928            The method of uploading the test results. Default is UploadMethod.API.
 929
 930    Raises:
 931        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 932        Exception: Raised if the query fails.
 933
 934    Returns:
 935        dict: createTest Object
 936    """
 937    return create_test(token, organization_context, business_unit_id=business_unit_id,
 938                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 939                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)
 940
 941
 942def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
 943                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
 944                                       upload_method: UploadMethod = UploadMethod.API):
 945    """
 946    Create a new Test object for uploading Third Party Scanner files.
 947
 948    Args:
 949        token (str):
 950            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 951        organization_context (str):
 952            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 953        business_unit_id (str, required):
 954            Business Unit ID to associate the Test with.
 955        created_by_user_id (str, required):
 956            User ID of the user creating the Test.
 957        asset_id (str, required):
 958            Asset ID to associate the Test with.
 959        artifact_id (str, required):
 960            Artifact ID to associate the Test with.
 961        test_name (str, required):
 962            The name of the Test being created.
 963        product_id (str, optional):
 964            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 965        test_type (str, required):
 966            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
 967        upload_method (UploadMethod, optional):
 968            The method of uploading the test results. Default is UploadMethod.API.
 969
 970    Raises:
 971        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 972        Exception: Raised if the query fails.
 973
 974    Returns:
 975        dict: createTest Object
 976    """
 977    return create_test(token, organization_context, business_unit_id=business_unit_id,
 978                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 979                       test_name=test_name, product_id=product_id, test_type=test_type, upload_method=upload_method)
 980
 981
 982def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 983                                  report_subtype=None, output_filename=None, verbose=False):
 984    """
 985    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 986
 987    Args:
 988        token (str):
 989            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 990        organization_context (str):
 991            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 992        asset_version_id (str, required):
 993            The Asset Version ID to download the report for.
 994        report_type (str, required):
 995            The file type of the report to download. Valid values are "CSV" and "PDF".
 996        report_subtype (str, required):
 997            The type of report to download. Based on available reports for the `report_type` specified
 998            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
 999            Valid values for PDF are "RISK_SUMMARY".
1000        output_filename (str, optional):
1001            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1002        verbose (bool, optional):
1003            If True, will print additional information to the console. Defaults to False.
1004
1005    Raises:
1006        ValueError: Raised if required parameters are not provided.
1007        Exception: Raised if the query fails.
1008
1009    Returns:
1010        None
1011    """
1012    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
1013                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
1014
1015    # Send an HTTP GET request to the URL
1016    response = requests.get(url)
1017
1018    # Check if the request was successful (status code 200)
1019    if response.status_code == 200:
1020        # Open a local file in binary write mode and write the content to it
1021        if verbose:
1022            print("File downloaded successfully.")
1023        with open(output_filename, 'wb') as file:
1024            file.write(response.content)
1025            if verbose:
1026                print(f'Wrote file to {output_filename}')
1027    else:
1028        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1029
1030
1031def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
1032                            output_filename=None, verbose=False):
1033    """
1034    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
1035
1036    Args:
1037        token (str):
1038            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1039        organization_context (str):
1040            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1041        product_id (str, required):
1042            The Product ID to download the report for.
1043        report_type (str, required):
1044            The file type of the report to download. Valid values are "CSV".
1045        report_subtype (str, required):
1046            The type of report to download. Based on available reports for the `report_type` specified
1047            Valid values for CSV are "ALL_FINDINGS".
1048        output_filename (str, optional):
1049            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1050        verbose (bool, optional):
1051            If True, will print additional information to the console. Defaults to False.
1052    """
1053    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
1054                                       report_subtype=report_subtype, verbose=verbose)
1055
1056    # Send an HTTP GET request to the URL
1057    response = requests.get(url)
1058
1059    # Check if the request was successful (status code 200)
1060    if response.status_code == 200:
1061        # Open a local file in binary write mode and write the content to it
1062        if verbose:
1063            print("File downloaded successfully.")
1064        with open(output_filename, 'wb') as file:
1065            file.write(response.content)
1066            if verbose:
1067                print(f'Wrote file to {output_filename}')
1068    else:
1069        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1070
1071
1072def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
1073                  output_filename="sbom.json", verbose=False):
1074    """
1075    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
1076
1077    Args:
1078        token (str):
1079            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1080        organization_context (str):
1081            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1082        sbom_type (str, required):
1083            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
1084        sbom_subtype (str, required):
1085            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
1086        asset_version_id (str, required):
1087            The Asset Version ID to download the SBOM for.
1088        output_filename (str, required):
1089            The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
1090        verbose (bool, optional):
1091            If True, will print additional information to the console. Defaults to False.
1092
1093    Raises:
1094        ValueError: Raised if required parameters are not provided.
1095        Exception: Raised if the query fails.
1096
1097    Returns:
1098        None
1099    """
1100    url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype,
1101                                     asset_version_id=asset_version_id, verbose=verbose)
1102
1103    # Send an HTTP GET request to the URL
1104    response = requests.get(url)
1105
1106    # Check if the request was successful (status code 200)
1107    if response.status_code == 200:
1108        # Open a local file in binary write mode and write the content to it
1109        if verbose:
1110            print("File downloaded successfully.")
1111        with open(output_filename, 'wb') as file:
1112            file.write(response.content)
1113            if verbose:
1114                print(f'Wrote file to {output_filename}')
1115    else:
1116        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1117
1118
1119def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
1120    """
1121    Helper method to read a file in chunks.
1122
1123    Args:
1124        file_path (str):
1125            Local path to the file to read.
1126        chunk_size (int, optional):
1127            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
1128
1129    Yields:
1130        bytes: The next chunk of the file.
1131
1132    Raises:
1133        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
1134    """
1135    with open(file_path, 'rb') as f:
1136        while True:
1137            chunk = f.read(chunk_size)
1138            if chunk:
1139                yield chunk
1140            else:
1141                break
1142
1143
1144def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
1145    """
1146    Get all artifacts in the organization. Uses pagination to get all results.
1147
1148    Args:
1149        token (str):
1150            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1151        organization_context (str):
1152            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1153        artifact_id (str, optional):
1154            An optional Artifact ID if this is used to get a single artifact, by default None
1155        business_unit_id (str, optional):
1156            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
1157
1158    Raises:
1159        Exception: Raised if the query fails.
1160
1161    Returns:
1162        list: List of Artifact Objects
1163    """
1164    return get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1165                                     queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id), 'allAssets')
1166
1167
1168def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
1169    """
1170    Gets all assets in the organization. Uses pagination to get all results.
1171
1172    Args:
1173        token (str):
1174            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1175        organization_context (str):
1176            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1177        asset_id (str, optional):
1178            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1179        business_unit_id (str, optional):
1180            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1181
1182    Raises:
1183        Exception: Raised if the query fails.
1184
1185    Returns:
1186        list: List of Asset Objects
1187    """
1188    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1189                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')
1190
1191
1192def get_all_asset_versions(token, organization_context):
1193    """
1194    Get all asset versions in the organization. Uses pagination to get all results.
1195
1196    Args:
1197        token (str):
1198            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1199        organization_context (str):
1200            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1201
1202    Raises:
1203        Exception: Raised if the query fails.
1204
1205    Returns:
1206        list: List of AssetVersion Objects
1207    """
1208    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1209                                     queries.ALL_ASSET_VERSIONS['variables'], 'allAssetVersions')
1210
1211
1212def get_all_asset_versions_for_product(token, organization_context, product_id):
1213    """
1214    Get all asset versions for a product. Uses pagination to get all results.
1215
1216    Args:
1217        token (str):
1218            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1219        organization_context (str):
1220            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1221        product_id (str):
1222            The Product ID to get asset versions for
1223
1224    Returns:
1225        list: List of AssetVersion Objects
1226    """
1227    return get_all_paginated_results(token, organization_context, queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['query'],
1228                                     queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['variables'](product_id), 'allProducts')
1229
1230
1231def get_all_business_units(token, organization_context):
1232    """
1233    Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
1234
1235    Args:
1236        token (str):
1237            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1238        organization_context (str):
1239            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1240
1241    Raises:
1242        Exception: Raised if the query fails.
1243
1244    Returns:
1245        list: List of Group Objects
1246    """
1247    return get_all_paginated_results(token, organization_context, queries.ALL_BUSINESS_UNITS['query'],
1248                                     queries.ALL_BUSINESS_UNITS['variables'], 'allGroups')
1249
1250
1251def get_all_organizations(token, organization_context):
1252    """
1253    Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
1254
1255    Args:
1256        token (str):
1257            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1258        organization_context (str):
1259            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1260
1261    Raises:
1262        Exception: Raised if the query fails.
1263
1264    Returns:
1265        list: List of Organization Objects
1266    """
1267    return get_all_paginated_results(token, organization_context, queries.ALL_ORGANIZATIONS['query'],
1268                                     queries.ALL_ORGANIZATIONS['variables'], 'allOrganizations')
1269
1270
1271def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
1272    """
1273    Get all results from a paginated GraphQL query
1274
1275    Args:
1276        token (str):
1277            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1278        organization_context (str):
1279            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1280        query (str):
1281            The GraphQL query string
1282        variables (dict, optional):
1283            Variables to be used in the GraphQL query, by default None
1284        field (str, required):
1285            The field in the response JSON that contains the results
1286        limit (int, Optional):
1287            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
1288
1289    Raises:
1290        Exception: If the response status code is not 200, or if the field is not in the response JSON
1291
1292    Returns:
1293        list: List of results
1294    """
1295
1296    if not field:
1297        raise Exception("Error: field is required")
1298    if limit and limit > 1000:
1299        raise Exception("Error: limit cannot be greater than 1000")
1300    if limit and limit < 1:
1301        raise Exception("Error: limit cannot be less than 1")
1302    if not variables["first"]:
1303        raise Exception("Error: first is required")
1304    if variables["first"] < 1:
1305        raise Exception("Error: first cannot be less than 1")
1306    if variables["first"] > 1000:
1307        raise Exception("Error: limit cannot be greater than 1000")
1308
1309    # query the API for the first page of results
1310    response_data = send_graphql_query(token, organization_context, query, variables)
1311
1312    # if there are no results, return an empty list
1313    if not response_data:
1314        return []
1315
1316    # create a list to store the results
1317    results = []
1318
1319    # add the first page of results to the list
1320    if field in response_data['data']:
1321        results.extend(response_data['data'][field])
1322    else:
1323        raise Exception(f"Error: {field} not in response JSON")
1324
1325    if len(response_data['data'][field]) > 0:
1326        # get the cursor from the last entry in the list
1327        cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1328
1329        while cursor:
1330            if limit and len(results) == limit:
1331                break
1332
1333            variables['after'] = cursor
1334
1335            # add the next page of results to the list
1336            response_data = send_graphql_query(token, organization_context, query, variables)
1337            results.extend(response_data['data'][field])
1338
1339            try:
1340                cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1341            except IndexError:
1342                # when there is no additional cursor, stop getting more pages
1343                cursor = None
1344
1345    return results
1346
1347
1348def get_all_products(token, organization_context):
1349    """
1350    Get all products in the organization. Uses pagination to get all results.
1351
1352    Args:
1353        token (str):
1354            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1355        organization_context (str):
1356            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1357
1358    Raises:
1359        Exception: Raised if the query fails.
1360
1361    Returns:
1362        list: List of Product Objects
1363
1364    .. deprecated:: 0.1.4. Use get_products instead.
1365    """
1366    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)
1367    return get_all_paginated_results(token, organization_context, queries.ALL_PRODUCTS['query'],
1368                                     queries.ALL_PRODUCTS['variables'], 'allProducts')
1369
1370
1371def get_all_users(token, organization_context):
1372    """
1373    Get all users in the organization. Uses pagination to get all results.
1374
1375    Args:
1376        token (str):
1377            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1378        organization_context (str):
1379            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1380
1381    Raises:
1382        Exception: Raised if the query fails.
1383
1384    Returns:
1385        list: List of User Objects
1386    """
1387    return get_all_paginated_results(token, organization_context, queries.ALL_USERS['query'],
1388                                     queries.ALL_USERS['variables'], 'allUsers')
1389
1390
1391def get_artifact_context(token, organization_context, artifact_id):
1392    """
1393    Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
1394
1395    Args:
1396        token (str):
1397            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1398        organization_context (str):
1399            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1400
1401    Raises:
1402        Exception: Raised if the query fails.
1403
1404    Returns:
1405        dict: Artifact Context Object
1406    """
1407    artifact = get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1408                                         queries.ALL_ARTIFACTS['variables'](artifact_id, None), 'allAssets')
1409
1410    return artifact[0]['ctx']
1411
1412
1413def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
1414    """
1415    Gets assets in the organization. Uses pagination to get all results.
1416
1417    Args:
1418        token (str):
1419            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1420        organization_context (str):
1421            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1422        asset_id (str, optional):
1423            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1424        business_unit_id (str, optional):
1425            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1426
1427    Raises:
1428        Exception: Raised if the query fails.
1429
1430    Returns:
1431        list: List of Asset Objects
1432    """
1433    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1434                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')
1435
1436
1437def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
1438    """
1439    Gets asset versions in the organization. Uses pagination to get all results.
1440
1441    Args:
1442        token (str):
1443            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1444        organization_context (str):
1445            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1446        asset_version_id (str, optional):
1447            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
1448        asset_id (str, optional):
1449            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
1450        business_unit_id (str, optional):
1451            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
1452
1453    Raises:
1454        Exception: Raised if the query fails.
1455
1456    Returns:
1457        list: List of AssetVersion Objects
1458    """
1459    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1460                                     queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=asset_version_id,
1461                                                                             asset_id=asset_id,
1462                                                                             business_unit_id=business_unit_id),
1463                                     'allAssetVersions')
1464
1465
1466def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
1467    """
1468    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
1469
1470    Args:
1471        client_id (str):
1472            CLIENT_ID as specified in the API documentation
1473        client_secret (str):
1474            CLIENT_SECRET as specified in the API documentation
1475        token_url (str, optional):
1476            Token URL, by default TOKEN_URL
1477        audience (str, optional):
1478            Audience, by default AUDIENCE
1479
1480    Raises:
1481        Exception: If the response status code is not 200
1482
1483    Returns:
1484        str: Auth token. Use this token as the Authorization header in subsequent API calls.
1485    """
1486    payload = {
1487        "client_id": client_id,
1488        "client_secret": client_secret,
1489        "audience": AUDIENCE,
1490        "grant_type": "client_credentials"
1491    }
1492
1493    headers = {
1494        'content-type': "application/json"
1495    }
1496
1497    response = requests.post(TOKEN_URL, data=json.dumps(payload), headers=headers)
1498    if response.status_code == 200:
1499        auth_token = response.json()['access_token']
1500    else:
1501        raise Exception(f"Error: {response.status_code} - {response.text}")
1502
1503    return auth_token
1504
1505
1506def get_findings(
1507    token,
1508    organization_context,
1509    asset_version_id=None,
1510    finding_id=None,
1511    category=None,
1512    status=None,
1513    severity=None,
1514    count=False,
1515    limit=None,
1516):
1517    """
1518    Gets all the Findings for an Asset Version. Uses pagination to get all results.
1519    Args:
1520        token (str):
1521            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1522        organization_context (str):
1523            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1524        asset_version_id (str, optional):
1525            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
1526        finding_id (str, optional):
1527            The ID of a specific finding to get. If specified, will return only the finding with that ID.
1528        category (str, optional):
1529            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
1530            This can be a single string, or an array of values.
1531        status (str, optional):
1532            The status of Findings to return.
1533        severity (str, optional):
1534            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
1535        count (bool, optional):
1536            If True, will return the count of findings instead of the findings themselves. Defaults to False.
1537        limit (int, optional):
1538            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
1539
1540    Raises:
1541        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1542
1543    Returns:
1544        list: List of Finding Objects
1545    """
1546
1547    if limit and limit > 1000:
1548        raise Exception("Error: limit must be less than 1000")
1549    if limit and limit < 1:
1550        raise Exception("Error: limit must be greater than 0")
1551
1552    if count:
1553        return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
1554                                  queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id,
1555                                                                          finding_id=finding_id, category=category,
1556                                                                          status=status, severity=severity,
1557                                                                          limit=limit))["data"]["_allFindingsMeta"]
1558    else:
1559        return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
1560                                         queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id,
1561                                                                           finding_id=finding_id, category=category,
1562                                                                           status=status, severity=severity,
1563                                                                           limit=limit), 'allFindings', limit=limit)
1564
1565
1566def get_product_asset_versions(token, organization_context, product_id=None):
1567    """
1568    Gets all the asset versions for a product.
1569    Args:
1570        token (str):
1571            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1572        organization_context (str):
1573            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1574        product_id (str, optional):
1575            Product ID to get asset versions for. If not provided, will get all asset versions in the organization.
1576    Raises:
1577        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1578    Returns:
1579        list: List of AssetVersion Objects
1580    """
1581    if not product_id:
1582        raise Exception("Product ID is required")
1583
1584    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCT_ASSET_VERSIONS['query'],
1585                                     queries.GET_PRODUCT_ASSET_VERSIONS['variables'](product_id), 'allProducts')
1586
1587
1588def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
1589    """
1590    Gets all the products for the specified business unit.
1591    Args:
1592        token (str):
1593            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1594        organization_context (str):
1595            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1596        product_id (str, optional):
1597            Product ID to get. If not provided, will get all products in the organization.
1598        business_unit_id (str, optional):
1599            Business Unit ID to get products for. If not provided, will get all products in the organization.
1600    Raises:
1601        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1602    Returns:
1603        list: List of Product Objects
1604    """
1605
1606    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCTS['query'],
1607                                     queries.GET_PRODUCTS['variables'](product_id=product_id,
1608                                                                       business_unit_id=business_unit_id),
1609                                     'allProducts')
1610
1611
1612def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
1613                                 report_subtype=None, verbose=False) -> str:
1614    """
1615    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
1616    This may take several minutes to complete, depending on the size of the report.
1617
1618    Args:
1619        token (str):
1620            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1621        organization_context (str):
1622            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1623        asset_version_id (str, optional):
1624            Asset Version ID to download the report for. Either `asset_version_id` or `product_id` are required.
1625        product_id (str, optional):
1626            Product ID to download the report for. Either `asset_version_id` or `product_id` are required.
1627        report_type (str, required):
1628            The file type of the report to download. Valid values are "CSV" and "PDF".
1629        report_subtype (str, required):
1630            The type of report to download. Based on available reports for the `report_type` specified
1631            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1632            Valid values for PDF are "RISK_SUMMARY".
1633        verbose (bool, optional):
1634            If True, print additional information to the console. Defaults to False.
1635    """
1636    if not report_type:
1637        raise ValueError("Report Type is required")
1638    if not report_subtype:
1639        raise ValueError("Report Subtype is required")
1640    if not asset_version_id and not product_id:
1641        raise ValueError("Asset Version ID or Product ID is required")
1642
1643    if asset_version_id and product_id:
1644        raise ValueError("Asset Version ID and Product ID are mutually exclusive")
1645
1646    if report_type not in ["CSV", "PDF"]:
1647        raise Exception(f"Report Type {report_type} not supported")
1648
1649    if report_type == "CSV":
1650        if report_subtype not in ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]:
1651            raise Exception(f"Report Subtype {report_subtype} not supported")
1652
1653        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1654                                                            report_type=report_type, report_subtype=report_subtype)
1655        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1656                                                              report_type=report_type, report_subtype=report_subtype)
1657
1658        response_data = send_graphql_query(token, organization_context, mutation, variables)
1659        if verbose:
1660            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1661
1662        # get exportJobId from the result
1663        if asset_version_id:
1664            export_job_id = response_data['data']['launchArtifactCSVExport']['exportJobId']
1665        elif product_id:
1666            export_job_id = response_data['data']['launchProductCSVExport']['exportJobId']
1667        else:
1668            raise Exception(
1669                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1670
1671        if verbose:
1672            print(f'Export Job ID: {export_job_id}')
1673
1674    if report_type == "PDF":
1675        if report_subtype not in ["RISK_SUMMARY"]:
1676            raise Exception(f"Report Subtype {report_subtype} not supported")
1677
1678        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1679                                                            report_type=report_type, report_subtype=report_subtype)
1680        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1681                                                              report_type=report_type, report_subtype=report_subtype)
1682
1683        response_data = send_graphql_query(token, organization_context, mutation, variables)
1684        if verbose:
1685            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1686
1687        # get exportJobId from the result
1688        if asset_version_id:
1689            export_job_id = response_data['data']['launchArtifactPdfExport']['exportJobId']
1690        elif product_id:
1691            export_job_id = response_data['data']['launchProductPdfExport']['exportJobId']
1692        else:
1693            raise Exception(
1694                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1695
1696        if verbose:
1697            print(f'Export Job ID: {export_job_id}')
1698
1699    if not export_job_id:
1700        raise Exception(
1701            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1702
1703    # poll the API until the export job is complete
1704    sleep_time = 10
1705    total_time = 0
1706    if verbose:
1707        print(f'Polling every {sleep_time} seconds for export job to complete')
1708
1709    while True:
1710        time.sleep(sleep_time)
1711        total_time += sleep_time
1712        if verbose:
1713            print(f'Total time elapsed: {total_time} seconds')
1714
1715        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1716        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1717
1718        response_data = send_graphql_query(token, organization_context, query, variables)
1719
1720        if verbose:
1721            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1722
1723        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == 'COMPLETED':
1724            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1725                if verbose:
1726                    print(
1727                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1728                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']
1729
1730
1731def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
1732                               verbose=False) -> str:
1733    """
1734    Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM.
1735    This may take several minutes to complete, depending on the size of SBOM.
1736
1737    Args:
1738        token (str):
1739            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1740        organization_context (str):
1741            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1742        sbom_type (str, required):
1743            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
1744        sbom_subtype (str, required):
1745            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
1746        asset_version_id (str, required):
1747            Asset Version ID to download the SBOM for.
1748        verbose (bool, optional):
1749            If True, print additional information to the console. Defaults to False.
1750
1751    Raises:
1752        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
1753        Exception: Raised if the query fails.
1754
1755    Returns:
1756        str: URL to download the SBOM from.
1757    """
1758
1759    if not sbom_type:
1760        raise ValueError("SBOM Type is required")
1761    if not sbom_subtype:
1762        raise ValueError("SBOM Subtype is required")
1763    if not asset_version_id:
1764        raise ValueError("Asset Version ID is required")
1765
1766    if sbom_type not in ["CYCLONEDX", "SPDX"]:
1767        raise Exception(f"SBOM Type {sbom_type} not supported")
1768
1769    if sbom_type == "CYCLONEDX":
1770        if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]:
1771            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1772
1773        mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation']
1774        variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1775
1776        response_data = send_graphql_query(token, organization_context, mutation, variables)
1777        if verbose:
1778            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1779
1780        # get exportJobId from the result
1781        export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId']
1782        if verbose:
1783            print(f'Export Job ID: {export_job_id}')
1784
1785    if sbom_type == "SPDX":
1786        if sbom_subtype not in ["SBOM_ONLY"]:
1787            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1788
1789        mutation = queries.LAUNCH_SPDX_EXPORT['mutation']
1790        variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1791
1792        response_data = send_graphql_query(token, organization_context, mutation, variables)
1793        if verbose:
1794            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1795
1796        # get exportJobId from the result
1797        export_job_id = response_data['data']['launchSpdxExport']['exportJobId']
1798        if verbose:
1799            print(f'Export Job ID: {export_job_id}')
1800
1801    if not export_job_id:
1802        raise Exception(
1803            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1804
1805    # poll the API until the export job is complete
1806    sleep_time = 10
1807    total_time = 0
1808    if verbose:
1809        print(f'Polling every {sleep_time} seconds for export job to complete')
1810    while True:
1811        time.sleep(sleep_time)
1812        total_time += sleep_time
1813        if verbose:
1814            print(f'Total time elapsed: {total_time} seconds')
1815
1816        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1817        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1818
1819        response_data = send_graphql_query(token, organization_context, query, variables)
1820
1821        if verbose:
1822            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1823
1824        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED":
1825            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1826                if verbose:
1827                    print(
1828                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1829                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']
1830
1831
1832def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
1833    """
1834    Gets all the Software Components for an Asset Version. Uses pagination to get all results.
1835    Args:
1836        token (str):
1837            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1838        organization_context (str):
1839            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1840        asset_version_id (str, optional):
1841            Asset Version ID to get software components for.
1842        type (str, optional):
1843            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
1844    Raises:
1845        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1846    Returns:
1847        list: List of Software Component Objects
1848    """
1849    if not asset_version_id:
1850        raise Exception("Asset Version ID is required")
1851
1852    return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'],
1853                                     queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id,
1854                                                                                  type=type),
1855                                     'allSoftwareComponentInstances')
1856
1857
1858def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
1859                case_sensitive=False) -> list:
1860    """
1861    Searches the SBOM of a specific asset version or the entire organization for matching software components.
1862    Search Methods: EXACT or CONTAINS
1863    An exact match will return only the software component whose name matches the name exactly.
1864    A contains match will return all software components whose name contains the search string.
1865
1866    Args:
1867        token (str):
1868            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1869        organization_context (str):
1870            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1871        name (str, required):
1872            Name of the software component to search for.
1873        version (str, optional):
1874            Version of the software component to search for. If not specified, will search for all versions of the software component.
1875        asset_version_id (str, optional):
1876            Asset Version ID to search for software components in. If not specified, will search the entire organization.
1877        search_method (str, optional):
1878            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
1879        case_sensitive (bool, optional):
1880            Whether or not to perform a case sensitive search. Defaults to False.
1881    Raises:
1882        ValueError: Raised if name is not provided.
1883        Exception: Raised if the query fails.
1884    Returns:
1885        list: List of SoftwareComponentInstance Objects
1886    """
1887    if asset_version_id:
1888        query = """
1889query GetSoftwareComponentInstances_SDK(
1890    $filter: SoftwareComponentInstanceFilter
1891    $after: String
1892    $first: Int
1893) {
1894    allSoftwareComponentInstances(
1895        filter: $filter
1896        after: $after
1897        first: $first
1898    ) {
1899        _cursor
1900        id
1901        name
1902        version
1903        originalComponents {
1904            id
1905            name
1906            version
1907        }
1908    }
1909}
1910"""
1911    else:
1912        # gets the asset version info that contains the software component
1913        query = """
1914query GetSoftwareComponentInstances_SDK(
1915    $filter: SoftwareComponentInstanceFilter
1916    $after: String
1917    $first: Int
1918) {
1919    allSoftwareComponentInstances(
1920        filter: $filter
1921        after: $after
1922        first: $first
1923    ) {
1924        _cursor
1925        id
1926        name
1927        version
1928        assetVersion {
1929            id
1930            name
1931            asset {
1932                id
1933                name
1934            }
1935        }
1936    }
1937}
1938"""
1939
1940    variables = {
1941        "filter": {
1942            "mergedComponentRefId": None
1943        },
1944        "after": None,
1945        "first": 100
1946    }
1947
1948    if asset_version_id:
1949        variables["filter"]["assetVersionRefId"] = asset_version_id
1950
1951    if search_method == 'EXACT':
1952        if case_sensitive:
1953            variables["filter"]["name"] = name
1954        else:
1955            variables["filter"]["name_like"] = name
1956    elif search_method == 'CONTAINS':
1957        variables["filter"]["name_contains"] = name
1958
1959    if version:
1960        if search_method == 'EXACT':
1961            variables["filter"]["version"] = version
1962        elif search_method == 'CONTAINS':
1963            variables["filter"]["version_contains"] = version
1964
1965    records = get_all_paginated_results(token, organization_context, query, variables=variables,
1966                                        field="allSoftwareComponentInstances")
1967
1968    return records
1969
1970
1971def send_graphql_query(token, organization_context, query, variables=None):
1972    """
1973    Send a GraphQL query to the API
1974
1975    Args:
1976        token (str):
1977            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1978        organization_context (str):
1979            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1980        query (str):
1981            The GraphQL query string
1982        variables (dict, optional):
1983            Variables to be used in the GraphQL query, by default None
1984
1985    Raises:
1986        Exception: If the response status code is not 200
1987
1988    Returns:
1989        dict: Response JSON
1990    """
1991    headers = {
1992        'Content-Type': 'application/json',
1993        'Authorization': f'Bearer {token}',
1994        'Organization-Context': organization_context
1995    }
1996    data = {
1997        'query': query,
1998        'variables': variables
1999    }
2000
2001    response = requests.post(API_URL, headers=headers, json=data)
2002
2003    if response.status_code == 200:
2004        thejson = response.json()
2005
2006        if "errors" in thejson:
2007            raise Exception(f"Error: {thejson['errors']}")
2008
2009        return thejson
2010    else:
2011        raise Exception(f"Error: {response.status_code} - {response.text}")
2012
2013
2014def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
2015                            justification=None, response=None, comment=None):
2016    """
2017    Updates the status of a findings or multiple findings. This is a blocking call.
2018
2019    Args:
2020        token (str):
2021            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
2022        organization_context (str):
2023            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2024        user_id (str, required):
2025            User ID to update the finding status for.
2026        finding_ids (str, required):
2027            Finding ID to update the status for.
2028        status (str, required):
2029            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
2030        justification (str, optional):
2031            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
2032        response (str, optional):
2033            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see  https://docs.finitestate.io/types/finding-status-response-enum
2034        comment (str, optional):
2035            Optional comment to add to the finding status update.
2036
2037    Raises:
2038        ValueError: Raised if required parameters are not provided.
2039        Exception: Raised if the query fails.
2040
2041    Returns:
2042        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
2043    """
2044    if not user_id:
2045        raise ValueError("User Id is required")
2046    if not finding_ids:
2047        raise ValueError("Finding Ids is required")
2048    if not status:
2049        raise ValueError("Status is required")
2050
2051    mutation = queries.UPDATE_FINDING_STATUSES['mutation']
2052    variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status,
2053                                                             justification=justification, response=response,
2054                                                             comment=comment)
2055
2056    return send_graphql_query(token, organization_context, mutation, variables)
2057
2058
2059def upload_file_for_binary_analysis(
2060    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
2061):
2062    """
2063    Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk.
2064    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.
2065
2066    Args:
2067        token (str):
2068            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2069        organization_context (str):
2070            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2071        test_id (str, required):
2072            Test ID to upload the file for.
2073        file_path (str, required):
2074            Local path to the file to upload.
2075        chunk_size (int, optional):
2076            The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB.
2077        quick_scan (bool, optional):
2078            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
2079        enable_bandit_scan (bool, optional):
2080            If True, will create an additional bandit scan in addition to the default binary analysis scan.
2081
2082    Raises:
2083        ValueError: Raised if test_id or file_path are not provided.
2084        Exception: Raised if the query fails.
2085
2086    Returns:
2087        dict: The response from the GraphQL query, a completeMultipartUpload Object.
2088    """
2089    # To upload a file for Binary Analysis, you must use the generateMultiplePartUploadUrl mutation
2090    if not test_id:
2091        raise ValueError("Test Id is required")
2092    if not file_path:
2093        raise ValueError("File Path is required")
2094    if chunk_size < MIN_CHUNK_SIZE:
2095        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
2096    if chunk_size >= MAX_CHUNK_SIZE:
2097        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")
2098
2099    # Start Multi-part Upload
2100    graphql_query = """
2101    mutation Start_SDK($testId: ID!) {
2102        startMultipartUploadV2(testId: $testId) {
2103            uploadId
2104            key
2105        }
2106    }
2107    """
2108
2109    variables = {
2110        "testId": test_id
2111    }
2112
2113    response = send_graphql_query(token, organization_context, graphql_query, variables)
2114
2115    upload_id = response['data']['startMultipartUploadV2']['uploadId']
2116    upload_key = response['data']['startMultipartUploadV2']['key']
2117
2118    # if the file is greater than max chunk size (or 5 GB), split the file in chunks,
2119    # call generateUploadPartUrlV2 for each chunk of the file (even if it is a single part)
2120    # and upload the file to the returned upload URL
2121    i = 0
2122    part_data = []
2123    for chunk in file_chunks(file_path, chunk_size):
2124        i = i + 1
2125        graphql_query = """
2126        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
2127            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
2128                key
2129                uploadUrl
2130            }
2131        }
2132        """
2133
2134        variables = {
2135            "partNumber": i,
2136            "uploadId": upload_id,
2137            "uploadKey": upload_key
2138        }
2139
2140        response = send_graphql_query(token, organization_context, graphql_query, variables)
2141
2142        chunk_upload_url = response['data']['generateUploadPartUrlV2']['uploadUrl']
2143
2144        # upload the chunk to the upload URL
2145        response = upload_bytes_to_url(chunk_upload_url, chunk)
2146
2147        part_data.append({
2148            "ETag": response.headers['ETag'],
2149            "PartNumber": i
2150        })
2151
2152    # call completeMultipartUploadV2
2153    graphql_query = """
2154    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
2155        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
2156            key
2157        }
2158    }
2159    """
2160
2161    variables = {
2162        "partData": part_data,
2163        "uploadId": upload_id,
2164        "uploadKey": upload_key
2165    }
2166
2167    response = send_graphql_query(token, organization_context, graphql_query, variables)
2168
2169    # get key from the result
2170    key = response['data']['completeMultipartUploadV2']['key']
2171
2172    variables = {
2173        "key": key,
2174        "testId": test_id
2175    }
2176
2177    # call launchBinaryUploadProcessing
2178    if quick_scan or enable_bandit_scan:
2179        graphql_query = """
2180        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
2181            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
2182                key
2183                newBanditScanId
2184            }
2185        }
2186        """
2187    else:
2188        graphql_query = """
2189        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
2190            launchBinaryUploadProcessing(key: $key, testId: $testId) {
2191                key
2192                newBanditScanId
2193            }
2194        }
2195        """
2196
2197    if quick_scan:
2198        variables["configurationOptions"] = ["QUICK_SCAN"]
2199
2200    if enable_bandit_scan:
2201        config_options = variables.get("configurationOptions", [])
2202        config_options.append("ENABLE_BANDIT_SCAN")
2203        variables["configurationOptions"] = config_options
2204
2205    response = send_graphql_query(token, organization_context, graphql_query, variables)
2206
2207    return response['data']
2208
2209
2210def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
2211    """
2212    Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.
2213
2214    Args:
2215        token (str):
2216            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2217        organization_context (str):
2218            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2219        test_id (str, required):
2220            Test ID to upload the file for.
2221        file_path (str, required):
2222            Local path to the file to upload.
2223
2224    Raises:
2225        ValueError: Raised if test_id or file_path are not provided.
2226        Exception: Raised if the query fails.
2227
2228    Returns:
2229        dict: The response from the GraphQL query, a completeTestResultUpload Object.
2230    """
2231    if not test_id:
2232        raise ValueError("Test Id is required")
2233    if not file_path:
2234        raise ValueError("File Path is required")
2235
2236    # Gerneate Test Result Upload URL
2237    graphql_query = """
2238    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
2239        generateSinglePartUploadUrl(testId: $testId) {
2240            uploadUrl
2241            key
2242        }
2243    }
2244    """
2245
2246    variables = {
2247        "testId": test_id
2248    }
2249
2250    response = send_graphql_query(token, organization_context, graphql_query, variables)
2251
2252    # get the upload URL and key
2253    upload_url = response['data']['generateSinglePartUploadUrl']['uploadUrl']
2254    key = response['data']['generateSinglePartUploadUrl']['key']
2255
2256    # upload the file
2257    upload_file_to_url(upload_url, file_path)
2258
2259    # complete the upload
2260    graphql_query = """
2261    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
2262        launchTestResultProcessing(key: $key, testId: $testId) {
2263            key
2264        }
2265    }
2266    """
2267
2268    variables = {
2269        "testId": test_id,
2270        "key": key
2271    }
2272
2273    response = send_graphql_query(token, organization_context, graphql_query, variables)
2274    return response['data']
2275
2276
2277def upload_bytes_to_url(url, bytes):
2278    """
2279    Used for uploading a file to a pre-signed S3 URL
2280
2281    Args:
2282        url (str):
2283            (Pre-signed S3) URL
2284        bytes (bytes):
2285            Bytes to upload
2286
2287    Raises:
2288        Exception: If the response status code is not 200
2289
2290    Returns:
2291        requests.Response: Response object
2292    """
2293    response = requests.put(url, data=bytes)
2294
2295    if response.status_code == 200:
2296        return response
2297    else:
2298        raise Exception(f"Error: {response.status_code} - {response.text}")
2299
2300
2301def upload_file_to_url(url, file_path):
2302    """
2303    Used for uploading a file to a pre-signed S3 URL
2304
2305    Args:
2306        url (str):
2307            (Pre-signed S3) URL
2308        file_path (str):
2309            Local path to file to upload
2310
2311    Raises:
2312        Exception: If the response status code is not 200
2313
2314    Returns:
2315        requests.Response: Response object
2316    """
2317    with open(file_path, 'rb') as file:
2318        response = requests.put(url, data=file)
2319
2320    if response.status_code == 200:
2321        return response
2322    else:
2323        raise Exception(f"Error: {response.status_code} - {response.text}")
API_URL = 'https://platform.finitestate.io/api/v1/graphql'
AUDIENCE = 'https://platform.finitestate.io/api/v1/graphql'
TOKEN_URL = 'https://platform.finitestate.io/api/v1/auth/token'

DEFAULT CHUNK SIZE: 1000 MiB

DEFAULT_CHUNK_SIZE = 1048576000

MAX CHUNK SIZE: 2 GiB

MAX_CHUNK_SIZE = 2097152000

MIN CHUNK SIZE: 5 MiB

MIN_CHUNK_SIZE = 5242880
class UploadMethod(enum.Enum):
28class UploadMethod(Enum):
29    """
30    Enumeration class representing different upload methods.
31
32    Attributes:
33        WEB_APP_UI: Upload method via web application UI.
34        API: Upload method via API.
35        GITHUB_INTEGRATION: Upload method via GitHub integration.
36        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
37
38    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
39    """
40    WEB_APP_UI = "WEB_APP_UI"
41    API = "API"
42    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
43    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"

Enumeration class representing different upload methods.

Attributes:
  • WEB_APP_UI: Upload method via web application UI.
  • API: Upload method via API.
  • GITHUB_INTEGRATION: Upload method via GitHub integration.
  • AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

To use any value from this enumeration, use UploadMethod. i.e. finite_state_sdk.UploadMethod.WEB_APP_UI

WEB_APP_UI = <UploadMethod.WEB_APP_UI: 'WEB_APP_UI'>
API = <UploadMethod.API: 'API'>
GITHUB_INTEGRATION = <UploadMethod.GITHUB_INTEGRATION: 'GITHUB_INTEGRATION'>
AZURE_DEVOPS_INTEGRATION = <UploadMethod.AZURE_DEVOPS_INTEGRATION: 'AZURE_DEVOPS_INTEGRATION'>
Inherited Members
enum.Enum
name
value
def create_artifact( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_version_id=None, artifact_name=None, product_id=None):
 46def create_artifact(
 47    token,
 48    organization_context,
 49    business_unit_id=None,
 50    created_by_user_id=None,
 51    asset_version_id=None,
 52    artifact_name=None,
 53    product_id=None,
 54):
 55    """
 56    Create a new Artifact.
 57    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 58    Please see the examples in the Github repository for more information:
 59    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 60    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 61
 62    Args:
 63        token (str):
 64            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 65        organization_context (str):
 66            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 67        business_unit_id (str, required):
 68            Business Unit ID to associate the artifact with.
 69        created_by_user_id (str, required):
 70            User ID of the user creating the artifact.
 71        asset_version_id (str, required):
 72            Asset Version ID to associate the artifact with.
 73        artifact_name (str, required):
 74            The name of the Artifact being created.
 75        product_id (str, optional):
 76            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
 77
 78    Raises:
 79        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
 80        Exception: Raised if the query fails.
 81
 82    Returns:
 83        dict: createArtifact Object
 84    """
 85    if not business_unit_id:
 86        raise ValueError("Business unit ID is required")
 87    if not created_by_user_id:
 88        raise ValueError("Created by user ID is required")
 89    if not asset_version_id:
 90        raise ValueError("Asset version ID is required")
 91    if not artifact_name:
 92        raise ValueError("Artifact name is required")
 93
 94    graphql_query = """
 95    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
 96        createArtifact(input: $input) {
 97            id
 98            name
 99            assetVersion {
100                id
101                name
102                asset {
103                    id
104                    name
105                }
106            }
107            createdBy {
108                id
109                email
110            }
111            ctx {
112                asset
113                products
114                businessUnits
115            }
116        }
117    }
118    """
119
120    # Asset name, business unit context, and creating user are required
121    variables = {
122        "input": {
123            "name": artifact_name,
124            "createdBy": created_by_user_id,
125            "assetVersion": asset_version_id,
126            "ctx": {
127                "asset": asset_version_id,
128                "businessUnits": [business_unit_id]
129            }
130        }
131    }
132
133    if product_id is not None:
134        variables["input"]["ctx"]["products"] = product_id
135
136    response = send_graphql_query(token, organization_context, graphql_query, variables)
137    return response['data']

Create a new Artifact. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the artifact with.
  • created_by_user_id (str, required): User ID of the user creating the artifact.
  • asset_version_id (str, required): Asset Version ID to associate the artifact with.
  • artifact_name (str, required): The name of the Artifact being created.
  • product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createArtifact Object

def create_asset( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
140def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
141    """
142    Create a new Asset.
143
144    Args:
145        token (str):
146            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
147        organization_context (str):
148            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
149        business_unit_id (str, required):
150            Business Unit ID to associate the asset with.
151        created_by_user_id (str, required):
152            User ID of the user creating the asset.
153        asset_name (str, required):
154            The name of the Asset being created.
155        product_id (str, optional):
156            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
157
158    Raises:
159        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
160        Exception: Raised if the query fails.
161
162    Returns:
163        dict: createAsset Object
164    """
165    if not business_unit_id:
166        raise ValueError("Business unit ID is required")
167    if not created_by_user_id:
168        raise ValueError("Created by user ID is required")
169    if not asset_name:
170        raise ValueError("Asset name is required")
171
172    graphql_query = """
173    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
174        createAsset(input: $input) {
175            id
176            name
177            dependentProducts {
178                id
179                name
180            }
181            group {
182                id
183                name
184            }
185            createdBy {
186                id
187                email
188            }
189            ctx {
190                asset
191                products
192                businessUnits
193            }
194        }
195    }
196    """
197
198    # Asset name, business unit context, and creating user are required
199    variables = {
200        "input": {
201            "name": asset_name,
202            "group": business_unit_id,
203            "createdBy": created_by_user_id,
204            "ctx": {
205                "businessUnits": [business_unit_id]
206            }
207        }
208    }
209
210    if product_id is not None:
211        variables["input"]["ctx"]["products"] = product_id
212
213    response = send_graphql_query(token, organization_context, graphql_query, variables)
214    return response['data']

Create a new Asset.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset with.
  • created_by_user_id (str, required): User ID of the user creating the asset.
  • asset_name (str, required): The name of the Asset being created.
  • product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAsset Object

def create_asset_version( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, asset_version_name=None, product_id=None):
217def create_asset_version(
218    token,
219    organization_context,
220    business_unit_id=None,
221    created_by_user_id=None,
222    asset_id=None,
223    asset_version_name=None,
224    product_id=None,
225):
226    """
227    Create a new Asset Version.
228
229    Args:
230        token (str):
231            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
232        organization_context (str):
233            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
234        business_unit_id (str, required):
235            Business Unit ID to associate the asset version with.
236        created_by_user_id (str, required):
237            User ID of the user creating the asset version.
238        asset_id (str, required):
239            Asset ID to associate the asset version with.
240        asset_version_name (str, required):
241            The name of the Asset Version being created.
242        product_id (str, optional):
243            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
244
245    Raises:
246        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
247        Exception: Raised if the query fails.
248
249    Returns:
250        dict: createAssetVersion Object
251
252    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
253    """
254    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
255    if not business_unit_id:
256        raise ValueError("Business unit ID is required")
257    if not created_by_user_id:
258        raise ValueError("Created by user ID is required")
259    if not asset_id:
260        raise ValueError("Asset ID is required")
261    if not asset_version_name:
262        raise ValueError("Asset version name is required")
263
264    graphql_query = """
265    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
266        createAssetVersion(input: $input) {
267            id
268            name
269            asset {
270                id
271                name
272            }
273            createdBy {
274                id
275                email
276            }
277            ctx {
278                asset
279                products
280                businessUnits
281            }
282        }
283    }
284    """
285
286    # Asset name, business unit context, and creating user are required
287    variables = {
288        "input": {
289            "name": asset_version_name,
290            "createdBy": created_by_user_id,
291            "asset": asset_id,
292            "ctx": {
293                "asset": asset_id,
294                "businessUnits": [business_unit_id]
295            }
296        }
297    }
298
299    if product_id is not None:
300        variables["input"]["ctx"]["products"] = product_id
301
302    response = send_graphql_query(token, organization_context, graphql_query, variables)
303    return response['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset version with.
  • created_by_user_id (str, required): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
  • product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

deprecated:: 0.1.7. Use create_asset_version_on_asset instead.

def create_asset_version_on_asset( token, organization_context, created_by_user_id=None, asset_id=None, asset_version_name=None, product_id=None):
306def create_asset_version_on_asset(
307    token,
308    organization_context,
309    created_by_user_id=None,
310    asset_id=None,
311    asset_version_name=None,
312    product_id=None,
313):
314    """
315    Create a new Asset Version.
316
317    Args:
318        token (str):
319            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
320        organization_context (str):
321            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
322        created_by_user_id (str, optional):
323            User ID of the user creating the asset version.
324        asset_id (str, required):
325            Asset ID to associate the asset version with.
326        asset_version_name (str, required):
327            The name of the Asset Version being created.
328
329    Raises:
330        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
331        Exception: Raised if the query fails.
332
333    Returns:
334        dict: createAssetVersion Object
335    """
336    if not asset_id:
337        raise ValueError("Asset ID is required")
338    if not asset_version_name:
339        raise ValueError("Asset version name is required")
340
341    graphql_query = """
342        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!, $productId: ID) {
343            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId, productId: $productId) {
344                id
345                assetVersion {
346                    id
347                }
348            }
349        }
350    """
351
352    # Asset name, business unit context, and creating user are required
353    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
354
355    if created_by_user_id:
356        variables["createdByUserId"] = created_by_user_id
357
358    if product_id:
359        variables["productId"] = product_id
360
361    response = send_graphql_query(token, organization_context, graphql_query, variables)
362    return response['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • created_by_user_id (str, optional): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

def create_new_asset_version_artifact_and_test_for_upload( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, product_id=None, test_type=None, artifact_description=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
365def create_new_asset_version_artifact_and_test_for_upload(
366    token,
367    organization_context,
368    business_unit_id=None,
369    created_by_user_id=None,
370    asset_id=None,
371    version=None,
372    product_id=None,
373    test_type=None,
374    artifact_description=None,
375    upload_method: UploadMethod = UploadMethod.API,
376):
377    """
378    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
379    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
380
381    Args:
382        token (str):
383            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
384        organization_context (str):
385            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
386        business_unit_id (str, optional):
387            Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
388        created_by_user_id (str, optional):
389            User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
390        asset_id (str, required):
391            Asset ID to create the asset version for. If not provided, the default asset will be used.
392        version (str, required):
393            Version to create the asset version for.
394        product_id (str, optional):
395            Product ID to create the entities for. If not provided, the default product will be used.
396        test_type (str, required):
397            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
398        artifact_description (str, optional):
399            Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
400        upload_method (UploadMethod, optional):
401            The method of uploading the test results. Default is UploadMethod.API.
402
403
404    Raises:
405        ValueError: Raised if asset_id or version are not provided.
406        Exception: Raised if the query fails.
407
408    Returns:
409        str: The Test ID of the newly created test that is used for uploading the file.
410    """
411    if not asset_id:
412        raise ValueError("Asset ID is required")
413    if not version:
414        raise ValueError("Version is required")
415
416    assets = get_all_assets(token, organization_context, asset_id=asset_id)
417    if not assets:
418        raise ValueError("No assets found with the provided asset ID")
419    asset = assets[0]
420
421    # get the asset name
422    asset_name = asset['name']
423
424    # get the existing asset product IDs
425    asset_product_ids = asset['ctx']['products']
426
427    # get the asset product ID
428    if product_id and product_id not in asset_product_ids:
429        asset_product_ids.append(product_id)
430
431    # if business_unit_id or created_by_user_id are not provided, get the existing asset
432    if not business_unit_id or not created_by_user_id:
433        if not business_unit_id:
434            business_unit_id = asset['group']['id']
435        if not created_by_user_id:
436            created_by_user_id = asset['createdBy']['id']
437
438        if not business_unit_id:
439            raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
440        if not created_by_user_id:
441            raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")
442
443    # create the asset version
444    response = create_asset_version_on_asset(
445        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version, product_id=product_id
446    )
447    # get the asset version ID
448    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']
449
450    # create the test
451    if test_type == "finite_state_binary_analysis":
452        # create the artifact
453        if not artifact_description:
454            artifact_description = "Binary"
455        binary_artifact_name = f"{asset_name} {version} - {artifact_description}"
456        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
457                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
458                                   artifact_name=binary_artifact_name, product_id=asset_product_ids)
459
460        # get the artifact ID
461        binary_artifact_id = response['createArtifact']['id']
462
463        # create the test
464        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
465        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
466                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
467                                                  artifact_id=binary_artifact_id, product_id=asset_product_ids,
468                                                  test_name=test_name, upload_method=upload_method)
469        test_id = response['createTest']['id']
470        return test_id
471
472    else:
473        # create the artifact
474        if not artifact_description:
475            artifact_description = "Unspecified Artifact"
476        artifact_name = f"{asset_name} {version} - {artifact_description}"
477        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
478                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
479                                   artifact_name=artifact_name, product_id=asset_product_ids)
480
481        # get the artifact ID
482        binary_artifact_id = response['createArtifact']['id']
483
484        # create the test
485        test_name = f"{asset_name} {version} - {test_type}"
486        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
487                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
488                                                      artifact_id=binary_artifact_id, product_id=asset_product_ids,
489                                                      test_name=test_name, test_type=test_type,
490                                                      upload_method=upload_method)
491        test_id = response['createTest']['id']
492        return test_id

Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
  • created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for. If not provided, the default asset will be used.
  • version (str, required): Version to create the asset version for.
  • product_id (str, optional): Product ID to create the entities for. If not provided, the default product will be used.
  • test_type (str, required): Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
  • artifact_description (str, optional): Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if asset_id or version are not provided.
  • Exception: Raised if the query fails.
Returns:

str: The Test ID of the newly created test that is used for uploading the file.

def create_new_asset_version_and_upload_binary( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, artifact_description=None, quick_scan=False, upload_method: UploadMethod = <UploadMethod.API: 'API'>, enable_bandit_scan: bool = False):
495def create_new_asset_version_and_upload_binary(
496    token,
497    organization_context,
498    business_unit_id=None,
499    created_by_user_id=None,
500    asset_id=None,
501    version=None,
502    file_path=None,
503    product_id=None,
504    artifact_description=None,
505    quick_scan=False,
506    upload_method: UploadMethod = UploadMethod.API,
507    enable_bandit_scan: bool = False,
508):
509    """
510    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
511    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
512
513    Args:
514        token (str):
515            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
516        organization_context (str):
517            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
518        business_unit_id (str, optional):
519            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
520        created_by_user_id (str, optional):
521            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
522        asset_id (str, required):
523            Asset ID to create the asset version for.
524        version (str, required):
525            Version to create the asset version for.
526        file_path (str, required):
527            Local path to the file to upload.
528        product_id (str, optional):
529            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
530        artifact_description (str, optional):
531            Description of the artifact. If not provided, the default is "Firmware Binary".
532        quick_scan (bool, optional):
533            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
534        upload_method (UploadMethod, optional):
535            The method of uploading the test results. Default is UploadMethod.API.
536        enable_bandit_scan (bool, optional):
537            If True, will create an additional bandit scan in addition to the default binary analysis scan.
538
539    Raises:
540        ValueError: Raised if asset_id, version, or file_path are not provided.
541        Exception: Raised if any of the queries fail.
542
543    Returns:
544        dict: The response from the GraphQL query, a createAssetVersion Object.
545    """
546    if not asset_id:
547        raise ValueError("Asset ID is required")
548    if not version:
549        raise ValueError("Version is required")
550    if not file_path:
551        raise ValueError("File path is required")
552
553    # create the asset version and binary test
554    if not artifact_description:
555        artifact_description = "Firmware Binary"
556    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
557        token,
558        organization_context,
559        business_unit_id=business_unit_id,
560        created_by_user_id=created_by_user_id,
561        asset_id=asset_id,
562        version=version,
563        product_id=product_id,
564        test_type="finite_state_binary_analysis",
565        artifact_description=artifact_description,
566        upload_method=upload_method,
567    )
568
569    # upload file for binary test
570    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
571                                               quick_scan=quick_scan, enable_bandit_scan=enable_bandit_scan)
572    return response

Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Local path to the file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
  • artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
  • quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
  • enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
  • ValueError: Raised if asset_id, version, or file_path are not provided.
  • Exception: Raised if any of the queries fail.
Returns:

dict: The response from the GraphQL query, a createAssetVersion Object.

def create_new_asset_version_and_upload_test_results( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, test_type=None, artifact_description='', upload_method: UploadMethod = <UploadMethod.API: 'API'>):
575def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
576                                                     created_by_user_id=None, asset_id=None, version=None,
577                                                     file_path=None, product_id=None, test_type=None,
578                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
579    """
580    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
581    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
582
583    Args:
584        token (str):
585            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
586        organization_context (str):
587            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
588        business_unit_id (str, optional):
589            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
590        created_by_user_id (str, optional):
591            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
592        asset_id (str, required):
593            Asset ID to create the asset version for.
594        version (str, required):
595            Version to create the asset version for.
596        file_path (str, required):
597            Path to the test results file to upload.
598        product_id (str, optional):
599            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
600        test_type (str, required):
601            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
602        artifact_description (str, optional):
603            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
604        upload_method (UploadMethod, optional):
605            The method of uploading the test results. Default is UploadMethod.API.
606
607    Raises:
608        ValueError: If the asset_id, version, or file_path are not provided.
609        Exception: If the test_type is not a supported third party scanner type, or if the query fails.
610
611    Returns:
612        dict: The response from the GraphQL query, a createAssetVersion Object.
613    """
614    if not asset_id:
615        raise ValueError("Asset ID is required")
616    if not version:
617        raise ValueError("Version is required")
618    if not file_path:
619        raise ValueError("File path is required")
620    if not test_type:
621        raise ValueError("Test type is required")
622
623    # create the asset version and test
624    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
625                                                                    business_unit_id=business_unit_id,
626                                                                    created_by_user_id=created_by_user_id,
627                                                                    asset_id=asset_id, version=version,
628                                                                    product_id=product_id, test_type=test_type,
629                                                                    artifact_description=artifact_description,
630                                                                    upload_method=upload_method)
631
632    # upload test results file
633    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
634    return response

Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Path to the test results file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
  • test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
  • artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: If the asset_id, version, or file_path are not provided.
  • Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:

dict: The response from the GraphQL query, a createAssetVersion Object.

def create_product( token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None, product_description=None, vendor_id=None, vendor_name=None):
637def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
638                   product_description=None, vendor_id=None, vendor_name=None):
639    """
640    Create a new Product.
641
642    Args:
643        token (str):
644            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
645        organization_context (str):
646            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
647        business_unit_id (str, required):
648            Business Unit ID to associate the product with.
649        created_by_user_id (str, required):
650            User ID of the user creating the product.
651        product_name (str, required):
652            The name of the Product being created.
653        product_description (str, optional):
654            The description of the Product being created.
655        vendor_id (str, optional):
656            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
657        vendor_name (str, optional):
658            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
659
660    Raises:
661        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
662        Exception: Raised if the query fails.
663
664    Returns:
665        dict: createProduct Object
666    """
667
668    if not business_unit_id:
669        raise ValueError("Business unit ID is required")
670    if not created_by_user_id:
671        raise ValueError("Created by user ID is required")
672    if not product_name:
673        raise ValueError("Product name is required")
674
675    graphql_query = """
676    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
677        createProduct(input: $input) {
678            id
679            name
680            vendor {
681                name
682            }
683            group {
684                id
685                name
686            }
687            createdBy {
688                id
689                email
690            }
691            ctx {
692                businessUnit
693            }
694        }
695    }
696    """
697
698    # Product name, business unit context, and creating user are required
699    variables = {
700        "input": {
701            "name": product_name,
702            "group": business_unit_id,
703            "createdBy": created_by_user_id,
704            "ctx": {
705                "businessUnit": business_unit_id
706            }
707        }
708    }
709
710    if product_description is not None:
711        variables["input"]["description"] = product_description
712
713    # If the vendor ID is specified, this will link the new product to the existing vendor
714    if vendor_id is not None:
715        variables["input"]["vendor"] = {
716            "id": vendor_id
717        }
718
719    # If the vendor name is specified, this will create a new vendor and link it to the new product
720    if vendor_name is not None:
721        variables["input"]["createVendor"] = {
722            "name": vendor_name
723        }
724
725    response = send_graphql_query(token, organization_context, graphql_query, variables)
726
727    return response['data']

Create a new Product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the product with.
  • created_by_user_id (str, required): User ID of the user creating the product.
  • product_name (str, required): The name of the Product being created.
  • product_description (str, optional): The description of the Product being created.
  • vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
  • vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createProduct Object

def create_test( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, tools=[], upload_method: UploadMethod = <UploadMethod.API: 'API'>):
730def create_test(
731    token,
732    organization_context,
733    business_unit_id=None,
734    created_by_user_id=None,
735    asset_id=None,
736    artifact_id=None,
737    test_name=None,
738    product_id=None,
739    test_type=None,
740    tools=[],
741    upload_method: UploadMethod = UploadMethod.API,
742):
743    """
744    Create a new Test object for uploading files.
745    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
746    Please see the examples in the Github repository for more information:
747    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
748    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
749
750    Args:
751        token (str):
752            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
753        organization_context (str):
754            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
755        business_unit_id (str, required):
756            Business Unit ID to associate the Test with.
757        created_by_user_id (str, required):
758            User ID of the user creating the Test.
759        asset_id (str, required):
760            Asset ID to associate the Test with.
761        artifact_id (str, required):
762            Artifact ID to associate the Test with.
763        test_name (str, required):
764            The name of the Test being created.
765        product_id (str, optional):
766            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
767        test_type (str, required):
768            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
769        tools (list, optional):
770            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
771        upload_method (UploadMethod, required):
772            The method of uploading the test results.
773
774    Raises:
775        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
776        Exception: Raised if the query fails.
777
778    Returns:
779        dict: createTest Object
780    """
781    if not business_unit_id:
782        raise ValueError("Business unit ID is required")
783    if not created_by_user_id:
784        raise ValueError("Created by user ID is required")
785    if not asset_id:
786        raise ValueError("Asset ID is required")
787    if not artifact_id:
788        raise ValueError("Artifact ID is required")
789    if not test_name:
790        raise ValueError("Test name is required")
791    if not test_type:
792        raise ValueError("Test type is required")
793
794    graphql_query = """
795    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
796        createTest(input: $input) {
797            id
798            name
799            artifactUnderTest {
800                id
801                name
802                assetVersion {
803                    id
804                    name
805                    asset {
806                        id
807                        name
808                        dependentProducts {
809                            id
810                            name
811                        }
812                    }
813                }
814            }
815            createdBy {
816                id
817                email
818            }
819            ctx {
820                asset
821                products
822                businessUnits
823            }
824            uploadMethod
825        }
826    }
827    """
828
829    # Asset name, business unit context, and creating user are required
830    variables = {
831        "input": {
832            "name": test_name,
833            "createdBy": created_by_user_id,
834            "artifactUnderTest": artifact_id,
835            "testResultFileFormat": test_type,
836            "ctx": {
837                "asset": asset_id,
838                "businessUnits": [business_unit_id]
839            },
840            "tools": tools,
841            "uploadMethod": upload_method.value
842        }
843    }
844
845    if product_id is not None:
846        variables["input"]["ctx"]["products"] = product_id
847
848    response = send_graphql_query(token, organization_context, graphql_query, variables)
849    return response['data']

Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
  • tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
  • upload_method (UploadMethod, required): The method of uploading the test results.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_binary_analysis( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
852def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
853                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
854                                   upload_method: UploadMethod = UploadMethod.API):
855    """
856    Create a new Test object for uploading files for Finite State Binary Analysis.
857
858    Args:
859        token (str):
860            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
861        organization_context (str):
862            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
863        business_unit_id (str, required):
864            Business Unit ID to associate the Test with.
865        created_by_user_id (str, required):
866            User ID of the user creating the Test.
867        asset_id (str, required):
868            Asset ID to associate the Test with.
869        artifact_id (str, required):
870            Artifact ID to associate the Test with.
871        test_name (str, required):
872            The name of the Test being created.
873        product_id (str, optional):
874            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
875        upload_method (UploadMethod, optional):
876            The method of uploading the test results. Default is UploadMethod.API.
877
878    Raises:
879        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
880        Exception: Raised if the query fails.
881
882    Returns:
883        dict: createTest Object
884    """
885    tools = [
886        {
887            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
888            "name": "Finite State Binary Analysis"
889        }
890    ]
891    return create_test(token, organization_context, business_unit_id=business_unit_id,
892                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
893                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
894                       tools=tools, upload_method=upload_method)

Create a new Test object for uploading files for Finite State Binary Analysis.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_cyclone_dx( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
897def create_test_as_cyclone_dx(
898    token,
899    organization_context,
900    business_unit_id=None,
901    created_by_user_id=None,
902    asset_id=None,
903    artifact_id=None,
904    test_name=None,
905    product_id=None,
906    upload_method: UploadMethod = UploadMethod.API,
907):
908    """
909    Create a new Test object for uploading CycloneDX files.
910
911    Args:
912        token (str):
913            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
914        organization_context (str):
915            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
916        business_unit_id (str, required):
917            Business Unit ID to associate the Test with.
918        created_by_user_id (str, required):
919            User ID of the user creating the Test.
920        asset_id (str, required):
921            Asset ID to associate the Test with.
922        artifact_id (str, required):
923            Artifact ID to associate the Test with.
924        test_name (str, required):
925            The name of the Test being created.
926        product_id (str, optional):
927            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
928        upload_method (UploadMethod, optional):
929            The method of uploading the test results. Default is UploadMethod.API.
930
931    Raises:
932        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
933        Exception: Raised if the query fails.
934
935    Returns:
936        dict: createTest Object
937    """
938    return create_test(token, organization_context, business_unit_id=business_unit_id,
939                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
940                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)

Create a new Test object for uploading CycloneDX files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_third_party_scanner( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
943def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
944                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
945                                       upload_method: UploadMethod = UploadMethod.API):
946    """
947    Create a new Test object for uploading Third Party Scanner files.
948
949    Args:
950        token (str):
951            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
952        organization_context (str):
953            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
954        business_unit_id (str, required):
955            Business Unit ID to associate the Test with.
956        created_by_user_id (str, required):
957            User ID of the user creating the Test.
958        asset_id (str, required):
959            Asset ID to associate the Test with.
960        artifact_id (str, required):
961            Artifact ID to associate the Test with.
962        test_name (str, required):
963            The name of the Test being created.
964        product_id (str, optional):
965            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
966        test_type (str, required):
967            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
968        upload_method (UploadMethod, optional):
969            The method of uploading the test results. Default is UploadMethod.API.
970
971    Raises:
972        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
973        Exception: Raised if the query fails.
974
975    Returns:
976        dict: createTest Object
977    """
978    return create_test(token, organization_context, business_unit_id=business_unit_id,
979                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
980                       test_name=test_name, product_id=product_id, test_type=test_type, upload_method=upload_method)

Create a new Test object for uploading Third Party Scanner files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def download_asset_version_report( token, organization_context, asset_version_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
 983def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 984                                  report_subtype=None, output_filename=None, verbose=False):
 985    """
 986    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 987
 988    Args:
 989        token (str):
 990            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 991        organization_context (str):
 992            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 993        asset_version_id (str, required):
 994            The Asset Version ID to download the report for.
 995        report_type (str, required):
 996            The file type of the report to download. Valid values are "CSV" and "PDF".
 997        report_subtype (str, required):
 998            The type of report to download. Based on available reports for the `report_type` specified
 999            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1000            Valid values for PDF are "RISK_SUMMARY".
1001        output_filename (str, optional):
1002            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1003        verbose (bool, optional):
1004            If True, will print additional information to the console. Defaults to False.
1005
1006    Raises:
1007        ValueError: Raised if required parameters are not provided.
1008        Exception: Raised if the query fails.
1009
1010    Returns:
1011        None
1012    """
1013    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
1014                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
1015
1016    # Send an HTTP GET request to the URL
1017    response = requests.get(url)
1018
1019    # Check if the request was successful (status code 200)
1020    if response.status_code == 200:
1021        # Open a local file in binary write mode and write the content to it
1022        if verbose:
1023            print("File downloaded successfully.")
1024        with open(output_filename, 'wb') as file:
1025            file.write(response.content)
1026            if verbose:
1027                print(f'Wrote file to {output_filename}')
1028    else:
1029        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, required): The Asset Version ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download. Based on available reports for the report_type specified Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def download_product_report( token, organization_context, product_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
1032def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
1033                            output_filename=None, verbose=False):
1034    """
1035    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
1036
1037    Args:
1038        token (str):
1039            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1040        organization_context (str):
1041            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1042        product_id (str, required):
1043            The Product ID to download the report for.
1044        report_type (str, required):
1045            The file type of the report to download. Valid values are "CSV".
1046        report_subtype (str, required):
1047            The type of report to download. Based on available reports for the `report_type` specified
1048            Valid values for CSV are "ALL_FINDINGS".
1049        output_filename (str, optional):
1050            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1051        verbose (bool, optional):
1052            If True, will print additional information to the console. Defaults to False.
1053    """
1054    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
1055                                       report_subtype=report_subtype, verbose=verbose)
1056
1057    # Send an HTTP GET request to the URL
1058    response = requests.get(url)
1059
1060    # Check if the request was successful (status code 200)
1061    if response.status_code == 200:
1062        # Open a local file in binary write mode and write the content to it
1063        if verbose:
1064            print("File downloaded successfully.")
1065        with open(output_filename, 'wb') as file:
1066            file.write(response.content)
1067            if verbose:
1068                print(f'Wrote file to {output_filename}')
1069    else:
1070        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, required): The Product ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV".
  • report_subtype (str, required): The type of report to download. Based on available reports for the report_type specified Valid values for CSV are "ALL_FINDINGS".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom( token, organization_context, sbom_type='CYCLONEDX', sbom_subtype='SBOM_ONLY', asset_version_id=None, output_filename='sbom.json', verbose=False):
1073def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
1074                  output_filename="sbom.json", verbose=False):
1075    """
1076    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
1077
1078    Args:
1079        token (str):
1080            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1081        organization_context (str):
1082            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1083        sbom_type (str, required):
1084            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
1085        sbom_subtype (str, required):
1086            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
1087        asset_version_id (str, required):
1088            The Asset Version ID to download the SBOM for.
1089        output_filename (str, required):
1090            The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
1091        verbose (bool, optional):
1092            If True, will print additional information to the console. Defaults to False.
1093
1094    Raises:
1095        ValueError: Raised if required parameters are not provided.
1096        Exception: Raised if the query fails.
1097
1098    Returns:
1099        None
1100    """
1101    url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype,
1102                                     asset_version_id=asset_version_id, verbose=verbose)
1103
1104    # Send an HTTP GET request to the URL
1105    response = requests.get(url)
1106
1107    # Check if the request was successful (status code 200)
1108    if response.status_code == 200:
1109        # Open a local file in binary write mode and write the content to it
1110        if verbose:
1111            print("File downloaded successfully.")
1112        with open(output_filename, 'wb') as file:
1113            file.write(response.content)
1114            if verbose:
1115                print(f'Wrote file to {output_filename}')
1116    else:
1117        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
  • asset_version_id (str, required): The Asset Version ID to download the SBOM for.
  • output_filename (str, required): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def file_chunks(file_path, chunk_size=1048576000):
1120def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
1121    """
1122    Helper method to read a file in chunks.
1123
1124    Args:
1125        file_path (str):
1126            Local path to the file to read.
1127        chunk_size (int, optional):
1128            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
1129
1130    Yields:
1131        bytes: The next chunk of the file.
1132
1133    Raises:
1134        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
1135    """
1136    with open(file_path, 'rb') as f:
1137        while True:
1138            chunk = f.read(chunk_size)
1139            if chunk:
1140                yield chunk
1141            else:
1142                break

Helper method to read a file in chunks.

Arguments:
  • file_path (str): Local path to the file to read.
  • chunk_size (int, optional): The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
Yields:

bytes: The next chunk of the file.

Raises:
  • FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
1145def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
1146    """
1147    Get all artifacts in the organization. Uses pagination to get all results.
1148
1149    Args:
1150        token (str):
1151            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1152        organization_context (str):
1153            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1154        artifact_id (str, optional):
1155            An optional Artifact ID if this is used to get a single artifact, by default None
1156        business_unit_id (str, optional):
1157            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
1158
1159    Raises:
1160        Exception: Raised if the query fails.
1161
1162    Returns:
1163        list: List of Artifact Objects
1164    """
1165    return get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1166                                     queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id), 'allAssets')

Get all artifacts in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
  • business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Artifact Objects

def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
1169def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
1170    """
1171    Gets all assets in the organization. Uses pagination to get all results.
1172
1173    Args:
1174        token (str):
1175            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1176        organization_context (str):
1177            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1178        asset_id (str, optional):
1179            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1180        business_unit_id (str, optional):
1181            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1182
1183    Raises:
1184        Exception: Raised if the query fails.
1185
1186    Returns:
1187        list: List of Asset Objects
1188    """
1189    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1190                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')

Gets all assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_all_asset_versions(token, organization_context):
1193def get_all_asset_versions(token, organization_context):
1194    """
1195    Get all asset versions in the organization. Uses pagination to get all results.
1196
1197    Args:
1198        token (str):
1199            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1200        organization_context (str):
1201            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1202
1203    Raises:
1204        Exception: Raised if the query fails.
1205
1206    Returns:
1207        list: List of AssetVersion Objects
1208    """
1209    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1210                                     queries.ALL_ASSET_VERSIONS['variables'], 'allAssetVersions')

Get all asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_all_asset_versions_for_product(token, organization_context, product_id):
1213def get_all_asset_versions_for_product(token, organization_context, product_id):
1214    """
1215    Get all asset versions for a product. Uses pagination to get all results.
1216
1217    Args:
1218        token (str):
1219            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1220        organization_context (str):
1221            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1222        product_id (str):
1223            The Product ID to get asset versions for
1224
1225    Returns:
1226        list: List of AssetVersion Objects
1227    """
1228    return get_all_paginated_results(token, organization_context, queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['query'],
1229                                     queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['variables'](product_id), 'allProducts')

Get all asset versions for a product. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str): The Product ID to get asset versions for
Returns:

list: List of AssetVersion Objects

def get_all_business_units(token, organization_context):
1232def get_all_business_units(token, organization_context):
1233    """
1234    Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
1235
1236    Args:
1237        token (str):
1238            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1239        organization_context (str):
1240            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1241
1242    Raises:
1243        Exception: Raised if the query fails.
1244
1245    Returns:
1246        list: List of Group Objects
1247    """
1248    return get_all_paginated_results(token, organization_context, queries.ALL_BUSINESS_UNITS['query'],
1249                                     queries.ALL_BUSINESS_UNITS['variables'], 'allGroups')

Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Group Objects

def get_all_organizations(token, organization_context):
1252def get_all_organizations(token, organization_context):
1253    """
1254    Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
1255
1256    Args:
1257        token (str):
1258            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1259        organization_context (str):
1260            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1261
1262    Raises:
1263        Exception: Raised if the query fails.
1264
1265    Returns:
1266        list: List of Organization Objects
1267    """
1268    return get_all_paginated_results(token, organization_context, queries.ALL_ORGANIZATIONS['query'],
1269                                     queries.ALL_ORGANIZATIONS['variables'], 'allOrganizations')

Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Organization Objects

def get_all_paginated_results( token, organization_context, query, variables=None, field=None, limit=None):
1272def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
1273    """
1274    Get all results from a paginated GraphQL query
1275
1276    Args:
1277        token (str):
1278            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1279        organization_context (str):
1280            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1281        query (str):
1282            The GraphQL query string
1283        variables (dict, optional):
1284            Variables to be used in the GraphQL query, by default None
1285        field (str, required):
1286            The field in the response JSON that contains the results
1287        limit (int, Optional):
1288            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
1289
1290    Raises:
1291        Exception: If the response status code is not 200, or if the field is not in the response JSON
1292
1293    Returns:
1294        list: List of results
1295    """
1296
1297    if not field:
1298        raise Exception("Error: field is required")
1299    if limit and limit > 1000:
1300        raise Exception("Error: limit cannot be greater than 1000")
1301    if limit and limit < 1:
1302        raise Exception("Error: limit cannot be less than 1")
1303    if not variables["first"]:
1304        raise Exception("Error: first is required")
1305    if variables["first"] < 1:
1306        raise Exception("Error: first cannot be less than 1")
1307    if variables["first"] > 1000:
1308        raise Exception("Error: limit cannot be greater than 1000")
1309
1310    # query the API for the first page of results
1311    response_data = send_graphql_query(token, organization_context, query, variables)
1312
1313    # if there are no results, return an empty list
1314    if not response_data:
1315        return []
1316
1317    # create a list to store the results
1318    results = []
1319
1320    # add the first page of results to the list
1321    if field in response_data['data']:
1322        results.extend(response_data['data'][field])
1323    else:
1324        raise Exception(f"Error: {field} not in response JSON")
1325
1326    if len(response_data['data'][field]) > 0:
1327        # get the cursor from the last entry in the list
1328        cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1329
1330        while cursor:
1331            if limit and len(results) == limit:
1332                break
1333
1334            variables['after'] = cursor
1335
1336            # add the next page of results to the list
1337            response_data = send_graphql_query(token, organization_context, query, variables)
1338            results.extend(response_data['data'][field])
1339
1340            try:
1341                cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1342            except IndexError:
1343                # when there is no additional cursor, stop getting more pages
1344                cursor = None
1345
1346    return results

Get all results from a paginated GraphQL query

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
  • field (str, required): The field in the response JSON that contains the results
  • limit (int, Optional): The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
Raises:
  • Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:

list: List of results

def get_all_products(token, organization_context):
1349def get_all_products(token, organization_context):
1350    """
1351    Get all products in the organization. Uses pagination to get all results.
1352
1353    Args:
1354        token (str):
1355            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1356        organization_context (str):
1357            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1358
1359    Raises:
1360        Exception: Raised if the query fails.
1361
1362    Returns:
1363        list: List of Product Objects
1364
1365    .. deprecated:: 0.1.4. Use get_products instead.
1366    """
1367    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)
1368    return get_all_paginated_results(token, organization_context, queries.ALL_PRODUCTS['query'],
1369                                     queries.ALL_PRODUCTS['variables'], 'allProducts')

Get all products in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Product Objects

Deprecated since version 0.1.4. Use get_products instead..

def get_all_users(token, organization_context):
1372def get_all_users(token, organization_context):
1373    """
1374    Get all users in the organization. Uses pagination to get all results.
1375
1376    Args:
1377        token (str):
1378            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1379        organization_context (str):
1380            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1381
1382    Raises:
1383        Exception: Raised if the query fails.
1384
1385    Returns:
1386        list: List of User Objects
1387    """
1388    return get_all_paginated_results(token, organization_context, queries.ALL_USERS['query'],
1389                                     queries.ALL_USERS['variables'], 'allUsers')

Get all users in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of User Objects

def get_artifact_context(token, organization_context, artifact_id):
1392def get_artifact_context(token, organization_context, artifact_id):
1393    """
1394    Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
1395
1396    Args:
1397        token (str):
1398            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1399        organization_context (str):
1400            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1401
1402    Raises:
1403        Exception: Raised if the query fails.
1404
1405    Returns:
1406        dict: Artifact Context Object
1407    """
1408    artifact = get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1409                                         queries.ALL_ARTIFACTS['variables'](artifact_id, None), 'allAssets')
1410
1411    return artifact[0]['ctx']

Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

dict: Artifact Context Object

def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
1414def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
1415    """
1416    Gets assets in the organization. Uses pagination to get all results.
1417
1418    Args:
1419        token (str):
1420            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1421        organization_context (str):
1422            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1423        asset_id (str, optional):
1424            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1425        business_unit_id (str, optional):
1426            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1427
1428    Raises:
1429        Exception: Raised if the query fails.
1430
1431    Returns:
1432        list: List of Asset Objects
1433    """
1434    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1435                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')

Gets assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_asset_versions( token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
1438def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
1439    """
1440    Gets asset versions in the organization. Uses pagination to get all results.
1441
1442    Args:
1443        token (str):
1444            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1445        organization_context (str):
1446            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1447        asset_version_id (str, optional):
1448            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
1449        asset_id (str, optional):
1450            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
1451        business_unit_id (str, optional):
1452            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
1453
1454    Raises:
1455        Exception: Raised if the query fails.
1456
1457    Returns:
1458        list: List of AssetVersion Objects
1459    """
1460    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1461                                     queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=asset_version_id,
1462                                                                             asset_id=asset_id,
1463                                                                             business_unit_id=business_unit_id),
1464                                     'allAssetVersions')

Gets asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
  • asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_auth_token( client_id, client_secret, token_url='https://platform.finitestate.io/api/v1/auth/token', audience='https://platform.finitestate.io/api/v1/graphql'):
1467def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
1468    """
1469    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
1470
1471    Args:
1472        client_id (str):
1473            CLIENT_ID as specified in the API documentation
1474        client_secret (str):
1475            CLIENT_SECRET as specified in the API documentation
1476        token_url (str, optional):
1477            Token URL, by default TOKEN_URL
1478        audience (str, optional):
1479            Audience, by default AUDIENCE
1480
1481    Raises:
1482        Exception: If the response status code is not 200
1483
1484    Returns:
1485        str: Auth token. Use this token as the Authorization header in subsequent API calls.
1486    """
1487    payload = {
1488        "client_id": client_id,
1489        "client_secret": client_secret,
1490        "audience": AUDIENCE,
1491        "grant_type": "client_credentials"
1492    }
1493
1494    headers = {
1495        'content-type': "application/json"
1496    }
1497
1498    response = requests.post(TOKEN_URL, data=json.dumps(payload), headers=headers)
1499    if response.status_code == 200:
1500        auth_token = response.json()['access_token']
1501    else:
1502        raise Exception(f"Error: {response.status_code} - {response.text}")
1503
1504    return auth_token

Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

Arguments:
  • client_id (str): CLIENT_ID as specified in the API documentation
  • client_secret (str): CLIENT_SECRET as specified in the API documentation
  • token_url (str, optional): Token URL, by default TOKEN_URL
  • audience (str, optional): Audience, by default AUDIENCE
Raises:
  • Exception: If the response status code is not 200
Returns:

str: Auth token. Use this token as the Authorization header in subsequent API calls.

def get_findings( token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None, severity=None, count=False, limit=None):
1507def get_findings(
1508    token,
1509    organization_context,
1510    asset_version_id=None,
1511    finding_id=None,
1512    category=None,
1513    status=None,
1514    severity=None,
1515    count=False,
1516    limit=None,
1517):
1518    """
1519    Gets all the Findings for an Asset Version. Uses pagination to get all results.
1520    Args:
1521        token (str):
1522            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1523        organization_context (str):
1524            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1525        asset_version_id (str, optional):
1526            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
1527        finding_id (str, optional):
1528            The ID of a specific finding to get. If specified, will return only the finding with that ID.
1529        category (str, optional):
1530            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
1531            This can be a single string, or an array of values.
1532        status (str, optional):
1533            The status of Findings to return.
1534        severity (str, optional):
1535            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
1536        count (bool, optional):
1537            If True, will return the count of findings instead of the findings themselves. Defaults to False.
1538        limit (int, optional):
1539            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
1540
1541    Raises:
1542        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1543
1544    Returns:
1545        list: List of Finding Objects
1546    """
1547
1548    if limit and limit > 1000:
1549        raise Exception("Error: limit must be less than 1000")
1550    if limit and limit < 1:
1551        raise Exception("Error: limit must be greater than 0")
1552
1553    if count:
1554        return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
1555                                  queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id,
1556                                                                          finding_id=finding_id, category=category,
1557                                                                          status=status, severity=severity,
1558                                                                          limit=limit))["data"]["_allFindingsMeta"]
1559    else:
1560        return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
1561                                         queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id,
1562                                                                           finding_id=finding_id, category=category,
1563                                                                           status=status, severity=severity,
1564                                                                           limit=limit), 'allFindings', limit=limit)

Gets all the Findings for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
  • finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
  • category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
  • status (str, optional): The status of Findings to return.
  • severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
  • count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
  • limit (int, optional): The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Finding Objects

def get_product_asset_versions(token, organization_context, product_id=None):
1567def get_product_asset_versions(token, organization_context, product_id=None):
1568    """
1569    Gets all the asset versions for a product.
1570    Args:
1571        token (str):
1572            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1573        organization_context (str):
1574            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1575        product_id (str, optional):
1576            Product ID to get asset versions for. If not provided, will get all asset versions in the organization.
1577    Raises:
1578        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1579    Returns:
1580        list: List of AssetVersion Objects
1581    """
1582    if not product_id:
1583        raise Exception("Product ID is required")
1584
1585    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCT_ASSET_VERSIONS['query'],
1586                                     queries.GET_PRODUCT_ASSET_VERSIONS['variables'](product_id), 'allProducts')

Gets all the asset versions for a product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, optional): Product ID to get asset versions for. If not provided, will get all asset versions in the organization.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of AssetVersion Objects

def get_products( token, organization_context, product_id=None, business_unit_id=None) -> list:
1589def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
1590    """
1591    Gets all the products for the specified business unit.
1592    Args:
1593        token (str):
1594            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1595        organization_context (str):
1596            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1597        product_id (str, optional):
1598            Product ID to get. If not provided, will get all products in the organization.
1599        business_unit_id (str, optional):
1600            Business Unit ID to get products for. If not provided, will get all products in the organization.
1601    Raises:
1602        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1603    Returns:
1604        list: List of Product Objects
1605    """
1606
1607    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCTS['query'],
1608                                     queries.GET_PRODUCTS['variables'](product_id=product_id,
1609                                                                       business_unit_id=business_unit_id),
1610                                     'allProducts')

Gets all the products for the specified business unit.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
  • business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Product Objects

def generate_report_download_url( token, organization_context, asset_version_id=None, product_id=None, report_type=None, report_subtype=None, verbose=False) -> str:
1613def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
1614                                 report_subtype=None, verbose=False) -> str:
1615    """
1616    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
1617    This may take several minutes to complete, depending on the size of the report.
1618
1619    Args:
1620        token (str):
1621            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1622        organization_context (str):
1623            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1624        asset_version_id (str, optional):
1625            Asset Version ID to download the report for. Either `asset_version_id` or `product_id` are required.
1626        product_id (str, optional):
1627            Product ID to download the report for. Either `asset_version_id` or `product_id` are required.
1628        report_type (str, required):
1629            The file type of the report to download. Valid values are "CSV" and "PDF".
1630        report_subtype (str, required):
1631            The type of report to download. Based on available reports for the `report_type` specified
1632            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1633            Valid values for PDF are "RISK_SUMMARY".
1634        verbose (bool, optional):
1635            If True, print additional information to the console. Defaults to False.
1636    """
1637    if not report_type:
1638        raise ValueError("Report Type is required")
1639    if not report_subtype:
1640        raise ValueError("Report Subtype is required")
1641    if not asset_version_id and not product_id:
1642        raise ValueError("Asset Version ID or Product ID is required")
1643
1644    if asset_version_id and product_id:
1645        raise ValueError("Asset Version ID and Product ID are mutually exclusive")
1646
1647    if report_type not in ["CSV", "PDF"]:
1648        raise Exception(f"Report Type {report_type} not supported")
1649
1650    if report_type == "CSV":
1651        if report_subtype not in ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]:
1652            raise Exception(f"Report Subtype {report_subtype} not supported")
1653
1654        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1655                                                            report_type=report_type, report_subtype=report_subtype)
1656        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1657                                                              report_type=report_type, report_subtype=report_subtype)
1658
1659        response_data = send_graphql_query(token, organization_context, mutation, variables)
1660        if verbose:
1661            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1662
1663        # get exportJobId from the result
1664        if asset_version_id:
1665            export_job_id = response_data['data']['launchArtifactCSVExport']['exportJobId']
1666        elif product_id:
1667            export_job_id = response_data['data']['launchProductCSVExport']['exportJobId']
1668        else:
1669            raise Exception(
1670                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1671
1672        if verbose:
1673            print(f'Export Job ID: {export_job_id}')
1674
1675    if report_type == "PDF":
1676        if report_subtype not in ["RISK_SUMMARY"]:
1677            raise Exception(f"Report Subtype {report_subtype} not supported")
1678
1679        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1680                                                            report_type=report_type, report_subtype=report_subtype)
1681        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1682                                                              report_type=report_type, report_subtype=report_subtype)
1683
1684        response_data = send_graphql_query(token, organization_context, mutation, variables)
1685        if verbose:
1686            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1687
1688        # get exportJobId from the result
1689        if asset_version_id:
1690            export_job_id = response_data['data']['launchArtifactPdfExport']['exportJobId']
1691        elif product_id:
1692            export_job_id = response_data['data']['launchProductPdfExport']['exportJobId']
1693        else:
1694            raise Exception(
1695                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1696
1697        if verbose:
1698            print(f'Export Job ID: {export_job_id}')
1699
1700    if not export_job_id:
1701        raise Exception(
1702            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1703
1704    # poll the API until the export job is complete
1705    sleep_time = 10
1706    total_time = 0
1707    if verbose:
1708        print(f'Polling every {sleep_time} seconds for export job to complete')
1709
1710    while True:
1711        time.sleep(sleep_time)
1712        total_time += sleep_time
1713        if verbose:
1714            print(f'Total time elapsed: {total_time} seconds')
1715
1716        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1717        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1718
1719        response_data = send_graphql_query(token, organization_context, query, variables)
1720
1721        if verbose:
1722            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1723
1724        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == 'COMPLETED':
1725            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1726                if verbose:
1727                    print(
1728                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1729                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']

Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to download the report for. Either asset_version_id or product_id are required.
  • product_id (str, optional): Product ID to download the report for. Either asset_version_id or product_id are required.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download. Based on available reports for the report_type specified Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url( token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None, verbose=False) -> str:
1732def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
1733                               verbose=False) -> str:
1734    """
1735    Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM.
1736    This may take several minutes to complete, depending on the size of SBOM.
1737
1738    Args:
1739        token (str):
1740            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1741        organization_context (str):
1742            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1743        sbom_type (str, required):
1744            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
1745        sbom_subtype (str, required):
1746            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
1747        asset_version_id (str, required):
1748            Asset Version ID to download the SBOM for.
1749        verbose (bool, optional):
1750            If True, print additional information to the console. Defaults to False.
1751
1752    Raises:
1753        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
1754        Exception: Raised if the query fails.
1755
1756    Returns:
1757        str: URL to download the SBOM from.
1758    """
1759
1760    if not sbom_type:
1761        raise ValueError("SBOM Type is required")
1762    if not sbom_subtype:
1763        raise ValueError("SBOM Subtype is required")
1764    if not asset_version_id:
1765        raise ValueError("Asset Version ID is required")
1766
1767    if sbom_type not in ["CYCLONEDX", "SPDX"]:
1768        raise Exception(f"SBOM Type {sbom_type} not supported")
1769
1770    if sbom_type == "CYCLONEDX":
1771        if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]:
1772            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1773
1774        mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation']
1775        variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1776
1777        response_data = send_graphql_query(token, organization_context, mutation, variables)
1778        if verbose:
1779            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1780
1781        # get exportJobId from the result
1782        export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId']
1783        if verbose:
1784            print(f'Export Job ID: {export_job_id}')
1785
1786    if sbom_type == "SPDX":
1787        if sbom_subtype not in ["SBOM_ONLY"]:
1788            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1789
1790        mutation = queries.LAUNCH_SPDX_EXPORT['mutation']
1791        variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1792
1793        response_data = send_graphql_query(token, organization_context, mutation, variables)
1794        if verbose:
1795            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1796
1797        # get exportJobId from the result
1798        export_job_id = response_data['data']['launchSpdxExport']['exportJobId']
1799        if verbose:
1800            print(f'Export Job ID: {export_job_id}')
1801
1802    if not export_job_id:
1803        raise Exception(
1804            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1805
1806    # poll the API until the export job is complete
1807    sleep_time = 10
1808    total_time = 0
1809    if verbose:
1810        print(f'Polling every {sleep_time} seconds for export job to complete')
1811    while True:
1812        time.sleep(sleep_time)
1813        total_time += sleep_time
1814        if verbose:
1815            print(f'Total time elapsed: {total_time} seconds')
1816
1817        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1818        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1819
1820        response_data = send_graphql_query(token, organization_context, query, variables)
1821
1822        if verbose:
1823            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1824
1825        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED":
1826            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1827                if verbose:
1828                    print(
1829                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1830                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']

Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of SBOM.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
  • asset_version_id (str, required): Asset Version ID to download the SBOM for.
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
  • Exception: Raised if the query fails.
Returns:

str: URL to download the SBOM from.

def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
1833def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
1834    """
1835    Gets all the Software Components for an Asset Version. Uses pagination to get all results.
1836    Args:
1837        token (str):
1838            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1839        organization_context (str):
1840            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1841        asset_version_id (str, optional):
1842            Asset Version ID to get software components for.
1843        type (str, optional):
1844            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
1845    Raises:
1846        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1847    Returns:
1848        list: List of Software Component Objects
1849    """
1850    if not asset_version_id:
1851        raise Exception("Asset Version ID is required")
1852
1853    return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'],
1854                                     queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id,
1855                                                                                  type=type),
1856                                     'allSoftwareComponentInstances')

Gets all the Software Components for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get software components for.
  • type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Software Component Objects

def search_sbom( token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', case_sensitive=False) -> list:
1859def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
1860                case_sensitive=False) -> list:
1861    """
1862    Searches the SBOM of a specific asset version or the entire organization for matching software components.
1863    Search Methods: EXACT or CONTAINS
1864    An exact match will return only the software component whose name matches the name exactly.
1865    A contains match will return all software components whose name contains the search string.
1866
1867    Args:
1868        token (str):
1869            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1870        organization_context (str):
1871            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1872        name (str, required):
1873            Name of the software component to search for.
1874        version (str, optional):
1875            Version of the software component to search for. If not specified, will search for all versions of the software component.
1876        asset_version_id (str, optional):
1877            Asset Version ID to search for software components in. If not specified, will search the entire organization.
1878        search_method (str, optional):
1879            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
1880        case_sensitive (bool, optional):
1881            Whether or not to perform a case sensitive search. Defaults to False.
1882    Raises:
1883        ValueError: Raised if name is not provided.
1884        Exception: Raised if the query fails.
1885    Returns:
1886        list: List of SoftwareComponentInstance Objects
1887    """
1888    if asset_version_id:
1889        query = """
1890query GetSoftwareComponentInstances_SDK(
1891    $filter: SoftwareComponentInstanceFilter
1892    $after: String
1893    $first: Int
1894) {
1895    allSoftwareComponentInstances(
1896        filter: $filter
1897        after: $after
1898        first: $first
1899    ) {
1900        _cursor
1901        id
1902        name
1903        version
1904        originalComponents {
1905            id
1906            name
1907            version
1908        }
1909    }
1910}
1911"""
1912    else:
1913        # gets the asset version info that contains the software component
1914        query = """
1915query GetSoftwareComponentInstances_SDK(
1916    $filter: SoftwareComponentInstanceFilter
1917    $after: String
1918    $first: Int
1919) {
1920    allSoftwareComponentInstances(
1921        filter: $filter
1922        after: $after
1923        first: $first
1924    ) {
1925        _cursor
1926        id
1927        name
1928        version
1929        assetVersion {
1930            id
1931            name
1932            asset {
1933                id
1934                name
1935            }
1936        }
1937    }
1938}
1939"""
1940
1941    variables = {
1942        "filter": {
1943            "mergedComponentRefId": None
1944        },
1945        "after": None,
1946        "first": 100
1947    }
1948
1949    if asset_version_id:
1950        variables["filter"]["assetVersionRefId"] = asset_version_id
1951
1952    if search_method == 'EXACT':
1953        if case_sensitive:
1954            variables["filter"]["name"] = name
1955        else:
1956            variables["filter"]["name_like"] = name
1957    elif search_method == 'CONTAINS':
1958        variables["filter"]["name_contains"] = name
1959
1960    if version:
1961        if search_method == 'EXACT':
1962            variables["filter"]["version"] = version
1963        elif search_method == 'CONTAINS':
1964            variables["filter"]["version_contains"] = version
1965
1966    records = get_all_paginated_results(token, organization_context, query, variables=variables,
1967                                        field="allSoftwareComponentInstances")
1968
1969    return records

Searches the SBOM of a specific asset version or the entire organization for matching software components. Search Methods: EXACT or CONTAINS An exact match will return only the software component whose name matches the name exactly. A contains match will return all software components whose name contains the search string.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • name (str, required): Name of the software component to search for.
  • version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
  • asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
  • search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
  • case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
  • ValueError: Raised if name is not provided.
  • Exception: Raised if the query fails.
Returns:

list: List of SoftwareComponentInstance Objects

def send_graphql_query(token, organization_context, query, variables=None):
1972def send_graphql_query(token, organization_context, query, variables=None):
1973    """
1974    Send a GraphQL query to the API
1975
1976    Args:
1977        token (str):
1978            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1979        organization_context (str):
1980            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1981        query (str):
1982            The GraphQL query string
1983        variables (dict, optional):
1984            Variables to be used in the GraphQL query, by default None
1985
1986    Raises:
1987        Exception: If the response status code is not 200
1988
1989    Returns:
1990        dict: Response JSON
1991    """
1992    headers = {
1993        'Content-Type': 'application/json',
1994        'Authorization': f'Bearer {token}',
1995        'Organization-Context': organization_context
1996    }
1997    data = {
1998        'query': query,
1999        'variables': variables
2000    }
2001
2002    response = requests.post(API_URL, headers=headers, json=data)
2003
2004    if response.status_code == 200:
2005        thejson = response.json()
2006
2007        if "errors" in thejson:
2008            raise Exception(f"Error: {thejson['errors']}")
2009
2010        return thejson
2011    else:
2012        raise Exception(f"Error: {response.status_code} - {response.text}")

Send a GraphQL query to the API

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
  • Exception: If the response status code is not 200
Returns:

dict: Response JSON

def update_finding_statuses( token, organization_context, user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None):
2015def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
2016                            justification=None, response=None, comment=None):
2017    """
2018    Updates the status of a findings or multiple findings. This is a blocking call.
2019
2020    Args:
2021        token (str):
2022            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
2023        organization_context (str):
2024            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2025        user_id (str, required):
2026            User ID to update the finding status for.
2027        finding_ids (str, required):
2028            Finding ID to update the status for.
2029        status (str, required):
2030            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
2031        justification (str, optional):
2032            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
2033        response (str, optional):
2034            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see  https://docs.finitestate.io/types/finding-status-response-enum
2035        comment (str, optional):
2036            Optional comment to add to the finding status update.
2037
2038    Raises:
2039        ValueError: Raised if required parameters are not provided.
2040        Exception: Raised if the query fails.
2041
2042    Returns:
2043        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
2044    """
2045    if not user_id:
2046        raise ValueError("User Id is required")
2047    if not finding_ids:
2048        raise ValueError("Finding Ids is required")
2049    if not status:
2050        raise ValueError("Status is required")
2051
2052    mutation = queries.UPDATE_FINDING_STATUSES['mutation']
2053    variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status,
2054                                                             justification=justification, response=response,
2055                                                             comment=comment)
2056
2057    return send_graphql_query(token, organization_context, mutation, variables)

Updates the status of a findings or multiple findings. This is a blocking call.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • user_id (str, required): User ID to update the finding status for.
  • finding_ids (str, required): Finding ID to update the status for.
  • status (str, required): Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
  • justification (str, optional): Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
  • response (str, optional): Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
  • comment (str, optional): Optional comment to add to the finding status update.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response

def upload_file_for_binary_analysis( token, organization_context, test_id=None, file_path=None, chunk_size=1048576000, quick_scan=False, enable_bandit_scan: bool = False):
2060def upload_file_for_binary_analysis(
2061    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False, enable_bandit_scan: bool = False
2062):
2063    """
2064    Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk.
2065    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.
2066
2067    Args:
2068        token (str):
2069            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2070        organization_context (str):
2071            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2072        test_id (str, required):
2073            Test ID to upload the file for.
2074        file_path (str, required):
2075            Local path to the file to upload.
2076        chunk_size (int, optional):
2077            The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB.
2078        quick_scan (bool, optional):
2079            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
2080        enable_bandit_scan (bool, optional):
2081            If True, will create an additional bandit scan in addition to the default binary analysis scan.
2082
2083    Raises:
2084        ValueError: Raised if test_id or file_path are not provided.
2085        Exception: Raised if the query fails.
2086
2087    Returns:
2088        dict: The response from the GraphQL query, a completeMultipartUpload Object.
2089    """
2090    # To upload a file for Binary Analysis, you must use the generateMultiplePartUploadUrl mutation
2091    if not test_id:
2092        raise ValueError("Test Id is required")
2093    if not file_path:
2094        raise ValueError("File Path is required")
2095    if chunk_size < MIN_CHUNK_SIZE:
2096        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
2097    if chunk_size >= MAX_CHUNK_SIZE:
2098        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")
2099
2100    # Start Multi-part Upload
2101    graphql_query = """
2102    mutation Start_SDK($testId: ID!) {
2103        startMultipartUploadV2(testId: $testId) {
2104            uploadId
2105            key
2106        }
2107    }
2108    """
2109
2110    variables = {
2111        "testId": test_id
2112    }
2113
2114    response = send_graphql_query(token, organization_context, graphql_query, variables)
2115
2116    upload_id = response['data']['startMultipartUploadV2']['uploadId']
2117    upload_key = response['data']['startMultipartUploadV2']['key']
2118
2119    # if the file is greater than max chunk size (or 5 GB), split the file in chunks,
2120    # call generateUploadPartUrlV2 for each chunk of the file (even if it is a single part)
2121    # and upload the file to the returned upload URL
2122    i = 0
2123    part_data = []
2124    for chunk in file_chunks(file_path, chunk_size):
2125        i = i + 1
2126        graphql_query = """
2127        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
2128            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
2129                key
2130                uploadUrl
2131            }
2132        }
2133        """
2134
2135        variables = {
2136            "partNumber": i,
2137            "uploadId": upload_id,
2138            "uploadKey": upload_key
2139        }
2140
2141        response = send_graphql_query(token, organization_context, graphql_query, variables)
2142
2143        chunk_upload_url = response['data']['generateUploadPartUrlV2']['uploadUrl']
2144
2145        # upload the chunk to the upload URL
2146        response = upload_bytes_to_url(chunk_upload_url, chunk)
2147
2148        part_data.append({
2149            "ETag": response.headers['ETag'],
2150            "PartNumber": i
2151        })
2152
2153    # call completeMultipartUploadV2
2154    graphql_query = """
2155    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
2156        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
2157            key
2158        }
2159    }
2160    """
2161
2162    variables = {
2163        "partData": part_data,
2164        "uploadId": upload_id,
2165        "uploadKey": upload_key
2166    }
2167
2168    response = send_graphql_query(token, organization_context, graphql_query, variables)
2169
2170    # get key from the result
2171    key = response['data']['completeMultipartUploadV2']['key']
2172
2173    variables = {
2174        "key": key,
2175        "testId": test_id
2176    }
2177
2178    # call launchBinaryUploadProcessing
2179    if quick_scan or enable_bandit_scan:
2180        graphql_query = """
2181        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
2182            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
2183                key
2184                newBanditScanId
2185            }
2186        }
2187        """
2188    else:
2189        graphql_query = """
2190        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
2191            launchBinaryUploadProcessing(key: $key, testId: $testId) {
2192                key
2193                newBanditScanId
2194            }
2195        }
2196        """
2197
2198    if quick_scan:
2199        variables["configurationOptions"] = ["QUICK_SCAN"]
2200
2201    if enable_bandit_scan:
2202        config_options = variables.get("configurationOptions", [])
2203        config_options.append("ENABLE_BANDIT_SCAN")
2204        variables["configurationOptions"] = config_options
2205
2206    response = send_graphql_query(token, organization_context, graphql_query, variables)
2207
2208    return response['data']

Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • test_id (str, required): Test ID to upload the file for.
  • file_path (str, required): Local path to the file to upload.
  • chunk_size (int, optional): The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB.
  • quick_scan (bool, optional): If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
  • enable_bandit_scan (bool, optional): If True, will create an additional bandit scan in addition to the default binary analysis scan.
Raises:
  • ValueError: Raised if test_id or file_path are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: The response from the GraphQL query, a completeMultipartUpload Object.

def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
2211def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
2212    """
2213    Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.
2214
2215    Args:
2216        token (str):
2217            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2218        organization_context (str):
2219            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2220        test_id (str, required):
2221            Test ID to upload the file for.
2222        file_path (str, required):
2223            Local path to the file to upload.
2224
2225    Raises:
2226        ValueError: Raised if test_id or file_path are not provided.
2227        Exception: Raised if the query fails.
2228
2229    Returns:
2230        dict: The response from the GraphQL query, a completeTestResultUpload Object.
2231    """
2232    if not test_id:
2233        raise ValueError("Test Id is required")
2234    if not file_path:
2235        raise ValueError("File Path is required")
2236
2237    # Gerneate Test Result Upload URL
2238    graphql_query = """
2239    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
2240        generateSinglePartUploadUrl(testId: $testId) {
2241            uploadUrl
2242            key
2243        }
2244    }
2245    """
2246
2247    variables = {
2248        "testId": test_id
2249    }
2250
2251    response = send_graphql_query(token, organization_context, graphql_query, variables)
2252
2253    # get the upload URL and key
2254    upload_url = response['data']['generateSinglePartUploadUrl']['uploadUrl']
2255    key = response['data']['generateSinglePartUploadUrl']['key']
2256
2257    # upload the file
2258    upload_file_to_url(upload_url, file_path)
2259
2260    # complete the upload
2261    graphql_query = """
2262    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
2263        launchTestResultProcessing(key: $key, testId: $testId) {
2264            key
2265        }
2266    }
2267    """
2268
2269    variables = {
2270        "testId": test_id,
2271        "key": key
2272    }
2273
2274    response = send_graphql_query(token, organization_context, graphql_query, variables)
2275    return response['data']

Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • test_id (str, required): Test ID to upload the file for.
  • file_path (str, required): Local path to the file to upload.
Raises:
  • ValueError: Raised if test_id or file_path are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: The response from the GraphQL query, a completeTestResultUpload Object.

def upload_bytes_to_url(url, bytes):
2278def upload_bytes_to_url(url, bytes):
2279    """
2280    Used for uploading a file to a pre-signed S3 URL
2281
2282    Args:
2283        url (str):
2284            (Pre-signed S3) URL
2285        bytes (bytes):
2286            Bytes to upload
2287
2288    Raises:
2289        Exception: If the response status code is not 200
2290
2291    Returns:
2292        requests.Response: Response object
2293    """
2294    response = requests.put(url, data=bytes)
2295
2296    if response.status_code == 200:
2297        return response
2298    else:
2299        raise Exception(f"Error: {response.status_code} - {response.text}")

Used for uploading a file to a pre-signed S3 URL

Arguments:
  • url (str): (Pre-signed S3) URL
  • bytes (bytes): Bytes to upload
Raises:
  • Exception: If the response status code is not 200
Returns:

requests.Response: Response object

def upload_file_to_url(url, file_path):
2302def upload_file_to_url(url, file_path):
2303    """
2304    Used for uploading a file to a pre-signed S3 URL
2305
2306    Args:
2307        url (str):
2308            (Pre-signed S3) URL
2309        file_path (str):
2310            Local path to file to upload
2311
2312    Raises:
2313        Exception: If the response status code is not 200
2314
2315    Returns:
2316        requests.Response: Response object
2317    """
2318    with open(file_path, 'rb') as file:
2319        response = requests.put(url, data=file)
2320
2321    if response.status_code == 200:
2322        return response
2323    else:
2324        raise Exception(f"Error: {response.status_code} - {response.text}")

Used for uploading a file to a pre-signed S3 URL

Arguments:
  • url (str): (Pre-signed S3) URL
  • file_path (str): Local path to file to upload
Raises:
  • Exception: If the response status code is not 200
Returns:

requests.Response: Response object