finite_state_sdk

   1import json
   2from enum import Enum
   3
   4import requests
   5import time
   6from warnings import warn
   7import finite_state_sdk.queries as queries
   8
   9API_URL = 'https://platform.finitestate.io/api/v1/graphql'
  10AUDIENCE = "https://platform.finitestate.io/api/v1/graphql"
  11TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token"
  12
  13
  14class UploadMethod(Enum):
  15    """
  16    Enumeration class representing different upload methods.
  17
  18    Attributes:
  19        WEB_APP_UI: Upload method via web application UI.
  20        API: Upload method via API.
  21        GITHUB_INTEGRATION: Upload method via GitHub integration.
  22        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.
  23
  24    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
  25    """
  26    WEB_APP_UI = "WEB_APP_UI"
  27    API = "API"
  28    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
  29    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"
  30
  31
  32def create_artifact(
  33    token,
  34    organization_context,
  35    business_unit_id=None,
  36    created_by_user_id=None,
  37    asset_version_id=None,
  38    artifact_name=None,
  39    product_id=None,
  40):
  41    """
  42    Create a new Artifact.
  43    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
  44    Please see the examples in the Github repository for more information:
  45    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  46    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
  47
  48    Args:
  49        token (str):
  50            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  51        organization_context (str):
  52            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  53        business_unit_id (str, required):
  54            Business Unit ID to associate the artifact with.
  55        created_by_user_id (str, required):
  56            User ID of the user creating the artifact.
  57        asset_version_id (str, required):
  58            Asset Version ID to associate the artifact with.
  59        artifact_name (str, required):
  60            The name of the Artifact being created.
  61        product_id (str, optional):
  62            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
  63
  64    Raises:
  65        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  66        Exception: Raised if the query fails.
  67
  68    Returns:
  69        dict: createArtifact Object
  70    """
  71    if not business_unit_id:
  72        raise ValueError("Business unit ID is required")
  73    if not created_by_user_id:
  74        raise ValueError("Created by user ID is required")
  75    if not asset_version_id:
  76        raise ValueError("Asset version ID is required")
  77    if not artifact_name:
  78        raise ValueError("Artifact name is required")
  79
  80    graphql_query = '''
  81    mutation CreateArtifactMutation($input: CreateArtifactInput!) {
  82        createArtifact(input: $input) {
  83            id
  84            name
  85            assetVersion {
  86                id
  87                name
  88                asset {
  89                    id
  90                    name
  91                }
  92            }
  93            createdBy {
  94                id
  95                email
  96            }
  97            ctx {
  98                asset
  99                products
 100                businessUnits
 101            }
 102        }
 103    }
 104    '''
 105
 106    # Asset name, business unit context, and creating user are required
 107    variables = {
 108        "input": {
 109            "name": artifact_name,
 110            "createdBy": created_by_user_id,
 111            "assetVersion": asset_version_id,
 112            "ctx": {
 113                "asset": asset_version_id,
 114                "businessUnits": [business_unit_id]
 115            }
 116        }
 117    }
 118
 119    if product_id is not None:
 120        variables["input"]["ctx"]["products"] = product_id
 121
 122    response = send_graphql_query(token, organization_context, graphql_query, variables)
 123    return response['data']
 124
 125
 126def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
 127    """
 128    Create a new Asset.
 129
 130    Args:
 131        token (str):
 132            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 133        organization_context (str):
 134            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 135        business_unit_id (str, required):
 136            Business Unit ID to associate the asset with.
 137        created_by_user_id (str, required):
 138            User ID of the user creating the asset.
 139        asset_name (str, required):
 140            The name of the Asset being created.
 141        product_id (str, optional):
 142            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
 143
 144    Raises:
 145        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
 146        Exception: Raised if the query fails.
 147
 148    Returns:
 149        dict: createAsset Object
 150    """
 151    if not business_unit_id:
 152        raise ValueError("Business unit ID is required")
 153    if not created_by_user_id:
 154        raise ValueError("Created by user ID is required")
 155    if not asset_name:
 156        raise ValueError("Asset name is required")
 157
 158    graphql_query = '''
 159    mutation CreateAssetMutation($input: CreateAssetInput!) {
 160        createAsset(input: $input) {
 161            id
 162            name
 163            dependentProducts {
 164                id
 165                name
 166            }
 167            group {
 168                id
 169                name
 170            }
 171            createdBy {
 172                id
 173                email
 174            }
 175            ctx {
 176                asset
 177                products
 178                businessUnits
 179            }
 180        }
 181    }
 182    '''
 183
 184    # Asset name, business unit context, and creating user are required
 185    variables = {
 186        "input": {
 187            "name": asset_name,
 188            "group": business_unit_id,
 189            "createdBy": created_by_user_id,
 190            "ctx": {
 191                "businessUnits": [business_unit_id]
 192            }
 193        }
 194    }
 195
 196    if product_id is not None:
 197        variables["input"]["ctx"]["products"] = product_id
 198
 199    response = send_graphql_query(token, organization_context, graphql_query, variables)
 200    return response['data']
 201
 202
 203def create_asset_version(
 204    token,
 205    organization_context,
 206    business_unit_id=None,
 207    created_by_user_id=None,
 208    asset_id=None,
 209    asset_version_name=None,
 210    product_id=None,
 211):
 212    """
 213    Create a new Asset Version.
 214
 215    Args:
 216        token (str):
 217            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 218        organization_context (str):
 219            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 220        business_unit_id (str, required):
 221            Business Unit ID to associate the asset version with.
 222        created_by_user_id (str, required):
 223            User ID of the user creating the asset version.
 224        asset_id (str, required):
 225            Asset ID to associate the asset version with.
 226        asset_version_name (str, required):
 227            The name of the Asset Version being created.
 228        product_id (str, optional):
 229            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
 230
 231    Raises:
 232        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 233        Exception: Raised if the query fails.
 234
 235    Returns:
 236        dict: createAssetVersion Object
 237
 238    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
 239    """
 240    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)
 241    if not business_unit_id:
 242        raise ValueError("Business unit ID is required")
 243    if not created_by_user_id:
 244        raise ValueError("Created by user ID is required")
 245    if not asset_id:
 246        raise ValueError("Asset ID is required")
 247    if not asset_version_name:
 248        raise ValueError("Asset version name is required")
 249
 250    graphql_query = '''
 251    mutation CreateAssetVersionMutation($input: CreateAssetVersionInput!) {
 252        createAssetVersion(input: $input) {
 253            id
 254            name
 255            asset {
 256                id
 257                name
 258            }
 259            createdBy {
 260                id
 261                email
 262            }
 263            ctx {
 264                asset
 265                products
 266                businessUnits
 267            }
 268        }
 269    }
 270    '''
 271
 272    # Asset name, business unit context, and creating user are required
 273    variables = {
 274        "input": {
 275            "name": asset_version_name,
 276            "createdBy": created_by_user_id,
 277            "asset": asset_id,
 278            "ctx": {
 279                "asset": asset_id,
 280                "businessUnits": [business_unit_id]
 281            }
 282        }
 283    }
 284
 285    if product_id is not None:
 286        variables["input"]["ctx"]["products"] = product_id
 287
 288    response = send_graphql_query(token, organization_context, graphql_query, variables)
 289    return response['data']
 290
 291
 292def create_asset_version_on_asset(
 293    token,
 294    organization_context,
 295    created_by_user_id=None,
 296    asset_id=None,
 297    asset_version_name=None,
 298):
 299    """
 300    Create a new Asset Version.
 301
 302    Args:
 303        token (str):
 304            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 305        organization_context (str):
 306            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 307        created_by_user_id (str, optional):
 308            User ID of the user creating the asset version.
 309        asset_id (str, required):
 310            Asset ID to associate the asset version with.
 311        asset_version_name (str, required):
 312            The name of the Asset Version being created.
 313
 314    Raises:
 315        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
 316        Exception: Raised if the query fails.
 317
 318    Returns:
 319        dict: createAssetVersion Object
 320    """
 321    if not asset_id:
 322        raise ValueError("Asset ID is required")
 323    if not asset_version_name:
 324        raise ValueError("Asset version name is required")
 325
 326    graphql_query = '''
 327        mutation BapiCreateAssetVersion($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!) {
 328            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId) {
 329                id
 330                assetVersion {
 331                    id
 332                }
 333            }
 334        }
 335    '''
 336
 337    # Asset name, business unit context, and creating user are required
 338    variables = {"assetVersionName": asset_version_name, "assetId": asset_id}
 339
 340    if created_by_user_id:
 341        variables["createdByUserId"] = created_by_user_id
 342
 343    response = send_graphql_query(token, organization_context, graphql_query, variables)
 344    return response['data']
 345
 346
 347def create_new_asset_version_artifact_and_test_for_upload(
 348    token,
 349    organization_context,
 350    business_unit_id=None,
 351    created_by_user_id=None,
 352    asset_id=None,
 353    version=None,
 354    product_id=None,
 355    test_type=None,
 356    artifact_description=None,
 357    upload_method: UploadMethod = UploadMethod.API,
 358):
 359    """
 360    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
 361    This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.
 362
 363    Args:
 364        token (str):
 365            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 366        organization_context (str):
 367            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 368        business_unit_id (str, optional):
 369            Business Unit ID to create the asset version for. If not provided, the default Business Unit will be used.
 370        created_by_user_id (str, optional):
 371            User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
 372        asset_id (str, required):
 373            Asset ID to create the asset version for. If not provided, the default asset will be used.
 374        version (str, required):
 375            Version to create the asset version for.
 376        product_id (str, optional):
 377            Product ID to create the entities for. If not provided, the default product will be used.
 378        test_type (str, required):
 379            Test type to create the test for. Must be one of "finite_state_binary_analysis" or of the list of supported third party test types. For the full list, see the API documenation.
 380        artifact_description (str, optional):
 381            Description to use for the artifact. Examples inlcude "Firmware", "Source Code Repository". This will be appended to the default Artifact description. If none is provided, the default Artifact description will be used.
 382        upload_method (UploadMethod, optional):
 383            The method of uploading the test results. Default is UploadMethod.API.
 384
 385
 386    Raises:
 387        ValueError: Raised if asset_id or version are not provided.
 388        Exception: Raised if the query fails.
 389
 390    Returns:
 391        str: The Test ID of the newly created test that is used for uploading the file.
 392    """
 393    if not asset_id:
 394        raise ValueError("Asset ID is required")
 395    if not version:
 396        raise ValueError("Version is required")
 397
 398    assets = get_all_assets(token, organization_context, asset_id=asset_id)
 399    asset = assets[0]
 400
 401    # get the asset name
 402    asset_name = asset['name']
 403
 404    # get the existing asset product IDs
 405    asset_product_ids = asset['ctx']['products']
 406
 407    # get the asset product ID
 408    if product_id and product_id not in asset_product_ids:
 409        asset_product_ids.append(product_id)
 410
 411    # if business_unit_id or created_by_user_id are not provided, get the existing asset
 412    if not business_unit_id or not created_by_user_id:
 413        if not business_unit_id:
 414            business_unit_id = asset['group']['id']
 415        if not created_by_user_id:
 416            created_by_user_id = asset['createdBy']['id']
 417
 418        if not business_unit_id:
 419            raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
 420        if not created_by_user_id:
 421            raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")
 422
 423    # create the asset version
 424    response = create_asset_version_on_asset(
 425        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version
 426    )
 427    # get the asset version ID
 428    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']
 429
 430    # create the test
 431    if test_type == "finite_state_binary_analysis":
 432        # create the artifact
 433        if not artifact_description:
 434            artifact_description = "Binary"
 435        binary_artifact_name = f"{asset_name} {version} - {artifact_description}"
 436        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 437                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 438                                   artifact_name=binary_artifact_name, product_id=asset_product_ids)
 439
 440        # get the artifact ID
 441        binary_artifact_id = response['createArtifact']['id']
 442
 443        # create the test
 444        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
 445        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
 446                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
 447                                                  artifact_id=binary_artifact_id, product_id=asset_product_ids,
 448                                                  test_name=test_name, upload_method=upload_method)
 449        test_id = response['createTest']['id']
 450        return test_id
 451
 452    else:
 453        # create the artifact
 454        if not artifact_description:
 455            artifact_description = "Unspecified Artifact"
 456        artifact_name = f"{asset_name} {version} - {artifact_description}"
 457        response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
 458                                   created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
 459                                   artifact_name=artifact_name, product_id=asset_product_ids)
 460
 461        # get the artifact ID
 462        binary_artifact_id = response['createArtifact']['id']
 463
 464        # create the test
 465        test_name = f"{asset_name} {version} - {test_type}"
 466        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
 467                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
 468                                                      artifact_id=binary_artifact_id, product_id=asset_product_ids,
 469                                                      test_name=test_name, test_type=test_type,
 470                                                      upload_method=upload_method)
 471        test_id = response['createTest']['id']
 472        return test_id
 473
 474
 475def create_new_asset_version_and_upload_binary(
 476    token,
 477    organization_context,
 478    business_unit_id=None,
 479    created_by_user_id=None,
 480    asset_id=None,
 481    version=None,
 482    file_path=None,
 483    product_id=None,
 484    artifact_description=None,
 485    quick_scan=False,
 486    upload_method: UploadMethod = UploadMethod.API,
 487):
 488    """
 489    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
 490    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 491
 492    Args:
 493        token (str):
 494            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 495        organization_context (str):
 496            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 497        business_unit_id (str, optional):
 498            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 499        created_by_user_id (str, optional):
 500            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 501        asset_id (str, required):
 502            Asset ID to create the asset version for.
 503        version (str, required):
 504            Version to create the asset version for.
 505        file_path (str, required):
 506            Local path to the file to upload.
 507        product_id (str, optional):
 508            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
 509        artifact_description (str, optional):
 510            Description of the artifact. If not provided, the default is "Firmware Binary".
 511        quick_scan (bool, optional):
 512            If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
 513        upload_method (UploadMethod, optional):
 514            The method of uploading the test results. Default is UploadMethod.API.
 515
 516    Raises:
 517        ValueError: Raised if asset_id, version, or file_path are not provided.
 518        Exception: Raised if any of the queries fail.
 519
 520    Returns:
 521        dict: The response from the GraphQL query, a createAssetVersion Object.
 522    """
 523    if not asset_id:
 524        raise ValueError("Asset ID is required")
 525    if not version:
 526        raise ValueError("Version is required")
 527    if not file_path:
 528        raise ValueError("File path is required")
 529
 530    # create the asset version and binary test
 531    if not artifact_description:
 532        artifact_description = "Firmware Binary"
 533    binary_test_id = create_new_asset_version_artifact_and_test_for_upload(
 534        token,
 535        organization_context,
 536        business_unit_id=business_unit_id,
 537        created_by_user_id=created_by_user_id,
 538        asset_id=asset_id,
 539        version=version,
 540        product_id=product_id,
 541        test_type="finite_state_binary_analysis",
 542        artifact_description=artifact_description,
 543        upload_method=upload_method,
 544    )
 545
 546    # upload file for binary test
 547    response = upload_file_for_binary_analysis(token, organization_context, test_id=binary_test_id, file_path=file_path,
 548                                               quick_scan=quick_scan)
 549    return response
 550
 551
 552def create_new_asset_version_and_upload_test_results(token, organization_context, business_unit_id=None,
 553                                                     created_by_user_id=None, asset_id=None, version=None,
 554                                                     file_path=None, product_id=None, test_type=None,
 555                                                     artifact_description="", upload_method: UploadMethod = UploadMethod.API):
 556    """
 557    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
 558    By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.
 559
 560    Args:
 561        token (str):
 562            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 563        organization_context (str):
 564            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 565        business_unit_id (str, optional):
 566            Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
 567        created_by_user_id (str, optional):
 568            Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
 569        asset_id (str, required):
 570            Asset ID to create the asset version for.
 571        version (str, required):
 572            Version to create the asset version for.
 573        file_path (str, required):
 574            Path to the test results file to upload.
 575        product_id (str, optional):
 576            Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
 577        test_type (str, required):
 578            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
 579        artifact_description (str, optional):
 580            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
 581        upload_method (UploadMethod, optional):
 582            The method of uploading the test results. Default is UploadMethod.API.
 583
 584    Raises:
 585        ValueError: If the asset_id, version, or file_path are not provided.
 586        Exception: If the test_type is not a supported third party scanner type, or if the query fails.
 587
 588    Returns:
 589        dict: The response from the GraphQL query, a createAssetVersion Object.
 590    """
 591    if not asset_id:
 592        raise ValueError("Asset ID is required")
 593    if not version:
 594        raise ValueError("Version is required")
 595    if not file_path:
 596        raise ValueError("File path is required")
 597    if not test_type:
 598        raise ValueError("Test type is required")
 599
 600    # create the asset version and test
 601    test_id = create_new_asset_version_artifact_and_test_for_upload(token, organization_context,
 602                                                                    business_unit_id=business_unit_id,
 603                                                                    created_by_user_id=created_by_user_id,
 604                                                                    asset_id=asset_id, version=version,
 605                                                                    product_id=product_id, test_type=test_type,
 606                                                                    artifact_description=artifact_description,
 607                                                                    upload_method=upload_method)
 608
 609    # upload test results file
 610    response = upload_test_results_file(token, organization_context, test_id=test_id, file_path=file_path)
 611    return response
 612
 613
 614def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
 615                   product_description=None, vendor_id=None, vendor_name=None):
 616    """
 617    Create a new Product.
 618
 619    Args:
 620        token (str):
 621            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 622        organization_context (str):
 623            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 624        business_unit_id (str, required):
 625            Business Unit ID to associate the product with.
 626        created_by_user_id (str, required):
 627            User ID of the user creating the product.
 628        product_name (str, required):
 629            The name of the Product being created.
 630        product_description (str, optional):
 631            The description of the Product being created.
 632        vendor_id (str, optional):
 633            Vendor ID to associate the product with. If not specified, vendor_name must be provided.
 634        vendor_name (str, optional):
 635            Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
 636
 637    Raises:
 638        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
 639        Exception: Raised if the query fails.
 640
 641    Returns:
 642        dict: createProduct Object
 643    """
 644
 645    if not business_unit_id:
 646        raise ValueError("Business unit ID is required")
 647    if not created_by_user_id:
 648        raise ValueError("Created by user ID is required")
 649    if not product_name:
 650        raise ValueError("Product name is required")
 651
 652    graphql_query = '''
 653    mutation CreateProductMutation($input: CreateProductInput!) {
 654        createProduct(input: $input) {
 655            id
 656            name
 657            vendor {
 658                name
 659            }
 660            group {
 661                id
 662                name
 663            }
 664            createdBy {
 665                id
 666                email
 667            }
 668            ctx {
 669                businessUnit
 670            }
 671        }
 672    }
 673    '''
 674
 675    # Product name, business unit context, and creating user are required
 676    variables = {
 677        "input": {
 678            "name": product_name,
 679            "group": business_unit_id,
 680            "createdBy": created_by_user_id,
 681            "ctx": {
 682                "businessUnit": business_unit_id
 683            }
 684        }
 685    }
 686
 687    if product_description is not None:
 688        variables["input"]["description"] = product_description
 689
 690    # If the vendor ID is specified, this will link the new product to the existing vendor
 691    if vendor_id is not None:
 692        variables["input"]["vendor"] = {
 693            "id": vendor_id
 694        }
 695
 696    # If the vendor name is specified, this will create a new vendor and link it to the new product
 697    if vendor_name is not None:
 698        variables["input"]["createVendor"] = {
 699            "name": vendor_name
 700        }
 701
 702    response = send_graphql_query(token, organization_context, graphql_query, variables)
 703
 704    return response['data']
 705
 706
 707def create_test(
 708    token,
 709    organization_context,
 710    business_unit_id=None,
 711    created_by_user_id=None,
 712    asset_id=None,
 713    artifact_id=None,
 714    test_name=None,
 715    product_id=None,
 716    test_type=None,
 717    tools=[],
 718    upload_method: UploadMethod = UploadMethod.API,
 719):
 720    """
 721    Create a new Test object for uploading files.
 722    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
 723    Please see the examples in the Github repository for more information:
 724    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
 725    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py
 726
 727    Args:
 728        token (str):
 729            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 730        organization_context (str):
 731            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 732        business_unit_id (str, required):
 733            Business Unit ID to associate the Test with.
 734        created_by_user_id (str, required):
 735            User ID of the user creating the Test.
 736        asset_id (str, required):
 737            Asset ID to associate the Test with.
 738        artifact_id (str, required):
 739            Artifact ID to associate the Test with.
 740        test_name (str, required):
 741            The name of the Test being created.
 742        product_id (str, optional):
 743            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 744        test_type (str, required):
 745            The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
 746        tools (list, optional):
 747            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
 748        upload_method (UploadMethod, required):
 749            The method of uploading the test results.
 750
 751    Raises:
 752        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
 753        Exception: Raised if the query fails.
 754
 755    Returns:
 756        dict: createTest Object
 757    """
 758    if not business_unit_id:
 759        raise ValueError("Business unit ID is required")
 760    if not created_by_user_id:
 761        raise ValueError("Created by user ID is required")
 762    if not asset_id:
 763        raise ValueError("Asset ID is required")
 764    if not artifact_id:
 765        raise ValueError("Artifact ID is required")
 766    if not test_name:
 767        raise ValueError("Test name is required")
 768    if not test_type:
 769        raise ValueError("Test type is required")
 770
 771    graphql_query = '''
 772    mutation CreateTestMutation($input: CreateTestInput!) {
 773        createTest(input: $input) {
 774            id
 775            name
 776            artifactUnderTest {
 777                id
 778                name
 779                assetVersion {
 780                    id
 781                    name
 782                    asset {
 783                        id
 784                        name
 785                        dependentProducts {
 786                            id
 787                            name
 788                        }
 789                    }
 790                }
 791            }
 792            createdBy {
 793                id
 794                email
 795            }
 796            ctx {
 797                asset
 798                products
 799                businessUnits
 800            }
 801            uploadMethod
 802        }
 803    }
 804    '''
 805
 806    # Asset name, business unit context, and creating user are required
 807    variables = {
 808        "input": {
 809            "name": test_name,
 810            "createdBy": created_by_user_id,
 811            "artifactUnderTest": artifact_id,
 812            "testResultFileFormat": test_type,
 813            "ctx": {
 814                "asset": asset_id,
 815                "businessUnits": [business_unit_id]
 816            },
 817            "tools": tools,
 818            "uploadMethod": upload_method.value
 819        }
 820    }
 821
 822    if product_id is not None:
 823        variables["input"]["ctx"]["products"] = product_id
 824
 825    response = send_graphql_query(token, organization_context, graphql_query, variables)
 826    return response['data']
 827
 828
 829def create_test_as_binary_analysis(token, organization_context, business_unit_id=None, created_by_user_id=None,
 830                                   asset_id=None, artifact_id=None, test_name=None, product_id=None,
 831                                   upload_method: UploadMethod = UploadMethod.API):
 832    """
 833    Create a new Test object for uploading files for Finite State Binary Analysis.
 834
 835    Args:
 836        token (str):
 837            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 838        organization_context (str):
 839            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 840        business_unit_id (str, required):
 841            Business Unit ID to associate the Test with.
 842        created_by_user_id (str, required):
 843            User ID of the user creating the Test.
 844        asset_id (str, required):
 845            Asset ID to associate the Test with.
 846        artifact_id (str, required):
 847            Artifact ID to associate the Test with.
 848        test_name (str, required):
 849            The name of the Test being created.
 850        product_id (str, optional):
 851            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 852        upload_method (UploadMethod, optional):
 853            The method of uploading the test results. Default is UploadMethod.API.
 854
 855    Raises:
 856        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 857        Exception: Raised if the query fails.
 858
 859    Returns:
 860        dict: createTest Object
 861    """
 862    tools = [
 863        {
 864            "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
 865            "name": "Finite State Binary Analysis"
 866        }
 867    ]
 868    return create_test(token, organization_context, business_unit_id=business_unit_id,
 869                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 870                       test_name=test_name, product_id=product_id, test_type="finite_state_binary_analysis",
 871                       tools=tools, upload_method=upload_method)
 872
 873
 874def create_test_as_cyclone_dx(
 875    token,
 876    organization_context,
 877    business_unit_id=None,
 878    created_by_user_id=None,
 879    asset_id=None,
 880    artifact_id=None,
 881    test_name=None,
 882    product_id=None,
 883    upload_method: UploadMethod = UploadMethod.API,
 884):
 885    """
 886    Create a new Test object for uploading CycloneDX files.
 887
 888    Args:
 889        token (str):
 890            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 891        organization_context (str):
 892            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 893        business_unit_id (str, required):
 894            Business Unit ID to associate the Test with.
 895        created_by_user_id (str, required):
 896            User ID of the user creating the Test.
 897        asset_id (str, required):
 898            Asset ID to associate the Test with.
 899        artifact_id (str, required):
 900            Artifact ID to associate the Test with.
 901        test_name (str, required):
 902            The name of the Test being created.
 903        product_id (str, optional):
 904            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 905        upload_method (UploadMethod, optional):
 906            The method of uploading the test results. Default is UploadMethod.API.
 907
 908    Raises:
 909        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 910        Exception: Raised if the query fails.
 911
 912    Returns:
 913        dict: createTest Object
 914    """
 915    return create_test(token, organization_context, business_unit_id=business_unit_id,
 916                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 917                       test_name=test_name, product_id=product_id, test_type="cyclonedx", upload_method=upload_method)
 918
 919
 920def create_test_as_third_party_scanner(token, organization_context, business_unit_id=None, created_by_user_id=None,
 921                                       asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None,
 922                                       upload_method: UploadMethod = UploadMethod.API):
 923    """
 924    Create a new Test object for uploading Third Party Scanner files.
 925
 926    Args:
 927        token (str):
 928            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 929        organization_context (str):
 930            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 931        business_unit_id (str, required):
 932            Business Unit ID to associate the Test with.
 933        created_by_user_id (str, required):
 934            User ID of the user creating the Test.
 935        asset_id (str, required):
 936            Asset ID to associate the Test with.
 937        artifact_id (str, required):
 938            Artifact ID to associate the Test with.
 939        test_name (str, required):
 940            The name of the Test being created.
 941        product_id (str, optional):
 942            Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
 943        test_type (str, required):
 944            Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
 945        upload_method (UploadMethod, optional):
 946            The method of uploading the test results. Default is UploadMethod.API.
 947
 948    Raises:
 949        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
 950        Exception: Raised if the query fails.
 951
 952    Returns:
 953        dict: createTest Object
 954    """
 955    return create_test(token, organization_context, business_unit_id=business_unit_id,
 956                       created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id,
 957                       test_name=test_name, product_id=product_id, test_type=test_type, upload_method=upload_method)
 958
 959
 960def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 961                                  report_subtype=None, output_filename=None, verbose=False):
 962    """
 963    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 964
 965    Args:
 966        token (str):
 967            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 968        organization_context (str):
 969            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 970        asset_version_id (str, required):
 971            The Asset Version ID to download the report for.
 972        report_type (str, required):
 973            The file type of the report to download. Valid values are "CSV" and "PDF".
 974        report_subtype (str, required):
 975            The type of report to download. Based on available reports for the `report_type` specified
 976            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
 977            Valid values for PDF are "RISK_SUMMARY".
 978        output_filename (str, optional):
 979            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
 980        verbose (bool, optional):
 981            If True, will print additional information to the console. Defaults to False.
 982
 983    Raises:
 984        ValueError: Raised if required parameters are not provided.
 985        Exception: Raised if the query fails.
 986
 987    Returns:
 988        None
 989    """
 990    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
 991                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
 992
 993    # Send an HTTP GET request to the URL
 994    response = requests.get(url)
 995
 996    # Check if the request was successful (status code 200)
 997    if response.status_code == 200:
 998        # Open a local file in binary write mode and write the content to it
 999        if verbose:
1000            print("File downloaded successfully.")
1001        with open(output_filename, 'wb') as file:
1002            file.write(response.content)
1003            if verbose:
1004                print(f'Wrote file to {output_filename}')
1005    else:
1006        raise Exception(f"Failed to download the file. Status code: {response.status_code}")
1007
1008
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report will be saved to a file named "report.<report_type in lowercase>" (for example "report.csv") in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        Exception: Raised if the download does not return HTTP status 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default file name (e.g. "report.csv").
    # Without this, the default output_filename=None reaches open() and raises a TypeError.
    if not output_filename:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than a 200 means there is no report content to save
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Open a local file in binary write mode and write the content to it
    with open(output_filename, 'wb') as file:
        file.write(response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
1048
1049
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Fetch the SBOM of an Asset Version and store it in a local file. The call blocks until the download finishes, which can take minutes for a very large SBOM.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, required):
            The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails, or if the download does not return HTTP status 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(
        token,
        organization_context,
        sbom_type=sbom_type,
        sbom_subtype=sbom_subtype,
        asset_version_id=asset_version_id,
        verbose=verbose,
    )

    # Fetch the SBOM over plain HTTP GET
    http_response = requests.get(download_url)

    # Only a 200 response carries the SBOM content
    if http_response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {http_response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Persist the raw response bytes to the requested local file
    with open(output_filename, 'wb') as sbom_file:
        sbom_file.write(http_response.content)
        if verbose:
            print(f'Wrote file to {output_filename}')
1095
1096
def file_chunks(file_path, chunk_size=1024 * 1024 * 1024 * 5):
    """
    Generator helper that yields the contents of a file piece by piece.

    Args:
        file_path (str):
            Local path to the file to read.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to 5GB.

    Yields:
        bytes: The next chunk of the file.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel keeps calling read() until it returns b'' (end of file)
        yield from iter(lambda: stream.read(chunk_size), b'')
1120
1121
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Fetch every artifact in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            An optional Artifact ID if this is used to get a single artifact, by default None
        business_unit_id (str, optional):
            An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifact_query = queries.ALL_ARTIFACTS
    query_text = artifact_query['query']
    query_variables = artifact_query['variables'](artifact_id, business_unit_id)

    # The artifacts query returns its results under the 'allAssets' field
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
1144
1145
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch every asset in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    asset_query = queries.ALL_ASSETS
    query_text = asset_query['query']
    query_variables = asset_query['variables'](asset_id, business_unit_id)

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
1168
1169
def get_all_asset_versions(token, organization_context):
    """
    Fetch every asset version in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    asset_version_query = queries.ALL_ASSET_VERSIONS
    query_text = asset_version_query['query']
    query_variables = asset_version_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssetVersions')
1188
1189
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Fetch every asset version belonging to a product, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: List of AssetVersion Objects
    """
    product_query = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    query_text = product_query['query']
    query_variables = product_query['variables'](product_id)

    # The results of this query are read from the 'allProducts' field
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
1207
1208
def get_all_business_units(token, organization_context):
    """
    Fetch every business unit in the organization, following pagination until all results are collected. NOTE: The return type here is Group.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_unit_query = queries.ALL_BUSINESS_UNITS
    query_text = business_unit_query['query']
    query_variables = business_unit_query['variables']

    # Business units are returned under the 'allGroups' field
    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allGroups')
1227
1228
def get_all_organizations(token, organization_context):
    """
    Fetch every organization available to the user, following pagination until all results are collected. For most users there is only one organization.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    organization_query = queries.ALL_ORGANIZATIONS
    query_text = organization_query['query']
    query_variables = organization_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allOrganizations')
1247
1248
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    Pages are requested one after another: the `_cursor` of the last entry of each page is sent as the
    `after` variable of the next request, until a page comes back empty, the last entry has no cursor,
    or `limit` results have been collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None. The dict passed in is never modified; pagination works on a copy.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, optional):
            The maximum number of results to return. If not provided, will return all results. By default None

    Raises:
        Exception: If the response status code is not 200, or if the field is not provided or is not in the response JSON

    Returns:
        list: List of results
    """

    if not field:
        raise Exception("Error: field is required")

    # Paginate on a private copy. Callers in this module pass shared objects such as
    # queries.ALL_USERS['variables']; writing the 'after' cursor into the caller's dict would
    # leave a stale cursor behind for the next call that reuses it.
    page_variables = None if variables is None else dict(variables)

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, page_variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    # start the result list with the first page
    page = response_data['data'][field]
    results = list(page)

    # keep fetching while the previous page had entries and the limit has not been reached
    while page and (limit is None or len(results) < limit):
        # the cursor of the last entry marks where the next page starts
        cursor = page[-1]['_cursor']
        if not cursor:
            break

        # variables may have been omitted entirely; pagination still needs somewhere to put 'after'
        if page_variables is None:
            page_variables = {}
        page_variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, page_variables)
        page = response_data['data'][field]
        results.extend(page)

    # pages are fetched whole, so trim any overshoot to honor the documented maximum
    if limit is not None:
        results = results[:max(limit, 0)]

    return results
1315
1316
def get_all_products(token, organization_context):
    """
    Fetch every product in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the deprecation warning to the caller of this function
    warn('`get_all_products` is deprecated. Use: `get_products instead`', DeprecationWarning, stacklevel=2)

    product_query = queries.ALL_PRODUCTS
    query_text = product_query['query']
    query_variables = product_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
1338
1339
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    user_query = queries.ALL_USERS
    query_text = user_query['query']
    query_variables = user_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allUsers')
1358
1359
def get_artifact_context(token, organization_context, artifact_id):
    """
    Look up the context of a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            The Artifact ID to look up the context for.

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no results for the given artifact_id.

    Returns:
        dict: Artifact Context Object
    """
    artifact_query = queries.ALL_ARTIFACTS
    query_text = artifact_query['query']
    # No business unit filter is applied when looking up a single artifact
    query_variables = artifact_query['variables'](artifact_id, None)

    matches = get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')

    # Only the first returned entry is consulted
    first_match = matches[0]
    return first_match['ctx']
1380
1381
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch assets in the organization via the paginated `allAssets` query.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_request = queries.ALL_ASSETS
    query_text = assets_request['query']
    query_variables = assets_request['variables'](asset_id, business_unit_id)

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')
1404
1405
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Fetch asset versions in the organization via the paginated `allAssetVersions` query.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
        asset_id (str, optional):
            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    versions_request = queries.ALL_ASSET_VERSIONS
    query_text = versions_request['query']
    query_variables = versions_request['variables'](
        asset_version_id=asset_version_id,
        asset_id=asset_id,
        business_unit_id=business_unit_id,
    )

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssetVersions')
1433
1434
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            URL the token request is POSTed to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request payload, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        # Use the caller-supplied audience rather than the module constant so the override takes effect.
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    # Post to the caller-supplied token_url; it defaults to TOKEN_URL, so existing callers are unaffected.
    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']
1473
1474
def get_findings(token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None,
                 severity=None, count=False, limit=None):
    """
    Gets the Findings matching the given filters, or only their count when `count` is True.
    The non-count path uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. If not specified, will return all findings, up to the default of 1000.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Finding Objects. When `count` is True, the `_allFindingsMeta` entry of the response data is returned instead.
    """
    # Both the count query and the listing query take the same filter arguments.
    filter_kwargs = {
        'asset_version_id': asset_version_id,
        'finding_id': finding_id,
        'category': category,
        'status': status,
        'severity': severity,
        'limit': limit,
    }

    if not count:
        findings_request = queries.GET_FINDINGS
        query_text = findings_request['query']
        query_variables = findings_request['variables'](**filter_kwargs)
        return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allFindings',
                                         limit=limit)

    count_request = queries.GET_FINDINGS_COUNT
    query_text = count_request['query']
    query_variables = count_request['variables'](**filter_kwargs)
    count_response = send_graphql_query(token, organization_context, query_text, query_variables)
    return count_response["data"]["_allFindingsMeta"]
1519
1520
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Gets all the asset versions for a product. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            Product ID to get asset versions for. Although it defaults to None, an empty or missing value is rejected.

    Raises:
        Exception: Raised if product_id is not provided, or if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    if product_id:
        versions_request = queries.GET_PRODUCT_ASSET_VERSIONS
        query_text = versions_request['query']
        query_variables = versions_request['variables'](product_id)
        return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

    raise Exception("Product ID is required")
1541
1542
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Gets products, optionally restricted to one product ID and/or one business unit. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Product Objects
    """
    products_request = queries.GET_PRODUCTS
    query_text = products_request['query']
    query_variables = products_request['variables'](product_id=product_id, business_unit_id=business_unit_id)

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')
1565
1566
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: launches a report export job, then polls until the job is completed and returns the pre-signed download URL.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be given.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download. Based on available reports for the `report_type` specified
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, if neither asset_version_id nor product_id is given, or if both are given.
        Exception: Raised if the report type or subtype is not supported, if no export job ID is returned, or if a query fails.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    # Required-argument checks.
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not (asset_version_id or product_id):
        raise ValueError("Asset Version ID or Product ID is required")
    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    if report_type not in ["CSV", "PDF"]:
        raise Exception(f"Report Type {report_type} not supported")

    # Per report type: the accepted subtypes and the response fields that carry the export job.
    if report_type == "CSV":
        supported_subtypes = ["ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE"]
        artifact_field = 'launchArtifactCSVExport'
        product_field = 'launchProductCSVExport'
    else:
        # "PDF" is the only other value that passes the check above.
        supported_subtypes = ["RISK_SUMMARY"]
        artifact_field = 'launchArtifactPdfExport'
        product_field = 'launchProductPdfExport'

    if report_subtype not in supported_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    export_kwargs = {
        'asset_version_id': asset_version_id,
        'product_id': product_id,
        'report_type': report_type,
        'report_subtype': report_subtype,
    }
    mutation = queries.LAUNCH_REPORT_EXPORT['mutation'](**export_kwargs)
    launch_variables = queries.LAUNCH_REPORT_EXPORT['variables'](**export_kwargs)

    launch_response = send_graphql_query(token, organization_context, mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    # Exactly one of the two IDs is set at this point, which decides the response field to read.
    launch_field = artifact_field if asset_version_id else product_field
    export_job_id = launch_response['data'][launch_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll until the export job reports COMPLETED with a download link.
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        poll_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        poll_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)
        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        job_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if job_state['status'] != 'COMPLETED':
            continue

        download_link = job_state['downloadLink']
        if download_link:
            if verbose:
                print(f'Export Job Complete. Download URL: {download_link}')
            return download_link
1684
1685
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launches an SBOM export job for the asset_version_id, then polls until the job is completed and returns the pre-signed download URL.
    This may take several minutes to complete, depending on the size of SBOM.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if the SBOM type or subtype is not supported, if no export job ID is returned, or if the query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    required_arguments = (
        (sbom_type, "SBOM Type is required"),
        (sbom_subtype, "SBOM Subtype is required"),
        (asset_version_id, "Asset Version ID is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    if sbom_type not in ["CYCLONEDX", "SPDX"]:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    is_cyclonedx = sbom_type == "CYCLONEDX"
    if is_cyclonedx:
        supported_subtypes = ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]
        response_field = 'launchCycloneDxExport'
    else:
        # "SPDX" is the only other value that passes the check above.
        supported_subtypes = ["SBOM_ONLY"]
        response_field = 'launchSpdxExport'

    if sbom_subtype not in supported_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    launch_request = queries.LAUNCH_CYCLONEDX_EXPORT if is_cyclonedx else queries.LAUNCH_SPDX_EXPORT
    mutation = launch_request['mutation']
    launch_variables = launch_request['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    export_job_id = launch_response['data'][response_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll until the export job reports COMPLETED with a download link.
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        poll_query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
        poll_variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)
        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        job_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if job_state['status'] != "COMPLETED":
            continue

        download_link = job_state['downloadLink']
        if download_link:
            if verbose:
                print(f'Export Job Complete. Download URL: {download_link}')
            return download_link
1785
1786
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Gets all the Software Components for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for. Although it defaults to None, an empty or missing value is rejected.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        Exception: Raised if asset_version_id is not provided, or if the query fails.

    Returns:
        list: List of Software Component Objects
    """
    if asset_version_id:
        components_request = queries.GET_SOFTWARE_COMPONENTS
        query_text = components_request['query']
        query_variables = components_request['variables'](asset_version_id=asset_version_id, type=type)
        return get_all_paginated_results(token, organization_context, query_text, query_variables,
                                         'allSoftwareComponentInstances')

    raise Exception("Asset Version ID is required")
1811
1812
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.
    Search Methods: EXACT or CONTAINS
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Defaults to False. Only consulted when search_method is "EXACT".
    Raises:
        ValueError: Raised if name is not provided, or if search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.
    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # Enforce the documented contract before any request is sent.
    if not name:
        raise ValueError("Name is required")
    # An unrecognised method would otherwise leave the name filter unset and send an unfiltered query.
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f'Search method {search_method} not supported. Valid values are "EXACT" and "CONTAINS"')

    if asset_version_id:
        query = '''
query GetSoftwareComponentInstances(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
'''
    else:
        # gets the asset version info that contains the software component
        query = '''
query GetSoftwareComponentInstances(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
'''

    variables = {
        "filter": {
            "mergedComponentRefId": None
        },
        "after": None,
        "first": 100
    }
    search_filter = variables["filter"]

    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # Exact search: "name" when case sensitive, "name_like" otherwise.
        search_filter["name" if case_sensitive else "name_like"] = name
        if version:
            search_filter["version"] = version
    else:
        # CONTAINS search; case_sensitive is not consulted on this path.
        search_filter["name_contains"] = name
        if version:
            search_filter["version_contains"] = version

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")
1924
1925
def send_graphql_query(token, organization_context, query, variables=None):
    """
    POST a GraphQL query (or mutation) to API_URL and return the decoded JSON response.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        Exception: If the response status code is not 200, or if the response JSON contains an "errors" entry

    Returns:
        dict: Response JSON
    """
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}',
        'Organization-Context': organization_context
    }
    request_body = {'query': query, 'variables': variables}

    response = requests.post(API_URL, headers=request_headers, json=request_body)

    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    response_json = response.json()

    # A 200 response can still carry errors in its body; surface them as an exception.
    if "errors" in response_json:
        raise Exception(f"Error: {response_json['errors']}")

    return response_json
1967
1968
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Updates the status of one finding or multiple findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID to update the status for.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see  https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if required parameters are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    # Checked in this order so the first missing argument determines the error message.
    required_arguments = (
        (user_id, "User ID is required"),
        (finding_ids, "Finding ID is required"),
        (status, "Status is required"),
    )
    for value, message in required_arguments:
        if not value:
            raise ValueError(message)

    update_request = queries.UPDATE_FINDING_STATUSES
    mutation = update_request['mutation']
    mutation_variables = update_request['variables'](
        user_id=user_id,
        finding_ids=finding_ids,
        status=status,
        justification=justification,
        response=response,
        comment=comment,
    )

    return send_graphql_query(token, organization_context, mutation, mutation_variables)
2012
2013
def upload_file_for_binary_analysis(token, organization_context, test_id=None, file_path=None,
                                    chunk_size=1024 * 1024 * 1024 * 5, quick_scan=False):
    """
    Upload a file for Binary Analysis via a multipart upload, then launch processing of the uploaded file.
    The file is split by file_chunks(file_path, chunk_size); each chunk is uploaded as one part, and parts
    are numbered consecutively starting at 1. Chunk size defaults to 5GB.
    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.
        chunk_size (int, optional):
            The size of the chunks to read. Defaults to 5GB.
        quick_scan (bool, optional):
            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the response to the launchBinaryUploadProcessing mutation.
    """
    if not test_id:
        raise ValueError("Test ID is required")
    if not file_path:
        raise ValueError("File path is required")

    # Step 1: start the multipart upload to obtain the upload ID and storage key
    graphql_query = '''
    mutation Start($testId: ID!) {
        startMultipartUploadV2(testId: $testId) {
            uploadId
            key
        }
    }
    '''

    variables = {
        "testId": test_id
    }

    response = send_graphql_query(token, organization_context, graphql_query, variables)

    upload_id = response['data']['startMultipartUploadV2']['uploadId']
    upload_key = response['data']['startMultipartUploadV2']['key']

    # Step 2: for every chunk of the file (even if there is only one), request a
    # pre-signed URL with generateUploadPartUrlV2 and upload the chunk to it.
    # Each chunk needs its own part number: reusing the same number for every
    # chunk would request and record all of them as part 1, so the counter
    # comes from enumerate() rather than a variable that has to be bumped by hand.
    part_data = []
    for part_number, chunk in enumerate(file_chunks(file_path, chunk_size), start=1):
        graphql_query = '''
        mutation GenerateUploadPartUrl($partNumber: Int!, $uploadId: ID!, $uploadKey: String!) {
            generateUploadPartUrlV2(partNumber: $partNumber, uploadId: $uploadId, uploadKey: $uploadKey) {
                key
                uploadUrl
            }
        }
        '''

        variables = {
            "partNumber": part_number,
            "uploadId": upload_id,
            "uploadKey": upload_key
        }

        response = send_graphql_query(token, organization_context, graphql_query, variables)

        chunk_upload_url = response['data']['generateUploadPartUrlV2']['uploadUrl']

        # upload the chunk to the upload URL
        upload_response = upload_bytes_to_url(chunk_upload_url, chunk)

        # the ETag returned for each part is required to complete the upload
        part_data.append({
            "ETag": upload_response.headers['ETag'],
            "PartNumber": part_number
        })

    # Step 3: complete the multipart upload with the collected part list
    graphql_query = '''
    mutation CompleteMultipartUpload($partData: [PartInput!]!, $uploadId: ID!, $uploadKey: String!) {
        completeMultipartUploadV2(partData: $partData, uploadId: $uploadId, uploadKey: $uploadKey) {
            key
        }
    }
    '''

    variables = {
        "partData": part_data,
        "uploadId": upload_id,
        "uploadKey": upload_key
    }

    response = send_graphql_query(token, organization_context, graphql_query, variables)

    # get key from the result
    key = response['data']['completeMultipartUploadV2']['key']

    variables = {
        "key": key,
        "testId": test_id
    }

    # Step 4: launch processing; the configurationOptions variable is only
    # declared and sent when a quick scan was requested
    if quick_scan:
        graphql_query = '''
        mutation LaunchBinaryUploadProcessing($key: String!, $testId: ID!, $configurationOptions: [BinaryAnalysisConfigurationOption]) {
            launchBinaryUploadProcessing(key: $key, testId: $testId, configurationOptions: $configurationOptions) {
                key
            }
        }
        '''
        variables["configurationOptions"] = ["QUICK_SCAN"]
    else:
        graphql_query = '''
        mutation LaunchBinaryUploadProcessing($key: String!, $testId: ID!) {
            launchBinaryUploadProcessing(key: $key, testId: $testId) {
                key
            }
        }
        '''

    response = send_graphql_query(token, organization_context, graphql_query, variables)

    return response['data']
2147
2148
def upload_test_results_file(token, organization_context, test_id=None, file_path=None):
    """
    Upload a test results file to the test identified by test_id and launch processing of it.
    NOTE: This is not for Binary Analysis. Use upload_file_for_binary_analysis for that.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        test_id (str, required):
            Test ID to upload the file for.
        file_path (str, required):
            Local path to the file to upload.

    Raises:
        ValueError: Raised if test_id or file_path are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the response to the launchTestResultProcessing mutation.
    """
    for supplied, complaint in ((test_id, "Test ID is required"), (file_path, "File path is required")):
        if not supplied:
            raise ValueError(complaint)

    # Step 1: generate the single-part upload URL and the storage key for this test
    url_mutation = '''
    mutation GenerateTestResultUploadUrl($testId: ID!) {
        generateSinglePartUploadUrl(testId: $testId) {
            uploadUrl
            key
        }
    }
    '''
    url_reply = send_graphql_query(token, organization_context, url_mutation, {"testId": test_id})

    upload_target = url_reply['data']['generateSinglePartUploadUrl']
    upload_url = upload_target['uploadUrl']
    key = upload_target['key']

    # Step 2: send the file contents to the returned URL
    upload_file_to_url(upload_url, file_path)

    # Step 3: tell the platform the upload is finished so processing can start
    launch_mutation = '''
    mutation CompleteTestResultUpload($key: String!, $testId: ID!) {
        launchTestResultProcessing(key: $key, testId: $testId) {
            key
        }
    }
    '''
    launch_variables = {
        "testId": test_id,
        "key": key
    }

    launch_reply = send_graphql_query(token, organization_context, launch_mutation, launch_variables)
    return launch_reply['data']
2214
2215
def upload_bytes_to_url(url, bytes):
    """
    PUT a block of bytes to a URL. Used for uploading file data to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        bytes (bytes):
            Bytes to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    put_result = requests.put(url, data=bytes)

    # Only a 200 counts as success; every other status is surfaced to the caller.
    if put_result.status_code != 200:
        raise Exception(f"Error: {put_result.status_code} - {put_result.text}")

    return put_result
2238
2239
def upload_file_to_url(url, file_path):
    """
    PUT the contents of a local file to a URL. Used for uploading a file to a pre-signed S3 URL.

    Args:
        url (str):
            (Pre-signed S3) URL
        file_path (str):
            Local path to file to upload

    Raises:
        Exception: If the response status code is not 200

    Returns:
        requests.Response: Response object
    """
    # The open file object is handed to requests as the request body; the file
    # is closed again as soon as the PUT has returned.
    with open(file_path, 'rb') as source:
        put_result = requests.put(url, data=source)

    # Only a 200 counts as success; every other status is surfaced to the caller.
    if put_result.status_code != 200:
        raise Exception(f"Error: {put_result.status_code} - {put_result.text}")

    return put_result
# All three endpoints share the same platform base URL.
_PLATFORM_API_BASE = 'https://platform.finitestate.io/api/v1'

# GraphQL endpoint that queries and mutations are posted to.
API_URL = _PLATFORM_API_BASE + '/graphql'
# NOTE(review): presumably the audience value sent when requesting a token - confirm against get_auth_token.
AUDIENCE = _PLATFORM_API_BASE + '/graphql'
# NOTE(review): presumably the endpoint get_auth_token requests tokens from - confirm.
TOKEN_URL = _PLATFORM_API_BASE + '/auth/token'
class UploadMethod(Enum):
    """
    The ways in which an upload can be submitted to the platform.

    Attributes:
        WEB_APP_UI: Upload method via web application UI.
        API: Upload method via API.
        GITHUB_INTEGRATION: Upload method via GitHub integration.
        AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

    Every member's value is the same string as its name.
    To use any value from this enumeration, use UploadMethod.<attribute> i.e. finite_state_sdk.UploadMethod.WEB_APP_UI
    """

    WEB_APP_UI = "WEB_APP_UI"
    API = "API"
    GITHUB_INTEGRATION = "GITHUB_INTEGRATION"
    AZURE_DEVOPS_INTEGRATION = "AZURE_DEVOPS_INTEGRATION"

Enumeration class representing different upload methods.

Attributes:
  • WEB_APP_UI: Upload method via web application UI.
  • API: Upload method via API.
  • GITHUB_INTEGRATION: Upload method via GitHub integration.
  • AZURE_DEVOPS_INTEGRATION: Upload method via Azure DevOps integration.

To use any value from this enumeration, use UploadMethod.<attribute>, i.e. finite_state_sdk.UploadMethod.WEB_APP_UI

WEB_APP_UI = <UploadMethod.WEB_APP_UI: 'WEB_APP_UI'>
API = <UploadMethod.API: 'API'>
GITHUB_INTEGRATION = <UploadMethod.GITHUB_INTEGRATION: 'GITHUB_INTEGRATION'>
AZURE_DEVOPS_INTEGRATION = <UploadMethod.AZURE_DEVOPS_INTEGRATION: 'AZURE_DEVOPS_INTEGRATION'>
Inherited Members
enum.Enum
name
value
def create_artifact(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_version_id=None,
    artifact_name=None,
    product_id=None,
):
    """
    Create a new Artifact on an existing Asset Version.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the artifact with.
        created_by_user_id (str, required):
            User ID of the user creating the artifact.
        asset_version_id (str, required):
            Asset Version ID to associate the artifact with.
        artifact_name (str, required):
            The name of the Artifact being created.
        product_id (str, optional):
            Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the response, containing the createArtifact Object.
    """
    # Required arguments are checked in this order; the first falsy one aborts the call.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_version_id, "Asset version ID is required"),
        (artifact_name, "Artifact name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    graphql_query = '''
    mutation CreateArtifactMutation($input: CreateArtifactInput!) {
        createArtifact(input: $input) {
            id
            name
            assetVersion {
                id
                name
                asset {
                    id
                    name
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    '''

    # The context always carries the asset version and business unit; the
    # product entry is added only when a product ID was supplied.
    context = {
        "asset": asset_version_id,
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": artifact_name,
            "createdBy": created_by_user_id,
            "assetVersion": asset_version_id,
            "ctx": context
        }
    }

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

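Create a new Artifact. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the GitHub repository for more information:
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py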

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the artifact with.
  • created_by_user_id (str, required): User ID of the user creating the artifact.
  • asset_version_id (str, required): Asset Version ID to associate the artifact with.
  • artifact_name (str, required): The name of the Artifact being created.
  • product_id (str, optional): Product ID to associate the artifact with. If not specified, the artifact will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_version_id, or artifact_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createArtifact Object

def create_asset(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_name=None, product_id=None):
    """
    Create a new Asset in the given Business Unit.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset with.
        created_by_user_id (str, required):
            User ID of the user creating the asset.
        asset_name (str, required):
            The name of the Asset being created.
        product_id (str, optional):
            Product ID to associate the asset with. If not specified, the asset will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the response, containing the createAsset Object.
    """
    # Required arguments are checked in this order; the first falsy one aborts the call.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_name, "Asset name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    graphql_query = '''
    mutation CreateAssetMutation($input: CreateAssetInput!) {
        createAsset(input: $input) {
            id
            name
            dependentProducts {
                id
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    '''

    # The business unit is sent both as the asset's group and in its context;
    # the product entry is added only when a product ID was supplied.
    context = {
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": asset_name,
            "group": business_unit_id,
            "createdBy": created_by_user_id,
            "ctx": context
        }
    }

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

Create a new Asset.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset with.
  • created_by_user_id (str, required): User ID of the user creating the asset.
  • asset_name (str, required): The name of the Asset being created.
  • product_id (str, optional): Product ID to associate the asset with. If not specified, the asset will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or asset_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAsset Object

def create_asset_version(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
    product_id=None,
):
    """
    Create a new Asset Version on an existing Asset. Emits a DeprecationWarning on every call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the asset version with.
        created_by_user_id (str, required):
            User ID of the user creating the asset version.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.
        product_id (str, optional):
            Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the response, containing the createAssetVersion Object.

    deprecated:: 0.1.7. Use create_asset_version_on_asset instead.
    """
    # The warning is issued before validation, so it fires even for calls that
    # go on to fail; stacklevel=2 attributes it to the caller's line.
    warn('`create_asset_version` is deprecated. Use: `create_asset_version_on_asset instead`', DeprecationWarning, stacklevel=2)

    # Required arguments are checked in this order; the first falsy one aborts the call.
    required_arguments = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (asset_id, "Asset ID is required"),
        (asset_version_name, "Asset version name is required"),
    )
    for supplied, complaint in required_arguments:
        if not supplied:
            raise ValueError(complaint)

    graphql_query = '''
    mutation CreateAssetVersionMutation($input: CreateAssetVersionInput!) {
        createAssetVersion(input: $input) {
            id
            name
            asset {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
        }
    }
    '''

    # The context always carries the asset and business unit; the product
    # entry is added only when a product ID was supplied.
    context = {
        "asset": asset_id,
        "businessUnits": [business_unit_id]
    }
    if product_id is not None:
        context["products"] = product_id

    variables = {
        "input": {
            "name": asset_version_name,
            "createdBy": created_by_user_id,
            "asset": asset_id,
            "ctx": context
        }
    }

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the asset version with.
  • created_by_user_id (str, required): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
  • product_id (str, optional): Product ID to associate the asset version with. If not specified, the asset version will not be associated with a product.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

Deprecated since version 0.1.7: use create_asset_version_on_asset instead.

def create_asset_version_on_asset(
    token,
    organization_context,
    created_by_user_id=None,
    asset_id=None,
    asset_version_name=None,
):
    """
    Create a new Asset Version on an existing Asset.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        created_by_user_id (str, optional):
            User ID of the user creating the asset version. Only sent with the request when provided.
        asset_id (str, required):
            Asset ID to associate the asset version with.
        asset_version_name (str, required):
            The name of the Asset Version being created.

    Raises:
        ValueError: Raised if asset_id or asset_version_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' portion of the response to the createNewAssetVersionOnAsset mutation.
    """
    # Only the asset and the version name are validated here.
    for supplied, complaint in ((asset_id, "Asset ID is required"), (asset_version_name, "Asset version name is required")):
        if not supplied:
            raise ValueError(complaint)

    graphql_query = '''
        mutation BapiCreateAssetVersion($assetVersionName: String!, $assetId: ID!, $createdByUserId: ID!) {
            createNewAssetVersionOnAsset(assetVersionName: $assetVersionName, assetId: $assetId, createdByUserId: $createdByUserId) {
                id
                assetVersion {
                    id
                }
            }
        }
    '''

    variables = {
        "assetVersionName": asset_version_name,
        "assetId": asset_id,
    }

    # NOTE(review): the mutation declares $createdByUserId as non-null (ID!),
    # yet the variable is omitted when no user ID is given - confirm how the
    # API treats a request without it.
    if created_by_user_id:
        variables["createdByUserId"] = created_by_user_id

    reply = send_graphql_query(token, organization_context, graphql_query, variables)
    return reply['data']

Create a new Asset Version.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • created_by_user_id (str, optional): User ID of the user creating the asset version.
  • asset_id (str, required): Asset ID to associate the asset version with.
  • asset_version_name (str, required): The name of the Asset Version being created.
Raises:
  • ValueError: Raised if asset_id or asset_version_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createAssetVersion Object

def create_new_asset_version_artifact_and_test_for_upload( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, product_id=None, test_type=None, artifact_description=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_new_asset_version_artifact_and_test_for_upload(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    product_id=None,
    test_type=None,
    artifact_description=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test.
    This method is used by create_new_asset_version_and_upload_binary and create_new_asset_version_and_upload_test_results, which are generally easier to use for basic use cases.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the entities for. If not provided, the Business Unit (group) of the existing Asset is used.
        created_by_user_id (str, optional):
            User ID that will be the creator of the entities. If not provided, the creator of the existing Asset is used.
        asset_id (str, required):
            ID of the existing Asset to create the asset version for.
        version (str, required):
            Name of the Asset Version to create.
        product_id (str, optional):
            Product ID to associate the Artifact and Test with. It is added to the product IDs already associated with the Asset if it is not among them.
        test_type (str, required):
            Test type to create the test for. Must be either "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation.
        artifact_description (str, optional):
            Description to use for the artifact. Examples include "Firmware", "Source Code Repository". It is appended to "<asset name> <version> - " to form the Artifact name. If not provided, "Binary" is used for binary analysis and "Unspecified Artifact" otherwise.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or test_type are not provided, if no Asset is found for asset_id, or if the Business Unit ID or Created By User ID cannot be determined.
        Exception: Raised if any of the underlying queries fail.

    Returns:
        str: The Test ID of the newly created test that is used for uploading the file.
    """
    # Validate everything up front so nothing is created on the platform
    # before an avoidable input error is reported.
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not version:
        raise ValueError("Version is required")
    if not test_type:
        raise ValueError("Test type is required")

    assets = get_all_assets(token, organization_context, asset_id=asset_id)
    if not assets:
        # Without this guard an unknown asset ID surfaces as a bare IndexError.
        raise ValueError(f"No asset found with ID {asset_id}")
    asset = assets[0]

    asset_name = asset['name']

    # Start from the products already associated with the asset and add the
    # requested product only if it is not already among them.
    asset_product_ids = asset['ctx']['products']
    if product_id and product_id not in asset_product_ids:
        asset_product_ids.append(product_id)

    # Fall back to the existing asset's business unit and creator when the
    # caller did not supply them.
    business_unit_id = business_unit_id or asset['group']['id']
    created_by_user_id = created_by_user_id or asset['createdBy']['id']
    if not business_unit_id:
        raise ValueError("Business Unit ID is required and could not be retrieved from the existing asset")
    if not created_by_user_id:
        raise ValueError("Created By User ID is required and could not be retrieved from the existing asset")

    # create the asset version
    response = create_asset_version_on_asset(
        token, organization_context, created_by_user_id=created_by_user_id, asset_id=asset_id, asset_version_name=version
    )
    asset_version_id = response['createNewAssetVersionOnAsset']['assetVersion']['id']

    is_binary_analysis = test_type == "finite_state_binary_analysis"

    # create the artifact; only the default description differs between the
    # binary analysis and third party scanner flows
    if not artifact_description:
        artifact_description = "Binary" if is_binary_analysis else "Unspecified Artifact"
    artifact_name = f"{asset_name} {version} - {artifact_description}"
    response = create_artifact(token, organization_context, business_unit_id=business_unit_id,
                               created_by_user_id=created_by_user_id, asset_version_id=asset_version_id,
                               artifact_name=artifact_name, product_id=asset_product_ids)
    artifact_id = response['createArtifact']['id']

    # create the test
    if is_binary_analysis:
        test_name = f"{asset_name} {version} - Finite State Binary Analysis"
        response = create_test_as_binary_analysis(token, organization_context, business_unit_id=business_unit_id,
                                                  created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                  artifact_id=artifact_id, product_id=asset_product_ids,
                                                  test_name=test_name, upload_method=upload_method)
    else:
        test_name = f"{asset_name} {version} - {test_type}"
        response = create_test_as_third_party_scanner(token, organization_context, business_unit_id=business_unit_id,
                                                      created_by_user_id=created_by_user_id, asset_id=asset_id,
                                                      artifact_id=artifact_id, product_id=asset_product_ids,
                                                      test_name=test_name, test_type=test_type,
                                                      upload_method=upload_method)
    return response['createTest']['id']

Creates the entities needed for uploading a file for Binary Analysis or test results from a third party scanner to an existing Asset. This will create a new Asset Version, Artifact, and Test. This method is used by the upload_file_for_binary_analysis and upload_test_results_file methods, which are generally easier to use for basic use cases.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the Business Unit of the existing Asset will be used.
  • created_by_user_id (str, optional): User ID that will be the creator of the asset version. If not specified, the creator of the related Asset will be used.
  • asset_id (str, required): ID of the existing Asset to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • product_id (str, optional): Product ID to associate the created entities with, in addition to the products already associated with the Asset.
  • test_type (str, required): Test type to create the test for. Must be either "finite_state_binary_analysis" or one of the supported third party test types. For the full list, see the API documentation.
  • artifact_description (str, optional): Description to use for the artifact. Examples include "Firmware", "Source Code Repository". It is appended to "<asset name> <version> - " to form the Artifact name. If none is provided, "Binary" is used for binary analysis and "Unspecified Artifact" otherwise.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if asset_id or version are not provided.
  • Exception: Raised if the query fails.
Returns:

str: The Test ID of the newly created test that is used for uploading the file.

def create_new_asset_version_and_upload_binary( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, artifact_description=None, quick_scan=False, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_new_asset_version_and_upload_binary(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    artifact_description=None,
    quick_scan=False,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis.
    The Business Unit and Created By User are forwarded as given; leave them unset to let the entity-creation step resolve them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Local path to the file to upload.
        product_id (str, optional):
            Product ID to create the asset version for.
        artifact_description (str, optional):
            Description of the artifact. If not provided, the default is "Firmware Binary".
        quick_scan (bool, optional):
            Forwarded to upload_file_for_binary_analysis. Defaults to False.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if asset_id, version, or file_path are not provided.
        Exception: Raised if any of the queries fail.

    Returns:
        The value returned by upload_file_for_binary_analysis for the newly created test.
    """
    # Required inputs are checked in a fixed order so the first missing one is reported.
    required_inputs = (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
    )
    for provided, message in required_inputs:
        if not provided:
            raise ValueError(message)

    # Create the asset version, artifact, and binary analysis test in one step.
    test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type="finite_state_binary_analysis",
        artifact_description=artifact_description or "Firmware Binary",
        upload_method=upload_method,
    )

    # Upload the binary against the test that was just created.
    return upload_file_for_binary_analysis(
        token, organization_context, test_id=test_id, file_path=file_path, quick_scan=quick_scan
    )

Creates a new Asset Version for an existing asset, and uploads a binary file for Finite State Binary Analysis. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Local path to the file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used, if it exists.
  • artifact_description (str, optional): Description of the artifact. If not provided, the default is "Firmware Binary".
  • quick_scan (bool, optional): If True, will upload the file for quick scan. Defaults to False (Full Scan). For details about Quick Scan vs Full Scan, please see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if asset_id, version, or file_path are not provided.
  • Exception: Raised if any of the queries fail.
Returns:

dict: The response returned by upload_file_for_binary_analysis for the newly created test.

def create_new_asset_version_and_upload_test_results( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, version=None, file_path=None, product_id=None, test_type=None, artifact_description='', upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_new_asset_version_and_upload_test_results(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    version=None,
    file_path=None,
    product_id=None,
    test_type=None,
    artifact_description="",
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Creates a new Asset Version for an existing asset, and uploads test results for that asset version.
    The Business Unit and Created By User are forwarded as given; leave them unset to let the entity-creation step resolve them.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, optional):
            Business Unit ID to create the asset version for.
        created_by_user_id (str, optional):
            Created By User ID to create the asset version for.
        asset_id (str, required):
            Asset ID to create the asset version for.
        version (str, required):
            Version to create the asset version for.
        file_path (str, required):
            Path to the test results file to upload.
        product_id (str, optional):
            Product ID to create the asset version for.
        test_type (str, required):
            Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
        artifact_description (str, optional):
            Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: If the asset_id, version, file_path, or test_type are not provided.
        Exception: If any of the queries fail.

    Returns:
        The value returned by upload_test_results_file for the newly created test.
    """
    # Required inputs are checked in a fixed order so the first missing one is reported.
    required_inputs = (
        (asset_id, "Asset ID is required"),
        (version, "Version is required"),
        (file_path, "File path is required"),
        (test_type, "Test type is required"),
    )
    for provided, message in required_inputs:
        if not provided:
            raise ValueError(message)

    # Create the asset version, artifact, and test in one step.
    new_test_id = create_new_asset_version_artifact_and_test_for_upload(
        token,
        organization_context,
        business_unit_id=business_unit_id,
        created_by_user_id=created_by_user_id,
        asset_id=asset_id,
        version=version,
        product_id=product_id,
        test_type=test_type,
        artifact_description=artifact_description,
        upload_method=upload_method,
    )

    # Upload the results file against the test that was just created.
    return upload_test_results_file(token, organization_context, test_id=new_test_id, file_path=file_path)

Creates a new Asset Version for an existing asset, and uploads test results for that asset version. By default, this uses the existing Business Unit and Created By User for the Asset. If you need to change these, you can provide the IDs for them.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, optional): Business Unit ID to create the asset version for. If not provided, the existing Business Unit for the Asset will be used.
  • created_by_user_id (str, optional): Created By User ID to create the asset version for. If not provided, the existing Created By User for the Asset will be used.
  • asset_id (str, required): Asset ID to create the asset version for.
  • version (str, required): Version to create the asset version for.
  • file_path (str, required): Path to the test results file to upload.
  • product_id (str, optional): Product ID to create the asset version for. If not provided, the existing Product for the Asset will be used.
  • test_type (str, required): Test type. This must be one of the list of supported third party scanner types. For the full list of supported third party scanner types, see the Finite State API documentation.
  • artifact_description (str, optional): Description of the artifact being scanned (e.g. "Source Code Repository", "Container Image"). If not provided, the default artifact description will be used.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: If the asset_id, version, or file_path are not provided.
  • Exception: If the test_type is not a supported third party scanner type, or if the query fails.
Returns:

dict: The response returned by upload_test_results_file for the newly created test.

def create_product( token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None, product_description=None, vendor_id=None, vendor_name=None):
def create_product(token, organization_context, business_unit_id=None, created_by_user_id=None, product_name=None,
                   product_description=None, vendor_id=None, vendor_name=None):
    """
    Create a new Product.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the product with.
        created_by_user_id (str, required):
            User ID of the user creating the product.
        product_name (str, required):
            The name of the Product being created.
        product_description (str, optional):
            The description of the Product being created. Only sent when it is not None.
        vendor_id (str, optional):
            Vendor ID to link the product to. Sent as the "vendor" input when it is not None.
        vendor_name (str, optional):
            Vendor name to associate the product with. Sent as the "createVendor" input when it is not None.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' entry of the GraphQL response (createProduct Object).
    """
    # Required inputs are checked in a fixed order so the first missing one is reported.
    required_inputs = (
        (business_unit_id, "Business unit ID is required"),
        (created_by_user_id, "Created by user ID is required"),
        (product_name, "Product name is required"),
    )
    for provided, message in required_inputs:
        if not provided:
            raise ValueError(message)

    mutation = '''
    mutation CreateProductMutation($input: CreateProductInput!) {
        createProduct(input: $input) {
            id
            name
            vendor {
                name
            }
            group {
                id
                name
            }
            createdBy {
                id
                email
            }
            ctx {
                businessUnit
            }
        }
    }
    '''

    # Mandatory part of the mutation input; the business unit is used both as
    # the owning group and as the context.
    product_input = {
        "name": product_name,
        "group": business_unit_id,
        "createdBy": created_by_user_id,
        "ctx": {"businessUnit": business_unit_id},
    }

    # Optional fields are only included when the caller supplied them.
    if product_description is not None:
        product_input["description"] = product_description
    if vendor_id is not None:
        # link the new product to an existing vendor
        product_input["vendor"] = {"id": vendor_id}
    if vendor_name is not None:
        # create a vendor with this name and link it to the new product
        product_input["createVendor"] = {"name": vendor_name}

    result = send_graphql_query(token, organization_context, mutation, {"input": product_input})
    return result['data']

Create a new Product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the product with.
  • created_by_user_id (str, required): User ID of the user creating the product.
  • product_name (str, required): The name of the Product being created.
  • product_description (str, optional): The description of the Product being created.
  • vendor_id (str, optional): Vendor ID to associate the product with. If not specified, vendor_name must be provided.
  • vendor_name (str, optional): Vendor name to associate the product with. This is used to create the Vendor if the vendor does not currently exist.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, or product_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createProduct Object

def create_test( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, tools=[], upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    tools=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a new Test object for uploading files.
    This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary.
    Please see the examples in the Github repository for more information:
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
    - https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID to associate the Test with.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID to associate the Test with.
        artifact_id (str, required):
            Artifact ID to associate the Test with.
        test_name (str, required):
            The name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. Sent unchanged as the Test's ctx.products. If not specified, the Test will not be associated with a product.
        test_type (str, required):
            The type of test being created, sent as the test result file format. Examples are "cyclonedx" and "finite_state_binary_analysis".
        tools (list, optional):
            List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test. Defaults to an empty list.
        upload_method (UploadMethod, optional):
            The method of uploading the test results. Default is UploadMethod.API.

    Raises:
        ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: The 'data' entry of the GraphQL response (createTest Object).
    """
    if not business_unit_id:
        raise ValueError("Business unit ID is required")
    if not created_by_user_id:
        raise ValueError("Created by user ID is required")
    if not asset_id:
        raise ValueError("Asset ID is required")
    if not artifact_id:
        raise ValueError("Artifact ID is required")
    if not test_name:
        raise ValueError("Test name is required")
    if not test_type:
        raise ValueError("Test type is required")

    # A None default replaces the former shared mutable `[]` default; callers
    # that omit `tools` still send an empty list.
    if tools is None:
        tools = []

    graphql_query = '''
    mutation CreateTestMutation($input: CreateTestInput!) {
        createTest(input: $input) {
            id
            name
            artifactUnderTest {
                id
                name
                assetVersion {
                    id
                    name
                    asset {
                        id
                        name
                        dependentProducts {
                            id
                            name
                        }
                    }
                }
            }
            createdBy {
                id
                email
            }
            ctx {
                asset
                products
                businessUnits
            }
            uploadMethod
        }
    }
    '''

    # Test name, creating user, artifact, test type, asset, and business unit are required
    variables = {
        "input": {
            "name": test_name,
            "createdBy": created_by_user_id,
            "artifactUnderTest": artifact_id,
            "testResultFileFormat": test_type,
            "ctx": {
                "asset": asset_id,
                "businessUnits": [business_unit_id]
            },
            "tools": tools,
            # the API receives the enum's string value, not the enum member
            "uploadMethod": upload_method.value
        }
    }

    # Product association is optional and only sent when provided
    if product_id is not None:
        variables["input"]["ctx"]["products"] = product_id

    response = send_graphql_query(token, organization_context, graphql_query, variables)
    return response['data']

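Create a new Test object for uploading files. This is an advanced method - you are probably looking for create_new_asset_version_and_upload_test_results or create_new_asset_version_and_upload_binary. Please see the examples in the GitHub repository for more information:
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/upload_test_results.py
  • https://github.com/FiniteStateInc/finite-state-sdk-python/blob/main/examples/uploading_a_binary.py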

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): The type of test being created. Valid values are "cyclonedx" and "finite_state_binary_analysis".
  • tools (list, optional): List of Tool objects used to perform the test. Each Tool object is a dict that should have a "name" and "description" field. This is used to describe the actual scanner that was used to perform the test.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_binary_analysis( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test_as_binary_analysis(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a Test of type "finite_state_binary_analysis" for uploading files to Finite State Binary Analysis.

    This is a convenience wrapper: it forwards every argument to create_test, fixing the test type and
    attaching a single tool entry that describes Finite State Binary Analysis.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID the Test belongs to.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID the Test belongs to.
        artifact_id (str, required):
            Artifact ID the Test belongs to.
        test_name (str, required):
            Name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. When omitted, the Test is not associated with a product.
        upload_method (UploadMethod, optional):
            How the test results are being uploaded. Defaults to UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    # The only tool reported for this kind of Test is the Finite State analyzer itself.
    binary_analysis_tool = {
        "description": "SBOM and Vulnerability Analysis from Finite State Binary SCA and Binary SAST.",
        "name": "Finite State Binary Analysis",
    }
    test_fields = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": "finite_state_binary_analysis",
        "tools": [binary_analysis_tool],
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **test_fields)

Create a new Test object for uploading files for Finite State Binary Analysis.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_cyclone_dx( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test_as_cyclone_dx(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a Test of type "cyclonedx" for uploading CycloneDX files.

    This is a convenience wrapper: it forwards every argument to create_test with the test type fixed
    to "cyclonedx". No tools list is supplied.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID the Test belongs to.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID the Test belongs to.
        artifact_id (str, required):
            Artifact ID the Test belongs to.
        test_name (str, required):
            Name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. When omitted, the Test is not associated with a product.
        upload_method (UploadMethod, optional):
            How the test results are being uploaded. Defaults to UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    test_fields = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": "cyclonedx",
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **test_fields)

Create a new Test object for uploading CycloneDX files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def create_test_as_third_party_scanner( token, organization_context, business_unit_id=None, created_by_user_id=None, asset_id=None, artifact_id=None, test_name=None, product_id=None, test_type=None, upload_method: UploadMethod = <UploadMethod.API: 'API'>):
def create_test_as_third_party_scanner(
    token,
    organization_context,
    business_unit_id=None,
    created_by_user_id=None,
    asset_id=None,
    artifact_id=None,
    test_name=None,
    product_id=None,
    test_type=None,
    upload_method: UploadMethod = UploadMethod.API,
):
    """
    Create a Test for uploading the output of a third party scanner.

    This is a convenience wrapper: it forwards every argument, including the caller-chosen test_type,
    to create_test. No tools list is supplied.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        business_unit_id (str, required):
            Business Unit ID the Test belongs to.
        created_by_user_id (str, required):
            User ID of the user creating the Test.
        asset_id (str, required):
            Asset ID the Test belongs to.
        artifact_id (str, required):
            Artifact ID the Test belongs to.
        test_name (str, required):
            Name of the Test being created.
        product_id (str, optional):
            Product ID to associate the Test with. When omitted, the Test is not associated with a product.
        test_type (str, required):
            Test type of the scanner, which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
        upload_method (UploadMethod, optional):
            How the test results are being uploaded. Defaults to UploadMethod.API.

    Raises:
        ValueError: Raised by create_test if business_unit_id, created_by_user_id, asset_id, artifact_id, test_name, or test_type are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: createTest Object
    """
    test_fields = {
        "business_unit_id": business_unit_id,
        "created_by_user_id": created_by_user_id,
        "asset_id": asset_id,
        "artifact_id": artifact_id,
        "test_name": test_name,
        "product_id": product_id,
        "test_type": test_type,
        "upload_method": upload_method,
    }
    return create_test(token, organization_context, **test_fields)

Create a new Test object for uploading Third Party Scanner files.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • business_unit_id (str, required): Business Unit ID to associate the Test with.
  • created_by_user_id (str, required): User ID of the user creating the Test.
  • asset_id (str, required): Asset ID to associate the Test with.
  • artifact_id (str, required): Artifact ID to associate the Test with.
  • test_name (str, required): The name of the Test being created.
  • product_id (str, optional): Product ID to associate the Test with. If not specified, the Test will not be associated with a product.
  • test_type (str, required): Test type of the scanner which indicates the output file format from the scanner. Valid values are "cyclonedx" and others. For the full list see the API documentation.
  • upload_method (UploadMethod, optional): The method of uploading the test results. Default is UploadMethod.API.
Raises:
  • ValueError: Raised if business_unit_id, created_by_user_id, asset_id, artifact_id, or test_name are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: createTest Object

def download_asset_version_report( token, organization_context, asset_version_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
 961def download_asset_version_report(token, organization_context, asset_version_id=None, report_type=None,
 962                                  report_subtype=None, output_filename=None, verbose=False):
 963    """
 964    Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.
 965
 966    Args:
 967        token (str):
 968            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
 969        organization_context (str):
 970            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
 971        asset_version_id (str, required):
 972            The Asset Version ID to download the report for.
 973        report_type (str, required):
 974            The file type of the report to download. Valid values are "CSV" and "PDF".
 975        report_subtype (str, required):
 976            The type of report to download. Based on available reports for the `report_type` specified
 977            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
 978            Valid values for PDF are "RISK_SUMMARY".
 979        output_filename (str, optional):
 980            The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
 981        verbose (bool, optional):
 982            If True, will print additional information to the console. Defaults to False.
 983
 984    Raises:
 985        ValueError: Raised if required parameters are not provided.
 986        Exception: Raised if the query fails.
 987
 988    Returns:
 989        None
 990    """
 991    url = generate_report_download_url(token, organization_context, asset_version_id=asset_version_id,
 992                                       report_type=report_type, report_subtype=report_subtype, verbose=verbose)
 993
 994    # Send an HTTP GET request to the URL
 995    response = requests.get(url)
 996
 997    # Check if the request was successful (status code 200)
 998    if response.status_code == 200:
 999        # Open a local file in binary write mode and write the content to it
1000        if verbose:
1001            print("File downloaded successfully.")
1002        with open(output_filename, 'wb') as file:
1003            file.write(response.content)
1004            if verbose:
1005                print(f'Wrote file to {output_filename}')
1006    else:
1007        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

Download a report for a specific asset version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, required): The Asset Version ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download, based on the reports available for the report_type specified. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def download_product_report( token, organization_context, product_id=None, report_type=None, report_subtype=None, output_filename=None, verbose=False):
def download_product_report(token, organization_context, product_id=None, report_type=None, report_subtype=None,
                            output_filename=None, verbose=False):
    """
    Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

    The download URL is obtained from generate_report_download_url and fetched with a single HTTP GET;
    the whole response body is held in memory before being written to disk.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, required):
            The Product ID to download the report for.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV".
        report_subtype (str, required):
            The type of report to download, based on the reports available for the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS".
        output_filename (str, optional):
            The local filename to save the report to. If not provided, the report is saved in the current
            directory as "report.<report_type in lower case>", e.g. "report.csv".
        verbose (bool, optional):
            If True, will print additional information to the console. Defaults to False.

    Raises:
        ValueError: Presumably raised by generate_report_download_url when required parameters are missing; that validation is not performed in this function.
        Exception: Raised if the download does not return HTTP status 200.

    Returns:
        None
    """
    url = generate_report_download_url(token, organization_context, product_id=product_id, report_type=report_type,
                                       report_subtype=report_subtype, verbose=verbose)

    # Apply the documented default filename. Previously a missing output_filename reached
    # open(None, 'wb') and failed with a TypeError. This is done after URL generation so any
    # argument validation performed there still runs first.
    if output_filename is None:
        output_filename = f"report.{str(report_type).lower()}"

    # Send an HTTP GET request to the URL
    response = requests.get(url)

    # Anything other than 200 is treated as a failed download
    if response.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {response.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Binary mode so the bytes are written exactly as received
    with open(output_filename, 'wb') as file:
        file.write(response.content)
    if verbose:
        print(f'Wrote file to {output_filename}')

Download a report for a specific product and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the report is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, required): The Product ID to download the report for.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV".
  • report_subtype (str, required): The type of report to download, based on the reports available for the report_type specified. Valid values for CSV are "ALL_FINDINGS".
  • output_filename (str, optional): The local filename to save the report to. If not provided, the report will be saved to a file named "report.csv" or "report.pdf" in the current directory based on the report type.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
def download_sbom( token, organization_context, sbom_type='CYCLONEDX', sbom_subtype='SBOM_ONLY', asset_version_id=None, output_filename='sbom.json', verbose=False):
def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None,
                  output_filename="sbom.json", verbose=False):
    """
    Fetch an SBOM for an Asset Version and write it to a local file.

    The call blocks until the download finishes, which can take minutes for a very large SBOM.
    The download URL comes from generate_sbom_download_url; the whole response body is held in
    memory before being written to disk.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, optional):
            The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
        sbom_subtype (str, optional):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX the only valid value is "SBOM_ONLY". Defaults to "SBOM_ONLY".
        asset_version_id (str, required):
            The Asset Version ID to download the SBOM for.
        output_filename (str, optional):
            The local filename to save the SBOM to. Defaults to "sbom.json" in the current directory.
        verbose (bool, optional):
            If True, prints progress information to the console. Defaults to False.

    Raises:
        ValueError: Presumably raised by generate_sbom_download_url when required parameters are missing; that validation is not performed in this function.
        Exception: Raised if the download does not return HTTP status 200.

    Returns:
        None
    """
    download_url = generate_sbom_download_url(
        token,
        organization_context,
        sbom_type=sbom_type,
        sbom_subtype=sbom_subtype,
        asset_version_id=asset_version_id,
        verbose=verbose,
    )

    reply = requests.get(download_url)

    # Bail out early on anything other than a plain 200
    if reply.status_code != 200:
        raise Exception(f"Failed to download the file. Status code: {reply.status_code}")

    if verbose:
        print("File downloaded successfully.")

    # Binary mode so the bytes are written exactly as received
    with open(output_filename, "wb") as sbom_file:
        sbom_file.write(reply.content)
        if verbose:
            print(f"Wrote file to {output_filename}")

Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY". For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
  • asset_version_id (str, required): The Asset Version ID to download the SBOM for.
  • output_filename (str, required): The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
  • verbose (bool, optional): If True, will print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

None

def file_chunks(file_path, chunk_size=5368709120):
def file_chunks(file_path, chunk_size=1024 * 1024 * 1024 * 5):
    """
    Lazily yield the contents of a file as consecutive binary chunks.

    Args:
        file_path (str):
            Local path of the file to read.
        chunk_size (int, optional):
            Maximum number of bytes requested per read. Defaults to 5GB (1024 * 1024 * 1024 * 5 bytes).

    Yields:
        bytes: The next chunk of the file. The final chunk may be shorter than chunk_size; nothing is yielded for an empty file.

    Raises:
        FileIO Exceptions: Raised if the file cannot be opened or read correctly.
    """
    with open(file_path, 'rb') as stream:
        # read() returns b'' once the file is exhausted; the two-argument form of iter()
        # keeps calling read() and stops as soon as that sentinel comes back.
        yield from iter(lambda: stream.read(chunk_size), b'')

Helper method to read a file in chunks.

Arguments:
  • file_path (str): Local path to the file to read.
  • chunk_size (int, optional): The size of the chunks to read. Defaults to 5GB.
Yields:

bytes: The next chunk of the file.

Raises:
  • FileIO Exceptions: Raised if the file cannot be opened or read correctly.
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
def get_all_artifacts(token, organization_context, artifact_id=None, business_unit_id=None):
    """
    Fetch every artifact in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str, optional):
            Artifact ID to restrict the query to a single artifact, by default None
        business_unit_id (str, optional):
            Business Unit ID to restrict the query to a single business unit, by default None

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Artifact Objects
    """
    artifacts_query = queries.ALL_ARTIFACTS
    graphql_query = artifacts_query['query']
    variables = artifacts_query['variables'](artifact_id, business_unit_id)
    # NOTE(review): 'allAssets' is the same value get_all_assets passes - confirm it matches
    # the field actually returned by the ALL_ARTIFACTS query.
    return get_all_paginated_results(token, organization_context, graphql_query, variables, 'allAssets')

Get all artifacts in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • artifact_id (str, optional): An optional Artifact ID if this is used to get a single artifact, by default None
  • business_unit_id (str, optional): An optional Business Unit ID if this is used to get artifacts for a single business unit, by default None
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Artifact Objects

def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
def get_all_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch every asset in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. With None, all Assets are returned; otherwise only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. With None, all Assets are returned; otherwise only the Assets in that Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS
    graphql_query = assets_query['query']
    variables = assets_query['variables'](asset_id, business_unit_id)
    return get_all_paginated_results(token, organization_context, graphql_query, variables, 'allAssets')

Gets all assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_all_asset_versions(token, organization_context):
def get_all_asset_versions(token, organization_context):
    """
    Fetch every asset version in the organization, following pagination until all results are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    asset_versions_query = queries.ALL_ASSET_VERSIONS
    graphql_query = asset_versions_query['query']
    # Unlike the filtered queries, 'variables' is passed through as stored rather than called
    variables = asset_versions_query['variables']
    return get_all_paginated_results(token, organization_context, graphql_query, variables, 'allAssetVersions')

Get all asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_all_asset_versions_for_product(token, organization_context, product_id):
def get_all_asset_versions_for_product(token, organization_context, product_id):
    """
    Fetch all asset versions for one product, following pagination until all results are collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass only the token itself; the "Bearer" prefix is added elsewhere.
        organization_context (str):
            Organization context provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            The Product ID to get asset versions for

    Returns:
        list: List of AssetVersion Objects
    """
    product_query = queries.ONE_PRODUCT_ALL_ASSET_VERSIONS
    graphql_query = product_query['query']
    variables = product_query['variables'](product_id)
    # NOTE(review): results are collected under 'allProducts', so the returned items may be
    # Product objects that contain asset versions rather than bare AssetVersion objects - confirm.
    return get_all_paginated_results(token, organization_context, graphql_query, variables, 'allProducts')

Get all asset versions for a product. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str): The Product ID to get asset versions for
Returns:

list: List of AssetVersion Objects

def get_all_business_units(token, organization_context):
def get_all_business_units(token, organization_context):
    """
    Fetch every business unit in the organization, following pagination until every page is collected.
    NOTE: business units are returned as Group objects.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Group Objects
    """
    business_units_query = queries.ALL_BUSINESS_UNITS
    query_text = business_units_query['query']
    query_variables = business_units_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allGroups')

Get all business units in the organization. NOTE: The return type here is Group. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Group Objects

def get_all_organizations(token, organization_context):
def get_all_organizations(token, organization_context):
    """
    Fetch every organization visible to the user, following pagination until every page is collected.
    Most users have access to exactly one organization.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Organization Objects
    """
    organizations_query = queries.ALL_ORGANIZATIONS
    query_text = organizations_query['query']
    query_variables = organizations_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allOrganizations')

Get all organizations available to the user. For most users there is only one organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Organization Objects

def get_all_paginated_results( token, organization_context, query, variables=None, field=None, limit=None):
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
    """
    Get all results from a paginated GraphQL query

    The first page is requested with the variables as given. Each following page is requested with the
    'after' variable set to the '_cursor' of the last entry of the previous page, until a page comes back
    empty (or its last cursor is falsy).

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None. The dict is never modified; pagination
            works on a private copy.
        field (str, required):
            The field in the response JSON that contains the results
        limit (int, optional):
            Stop requesting further pages once at least this many results have been collected. The check is
            made between pages, so the returned list is not truncated and may hold more than `limit` entries.
            If not provided, will return all results. By default None

    Raises:
        Exception: If field is not provided, or if the field is not in the response JSON. Errors raised by
            send_graphql_query propagate unchanged.

    Returns:
        list: List of results
    """

    if not field:
        raise Exception("Error: field is required")

    # Paginate on a private copy. Callers frequently pass shared module-level dicts (for example
    # queries.ALL_USERS['variables']); writing the 'after' cursor into those would leak a stale cursor
    # into every later query that reuses the same dict.
    page_variables = None if variables is None else dict(variables)

    # query the API for the first page of results
    response_data = send_graphql_query(token, organization_context, query, page_variables)

    # if there are no results, return an empty list
    if not response_data:
        return []

    if field not in response_data['data']:
        raise Exception(f"Error: {field} not in response JSON")

    page = response_data['data'][field]
    results = list(page)

    # the cursor of the last entry of a page is the starting point of the next page
    cursor = page[-1]['_cursor'] if page else None

    while cursor:
        if limit is not None and len(results) >= limit:
            break

        # variables=None is valid for the first page, but a dict is needed to carry the cursor
        if page_variables is None:
            page_variables = {}
        page_variables['after'] = cursor

        # add the next page of results to the list
        response_data = send_graphql_query(token, organization_context, query, page_variables)
        page = response_data['data'][field]
        results.extend(page)

        # an empty page means there is nothing left to fetch
        cursor = page[-1]['_cursor'] if page else None

    return results

Get all results from a paginated GraphQL query

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
  • field (str, required): The field in the response JSON that contains the results
  • limit (int, optional): The maximum number of results to return. If not provided, will return all results. By default None
Raises:
  • Exception: If the response status code is not 200, or if the field is not in the response JSON
Returns:

list: List of results

def get_all_products(token, organization_context):
def get_all_products(token, organization_context):
    """
    Get all products in the organization. Uses pagination to get all results.

    Emits a DeprecationWarning on every call before running the query.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Product Objects

    .. deprecated:: 0.1.4. Use get_products instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function rather than to this line
    warn('`get_all_products` is deprecated. Use `get_products` instead.', DeprecationWarning, stacklevel=2)
    return get_all_paginated_results(token, organization_context, queries.ALL_PRODUCTS['query'],
                                     queries.ALL_PRODUCTS['variables'], 'allProducts')

Get all products in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Product Objects

Deprecated since version 0.1.4. Use get_products instead.

def get_all_users(token, organization_context):
def get_all_users(token, organization_context):
    """
    Fetch every user in the organization, following pagination until every page is collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of User Objects
    """
    users_query = queries.ALL_USERS
    query_text = users_query['query']
    query_variables = users_query['variables']

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allUsers')

Get all users in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of User Objects

def get_artifact_context(token, organization_context, artifact_id):
def get_artifact_context(token, organization_context, artifact_id):
    """
    Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        artifact_id (str):
            ID of the artifact whose context is requested

    Raises:
        Exception: Raised if the query fails.
        IndexError: Raised if the query returns no artifact for the given ID.

    Returns:
        dict: Artifact Context Object (the 'ctx' entry of the first result)
    """
    artifacts = get_all_paginated_results(token, organization_context, queries.ALL_ARTIFACTS['query'],
                                          queries.ALL_ARTIFACTS['variables'](artifact_id, None), 'allAssets')

    # Same exception type as indexing an empty list, but with a message that names the missing artifact
    if not artifacts:
        raise IndexError(f"No artifact found with ID {artifact_id}")

    return artifacts[0]['ctx']

Get the context for a single artifact. This is typically used for querying for existing context, which is used for role based access control. This is not used for creating new artifacts.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • artifact_id (str): The ID of the artifact to get the context for.
Raises:
  • Exception: Raised if the query fails.
Returns:

dict: Artifact Context Object

def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
def get_assets(token, organization_context, asset_id=None, business_unit_id=None):
    """
    Fetch assets in the organization, following pagination until every page is collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_id (str, optional):
            Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of Asset Objects
    """
    assets_query = queries.ALL_ASSETS
    query_text = assets_query['query']
    # The 'variables' entry is a factory taking the two optional filters positionally
    query_variables = assets_query['variables'](asset_id, business_unit_id)

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssets')

Gets assets in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_id (str, optional): Asset ID to get, by default None. If None specified, will get all Assets. If specified, will get only the Asset with that ID.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Assets. If specified, will get only the Assets in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of Asset Objects

def get_asset_versions( token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
    """
    Fetch asset versions in the organization, following pagination until every page is collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
        asset_id (str, optional):
            Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
        business_unit_id (str, optional):
            Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.

    Raises:
        Exception: Raised if the query fails.

    Returns:
        list: List of AssetVersion Objects
    """
    versions_query = queries.ALL_ASSET_VERSIONS
    query_text = versions_query['query']
    query_variables = versions_query['variables'](
        asset_version_id=asset_version_id,
        asset_id=asset_id,
        business_unit_id=business_unit_id,
    )

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allAssetVersions')

Gets asset versions in the organization. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
  • asset_id (str, optional): Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
  • business_unit_id (str, optional): Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
  • Exception: Raised if the query fails.
Returns:

list: List of AssetVersion Objects

def get_auth_token( client_id, client_secret, token_url='https://platform.finitestate.io/api/v1/auth/token', audience='https://platform.finitestate.io/api/v1/graphql'):
def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
    """
    Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

    Args:
        client_id (str):
            CLIENT_ID as specified in the API documentation
        client_secret (str):
            CLIENT_SECRET as specified in the API documentation
        token_url (str, optional):
            Token URL the credentials are posted to, by default TOKEN_URL
        audience (str, optional):
            Audience sent in the token request, by default AUDIENCE

    Raises:
        Exception: If the response status code is not 200

    Returns:
        str: Auth token. Use this token as the Authorization header in subsequent API calls.
    """
    # Use the caller-supplied audience and token_url; the module constants are only the defaults
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": audience,
        "grant_type": "client_credentials"
    }

    headers = {
        'content-type': "application/json"
    }

    response = requests.post(token_url, data=json.dumps(payload), headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    return response.json()['access_token']

Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET

Arguments:
  • client_id (str): CLIENT_ID as specified in the API documentation
  • client_secret (str): CLIENT_SECRET as specified in the API documentation
  • token_url (str, optional): Token URL, by default TOKEN_URL
  • audience (str, optional): Audience, by default AUDIENCE
Raises:
  • Exception: If the response status code is not 200
Returns:

str: Auth token. Use this token as the Authorization header in subsequent API calls.

def get_findings( token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None, severity=None, count=False, limit=None):
def get_findings(token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None,
                 severity=None, count=False, limit=None):
    """
    Fetch Findings matching the given filters, or only their count when `count` is True.
    The findings themselves are fetched with pagination; the count is a single query.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to get findings for. If not provided, will get all findings in the organization.
        finding_id (str, optional):
            The ID of a specific finding to get. If specified, will return only the finding with that ID.
        category (str, optional):
            The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
            This can be a single string, or an array of values.
        status (str, optional):
            The status of Findings to return.
        severity (str, optional):
            The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
        count (bool, optional):
            If True, will return the count of findings instead of the findings themselves. Defaults to False.
        limit (int, optional):
            The maximum number of findings to return. It is forwarded to the query variables and, when fetching
            findings, to the pagination helper as well. If not specified, will return all findings, up to the default of 1000.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Finding Objects when `count` is False. When `count` is True, the value of the
            "_allFindingsMeta" field of the response is returned instead.
    """
    # Both query variants take the same keyword filters
    filters = {
        'asset_version_id': asset_version_id,
        'finding_id': finding_id,
        'category': category,
        'status': status,
        'severity': severity,
        'limit': limit,
    }

    if not count:
        findings_query = queries.GET_FINDINGS
        return get_all_paginated_results(token, organization_context, findings_query['query'],
                                         findings_query['variables'](**filters), 'allFindings', limit=limit)

    count_query = queries.GET_FINDINGS_COUNT
    response = send_graphql_query(token, organization_context, count_query['query'],
                                  count_query['variables'](**filters))
    return response["data"]["_allFindingsMeta"]

Gets all the Findings for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to get findings for. If not provided, will get all findings in the organization.
  • finding_id (str, optional): The ID of a specific finding to get. If specified, will return only the finding with that ID.
  • category (str, optional): The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category. This can be a single string, or an array of values.
  • status (str, optional): The status of Findings to return.
  • severity (str, optional): The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
  • count (bool, optional): If True, will return the count of findings instead of the findings themselves. Defaults to False.
  • limit (int, optional): The maximum number of findings to return. If not specified, will return all findings, up to the default of 1000.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

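list: List of Finding Objects. If count is True, the value of the "_allFindingsMeta" field of the response is returned instead of the list.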

def get_product_asset_versions(token, organization_context, product_id=None):
def get_product_asset_versions(token, organization_context, product_id=None):
    """
    Fetch all the asset versions for a product, following pagination until every page is collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str):
            Product ID to get asset versions for. Although the parameter defaults to None, a value is
            required: an Exception is raised when it is missing or empty.

    Raises:
        Exception: Raised if `product_id` is not provided, or if the query fails.

    Returns:
        list: List of AssetVersion Objects (collected from the 'allProducts' field of the response)
    """
    if product_id:
        versions_query = queries.GET_PRODUCT_ASSET_VERSIONS
        query_text = versions_query['query']
        query_variables = versions_query['variables'](product_id)
        return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

    raise Exception("Product ID is required")

Gets all the asset versions for a product.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str): Product ID to get asset versions for. Required, even though the parameter defaults to None: an Exception ("Product ID is required") is raised if it is not provided.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of AssetVersion Objects

def get_products( token, organization_context, product_id=None, business_unit_id=None) -> list:
def get_products(token, organization_context, product_id=None, business_unit_id=None) -> list:
    """
    Fetch products, optionally narrowed to one product or one business unit, following pagination until
    every page is collected.

    Args:
        token (str):
            Auth token as returned by get_auth_token(). Pass the bare token only, without a "Bearer" prefix.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        product_id (str, optional):
            Product ID to get. If not provided, will get all products in the organization.
        business_unit_id (str, optional):
            Business Unit ID to get products for. If not provided, will get all products in the organization.

    Raises:
        Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.

    Returns:
        list: List of Product Objects
    """
    products_query = queries.GET_PRODUCTS
    query_text = products_query['query']
    query_variables = products_query['variables'](product_id=product_id, business_unit_id=business_unit_id)

    return get_all_paginated_results(token, organization_context, query_text, query_variables, 'allProducts')

Gets all the products for the specified business unit.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • product_id (str, optional): Product ID to get. If not provided, will get all products in the organization.
  • business_unit_id (str, optional): Business Unit ID to get products for. If not provided, will get all products in the organization.
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Product Objects

def generate_report_download_url( token, organization_context, asset_version_id=None, product_id=None, report_type=None, report_subtype=None, verbose=False) -> str:
def generate_report_download_url(token, organization_context, asset_version_id=None, product_id=None, report_type=None,
                                 report_subtype=None, verbose=False) -> str:
    """
    Blocking call: launches a report export, then polls until a pre-signed URL for downloading the report is available.
    This may take several minutes to complete, depending on the size of the report.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, optional):
            Asset Version ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be provided.
        product_id (str, optional):
            Product ID to download the report for. Exactly one of `asset_version_id` or `product_id` must be provided.
        report_type (str, required):
            The file type of the report to download. Valid values are "CSV" and "PDF".
        report_subtype (str, required):
            The type of report to download; the valid values depend on the `report_type` specified.
            Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE".
            Valid values for PDF are "RISK_SUMMARY".
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if report_type or report_subtype is missing, if neither asset_version_id nor product_id is
            provided, or if both are provided.
        Exception: Raised if report_type or report_subtype is not supported, if the launch response carries no
            export job ID, or if a query fails.

    Returns:
        str: Pre-signed URL to download the report from.
    """
    # Required-argument checks come first and use ValueError.
    if not report_type:
        raise ValueError("Report Type is required")
    if not report_subtype:
        raise ValueError("Report Subtype is required")
    if not (asset_version_id or product_id):
        raise ValueError("Asset Version ID or Product ID is required")
    if asset_version_id and product_id:
        raise ValueError("Asset Version ID and Product ID are mutually exclusive")

    # Each report type has its own subtypes and its own response field names.
    if report_type == "CSV":
        supported_subtypes = ("ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE")
        artifact_field = 'launchArtifactCSVExport'
        product_field = 'launchProductCSVExport'
    elif report_type == "PDF":
        supported_subtypes = ("RISK_SUMMARY",)
        artifact_field = 'launchArtifactPdfExport'
        product_field = 'launchProductPdfExport'
    else:
        raise Exception(f"Report Type {report_type} not supported")

    if report_subtype not in supported_subtypes:
        raise Exception(f"Report Subtype {report_subtype} not supported")

    # Launch the export job; the mutation and its variables are built from the same arguments.
    launch = queries.LAUNCH_REPORT_EXPORT
    launch_args = {
        'asset_version_id': asset_version_id,
        'product_id': product_id,
        'report_type': report_type,
        'report_subtype': report_subtype,
    }
    mutation = launch['mutation'](**launch_args)
    launch_variables = launch['variables'](**launch_args)

    launch_response = send_graphql_query(token, organization_context, mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    # get exportJobId from the result
    if asset_version_id:
        result_field = artifact_field
    elif product_id:
        result_field = product_field
    else:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")
    export_job_id = launch_response['data'][result_field]['exportJobId']

    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll the API until the export job is complete.
    # NOTE: there is no timeout and no handling of non-COMPLETED terminal states; the loop only
    # exits once the status is COMPLETED and a download link is present (or a query raises).
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')

    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        presign = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL
        poll_query = presign['query']
        poll_variables = presign['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        export_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != 'COMPLETED':
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(f'Export Job Complete. Download URL: {download_link}')
            return download_link

Blocking call: Initiates generation of a report, and returns a pre-signed URL for downloading the report. This may take several minutes to complete, depending on the size of the report.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, optional): Asset Version ID to download the report for. Either asset_version_id or product_id are required.
  • product_id (str, optional): Product ID to download the report for. Either asset_version_id or product_id are required.
  • report_type (str, required): The file type of the report to download. Valid values are "CSV" and "PDF".
  • report_subtype (str, required): The type of report to download; the valid values depend on the report_type specified. Valid values for CSV are "ALL_FINDINGS", "ALL_COMPONENTS", "EXPLOIT_INTELLIGENCE". Valid values for PDF are "RISK_SUMMARY".
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
def generate_sbom_download_url( token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None, verbose=False) -> str:
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None,
                               verbose=False) -> str:
    """
    Blocking call: launches an SBOM export for the asset_version_id, then polls until a pre-signed URL for downloading the SBOM is available.
    This may take several minutes to complete, depending on the size of the SBOM.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        sbom_type (str, required):
            The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
        sbom_subtype (str, required):
            The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
        asset_version_id (str, required):
            Asset Version ID to download the SBOM for.
        verbose (bool, optional):
            If True, print additional information to the console. Defaults to False.

    Raises:
        ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
        Exception: Raised if sbom_type or sbom_subtype is not supported, if the launch response carries no
            export job ID, or if a query fails.

    Returns:
        str: URL to download the SBOM from.
    """
    # Required-argument checks, reported as ValueError.
    for supplied, label in ((sbom_type, "SBOM Type"), (sbom_subtype, "SBOM Subtype"),
                            (asset_version_id, "Asset Version ID")):
        if not supplied:
            raise ValueError(f"{label} is required")

    # Each SBOM format has its own subtypes and its own response field name.
    if sbom_type == "CYCLONEDX":
        supported_subtypes = ("SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY")
        result_field = 'launchCycloneDxExport'
    elif sbom_type == "SPDX":
        supported_subtypes = ("SBOM_ONLY",)
        result_field = 'launchSpdxExport'
    else:
        raise Exception(f"SBOM Type {sbom_type} not supported")

    if sbom_subtype not in supported_subtypes:
        raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

    # Pick the launch mutation only after validation has passed.
    if sbom_type == "CYCLONEDX":
        launch = queries.LAUNCH_CYCLONEDX_EXPORT
    else:
        launch = queries.LAUNCH_SPDX_EXPORT
    mutation = launch['mutation']
    launch_variables = launch['variables'](sbom_subtype, asset_version_id)

    launch_response = send_graphql_query(token, organization_context, mutation, launch_variables)
    if verbose:
        print(f'Response Data: {json.dumps(launch_response, indent=4)}')

    # get exportJobId from the result
    export_job_id = launch_response['data'][result_field]['exportJobId']
    if verbose:
        print(f'Export Job ID: {export_job_id}')

    if not export_job_id:
        raise Exception(
            "Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

    # Poll the API until the export job is complete.
    # NOTE: there is no timeout and no handling of non-COMPLETED terminal states; the loop only
    # exits once the status is COMPLETED and a download link is present (or a query raises).
    poll_interval = 10
    elapsed = 0
    if verbose:
        print(f'Polling every {poll_interval} seconds for export job to complete')
    while True:
        time.sleep(poll_interval)
        elapsed += poll_interval
        if verbose:
            print(f'Total time elapsed: {elapsed} seconds')

        presign = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL
        poll_query = presign['query']
        poll_variables = presign['variables'](export_job_id)

        poll_response = send_graphql_query(token, organization_context, poll_query, poll_variables)

        if verbose:
            print(f'Response Data: {json.dumps(poll_response, indent=4)}')

        export_state = poll_response['data']['generateExportDownloadPresignedUrl']
        if export_state['status'] != "COMPLETED":
            continue

        download_link = export_state['downloadLink']
        if download_link:
            if verbose:
                print(f'Export Job Complete. Download URL: {download_link}')
            return download_link

Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM. This may take several minutes to complete, depending on the size of SBOM.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • sbom_type (str, required): The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
  • sbom_subtype (str, required): The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
  • asset_version_id (str, required): Asset Version ID to download the SBOM for.
  • verbose (bool, optional): If True, print additional information to the console. Defaults to False.
Raises:
  • ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
  • Exception: Raised if the query fails.
Returns:

str: URL to download the SBOM from.

def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
    """
    Gets all the Software Components for an Asset Version. Uses pagination to get all results.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        asset_version_id (str, required):
            Asset Version ID to get software components for.
        type (str, optional):
            The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type

    Raises:
        ValueError: Raised if asset_version_id is not provided.
        Exception: Raised if the query fails.

    Returns:
        list: List of Software Component Objects
    """
    if not asset_version_id:
        # ValueError is a subclass of Exception, so callers catching Exception keep working;
        # it matches how the other functions in this module report a missing required argument.
        raise ValueError("Asset Version ID is required")

    component_query = queries.GET_SOFTWARE_COMPONENTS
    # `type` shadows the builtin but is part of the public signature, so the name is kept.
    variables = component_query['variables'](asset_version_id=asset_version_id, type=type)

    return get_all_paginated_results(token, organization_context, component_query['query'], variables,
                                     'allSoftwareComponentInstances')

Gets all the Software Components for an Asset Version. Uses pagination to get all results.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • asset_version_id (str, required): Asset Version ID to get software components for.
  • type (str, optional): The type of software component to return. Valid values are "APPLICATION", "ARCHIVE", "CONTAINER", "DEVICE", "FILE", "FIRMWARE", "FRAMEWORK", "INSTALL", "LIBRARY", "OPERATING_SYSTEM", "OTHER", "SERVICE", "SOURCE". If not specified, will return all software components. See https://docs.finitestate.io/types/software-component-type
Raises:
  • Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:

list: List of Software Component Objects

def search_sbom( token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', case_sensitive=False) -> list:
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT',
                case_sensitive=False) -> list:
    """
    Searches the SBOM of a specific asset version or the entire organization for matching software components.

    Search Methods: EXACT or CONTAINS.
    An exact match will return only the software component whose name matches the name exactly.
    A contains match will return all software components whose name contains the search string.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        name (str, required):
            Name of the software component to search for.
        version (str, optional):
            Version of the software component to search for. If not specified, will search for all versions of the software component.
        asset_version_id (str, optional):
            Asset Version ID to search for software components in. If not specified, will search the entire organization.
        search_method (str, optional):
            Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
        case_sensitive (bool, optional):
            Whether or not to perform a case sensitive search. Only consulted for the "EXACT" search method. Defaults to False.

    Raises:
        ValueError: Raised if name is not provided, or if search_method is not "EXACT" or "CONTAINS".
        Exception: Raised if the query fails.

    Returns:
        list: List of SoftwareComponentInstance Objects
    """
    # Validate before any request is made. Without these checks a missing name or an
    # unrecognised search method would leave the name filter unset and return every component.
    if not name:
        raise ValueError("Name is required")
    if search_method not in ('EXACT', 'CONTAINS'):
        raise ValueError(f"Search Method {search_method} not supported")

    if asset_version_id:
        # Scoped to one asset version: also return the original components of each match.
        query = '''
query GetSoftwareComponentInstances(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        originalComponents {
            id
            name
            version
        }
    }
}
'''
    else:
        # gets the asset version info that contains the software component
        query = '''
query GetSoftwareComponentInstances(
    $filter: SoftwareComponentInstanceFilter
    $after: String
    $first: Int
) {
    allSoftwareComponentInstances(
        filter: $filter
        after: $after
        first: $first
    ) {
        _cursor
        id
        name
        version
        assetVersion {
            id
            name
            asset {
                id
                name
            }
        }
    }
}
'''

    search_filter = {"mergedComponentRefId": None}

    if asset_version_id:
        search_filter["assetVersionRefId"] = asset_version_id

    if search_method == 'EXACT':
        # A case-insensitive exact search goes through the `name_like` filter key.
        name_key = "name" if case_sensitive else "name_like"
        version_key = "version"
    else:  # 'CONTAINS' (validated above)
        name_key = "name_contains"
        version_key = "version_contains"

    search_filter[name_key] = name
    if version:
        search_filter[version_key] = version

    variables = {
        "filter": search_filter,
        "after": None,
        "first": 100
    }

    return get_all_paginated_results(token, organization_context, query, variables=variables,
                                     field="allSoftwareComponentInstances")

Searches the SBOM of a specific asset version or the entire organization for matching software components. Search methods: EXACT or CONTAINS. An exact match will return only the software component whose name matches the name exactly. A contains match will return all software components whose name contains the search string.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • name (str, required): Name of the software component to search for.
  • version (str, optional): Version of the software component to search for. If not specified, will search for all versions of the software component.
  • asset_version_id (str, optional): Asset Version ID to search for software components in. If not specified, will search the entire organization.
  • search_method (str, optional): Search method to use. Valid values are "EXACT" and "CONTAINS". Defaults to "EXACT".
  • case_sensitive (bool, optional): Whether or not to perform a case sensitive search. Defaults to False.
Raises:
  • ValueError: Raised if name is not provided.
  • Exception: Raised if the query fails.
Returns:

list: List of SoftwareComponentInstance Objects

def send_graphql_query(token, organization_context, query, variables=None):
def send_graphql_query(token, organization_context, query, variables=None):
    """
    Send a GraphQL query to the API

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        query (str):
            The GraphQL query string
        variables (dict, optional):
            Variables to be used in the GraphQL query, by default None

    Raises:
        Exception: If the response status code is not 200, or if the response JSON contains an "errors" entry

    Returns:
        dict: Response JSON
    """
    payload = {
        'query': query,
        'variables': variables
    }
    request_headers = {
        'Content-Type': 'application/json',
        # The "Bearer" prefix is added here, so callers pass the bare token.
        'Authorization': f'Bearer {token}',
        'Organization-Context': organization_context
    }

    response = requests.post(API_URL, headers=request_headers, json=payload)

    # Anything other than HTTP 200 is reported with the status code and the raw body.
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    body = response.json()

    # An HTTP 200 response can still carry an "errors" entry; surface it as a failure.
    if "errors" in body:
        raise Exception(f"Error: {body['errors']}")

    return body

Send a GraphQL query to the API

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • query (str): The GraphQL query string
  • variables (dict, optional): Variables to be used in the GraphQL query, by default None
Raises:
  • Exception: If the response status code is not 200
Returns:

dict: Response JSON

def update_finding_statuses( token, organization_context, user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None):
def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None,
                            justification=None, response=None, comment=None):
    """
    Updates the status of one or more findings. This is a blocking call.

    Args:
        token (str):
            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
        organization_context (str):
            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
        user_id (str, required):
            User ID to update the finding status for.
        finding_ids (str, required):
            Finding ID to update the status for. Passed through unchanged to the mutation variables.
        status (str, required):
            Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
        justification (str, optional):
            Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
        response (str, optional):
            Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
        comment (str, optional):
            Optional comment to add to the finding status update.

    Raises:
        ValueError: Raised if user_id, finding_ids, or status are not provided.
        Exception: Raised if the query fails.

    Returns:
        dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
    """
    # Required arguments are checked in this order; the first missing one is reported.
    required_arguments = (
        (user_id, "User ID"),
        (finding_ids, "Finding ID"),
        (status, "Status"),
    )
    for supplied, label in required_arguments:
        if not supplied:
            raise ValueError(f"{label} is required")

    update = queries.UPDATE_FINDING_STATUSES
    mutation = update['mutation']
    mutation_variables = update['variables'](
        user_id=user_id,
        finding_ids=finding_ids,
        status=status,
        justification=justification,
        response=response,
        comment=comment,
    )

    return send_graphql_query(token, organization_context, mutation, mutation_variables)

Updates the status of one or more findings. This is a blocking call.

Arguments:
  • token (str): Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
  • organization_context (str): Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
  • user_id (str, required): User ID to update the finding status for.
  • finding_ids (str, required): Finding ID to update the status for.
  • status (str, required): Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
  • justification (str, optional): Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
  • response (str, optional): Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
  • comment (str, optional): Optional comment to add to the finding status update.
Raises:
  • ValueError: Raised if required parameters are not provided.
  • Exception: Raised if the query fails.
Returns:

dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response

def upload_file_for_binary_analysis( token, organization_context, test_id=None, file_path=None, chunk_size=5368709120, quick_scan=False):
2015def upload_file_for_binary_analysis(token, organization_context, test_id=None, file_path=None,
2016                                    chunk_size=1024 * 1024 * 1024 * 5, quick_scan=False):
2017    """
2018    Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. Chunk size defaults to 5GB.
2019    NOTE: This is NOT for uploading third party scanner results. Use upload_test_results_file for that.
2020
2021    Args:
2022        token (str):
2023            Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
2024        organization_context (str):
2025            Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
2026        test_id (str, required):
2027            Test ID to upload the file for.
2028        file_path (str, required):
2029            Local path to the file to upload.
2030        chunk_size (int, optional):
2031            The size of the chunks to read. Defaults to 5GB.
2032        quick_scan (bool, optional):
2033            If True, will perform a quick scan of the Binary. Defaults to False (Full Scan). For details, please see the API documentation.
2034
2035    Raises:
2036        ValueError: Raised if test_id or file_path are not provided.
2037        Exception: Raised if the query fails.
2038
2039    Returns:
2040        dict: The response from the GraphQL query, a completeMultipartUpload Object.
2041    """
2042    # To upload a file for Binary Analysis, you must use the generateMultiplePartUploadUrl mutation
2043
2044    if not test_id:
2045        raise ValueError("Test ID is required")
2046    if not file_path:
2047        raise ValueError("File path is required")
2048
2049    # Start Multi-part Upload
2050    graphql_query = '''
2051    mutation Start($testId: ID!) {
2052        startMultipartUploadV2(testId: $testId) {
2053            uploadId
2054            key
2055        }
2056    }
2057    '''
2058
2059    variables = {
2060        "testId": test_id