Client

terminusdb_client.client

terminusdb_client.client.Client

Client.py Client is the Python public API for TerminusDB

JWTAuth Objects

class JWTAuth(requests.auth.AuthBase)
Class for JWT Authentication in requests

APITokenAuth Objects

class APITokenAuth(requests.auth.AuthBase)
Class for API Token Authentication in requests

ResourceType Objects

class ResourceType(Enum)
Enum for the different TerminusDB resources

Client Objects

class Client()
Client for TerminusDB server.
Arguments:
  • server_url (str): URL of the server that this client is connected to.
  • api (str): API endpoint for this client.
  • team (str): Team that this client is using. "admin" for local dbs.
  • db (str): Database that this client is connected to.
  • user (str): TerminusDB user that this client is using. "admin" for local dbs.
  • branch (str): Branch of the database that this client is connected to. Default to "main".
  • ref (str, None): Ref setting for the client. Default to None.
  • repo (str): Repo identifier of the database that this client is connected to. Default to "local".

close

def close(self) -> None
Undo connect and close the connection.
The connection will be unusable from this point forward; an Error (or subclass) exception will be raised if any operation is attempted with the connection, unless connect is called again.

log

def log(self,
team: Optional[str] = None,
db: Optional[str] = None,
start: int = 0,
count: int = -1)
Get commit history of a database
Arguments:
  • team (str): The team from which the database is. Defaults to the class property.
  • db (str): The database. Defaults to the class property.
  • start (int): Commit index to start from. Defaults to 0.
  • count (int): Amount of commits to get. Defaults to -1 which gets all.
Returns:
list: List of the following commit objects:
{
"@id":"InitialCommit/hpl18q42dbnab4vzq8me4bg1xn8p2a0",
"@type":"InitialCommit",
"author":"system",
"identifier":"hpl18q42dbnab4vzq8me4bg1xn8p2a0",
"message":"create initial schema",
"schema":"layer_data:Layer_4234adfe377fa9563a17ad764ac37f5dcb14de13668ea725ef0748248229a91b",
"timestamp":1660919664.9129035
}

get_commit_history

def get_commit_history(self, max_history: int = 500) -> list
Get the whole commit history.
Commit history - Commit id, author of the commit, commit message and the commit time, in the current branch from the current commit, ordered backwards in time, will be returned in a dictionary in the following format:
{ "commit_id":
{ "author": "commit_author",
"message": "commit_message",
"timestamp": <datetime object of the timestamp>
}
}
Arguments:
  • max_history (int): Maximum number of commits to return, counting backwards from your current commit. Default is set to 500. It needs to be non-negative; if the input is 0 it will still give the last commit.
Examples:
>>> from terminusdb_client import Client
>>> client = Client("http://127.0.0.1:6363")
>>> client.connect(db="bank_balance_example")
>>> client.get_commit_history()
Returns:
list:

get_all_branches

def get_all_branches(self, get_data_version=False)
Get all the branches available in the database.

rollback

def rollback(self, steps=1) -> None
Currently not implemented. Please check back later.

Raises

NotImplementedError Since TerminusDB currently does not support open transactions, this method is not applicable to its usage. To reset commit head, use Client.reset

copy

def copy(self) -> "Client"
Create a deep copy of this client.
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> clone = client.copy()
>>> assert client is not clone
Returns:
Client: The copied client instance.

set_db

def set_db(self, dbid: str, team: Optional[str] = None) -> str
Set the connection to another database. This will reset the connection.
Arguments:
  • dbid (str): Database identifier to set in the config.
  • team (str): Team identifier to set in the config. If not passed in, it will use the current one.
Examples:
>>> client = Client("http://127.0.0.1:6363")
>>> client.set_db("database1")
Returns:
str: The current database identifier.

resource

def resource(self, ttype: ResourceType, val: Optional[str] = None) -> str
Create a resource identifier string based on the current config.
Arguments:
  • ttype (ResourceType): Type of resource.
  • val (str): Branch or commit identifier.
Examples:
>>> client = Client("http://127.0.0.1:6363")
>>> client.resource(ResourceType.DB)
>>> client.resource(ResourceType.META)
>>> client.resource(ResourceType.COMMITS)
>>> client.resource(ResourceType.REF, "<reference>")
>>> client.resource(ResourceType.BRANCH, "<branch>")
Returns:
str: The constructed resource string.

create_database

def create_database(self,
dbid: str,
team: Optional[str] = None,
label: Optional[str] = None,
description: Optional[str] = None,
prefixes: Optional[dict] = None,
include_schema: bool = True) -> None
Create a TerminusDB database by posting
a terminus:Database document to the Terminus Server.
Arguments:
  • dbid (str): Unique identifier of the database.
  • team (str): ID of the Team in which to create the DB (defaults to 'admin')
  • label (str): Database name.
  • description (str): Database description.
  • prefixes (dict): Optional dict containing "@base" and "@schema" keys.
@base (str) IRI to use when doc: prefixes are expanded. Defaults to terminusdb:///data. @schema (str) IRI to use when scm: prefixes are expanded. Defaults to terminusdb:///schema.
  • include_schema (bool): If True, a main schema graph will be created, otherwise only a main instance graph will be created.
Raises:
  • InterfaceError: if the client does not connect to a server
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.create_database("someDB", "admin", "Database Label", "My Description")

delete_database

def delete_database(self,
dbid: Optional[str] = None,
team: Optional[str] = None,
force: bool = False) -> None
Delete a TerminusDB database.
If team is provided, then the team in the config will be updated and the new value will be used in future requests to the server.
Arguments:
  • dbid (str): ID of the database to delete
  • team (str): the team in which the database resides (defaults to "admin")
  • force (bool): None
Raises:
  • UserWarning: If the value of dbid is None.
  • InterfaceError: if the client does not connect to a server.
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.delete_database("<database>", "<team>")

get_triples

def get_triples(self, graph_type: str) -> str
Retrieves the contents of the specified graph as triples encoded in turtle format
Arguments:
  • graph_type (str): Graph type, either "instance" or "schema".
Raises:
  • InterfaceError: if the client does not connect to a database
Returns:
str:

update_triples

def update_triples(self, graph_type: str, turtle, commit_msg: str) -> None
Updates the contents of the specified graph with the triples encoded in turtle format. Replaces the entire graph contents.
Arguments:
  • graph_type (str): Graph type, either "instance" or "schema".
  • turtle: Valid set of triples in Turtle format.
  • commit_msg (str): Commit message.
Raises:
  • InterfaceError: if the client does not connect to a database

insert_triples

def insert_triples(self,
graph_type: str,
turtle,
commit_msg: Optional[str] = None) -> None
Inserts into the specified graph with the triples encoded in turtle format.
Arguments:
  • graph_type (str): Graph type, either "instance" or "schema".
  • turtle: Valid set of triples in Turtle format.
  • commit_msg (str): Commit message.
Raises:
  • InterfaceError: if the client does not connect to a database

query_document

def query_document(self,
document_template: dict,
graph_type: str = "instance",
skip: int = 0,
count: Optional[int] = None,
as_list: bool = False,
get_data_version: bool = False,
**kwargs) -> Union[Iterable, list]
Retrieves all documents that match a given document template
Arguments:
  • document_template (dict): Template for the document that is being retrieved
  • graph_type (str): Graph type, either "instance" or "schema".
  • as_list (bool): If the result returned as list rather than an iterator.
  • get_data_version (bool): If the data version of the document(s) should be obtained. If True, the method return the result and the version as a tuple.
Raises:
  • InterfaceError: if the client does not connect to a database
Returns:
Iterable:

get_document

def get_document(self,
iri_id: str,
graph_type: str = "instance",
get_data_version: bool = False,
**kwargs) -> dict
Retrieves the document of the iri_id
Arguments:
  • iri_id (str): IRI id of the document to be retrieved
  • graph_type (str): Graph type, either "instance" or "schema".
  • get_data_version (bool): If the data version of the document(s) should be obtained. If True, the method returns the result and the version as a tuple.
  • kwargs (``): Additional boolean flags for retrieving. Currently available: "prefixed", "minimized", "unfold"
Raises:
  • InterfaceError: if the client does not connect to a database
Returns:
dict:

get_documents_by_type

def get_documents_by_type(self,
doc_type: str,
graph_type: str = "instance",
skip: int = 0,
count: Optional[int] = None,
as_list: bool = False,
get_data_version=False,
**kwargs) -> Union[Iterable, list]
Retrieves the documents by type
Arguments:
  • doc_type (str): Specific type of the documents to be retrieved
  • graph_type (str): Graph type, either "instance" or "schema".
  • skip (int): The starting position of the returned results, defaults to 0
  • count (int or None): The maximum number of returned results; if None (default) it will return all of the available results.
  • as_list (bool): If the result is returned as a list rather than an iterator.
  • get_data_version (bool): If the version of the document(s) should be obtained. If True, the method returns the result and the version as a tuple.
  • kwargs (``): Additional boolean flags for retrieving. Currently available: "prefixed", "unfold"
Raises:
  • InterfaceError: if the client does not connect to a database
Returns:
iterable: Stream of dictionaries

get_all_documents

def get_all_documents(self,
graph_type: str = "instance",
skip: int = 0,
count: Optional[int] = None,
as_list: bool = False,
get_data_version: bool = False,
**kwargs) -> Union[Iterable, list, tuple]
Retrieves all the available documents
Arguments:
  • graph_type (str): Graph type, either "instance" or "schema".
  • skip (int): The starting position of the returned results, defaults to 0
  • count (int or None): The maximum number of returned results; if None (default) it will return all of the available results.
  • as_list (bool): If the result is returned as a list rather than an iterator.
  • get_data_version (bool): If the version of the document(s) should be obtained. If True, the method returns the result and the version as a tuple.
  • kwargs (``): Additional boolean flags for retrieving. Currently available: "prefixed", "unfold"
Raises:
  • InterfaceError: if the client does not connect to a database
Returns:
iterable: Stream of dictionaries

get_existing_classes

def get_existing_classes(self)
Get all the existing classes (only ids) in a database.

insert_document

def insert_document(
self,
document: Union[dict, List[dict], "Schema", # noqa:F821
"DocumentTemplate", # noqa:F821
List["DocumentTemplate"], # noqa:F821
],
graph_type: str = "instance",
full_replace: bool = False,
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None,
compress: Union[str, int] = 1024,
raw_json: bool = False) -> None
Inserts the specified document(s)
Arguments:
  • document (dict or list of dict): Document(s) to be inserted.
  • graph_type (str): Graph type, either "inference", "instance" or "schema".
  • full_replace (bool): If True then the whole graph will be replaced. WARNING: you should also supply the context object as the first element in the list of documents if using this option.
  • commit_msg (str): Commit message.
  • last_data_version (str): Last version before the update, used to check if the document has been changed unknowingly
  • compress (str or int): If it is an integer, data larger than this size (in bytes) will be compressed with gzip in the request (assumes UTF-8 encoding, 0 = always compress). If it is "never" it will never compress the data.
  • raw_json (bool): Update as raw json
Raises:
  • InterfaceError: if the client does not connect to a database
Returns:
list: list of ids of the inserted documents

replace_document

def replace_document(
self,
document: Union[dict, List[dict], "Schema", # noqa:F821
"DocumentTemplate", # noqa:F821
List["DocumentTemplate"], # noqa:F821
],
graph_type: str = "instance",
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None,
compress: Union[str, int] = 1024,
create: bool = False,
raw_json: bool = False) -> None
Updates the specified document(s)
Arguments:
  • document (dict or list of dict): Document(s) to be updated.
  • graph_type (str): Graph type, either "instance" or "schema".
  • commit_msg (str): Commit message.
  • last_data_version (str): Last version before the update, used to check if the document has been changed unknowingly
  • compress (str or int): If it is an integer, data larger than this size (in bytes) will be compressed with gzip in the request (assumes UTF-8 encoding, 0 = always compress). If it is "never" it will never compress the data.
  • create (bool): Create the document if it does not yet exist.
  • raw_json (bool): Update as raw json
Raises:
  • InterfaceError: if the client does not connect to a database

update_document

def update_document(
self,
document: Union[dict, List[dict], "Schema", # noqa:F821
"DocumentTemplate", # noqa:F821
List["DocumentTemplate"], # noqa:F821
],
graph_type: str = "instance",
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None,
compress: Union[str, int] = 1024) -> None
Updates the specified document(s). Add the document if not existed.
Arguments:
  • document (dict or list of dict): Document(s) to be updated.
  • graph_type (str): Graph type, either "instance" or "schema".
  • commit_msg (str): Commit message.
  • last_data_version (str): Last version before the update, used to check if the document has been changed unknowingly
  • compress (str or int): If it is an integer, data larger than this size (in bytes) will be compressed with gzip in the request (assumes UTF-8 encoding, 0 = always compress). If it is "never" it will never compress the data.
Raises:
  • InterfaceError: if the client does not connect to a database

delete_document

def delete_document(self,
document: Union[str, list, dict, Iterable],
graph_type: str = "instance",
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None) -> None
Delete the specified document(s)
Arguments:
  • document (str or list of str): Document(s) (as dictionary or DocumentTemplate objects) or id(s) of document(s) to be deleted.
  • graph_type (str): Graph type, either "instance" or "schema".
  • commit_msg (str): Commit message.
  • last_data_version (str): Last version before the update, used to check if the document has been changed unknowingly
Raises:
  • InterfaceError: if the client does not connect to a database

has_doc

def has_doc(self, doc_id: str, graph_type: str = "instance") -> bool
Check if a certain document exists in a database
Arguments:
  • doc_id (str): Id of document to be checked.
  • graph_type (str): Graph type, either "instance" or "schema".
Returns:
bool: if the document exists

get_class_frame

def get_class_frame(self, class_name)
Get the frame of the class of class_name. Provides information about all the available properties of that class.
Arguments:
  • class_name (str): Name of the class
Returns:
dict: Dictionary containing information

commit

def commit(self)
Not implemented: open transactions are currently not supported. Please check back later.

query

def query(self,
woql_query: Union[dict, WOQLQuery],
commit_msg: Optional[str] = None,
get_data_version: bool = False,
last_data_version: Optional[str] = None) -> Union[dict, str]
Executes a WOQL query against the database. If the query makes changes, they are committed with the given commit message.
Arguments:
  • woql_query (dict or WOQLQuery object): A woql query as an object or dict
  • commit_msg (str): A message that will be written to the commit log to describe the change
  • get_data_version (bool): If the data version of the query result(s) should be obtained. If True, the method return the result and the version as a tuple.
  • last_data_version (str): Last version before the update, used to check if the document has been changed unknowingly
  • file_dict (**deprecated**): File dictionary to be associated with post name => filename, for multipart POST
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> Client(server="http://localhost:6363").query(woql, "updating graph")
Returns:
dict:

create_branch

def create_branch(self, new_branch_id: str, empty: bool = False) -> None
Create a branch starting from the current branch.
Arguments:
  • new_branch_id (str): New branch identifier.
  • empty (bool): Create an empty branch if true (no starting commit)
Raises:
  • InterfaceError: if the client does not connect to a database

delete_branch

def delete_branch(self, branch_id: str) -> None
Delete a branch
Arguments:
  • branch_id (str): Branch to delete
Raises:
  • InterfaceError: if the client does not connect to a database

pull

def pull(self,
remote: str = "origin",
remote_branch: Optional[str] = None,
message: Optional[str] = None,
author: Optional[str] = None) -> dict
Pull updates from a remote repository to the current database.
Arguments:
  • remote (str): remote to pull from, default "origin"
  • remote_branch (str): remote branch to pull from, defaults to your current branch
  • message (str): optional commit message
  • author (str): option to override the author of the operation
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.pull()
Returns:
dict:

fetch

def fetch(self, remote_id: str) -> dict
Fetch the branch from a remote
Arguments:
  • remote_id (str): id of the remote
Raises:
  • InterfaceError: if the client does not connect to a database

push

def push(self,
remote: str = "origin",
remote_branch: Optional[str] = None,
message: Optional[str] = None,
author: Optional[str] = None) -> dict
Push changes from a branch to a remote repo
Arguments:
  • remote (str): remote to push to, default "origin"
  • remote_branch (str): remote branch to push to, defaults to your current branch
  • message (str): optional commit message
  • author (str): option to override the author of the operation
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> Client(server="http://localhost:6363").push(remote="origin", remote_branch = "main", author = "admin", message = "commit message")
Returns:
dict:

rebase

def rebase(self,
branch: Optional[str] = None,
commit: Optional[str] = None,
rebase_source: Optional[str] = None,
message: Optional[str] = None,
author: Optional[str] = None) -> dict
Rebase the current branch onto the specified remote branch. Need to specify one of 'branch','commit' or the 'rebase_source'.
Arguments:
  • branch (str): the branch for the rebase
  • rebase_source (str): the source branch for the rebase
  • message (str): the commit message
  • author (str): the commit author
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.rebase("the_branch")
Returns:
dict:

reset

def reset(self,
commit: Optional[str] = None,
soft: bool = False,
use_path: bool = False) -> None
Reset the current branch HEAD to the specified commit path. If soft is not True, it will be a hard reset, meaning reset to that commit in the backend and newer commits will be wiped out. If soft is True, the client will only reference that commit and can be reset to the newest commit when done.
Arguments:
  • commit (string): Commit id or path to the commit (if use_path is True), for instance '234980523ffaf93' or 'admin/database/local/commit/234980523ffaf93'. If not provided, it will reset to the newest commit (useful when you need to go back after a soft reset).
  • soft (bool): Flag indicating if the reset is soft, that is referencing a previous commit instead of resetting to a previous commit in the backend and wiping newer commits.
  • use_path (bool): Whether the commit given is a path rather than an id. Defaults to False, meaning the commit is given as an id.
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.reset('234980523ffaf93')
>>> client.reset('admin/database/local/commit/234980523ffaf93', use_path=True)

optimize

def optimize(self, path: str) -> None
Optimize the specified path.
Arguments:
  • path (string): Path to optimize, for instance admin/database/_meta for the repo graph.
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.optimize('admin/database') # optimise database branch (here main)
>>> client.optimize('admin/database/_meta') # optimise the repository graph (actually creates a squashed flat layer)
>>> client.optimize('admin/database/local/_commits') # commit graph is optimised

squash

def squash(self,
message: Optional[str] = None,
author: Optional[str] = None,
reset: bool = False) -> str
Squash the current branch HEAD into a commit
Arguments:
  • message (string): Message for the newly created squash commit
  • author (string): Author of the commit
  • reset (bool): Perform reset after squash
Raises:
  • InterfaceError: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.connect(user="admin", key="root", team="admin", db="some_db")
>>> client.squash('This is a squash commit message!')
Returns:
str: commit id to be reset

apply

def apply(self,
before_version,
after_version,
branch=None,
message=None,
author=None)
Diff two different commits and apply changes on branch
Arguments:
  • before_version (string): Before branch/commit to compare
  • after_version (string): After branch/commit to compare
  • branch (string): Branch to apply to. Optional.

diff_object

def