Source code for lib.api

from .converter import PandasConverter
from .rpcs import GrpcApi
from .utils import CiphermodeException, normalize_address, parse_sql_permissions
from .auth import AuthHandler

from .proto import common_pb2


[docs] class CiphermodeApi: def __init__(self, address, auth_handler, cert=None, tls_domain=None, private_key=None, certificate_chain=None, *args, **kwargs): """ Initialize the CiphermodeApi instance. Args: address (str): The address of the server. auth_handler (AuthenticationHandler): An instance of an authentication handler. cert (str, optional): Path to a TLS certificate file. tls_domain (str, optional): The domain protected by the TLS certificate. private_key (str, optional): The client's private key. certificate_chain (str, optional): The client's certificate chain. *args: Arguments for the PandasConverter. **kwargs: Kwargs for the PandasConverter. """ self.stub = GrpcApi( address, auth_handler, cert, tls_domain, private_key, certificate_chain) self.converter = PandasConverter(*args, **kwargs)
[docs] def list_users(self): """ List all users. Returns: DataFrame: A pandas DataFrame containing the list of users. """ ids = self.list_users_ids() res = self.stub.populate_users(ids) return self.converter.list_users(res)
[docs] def list_users_ids(self): """ List the IDs of all users. Returns: list[str]: A list of user IDs. """ return self.stub.list_users()
[docs] def add_user_role(self, user_id, role): """ Add a role to a user. Valid roles are: - 'data_owner': [SMPC] The user can upload datasets and approve computations using them. - 'analyst': [SMPC] The user can create computations. - 'admin': [SMPC] The user can perform all actions of 'data_owner' and 'analyst', and can modify acls and user roles. Args: user_id (str): The ID of the user. role (str): The role to be added to the user. Returns: DataFrame: A pandas DataFrame containing the updated user information. """ self.stub.add_user_role(user_id=user_id, role=role) users = self.stub.populate_users([user_id]) return self.converter.list_users(users)
[docs] def remove_user_role(self, user_id, role): """ Remove a role from a user. Args: user_id (str): The ID of the user. role (str): The role to be removed from the user. Returns: DataFrame: A pandas DataFrame containing the updated user information. """ self.stub.remove_user_role(user_id=user_id, role=role) users = self.stub.populate_users([user_id]) return self.converter.list_users(users)
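# Example (a minimal sketch; assumes an initialized client `api` created via `create_client`
# below and a hypothetical user ID):
#
#     api.add_user_role('user-123', 'analyst')
#     api.remove_user_role('user-123', 'analyst')
#     print(api.list_users())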
[docs] def list_groups(self): """ List all groups. Returns: DataFrame: A pandas DataFrame containing the list of groups. """ ids = self.list_groups_ids() res = self.stub.populate_groups(ids) return self.converter.list_groups(res)
[docs] def list_groups_ids(self): """ List the IDs of all groups. Returns: list[str]: A list of group IDs. """ return self.stub.list_groups()
[docs] def run_gc(self): """ Run garbage collection. Returns: int: The number of collected values. """ return self.stub.run_gc()
[docs] def node_connections(self): """ Get node connections. Returns: DataFrame: A pandas DataFrame containing the node connections. """ res = self.stub.node_connections() return self.converter.node_connections(res)
[docs] def local_node_connections(self): """ Get local node connections. Returns: list: Local node connections. """ res = self.stub.local_node_connections() # TODO: reuse things from `self.converter.node_connections` return res.stats.connections
[docs] def build_info(self): """ Get the build information. Returns: Object: An object containing the build information. """ return self.stub.build_info()
[docs] def upload_dataset(self, name='', description='', type='columnwise', endpoint='', data=None, column_permissions='everything', sql_permissions='', include_report=True, publish=False, async_init=False, allow_secure_test=False): """ Upload a dataset. Args: name (str, optional): The name of the dataset. description (str, optional): A description of the dataset. type (str, optional): The type of the dataset. Default is 'columnwise', available options are {'typed_value', 'columnwise', 'rowwise', 'onnx_model'}. endpoint (str, optional): In case of non-local datasets (cloud storage, remote SQL server), the address of the dataset object. data (list, optional): In case of local datasets, the data to upload (CSV files for columnwise/rowwise types, binary data of an ONNX model, or TypedValue JSON otherwise). column_permissions (str, optional): The column permissions of the dataset. Default is 'everything'. Available options are {'everything', 'everything_local', None}. sql_permissions (str, optional): The SQL permissions of the dataset. include_report (bool, optional): Whether to include a report in the upload. publish (bool, optional): Whether to make the dataset visible to all organizations. async_init (bool, optional): Whether to download the dataset from the `endpoint` asynchronously. allow_secure_test (bool, optional): Whether to allow the dataset to be used in SecureTest computations. Returns: A pandas Series containing the uploaded dataset. The 'id' field in the return value is the ID of the uploaded dataset, which is used to refer to this dataset in computations and other operations. If async_init was True, the user should wait for the dataset to be finalized before using it. The status of the dataset can be checked by calling `self.get_dataset()` with the returned dataset ID. Raises: CiphermodeException: If both endpoint and data are specified, or if permissions are given for a non-columnwise dataset. 
""" if endpoint and data: raise CiphermodeException( 'Cannot specify both endpoint and data for dataset upload') permissions = common_pb2.PermissionConfig() if (column_permissions is not None) and (column_permissions != 'everything'): if type != 'columnwise': raise CiphermodeException( 'Only columnwise datasets can have column permissions') permissions.column_permissions.global_permission.permission_type = { 'everything': common_pb2.ColumnPermissions.Permission.PermissionType.EVERYTHING, 'everything_local': common_pb2.ColumnPermissions.Permission.PermissionType.EVERYTHING_LOCAL}[column_permissions] if allow_secure_test and not sql_permissions: default_permissions = """ { plaintext_allowed: false, join_allowed: true, aggregate_allowed: true, grouping_allowed: false, filtering_allowed: true, aggregate_required: true, join_required: true, } """ sql_permissions = f"default_permission {{ global : {default_permissions}, local : {default_permissions} }}" if sql_permissions: if type != 'columnwise': raise CiphermodeException( 'Only columnwise datasets can have SQL permissions') permissions.sql_column_permissions.MergeFrom( parse_sql_permissions(sql_permissions)) if type == 'onnx_model': raise CiphermodeException('Working with ONNX models unsuported in this version') res = self.stub.upload_dataset(data, {'columnwise': common_pb2.DatasetType.COLUMNWISE_TABLE, 'rowwise': common_pb2.DatasetType.ROWWISE_TABLE, 'typed_value': common_pb2.DatasetType.SINGLE_VALUE, 'onnx_model': common_pb2.DatasetType.SINGLE_VALUE}[type], name, description, permissions, include_report, endpoint, publish, async_init) return self.converter.list_datasets(res).iloc[0]
[docs] def upload_and_publish_dataset(self, *args, **kwargs): """ Upload a dataset and then make it visible to all organizations. See `upload_dataset` for arguments. Returns: DataFrame: A pandas DataFrame containing the uploaded and published dataset. """ kwargs['publish'] = True return self.upload_dataset(*args, **kwargs)
[docs] def publish_dataset(self, id): """ Make the dataset visible for all organizations. Args: id (str): The ID of the dataset. Returns: DataFrame: A pandas DataFrame containing the published dataset. """ res = self.stub.expose_dataset(id) return self.converter.list_datasets(res)
[docs] def list_datasets(self): """ List all datasets. For each dataset, lists metadata about the dataset object (e.g. the name, description, visibility, and permissions), but not about the data itself. Returns: DataFrame: A pandas DataFrame containing the list of datasets. """ ids = self.list_datasets_ids() res = self.stub.populate_datasets(ids) return self.converter.list_datasets(res)
[docs] def list_datasets_ids(self): """ List the IDs of all datasets. Returns: list[str]: A list of dataset IDs. """ return self.stub.list_datasets()
[docs] def show_dataset(self, dataset_id): """ Display metadata about the dataset with the specified ID. Returns metadata about the data stored in this object, e.g. the number of rows and columns, the type of data in the table, permissions, and the number of shards (if applicable). Args: dataset_id (str): The ID of the dataset. Returns: DataFrame: A pandas DataFrame containing the dataset information. """ res = self.stub.populate_datasets([dataset_id]) (_, resp, *__) = res[0] dataset_values = self.stub.get_dataset_values(dataset_id) return self.converter.show_dataset(resp.dataset, dataset_values)
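# Example (a minimal sketch; assumes an initialized client `api` and that the returned
# DataFrame exposes an 'id' column, as implied by `upload_dataset`):
#
#     datasets = api.list_datasets()
#     some_id = datasets.iloc[0]['id']
#     print(api.show_dataset(some_id))  # row/column counts, types, permissions, shards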
[docs] def get_dataset(self, dataset_id): """ Get information about the dataset with the specified ID. Args: dataset_id (str): The ID of the dataset. Returns: Dataset: Metadata about the dataset with the specified ID, e.g. name, description, visibility, permissions. Does not return metadata about the data itself. """ (_, resp, *__) = self.stub.populate_datasets([dataset_id])[0] return resp.dataset
[docs] def delete_dataset(self, dataset_id): """ Delete dataset with the specified ID. Args: dataset_id (str): The ID of the dataset. Returns: bool: True if dataset was successfully deleted. """ return self.stub.delete_dataset(dataset_id) is not None
[docs] def get_report(self, dataset_id): """ Get the report of the specified dataset. Args: dataset_id (str): The ID of the dataset. Returns: Report (str): The report of the specified dataset. """ return self.stub.get_report(dataset_id)
[docs] def upload_graph(self, serialized_graph): """ Upload a serialized graph. Args: serialized_graph (str): The serialized Ciphercore graph to upload. Returns: DataFrame: A pandas DataFrame containing the uploaded graph information. """ res = self.stub.upload_graph(serialized_graph) return self.converter.list_graphs(res)
[docs] def download_graph(self, id): """ Download a graph with the specified ID. Args: id (str): The ID of the graph. Returns: str: The serialized Ciphercore graph. """ return self.stub.download_graph(id)
[docs] def list_graphs(self): """ List all Ciphercore graphs currently uploaded. Returns: DataFrame: A pandas DataFrame containing the list of graphs. """ ids = self.list_graphs_ids() res = self.stub.populate_graphs(ids) return self.converter.list_graphs(res)
[docs] def list_graphs_ids(self): """ List the IDs of all graphs. Returns: list[str]: A list of graph IDs. """ return self.stub.list_graphs()
[docs] def create_computation(self, orchestrator, graphs_config, name, description, config=None): """ Create a computation. Computation object specifies what computation to execute, regardless of the data. The same computation can be used multiple times with different datasets. Note that there are easier-to-use functions for specific computations (PSI, SQL, NN training, etc.). We strongly recommend using those functions when possible. Args: orchestrator (str): The orchestrator type for the computation. graphs_config (dict): The "graph name -> graph ID" mapping. name (str): The name of the computation. description (str): The description of the computation. config (dict, optional): Additional orchestrator-specific configuration for the computation. Returns: DataFrame: A pandas DataFrame containing the created computation information. The 'id' field of this DataFrame is the ID of the created computation, which is used to refer to this computation in other operations. """ res = self.stub.create_computation( orchestrator, graphs_config, name, description, config=config) return self.converter.list_computations(res)
[docs] def create_single_graph_computation(self, serialized_graph, name='', description=''): """ Create a single graph computation. These computations are usually used for testing or basic examples, e.g. computing the sum of two numbers with SMPC. Args: serialized_graph (str): The serialized Ciphercore graph to create a computation for. name (str, optional): The name of the computation. description (str, optional): The description of the computation. Returns: DataFrame: A pandas DataFrame containing the created computation information. """ res = self.stub.upload_graph(serialized_graph) if len(res) != 1: return graph_id = res[0][0] res = self.stub.create_computation( 'single_graph', {"graph": graph_id}, name, description) return self.converter.list_computations(res)
[docs] def list_computations(self): """ List all computations. Returns: DataFrame: A pandas DataFrame containing the list of computations. """ ids = self.list_computations_ids() res = self.stub.populate_computations(ids) return self.converter.list_computations(res)
[docs] def list_computations_ids(self): """ List the IDs of all computations. Returns: list[str]: A list of computation IDs. """ return self.stub.list_computations()
[docs] def list_cloud_uploads(self): """ List all cloud uploads. Returns: DataFrame: A pandas DataFrame containing the list of cloud uploads. """ ids = self.stub.list_cloud_uploads() res = self.stub.populate_cloud_uploads(ids) return self.converter.list_cloud_uploads(res)
[docs] def get_cloud_upload(self, id): """ Get cloud upload with the specified ID. Used to track the progress of uploading a particular computation session result to cloud storage. Args: id (str): The ID of the cloud upload. Returns: DataFrame: A pandas DataFrame containing the cloud upload information. """ res = self.stub.populate_cloud_uploads([id]) return self.converter.list_cloud_uploads(res)
[docs] def get_psi_computation(self, first_dataset_columns, second_dataset_columns, sharded=True): """ Create a PSI (Private Set Intersection) computation. Args: first_dataset_columns (list[str]): The list of columns from the first dataset to join. second_dataset_columns (list[str]): The list of columns from the second dataset to join. sharded (bool, optional): Whether to shard the computation. This is useful for reducing the memory usage of a computation on large datasets. Default is True. Returns: str: The ID of the created computation. """ if not isinstance(first_dataset_columns, list) or not isinstance(second_dataset_columns, list): raise ValueError( 'first_dataset_columns and second_dataset_columns should be lists of strings.') if len(first_dataset_columns) != len(second_dataset_columns): raise ValueError( 'first_dataset_columns and second_dataset_columns should have the same length.') # TODO: for efficiency reasons, maybe reuse existing computation, if columns match? join_column_pairs = [] for first_col, second_col in zip(first_dataset_columns, second_dataset_columns): join_column_pairs.append( common_pb2.JoinColumnPair( first_dataset_column=first_col, second_dataset_column=second_col ) ) config = common_pb2.OrchestratorConfig( psi_config=common_pb2.PsiConfig( join_columns=join_column_pairs, sharded=sharded, ) ) return self.stub.create_computation( 'psi', {}, 'PSI of {} and {}'.format(first_dataset_columns, second_dataset_columns), "", config=config)[0][0]
[docs] def get_mlp_computation(self, layers, batch_size, optimizer, learning_rate, loss, epochs, precision): """ Create an MLP (Multi-Layer Perceptron) computation. Args: layers (list): The list with the sizes of hidden layers in the MLP (note that the last one should be 1 in most cases). batch_size (int): The batch size for training. optimizer (str): The optimizer to use for training (we currently support 'adam', 'adagrad' and 'sgd'). learning_rate (float): The learning rate for training. loss (str): The loss function to use for training (we currently support 'log_loss' and 'mse'). epochs (int): The number of epochs for training. precision (int): The precision for training (it is conducted with fixed precision numbers, with `2**precision` as denominator). Returns: str: The ID of the created computation. """ config = common_pb2.OrchestratorConfig( ml_config=common_pb2.MlpConfig(hidden_layers=layers, batch_size=batch_size, optimizer=optimizer, learning_rate=learning_rate, loss=loss, epochs=epochs, precision=precision)) return self.stub.create_computation( 'neural_network_training', {}, f'MLP with {len(layers)} layers', str(config), config=config)[0][0]
[docs] def get_nn_inference_computation(self, batch_size, precision): """ Create a neural network inference computation. Args: batch_size (int): The batch size for inference, should be the same as for training. precision (int): The precision for inference, should be the same as for training. Returns: str: The ID of the created computation. """ config = common_pb2.OrchestratorConfig(nn_inference_config=common_pb2.NnInferenceConfig( batch_size=batch_size, precision=precision)) return self.stub.create_computation( 'neural_network_inference', {}, 'NN Inference', str(config), config=config)[0][0]
[docs] def get_llm_inference_computation(self, max_len, num_layers, embedding_dim, num_heads, temperature, top_p): """ Create an LLM inference computation. Args: max_len (int): The maximum length of the generated text. num_layers (int): The number of layers in the transformer. embedding_dim (int): The embedding dimension of the transformer. num_heads (int): The number of heads in the transformer. temperature (float): The temperature for sampling. top_p (float): The top-p value for sampling. Returns: str: The ID of the created computation. """ config = common_pb2.OrchestratorConfig(llm_inference_config=common_pb2.LlmInferenceConfig( max_len=max_len, num_layers=num_layers, embedding_dim=embedding_dim, num_heads=num_heads, temperature=temperature, top_p=top_p)) return self.stub.create_computation( 'llm_inference', {}, 'LLM Inference', str(config), config=config)[0][0]
[docs] def get_sql_computation(self, query): """ Create an SQL computation. Args: query (str): The SQL query to execute. It can refer to tables by names, these names need to be specified in the corresponding computation session. Returns: str: The ID of the created computation. """ # TODO: for efficiency reasons, maybe reuse existing computation, if query is the same? config = common_pb2.OrchestratorConfig( sql_config=common_pb2.SqlConfig( query=query ) ) return self.stub.create_computation( 'sql', {}, 'SQL query: {}'.format(query), "", config=config)[0][0]
[docs] def get_knn_computation(self, num_neighbors, has_labels=False): """ Create a KNN (k-nearest-neighbors) computation. Args: num_neighbors (int): The number of neighbors to consider in the KNN computation. has_labels (bool, optional): Whether the input data has labels. Default is False. Returns: str: The ID of the created computation. """ # TODO: for efficiency reasons, maybe reuse existing computation, if num_neighbors is the same? config = common_pb2.OrchestratorConfig( knn_config=common_pb2.KnnConfig( num_neighbors=num_neighbors, label_aggregation=common_pb2.KnnConfig.LabelAggregation.MEAN if has_labels else common_pb2.KnnConfig.LabelAggregation.NONE ) ) return self.stub.create_computation( 'nearest_neighbors', {}, 'KNN[neighbors={}]'.format(num_neighbors), "", config=config)[0][0]
[docs] def create_computation_session(self, computation_id, data_config, name='', description=''): """ Create a computation session. A computation session is an instantiation of a computation on specific datasets, specified by the data_config argument. Args: computation_id (str): The ID of the computation. data_config (dict): The mapping (name -> dataset ID). Names are orchestrator-specific (see orchestrator-specific functions for details, e.g. `create_psi`). name (str, optional): The name of the session. description (str, optional): The description of the session. Returns: A pandas Series containing the created computation session information. The 'id' in this Series is the ID of the created computation session, which is used to refer to this session in other operations. """ self.stub.expose_computation(computation_id) res = self.stub.create_computation_session( computation_id, data_config, name, description) return self.converter.list_computation_sessions(res).iloc[0]
[docs] def create_psi(self, first_dataset_id, second_dataset_id, first_dataset_columns, second_dataset_columns, name='', description='', sharded=True): """ Create a PSI (Private Set Intersection) computation session. Args: first_dataset_id (str): The ID of the first dataset. second_dataset_id (str): The ID of the second dataset. first_dataset_columns (list[str]): The list of columns from the first dataset to join. second_dataset_columns (list[str]): The list of columns from the second dataset to join. name (str, optional): The name of the session. description (str, optional): The description of the session. sharded (bool, optional): Whether to shard the computation. Default is True. Returns: A pandas Series containing the created computation session information. """ computation_id = self.get_psi_computation( first_dataset_columns, second_dataset_columns, sharded) data_config = {'input_0': first_dataset_id, 'input_1': second_dataset_id} return self.create_computation_session(computation_id, data_config, name, description)
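# Example (a minimal sketch; assumes an initialized client `api` and two hypothetical
# columnwise dataset IDs):
#
#     session = api.create_psi('ds-aaa', 'ds-bbb',
#                              first_dataset_columns=['email'],
#                              second_dataset_columns=['email_address'],
#                              name='customer overlap')
#     api.start_computation_session(session['id'])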
[docs] def create_sql(self, query, data_config, name='', description=''): """ Create an SQL computation session. Args: query (str): The SQL query to execute. The query should refer to columns with lowercase names. data_config (dict): The mapping (table name -> dataset ID). SQL queries refer to datasets by the table names in this mapping. name (str, optional): The name of the session. description (str, optional): The description of the session. Returns: DataFrame: A pandas DataFrame containing the created computation session information. """ computation_id = self.get_sql_computation(query) return self.create_computation_session(computation_id, data_config, name, description)
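# Example (a minimal sketch; assumes an initialized client `api` and hypothetical dataset
# IDs; the query refers to datasets by the table names given in `data_config` and uses
# lowercase column names):
#
#     session = api.create_sql(
#         'SELECT t1.region, COUNT(*) AS n FROM t1 JOIN t2 ON t1.id = t2.id GROUP BY t1.region',
#         data_config={'t1': 'ds-aaa', 't2': 'ds-bbb'},
#         name='regional overlap counts')
#     api.start_computation_session(session['id'])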
[docs] def create_mlp(self, train_datasets, validation_datasets, test_datasets, model_dataset, layers=[100, 1], batch_size=64, optimizer='adam', learning_rate=3e-4, loss='log_loss', epochs=3, precision=15, name='', description=''): """ Create an MLP (Multi-Layer Perceptron) training computation session. Args: train_datasets (list): The list of training dataset IDs. validation_datasets (list): The list of validation dataset IDs. test_datasets (list): The list of testing dataset IDs. model_dataset (str): The ID of a model dataset to include in the session, or None. layers (list, optional): List of hidden layer sizes in the MLP (in most cases, the last one should be 1). Default is [100, 1]. batch_size (int, optional): Batch size for training. Default is 64. optimizer (str, optional): Optimizer to use for training. Default is 'adam', supported optimizers are 'adam', 'adagrad', 'sgd'. learning_rate (float, optional): Learning rate for training. Default is 3e-4. loss (str, optional): Loss function to use for training. Default is 'log_loss', supported losses are 'log_loss' and 'mse'. epochs (int, optional): Number of epochs for training. Default is 3. precision (int, optional): Precision for training. Default is 15. Training is performed in fixed-point arithmetic with denominator `2**precision`. name (str, optional): The name of the session. description (str, optional): The description of the session. Returns: DataFrame: A pandas DataFrame containing the created computation session information. """ def config_for_datasets(datasets, name): return {f'{name}{i}': id for i, id in enumerate(datasets)} if model_dataset: data_config = {**config_for_datasets(train_datasets, 'training'), **config_for_datasets( validation_datasets, 'validation'), **config_for_datasets(test_datasets, 'testing'), **config_for_datasets([model_dataset], 'model')} else: data_config = {**config_for_datasets(train_datasets, 'training'), **config_for_datasets( validation_datasets, 'validation'), **config_for_datasets(test_datasets, 'testing')} loss = {'mse': common_pb2.MlpConfig.MSE, 'log_loss': common_pb2.MlpConfig.LOG_LOSS}[loss] computation_id = self.get_mlp_computation( layers, batch_size, optimizer, learning_rate, loss, epochs, precision) return self.create_computation_session(computation_id, data_config, name, description)
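# Example (a minimal sketch; assumes an initialized client `api` and hypothetical columnwise
# dataset IDs for each split; passing None for model_dataset attaches no model dataset):
#
#     session = api.create_mlp(train_datasets=['ds-train-0', 'ds-train-1'],
#                              validation_datasets=['ds-val-0'],
#                              test_datasets=['ds-test-0'],
#                              model_dataset=None,
#                              layers=[64, 1], epochs=5,
#                              name='churn model')
#     api.start_computation_session(session['id'])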
[docs] def create_nn_inference(self, inference_dataset_id, model_dataset_id, batch_size=64, precision=15, name='', description=''): """ Create a neural network inference computation session. Args: inference_dataset_id (str): The ID of the inference dataset. model_dataset_id (str): The ID of the model dataset. batch_size (int, optional): The batch size for inference. Default is 64, should be the same as for training. precision (int, optional): The precision for inference. Default is 15, should be the same as for training. name (str, optional): The name of the session. description (str, optional): The description of the session. Returns: DataFrame: A pandas DataFrame containing the created computation session information. """ data_config = {'inference0': inference_dataset_id, 'model': model_dataset_id} computation_id = self.get_nn_inference_computation( batch_size, precision) return self.create_computation_session(computation_id, data_config, name, description)
[docs] def create_llm_inference(self, inference_dataset_id, model_dataset_id, max_len=128, num_layers=8, embedding_dim=512, num_heads=16, temperature=0.85, top_p=0.85, name='', description=''): """ Create an LLM inference computation session. Args: inference_dataset_id (str): The ID of the inference dataset. model_dataset_id (str): The ID of the model dataset. max_len (int, optional): The maximum length of the generated sequence. num_layers (int, optional): The number of layers in the model. embedding_dim (int, optional): The embedding dimension of the model. num_heads (int, optional): The number of attention heads in the model. temperature (float, optional): The temperature for sampling. top_p (float, optional): The top-p heuristic value for sampling. name (str, optional): The name of the session. description (str, optional): The description of the session. Returns: DataFrame: A pandas DataFrame containing the created computation session information. """ data_config = {'prompt': inference_dataset_id, 'model': model_dataset_id} computation_id = self.get_llm_inference_computation( max_len, num_layers, embedding_dim, num_heads, temperature, top_p) return self.create_computation_session(computation_id, data_config, name, description)
[docs] def create_knn(self, key_dataset_id, query_dataset_id, num_neighbors, value_dataset_id=None, name='', description=''): """ Create a KNN (k-Nearest-Neighbors) computation session. Args: key_dataset_id (str): The ID of the rowwise dataset with lookup keys (vectors). query_dataset_id (str): The ID of the rowwise dataset with lookup queries (vectors). num_neighbors (int): The number of neighbors to consider in the KNN computation. value_dataset_id (str, optional): The ID of the dataset with labels. Default is None. name (str, optional): The name of the session. description (str, optional): The description of the session. Returns: DataFrame: A pandas DataFrame containing the created computation session information. """ data_config = {'keys': key_dataset_id, 'queries': query_dataset_id} if value_dataset_id: data_config['values'] = value_dataset_id computation_id = self.get_knn_computation( num_neighbors, has_labels=(value_dataset_id is not None)) return self.create_computation_session(computation_id, data_config, name, description)
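# Example (a minimal sketch; assumes an initialized client `api` and hypothetical rowwise
# dataset IDs for keys, queries, and labels):
#
#     session = api.create_knn(key_dataset_id='ds-keys',
#                              query_dataset_id='ds-queries',
#                              num_neighbors=5,
#                              value_dataset_id='ds-labels',  # omit for label-free lookups
#                              name='nearest neighbors lookup')
#     api.start_computation_session(session['id'])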
[docs] def list_computation_sessions(self, filter_computation_session_ids=None, show_tags=False): """ List computation sessions. Args: filter_computation_session_ids (list[str], optional): List of specific computation session IDs to return. If None, all computation sessions are returned. Default is None. show_tags (bool, optional): Whether to include the tags column. Returns: DataFrame: A pandas DataFrame containing the list of computation sessions. """ ids = filter_computation_session_ids if not filter_computation_session_ids: ids = self.list_computation_sessions_ids() res = self.stub.populate_computation_sessions(ids) return self.converter.list_computation_sessions(res, show_tags)
[docs] def list_computation_sessions_ids(self): """ List computation session IDs. Returns: list[str]: A list of computation session IDs. """ return self.stub.list_computation_sessions()
[docs] def tag_computation_session(self, id, key, value=None): """ Tag computation session. Args: id (str): The ID of the computation session to tag. key (str): Tag key. value (str, optional): Tag value. If None, the tag with a given key is removed instead. """ self.stub.tag_computation_session(id, key, value)
[docs] def start_computation_session(self, id): """ Start a specific computation session. Args: id (str): The ID of the computation session to start. Returns: DataFrame: A pandas DataFrame containing the started computation session information. """ res = self.stub.start_computation_session(id) return self.converter.list_computation_sessions(res)
[docs] def cancel_computation_session(self, id): """ Cancel a specific computation session. Args: id (str): The ID of the computation session to cancel. Returns: DataFrame: A pandas DataFrame containing the cancelled computation session information. """ res = self.stub.cancel_computation_session(id) return self.converter.list_computation_sessions(res)
[docs] def download_computation_session_result(self, id, onnx=False): """ Download the result of a specific computation session. Args: id (str): The ID of the computation session to download. onnx (bool, optional): Whether to convert the result to ONNX protobuf. Default is False. Returns: DataFrame: A pandas DataFrame containing the downloaded computation session result. Raises: CiphermodeException: If `onnx` is requested for an unsupported orchestrator, or if the session contains no results or multiple results/outputs. """ session = self.stub.populate_computation_sessions([id])[0][1].data computation = self.stub.populate_computations( [session.computation_id])[0][1].computation orchestrator = computation.orchestrator_name if onnx: if orchestrator not in ['neural_network_training']: raise CiphermodeException( 'Cannot convert to onnx for orchestrator {}'.format(orchestrator)) results = session.metadata.results if len(results) == 0: raise CiphermodeException('Session contains no results') if len(results) > 1: raise CiphermodeException( 'Not implemented: session contains multiple results') result = results[0] output_keys = list(result.outputs.keys()) if len(output_keys) > 1: raise CiphermodeException( 'Not implemented: session result contains multiple outputs') output_key = output_keys[0] payload = self.stub.download_computation_session_output( id, 0, output_key) output_format = result.outputs[output_key].output_format return self.converter.view_typed_value(payload, output_format, onnx)
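# Example (a minimal sketch; assumes an initialized client `api` and a hypothetical session
# ID whose run has already been approved and has finished):
#
#     api.start_computation_session('cs-123')
#     # ... data owners approve the data requests, computation completes ...
#     result = api.download_computation_session_result('cs-123')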
[docs] def upload_computation_session_result(self, id, endpoint): """ Uploads the result of a computation session to a specified endpoint. The endpoint should be a valid filename within a cloud storage bucket for a supported cloud provider (AWS, GCP, or Azure). The deployment needs to have necessary credentials to perform the upload. Args: id (str): The ID of the computation session. endpoint (str): The endpoint to which the computation session result will be uploaded. Returns: DataFrame: A pandas DataFrame containing the new dataset. """ session = self.stub.populate_computation_sessions([id])[0][1].data results = session.metadata.results if len(results) > 1: raise CiphermodeException( 'Not implemented: session contains multiple results') result = results[0] output_keys = list(result.outputs.keys()) if len(output_keys) > 1: raise CiphermodeException( 'Not implemented: session result contains multiple outputs') output_key = output_keys[0] return self.stub.upload_computation_session_output(id, 0, output_key, endpoint)
[docs] def save_computation_session_result(self, id, name='', description='', as_csv=False, include_summary=False, sql_permissions=None, publish=False): """ Saves the result of a computation session to a new dataset. Args: id (str): The ID of the computation session. name (str, optional): The name to assign to the dataset. description (str, optional): The description to assign to the dataset. as_csv (bool, optional): Whether to treat the computation result as a CSV-like table (results in a columnwise dataset). include_summary (bool, optional): Whether to include a dataset summary for the newly created dataset. sql_permissions (str, optional): The SQL permissions to assign to the dataset. publish (bool, optional): Whether to make dataset visible for all organizations. Returns: DataFrame: A pandas DataFrame containing the new dataset. """ permissions = common_pb2.PermissionConfig() if sql_permissions: permissions.sql_column_permissions.MergeFrom( parse_sql_permissions(sql_permissions)) res = self.stub.save_computation_session_result( id, name, description, permissions, as_csv=as_csv, include_report=include_summary, publish=publish) return self.converter.list_datasets(res)
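# Example (a minimal sketch; assumes an initialized client `api` and a hypothetical finished
# session whose result is tabular):
#
#     saved = api.save_computation_session_result('cs-123', name='psi result',
#                                                 as_csv=True, include_summary=True)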
[docs] def list_data_requests(self, filter_computation_session_id=None, filter_can_approve=False): """ Lists data requests. Args: filter_computation_session_id (str, optional): If provided, only data requests for this computation session ID will be returned. filter_can_approve (bool): If true, only data requests that the user can approve will be returned. Returns: DataFrame: A pandas DataFrame containing the list of data requests. """ ids = self.list_data_requests_ids(filter_computation_session_id, filter_can_approve) res = self.stub.populate_data_approvals(ids) return self.converter.list_data_approvals(res, filter_computation_session_id)
[docs] def list_data_requests_ids(self, filter_computation_session_id=None, filter_can_approve=False): """ Lists the IDs of data requests. Args: filter_computation_session_id (str, optional): If provided, only data requests for this computation session ID will be returned. filter_can_approve (bool): If true, only data requests that the user can approve will be returned. Returns: list[str]: A list of data request IDs. """ return self.stub.list_data_approvals(filter_computation_session_id, filter_can_approve)
def _update_data_approval(self, id, status=None, comment=''): res = self.stub.update_data_approval(id, status, comment) return self.converter.list_data_approvals(res)
[docs] def approve_data_request(self, id, comment=''): """ Approves a data request. Args: id (str): The ID of the data request to approve. comment (str, optional): A comment to attach to the data request. Returns: DataFrame: A pandas DataFrame containing the approved data request. """ return self._update_data_approval(id, common_pb2.DataApproval.APPROVED, comment)
[docs] def reject_data_request(self, id, comment=''): """ Rejects a data request. Args: id (str): The ID of the data request to reject. comment (str, optional): A comment to attach to the data request. Returns: DataFrame: A pandas DataFrame containing the rejected data request. """ return self._update_data_approval(id, common_pb2.DataApproval.REJECTED, comment)
[docs] def comment_data_request(self, id, comment=''): """ Comments on a data request. Args: id (str): The ID of the data request to comment on. comment (str): The comment to attach to the data request. Returns: DataFrame: A pandas DataFrame containing the commented data request. """ return self._update_data_approval(id, None, comment)
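# Example (a minimal sketch; assumes an initialized client `api` acting as a data owner, a
# hypothetical session ID, and an 'id' column in the returned DataFrame):
#
#     requests = api.list_data_requests(filter_computation_session_id='cs-123',
#                                       filter_can_approve=True)
#     for request_id in requests['id']:
#         api.approve_data_request(request_id, comment='Reviewed and approved')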
[docs] def create_explore_dataset_intersection(self, dataset_id1, dataset_id2, column_names1, column_names2, use_approx_match_rate=False): """ Creates an exploration of the intersection between two datasets. Args: dataset_id1 (str): The ID of the first dataset. dataset_id2 (str): The ID of the second dataset. column_names1 (list(str)): Names of the columns in the first dataset to compare. column_names2 (list(str)): Names of the columns in the second dataset to compare. use_approx_match_rate (bool, optional): Whether to use approximate match rate. Default is False. Returns: String: computation_session_id """ return self.stub.create_explore_dataset_intersection( dataset_id1, dataset_id2, column_names1, column_names2, use_approx_match_rate=use_approx_match_rate)
[docs] def poll_explore_dataset_intersection(self, session_id): """ Polls the exploration of a dataset intersection. Args: session_id (str): The session id associated with the dataset intersection exploration. Returns: ExploreDatasetIntersectionResponse: Object containing explore computation details. """ return self.stub.poll_explore_dataset_intersection(session_id)
[docs] def list_user_events(self, timestamp_ms, num_events, user=''): """ Lists user audit events up to a given timestamp. Admin only. Args: timestamp_ms (int): Timestamp, in milliseconds. num_events (int): Number of events to fetch. user (str, optional): Email address to filter events on. Returns: DataFrame: A pandas DataFrame containing user audit events. """ res = self.stub.list_user_events(timestamp_ms, num_events, user) return self.converter.list_user_events(res)
[docs] def list_node_events(self, timestamp_ms, num_events): """ Lists node audit events up to a given timestamp. Admin only. Args: timestamp_ms (int): Timestamp, in milliseconds. num_events (int): Number of events to fetch. Returns: DataFrame: A pandas DataFrame containing node audit events. """ res = self.stub.list_node_events(timestamp_ms, num_events) return self.converter.list_node_events(res)
[docs] def hash_dataset_columns(self, dataset_id, hash_column_names, new_dataset_name, async_init=False): """ Hashes entries of the dataset in the given columns to create a succinct representation of the input dataset. Succinct representations output by this method can be matched with `create_psi` to get hash values they have in common. Args: dataset_id (str): The dataset ID. hash_column_names (list[str]): Columns from the dataset to hash. new_dataset_name (str): New dataset name. async_init (bool, optional): If true, the function returns immediately after creating the new dataset object and populates it with hashes asynchronously. Returns: A pandas Series containing the dataset ID for the succinct representation. This dataset contains a single column of (de-duplicated) hash values, each value corresponding to some set of rows in the input dataset where entries indexed by columns in `hash_column_names` had the same hash. If async_init was True, the user should wait for the dataset to be finalized before using it. The status of the dataset can be checked by calling `self.get_dataset()` with the returned dataset ID. """ res = self.stub.hash_dataset_columns(dataset_id, hash_column_names, new_dataset_name, async_init) return self.converter.list_datasets(res).iloc[0]
[docs] def waterfall_gather(self, original_dataset_id, stage_session_ids, endpoint): """ Post-processes the results of multiple PSI computations on hashed datasets output by `hash_dataset_columns` to obtain the indices of rows in the original dataset that matched, along with the index of the first computation they matched in. Used to implement a multi-stage "waterfall" join by providing ordered session IDs for each stage. Can also be called with a single stage to obtain the row indices that matched for a single PSI computation. Args: original_dataset_id (str): The original dataset ID. stage_session_ids (list[str]): Waterfall session IDs. Each should correspond to a PSI computation (made by `create_psi`) on hashed datasets (made with `hash_dataset_columns`). Should be non-empty. endpoint (str): The endpoint to which the computation session result will be uploaded. Returns: If `endpoint` is empty, returns the result directly, encoded as bytes. Otherwise, returns a string that can be input to self.get_cloud_upload() to check the progress of uploading the result to the cloud. """ resp = self.stub.waterfall_gather(original_dataset_id, stage_session_ids, endpoint) if endpoint == "": return resp.payload else: return resp.cloud_upload_id
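# Example (a minimal sketch; assumes an initialized client `api`, a hypothetical original
# dataset, and already-finished PSI sessions over hashed datasets produced by
# `hash_dataset_columns` on both sides):
#
#     hashed = api.hash_dataset_columns('ds-orig', ['email'], 'orig-hashed-email')
#     # hashed['id'] is the succinct dataset used as a PSI input for each stage
#     # ... run create_psi sessions against the counterparty's hashed datasets ...
#     payload = api.waterfall_gather('ds-orig', ['cs-stage-0', 'cs-stage-1'], endpoint='')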
[docs] def create_client(frontend_address, auth_config='~/.ciphercore/auth_config', token_path='~/.ciphercore/token', custom_root_ca=None, tls_domain='localhost', private_key=None, certificate_chain=None, *args, **kwargs): """ Create a CiphermodeApi instance and initialize it. Args: frontend_address (str): The address of the server. auth_config (str, optional): Path to auth config. token_path (str, optional): Path to file containing OpenIDConnect token. custom_root_ca (str, optional): Path to a TLS certificate file. tls_domain (str, optional): The domain protected by the TLS certificate. private_key (str, optional): Path to the client's private key. certificate_chain (str, optional): Path to the client's certificate chain. *args: Arguments for the PandasConverter. **kwargs: Kwargs for the PandasConverter. Returns: CiphermodeApi: An instance of the CiphermodeApi. """ frontend_address = normalize_address(frontend_address) cert = None if custom_root_ca is None else open( custom_root_ca, 'rb').read() private_key = None if private_key is None else open( private_key, 'rb').read() certificate_chain = None if certificate_chain is None else open( certificate_chain, 'rb').read() auth_handler = AuthHandler( frontend_address, auth_config, token_path, cert, tls_domain) return CiphermodeApi(frontend_address, auth_handler, cert=cert, tls_domain=tls_domain, private_key=private_key, certificate_chain=certificate_chain, *args, **kwargs)
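# Example (a minimal sketch; the address and local paths are hypothetical):
#
#     api = create_client('grpc.example.com:443',
#                         auth_config='~/.ciphercore/auth_config',
#                         token_path='~/.ciphercore/token')
#     print(api.build_info())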