Module pyfx.internal.connection

Interacting with a Pricefx platform directly.

This platform defines a low-level wrapper to interact with the Pricefx platform. For a higher level API, see the pyfx.api.domain package.


class Connection

Abstract class for a Connection.

A connection abstract the low-lever details of interacting with a "backend". This backend can be an actual Pricefx instance, or can be substituted with, e.g. a local mock.


  • abc.ABC



def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

Add an object and returns its actual attributes.

def append_to_table(self, typedid: str, data: AvroStream) ‑> None

Append data to an already existing DMTable or DMDataSource.


the typed id of the table to append
the content of the data to push in avro format
def attach_file(self, typedid: str, name: str, content: ) ‑> None

Attach a content as a file with a given name to the model.

def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None

Create a table in the backend.


the name of the table to push
a list of dictionaries that describe the columns of the data source
   { name:"productId", label:"Product ID", type:"TEXT", key:true},
   { name:"productGroup", label:"ProductGroup", type:"TEXT", dimension:true},
   { name:"revenue", label:"Revenue", type:"MONEY"},
the content of the source file to push in avro format
the label of the table (optional, defaults to name)
the name of the table owner (optional, set it to create a DMT)
if True then removes the preceding data source having the same name if it exists before pushing the data (default = True).
def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]

Get the calculation items associated with a specific typedid.

def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]

Get the attributes of a specific fields collection.

def get_object(self, typedid: str) ‑> Dict[str, Any]

Get the attributes of a specific element.

def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]

Get the information about attachments of the model.

def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]

List all the fields collections of a given type.

def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]

List all the elements of a given type.

def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

Fetch file from the backend.

def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None

Push a new calculation item associated with a specific typedid.

def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

Stream the content of a data source.

def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

Update an object and returns its actual attributes.

def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None

Update the job status on the backend.


the id of the jst
the new job status
the new job progress percent (optional)
a message for the new status change (optional)
a dictionary associating result names to their value (optional). Will be available to next evaluations and calculations.
class ConnectionComposed (dispatch: Dict[str, Connection], default_connection: Connection)

A connection that composes a remote and local connections.

This composite connection allows to mix and match calls between a remote and local connection according to a given configuration.



def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

See Connection corresponding method.

def append_to_table(self, typedid: str, data: AvroStream) ‑> None

See Connection corresponding method.

def attach_file(self, typedid: str, name: str, content: ) ‑> None

See Connection corresponding method.

def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None

See Connection corresponding method.

def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]

See Connection corresponding method.

def get_object(self, typedid: str) ‑> Dict[str, Any]

See Connection corresponding method.

def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

See Connection corresponding method.

def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None

See Connection corresponding method.

def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

See Connection corresponding method.

def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

See Connection corresponding method.

def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None

See Connection corresponding method.

class ConnectionLocal (path: pathlib.Path, logformat: Optional[str] = None)

A connection that use the local filesystem.

This "connection" actually pull/push data from/to the local filesystem. Useful debugging or runing scripts locally.



def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

See Connection corresponding method.

This implementation of the methods always returns the attributes.

def append_to_table(self, typedid: str, data: AvroStream) ‑> None

See Connection corresponding method.

def attach_file(self, typedid: str, name: str, content: ) ‑> None

See Connection corresponding method.

def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None

See Connection corresponding method.

def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]

See Connection corresponding method.

This implementation will returns a minimal dict.

def get_object(self, typedid: str) ‑> Dict[str, Any]

See Connection corresponding method.

This implementation will returns a minimal dict.

def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]

See Connection corresponding method.

This implementation of the methods always returns an empty list.

def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

This implementation of the methods always returns an empty list.

def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

See Connection corresponding method.

def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None

See Connection corresponding method.

def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

See Connection corresponding method.

def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

See Connection corresponding method.

This implementation of the methods always returns the attributes.

def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None

See Connection corresponding method.

class ConnectionRemote (endpoint: str, pfxsession: PfxSession)

A connection to a remote instance.

This class abstracts the various aspects of interacting with the Pricefx platform behind an unified interface.



def add_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

See Connection corresponding method.

def append_to_table(self, typedid: str, data: AvroStream) ‑> None

See Connection corresponding method.

def attach_file(self, typedid: str, name: str, content: ) ‑> None

See Connection corresponding method.

def create_table(self, name: str, fields_spec: List[Dict], content: AvroStream, label: Optional[str] = None, owner_typedid: Optional[str] = None, replace_existing: bool = True) ‑> None

See Connection corresponding method.

def get_calcitems(self, typedid: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def get_fcs(self, typedid: str, params: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]

See Connection corresponding method.

def get_object(self, typedid: str) ‑> Dict[str, Any]

See Connection corresponding method.

def list_attachments(self, typedid: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def list_fcs(self, type_code: str, params: Optional[Dict[str, Any]] = None) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def list_objects(self, type_code: str) ‑> List[Dict[str, Any]]

See Connection corresponding method.

def pull_file(self, owner_typedid: str, attachment_typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

See Connection corresponding method.

def push_calcitem(self, typedid: str, key1: str, key2: str, value: Any) ‑> None

See Connection corresponding method.

def stream_fcs(self, typedid: str, chunk_size: int = 128) ‑> Iterator[bytes]

See Connection corresponding method.

def update_object(self, type_code: str, attributes: Dict[str, Any]) ‑> Dict[str, Any]

See Connection corresponding method.

def update_status(self, jst_id: int, status_code: JobStatus, progress: Optional[int], msg: Optional[str] = None, results: Optional[Dict[str, Any]] = None) ‑> None

See Connection corresponding method.

class JobStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)

Possible status of a job for the Pricefx REST API.


  • enum.Enum

Class variables