Module pyfx.internal.connection
Interacting with a Pricefx platform directly.
This module defines a low-level wrapper to interact with the Pricefx platform.
For a higher level API, see the pyfx.api.domain package.
Classes
class Connection-
Abstract class for a Connection.
A connection abstracts the low-level details of interacting with a "backend". This backend can be an actual Pricefx instance, or it can be substituted with, e.g., a local mock.
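The idea of swapping a real backend for a mock can be sketched as follows. This is a minimal illustration, not the library's code: `MiniConnection` and `InMemoryConnection` are hypothetical names, only two of the abstract methods are reproduced, and the `type_code` key stored in each object is an assumption made for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class MiniConnection(ABC):
    """Hypothetical, reduced stand-in for Connection (two methods only)."""

    @abstractmethod
    def get_object(self, typedid: str) -> Dict[str, Any]:
        """Get the attributes of a specific element."""

    @abstractmethod
    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        """List all the elements of a given type."""


class InMemoryConnection(MiniConnection):
    """Hypothetical mock backend that serves objects from a dict."""

    def __init__(self, objects: Dict[str, Dict[str, Any]]) -> None:
        # Maps a typedid to the attributes of the object.
        self._objects = objects

    def get_object(self, typedid: str) -> Dict[str, Any]:
        # Return a copy so callers cannot mutate the stored object.
        return dict(self._objects[typedid])

    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        # Assumption: each stored object carries its type under "type_code".
        return [
            dict(attrs)
            for attrs in self._objects.values()
            if attrs.get("type_code") == type_code
        ]


mock = InMemoryConnection({"1.P": {"type_code": "P", "sku": "A"}})
```

Code written against the abstract interface can then run unchanged against either the mock or a real backend.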
Ancestors
- abc.ABC
Subclasses
- ConnectionComposed
- ConnectionLocal
- ConnectionRemote
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
Add an object and return its actual attributes.
def attach_file(self, typedid: str, name: str, content:) ‑> None -
Attach content as a file with the given name to the model.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None-
Create a table in the backend.
Args
name- the name of the table to push
fields_spec- a list of dictionaries that describe the columns of the data source
[
  { name:"productId", label:"Product ID", type:"TEXT", key:true},
  { name:"productGroup", label:"ProductGroup", type:"TEXT", dimension:true},
  { name:"revenue", label:"Revenue", type:"MONEY"},
]
content- the content of the source file to push in avro format
label- the label of the table (optional, defaults to name)
owner_typedid- the name of the table owner (optional, set it to create a DMT)
replace_existing- if True, remove any existing data source with the same name before pushing the data (default = True).
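In Python, the fields_spec argument described above would be written as a list of dictionaries with quoted keys and Python booleans. The sketch below only builds that structure; `key_fields` is a hypothetical helper added for illustration and is not part of the module.

```python
from typing import Any, Dict, List

# The same three columns as in the description above, as Python literals.
fields_spec: List[Dict[str, Any]] = [
    {"name": "productId", "label": "Product ID", "type": "TEXT", "key": True},
    {"name": "productGroup", "label": "ProductGroup", "type": "TEXT", "dimension": True},
    {"name": "revenue", "label": "Revenue", "type": "MONEY"},
]


def key_fields(spec: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical helper: names of the columns flagged as keys."""
    return [field["name"] for field in spec if field.get("key")]


keys = key_fields(fields_spec)
```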
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]-
Get the calculation items associated with a specific typedid.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]-
Get the attributes of a specific fields collection.
def get_object(self, typedid: str) ‑> Dict[str, Any]-
Get the attributes of a specific element.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]-
Get the information about attachments of the model.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]-
List all the fields collections of a given type.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]-
List all the elements of a given type.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
Fetch a file from the backend.
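Because pull_file returns an iterator of bytes, a caller can write the file without holding it in memory all at once. The sketch below assumes chunk_size is expressed in bytes, which the documentation does not state; `fake_pull_file` and `save_chunks` are hypothetical names, and the fake generator stands in for a real connection.

```python
import io
from typing import BinaryIO, Iterator


def fake_pull_file(payload: bytes, chunk_size: int = 128) -> Iterator[bytes]:
    """Hypothetical stand-in for pull_file: yields the payload in chunks."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


def save_chunks(chunks: Iterator[bytes], sink: BinaryIO) -> int:
    """Write every chunk to the sink and return the number of bytes written."""
    total = 0
    for chunk in chunks:
        sink.write(chunk)
        total += len(chunk)
    return total


buffer = io.BytesIO()
written = save_chunks(fake_pull_file(b"x" * 300, chunk_size=128), buffer)
```

The same `save_chunks` loop would work with a file opened in binary mode instead of the in-memory buffer.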
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None-
Push a new calculation item associated with a specific typedid.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
Stream the content of a data source.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
Update an object and return its actual attributes.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None-
Update the job status on the backend.
Args
jst_id- the id of the jst
status_code- the new job status
progress- the new job progress percent (optional)
msg- a message for the new status change (optional)
results- a dictionary associating result names to their value (optional). Will be available to next evaluations and calculations.
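One way a job might report progress through this method is sketched below. Everything here is an assumption made for illustration: `JobStatusSketch` mirrors only the member names of JobStatus (its values are placeholders), `RecordingBackend` records calls instead of contacting a backend, and `run_job` is a hypothetical driver.

```python
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional


class JobStatusSketch(Enum):
    """Stand-in for JobStatus; the member values are placeholders."""
    FAILED = auto()
    FINISHED = auto()
    PROCESSING = auto()


class RecordingBackend:
    """Hypothetical backend that records status updates instead of sending them."""

    def __init__(self) -> None:
        self.updates: List[Dict[str, Any]] = []

    def update_status(
        self,
        jst_id: int,
        status_code: JobStatusSketch,
        progress: Optional[int],
        msg: Optional[str] = None,
        results: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.updates.append(
            {"jst_id": jst_id, "status_code": status_code,
             "progress": progress, "msg": msg, "results": results}
        )


def run_job(backend: RecordingBackend, jst_id: int,
            steps: List[Callable[[], None]]) -> None:
    """Run each step, reporting progress, then a final FINISHED or FAILED."""
    try:
        for done, step in enumerate(steps, start=1):
            step()
            backend.update_status(
                jst_id, JobStatusSketch.PROCESSING,
                progress=100 * done // len(steps),
            )
        backend.update_status(
            jst_id, JobStatusSketch.FINISHED, progress=100,
            results={"steps": len(steps)},
        )
    except Exception as exc:
        # No meaningful progress value on failure, so pass None.
        backend.update_status(
            jst_id, JobStatusSketch.FAILED, progress=None, msg=str(exc)
        )


backend = RecordingBackend()
run_job(backend, 7, [lambda: None, lambda: None])
```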
def update_table(self, typedid: str, data: AvroStream) ‑> None-
Update data of an already existing DMTable or DMDataSource.
Values of rows with same keys will be updated. Rows with new keys will be appended.
Args
typedid- the typed id of the table to update
data- the content of the data to push in avro format
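The update rule stated above (rows with the same keys are updated, rows with new keys are appended) is an upsert. A minimal in-memory sketch of that rule follows; `upsert_rows` is a hypothetical helper that works on plain dictionaries rather than Avro data, and merging the incoming values into the existing row is an assumption about how columns absent from the incoming row are treated.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def upsert_rows(existing: List[Row], incoming: List[Row],
                key_fields: Sequence[str]) -> List[Row]:
    """Return a new table where incoming rows update or extend existing ones."""
    result: List[Row] = [dict(row) for row in existing]
    # Map each key tuple to the position of its row in the result.
    index: Dict[Tuple[Any, ...], int] = {
        tuple(row[k] for k in key_fields): pos for pos, row in enumerate(result)
    }
    for row in incoming:
        key = tuple(row[k] for k in key_fields)
        if key in index:
            # Same key: overwrite the values of the existing row.
            result[index[key]].update(row)
        else:
            # New key: append the row at the end.
            index[key] = len(result)
            result.append(dict(row))
    return result


table = [{"productId": "A", "revenue": 10}, {"productId": "B", "revenue": 20}]
changes = [{"productId": "B", "revenue": 25}, {"productId": "C", "revenue": 5}]
merged = upsert_rows(table, changes, key_fields=["productId"])
```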
class ConnectionComposed (dispatch: Dict[str, Connection], default_connection: Connection)-
A connection that composes remote and local connections.
This composite connection allows calls to be mixed and matched between a remote and a local connection according to a given configuration.
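The routing idea can be sketched as below. The documentation does not say what the keys of the dispatch dictionary are; this sketch assumes they are method names, which may differ from the actual implementation. `EchoBackend` and `ComposedSketch` are hypothetical names, and only two methods are shown.

```python
from typing import Any, Dict, List


class EchoBackend:
    """Hypothetical backend that tags every result with its own name."""

    def __init__(self, name: str) -> None:
        self.name = name

    def get_object(self, typedid: str) -> Dict[str, Any]:
        return {"typedid": typedid, "served_by": self.name}

    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        return [{"type_code": type_code, "served_by": self.name}]


class ComposedSketch:
    """Routes each call to the backend configured for that method name."""

    def __init__(self, dispatch: Dict[str, Any], default_connection: Any) -> None:
        self._dispatch = dispatch
        self._default = default_connection

    def _target(self, method_name: str) -> Any:
        # Assumption: dispatch is keyed by method name; fall back to the default.
        return self._dispatch.get(method_name, self._default)

    def get_object(self, typedid: str) -> Dict[str, Any]:
        return self._target("get_object").get_object(typedid)

    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        return self._target("list_objects").list_objects(type_code)


composed = ComposedSketch(
    dispatch={"get_object": EchoBackend("local")},
    default_connection=EchoBackend("remote"),
)
```

Here get_object is served by the local backend, while list_objects falls through to the default remote one.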
Ancestors
- Connection
- abc.ABC
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
See the corresponding Connection method.
def attach_file(self, typedid: str, name: str, content:) ‑> None -
See the corresponding Connection method.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None-
See the corresponding Connection method.
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]-
See the corresponding Connection method.
def get_object(self, typedid: str) ‑> Dict[str, Any]-
See the corresponding Connection method.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
See the corresponding Connection method.
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None-
See the corresponding Connection method.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
See the corresponding Connection method.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
See the corresponding Connection method.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None-
See the corresponding Connection method.
def update_table(self, typedid: str, data: AvroStream) ‑> None-
See the corresponding Connection method.
class ConnectionLocal (path: pathlib.Path, logformat: Optional[str] = None)-
A connection that uses the local filesystem.
This "connection" actually pulls data from and pushes data to the local filesystem. It is useful for debugging or running scripts locally.
Ancestors
- Connection
- abc.ABC
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
See the corresponding Connection method. This implementation always returns the given attributes.
def attach_file(self, typedid: str, name: str, content:) ‑> None -
See the corresponding Connection method.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None-
See the corresponding Connection method.
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]-
See the corresponding Connection method. This implementation returns a minimal dict.
def get_object(self, typedid: str) ‑> Dict[str, Any]-
See the corresponding Connection method. This implementation returns a minimal dict.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]-
See the corresponding Connection method. This implementation always returns an empty list.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method. This implementation always returns an empty list.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
See the corresponding Connection method.
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None-
See the corresponding Connection method.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
See the corresponding Connection method.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
See the corresponding Connection method. This implementation always returns the given attributes.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None-
See the corresponding Connection method.
def update_table(self, typedid: str, data: AvroStream) ‑> None-
See the corresponding Connection method.
class ConnectionRemote (endpoint: str, pfxsession: PfxSession)-
A connection to a remote instance.
This class abstracts the various aspects of interacting with the Pricefx platform behind a unified interface.
Ancestors
- Connection
- abc.ABC
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
See the corresponding Connection method.
def attach_file(self, typedid: str, name: str, content:) ‑> None -
See the corresponding Connection method.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None-
See the corresponding Connection method.
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]-
See the corresponding Connection method.
def get_object(self, typedid: str) ‑> Dict[str, Any]-
See the corresponding Connection method.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]-
See the corresponding Connection method.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
See the corresponding Connection method.
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None-
See the corresponding Connection method.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]-
See the corresponding Connection method.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]-
See the corresponding Connection method.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None-
See the corresponding Connection method.
def update_table(self, typedid: str, data: AvroStream) ‑> None-
See the corresponding Connection method.
class JobStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)-
Possible status of a job for the Pricefx REST API.
Ancestors
- enum.Enum
Class variables
var FAILED
var FINISHED
var PROCESSING