finite_state_sdk

   1import json
   2from enum import Enum
   3
   4import requests
   5import time
   6from warnings import warn
   7import finite_state_sdk.queries as queries
   8
   9API_URL = 'https://platform.finitestate.io/api/v1/graphql'
  10AUDIENCE = "https://platform.finitestate.io/api/v1/graphql"
  11TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token"
  12
  13"""
  14DEFAULT CHUNK SIZE: 1000 MiB
  15"""
  16DEFAULT_CHUNK_SIZE = 1024**2 * 1000
  17"""
  18MAX CHUNK SIZE: 2 GiB
  19"""
  20MAX_CHUNK_SIZE = 1024**2 * 2000
  21"""
  22MIN CHUNK SIZE: 5 MiB
  23"""
  24MIN_CHUNK_SIZE = 1024**2 * 5
  25
  26
  27class UploadMethod(Enum):
  28    """
  29    Enumeration class representing different upload methods.
  30
  31    Attributes:
  32        WEB_APP_UI: Upload method via web application UI.
  33        API: Upload method via API.
  34        GITHUB_INTEGRATION: Upload method via GitHub integration.
  35        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
  36
  37    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
  38    """
  39    WEB_APP_UI = "WEB_APP_UI"
  40    API = "API"
  41    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
  42    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
  43
  44
  45def create_artifact(
  46    token,
  47    organization_context,
  48    business_unit_id=None,
  49    created_by_user_id=None,
  50    asset_version_id=None,
  51    artifact_name=None,
  52    product_id=None,
  53):
  54    """
  55    Create a new Artifact.
  56    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
  57    Please see the examples in the Github repository for more information:
  58    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  59    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
  60
  61    Args:
  62        token (str):
  63            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  64        organization_context (str):
  65            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  66        business_unit_id (str, required):
  67            Business Unit ID to associate the artifact with.
  68        created_by_user_id (str, required):
  69            User ID of the user creating the artifact.
  70        asset_version_id (str, required):
  71            Asset Version ID to associate the artifact with.
  72        artifact_name (str, required):
  73            The name of the Artifact being created.
  74        product_id (str, optional):
  75            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
  76
  77    Raises:
  78        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  79        Exception: Raised if the query fails.
  80
  81    Returns:
  82        dict: createArtifact Object
  83    """
  84    if not business_unit_id:
  85        raise ValueError("Business unit ID is required")
  86    if not created_by_user_id:
  87        raise ValueError("Created by user ID is required")
  88    if not asset_version_id:
  89        raise ValueError("Asset version ID is required")
  90    if not artifact_name:
  91        raise ValueError("Artifact name is required")
  92
  93    graphql_query = """
  94    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
  95        createArtifact(input: $input) {
  96            id
  97            name
  98            assetVersion {
  99                id
 100                name
 101                asset {
 102                    id
 103                    name
 104                }
 105            }
 106            createdBy {
 107                id
 108                email
 109            }
 110            ctx {
 111                asset
 112                products
 113                businessUnits
 114            }
 115        }
 116    }
 117    """
 118
 119    # Asset name, business unit context, and creating user are required
 120    variables = {
 121        "input": {
 122            "name": artifact_name,
 123            "createdBy": created_by_user_id,
 124            "assetVersion": asset_version_id,
 125            "ctx": {
 126                "asset": asset_version_id,
 127                "businessUnits": [business_unit_id]
 128            }
 129        }
 130    }
 131
 132    if product_id is not None:
 133        variables["input"]["ctx"]["products"] = product_id
 134
 135    response = send_graphql_query(token, organization_context, graphql_query, variables)
 136    return response['data']
 137
 138
 139def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
 140    """
 141    Create a new Asset.
 142
 143    Args:
 144        token (str):
 145            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 146        organization_context (str):
 147            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 148        business_unit_id (str, required):
 149            Business Unit ID to associate the asset with.
 150        created_by_user_id (str, required):
 151            User ID of the user creating the asset.
 152        asset_name (str, required):
 153            The name of the Asset being created.
 154        product_id (str, optional):
 155            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
 156
 157    Raises:
 158        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
 159        Exception: Raised if the query fails.
 160
 161    Returns:
 162        dict: createAsset Object
 163    """
 164    if not business_unit_id:
 165        raise ValueError("Business unit ID is required")
 166    if not created_by_user_id:
 167        raise ValueError("Created by user ID is required")
 168    if not asset_name:
 169        raise ValueError("Asset name is required")
 170
 171    graphql_query = """
 172    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
 173        createAsset(input: $input) {
 174            id
 175            name
 176            dependentProducts {
 177                id
 178                name
 179            }
 180            group {
 181                id
 182                name
 183            }
 184            createdBy {
 185                id
 186                email
 187            }
 188            ctx {
 189                asset
 190                products
 191                businessUnits
 192            }
 193        }
 194    }
 195    """
 196
 197    # Asset name, business unit context, and creating user are required
 198    variables = {
 199        "input": {
 200            "name": asset_name,
 201            "group": business_unit_id,
 202            "createdBy": created_by_user_id,
 203            "ctx": {
 204                "businessUnits": [business_unit_id]
 205            }
 206        }
 207    }
 208
 209    if product_id is not None:
 210        variables["input"]["ctx"]["products"] = product_id
 211
 212    response = send_graphql_query(token, organization_context, graphql_query, variables)
 213    return response['data']
 214
 215
 216def create_asset_version(
 217    token,
 218    organization_context,
 219    business_unit_id=None,
 220    created_by_user_id=None,
 221    asset_id=None,
 222    asset_version_name=None,
 223    product_id=None,
 224):
 225    """
 226    Create a new Asset Version.
 227
 228    Args:
 229        token (str):
 230            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 231        organization_context (str):
 232            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 233        business_unit_id (str, required):
 234            Business Unit ID to associate the asset version with.
 235        created_by_user_id (str, required):
 236            User ID of the user creating the asset version.
 237        asset_id (str, required):
 238            Asset ID to associate the asset version with.
 239        asset_version_name (str, required):
 240            The name of the Asset Version being created.
 241        product_id (str, optional):
 242            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
 243
 244    Raises:
 245        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 246        Exception: Raised if the query fails.
 247
 248    Returns:
 249        dict: createAssetVersion Object
 250
 251    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
 252    """
 253    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
 254    if not business_unit_id:
 255        raise ValueError("Business unit ID is required")
 256    if not created_by_user_id:
 257        raise ValueError("Created by user ID is required")
 258    if not asset_id:
 259        raise ValueError("Asset ID is required")
 260    if not asset_version_name:
 261        raise ValueError("Asset version name is required")
 262
 263    graphql_query = """
 264    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
 265        createAssetVersion(input: $input) {
 266            id
 267            name
 268            asset {
 269                id
 270                name
 271            }
 272            createdBy {
 273                id
 274                email
 275            }
 276            ctx {
 277                asset
 278                products
 279                businessUnits
 280            }
 281        }
 282    }
 283    """
 284
 285    # Asset name, business unit context, and creating user are required
 286    variables = {
 287        "input": {
 288            "name": asset_version_name,
 289            "createdBy": created_by_user_id,
 290            "asset": asset_id,
 291            "ctx": {
 292                "asset": asset_id,
 293                "businessUnits": [business_unit_id]
 294            }
 295        }
 296    }
 297
 298    if product_id is not None:
 299        variables["input"]["ctx"]["products"] = product_id
 300
 301    response = send_graphql_query(token, organization_context, graphql_query, variables)
 302    return response['data']
 303
 304
 305def create_asset_version_on_asset(
 306    token,
 307    organization_context,
 308    created_by_user_id=None,
 309    asset_id=None,
 310    asset_version_name=None,
 311):
 312    """
 313    Create a new Asset Version.
 314
 315    Args:
 316        token (str):
 317            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 318        organization_context (str):
 319            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 320        created_by_user_id (str, optional):
 321            User ID of the user creating the asset version.
 322        asset_id (str, required):
 323            Asset ID to associate the asset version with.
 324        asset_version_name (str, required):
 325            The name of the Asset Version being created.
 326
 327    Raises:
 328        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 329        Exception: Raised if the query fails.
 330
 331    Returns:
 332        dict: createAssetVersion Object
 333    """
 334    if not asset_id:
 335        raise ValueError("Asset ID is required")
 336    if not asset_version_name:
 337        raise ValueError("Asset version name is required")
 338
 339    graphql_query = """
 340        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!) {
 341            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId) {
 342                id
 343                assetVersion {
 344                    id
 345                }
 346            }
 347        }
 348    """
 349
 350    # Asset name, business unit context, and creating user are required
 351    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
 352
 353    if created_by_user_id:
 354        variables["createdByUserId"] = created_by_user_id
 355
 356    response = send_graphql_query(token, organization_context, graphql_query, variables)
 357    return response['data']
 358
 359
 360def create_new_asset_version_artifact_and_test_for_upload(
 361    token,
 362    organization_context,
 363    business_unit_id=None,
 364    created_by_user_id=None,
 365    asset_id=None,
 366    version=None,
 367    product_id=None,
 368    test_type=None,
 369    artifact_description=None,
 370    upload_method: UploadMethod = UploadMethod.API,
 371):
 372    """
 373    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
 374    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
 375
 376    Args:
 377        token (str):
 378            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 379        organization_context (str):
 380            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 381        business_unit_id (str, optional):
 382            Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
 383        created_by_user_id (str, optional):
 384            User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
 385        asset_id (str, required):
 386            Asset ID to create the asset version for. If not provided, the default asset will be used.
 387        version (str, required):
 388            Version to create the asset version for.
 389        product_id (str, optional):
 390            Product ID to create the entities for. If not provided, the default product will be used.
 391        test_type (str, required):
 392            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
 393        artifact_description (str, optional):
 394            Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
 395        upload_method (UploadMethod, optional):
 396            The method of uploading the test results. Default is UploadMethod.API.
 397
 398
 399    Raises:
 400        ValueError: Raised if asset_id or version are not provided.
 401        Exception: Raised if the query fails.
 402
 403    Returns:
 404        str: The Test ID of the newly created test that is used for uploading the file.
 405    """
 406    if not asset_id:
 407        raise ValueError("Asset ID is required")
 408    if not version:
 409        raise ValueError("Version is required")
 410
 411    assets = get_all_assets(token, organization_context, asset_id=asset_id)
 412    asset = assets[0]
 413
 414    # get the asset name
 415    asset_name = asset['name']
 416
 417    # get the existing asset product IDs
 418    asset_product_ids = asset['ctx']['products']
 419
 420    # get the asset product ID
 421    if product_id and product_id not in asset_product_ids:
 422        asset_product_ids.append(product_id)
 423
 424    # if business_unit_id or created_by_user_id are not provided, get the existing asset
 425    if not business_unit_id or not created_by_user_id:
 426        if not business_unit_id:
 427            business_unit_id = asset['group']['id']
 428        if not created_by_user_id:
 429            created_by_user_id = asset['createdBy']['id']
 430
 431        if not business_unit_id:
 432            raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
 433        if not created_by_user_id:
 434            raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")
 435
 436    # create the asset version
 437    response = create_asset_version_on_asset(
 438        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version
 439    )
 440    # get the asset version ID
 441    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']
 442
 443    # create the test
 444    if test_type == "finite_state_binary_analysis":
 445        # create the artifact
 446        if not artifact_description:
 447            artifact_description = "Binary"
 448        binary_artifact_name = f"{asset_name} {version} - {artifact_description}"
 449        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 450                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 451                                   artifact_name=binary_artifact_name, product_id=asset_product_ids)
 452
 453        # get the artifact ID
 454        binary_artifact_id = response['createArtifact']['id']
 455
 456        # create the test
 457        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
 458        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
 459                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
 460                                                  artifact_id=binary_artifact_id, product_id=asset_product_ids,
 461                                                  test_name=test_name, upload_method=upload_method)
 462        test_id = response['createTest']['id']
 463        return test_id
 464
 465    else:
 466        # create the artifact
 467        if not artifact_description:
 468            artifact_description = "Unspecified Artifact"
 469        artifact_name = f"{asset_name} {version} - {artifact_description}"
 470        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 471                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 472                                   artifact_name=artifact_name, product_id=asset_product_ids)
 473
 474        # get the artifact ID
 475        binary_artifact_id = response['createArtifact']['id']
 476
 477        # create the test
 478        test_name = f"{asset_name} {version} - {test_type}"
 479        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
 480                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
 481                                                      artifact_id=binary_artifact_id, product_id=asset_product_ids,
 482                                                      test_name=test_name, test_type=test_type,
 483                                                      upload_method=upload_method)
 484        test_id = response['createTest']['id']
 485        return test_id
 486
 487
 488def create_new_asset_version_and_upload_binary(
 489    token,
 490    organization_context,
 491    business_unit_id=None,
 492    created_by_user_id=None,
 493    asset_id=None,
 494    version=None,
 495    file_path=None,
 496    product_id=None,
 497    artifact_description=None,
 498    quick_scan=False,
 499    upload_method: UploadMethod = UploadMethod.API,
 500):
 501    """
 502    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
 503    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 504
 505    Args:
 506        token (str):
 507            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 508        organization_context (str):
 509            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 510        business_unit_id (str, optional):
 511            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 512        created_by_user_id (str, optional):
 513            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 514        asset_id (str, required):
 515            Asset ID to create the asset version for.
 516        version (str, required):
 517            Version to create the asset version for.
 518        file_path (str, required):
 519            Local path to the file to upload.
 520        product_id (str, optional):
 521            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
 522        artifact_description (str, optional):
 523            Description of the artifact. If not provided, the default is "Firmware Binary".
 524        quick_scan (bool, optional):
 525            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
 526        upload_method (UploadMethod, optional):
 527            The method of uploading the test results. Default is UploadMethod.API.
 528
 529    Raises:
 530        ValueError: Raised if asset_id, version, or file_path are not provided.
 531        Exception: Raised if any of the queries fail.
 532
 533    Returns:
 534        dict: The response from the GraphQL query, a createAssetVersion Object.
 535    """
 536    if not asset_id:
 537        raise ValueError("Asset ID is required")
 538    if not version:
 539        raise ValueError("Version is required")
 540    if not file_path:
 541        raise ValueError("File path is required")
 542
 543    # create the asset version and binary test
 544    if not artifact_description:
 545        artifact_description = "Firmware Binary"
 546    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
 547        token,
 548        organization_context,
 549        business_unit_id=business_unit_id,
 550        created_by_user_id=created_by_user_id,
 551        asset_id=asset_id,
 552        version=version,
 553        product_id=product_id,
 554        test_type="finite_state_binary_analysis",
 555        artifact_description=artifact_description,
 556        upload_method=upload_method,
 557    )
 558
 559    # upload file for binary test
 560    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
 561                                               quick_scan=quick_scan)
 562    return response
 563
 564
 565def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
 566                                                     created_by_user_id=None, asset_id=None, version=None,
 567                                                     file_path=None, product_id=None, test_type=None,
 568                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
 569    """
 570    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
 571    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 572
 573    Args:
 574        token (str):
 575            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 576        organization_context (str):
 577            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 578        business_unit_id (str, optional):
 579            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 580        created_by_user_id (str, optional):
 581            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 582        asset_id (str, required):
 583            Asset ID to create the asset version for.
 584        version (str, required):
 585            Version to create the asset version for.
 586        file_path (str, required):
 587            Path to the test results file to upload.
 588        product_id (str, optional):
 589            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
 590        test_type (str, required):
 591            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
 592        artifact_description (str, optional):
 593            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
 594        upload_method (UploadMethod, optional):
 595            The method of uploading the test results. Default is UploadMethod.API.
 596
 597    Raises:
 598        ValueError: If the asset_id, version, or file_path are not provided.
 599        Exception: If the test_type is not a supported third party scanner type, or if the query fails.
 600
 601    Returns:
 602        dict: The response from the GraphQL query, a createAssetVersion Object.
 603    """
 604    if not asset_id:
 605        raise ValueError("Asset ID is required")
 606    if not version:
 607        raise ValueError("Version is required")
 608    if not file_path:
 609        raise ValueError("File path is required")
 610    if not test_type:
 611        raise ValueError("Test type is required")
 612
 613    # create the asset version and test
 614    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
 615                                                                    business_unit_id=business_unit_id,
 616                                                                    created_by_user_id=created_by_user_id,
 617                                                                    asset_id=asset_id, version=version,
 618                                                                    product_id=product_id, test_type=test_type,
 619                                                                    artifact_description=artifact_description,
 620                                                                    upload_method=upload_method)
 621
 622    # upload test results file
 623    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
 624    return response
 625
 626
 627def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
 628                   product_description=None, vendor_id=None, vendor_name=None):
 629    """
 630    Create a new Product.
 631
 632    Args:
 633        token (str):
 634            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 635        organization_context (str):
 636            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 637        business_unit_id (str, required):
 638            Business Unit ID to associate the product with.
 639        created_by_user_id (str, required):
 640            User ID of the user creating the product.
 641        product_name (str, required):
 642            The name of the Product being created.
 643        product_description (str, optional):
 644            The description of the Product being created.
 645        vendor_id (str, optional):
 646            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
 647        vendor_name (str, optional):
 648            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
 649
 650    Raises:
 651        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
 652        Exception: Raised if the query fails.
 653
 654    Returns:
 655        dict: createProduct Object
 656    """
 657
 658    if not business_unit_id:
 659        raise ValueError("Business unit ID is required")
 660    if not created_by_user_id:
 661        raise ValueError("Created by user ID is required")
 662    if not product_name:
 663        raise ValueError("Product name is required")
 664
 665    graphql_query = """
 666    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
 667        createProduct(input: $input) {
 668            id
 669            name
 670            vendor {
 671                name
 672            }
 673            group {
 674                id
 675                name
 676            }
 677            createdBy {
 678                id
 679                email
 680            }
 681            ctx {
 682                businessUnit
 683            }
 684        }
 685    }
 686    """
 687
 688    # Product name, business unit context, and creating user are required
 689    variables = {
 690        "input": {
 691            "name": product_name,
 692            "group": business_unit_id,
 693            "createdBy": created_by_user_id,
 694            "ctx": {
 695                "businessUnit": business_unit_id
 696            }
 697        }
 698    }
 699
 700    if product_description is not None:
 701        variables["input"]["description"] = product_description
 702
 703    # If the vendor ID is specified, this will link the new product to the existing vendor
 704    if vendor_id is not None:
 705        variables["input"]["vendor"] = {
 706            "id": vendor_id
 707        }
 708
 709    # If the vendor name is specified, this will create a new vendor and link it to the new product
 710    if vendor_name is not None:
 711        variables["input"]["createVendor"] = {
 712            "name": vendor_name
 713        }
 714
 715    response = send_graphql_query(token, organization_context, graphql_query, variables)
 716
 717    return response['data']
 718
 719
 720def create_test(
 721    token,
 722    organization_context,
 723    business_unit_id=None,
 724    created_by_user_id=None,
 725    asset_id=None,
 726    artifact_id=None,
 727    test_name=None,
 728    product_id=None,
 729    test_type=None,
 730    tools=[],
 731    upload_method: UploadMethod = UploadMethod.API,
 732):
 733    """
 734    Create a new Test object for uploading files.
 735    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 736    Please see the examples in the Github repository for more information:
 737    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 738    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 739
 740    Args:
 741        token (str):
 742            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 743        organization_context (str):
 744            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 745        business_unit_id (str, required):
 746            Business Unit ID to associate the Test with.
 747        created_by_user_id (str, required):
 748            User ID of the user creating the Test.
 749        asset_id (str, required):
 750            Asset ID to associate the Test with.
 751        artifact_id (str, required):
 752            Artifact ID to associate the Test with.
 753        test_name (str, required):
 754            The name of the Test being created.
 755        product_id (str, optional):
 756            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 757        test_type (str, required):
 758            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
 759        tools (list, optional):
 760            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
 761        upload_method (UploadMethod, required):
 762            The method of uploading the test results.
 763
 764    Raises:
 765        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
 766        Exception: Raised if the query fails.
 767
 768    Returns:
 769        dict: createTest Object
 770    """
 771    if not business_unit_id:
 772        raise ValueError("Business unit ID is required")
 773    if not created_by_user_id:
 774        raise ValueError("Created by user ID is required")
 775    if not asset_id:
 776        raise ValueError("Asset ID is required")
 777    if not artifact_id:
 778        raise ValueError("Artifact ID is required")
 779    if not test_name:
 780        raise ValueError("Test name is required")
 781    if not test_type:
 782        raise ValueError("Test type is required")
 783
 784    graphql_query = """
 785    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
 786        createTest(input: $input) {
 787            id
 788            name
 789            artifactUnderTest {
 790                id
 791                name
 792                assetVersion {
 793                    id
 794                    name
 795                    asset {
 796                        id
 797                        name
 798                        dependentProducts {
 799                            id
 800                            name
 801                        }
 802                    }
 803                }
 804            }
 805            createdBy {
 806                id
 807                email
 808            }
 809            ctx {
 810                asset
 811                products
 812                businessUnits
 813            }
 814            uploadMethod
 815        }
 816    }
 817    """
 818
 819    # Asset name, business unit context, and creating user are required
 820    variables = {
 821        "input": {
 822            "name": test_name,
 823            "createdBy": created_by_user_id,
 824            "artifactUnderTest": artifact_id,
 825            "testResultFileFormat": test_type,
 826            "ctx": {
 827                "asset": asset_id,
 828                "businessUnits": [business_unit_id]
 829            },
 830            "tools": tools,
 831            "uploadMethod": upload_method.value
 832        }
 833    }
 834
 835    if product_id is not None:
 836        variables["input"]["ctx"]["products"] = product_id
 837
 838    response = send_graphql_query(token, organization_context, graphql_query, variables)
 839    return response['data']
 840
 841
 842def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
 843                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
 844                                   upload_method: UploadMethod = UploadMethod.API):
 845    """
 846    Create a new Test object for uploading files for Finite State Binary Analysis.
 847
 848    Args:
 849        token (str):
 850            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 851        organization_context (str):
 852            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 853        business_unit_id (str, required):
 854            Business Unit ID to associate the Test with.
 855        created_by_user_id (str, required):
 856            User ID of the user creating the Test.
 857        asset_id (str, required):
 858            Asset ID to associate the Test with.
 859        artifact_id (str, required):
 860            Artifact ID to associate the Test with.
 861        test_name (str, required):
 862            The name of the Test being created.
 863        product_id (str, optional):
 864            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 865        upload_method (UploadMethod, optional):
 866            The method of uploading the test results. Default is UploadMethod.API.
 867
 868    Raises:
 869        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 870        Exception: Raised if the query fails.
 871
 872    Returns:
 873        dict: createTest Object
 874    """
 875    tools = [
 876        {
 877            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
 878            "name": "Finite State Binary Analysis"
 879        }
 880    ]
 881    return create_test(token, organization_context, business_unit_id=business_unit_id,
 882                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 883                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
 884                       tools=tools, upload_method=upload_method)
 885
 886
 887def create_test_as_cyclone_dx(
 888    token,
 889    organization_context,
 890    business_unit_id=None,
 891    created_by_user_id=None,
 892    asset_id=None,
 893    artifact_id=None,
 894    test_name=None,
 895    product_id=None,
 896    upload_method: UploadMethod = UploadMethod.API,
 897):
 898    """
 899    Create a new Test object for uploading CycloneDX files.
 900
 901    Args:
 902        token (str):
 903            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 904        organization_context (str):
 905            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 906        business_unit_id (str, required):
 907            Business Unit ID to associate the Test with.
 908        created_by_user_id (str, required):
 909            User ID of the user creating the Test.
 910        asset_id (str, required):
 911            Asset ID to associate the Test with.
 912        artifact_id (str, required):
 913            Artifact ID to associate the Test with.
 914        test_name (str, required):
 915            The name of the Test being created.
 916        product_id (str, optional):
 917            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 918        upload_method (UploadMethod, optional):
 919            The method of uploading the test results. Default is UploadMethod.API.
 920
 921    Raises:
 922        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 923        Exception: Raised if the query fails.
 924
 925    Returns:
 926        dict: createTest Object
 927    """
 928    return create_test(token, organization_context, business_unit_id=business_unit_id,
 929                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 930                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)
 931
 932
 933def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
 934                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
 935                                       upload_method: UploadMethod = UploadMethod.API):
 936    """
 937    Create a new Test object for uploading Third Party Scanner files.
 938
 939    Args:
 940        token (str):
 941            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 942        organization_context (str):
 943            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 944        business_unit_id (str, required):
 945            Business Unit ID to associate the Test with.
 946        created_by_user_id (str, required):
 947            User ID of the user creating the Test.
 948        asset_id (str, required):
 949            Asset ID to associate the Test with.
 950        artifact_id (str, required):
 951            Artifact ID to associate the Test with.
 952        test_name (str, required):
 953            The name of the Test being created.
 954        product_id (str, optional):
 955            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 956        test_type (str, required):
 957            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
 958        upload_method (UploadMethod, optional):
 959            The method of uploading the test results. Default is UploadMethod.API.
 960
 961    Raises:
 962        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 963        Exception: Raised if the query fails.
 964
 965    Returns:
 966        dict: createTest Object
 967    """
 968    return create_test(token, organization_context, business_unit_id=business_unit_id,
 969                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 970                       test_name=test_name, product_id=product_id, test_type=test_type, upload_method=upload_method)
 971
 972
 973def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 974                                  report_subtype=None, output_filename=None, verbose=False):
 975    """
 976    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 977
 978    Args:
 979        token (str):
 980            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 981        organization_context (str):
 982            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 983        asset_version_id (str, required):
 984            The Asset Version ID to download the report for.
 985        report_type (str, required):
 986            The file type of the report to download. Valid values are "CSV" and "PDF".
 987        report_subtype (str, required):
 988            The type of report to download. Based on available reports for the `report_type` specified
 989            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
 990            Valid values for PDF are "RISK_SUMMARY".
 991        output_filename (str, optional):
 992            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
 993        verbose (bool, optional):
 994            If True, will print additional information to the console. Defaults to False.
 995
 996    Raises:
 997        ValueError: Raised if required parameters are not provided.
 998        Exception: Raised if the query fails.
 999
1000    Returns:
1001        None
1002    """
1003    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
1004                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
1005
1006    # Send an HTTP GET request to the URL
1007    response = requests.get(url)
1008
1009    # Check if the request was successful (status code 200)
1010    if response.status_code == 200:
1011        # Open a local file in binary write mode and write the content to it
1012        if verbose:
1013            print("File downloaded successfully.")
1014        with open(output_filename, 'wb') as file:
1015            file.write(response.content)
1016            if verbose:
1017                print(f'Wrote file to {output_filename}')
1018    else:
1019        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1020
1021
1022def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
1023                            output_filename=None, verbose=False):
1024    """
1025    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
1026
1027    Args:
1028        token (str):
1029            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1030        organization_context (str):
1031            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1032        product_id (str, required):
1033            The Product ID to download the report for.
1034        report_type (str, required):
1035            The file type of the report to download. Valid values are "CSV".
1036        report_subtype (str, required):
1037            The type of report to download. Based on available reports for the `report_type` specified
1038            Valid values for CSV are "ALL_FINDINGS".
1039        output_filename (str, optional):
1040            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1041        verbose (bool, optional):
1042            If True, will print additional information to the console. Defaults to False.
1043    """
1044    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
1045                                       report_subtype=report_subtype, verbose=verbose)
1046
1047    # Send an HTTP GET request to the URL
1048    response = requests.get(url)
1049
1050    # Check if the request was successful (status code 200)
1051    if response.status_code == 200:
1052        # Open a local file in binary write mode and write the content to it
1053        if verbose:
1054            print("File downloaded successfully.")
1055        with open(output_filename, 'wb') as file:
1056            file.write(response.content)
1057            if verbose:
1058                print(f'Wrote file to {output_filename}')
1059    else:
1060        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1061
1062
1063def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
1064                  output_filename="sbom.json", verbose=False):
1065    """
1066    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
1067
1068    Args:
1069        token (str):
1070            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1071        organization_context (str):
1072            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1073        sbom_type (str, required):
1074            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
1075        sbom_subtype (str, required):
1076            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
1077        asset_version_id (str, required):
1078            The Asset Version ID to download the SBOM for.
1079        output_filename (str, required):
1080            The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
1081        verbose (bool, optional):
1082            If True, will print additional information to the console. Defaults to False.
1083
1084    Raises:
1085        ValueError: Raised if required parameters are not provided.
1086        Exception: Raised if the query fails.
1087
1088    Returns:
1089        None
1090    """
1091    url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype,
1092                                     asset_version_id=asset_version_id, verbose=verbose)
1093
1094    # Send an HTTP GET request to the URL
1095    response = requests.get(url)
1096
1097    # Check if the request was successful (status code 200)
1098    if response.status_code == 200:
1099        # Open a local file in binary write mode and write the content to it
1100        if verbose:
1101            print("File downloaded successfully.")
1102        with open(output_filename, 'wb') as file:
1103            file.write(response.content)
1104            if verbose:
1105                print(f'Wrote file to {output_filename}')
1106    else:
1107        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1108
1109
1110def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
1111    """
1112    Helper method to read a file in chunks.
1113
1114    Args:
1115        file_path (str):
1116            Local path to the file to read.
1117        chunk_size (int, optional):
1118            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
1119
1120    Yields:
1121        bytes: The next chunk of the file.
1122
1123    Raises:
1124        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
1125    """
1126    with open(file_path, 'rb') as f:
1127        while True:
1128            chunk = f.read(chunk_size)
1129            if chunk:
1130                yield chunk
1131            else:
1132                break
1133
1134
1135def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
1136    """
1137    Get all artifacts in the organization. Uses pagination to get all results.
1138
1139    Args:
1140        token (str):
1141            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1142        organization_context (str):
1143            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1144        artifact_id (str, optional):
1145            An optional Artifact ID if this is used to get a single artifact, by default None
1146        business_unit_id (str, optional):
1147            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
1148
1149    Raises:
1150        Exception: Raised if the query fails.
1151
1152    Returns:
1153        list: List of Artifact Objects
1154    """
1155    return get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1156                                     queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id), 'allAssets')
1157
1158
1159def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
1160    """
1161    Gets all assets in the organization. Uses pagination to get all results.
1162
1163    Args:
1164        token (str):
1165            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1166        organization_context (str):
1167            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1168        asset_id (str, optional):
1169            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1170        business_unit_id (str, optional):
1171            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1172
1173    Raises:
1174        Exception: Raised if the query fails.
1175
1176    Returns:
1177        list: List of Asset Objects
1178    """
1179    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1180                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')
1181
1182
1183def get_all_asset_versions(token, organization_context):
1184    """
1185    Get all asset versions in the organization. Uses pagination to get all results.
1186
1187    Args:
1188        token (str):
1189            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1190        organization_context (str):
1191            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1192
1193    Raises:
1194        Exception: Raised if the query fails.
1195
1196    Returns:
1197        list: List of AssetVersion Objects
1198    """
1199    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1200                                     queries.ALL_ASSET_VERSIONS['variables'], 'allAssetVersions')
1201
1202
1203def get_all_asset_versions_for_product(token, organization_context, product_id):
1204    """
1205    Get all asset versions for a product. Uses pagination to get all results.
1206
1207    Args:
1208        token (str):
1209            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1210        organization_context (str):
1211            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1212        product_id (str):
1213            The Product ID to get asset versions for
1214
1215    Returns:
1216        list: List of AssetVersion Objects
1217    """
1218    return get_all_paginated_results(token, organization_context, queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['query'],
1219                                     queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['variables'](product_id), 'allProducts')
1220
1221
1222def get_all_business_units(token, organization_context):
1223    """
1224    Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
1225
1226    Args:
1227        token (str):
1228            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1229        organization_context (str):
1230            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1231
1232    Raises:
1233        Exception: Raised if the query fails.
1234
1235    Returns:
1236        list: List of Group Objects
1237    """
1238    return get_all_paginated_results(token, organization_context, queries.ALL_BUSINESS_UNITS['query'],
1239                                     queries.ALL_BUSINESS_UNITS['variables'], 'allGroups')
1240
1241
1242def get_all_organizations(token, organization_context):
1243    """
1244    Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
1245
1246    Args:
1247        token (str):
1248            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1249        organization_context (str):
1250            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1251
1252    Raises:
1253        Exception: Raised if the query fails.
1254
1255    Returns:
1256        list: List of Organization Objects
1257    """
1258    return get_all_paginated_results(token, organization_context, queries.ALL_ORGANIZATIONS['query'],
1259                                     queries.ALL_ORGANIZATIONS['variables'], 'allOrganizations')
1260
1261
1262def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
1263    """
1264    Get all results from a paginated GraphQL query
1265
1266    Args:
1267        token (str):
1268            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1269        organization_context (str):
1270            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1271        query (str):
1272            The GraphQL query string
1273        variables (dict, optional):
1274            Variables to be used in the GraphQL query, by default None
1275        field (str, required):
1276            The field in the response JSON that contains the results
1277        limit (int, Optional):
1278            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
1279
1280    Raises:
1281        Exception: If the response status code is not 200, or if the field is not in the response JSON
1282
1283    Returns:
1284        list: List of results
1285    """
1286
1287    if not field:
1288        raise Exception("Error: field is required")
1289    if limit and limit > 1000:
1290        raise Exception("Error: limit cannot be greater than 1000")
1291    if limit and limit < 1:
1292        raise Exception("Error: limit cannot be less than 1")
1293    if not variables["first"]:
1294        raise Exception("Error: first is required")
1295    if variables["first"] < 1:
1296        raise Exception("Error: first cannot be less than 1")
1297    if variables["first"] > 1000:
1298        raise Exception("Error: limit cannot be greater than 1000")
1299
1300    # query the API for the first page of results
1301    response_data = send_graphql_query(token, organization_context, query, variables)
1302
1303    # if there are no results, return an empty list
1304    if not response_data:
1305        return []
1306
1307    # create a list to store the results
1308    results = []
1309
1310    # add the first page of results to the list
1311    if field in response_data['data']:
1312        results.extend(response_data['data'][field])
1313    else:
1314        raise Exception(f"Error: {field} not in response JSON")
1315
1316    if len(response_data['data'][field]) > 0:
1317        # get the cursor from the last entry in the list
1318        cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1319
1320        while cursor:
1321            if limit and len(results) == limit:
1322                break
1323
1324            variables['after'] = cursor
1325
1326            # add the next page of results to the list
1327            response_data = send_graphql_query(token, organization_context, query, variables)
1328            results.extend(response_data['data'][field])
1329
1330            try:
1331                cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1332            except IndexError:
1333                # when there is no additional cursor, stop getting more pages
1334                cursor = None
1335
1336    return results
1337
1338
1339def get_all_products(token, organization_context):
1340    """
1341    Get all products in the organization. Uses pagination to get all results.
1342
1343    Args:
1344        token (str):
1345            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1346        organization_context (str):
1347            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1348
1349    Raises:
1350        Exception: Raised if the query fails.
1351
1352    Returns:
1353        list: List of Product Objects
1354
1355    .. deprecated:: 0.1.4. Use get_products instead.
1356    """
1357    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)
1358    return get_all_paginated_results(token, organization_context, queries.ALL_PRODUCTS['query'],
1359                                     queries.ALL_PRODUCTS['variables'], 'allProducts')
1360
1361
1362def get_all_users(token, organization_context):
1363    """
1364    Get all users in the organization. Uses pagination to get all results.
1365
1366    Args:
1367        token (str):
1368            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1369        organization_context (str):
1370            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1371
1372    Raises:
1373        Exception: Raised if the query fails.
1374
1375    Returns:
1376        list: List of User Objects
1377    """
1378    return get_all_paginated_results(token, organization_context, queries.ALL_USERS['query'],
1379                                     queries.ALL_USERS['variables'], 'allUsers')
1380
1381
1382def get_artifact_context(token, organization_context, artifact_id):
1383    """
1384    Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
1385
1386    Args:
1387        token (str):
1388            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1389        organization_context (str):
1390            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1391
1392    Raises:
1393        Exception: Raised if the query fails.
1394
1395    Returns:
1396        dict: Artifact Context Object
1397    """
1398    artifact = get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1399                                         queries.ALL_ARTIFACTS['variables'](artifact_id, None), 'allAssets')
1400
1401    return artifact[0]['ctx']
1402
1403
1404def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
1405    """
1406    Gets assets in the organization. Uses pagination to get all results.
1407
1408    Args:
1409        token (str):
1410            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1411        organization_context (str):
1412            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1413        asset_id (str, optional):
1414            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1415        business_unit_id (str, optional):
1416            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1417
1418    Raises:
1419        Exception: Raised if the query fails.
1420
1421    Returns:
1422        list: List of Asset Objects
1423    """
1424    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1425                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')
1426
1427
1428def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
1429    """
1430    Gets asset versions in the organization. Uses pagination to get all results.
1431
1432    Args:
1433        token (str):
1434            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1435        organization_context (str):
1436            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1437        asset_version_id (str, optional):
1438            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
1439        asset_id (str, optional):
1440            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
1441        business_unit_id (str, optional):
1442            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
1443
1444    Raises:
1445        Exception: Raised if the query fails.
1446
1447    Returns:
1448        list: List of AssetVersion Objects
1449    """
1450    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1451                                     queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=asset_version_id,
1452                                                                             asset_id=asset_id,
1453                                                                             business_unit_id=business_unit_id),
1454                                     'allAssetVersions')
1455
1456
1457def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
1458    """
1459    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
1460
1461    Args:
1462        client_id (str):
1463            CLIENT_ID as specified in the API documentation
1464        client_secret (str):
1465            CLIENT_SECRET as specified in the API documentation
1466        token_url (str, optional):
1467            Token URL, by default TOKEN_URL
1468        audience (str, optional):
1469            Audience, by default AUDIENCE
1470
1471    Raises:
1472        Exception: If the response status code is not 200
1473
1474    Returns:
1475        str: Auth token. Use this token as the Authorization header in subsequent API calls.
1476    """
1477    payload = {
1478        "client_id": client_id,
1479        "client_secret": client_secret,
1480        "audience": AUDIENCE,
1481        "grant_type": "client_credentials"
1482    }
1483
1484    headers = {
1485        'content-type': "application/json"
1486    }
1487
1488    response = requests.post(TOKEN_URL, data=json.dumps(payload), headers=headers)
1489    if response.status_code == 200:
1490        auth_token = response.json()['access_token']
1491    else:
1492        raise Exception(f"Error: {response.status_code} - {response.text}")
1493
1494    return auth_token
1495
1496
1497def get_findings(
1498    token,
1499    organization_context,
1500    asset_version_id=None,
1501    finding_id=None,
1502    category=None,
1503    status=None,
1504    severity=None,
1505    count=False,
1506    limit=None,
1507):
1508    """
1509    Gets all the Findings for an Asset Version. Uses pagination to get all results.
1510    Args:
1511        token (str):
1512            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1513        organization_context (str):
1514            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1515        asset_version_id (str, optional):
1516            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
1517        finding_id (str, optional):
1518            The ID of a specific finding to get. If specified, will return only the finding with that ID.
1519        category (str, optional):
1520            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
1521            This can be a single string, or an array of values.
1522        status (str, optional):
1523            The status of Findings to return.
1524        severity (str, optional):
1525            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
1526        count (bool, optional):
1527            If True, will return the count of findings instead of the findings themselves. Defaults to False.
1528        limit (int, optional):
1529            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
1530
1531    Raises:
1532        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1533
1534    Returns:
1535        list: List of Finding Objects
1536    """
1537
1538    if limit and limit > 1000:
1539        raise Exception("Error: limit must be less than 1000")
1540    if limit and limit < 1:
1541        raise Exception("Error: limit must be greater than 0")
1542
1543    if count:
1544        return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
1545                                  queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id,
1546                                                                          finding_id=finding_id, category=category,
1547                                                                          status=status, severity=severity,
1548                                                                          limit=limit))["data"]["_allFindingsMeta"]
1549    else:
1550        return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
1551                                         queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id,
1552                                                                           finding_id=finding_id, category=category,
1553                                                                           status=status, severity=severity,
1554                                                                           limit=limit), 'allFindings', limit=limit)
1555
1556
1557def get_product_asset_versions(token, organization_context, product_id=None):
1558    """
1559    Gets all the asset versions for a product.
1560    Args:
1561        token (str):
1562            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1563        organization_context (str):
1564            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1565        product_id (str, optional):
1566            Product ID to get asset versions for. If not provided, will get all asset versions in the organization.
1567    Raises:
1568        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1569    Returns:
1570        list: List of AssetVersion Objects
1571    """
1572    if not product_id:
1573        raise Exception("Product ID is required")
1574
1575    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCT_ASSET_VERSIONS['query'],
1576                                     queries.GET_PRODUCT_ASSET_VERSIONS['variables'](product_id), 'allProducts')
1577
1578
1579def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
1580    """
1581    Gets all the products for the specified business unit.
1582    Args:
1583        token (str):
1584            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1585        organization_context (str):
1586            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1587        product_id (str, optional):
1588            Product ID to get. If not provided, will get all products in the organization.
1589        business_unit_id (str, optional):
1590            Business Unit ID to get products for. If not provided, will get all products in the organization.
1591    Raises:
1592        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1593    Returns:
1594        list: List of Product Objects
1595    """
1596
1597    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCTS['query'],
1598                                     queries.GET_PRODUCTS['variables'](product_id=product_id,
1599                                                                       business_unit_id=business_unit_id),
1600                                     'allProducts')
1601
1602
1603def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
1604                                 report_subtype=None, verbose=False) -> str:
1605    """
1606    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
1607    This may take several minutes to complete, depending on the size of the report.
1608
1609    Args:
1610        token (str):
1611            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1612        organization_context (str):
1613            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1614        asset_version_id (str, optional):
1615            Asset Version ID to download the report for. Either `asset_version_id` or `product_id` are required.
1616        product_id (str, optional):
1617            Product ID to download the report for. Either `asset_version_id` or `product_id` are required.
1618        report_type (str, required):
1619            The file type of the report to download. Valid values are "CSV" and "PDF".
1620        report_subtype (str, required):
1621            The type of report to download. Based on available reports for the `report_type` specified
1622            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1623            Valid values for PDF are "RISK_SUMMARY".
1624        verbose (bool, optional):
1625            If True, print additional information to the console. Defaults to False.
1626    """
1627    if not report_type:
1628        raise ValueError("Report Type is required")
1629    if not report_subtype:
1630        raise ValueError("Report Subtype is required")
1631    if not asset_version_id and not product_id:
1632        raise ValueError("Asset Version ID or Product ID is required")
1633
1634    if asset_version_id and product_id:
1635        raise ValueError("Asset Version ID and Product ID are mutually exclusive")
1636
1637    if report_type not in ["CSV", "PDF"]:
1638        raise Exception(f"Report Type {report_type} not supported")
1639
1640    if report_type == "CSV":
1641        if report_subtype not in ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]:
1642            raise Exception(f"Report Subtype {report_subtype} not supported")
1643
1644        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1645                                                            report_type=report_type, report_subtype=report_subtype)
1646        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1647                                                              report_type=report_type, report_subtype=report_subtype)
1648
1649        response_data = send_graphql_query(token, organization_context, mutation, variables)
1650        if verbose:
1651            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1652
1653        # get exportJobId from the result
1654        if asset_version_id:
1655            export_job_id = response_data['data']['launchArtifactCSVExport']['exportJobId']
1656        elif product_id:
1657            export_job_id = response_data['data']['launchProductCSVExport']['exportJobId']
1658        else:
1659            raise Exception(
1660                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1661
1662        if verbose:
1663            print(f'Export Job ID: {export_job_id}')
1664
1665    if report_type == "PDF":
1666        if report_subtype not in ["RISK_SUMMARY"]:
1667            raise Exception(f"Report Subtype {report_subtype} not supported")
1668
1669        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1670                                                            report_type=report_type, report_subtype=report_subtype)
1671        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1672                                                              report_type=report_type, report_subtype=report_subtype)
1673
1674        response_data = send_graphql_query(token, organization_context, mutation, variables)
1675        if verbose:
1676            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1677
1678        # get exportJobId from the result
1679        if asset_version_id:
1680            export_job_id = response_data['data']['launchArtifactPdfExport']['exportJobId']
1681        elif product_id:
1682            export_job_id = response_data['data']['launchProductPdfExport']['exportJobId']
1683        else:
1684            raise Exception(
1685                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1686
1687        if verbose:
1688            print(f'Export Job ID: {export_job_id}')
1689
1690    if not export_job_id:
1691        raise Exception(
1692            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1693
1694    # poll the API until the export job is complete
1695    sleep_time = 10
1696    total_time = 0
1697    if verbose:
1698        print(f'Polling every {sleep_time} seconds for export job to complete')
1699
1700    while True:
1701        time.sleep(sleep_time)
1702        total_time += sleep_time
1703        if verbose:
1704            print(f'Total time elapsed: {total_time} seconds')
1705
1706        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1707        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1708
1709        response_data = send_graphql_query(token, organization_context, query, variables)
1710
1711        if verbose:
1712            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1713
1714        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == 'COMPLETED':
1715            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1716                if verbose:
1717                    print(
1718                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1719                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']
1720
1721
1722def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
1723                               verbose=False) -> str:
1724    """
1725    Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM.
1726    This may take several minutes to complete, depending on the size of SBOM.
1727
1728    Args:
1729        token (str):
1730            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1731        organization_context (str):
1732            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1733        sbom_type (str, required):
1734            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
1735        sbom_subtype (str, required):
1736            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
1737        asset_version_id (str, required):
1738            Asset Version ID to download the SBOM for.
1739        verbose (bool, optional):
1740            If True, print additional information to the console. Defaults to False.
1741
1742    Raises:
1743        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
1744        Exception: Raised if the query fails.
1745
1746    Returns:
1747        str: URL to download the SBOM from.
1748    """
1749
1750    if not sbom_type:
1751        raise ValueError("SBOM Type is required")
1752    if not sbom_subtype:
1753        raise ValueError("SBOM Subtype is required")
1754    if not asset_version_id:
1755        raise ValueError("Asset Version ID is required")
1756
1757    if sbom_type not in ["CYCLONEDX", "SPDX"]:
1758        raise Exception(f"SBOM Type {sbom_type} not supported")
1759
1760    if sbom_type == "CYCLONEDX":
1761        if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]:
1762            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1763
1764        mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation']
1765        variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1766
1767        response_data = send_graphql_query(token, organization_context, mutation, variables)
1768        if verbose:
1769            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1770
1771        # get exportJobId from the result
1772        export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId']
1773        if verbose:
1774            print(f'Export Job ID: {export_job_id}')
1775
1776    if sbom_type == "SPDX":
1777        if sbom_subtype not in ["SBOM_ONLY"]:
1778            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1779
1780        mutation = queries.LAUNCH_SPDX_EXPORT['mutation']
1781        variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1782
1783        response_data = send_graphql_query(token, organization_context, mutation, variables)
1784        if verbose:
1785            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1786
1787        # get exportJobId from the result
1788        export_job_id = response_data['data']['launchSpdxExport']['exportJobId']
1789        if verbose:
1790            print(f'Export Job ID: {export_job_id}')
1791
1792    if not export_job_id:
1793        raise Exception(
1794            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1795
1796    # poll the API until the export job is complete
1797    sleep_time = 10
1798    total_time = 0
1799    if verbose:
1800        print(f'Polling every {sleep_time} seconds for export job to complete')
1801    while True:
1802        time.sleep(sleep_time)
1803        total_time += sleep_time
1804        if verbose:
1805            print(f'Total time elapsed: {total_time} seconds')
1806
1807        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1808        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1809
1810        response_data = send_graphql_query(token, organization_context, query, variables)
1811
1812        if verbose:
1813            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1814
1815        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED":
1816            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1817                if verbose:
1818                    print(
1819                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1820                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']
1821
1822
1823def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
1824    """
1825    Gets all the Software Components for an Asset Version. Uses pagination to get all results.
1826    Args:
1827        token (str):
1828            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1829        organization_context (str):
1830            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1831        asset_version_id (str, optional):
1832            Asset Version ID to get software components for.
1833        type (str, optional):
1834            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
1835    Raises:
1836        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1837    Returns:
1838        list: List of Software Component Objects
1839    """
1840    if not asset_version_id:
1841        raise Exception("Asset Version ID is required")
1842
1843    return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'],
1844                                     queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id,
1845                                                                                  type=type),
1846                                     'allSoftwareComponentInstances')
1847
1848
1849def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
1850                case_sensitive=False) -> list:
1851    """
1852    Searches the SBOM of a specific asset version or the entire organization for matching software components.
1853    Search Methods: EXACT or CONTAINS
1854    An exact match will return only the software component whose name matches the name exactly.
1855    A contains match will return all software components whose name contains the search string.
1856
1857    Args:
1858        token (str):
1859            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1860        organization_context (str):
1861            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1862        name (str, required):
1863            Name of the software component to search for.
1864        version (str, optional):
1865            Version of the software component to search for. If not specified, will search for all versions of the software component.
1866        asset_version_id (str, optional):
1867            Asset Version ID to search for software components in. If not specified, will search the entire organization.
1868        search_method (str, optional):
1869            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
1870        case_sensitive (bool, optional):
1871            Whether or not to perform a case sensitive search. Defaults to False.
1872    Raises:
1873        ValueError: Raised if name is not provided.
1874        Exception: Raised if the query fails.
1875    Returns:
1876        list: List of SoftwareComponentInstance Objects
1877    """
1878    if asset_version_id:
1879        query = """
1880query GetSoftwareComponentInstances_SDK(
1881    $filter: SoftwareComponentInstanceFilter
1882    $after: String
1883    $first: Int
1884) {
1885    allSoftwareComponentInstances(
1886        filter: $filter
1887        after: $after
1888        first: $first
1889    ) {
1890        _cursor
1891        id
1892        name
1893        version
1894        originalComponents {
1895            id
1896            name
1897            version
1898        }
1899    }
1900}
1901"""
1902    else:
1903        # gets the asset version info that contains the software component
1904        query = """
1905query GetSoftwareComponentInstances_SDK(
1906    $filter: SoftwareComponentInstanceFilter
1907    $after: String
1908    $first: Int
1909) {
1910    allSoftwareComponentInstances(
1911        filter: $filter
1912        after: $after
1913        first: $first
1914    ) {
1915        _cursor
1916        id
1917        name
1918        version
1919        assetVersion {
1920            id
1921            name
1922            asset {
1923                id
1924                name
1925            }
1926        }
1927    }
1928}
1929"""
1930
1931    variables = {
1932        "filter": {
1933            "mergedComponentRefId": None
1934        },
1935        "after": None,
1936        "first": 100
1937    }
1938
1939    if asset_version_id:
1940        variables["filter"]["assetVersionRefId"] = asset_version_id
1941
1942    if search_method == 'EXACT':
1943        if case_sensitive:
1944            variables["filter"]["name"] = name
1945        else:
1946            variables["filter"]["name_like"] = name
1947    elif search_method == 'CONTAINS':
1948        variables["filter"]["name_contains"] = name
1949
1950    if version:
1951        if search_method == 'EXACT':
1952            variables["filter"]["version"] = version
1953        elif search_method == 'CONTAINS':
1954            variables["filter"]["version_contains"] = version
1955
1956    records = get_all_paginated_results(token, organization_context, query, variables=variables,
1957                                        field="allSoftwareComponentInstances")
1958
1959    return records
1960
1961
1962def send_graphql_query(token, organization_context, query, variables=None):
1963    """
1964    Send a GraphQL query to the API
1965
1966    Args:
1967        token (str):
1968            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1969        organization_context (str):
1970            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1971        query (str):
1972            The GraphQL query string
1973        variables (dict, optional):
1974            Variables to be used in the GraphQL query, by default None
1975
1976    Raises:
1977        Exception: If the response status code is not 200
1978
1979    Returns:
1980        dict: Response JSON
1981    """
1982    headers = {
1983        'Content-Type': 'application/json',
1984        'Authorization': f'Bearer {token}',
1985        'Organization-Context': organization_context
1986    }
1987    data = {
1988        'query': query,
1989        'variables': variables
1990    }
1991
1992    response = requests.post(API_URL, headers=headers, json=data)
1993
1994    if response.status_code == 200:
1995        thejson = response.json()
1996
1997        if "errors" in thejson:
1998            raise Exception(f"Error: {thejson['errors']}")
1999
2000        return thejson
2001    else:
2002        raise Exception(f"Error: {response.status_code} - {response.text}")
2003
2004
2005def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
2006                            justification=None, response=None, comment=None):
2007    """
2008    Updates the status of a findings or multiple findings. This is a blocking call.
2009
2010    Args:
2011        token (str):
2012            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
2013        organization_context (str):
2014            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2015        user_id (str, required):
2016            User ID to update the finding status for.
2017        finding_ids (str, required):
2018            Finding ID to update the status for.
2019        status (str, required):
2020            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
2021        justification (str, optional):
2022            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
2023        response (str, optional):
2024            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see  https://docs.finitestate.io/types/finding-status-response-enum
2025        comment (str, optional):
2026            Optional comment to add to the finding status update.
2027
2028    Raises:
2029        ValueError: Raised if required parameters are not provided.
2030        Exception: Raised if the query fails.
2031
2032    Returns:
2033        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
2034    """
2035    if not user_id:
2036        raise ValueError("User Id is required")
2037    if not finding_ids:
2038        raise ValueError("Finding Ids is required")
2039    if not status:
2040        raise ValueError("Status is required")
2041
2042    mutation = queries.UPDATE_FINDING_STATUSES['mutation']
2043    variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status,
2044                                                             justification=justification, response=response,
2045                                                             comment=comment)
2046
2047    return send_graphql_query(token, organization_context, mutation, variables)
2048
2049
2050def upload_file_for_binary_analysis(
2051    token, organization_context, test_id=None, file_path=None, chunk_size=DEFAULT_CHUNK_SIZE, quick_scan=False
2052):
2053    """
2054    Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk.
2055    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.
2056
2057    Args:
2058        token (str):
2059            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2060        organization_context (str):
2061            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2062        test_id (str, required):
2063            Test ID to upload the file for.
2064        file_path (str, required):
2065            Local path to the file to upload.
2066        chunk_size (int, optional):
2067            The size of the chunks to read. 1000 MiB by default. Min 5MiB and max 2GiB.
2068        quick_scan (bool, optional):
2069            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
2070
2071    Raises:
2072        ValueError: Raised if test_id or file_path are not provided.
2073        Exception: Raised if the query fails.
2074
2075    Returns:
2076        dict: The response from the GraphQL query, a completeMultipartUpload Object.
2077    """
2078    # To upload a file for Binary Analysis, you must use the generateMultiplePartUploadUrl mutation
2079    if not test_id:
2080        raise ValueError("Test Id is required")
2081    if not file_path:
2082        raise ValueError("File Path is required")
2083    if chunk_size < MIN_CHUNK_SIZE:
2084        raise ValueError(f"Chunk size must be greater than {MIN_CHUNK_SIZE} bytes")
2085    if chunk_size >= MAX_CHUNK_SIZE:
2086        raise ValueError(f"Chunk size must be less than {MAX_CHUNK_SIZE} bytes")
2087
2088    # Start Multi-part Upload
2089    graphql_query = """
2090    mutation Start_SDK($testId: ID!) {
2091        startMultipartUploadV2(testId: $testId) {
2092            uploadId
2093            key
2094        }
2095    }
2096    """
2097
2098    variables = {
2099        "testId": test_id
2100    }
2101
2102    response = send_graphql_query(token, organization_context, graphql_query, variables)
2103
2104    upload_id = response['data']['startMultipartUploadV2']['uploadId']
2105    upload_key = response['data']['startMultipartUploadV2']['key']
2106
2107    # if the file is greater than max chunk size (or 5 GB), split the file in chunks,
2108    # call generateUploadPartUrlV2 for each chunk of the file (even if it is a single part)
2109    # and upload the file to the returned upload URL
2110    i = 0
2111    part_data = []
2112    for chunk in file_chunks(file_path, chunk_size):
2113        i = i + 1
2114        graphql_query = """
2115        mutation GenerateUploadPartUrl_SDK($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
2116            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
2117                key
2118                uploadUrl
2119            }
2120        }
2121        """
2122
2123        variables = {
2124            "partNumber": i,
2125            "uploadId": upload_id,
2126            "uploadKey": upload_key
2127        }
2128
2129        response = send_graphql_query(token, organization_context, graphql_query, variables)
2130
2131        chunk_upload_url = response['data']['generateUploadPartUrlV2']['uploadUrl']
2132
2133        # upload the chunk to the upload URL
2134        response = upload_bytes_to_url(chunk_upload_url, chunk)
2135
2136        part_data.append({
2137            "ETag": response.headers['ETag'],
2138            "PartNumber": i
2139        })
2140
2141    # call completeMultipartUploadV2
2142    graphql_query = """
2143    mutation CompleteMultipartUpload_SDK($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
2144        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
2145            key
2146        }
2147    }
2148    """
2149
2150    variables = {
2151        "partData": part_data,
2152        "uploadId": upload_id,
2153        "uploadKey": upload_key
2154    }
2155
2156    response = send_graphql_query(token, organization_context, graphql_query, variables)
2157
2158    # get key from the result
2159    key = response['data']['completeMultipartUploadV2']['key']
2160
2161    variables = {
2162        "key": key,
2163        "testId": test_id
2164    }
2165
2166    # call launchBinaryUploadProcessing
2167    if quick_scan:
2168        graphql_query = """
2169        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
2170            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
2171                key
2172            }
2173        }
2174        """
2175        variables["configurationOptions"] = ["QUICK_SCAN"]
2176    else:
2177        graphql_query = """
2178        mutation LaunchBinaryUploadProcessing_SDK($key: String!, $testId: ID!) {
2179            launchBinaryUploadProcessing(key: $key, testId: $testId) {
2180                key
2181            }
2182        }
2183        """
2184
2185    response = send_graphql_query(token, organization_context, graphql_query, variables)
2186
2187    return response['data']
2188
2189
2190def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
2191    """
2192    Uploads a test results file to the test specified by test_id. NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.
2193
2194    Args:
2195        token (str):
2196            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2197        organization_context (str):
2198            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2199        test_id (str, required):
2200            Test ID to upload the file for.
2201        file_path (str, required):
2202            Local path to the file to upload.
2203
2204    Raises:
2205        ValueError: Raised if test_id or file_path are not provided.
2206        Exception: Raised if the query fails.
2207
2208    Returns:
2209        dict: The response from the GraphQL query, a completeTestResultUpload Object.
2210    """
2211    if not test_id:
2212        raise ValueError("Test Id is required")
2213    if not file_path:
2214        raise ValueError("File Path is required")
2215
2216    # Gerneate Test Result Upload URL
2217    graphql_query = """
2218    mutation GenerateTestResultUploadUrl_SDK($testId: ID!) {
2219        generateSinglePartUploadUrl(testId: $testId) {
2220            uploadUrl
2221            key
2222        }
2223    }
2224    """
2225
2226    variables = {
2227        "testId": test_id
2228    }
2229
2230    response = send_graphql_query(token, organization_context, graphql_query, variables)
2231
2232    # get the upload URL and key
2233    upload_url = response['data']['generateSinglePartUploadUrl']['uploadUrl']
2234    key = response['data']['generateSinglePartUploadUrl']['key']
2235
2236    # upload the file
2237    upload_file_to_url(upload_url, file_path)
2238
2239    # complete the upload
2240    graphql_query = """
2241    mutation CompleteTestResultUpload_SDK($key: String!, $testId: ID!) {
2242        launchTestResultProcessing(key: $key, testId: $testId) {
2243            key
2244        }
2245    }
2246    """
2247
2248    variables = {
2249        "testId": test_id,
2250        "key": key
2251    }
2252
2253    response = send_graphql_query(token, organization_context, graphql_query, variables)
2254    return response['data']
2255
2256
2257def upload_bytes_to_url(url, bytes):
2258    """
2259    Used for uploading a file to a pre-signed S3 URL
2260
2261    Args:
2262        url (str):
2263            (Pre-signed S3) URL
2264        bytes (bytes):
2265            Bytes to upload
2266
2267    Raises:
2268        Exception: If the response status code is not 200
2269
2270    Returns:
2271        requests.Response: Response object
2272    """
2273    response = requests.put(url, data=bytes)
2274
2275    if response.status_code == 200:
2276        return response
2277    else:
2278        raise Exception(f"Error: {response.status_code} - {response.text}")
2279
2280
2281def upload_file_to_url(url, file_path):
2282    """
2283    Used for uploading a file to a pre-signed S3 URL
2284
2285    Args:
2286        url (str):
2287            (Pre-signed S3) URL
2288        file_path (str):
2289            Local path to file to upload
2290
2291    Raises:
2292        Exception: If the response status code is not 200
2293
2294    Returns:
2295        requests.Response: Response object
2296    """
2297    with open(file_path, 'rb') as file:
2298        response = requests.put(url, data=file)
2299
2300    if response.status_code == 200:
2301        return response
2302    else:
2303        raise Exception(f"Error: {response.status_code} - {response.text}")
API_URL = 'https://platform.finitestate.io/api/v1/graphql'
AUDIENCE = 'https://platform.finitestate.io/api/v1/graphql'
TOKEN_URL = 'https://platform.finitestate.io/api/v1/auth/token'

DEFAULT CHUNK SIZE: 1000 MiB

DEFAULT_CHUNK_SIZE = 1048576000

MAX CHUNK SIZE: 2 GiB

MAX_CHUNK_SIZE = 2097152000

MIN CHUNK SIZE: 5 MiB

MIN_CHUNK_SIZE = 5242880
class UploadMethod(enum.Enum):
28class UploadMethod(Enum):
29    """
30    Enumeration class representing different upload methods.
31
32    Attributes:
33        WEB_APP_UI: Upload method via web application UI.
34        API: Upload method via API.
35        GITHUB_INTEGRATION: Upload method via GitHub integration.
36        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
37
38    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
39    """
40    WEB_APP_UI = "WEB_APP_UI"
41    API = "API"
42    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
43    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"

Enumeration class representing different upload methods.

Attributes:
  • WEB_APP_UI: Upload method via web application UI.
  • API: Upload method via API.
  • GITHUB_INTEGRATION: Upload method via GitHub integration.
  • AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

To use any value from this enumeration, use UploadMethod. i.e. UploadMethod.WEB_APP_UI

WEB_APP_UI = <UploadMethod.WEB_APP_UI: 'WEB_APP_UI'>
API = <UploadMethod.API: 'API'>
GITHUB_INTEGRATION = <UploadMethod.GITHUB_INTEGRATION: 'GITHUB_INTEGRATION'>
AZURE_DEVOPS_INTEGRATION = <UploadMethod.AZURE_DEVOPS_INTEGRATION: 'AZURE_DEVOPS_INTEGRATION'>
Inherited Members
enum.Enum
name
value
def create_artifact( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_version_id=None, artifact_name=None, product_id=None):
 46def create_artifact(
 47    token,
 48    organization_context,
 49    business_unit_id=None,
 50    created_by_user_id=None,
 51    asset_version_id=None,
 52    artifact_name=None,
 53    product_id=None,
 54):
 55    """
 56    Create a new Artifact.
 57    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 58    Please see the examples in the Github repository for more information:
 59    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 60    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 61
 62    Args:
 63        token (str):
 64            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 65        organization_context (str):
 66            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 67        business_unit_id (str, required):
 68            Business Unit ID to associate the artifact with.
 69        created_by_user_id (str, required):
 70            User ID of the user creating the artifact.
 71        asset_version_id (str, required):
 72            Asset Version ID to associate the artifact with.
 73        artifact_name (str, required):
 74            The name of the Artifact being created.
 75        product_id (str, optional):
 76            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
 77
 78    Raises:
 79        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
 80        Exception: Raised if the query fails.
 81
 82    Returns:
 83        dict: createArtifact Object
 84    """
 85    if not business_unit_id:
 86        raise ValueError("Business unit ID is required")
 87    if not created_by_user_id:
 88        raise ValueError("Created by user ID is required")
 89    if not asset_version_id:
 90        raise ValueError("Asset version ID is required")
 91    if not artifact_name:
 92        raise ValueError("Artifact name is required")
 93
 94    graphql_query = """
 95    mutation CreateArtifactMutation_SDK($input: CreateArtifactInput!) {
 96        createArtifact(input: $input) {
 97            id
 98            name
 99            assetVersion {
100                id
101                name
102                asset {
103                    id
104                    name
105                }
106            }
107            createdBy {
108                id
109                email
110            }
111            ctx {
112                asset
113                products
114                businessUnits
115            }
116        }
117    }
118    """
119
120    # Asset name, business unit context, and creating user are required
121    variables = {
122        "input": {
123            "name": artifact_name,
124            "createdBy": created_by_user_id,
125            "assetVersion": asset_version_id,
126            "ctx": {
127                "asset": asset_version_id,
128                "businessUnits": [business_unit_id]
129            }
130        }
131    }
132
133    if product_id is not None:
134        variables["input"]["ctx"]["products"] = product_id
135
136    response = send_graphql_query(token, organization_context, graphql_query, variables)
137    return response['data']

Create a new Artifact. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the artifact with.
  • created_by_user_id (str, required): User ID of the user creating the artifact.
  • asset_version_id (str, required): Asset Version ID to associate the artifact with.
  • artifact_name (str, required): The name of the Artifact being created.
  • product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createArtifact Object

def create_asset( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
140def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
141    """
142    Create a new Asset.
143
144    Args:
145        token (str):
146            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
147        organization_context (str):
148            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
149        business_unit_id (str, required):
150            Business Unit ID to associate the asset with.
151        created_by_user_id (str, required):
152            User ID of the user creating the asset.
153        asset_name (str, required):
154            The name of the Asset being created.
155        product_id (str, optional):
156            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
157
158    Raises:
159        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
160        Exception: Raised if the query fails.
161
162    Returns:
163        dict: createAsset Object
164    """
165    if not business_unit_id:
166        raise ValueError("Business unit ID is required")
167    if not created_by_user_id:
168        raise ValueError("Created by user ID is required")
169    if not asset_name:
170        raise ValueError("Asset name is required")
171
172    graphql_query = """
173    mutation CreateAssetMutation_SDK($input: CreateAssetInput!) {
174        createAsset(input: $input) {
175            id
176            name
177            dependentProducts {
178                id
179                name
180            }
181            group {
182                id
183                name
184            }
185            createdBy {
186                id
187                email
188            }
189            ctx {
190                asset
191                products
192                businessUnits
193            }
194        }
195    }
196    """
197
198    # Asset name, business unit context, and creating user are required
199    variables = {
200        "input": {
201            "name": asset_name,
202            "group": business_unit_id,
203            "createdBy": created_by_user_id,
204            "ctx": {
205                "businessUnits": [business_unit_id]
206            }
207        }
208    }
209
210    if product_id is not None:
211        variables["input"]["ctx"]["products"] = product_id
212
213    response = send_graphql_query(token, organization_context, graphql_query, variables)
214    return response['data']

Create a new Asset.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset with.
  • created_by_user_id (str, required): User ID of the user creating the asset.
  • asset_name (str, required): The name of the Asset being created.
  • product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAsset Object

def create_asset_version( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, asset_version_name=None, product_id=None):
217def create_asset_version(
218    token,
219    organization_context,
220    business_unit_id=None,
221    created_by_user_id=None,
222    asset_id=None,
223    asset_version_name=None,
224    product_id=None,
225):
226    """
227    Create a new Asset Version.
228
229    Args:
230        token (str):
231            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
232        organization_context (str):
233            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
234        business_unit_id (str, required):
235            Business Unit ID to associate the asset version with.
236        created_by_user_id (str, required):
237            User ID of the user creating the asset version.
238        asset_id (str, required):
239            Asset ID to associate the asset version with.
240        asset_version_name (str, required):
241            The name of the Asset Version being created.
242        product_id (str, optional):
243            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
244
245    Raises:
246        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
247        Exception: Raised if the query fails.
248
249    Returns:
250        dict: createAssetVersion Object
251
252    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
253    """
254    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
255    if not business_unit_id:
256        raise ValueError("Business unit ID is required")
257    if not created_by_user_id:
258        raise ValueError("Created by user ID is required")
259    if not asset_id:
260        raise ValueError("Asset ID is required")
261    if not asset_version_name:
262        raise ValueError("Asset version name is required")
263
264    graphql_query = """
265    mutation CreateAssetVersionMutation_SDK($input: CreateAssetVersionInput!) {
266        createAssetVersion(input: $input) {
267            id
268            name
269            asset {
270                id
271                name
272            }
273            createdBy {
274                id
275                email
276            }
277            ctx {
278                asset
279                products
280                businessUnits
281            }
282        }
283    }
284    """
285
286    # Asset name, business unit context, and creating user are required
287    variables = {
288        "input": {
289            "name": asset_version_name,
290            "createdBy": created_by_user_id,
291            "asset": asset_id,
292            "ctx": {
293                "asset": asset_id,
294                "businessUnits": [business_unit_id]
295            }
296        }
297    }
298
299    if product_id is not None:
300        variables["input"]["ctx"]["products"] = product_id
301
302    response = send_graphql_query(token, organization_context, graphql_query, variables)
303    return response['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset version with.
  • created_by_user_id (str, required): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
  • product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

deprecated:: 0.1.7. Use create_asset_version_on_asset instead.

def create_asset_version_on_asset( token, organization_context, created_by_user_id=None, asset_id=None, asset_version_name=None):
306def create_asset_version_on_asset(
307    token,
308    organization_context,
309    created_by_user_id=None,
310    asset_id=None,
311    asset_version_name=None,
312):
313    """
314    Create a new Asset Version.
315
316    Args:
317        token (str):
318            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
319        organization_context (str):
320            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
321        created_by_user_id (str, optional):
322            User ID of the user creating the asset version.
323        asset_id (str, required):
324            Asset ID to associate the asset version with.
325        asset_version_name (str, required):
326            The name of the Asset Version being created.
327
328    Raises:
329        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
330        Exception: Raised if the query fails.
331
332    Returns:
333        dict: createAssetVersion Object
334    """
335    if not asset_id:
336        raise ValueError("Asset ID is required")
337    if not asset_version_name:
338        raise ValueError("Asset version name is required")
339
340    graphql_query = """
341        mutation BapiCreateAssetVersion_SDK($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!) {
342            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId) {
343                id
344                assetVersion {
345                    id
346                }
347            }
348        }
349    """
350
351    # Asset name, business unit context, and creating user are required
352    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
353
354    if created_by_user_id:
355        variables["createdByUserId"] = created_by_user_id
356
357    response = send_graphql_query(token, organization_context, graphql_query, variables)
358    return response['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • created_by_user_id (str, optional): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

def create_new_asset_version_artifact_and_test_for_upload( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, product_id=None, test_type=None, artifact_description=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
361def create_new_asset_version_artifact_and_test_for_upload(
362    token,
363    organization_context,
364    business_unit_id=None,
365    created_by_user_id=None,
366    asset_id=None,
367    version=None,
368    product_id=None,
369    test_type=None,
370    artifact_description=None,
371    upload_method: UploadMethod = UploadMethod.API,
372):
373    """
374    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
375    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
376
377    Args:
378        token (str):
379            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
380        organization_context (str):
381            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
382        business_unit_id (str, optional):
383            Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
384        created_by_user_id (str, optional):
385            User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
386        asset_id (str, required):
387            Asset ID to create the asset version for. If not provided, the default asset will be used.
388        version (str, required):
389            Version to create the asset version for.
390        product_id (str, optional):
391            Product ID to create the entities for. If not provided, the default product will be used.
392        test_type (str, required):
393            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
394        artifact_description (str, optional):
395            Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
396        upload_method (UploadMethod, optional):
397            The method of uploading the test results. Default is UploadMethod.API.
398
399
400    Raises:
401        ValueError: Raised if asset_id or version are not provided.
402        Exception: Raised if the query fails.
403
404    Returns:
405        str: The Test ID of the newly created test that is used for uploading the file.
406    """
407    if not asset_id:
408        raise ValueError("Asset ID is required")
409    if not version:
410        raise ValueError("Version is required")
411
412    assets = get_all_assets(token, organization_context, asset_id=asset_id)
413    asset = assets[0]
414
415    # get the asset name
416    asset_name = asset['name']
417
418    # get the existing asset product IDs
419    asset_product_ids = asset['ctx']['products']
420
421    # get the asset product ID
422    if product_id and product_id not in asset_product_ids:
423        asset_product_ids.append(product_id)
424
425    # if business_unit_id or created_by_user_id are not provided, get the existing asset
426    if not business_unit_id or not created_by_user_id:
427        if not business_unit_id:
428            business_unit_id = asset['group']['id']
429        if not created_by_user_id:
430            created_by_user_id = asset['createdBy']['id']
431
432        if not business_unit_id:
433            raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
434        if not created_by_user_id:
435            raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")
436
437    # create the asset version
438    response = create_asset_version_on_asset(
439        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version
440    )
441    # get the asset version ID
442    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']
443
444    # create the test
445    if test_type == "finite_state_binary_analysis":
446        # create the artifact
447        if not artifact_description:
448            artifact_description = "Binary"
449        binary_artifact_name = f"{asset_name} {version} - {artifact_description}"
450        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
451                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
452                                   artifact_name=binary_artifact_name, product_id=asset_product_ids)
453
454        # get the artifact ID
455        binary_artifact_id = response['createArtifact']['id']
456
457        # create the test
458        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
459        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
460                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
461                                                  artifact_id=binary_artifact_id, product_id=asset_product_ids,
462                                                  test_name=test_name, upload_method=upload_method)
463        test_id = response['createTest']['id']
464        return test_id
465
466    else:
467        # create the artifact
468        if not artifact_description:
469            artifact_description = "Unspecified Artifact"
470        artifact_name = f"{asset_name} {version} - {artifact_description}"
471        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
472                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
473                                   artifact_name=artifact_name, product_id=asset_product_ids)
474
475        # get the artifact ID
476        binary_artifact_id = response['createArtifact']['id']
477
478        # create the test
479        test_name = f"{asset_name} {version} - {test_type}"
480        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
481                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
482                                                      artifact_id=binary_artifact_id, product_id=asset_product_ids,
483                                                      test_name=test_name, test_type=test_type,
484                                                      upload_method=upload_method)
485        test_id = response['createTest']['id']
486        return test_id

Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
  • created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for. If not provided, the default asset will be used.
  • version (str, required): Version to create the asset version for.
  • product_id (str, optional): Product ID to create the entities for. If not provided, the default product will be used.
  • test_type (str, required): Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
  • artifact_description (str, optional): Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if asset_id or version are not provided.
  • Exception: Raised if the query fails.
Returns:

str: The Test ID of the newly created test that is used for uploading the file.

def create_new_asset_version_and_upload_binary( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, artifact_description=None, quick_scan=False, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
489def create_new_asset_version_and_upload_binary(
490    token,
491    organization_context,
492    business_unit_id=None,
493    created_by_user_id=None,
494    asset_id=None,
495    version=None,
496    file_path=None,
497    product_id=None,
498    artifact_description=None,
499    quick_scan=False,
500    upload_method: UploadMethod = UploadMethod.API,
501):
502    """
503    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
504    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
505
506    Args:
507        token (str):
508            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
509        organization_context (str):
510            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
511        business_unit_id (str, optional):
512            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
513        created_by_user_id (str, optional):
514            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
515        asset_id (str, required):
516            Asset ID to create the asset version for.
517        version (str, required):
518            Version to create the asset version for.
519        file_path (str, required):
520            Local path to the file to upload.
521        product_id (str, optional):
522            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
523        artifact_description (str, optional):
524            Description of the artifact. If not provided, the default is "Firmware Binary".
525        quick_scan (bool, optional):
526            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
527        upload_method (UploadMethod, optional):
528            The method of uploading the test results. Default is UploadMethod.API.
529
530    Raises:
531        ValueError: Raised if asset_id, version, or file_path are not provided.
532        Exception: Raised if any of the queries fail.
533
534    Returns:
535        dict: The response from the GraphQL query, a createAssetVersion Object.
536    """
537    if not asset_id:
538        raise ValueError("Asset ID is required")
539    if not version:
540        raise ValueError("Version is required")
541    if not file_path:
542        raise ValueError("File path is required")
543
544    # create the asset version and binary test
545    if not artifact_description:
546        artifact_description = "Firmware Binary"
547    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
548        token,
549        organization_context,
550        business_unit_id=business_unit_id,
551        created_by_user_id=created_by_user_id,
552        asset_id=asset_id,
553        version=version,
554        product_id=product_id,
555        test_type="finite_state_binary_analysis",
556        artifact_description=artifact_description,
557        upload_method=upload_method,
558    )
559
560    # upload file for binary test
561    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
562                                               quick_scan=quick_scan)
563    return response

Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Local path to the file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
  • artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
  • quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if asset_id, version, or file_path are not provided.
  • Exception: Raised if any of the queries fail.
Returns:

dict: The response from the GraphQL query, a createAssetVersion Object.

def create_new_asset_version_and_upload_test_results( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, test_type=None, artifact_description='', upload_method: UploadMethod = <UploadMethod.API: 'API'>):
566def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
567                                                     created_by_user_id=None, asset_id=None, version=None,
568                                                     file_path=None, product_id=None, test_type=None,
569                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
570    """
571    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
572    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
573
574    Args:
575        token (str):
576            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
577        organization_context (str):
578            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
579        business_unit_id (str, optional):
580            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
581        created_by_user_id (str, optional):
582            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
583        asset_id (str, required):
584            Asset ID to create the asset version for.
585        version (str, required):
586            Version to create the asset version for.
587        file_path (str, required):
588            Path to the test results file to upload.
589        product_id (str, optional):
590            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
591        test_type (str, required):
592            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
593        artifact_description (str, optional):
594            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
595        upload_method (UploadMethod, optional):
596            The method of uploading the test results. Default is UploadMethod.API.
597
598    Raises:
599        ValueError: If the asset_id, version, or file_path are not provided.
600        Exception: If the test_type is not a supported third party scanner type, or if the query fails.
601
602    Returns:
603        dict: The response from the GraphQL query, a createAssetVersion Object.
604    """
605    if not asset_id:
606        raise ValueError("Asset ID is required")
607    if not version:
608        raise ValueError("Version is required")
609    if not file_path:
610        raise ValueError("File path is required")
611    if not test_type:
612        raise ValueError("Test type is required")
613
614    # create the asset version and test
615    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
616                                                                    business_unit_id=business_unit_id,
617                                                                    created_by_user_id=created_by_user_id,
618                                                                    asset_id=asset_id, version=version,
619                                                                    product_id=product_id, test_type=test_type,
620                                                                    artifact_description=artifact_description,
621                                                                    upload_method=upload_method)
622
623    # upload test results file
624    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
625    return response

Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Path to the test results file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
  • test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
  • artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: If the asset_id, version, or file_path are not provided.
  • Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:

dict: The response from the GraphQL query, a createAssetVersion Object.

def create_product( token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None, product_description=None, vendor_id=None, vendor_name=None):
628def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
629                   product_description=None, vendor_id=None, vendor_name=None):
630    """
631    Create a new Product.
632
633    Args:
634        token (str):
635            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
636        organization_context (str):
637            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
638        business_unit_id (str, required):
639            Business Unit ID to associate the product with.
640        created_by_user_id (str, required):
641            User ID of the user creating the product.
642        product_name (str, required):
643            The name of the Product being created.
644        product_description (str, optional):
645            The description of the Product being created.
646        vendor_id (str, optional):
647            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
648        vendor_name (str, optional):
649            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
650
651    Raises:
652        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
653        Exception: Raised if the query fails.
654
655    Returns:
656        dict: createProduct Object
657    """
658
659    if not business_unit_id:
660        raise ValueError("Business unit ID is required")
661    if not created_by_user_id:
662        raise ValueError("Created by user ID is required")
663    if not product_name:
664        raise ValueError("Product name is required")
665
666    graphql_query = """
667    mutation CreateProductMutation_SDK($input: CreateProductInput!) {
668        createProduct(input: $input) {
669            id
670            name
671            vendor {
672                name
673            }
674            group {
675                id
676                name
677            }
678            createdBy {
679                id
680                email
681            }
682            ctx {
683                businessUnit
684            }
685        }
686    }
687    """
688
689    # Product name, business unit context, and creating user are required
690    variables = {
691        "input": {
692            "name": product_name,
693            "group": business_unit_id,
694            "createdBy": created_by_user_id,
695            "ctx": {
696                "businessUnit": business_unit_id
697            }
698        }
699    }
700
701    if product_description is not None:
702        variables["input"]["description"] = product_description
703
704    # If the vendor ID is specified, this will link the new product to the existing vendor
705    if vendor_id is not None:
706        variables["input"]["vendor"] = {
707            "id": vendor_id
708        }
709
710    # If the vendor name is specified, this will create a new vendor and link it to the new product
711    if vendor_name is not None:
712        variables["input"]["createVendor"] = {
713            "name": vendor_name
714        }
715
716    response = send_graphql_query(token, organization_context, graphql_query, variables)
717
718    return response['data']

Create a new Product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the product with.
  • created_by_user_id (str, required): User ID of the user creating the product.
  • product_name (str, required): The name of the Product being created.
  • product_description (str, optional): The description of the Product being created.
  • vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
  • vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createProduct Object

def create_test( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, tools=[], upload_method: UploadMethod = <UploadMethod.API: 'API'>):
721def create_test(
722    token,
723    organization_context,
724    business_unit_id=None,
725    created_by_user_id=None,
726    asset_id=None,
727    artifact_id=None,
728    test_name=None,
729    product_id=None,
730    test_type=None,
731    tools=[],
732    upload_method: UploadMethod = UploadMethod.API,
733):
734    """
735    Create a new Test object for uploading files.
736    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
737    Please see the examples in the Github repository for more information:
738    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
739    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
740
741    Args:
742        token (str):
743            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
744        organization_context (str):
745            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
746        business_unit_id (str, required):
747            Business Unit ID to associate the Test with.
748        created_by_user_id (str, required):
749            User ID of the user creating the Test.
750        asset_id (str, required):
751            Asset ID to associate the Test with.
752        artifact_id (str, required):
753            Artifact ID to associate the Test with.
754        test_name (str, required):
755            The name of the Test being created.
756        product_id (str, optional):
757            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
758        test_type (str, required):
759            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
760        tools (list, optional):
761            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
762        upload_method (UploadMethod, required):
763            The method of uploading the test results.
764
765    Raises:
766        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
767        Exception: Raised if the query fails.
768
769    Returns:
770        dict: createTest Object
771    """
772    if not business_unit_id:
773        raise ValueError("Business unit ID is required")
774    if not created_by_user_id:
775        raise ValueError("Created by user ID is required")
776    if not asset_id:
777        raise ValueError("Asset ID is required")
778    if not artifact_id:
779        raise ValueError("Artifact ID is required")
780    if not test_name:
781        raise ValueError("Test name is required")
782    if not test_type:
783        raise ValueError("Test type is required")
784
785    graphql_query = """
786    mutation CreateTestMutation_SDK($input: CreateTestInput!) {
787        createTest(input: $input) {
788            id
789            name
790            artifactUnderTest {
791                id
792                name
793                assetVersion {
794                    id
795                    name
796                    asset {
797                        id
798                        name
799                        dependentProducts {
800                            id
801                            name
802                        }
803                    }
804                }
805            }
806            createdBy {
807                id
808                email
809            }
810            ctx {
811                asset
812                products
813                businessUnits
814            }
815            uploadMethod
816        }
817    }
818    """
819
820    # Asset name, business unit context, and creating user are required
821    variables = {
822        "input": {
823            "name": test_name,
824            "createdBy": created_by_user_id,
825            "artifactUnderTest": artifact_id,
826            "testResultFileFormat": test_type,
827            "ctx": {
828                "asset": asset_id,
829                "businessUnits": [business_unit_id]
830            },
831            "tools": tools,
832            "uploadMethod": upload_method.value
833        }
834    }
835
836    if product_id is not None:
837        variables["input"]["ctx"]["products"] = product_id
838
839    response = send_graphql_query(token, organization_context, graphql_query, variables)
840    return response['data']

Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the Github repository for more information:

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
  • tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
  • upload_method (UploadMethod, required): The method of uploading the test results.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_binary_analysis( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
843def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
844                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
845                                   upload_method: UploadMethod = UploadMethod.API):
846    """
847    Create a new Test object for uploading files for Finite State Binary Analysis.
848
849    Args:
850        token (str):
851            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
852        organization_context (str):
853            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
854        business_unit_id (str, required):
855            Business Unit ID to associate the Test with.
856        created_by_user_id (str, required):
857            User ID of the user creating the Test.
858        asset_id (str, required):
859            Asset ID to associate the Test with.
860        artifact_id (str, required):
861            Artifact ID to associate the Test with.
862        test_name (str, required):
863            The name of the Test being created.
864        product_id (str, optional):
865            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
866        upload_method (UploadMethod, optional):
867            The method of uploading the test results. Default is UploadMethod.API.
868
869    Raises:
870        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
871        Exception: Raised if the query fails.
872
873    Returns:
874        dict: createTest Object
875    """
876    tools = [
877        {
878            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
879            "name": "Finite State Binary Analysis"
880        }
881    ]
882    return create_test(token, organization_context, business_unit_id=business_unit_id,
883                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
884                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
885                       tools=tools, upload_method=upload_method)

Create a new Test object for uploading files for Finite State Binary Analysis.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_cyclone_dx( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
888def create_test_as_cyclone_dx(
889    token,
890    organization_context,
891    business_unit_id=None,
892    created_by_user_id=None,
893    asset_id=None,
894    artifact_id=None,
895    test_name=None,
896    product_id=None,
897    upload_method: UploadMethod = UploadMethod.API,
898):
899    """
900    Create a new Test object for uploading CycloneDX files.
901
902    Args:
903        token (str):
904            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
905        organization_context (str):
906            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
907        business_unit_id (str, required):
908            Business Unit ID to associate the Test with.
909        created_by_user_id (str, required):
910            User ID of the user creating the Test.
911        asset_id (str, required):
912            Asset ID to associate the Test with.
913        artifact_id (str, required):
914            Artifact ID to associate the Test with.
915        test_name (str, required):
916            The name of the Test being created.
917        product_id (str, optional):
918            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
919        upload_method (UploadMethod, optional):
920            The method of uploading the test results. Default is UploadMethod.API.
921
922    Raises:
923        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
924        Exception: Raised if the query fails.
925
926    Returns:
927        dict: createTest Object
928    """
929    return create_test(token, organization_context, business_unit_id=business_unit_id,
930                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
931                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)

Create a new Test object for uploading CycloneDX files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_third_party_scanner( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
934def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
935                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
936                                       upload_method: UploadMethod = UploadMethod.API):
937    """
938    Create a new Test object for uploading Third Party Scanner files.
939
940    Args:
941        token (str):
942            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
943        organization_context (str):
944            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
945        business_unit_id (str, required):
946            Business Unit ID to associate the Test with.
947        created_by_user_id (str, required):
948            User ID of the user creating the Test.
949        asset_id (str, required):
950            Asset ID to associate the Test with.
951        artifact_id (str, required):
952            Artifact ID to associate the Test with.
953        test_name (str, required):
954            The name of the Test being created.
955        product_id (str, optional):
956            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
957        test_type (str, required):
958            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
959        upload_method (UploadMethod, optional):
960            The method of uploading the test results. Default is UploadMethod.API.
961
962    Raises:
963        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
964        Exception: Raised if the query fails.
965
966    Returns:
967        dict: createTest Object
968    """
969    return create_test(token, organization_context, business_unit_id=business_unit_id,
970                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
971                       test_name=test_name, product_id=product_id, test_type=test_type, upload_method=upload_method)

Create a new Test object for uploading Third Party Scanner files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def download_asset_version_report( token, organization_context, asset_version_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
 974def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 975                                  report_subtype=None, output_filename=None, verbose=False):
 976    """
 977    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 978
 979    Args:
 980        token (str):
 981            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 982        organization_context (str):
 983            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 984        asset_version_id (str, required):
 985            The Asset Version ID to download the report for.
 986        report_type (str, required):
 987            The file type of the report to download. Valid values are "CSV" and "PDF".
 988        report_subtype (str, required):
 989            The type of report to download. Based on available reports for the `report_type` specified
 990            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
 991            Valid values for PDF are "RISK_SUMMARY".
 992        output_filename (str, optional):
 993            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
 994        verbose (bool, optional):
 995            If True, will print additional information to the console. Defaults to False.
 996
 997    Raises:
 998        ValueError: Raised if required parameters are not provided.
 999        Exception: Raised if the query fails.
1000
1001    Returns:
1002        None
1003    """
1004    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
1005                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
1006
1007    # Send an HTTP GET request to the URL
1008    response = requests.get(url)
1009
1010    # Check if the request was successful (status code 200)
1011    if response.status_code == 200:
1012        # Open a local file in binary write mode and write the content to it
1013        if verbose:
1014            print("File downloaded successfully.")
1015        with open(output_filename, 'wb') as file:
1016            file.write(response.content)
1017            if verbose:
1018                print(f'Wrote file to {output_filename}')
1019    else:
1020        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, required): The Asset Version ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download. Based on available reports for the report_type specified Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def download_product_report( token, organization_context, product_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
1023def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
1024                            output_filename=None, verbose=False):
1025    """
1026    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
1027
1028    Args:
1029        token (str):
1030            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1031        organization_context (str):
1032            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1033        product_id (str, required):
1034            The Product ID to download the report for.
1035        report_type (str, required):
1036            The file type of the report to download. Valid values are "CSV".
1037        report_subtype (str, required):
1038            The type of report to download. Based on available reports for the `report_type` specified
1039            Valid values for CSV are "ALL_FINDINGS".
1040        output_filename (str, optional):
1041            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
1042        verbose (bool, optional):
1043            If True, will print additional information to the console. Defaults to False.
1044    """
1045    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
1046                                       report_subtype=report_subtype, verbose=verbose)
1047
1048    # Send an HTTP GET request to the URL
1049    response = requests.get(url)
1050
1051    # Check if the request was successful (status code 200)
1052    if response.status_code == 200:
1053        # Open a local file in binary write mode and write the content to it
1054        if verbose:
1055            print("File downloaded successfully.")
1056        with open(output_filename, 'wb') as file:
1057            file.write(response.content)
1058            if verbose:
1059                print(f'Wrote file to {output_filename}')
1060    else:
1061        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, required): The Product ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV".
  • report_subtype (str, required): The type of report to download. Based on available reports for the report_type specified Valid values for CSV are "ALL_FINDINGS".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom( token, organization_context, sbom_type='CYCLONEDX', sbom_subtype='SBOM_ONLY', asset_version_id=None, output_filename='sbom.json', verbose=False):
1064def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
1065                  output_filename="sbom.json", verbose=False):
1066    """
1067    Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
1068
1069    Args:
1070        token (str):
1071            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1072        organization_context (str):
1073            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1074        sbom_type (str, required):
1075            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
1076        sbom_subtype (str, required):
1077            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
1078        asset_version_id (str, required):
1079            The Asset Version ID to download the SBOM for.
1080        output_filename (str, required):
1081            The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
1082        verbose (bool, optional):
1083            If True, will print additional information to the console. Defaults to False.
1084
1085    Raises:
1086        ValueError: Raised if required parameters are not provided.
1087        Exception: Raised if the query fails.
1088
1089    Returns:
1090        None
1091    """
1092    url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype,
1093                                     asset_version_id=asset_version_id, verbose=verbose)
1094
1095    # Send an HTTP GET request to the URL
1096    response = requests.get(url)
1097
1098    # Check if the request was successful (status code 200)
1099    if response.status_code == 200:
1100        # Open a local file in binary write mode and write the content to it
1101        if verbose:
1102            print("File downloaded successfully.")
1103        with open(output_filename, 'wb') as file:
1104            file.write(response.content)
1105            if verbose:
1106                print(f'Wrote file to {output_filename}')
1107    else:
1108        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
  • asset_version_id (str, required): The Asset Version ID to download the SBOM for.
  • output_filename (str, required): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def file_chunks(file_path, chunk_size=1048576000):
1111def file_chunks(file_path, chunk_size=DEFAULT_CHUNK_SIZE):
1112    """
1113    Helper method to read a file in chunks.
1114
1115    Args:
1116        file_path (str):
1117            Local path to the file to read.
1118        chunk_size (int, optional):
1119            The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
1120
1121    Yields:
1122        bytes: The next chunk of the file.
1123
1124    Raises:
1125        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
1126    """
1127    with open(file_path, 'rb') as f:
1128        while True:
1129            chunk = f.read(chunk_size)
1130            if chunk:
1131                yield chunk
1132            else:
1133                break

Helper method to read a file in chunks.

Arguments:
  • file_path (str): Local path to the file to read.
  • chunk_size (int, optional): The size of the chunks to read. Defaults to DEFAULT_CHUNK_SIZE.
Yields:

bytes: The next chunk of the file.

Raises:
  • FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
1136def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
1137    """
1138    Get all artifacts in the organization. Uses pagination to get all results.
1139
1140    Args:
1141        token (str):
1142            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1143        organization_context (str):
1144            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1145        artifact_id (str, optional):
1146            An optional Artifact ID if this is used to get a single artifact, by default None
1147        business_unit_id (str, optional):
1148            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
1149
1150    Raises:
1151        Exception: Raised if the query fails.
1152
1153    Returns:
1154        list: List of Artifact Objects
1155    """
1156    return get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1157                                     queries.ALL_ARTIFACTS['variables'](artifact_id, business_unit_id), 'allAssets')

Get all artifacts in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
  • business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Artifact Objects

def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
1160def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
1161    """
1162    Gets all assets in the organization. Uses pagination to get all results.
1163
1164    Args:
1165        token (str):
1166            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1167        organization_context (str):
1168            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1169        asset_id (str, optional):
1170            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1171        business_unit_id (str, optional):
1172            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1173
1174    Raises:
1175        Exception: Raised if the query fails.
1176
1177    Returns:
1178        list: List of Asset Objects
1179    """
1180    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1181                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')

Gets all assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_all_asset_versions(token, organization_context):
1184def get_all_asset_versions(token, organization_context):
1185    """
1186    Get all asset versions in the organization. Uses pagination to get all results.
1187
1188    Args:
1189        token (str):
1190            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1191        organization_context (str):
1192            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1193
1194    Raises:
1195        Exception: Raised if the query fails.
1196
1197    Returns:
1198        list: List of AssetVersion Objects
1199    """
1200    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1201                                     queries.ALL_ASSET_VERSIONS['variables'], 'allAssetVersions')

Get all asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_all_asset_versions_for_product(token, organization_context, product_id):
1204def get_all_asset_versions_for_product(token, organization_context, product_id):
1205    """
1206    Get all asset versions for a product. Uses pagination to get all results.
1207
1208    Args:
1209        token (str):
1210            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1211        organization_context (str):
1212            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1213        product_id (str):
1214            The Product ID to get asset versions for
1215
1216    Returns:
1217        list: List of AssetVersion Objects
1218    """
1219    return get_all_paginated_results(token, organization_context, queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['query'],
1220                                     queries.ONE_PRODUCT_ALL_ASSET_VERSIONS['variables'](product_id), 'allProducts')

Get all asset versions for a product. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str): The Product ID to get asset versions for
Returns:

list: List of AssetVersion Objects

def get_all_business_units(token, organization_context):
1223def get_all_business_units(token, organization_context):
1224    """
1225    Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.
1226
1227    Args:
1228        token (str):
1229            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1230        organization_context (str):
1231            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1232
1233    Raises:
1234        Exception: Raised if the query fails.
1235
1236    Returns:
1237        list: List of Group Objects
1238    """
1239    return get_all_paginated_results(token, organization_context, queries.ALL_BUSINESS_UNITS['query'],
1240                                     queries.ALL_BUSINESS_UNITS['variables'], 'allGroups')

Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Group Objects

def get_all_organizations(token, organization_context):
1243def get_all_organizations(token, organization_context):
1244    """
1245    Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.
1246
1247    Args:
1248        token (str):
1249            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1250        organization_context (str):
1251            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1252
1253    Raises:
1254        Exception: Raised if the query fails.
1255
1256    Returns:
1257        list: List of Organization Objects
1258    """
1259    return get_all_paginated_results(token, organization_context, queries.ALL_ORGANIZATIONS['query'],
1260                                     queries.ALL_ORGANIZATIONS['variables'], 'allOrganizations')

Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Organization Objects

def get_all_paginated_results( token, organization_context, query, variables=None, field=None, limit=None):
1263def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
1264    """
1265    Get all results from a paginated GraphQL query
1266
1267    Args:
1268        token (str):
1269            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1270        organization_context (str):
1271            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1272        query (str):
1273            The GraphQL query string
1274        variables (dict, optional):
1275            Variables to be used in the GraphQL query, by default None
1276        field (str, required):
1277            The field in the response JSON that contains the results
1278        limit (int, Optional):
1279            The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
1280
1281    Raises:
1282        Exception: If the response status code is not 200, or if the field is not in the response JSON
1283
1284    Returns:
1285        list: List of results
1286    """
1287
1288    if not field:
1289        raise Exception("Error: field is required")
1290    if limit and limit > 1000:
1291        raise Exception("Error: limit cannot be greater than 1000")
1292    if limit and limit < 1:
1293        raise Exception("Error: limit cannot be less than 1")
1294    if not variables["first"]:
1295        raise Exception("Error: first is required")
1296    if variables["first"] < 1:
1297        raise Exception("Error: first cannot be less than 1")
1298    if variables["first"] > 1000:
1299        raise Exception("Error: limit cannot be greater than 1000")
1300
1301    # query the API for the first page of results
1302    response_data = send_graphql_query(token, organization_context, query, variables)
1303
1304    # if there are no results, return an empty list
1305    if not response_data:
1306        return []
1307
1308    # create a list to store the results
1309    results = []
1310
1311    # add the first page of results to the list
1312    if field in response_data['data']:
1313        results.extend(response_data['data'][field])
1314    else:
1315        raise Exception(f"Error: {field} not in response JSON")
1316
1317    if len(response_data['data'][field]) > 0:
1318        # get the cursor from the last entry in the list
1319        cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1320
1321        while cursor:
1322            if limit and len(results) == limit:
1323                break
1324
1325            variables['after'] = cursor
1326
1327            # add the next page of results to the list
1328            response_data = send_graphql_query(token, organization_context, query, variables)
1329            results.extend(response_data['data'][field])
1330
1331            try:
1332                cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']
1333            except IndexError:
1334                # when there is no additional cursor, stop getting more pages
1335                cursor = None
1336
1337    return results

Get all results from a paginated GraphQL query

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
  • field (str, required): The field in the response JSON that contains the results
  • limit (int, Optional): The maximum number of results to return. By default, None to return all results. Limit cannot be greater than 1000.
Raises:
  • Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:

list: List of results

def get_all_products(token, organization_context):
1340def get_all_products(token, organization_context):
1341    """
1342    Get all products in the organization. Uses pagination to get all results.
1343
1344    Args:
1345        token (str):
1346            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1347        organization_context (str):
1348            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1349
1350    Raises:
1351        Exception: Raised if the query fails.
1352
1353    Returns:
1354        list: List of Product Objects
1355
1356    .. deprecated:: 0.1.4. Use get_products instead.
1357    """
1358    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)
1359    return get_all_paginated_results(token, organization_context, queries.ALL_PRODUCTS['query'],
1360                                     queries.ALL_PRODUCTS['variables'], 'allProducts')

Get all products in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Product Objects

Deprecated since version 0.1.4. Use get_products instead..

def get_all_users(token, organization_context):
1363def get_all_users(token, organization_context):
1364    """
1365    Get all users in the organization. Uses pagination to get all results.
1366
1367    Args:
1368        token (str):
1369            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1370        organization_context (str):
1371            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1372
1373    Raises:
1374        Exception: Raised if the query fails.
1375
1376    Returns:
1377        list: List of User Objects
1378    """
1379    return get_all_paginated_results(token, organization_context, queries.ALL_USERS['query'],
1380                                     queries.ALL_USERS['variables'], 'allUsers')

Get all users in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of User Objects

def get_artifact_context(token, organization_context, artifact_id):
1383def get_artifact_context(token, organization_context, artifact_id):
1384    """
1385    Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.
1386
1387    Args:
1388        token (str):
1389            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1390        organization_context (str):
1391            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1392
1393    Raises:
1394        Exception: Raised if the query fails.
1395
1396    Returns:
1397        dict: Artifact Context Object
1398    """
1399    artifact = get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
1400                                         queries.ALL_ARTIFACTS['variables'](artifact_id, None), 'allAssets')
1401
1402    return artifact[0]['ctx']

Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

dict: Artifact Context Object

def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
1405def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
1406    """
1407    Gets assets in the organization. Uses pagination to get all results.
1408
1409    Args:
1410        token (str):
1411            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1412        organization_context (str):
1413            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1414        asset_id (str, optional):
1415            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
1416        business_unit_id (str, optional):
1417            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
1418
1419    Raises:
1420        Exception: Raised if the query fails.
1421
1422    Returns:
1423        list: List of Asset Objects
1424    """
1425    return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'],
1426                                     queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')

Gets assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_asset_versions( token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
1429def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
1430    """
1431    Gets asset versions in the organization. Uses pagination to get all results.
1432
1433    Args:
1434        token (str):
1435            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1436        organization_context (str):
1437            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1438        asset_version_id (str, optional):
1439            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
1440        asset_id (str, optional):
1441            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
1442        business_unit_id (str, optional):
1443            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
1444
1445    Raises:
1446        Exception: Raised if the query fails.
1447
1448    Returns:
1449        list: List of AssetVersion Objects
1450    """
1451    return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'],
1452                                     queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=asset_version_id,
1453                                                                             asset_id=asset_id,
1454                                                                             business_unit_id=business_unit_id),
1455                                     'allAssetVersions')

Gets asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
  • asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_auth_token( client_id, client_secret, token_url='https://platform.finitestate.io/api/v1/auth/token', audience='https://platform.finitestate.io/api/v1/graphql'):
1458def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
1459    """
1460    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
1461
1462    Args:
1463        client_id (str):
1464            CLIENT_ID as specified in the API documentation
1465        client_secret (str):
1466            CLIENT_SECRET as specified in the API documentation
1467        token_url (str, optional):
1468            Token URL, by default TOKEN_URL
1469        audience (str, optional):
1470            Audience, by default AUDIENCE
1471
1472    Raises:
1473        Exception: If the response status code is not 200
1474
1475    Returns:
1476        str: Auth token. Use this token as the Authorization header in subsequent API calls.
1477    """
1478    payload = {
1479        "client_id": client_id,
1480        "client_secret": client_secret,
1481        "audience": AUDIENCE,
1482        "grant_type": "client_credentials"
1483    }
1484
1485    headers = {
1486        'content-type': "application/json"
1487    }
1488
1489    response = requests.post(TOKEN_URL, data=json.dumps(payload), headers=headers)
1490    if response.status_code == 200:
1491        auth_token = response.json()['access_token']
1492    else:
1493        raise Exception(f"Error: {response.status_code} - {response.text}")
1494
1495    return auth_token

Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

Arguments:
  • client_id (str): CLIENT_ID as specified in the API documentation
  • client_secret (str): CLIENT_SECRET as specified in the API documentation
  • token_url (str, optional): Token URL, by default TOKEN_URL
  • audience (str, optional): Audience, by default AUDIENCE
Raises:
  • Exception: If the response status code is not 200
Returns:

str: Auth token. Use this token as the Authorization header in subsequent API calls.

def get_findings( token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None, severity=None, count=False, limit=None):
1498def get_findings(
1499    token,
1500    organization_context,
1501    asset_version_id=None,
1502    finding_id=None,
1503    category=None,
1504    status=None,
1505    severity=None,
1506    count=False,
1507    limit=None,
1508):
1509    """
1510    Gets all the Findings for an Asset Version. Uses pagination to get all results.
1511    Args:
1512        token (str):
1513            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1514        organization_context (str):
1515            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1516        asset_version_id (str, optional):
1517            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
1518        finding_id (str, optional):
1519            The ID of a specific finding to get. If specified, will return only the finding with that ID.
1520        category (str, optional):
1521            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
1522            This can be a single string, or an array of values.
1523        status (str, optional):
1524            The status of Findings to return.
1525        severity (str, optional):
1526            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
1527        count (bool, optional):
1528            If True, will return the count of findings instead of the findings themselves. Defaults to False.
1529        limit (int, optional):
1530            The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
1531
1532    Raises:
1533        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1534
1535    Returns:
1536        list: List of Finding Objects
1537    """
1538
1539    if limit and limit > 1000:
1540        raise Exception("Error: limit must be less than 1000")
1541    if limit and limit < 1:
1542        raise Exception("Error: limit must be greater than 0")
1543
1544    if count:
1545        return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'],
1546                                  queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id,
1547                                                                          finding_id=finding_id, category=category,
1548                                                                          status=status, severity=severity,
1549                                                                          limit=limit))["data"]["_allFindingsMeta"]
1550    else:
1551        return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'],
1552                                         queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id,
1553                                                                           finding_id=finding_id, category=category,
1554                                                                           status=status, severity=severity,
1555                                                                           limit=limit), 'allFindings', limit=limit)

Gets all the Findings for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
  • finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
  • category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
  • status (str, optional): The status of Findings to return.
  • severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
  • count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
  • limit (int, optional): The maximum number of findings to return. By default, this is None. Limit must be between 1 and 1000.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Finding Objects

def get_product_asset_versions(token, organization_context, product_id=None):
1558def get_product_asset_versions(token, organization_context, product_id=None):
1559    """
1560    Gets all the asset versions for a product.
1561    Args:
1562        token (str):
1563            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1564        organization_context (str):
1565            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1566        product_id (str, optional):
1567            Product ID to get asset versions for. If not provided, will get all asset versions in the organization.
1568    Raises:
1569        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1570    Returns:
1571        list: List of AssetVersion Objects
1572    """
1573    if not product_id:
1574        raise Exception("Product ID is required")
1575
1576    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCT_ASSET_VERSIONS['query'],
1577                                     queries.GET_PRODUCT_ASSET_VERSIONS['variables'](product_id), 'allProducts')

Gets all the asset versions for a product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, optional): Product ID to get asset versions for. If not provided, will get all asset versions in the organization.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of AssetVersion Objects

def get_products( token, organization_context, product_id=None, business_unit_id=None) -> list:
1580def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
1581    """
1582    Gets all the products for the specified business unit.
1583    Args:
1584        token (str):
1585            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1586        organization_context (str):
1587            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1588        product_id (str, optional):
1589            Product ID to get. If not provided, will get all products in the organization.
1590        business_unit_id (str, optional):
1591            Business Unit ID to get products for. If not provided, will get all products in the organization.
1592    Raises:
1593        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1594    Returns:
1595        list: List of Product Objects
1596    """
1597
1598    return get_all_paginated_results(token, organization_context, queries.GET_PRODUCTS['query'],
1599                                     queries.GET_PRODUCTS['variables'](product_id=product_id,
1600                                                                       business_unit_id=business_unit_id),
1601                                     'allProducts')

Gets all the products for the specified business unit.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
  • business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Product Objects

def generate_report_download_url( token, organization_context, asset_version_id=None, product_id=None, report_type=None, report_subtype=None, verbose=False) -> str:
1604def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
1605                                 report_subtype=None, verbose=False) -> str:
1606    """
1607    Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report.
1608    This may take several minutes to complete, depending on the size of the report.
1609
1610    Args:
1611        token (str):
1612            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1613        organization_context (str):
1614            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1615        asset_version_id (str, optional):
1616            Asset Version ID to download the report for. Either `asset_version_id` or `product_id` are required.
1617        product_id (str, optional):
1618            Product ID to download the report for. Either `asset_version_id` or `product_id` are required.
1619        report_type (str, required):
1620            The file type of the report to download. Valid values are "CSV" and "PDF".
1621        report_subtype (str, required):
1622            The type of report to download. Based on available reports for the `report_type` specified
1623            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
1624            Valid values for PDF are "RISK_SUMMARY".
1625        verbose (bool, optional):
1626            If True, print additional information to the console. Defaults to False.
1627    """
1628    if not report_type:
1629        raise ValueError("Report Type is required")
1630    if not report_subtype:
1631        raise ValueError("Report Subtype is required")
1632    if not asset_version_id and not product_id:
1633        raise ValueError("Asset Version ID or Product ID is required")
1634
1635    if asset_version_id and product_id:
1636        raise ValueError("Asset Version ID and Product ID are mutually exclusive")
1637
1638    if report_type not in ["CSV", "PDF"]:
1639        raise Exception(f"Report Type {report_type} not supported")
1640
1641    if report_type == "CSV":
1642        if report_subtype not in ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]:
1643            raise Exception(f"Report Subtype {report_subtype} not supported")
1644
1645        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1646                                                            report_type=report_type, report_subtype=report_subtype)
1647        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1648                                                              report_type=report_type, report_subtype=report_subtype)
1649
1650        response_data = send_graphql_query(token, organization_context, mutation, variables)
1651        if verbose:
1652            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1653
1654        # get exportJobId from the result
1655        if asset_version_id:
1656            export_job_id = response_data['data']['launchArtifactCSVExport']['exportJobId']
1657        elif product_id:
1658            export_job_id = response_data['data']['launchProductCSVExport']['exportJobId']
1659        else:
1660            raise Exception(
1661                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1662
1663        if verbose:
1664            print(f'Export Job ID: {export_job_id}')
1665
1666    if report_type == "PDF":
1667        if report_subtype not in ["RISK_SUMMARY"]:
1668            raise Exception(f"Report Subtype {report_subtype} not supported")
1669
1670        mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](asset_version_id=asset_version_id, product_id=product_id,
1671                                                            report_type=report_type, report_subtype=report_subtype)
1672        variables = queries.LAUNCH_REPORT_EXPORT['variables'](asset_version_id=asset_version_id, product_id=product_id,
1673                                                              report_type=report_type, report_subtype=report_subtype)
1674
1675        response_data = send_graphql_query(token, organization_context, mutation, variables)
1676        if verbose:
1677            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1678
1679        # get exportJobId from the result
1680        if asset_version_id:
1681            export_job_id = response_data['data']['launchArtifactPdfExport']['exportJobId']
1682        elif product_id:
1683            export_job_id = response_data['data']['launchProductPdfExport']['exportJobId']
1684        else:
1685            raise Exception(
1686                "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1687
1688        if verbose:
1689            print(f'Export Job ID: {export_job_id}')
1690
1691    if not export_job_id:
1692        raise Exception(
1693            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1694
1695    # poll the API until the export job is complete
1696    sleep_time = 10
1697    total_time = 0
1698    if verbose:
1699        print(f'Polling every {sleep_time} seconds for export job to complete')
1700
1701    while True:
1702        time.sleep(sleep_time)
1703        total_time += sleep_time
1704        if verbose:
1705            print(f'Total time elapsed: {total_time} seconds')
1706
1707        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1708        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1709
1710        response_data = send_graphql_query(token, organization_context, query, variables)
1711
1712        if verbose:
1713            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1714
1715        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == 'COMPLETED':
1716            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1717                if verbose:
1718                    print(
1719                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1720                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']

Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to download the report for. Either asset_version_id or product_id are required.
  • product_id (str, optional): Product ID to download the report for. Either asset_version_id or product_id are required.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download. Based on available reports for the report_type specified Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url( token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None, verbose=False) -> str:
1723def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
1724                               verbose=False) -> str:
1725    """
1726    Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM.
1727    This may take several minutes to complete, depending on the size of SBOM.
1728
1729    Args:
1730        token (str):
1731            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1732        organization_context (str):
1733            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1734        sbom_type (str, required):
1735            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
1736        sbom_subtype (str, required):
1737            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
1738        asset_version_id (str, required):
1739            Asset Version ID to download the SBOM for.
1740        verbose (bool, optional):
1741            If True, print additional information to the console. Defaults to False.
1742
1743    Raises:
1744        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
1745        Exception: Raised if the query fails.
1746
1747    Returns:
1748        str: URL to download the SBOM from.
1749    """
1750
1751    if not sbom_type:
1752        raise ValueError("SBOM Type is required")
1753    if not sbom_subtype:
1754        raise ValueError("SBOM Subtype is required")
1755    if not asset_version_id:
1756        raise ValueError("Asset Version ID is required")
1757
1758    if sbom_type not in ["CYCLONEDX", "SPDX"]:
1759        raise Exception(f"SBOM Type {sbom_type} not supported")
1760
1761    if sbom_type == "CYCLONEDX":
1762        if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]:
1763            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1764
1765        mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation']
1766        variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1767
1768        response_data = send_graphql_query(token, organization_context, mutation, variables)
1769        if verbose:
1770            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1771
1772        # get exportJobId from the result
1773        export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId']
1774        if verbose:
1775            print(f'Export Job ID: {export_job_id}')
1776
1777    if sbom_type == "SPDX":
1778        if sbom_subtype not in ["SBOM_ONLY"]:
1779            raise Exception(f"SBOM Subtype {sbom_subtype} not supported")
1780
1781        mutation = queries.LAUNCH_SPDX_EXPORT['mutation']
1782        variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id)
1783
1784        response_data = send_graphql_query(token, organization_context, mutation, variables)
1785        if verbose:
1786            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1787
1788        # get exportJobId from the result
1789        export_job_id = response_data['data']['launchSpdxExport']['exportJobId']
1790        if verbose:
1791            print(f'Export Job ID: {export_job_id}')
1792
1793    if not export_job_id:
1794        raise Exception(
1795            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
1796
1797    # poll the API until the export job is complete
1798    sleep_time = 10
1799    total_time = 0
1800    if verbose:
1801        print(f'Polling every {sleep_time} seconds for export job to complete')
1802    while True:
1803        time.sleep(sleep_time)
1804        total_time += sleep_time
1805        if verbose:
1806            print(f'Total time elapsed: {total_time} seconds')
1807
1808        query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
1809        variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)
1810
1811        response_data = send_graphql_query(token, organization_context, query, variables)
1812
1813        if verbose:
1814            print(f'Response Data: {json.dumps(response_data, indent=4)}')
1815
1816        if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED":
1817            if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
1818                if verbose:
1819                    print(
1820                        f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
1821                return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']

Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of SBOM.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
  • asset_version_id (str, required): Asset Version ID to download the SBOM for.
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
  • Exception: Raised if the query fails.
Returns:

str: URL to download the SBOM from.

def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
1824def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
1825    """
1826    Gets all the Software Components for an Asset Version. Uses pagination to get all results.
1827    Args:
1828        token (str):
1829            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
1830        organization_context (str):
1831            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1832        asset_version_id (str, optional):
1833            Asset Version ID to get software components for.
1834        type (str, optional):
1835            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
1836    Raises:
1837        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
1838    Returns:
1839        list: List of Software Component Objects
1840    """
1841    if not asset_version_id:
1842        raise Exception("Asset Version ID is required")
1843
1844    return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'],
1845                                     queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id,
1846                                                                                  type=type),
1847                                     'allSoftwareComponentInstances')

Gets all the Software Components for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get software components for.
  • type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Software Component Objects

def search_sbom( token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', case_sensitive=False) -> list:
1850def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
1851                case_sensitive=False) -> list:
1852    """
1853    Searches the SBOM of a specific asset version or the entire organization for matching software components.
1854    Search Methods: EXACT or CONTAINS
1855    An exact match will return only the software component whose name matches the name exactly.
1856    A contains match will return all software components whose name contains the search string.
1857
1858    Args:
1859        token (str):
1860            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1861        organization_context (str):
1862            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1863        name (str, required):
1864            Name of the software component to search for.
1865        version (str, optional):
1866            Version of the software component to search for. If not specified, will search for all versions of the software component.
1867        asset_version_id (str, optional):
1868            Asset Version ID to search for software components in. If not specified, will search the entire organization.
1869        search_method (str, optional):
1870            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
1871        case_sensitive (bool, optional):
1872            Whether or not to perform a case sensitive search. Defaults to False.
1873    Raises:
1874        ValueError: Raised if name is not provided.
1875        Exception: Raised if the query fails.
1876    Returns:
1877        list: List of SoftwareComponentInstance Objects
1878    """
1879    if asset_version_id:
1880        query = """
1881query GetSoftwareComponentInstances_SDK(
1882    $filter: SoftwareComponentInstanceFilter
1883    $after: String
1884    $first: Int
1885) {
1886    allSoftwareComponentInstances(
1887        filter: $filter
1888        after: $after
1889        first: $first
1890    ) {
1891        _cursor
1892        id
1893        name
1894        version
1895        originalComponents {
1896            id
1897            name
1898            version
1899        }
1900    }
1901}
1902"""
1903    else:
1904        # gets the asset version info that contains the software component
1905        query = """
1906query GetSoftwareComponentInstances_SDK(
1907    $filter: SoftwareComponentInstanceFilter
1908    $after: String
1909    $first: Int
1910) {
1911    allSoftwareComponentInstances(
1912        filter: $filter
1913        after: $after
1914        first: $first
1915    ) {
1916        _cursor
1917        id
1918        name
1919        version
1920        assetVersion {
1921            id
1922            name
1923            asset {
1924                id
1925                name
1926            }
1927        }
1928    }
1929}
1930"""
1931
1932    variables = {
1933        "filter": {
1934            "mergedComponentRefId": None
1935        },
1936        "after": None,
1937        "first": 100
1938    }
1939
1940    if asset_version_id:
1941        variables["filter"]["assetVersionRefId"] = asset_version_id
1942
1943    if search_method == 'EXACT':
1944        if case_sensitive:
1945            variables["filter"]["name"] = name
1946        else:
1947            variables["filter"]["name_like"] = name
1948    elif search_method == 'CONTAINS':
1949        variables["filter"]["name_contains"] = name
1950
1951    if version:
1952        if search_method == 'EXACT':
1953            variables["filter"]["version"] = version
1954        elif search_method == 'CONTAINS':
1955            variables["filter"]["version_contains"] = version
1956
1957    records = get_all_paginated_results(token, organization_context, query, variables=variables,
1958                                        field="allSoftwareComponentInstances")
1959
1960    return records

Searches the SBOM of a specific asset version or the entire organization for matching software components. Search Methods: EXACT or CONTAINS An exact match will return only the software component whose name matches the name exactly. A contains match will return all software components whose name contains the search string.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • name (str, required): Name of the software component to search for.
  • version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
  • asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
  • search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
  • case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
  • ValueError: Raised if name is not provided.
  • Exception: Raised if the query fails.
Returns:

list: List of SoftwareComponentInstance Objects

def send_graphql_query(token, organization_context, query, variables=None):
1963def send_graphql_query(token, organization_context, query, variables=None):
1964    """
1965    Send a GraphQL query to the API
1966
1967    Args:
1968        token (str):
1969            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
1970        organization_context (str):
1971            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
1972        query (str):
1973            The GraphQL query string
1974        variables (dict, optional):
1975            Variables to be used in the GraphQL query, by default None
1976
1977    Raises:
1978        Exception: If the response status code is not 200
1979
1980    Returns:
1981        dict: Response JSON
1982    """
1983    headers = {
1984        'Content-Type': 'application/json',
1985        'Authorization': f'Bearer {token}',
1986        'Organization-Context': organization_context
1987    }
1988    data = {
1989        'query': query,
1990        'variables': variables
1991    }
1992
1993    response = requests.post(API_URL, headers=headers, json=data)
1994
1995    if response.status_code == 200:
1996        thejson = response.json()
1997
1998        if "errors" in thejson:
1999            raise Exception(f"Error: {thejson['errors']}")
2000
2001        return thejson
2002    else:
2003        raise Exception(f"Error: {response.status_code} - {response.text}")

Send a GraphQL query to the API

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
  • Exception: If the response status code is not 200
Returns:

dict: Response JSON

def update_finding_statuses( token, organization_context, user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None):
2006def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
2007                            justification=None, response=None, comment=None):
2008    """
2009    Updates the status of a findings or multiple findings. This is a blocking call.
2010
2011    Args:
2012        token (str):
2013            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
2014        organization_context (str):
2015            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2016        user_id (str, required):
2017            User ID to update the finding status for.
2018        finding_ids (str, required):
2019            Finding ID to update the status for.
2020        status (str, required):
2021            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
2022        justification (str, optional):
2023            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
2024        response (str, optional):
2025            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see  https://docs.finitestate.io/types/finding-status-response-enum
2026        comment (str, optional):
2027            Optional comment to add to the finding status update.
2028
2029    Raises:
2030        ValueError: Raised if required parameters are not provided.
2031        Exception: Raised if the query fails.
2032
2033    Returns:
2034        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
2035    """
2036    if not user_id:
2037        raise ValueError("User Id is required")
2038    if not finding_ids:
2039        raise ValueError("Finding Ids is required")
2040    if not status:
2041        raise ValueError("Status is required")
2042
2043    mutation = queries.UPDATE_FINDING_STATUSES['mutation']
2044    variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status,
2045                                                             justification=justification, response=response,
2046                                                             comment=comment)
2047
2048    return send_graphql_query(token, organization_context