Client
Client.py Client is the Python public API for TerminusDB
class JWTAuth(requests.auth.AuthBase)
Class for JWT Authentication in requests
class APITokenAuth(requests.auth.AuthBase)
Class for API Token Authentication in requests
class ResourceType(Enum)
Enum for the different TerminusDB resources
class Client()
Client for TerminusDB server.
Arguments:
server_url
(str
): URL of the server that this client connected.api
(str
): API endpoint for this client.team
(str
): Team that this client is using. "admin" for local dbs.db
(str
): Database that this client is connected to.user
(str
): TerminiusDB user that this client is using. "admin" for local dbs.branch
(str
): Branch of the database that this client is connected to. Default to "main".ref
(str, None
): Ref setting for the client. Default to None.repo
(str
): Repo identifier of the database that this client is connected to. Default to "local".
def close(self) -> None
Undo connect and close the connection.
The connection will be unusable from this point forward; an Error (or subclass) exception will be raised if any operation is attempted with the connection, unless connect is call again.
def log(self,
team: Optional[str] = None,
db: Optional[str] = None,
start: int = 0,
count: int = -1)
Get commit history of a database
Arguments:
team
(str
): The team from which the database is. Defaults to the class property.db
(str
): The database. Defaults to the class property.start
(int
): Commit index to start from. Defaults to 0.count
(int
): Amount of commits to get. Defaults to -1 which gets all.
Returns:
list
: List of the following commit objects: {
"@id":"InitialCommit/hpl18q42dbnab4vzq8me4bg1xn8p2a0",
"@type":"InitialCommit",
"author":"system",
"identifier":"hpl18q42dbnab4vzq8me4bg1xn8p2a0",
"message":"create initial schema",
"schema":"layer_data:Layer_4234adfe377fa9563a17ad764ac37f5dcb14de13668ea725ef0748248229a91b",
"timestamp":1660919664.9129035
}
def get_commit_history(self, max_history: int = 500) -> list
Get the whole commit history.
Commit history - Commit id, author of the commit, commit message and the commit time, in the current branch from the current commit, ordered backwards in time, will be returned in a dictionary in the follow format:
{ "commit_id":
{ "author": "commit_author",
"message": "commit_message",
"timestamp: <datetime object of the timestamp>"
}
}
Arguments:
max_history
(int
): maximum number of commit that would return, counting backwards from your current commit. Default is set to 500. It needs to be nop-negative, if input is 0 it will still give the last commit.
Examples:
>>> from terminusdb_client import Client
>>> client = Client("http://127.0.0.1:6363"
>>> client.connect(db="bank_balance_example")
>>> client.get_commit_history()
Returns:
list
:def get_all_branches(self, get_data_version=False)
Get all the branches available in the database.
def rollback(self, steps=1) -> None
Curently not implementated. Please check back later.
NotImplementedError Since TerminusDB currently does not support open transactions. This method is not applicable to it's usage. To reset commit head, use Client.reset
def copy(self) -> "Client"
Create a deep copy of this client.
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> clone = client.copy()
>>> assert client is not clone
Returns:
Client
: The copied client instance.def set_db(self, dbid: str, team: Optional[str] = None) -> str
Set the connection to another database. This will reset the connection.
Arguments:
dbid
(str
): Database identifer to set in the config.team
(str
): Team identifer to set in the config. If not passed in, it will use the current one.
Examples:
>>> client = Client("http://127.0.0.1:6363")
>>> client.set_db("database1")
Returns:
str
: The current database identifier.def resource(self, ttype: ResourceType, val: Optional[str] = None) -> str
Create a resource identifier string based on the current config.
Arguments:
ttype
(ResourceType
): Type of resource.val
(str
): Branch or commit identifier.
Examples:
>>> client = Client("http://127.0.0.1:6363")
>>> client.resource(ResourceType.DB)
>>> client.resource(ResourceType.META)
>>> client.resource(ResourceType.COMMITS)
>>> client.resource(ResourceType.REF, "<reference>")
>>> client.resource(ResourceType.BRANCH, "<branch>")
Returns:
str
: The constructed resource string.def create_database(self,
dbid: str,
team: Optional[str] = None,
label: Optional[str] = None,
description: Optional[str] = None,
prefixes: Optional[dict] = None,
include_schema: bool = True) -> None
Create a TerminusDB database by posting
a terminus:Database document to the Terminus Server.
Arguments:
dbid
(str
): Unique identifier of the database.team
(str
): ID of the Team in which to create the DB (defaults to 'admin')label
(str
): Database name.description
(str
): Database description.prefixes
(dict
): Optional dict containing"@base"
and"@schema"
keys.
@base (str) IRI to use when
doc:
prefixes are expanded. Defaults to terminusdb:///data
. @schema (str) IRI to use when scm:
prefixes are expanded. Defaults to terminusdb:///schema
.include_schema
(bool
): IfTrue
, a main schema graph will be created, otherwise only a main instance graph will be created.
Raises:
InterfaceError
: if the client does not connect to a server
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.create_database("someDB", "admin", "Database Label", "My Description")
def delete_database(self,
dbid: Optional[str] = None,
team: Optional[str] = None,
force: bool = False) -> None
Delete a TerminusDB database.
If
team
is provided, then the team in the config will be updated and the new value will be used in future requests to the server.Arguments:
dbid
(str
): ID of the database to deleteteam
(str
): the team in which the database resides (defaults to "admin")force
(bool
): None
Raises:
UserWarning
: If the value of dbid is None.InterfaceError
: if the client does not connect to a server.
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.delete_database("<database>", "<team>")
def get_triples(self, graph_type: str) -> str
Retrieves the contents of the specified graph as triples encoded in turtle format
Arguments:
graph_type
(str
): Graph type, either "instance" or "schema".
Raises:
InterfaceError
: if the client does not connect to a database
Returns:
str
:def update_triples(self, graph_type: str, turtle, commit_msg: str) -> None
Updates the contents of the specified graph with the triples encoded in turtle format Replaces the entire graph contents
Arguments:
graph_type
(str
): Graph type, either "instance" or "schema".turtle
: Valid set of triples in Turtle format.commit_msg
(str
): Commit message.
Raises:
InterfaceError
: if the client does not connect to a database
def insert_triples(self,
graph_type: str,
turtle,
commit_msg: Optional[str] = None) -> None
Inserts into the specified graph with the triples encoded in turtle format.
Arguments:
graph_type
(str
): Graph type, either "instance" or "schema".turtle
: Valid set of triples in Turtle format.commit_msg
(str
): Commit message.
Raises:
InterfaceError
: if the client does not connect to a database
def query_document(self,
document_template: dict,
graph_type: str = "instance",
skip: int = 0,
count: Optional[int] = None,
as_list: bool = False,
get_data_version: bool = False,
**kwargs) -> Union[Iterable, list]
Retrieves all documents that match a given document template
Arguments:
document_template
(dict
): Template for the document that is being retrivedgraph_type
(str
): Graph type, either "instance" or "schema".as_list
(bool
): If the result returned as list rather than an iterator.get_data_version
(bool
): If the data version of the document(s) should be obtained. If True, the method return the result and the version as a tuple.
Raises:
InterfaceError
: if the client does not connect to a database
Returns:
Iterable
:def get_document(self,
iri_id: str,
graph_type: str = "instance",
get_data_version: bool = False,
**kwargs) -> dict
Retrieves the document of the iri_id
Arguments:
iri_id
(str
): Iri id for the docuemnt that is retrivinggraph_type
(str
): Graph type, either "instance" or "schema".get_data_version
(bool
): If the data version of the document(s) should be obtained. If True, the method return the result and the version as a tuple.kwargs
(``): Additional boolean flags for retriving. Currently avaliable: "prefixed", "minimized", "unfold"
Raises:
InterfaceError
: if the client does not connect to a database
Returns:
dict
:def get_documents_by_type(self,
doc_type: str,
graph_type: str = "instance",
skip: int = 0,
count: Optional[int] = None,
as_list: bool = False,
get_data_version=False,
**kwargs) -> Union[Iterable, list]
Retrieves the documents by type
Arguments:
doc_type
(str
): Specific type for the docuemnts that is retrivinggraph_type
(str
): Graph type, either "instance" or "schema".skip
(int
): The starting posiion of the returning results, default to be 0count
(int or None
): The maximum number of returned result, if None (default) it will return all of the avalible result.as_list
(bool
): If the result returned as list rather than an iterator.get_data_version
(bool
): If the version of the document(s) should be obtained. If True, the method return the result and the version as a tuple.kwargs
(``): Additional boolean flags for retriving. Currently avaliable: "prefixed", "unfold"
Raises:
InterfaceError
: if the client does not connect to a database
Returns:
iterable
: Stream of dictionariesdef get_all_documents(self,
graph_type: str = "instance",
skip: int = 0,
count: Optional[int] = None,
as_list: bool = False,
get_data_version: bool = False,
**kwargs) -> Union[Iterable, list, tuple]
Retrieves all avalibale the documents
Arguments:
graph_type
(str
): Graph type, either "instance" or "schema".skip
(int
): The starting posiion of the returning results, default to be 0count
(int or None
): The maximum number of returned result, if None (default) it will return all of the avalible result.as_list
(bool
): If the result returned as list rather than an iterator.get_data_version
(bool
): If the version of the document(s) should be obtained. If True, the method return the result and the version as a tuple.kwargs
(``): Additional boolean flags for retriving. Currently avaliable: "prefixed", "unfold"
Raises:
InterfaceError
: if the client does not connect to a database
Returns:
iterable
: Stream of dictionariesdef get_existing_classes(self)
Get all the existing classes (only ids) in a database.
def insert_document(
self,
document: Union[dict, List[dict], "Schema", # noqa:F821
"DocumentTemplate", # noqa:F821
List["DocumentTemplate"], # noqa:F821
],
graph_type: str = "instance",
full_replace: bool = False,
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None,
compress: Union[str, int] = 1024,
raw_json: bool = False) -> None
Inserts the specified document(s)
Arguments:
document
(dict or list of dict
): Document(s) to be inserted.graph_type
(str
): Graph type, either "inference", "instance" or "schema".full_replace
(: bool
): If True then the whole graph will be replaced. WARNING: you should also supply the context object as the first element in the list of documents if using this option.commit_msg
(str
): Commit message.last_data_version
(str
): Last version before the update, used to check if the document has been changed unknowinglycompress
(str or int
): If it is an integer, size of the data larger than this (in bytes) will be compress with gzip in the request (assume encoding as UTF-8, 0 = always compress). If it isnever
it will never compress the data.raw_json
(bool
): Update as raw json
Raises:
InterfaceError
: if the client does not connect to a database
Returns:
list
: list of ids of the inseted docuemntsdef replace_document(
self,
document: Union[dict, List[dict], "Schema", # noqa:F821
"DocumentTemplate", # noqa:F821
List["DocumentTemplate"], # noqa:F821
],
graph_type: str = "instance",
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None,
compress: Union[str, int] = 1024,
create: bool = False,
raw_json: bool = False) -> None
Updates the specified document(s)
Arguments:
document
(dict or list of dict
): Document(s) to be updated.graph_type
(str
): Graph type, either "instance" or "schema".commit_msg
(str
): Commit message.last_data_version
(str
): Last version before the update, used to check if the document has been changed unknowinglycompress
(str or int
): If it is an integer, size of the data larger than this (in bytes) will be compress with gzip in the request (assume encoding as UTF-8, 0 = always compress). If it isnever
it will never compress the data.create
(bool
): Create the document if it does not yet exist.raw_json
(bool
): Update as raw json
Raises:
InterfaceError
: if the client does not connect to a database
def update_document(
self,
document: Union[dict, List[dict], "Schema", # noqa:F821
"DocumentTemplate", # noqa:F821
List["DocumentTemplate"], # noqa:F821
],
graph_type: str = "instance",
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None,
compress: Union[str, int] = 1024) -> None
Updates the specified document(s). Add the document if not existed.
Arguments:
document
(dict or list of dict
): Document(s) to be updated.graph_type
(str
): Graph type, either "instance" or "schema".commit_msg
(str
): Commit message.last_data_version
(str
): Last version before the update, used to check if the document has been changed unknowinglycompress
(str or int
): If it is an integer, size of the data larger than this (in bytes) will be compress with gzip in the request (assume encoding as UTF-8, 0 = always compress). If it isnever
it will never compress the data.
Raises:
InterfaceError
: if the client does not connect to a database
def delete_document(self,
document: Union[str, list, dict, Iterable],
graph_type: str = "instance",
commit_msg: Optional[str] = None,
last_data_version: Optional[str] = None) -> None
Delete the specified document(s)
Arguments:
document
(str or list of str
): Document(s) (as dictionary or DocumentTemplate objects) or id(s) of document(s) to be updated.graph_type
(str
): Graph type, either "instance" or "schema".commit_msg
(str
): Commit message.last_data_version
(str
): Last version before the update, used to check if the document has been changed unknowingly
Raises:
InterfaceError
: if the client does not connect to a database
def has_doc(self, doc_id: str, graph_type: str = "instance") -> bool
Check if a certain document exist in a database
Arguments:
doc_id
(str
): Id of document to be checked.graph_type
(str
): Graph type, either "instance" or "schema".returns
: None-------
: NoneBool
: if the document exist
def get_class_frame(self, class_name)
Get the frame of the class of class_name. Provide information about all the avaliable properties of that class.
Arguments:
class_name
(str
): Name of the classreturns
: None-------
: Nonedict
: Dictionary containing information
def commit(self)
Not implementated: open transactions currently not suportted. Please check back later.
def query(self,
woql_query: Union[dict, WOQLQuery],
commit_msg: Optional[str] = None,
get_data_version: bool = False,
last_data_version: Optional[str] = None) -> Union[dict, str]
Updates the contents of the specified graph with the triples encoded in turtle format Replaces the entire graph contents
Arguments:
woql_query
(dict or WOQLQuery object
): A woql query as an object or dictcommit_mg
(str
): A message that will be written to the commit log to describe the changeget_data_version
(bool
): If the data version of the query result(s) should be obtained. If True, the method return the result and the version as a tuple.last_data_version
(str
): Last version before the update, used to check if the document has been changed unknowinglyfile_dict
(**deprecated**
): File dictionary to be associated with post name => filename, for multipart POST
Raises:
InterfaceError
: if the client does not connect to a database
Examples:
>>> Client(server="http://localhost:6363").query(woql, "updating graph")
Returns:
dict
:def create_branch(self, new_branch_id: str, empty: bool = False) -> None
Create a branch starting from the current branch.
Arguments:
new_branch_id
(str
): New branch identifier.empty
(bool
): Create an empty branch if true (no starting commit)
Raises:
InterfaceError
: if the client does not connect to a database
def delete_branch(self, branch_id: str) -> None
Delete a branch
Arguments:
branch_id
(str
): Branch to delete
Raises:
InterfaceError
: if the client does not connect to a database
def pull(self,
remote: str = "origin",
remote_branch: Optional[str] = None,
message: Optional[str] = None,
author: Optional[str] = None) -> dict
Pull updates from a remote repository to the current database.
Arguments:
remote
(str
): remote to pull from, default "origin"remote_branch
(str
): remote branch to pull from, default to be your current barnchmessage
(str
): optional commit messageauthor
(str
): option to overide the author of the operation
Raises:
InterfaceError
: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.pull()
Returns:
dict
:def fetch(self, remote_id: str) -> dict
Fatch the brach from a remote
Arguments:
remote_id
(str
): id of the remote
Raises:
InterfaceError
: if the client does not connect to a database
def push(self,
remote: str = "origin",
remote_branch: Optional[str] = None,
message: Optional[str] = None,
author: Optional[str] = None) -> dict
Push changes from a branch to a remote repo
Arguments:
remote
(str
): remote to push to, default "origin"remote_branch
(str
): remote branch to push to, default to be your current barnchmessage
(str
): optional commit messageauthor
(str
): option to overide the author of the operation
Raises:
InterfaceError
: if the client does not connect to a database
Examples:
>>> Client(server="http://localhost:6363").push(remote="origin", remote_branch = "main", author = "admin", message = "commit message"})
Returns:
dict
:def rebase(self,
branch: Optional[str] = None,
commit: Optional[str] = None,
rebase_source: Optional[str] = None,
message: Optional[str] = None,
author: Optional[str] = None) -> dict
Rebase the current branch onto the specified remote branch. Need to specify one of 'branch','commit' or the 'rebase_source'.
Arguments:
branch
(str
): the branch for the rebaserebase_source
(str
): the source branch for the rebasemessage
(str
): the commit messageauthor
(str
): the commit author
Raises:
InterfaceError
: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")
>>> client.rebase("the_branch")
Returns:
dict
:def reset(self,
commit: Optional[str] = None,
soft: bool = False,
use_path: bool = False) -> None
Reset the current branch HEAD to the specified commit path. If
soft
is not True, it will be a hard reset, meaning reset to that commit in the backend and newer commit will be wipped out. If soft
is True, the client will only reference to that commit and can be reset to the newest commit when done.Arguments:
commit
(string
): Commit id or path to the commit (if use_path is True), for instance '234980523ffaf93' or 'admin/database/local/commit/234980523ffaf93'. If not provided, it will reset to the newest commit (useful when need to go back after a soft reset).soft
(bool
): Flag indicating if the reset if soft, that is referencing to a previous commit instead of resetting to a previous commit in the backend and wipping newer commits.use_path
(bool
): Wheather or not the commit given is an id or path. Default using id and use_path is False.
Raises:
InterfaceError
: if the client does not connect to a database
Examples:
>>> client = Client("http://127.0.0.1:6363/")