Module pyfx.internal.connection
Interacting with a Pricefx platform directly.
This module defines a low-level wrapper to interact with the Pricefx platform.
For a higher-level API, see the pyfx.api.domain package.
Classes
class Connection
-
Abstract class for a Connection.
A connection abstracts the low-level details of interacting with a "backend". This backend can be an actual Pricefx instance, or it can be substituted with, e.g., a local mock.
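The idea of substituting the backend can be sketched as follows. This is a minimal illustration and not pyfx's implementation: `MiniConnection`, `InMemoryConnection`, and `count_objects` are hypothetical names, only two of the methods listed below are mirrored, and the type code `"P"` is a made-up value.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class MiniConnection(ABC):
    """Hypothetical, reduced stand-in for the abstract Connection."""

    @abstractmethod
    def add_object(self, type_code: str, attributes: Dict[str, Any]) -> Dict[str, Any]:
        ...

    @abstractmethod
    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        ...


class InMemoryConnection(MiniConnection):
    """A local mock that keeps objects in a dictionary (assumption)."""

    def __init__(self) -> None:
        self._store: Dict[str, List[Dict[str, Any]]] = {}

    def add_object(self, type_code: str, attributes: Dict[str, Any]) -> Dict[str, Any]:
        stored = dict(attributes)
        self._store.setdefault(type_code, []).append(stored)
        return dict(stored)

    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        return [dict(obj) for obj in self._store.get(type_code, [])]


def count_objects(conn: MiniConnection, type_code: str) -> int:
    # Calling code depends only on the abstract interface,
    # so a mock can replace a real backend without changes here.
    return len(conn.list_objects(type_code))


mock = InMemoryConnection()
mock.add_object("P", {"sku": "A-1"})
```

Because `count_objects` only uses the abstract methods, it would work unchanged with any other subclass that implements them.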
Ancestors
- abc.ABC
Subclasses
- ConnectionComposed
- ConnectionLocal
- ConnectionRemote
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
Add an object and return its actual attributes.
def attach_file(self, typedid: str, name: str, content: ) ‑> None
-
Attach content as a file with the given name to the model.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None
-
Create a table in the backend.
Args
name
- the name of the table to push
fields_spec
- a list of dictionaries that describe the columns of the data source, for example:
[
  { name:"productId", label:"Product ID", type:"TEXT", key:true },
  { name:"productGroup", label:"ProductGroup", type:"TEXT", dimension:true },
  { name:"revenue", label:"Revenue", type:"MONEY" },
]
content
- the content of the source file to push in avro format
label
- the label of the table (optional, defaults to name)
owner_typedid
- the typed id of the table owner (optional; set it to create a DMT)
replace_existing
- if True, remove any existing data source with the same name before pushing the data (default: True).
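In Python, the fields_spec argument is a list of dictionaries. The sketch below writes the column example from the Args section as Python literals and adds a small sanity check. `check_fields_spec` is a hypothetical helper, and treating `name` and `type` as required keys is an assumption, not something this page states.

```python
from typing import Any, Dict, List

# Column descriptions, mirroring the example in the Args section above.
fields_spec: List[Dict[str, Any]] = [
    {"name": "productId", "label": "Product ID", "type": "TEXT", "key": True},
    {"name": "productGroup", "label": "ProductGroup", "type": "TEXT", "dimension": True},
    {"name": "revenue", "label": "Revenue", "type": "MONEY"},
]


def check_fields_spec(spec: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical sanity check: return the names of the key columns."""
    for field in spec:
        # Assumption: every column needs at least a name and a type.
        missing = {"name", "type"} - field.keys()
        if missing:
            raise ValueError(f"field {field!r} lacks {sorted(missing)}")
    return [field["name"] for field in spec if field.get("key")]


key_columns = check_fields_spec(fields_spec)
```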
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]
-
Get the calculation items associated with a specific typedid.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
-
Get the attributes of a specific fields collection.
def get_object(self, typedid: str) ‑> Dict[str, Any]
-
Get the attributes of a specific element.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]
-
Get the information about attachments of the model.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]
-
List all the fields collections of a given type.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]
-
List all the elements of a given type.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
Fetch a file from the backend.
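Since the method returns an Iterator[bytes], the caller is expected to consume the file chunk by chunk. A minimal sketch of that pattern follows. `fake_pull_file` is a hypothetical stand-in for a backend call, and reading chunk_size as a number of bytes is an assumption.

```python
import io
from typing import BinaryIO, Iterator


def fake_pull_file(payload: bytes, chunk_size: int = 128) -> Iterator[bytes]:
    """Stand-in for a backend call: yield the payload in chunks."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


def save_chunks(chunks: Iterator[bytes], target: BinaryIO) -> int:
    """Write every chunk to target and return the number of bytes written."""
    written = 0
    for chunk in chunks:
        written += target.write(chunk)
    return written


payload = b"x" * 300
buffer = io.BytesIO()
total = save_chunks(fake_pull_file(payload, chunk_size=128), buffer)
```

Writing each chunk as it arrives keeps memory use bounded by the chunk size rather than by the file size.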
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None
-
Push a new calculation item associated with a specific typedid.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
Stream the content of a data source.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
Update an object and return its actual attributes.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None
-
Update the job status on the backend.
Args
jst_id
- the id of the jst
status_code
- the new job status
progress
- the new job progress percent (optional)
msg
- a message for the new status change (optional)
results
- a dictionary associating result names to their value (optional). Will be available to next evaluations and calculations.
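A job would typically call update_status several times while it runs. The sketch below shows one possible reporting loop. `RecordingBackend` and `run_job` are hypothetical, the `JobStatus` enum is redefined locally with placeholder member values, and passing `None` as progress on failure is an assumption.

```python
from enum import Enum
from typing import Callable, List, Optional, Tuple


class JobStatus(Enum):
    """Local stand-in; the member values here are placeholders."""
    PROCESSING = "PROCESSING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"


class RecordingBackend:
    """Hypothetical stub that records update_status calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[int, JobStatus, Optional[int]]] = []

    def update_status(self, jst_id, status_code, progress, msg=None, results=None):
        self.calls.append((jst_id, status_code, progress))


def run_job(conn, jst_id: int, steps: List[Callable[[], None]]) -> None:
    """Run the steps in order and report progress after each one."""
    try:
        for done, step in enumerate(steps, start=1):
            step()
            conn.update_status(jst_id, JobStatus.PROCESSING, 100 * done // len(steps))
        conn.update_status(jst_id, JobStatus.FINISHED, 100, results={"steps": len(steps)})
    except Exception as exc:
        # Report the failure, then let the caller see the original error.
        conn.update_status(jst_id, JobStatus.FAILED, None, msg=str(exc))
        raise


backend = RecordingBackend()
run_job(backend, 42, [lambda: None, lambda: None])
```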
def update_table(self, typedid: str, data: AvroStream) ‑> None
-
Update data of an already existing DMTable or DMDataSource.
Values of rows with same keys will be updated. Rows with new keys will be appended.
Args
typedid
- the typed id of the table to update
data
- the content of the data to push in avro format
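The update rule described above (rows with the same keys are updated, rows with new keys are appended) is an upsert. The in-memory sketch below illustrates it on plain dictionaries. `upsert_rows` is a hypothetical helper, and merging field by field rather than replacing the whole row is an assumption about the backend's behavior.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def upsert_rows(existing: List[Row], incoming: List[Row], key_fields: Tuple[str, ...]) -> List[Row]:
    """Return the existing rows with the incoming rows merged in by key."""
    merged: Dict[Tuple[Any, ...], Row] = {
        tuple(row[k] for k in key_fields): dict(row) for row in existing
    }
    for row in incoming:
        key = tuple(row[k] for k in key_fields)
        # Same key: overwrite the values. New key: append at the end.
        merged.setdefault(key, {}).update(row)
    return list(merged.values())


table = [
    {"productId": "A", "revenue": 10},
    {"productId": "B", "revenue": 20},
]
data = [
    {"productId": "B", "revenue": 25},
    {"productId": "C", "revenue": 5},
]
result = upsert_rows(table, data, key_fields=("productId",))
```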
class ConnectionComposed (dispatch: Dict[str, Connection], default_connection: Connection)
-
A connection that composes a remote and a local connection.
This composite connection makes it possible to mix and match calls between a remote and a local connection according to a given configuration.
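One way to picture this routing is a lookup table with a fallback. The sketch below is not pyfx's implementation: `StubConnection` and `ComposedSketch` are hypothetical, only two methods are mirrored, and keying the dispatch table by method name is an assumption, since this page does not document what the keys of dispatch are.

```python
from typing import Any, Dict, List


class StubConnection:
    """Hypothetical stand-in for a Connection; it tags results with its name."""

    def __init__(self, name: str) -> None:
        self.name = name

    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        return [{"source": self.name, "type_code": type_code}]

    def get_object(self, typedid: str) -> Dict[str, Any]:
        return {"source": self.name, "typedId": typedid}


class ComposedSketch:
    """Route each call through a dispatch table, falling back to a default."""

    def __init__(self, dispatch: Dict[str, StubConnection],
                 default_connection: StubConnection) -> None:
        self._dispatch = dispatch
        self._default = default_connection

    def _target(self, method_name: str) -> StubConnection:
        # Assumption: the keys are method names.
        return self._dispatch.get(method_name, self._default)

    def list_objects(self, type_code: str) -> List[Dict[str, Any]]:
        return self._target("list_objects").list_objects(type_code)

    def get_object(self, typedid: str) -> Dict[str, Any]:
        return self._target("get_object").get_object(typedid)


remote, local = StubConnection("remote"), StubConnection("local")
conn = ComposedSketch({"list_objects": local}, default_connection=remote)
```

Here `list_objects` is served by the local stub, while every call without a dispatch entry goes to the remote one.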
Ancestors
- Connection
- abc.ABC
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def attach_file(self, typedid: str, name: str, content: ) ‑> None
-
See the corresponding Connection method.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None
-
See the corresponding Connection method.
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def get_object(self, typedid: str) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
See the corresponding Connection method.
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None
-
See the corresponding Connection method.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
See the corresponding Connection method.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None
-
See the corresponding Connection method.
def update_table(self, typedid: str, data: AvroStream) ‑> None
-
See the corresponding Connection method.
class ConnectionLocal (path: pathlib.Path, logformat: Optional[str] = None)
-
A connection that uses the local filesystem.
This "connection" actually pulls/pushes data from/to the local filesystem. Useful for debugging or running scripts locally.
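To give a rough idea of what a filesystem-backed backend can look like, the sketch below stores calculation items as JSON files under a directory. It is purely illustrative: `LocalCalcItems`, the file naming, and the `key1`/`key2`/`value` item layout are assumptions and do not describe how ConnectionLocal actually stores its data.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


class LocalCalcItems:
    """Hypothetical file-backed store; the layout is not pyfx's actual one."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._path.mkdir(parents=True, exist_ok=True)

    def _file(self, typedid: str) -> Path:
        # One JSON file per typed id (assumption).
        return self._path / f"{typedid}.calcitems.json"

    def get_calcitems(self, typedid: str) -> List[Dict[str, Any]]:
        file = self._file(typedid)
        return json.loads(file.read_text()) if file.exists() else []

    def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) -> None:
        items = self.get_calcitems(typedid)
        items.append({"key1": key1, "key2": key2, "value": value})
        self._file(typedid).write_text(json.dumps(items))


with tempfile.TemporaryDirectory() as tmp:
    store = LocalCalcItems(Path(tmp) / "backend")
    store.push_calcitem("model-1", "run", "rmse", 0.25)
    items = store.get_calcitems("model-1")
    missing = store.get_calcitems("model-2")
```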
Ancestors
- Connection
- abc.ABC
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
See the corresponding Connection method. This implementation always returns the attributes.
def attach_file(self, typedid: str, name: str, content: ) ‑> None
-
See the corresponding Connection method.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None
-
See the corresponding Connection method.
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
-
See the corresponding Connection method. This implementation returns a minimal dict.
def get_object(self, typedid: str) ‑> Dict[str, Any]
-
See the corresponding Connection method. This implementation returns a minimal dict.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method. This implementation always returns an empty list.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method. This implementation always returns an empty list.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
See the corresponding Connection method.
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None
-
See the corresponding Connection method.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
See the corresponding Connection method.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
See the corresponding Connection method. This implementation always returns the attributes.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None
-
See the corresponding Connection method.
def update_table(self, typedid: str, data: AvroStream) ‑> None
-
See the corresponding Connection method.
class ConnectionRemote (endpoint: str, pfxsession: PfxSession)
-
A connection to a remote instance.
This class abstracts the various aspects of interacting with the Pricefx platform behind a unified interface.
Ancestors
- Connection
- abc.ABC
Methods
def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def attach_file(self, typedid: str, name: str, content: ) ‑> None
-
See the corresponding Connection method.
def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None
-
See the corresponding Connection method.
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def get_object(self, typedid: str) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]
-
See the corresponding Connection method.
def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
See the corresponding Connection method.
def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None
-
See the corresponding Connection method.
def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]
-
See the corresponding Connection method.
def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]
-
See the corresponding Connection method.
def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None
-
See the corresponding Connection method.
def update_table(self, typedid: str, data: AvroStream) ‑> None
-
See the corresponding Connection method.
class JobStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Possible statuses of a job for the Pricefx REST API.
Ancestors
- enum.Enum
Class variables
var FAILED
var FINISHED
var PROCESSING