Source code for deephaven.experimental.s3

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
from typing import Optional, Union
from warnings import warn

import jpy

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.time import DurationLike, to_j_duration

# If we move S3 to a permanent module, we should remove this try/except block and just import the types directly.
try:
    _JCredentials = jpy.get_type("io.deephaven.extensions.s3.Credentials")
    _JS3Instructions = jpy.get_type("io.deephaven.extensions.s3.S3Instructions")
except Exception:
    _JCredentials = None
    _JS3Instructions = None

"""
    This module is useful for reading from and writing to S3-compatible APIs.
    Importing this module requires the S3 specific deephaven extensions (artifact name deephaven-extensions-s3) to be
    included in the package. This is an opt-out functionality included by default. If not included, importing this
    module will fail to find the java types.
"""

[docs]class Credentials(JObjectWrapper): """ Credentials object for authenticating with an S3 server. """ j_object_type = _JCredentials def __init__(self, _j_object: jpy.JType): """ Initializes the credentials object. Args: _j_object (Credentials): the Java credentials object. """ self._j_object = _j_object @property def j_object(self) -> jpy.JType: return self._j_object
[docs] @classmethod def resolving(cls) -> 'Credentials': """ Default credentials provider used by Deephaven which resolves credentials in the following order: 1. If a profile name, config file path, or credentials file path is provided via S3 Instructions, use the profile_name for loading the credentials from the config and credentials file and fail if none is found. 2. Otherwise, use the default AWS SDK behavior that looks for credentials in this order: Java System Properties (`aws.accessKeyId` and `aws.secretAccessKey`), Environment Variables (`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`), Credential profiles file at the default location (~/.aws/credentials), or Instance profile credentials delivered through the Amazon EC2 metadata service. If still none found, fall back to anonymous credentials, which can only be used to read data with S3 policy set to allow anonymous access. Returns: Credentials: the credentials object. """ return cls(_JCredentials.resolving())
[docs] @classmethod def default(cls) -> 'Credentials': """ Default credentials provider used by the AWS SDK that looks for credentials in this order: Java System Properties (`aws.accessKeyId` and `aws.secretAccessKey`), Environment Variables (`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`), Credential profiles file at the default location (~/.aws/credentials), and Instance profile credentials delivered through the Amazon EC2 metadata service. Returns: Credentials: the credentials object. """ return cls(_JCredentials.defaultCredentials())
[docs] @classmethod def basic(cls, access_key_id: str, secret_access_key: str) -> 'Credentials': """ Basic credentials provider with the specified access key id and secret access key. Args: access_key_id (str): the access key id, used to identify the user. secret_access_key (str): the secret access key, used to authenticate the user. Returns: Credentials: the credentials object. """ return cls(_JCredentials.basic(access_key_id, secret_access_key))
[docs] @classmethod def anonymous(cls) -> 'Credentials': """ Anonymous credentials provider, which can only be used to read data with S3 policy set to allow anonymous access. Returns: Credentials: the credentials object. """ return cls(_JCredentials.anonymous())
[docs] @classmethod def profile(cls) -> 'Credentials': """ Use the profile name, config file path, or credentials file path from S3 Instructions for loading the credentials and fail if none found. Returns: Credentials: the credentials object. """ return cls(_JCredentials.profile())
[docs]class S3Instructions(JObjectWrapper): """ S3Instructions provides specialized instructions for reading from and writing to S3-compatible APIs. """ j_object_type = _JS3Instructions or type(None) def __init__(self, region_name: Optional[str] = None, credentials: Optional[Credentials] = None, max_concurrent_requests: Optional[int] = None, read_ahead_count: Optional[int] = None, fragment_size: Optional[int] = None, connection_timeout: Optional[DurationLike] = None, read_timeout: Optional[DurationLike] = None, access_key_id: Optional[str] = None, secret_access_key: Optional[str] = None, anonymous_access: bool = False, endpoint_override: Optional[str] = None, write_part_size: Optional[int] = None, num_concurrent_write_parts: Optional[int] = None, profile_name: Optional[str] = None, config_file_path: Optional[str] = None, credentials_file_path: Optional[str] = None): """ Initializes the instructions. Args: region_name (str): the region name for reading parquet files. If not provided, the default region will be picked by the AWS SDK from 'aws.region' system property, "AWS_REGION" environment variable, the {user.home}/.aws/credentials or {user.home}/.aws/config files, or from EC2 metadata service, if running in EC2. If no region name is derived from the above chain or the derived region name is incorrect for the bucket accessed, the correct region name will be derived internally, at the cost of one additional request. credentials (Credentials): the credentials object for authenticating to the S3 server, defaults to Credentials.resolving(). max_concurrent_requests (int): the maximum number of concurrent requests for reading files, default is 256. read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the current fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the current fragment. fragment_size (int): the maximum size of each fragment to read in bytes, defaults to 65536. If there are fewer bytes remaining in the file, the fetched fragment can be smaller. connection_timeout (DurationLike): the amount of time to wait when initially establishing a connection before giving up and timing out. Can be expressed as an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or other time duration types. Default to 2 seconds. read_timeout (DurationLike): the amount of time to wait when reading a fragment before giving up and timing out. Can be expressed as an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or other time duration types. Default to 2 seconds. access_key_id (str): (Deprecated) the access key for reading files. Both access key and secret access key must be provided to use static credentials. If you specify both access key and secret key, then you cannot provide other credentials like setting anonymous_access or credentials argument. This option is deprecated and should be replaced by setting credentials as Credentials.basic(access_key_id, secret_access_key). secret_access_key (str): (Deprecated) the secret access key for reading files. Both access key and secret key must be provided to use static credentials. If you specify both access key and secret key, then you cannot provide other credentials like setting anonymous_access or credentials argument. This option is deprecated and should be replaced by setting credentials as Credentials.basic(access_key_id, secret_access_key). anonymous_access (bool): (Deprecated) use anonymous credentials, this is useful when the S3 policy has been set to allow anonymous access. By default, is False. If you set this to True, you cannot provide other credentials like setting access_key_id or credentials argument. This option is deprecated and should be replaced by setting credentials as Credentials.anonymous(). endpoint_override (str): the endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs. write_part_size (int): The part or chunk size when writing to S3. The default is 10 MiB. The minimum allowed part size is 5242880. Higher part size may increase throughput but also increase memory usage. Writing a single file to S3 can be done in a maximum of 10,000 parts, so the maximum size of a single file that can be written is about 98 GiB for the default part size. num_concurrent_write_parts (int): the maximum number of parts or chunks that can be uploaded concurrently when writing to S3 without blocking, defaults to 64. Setting a higher value may increase throughput, but may also increase memory usage. profile_name (str): the profile name used for configuring the default region, credentials, etc., when reading or writing to S3. If not provided, the AWS SDK picks the profile name from the 'aws.profile' system property, the "AWS_PROFILE" environment variable, or defaults to the string "default". Setting a profile name assumes that the credentials are provided via this profile; if that is not the case, you must explicitly set credentials using the access_key_id and secret_access_key. config_file_path (str): the path to the configuration file to use for configuring the default region, role_arn, output etc. when reading or writing to S3. If not provided, the AWS SDK picks the configuration file from the 'aws.configFile' system property, the "AWS_CONFIG_FILE" environment variable, or defaults to "{user.home}/.aws/config". Setting a configuration file path assumes that the credentials are provided via the config and credentials files; if that is not the case, you must explicitly set credentials using the access_key_id and secret_access_key. For reference on the configuration file format, check https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html credentials_file_path (str): the path to the credentials file to use for configuring the credentials, region, etc. when reading or writing to S3. If not provided, the AWS SDK picks the credentials file from the 'aws.credentialsFile' system property, the "AWS_CREDENTIALS_FILE" environment variable, or defaults to "{user.home}/.aws/credentials". Setting a credentials file path assumes that the credentials are provided via the config and credentials files; if that is not the case, you must explicitly set credentials using the access_key_id and secret_access_key. The main difference between config_file_path and credentials_file_path is around the conventions used in the files. For reference on the credentials file format, check https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html Raises: DHError: If unable to build the instructions object. """ if not _JS3Instructions or not _JCredentials: raise DHError(message="S3Instructions requires the S3 specific deephaven extensions to be included in " "the package") try: builder = self.j_object_type.builder() if region_name is not None: builder.regionName(region_name) if max_concurrent_requests is not None: builder.maxConcurrentRequests(max_concurrent_requests) if read_ahead_count is not None: builder.readAheadCount(read_ahead_count) if fragment_size is not None: builder.fragmentSize(fragment_size) if connection_timeout is not None: builder.connectionTimeout(to_j_duration(connection_timeout)) if read_timeout is not None: builder.readTimeout(to_j_duration(read_timeout)) if ((access_key_id is not None and secret_access_key is None) or (access_key_id is None and secret_access_key is not None)): raise DHError("Either both access_key_id and secret_access_key must be provided or neither") def throw_multiple_credentials_error(credentials1: str, credentials2: str): raise DHError(f"Only one set of credentials can be set, but found {credentials1} and {credentials2}") # Configure the credentials if access_key_id is not None: warn('access_key_id is deprecated, prefer setting credentials as ' 'Credentials.basic(access_key_id, secret_access_key)', DeprecationWarning, stacklevel=2) # TODO(deephaven-core#6165): Delete deprecated parameters if anonymous_access: throw_multiple_credentials_error("access_key_id", "anonymous_access") if credentials is not None: throw_multiple_credentials_error("access_key_id", "credentials") builder.credentials(_JCredentials.basic(access_key_id, secret_access_key)) elif anonymous_access: warn("anonymous_access is deprecated, prefer setting credentials as Credentials.anonymous()", DeprecationWarning, stacklevel=2) # TODO(deephaven-core#6165): Delete deprecated parameters if credentials is not None: throw_multiple_credentials_error("anonymous_access", "credentials") builder.credentials(_JCredentials.anonymous()) elif credentials is not None: builder.credentials(credentials.j_object) if endpoint_override is not None: builder.endpointOverride(endpoint_override) if write_part_size is not None: builder.writePartSize(write_part_size) if num_concurrent_write_parts is not None: builder.numConcurrentWriteParts(num_concurrent_write_parts) if profile_name is not None: builder.profileName(profile_name) if config_file_path is not None: builder.configFilePath(config_file_path) if credentials_file_path is not None: builder.credentialsFilePath(credentials_file_path) self._j_object = builder.build() except Exception as e: raise DHError(e, "Failed to build S3 instructions") from e @property def j_object(self) -> jpy.JType: return self._j_object