Client Reference Architecture
This document provides a detailed reference architecture for the client module in the Mapepire Python implementation. It is intended to serve as a guide for implementing Mapepire clients in other programming languages.
Table of Contents
Overview
The Mapepire Python client module provides a WebSocket-based interface to connect to and interact with Mapepire servers. The architecture follows a job-based model where:
- A
SQLJob
establishes and maintains the WebSocket connection to the server - The
Query
class handles the execution of SQL queries and management of their results - Both classes work together to provide a complete client implementation for Mapepire
This reference architecture aims to provide sufficient details for implementing compatible clients in other programming languages.
SQLJob
SQLJob Class Structure
The SQLJob
class extends the BaseJob
abstract class, providing concrete implementations for establishing connections and executing queries.
class SQLJob(BaseJob): def __init__(self, creds=None, options={}, **kwargs): # Initialization logic
# Connection management methods def connect(self, db2_server, **kwargs): # Connection logic
def close(self): # Close connection logic
# Query methods def query(self, sql, opts=None): # Create Query object
def query_and_run(self, sql, opts=None, **kwargs): # Create and execute Query
def get_status(self): # Return job status
# Context management def __enter__(self): # Resource acquisition
def __exit__(self, *args, **kwargs): # Resource cleanup
SQLJob Initialization
The SQLJob
is initialized with:
creds
: Server credentials (optional during initialization, required before connection)options
: Dictionary of connection options**kwargs
: Additional keyword arguments
During initialization, the SQLJob:
- Calls the parent class initializer
- Initializes the WebSocket connection to
None
- Sets up internal state tracking (unique ID counter, response handling, job status)
- Generates a unique identifier for this job instance
Connection Management
The SQLJob
manages WebSocket connections to the Mapepire server:
_get_channel
: Creates a WebSocket connection to the serverconnect
: Establishes a connection with the provided credentials- Parses connection input (dictionary, path to config file, or DaemonServer object)
- Creates a WebSocket channel
- Sends a connection message to the server
- Processes the response and updates job status
close
: Closes the connection and sets job status toEnded
The connection protocol uses a JSON message format:
{ "id": "[unique_id]", "type": "connect", "technique": "tcp", "application": "Python Client", "props": "[connection_props]"}
Query Operations
SQLJob provides methods for creating and executing queries:
query
: Creates a newQuery
object with the given SQL and options- Returns a configured Query object but does not execute it
query_and_run
: Creates a Query object and immediately executes it- Returns the query results
- Handles any exceptions that occur during execution
Job Status Management
The SQLJob maintains an internal state tracking the connection status:
NotStarted
: Initial state or after failed connectionReady
: After successful connectionBusy
: (Not explicitly set in code)Ended
: After connection closure
Resource Management
SQLJob implements the context manager protocol (__enter__
and __exit__
) to allow usage with Python’s with
statement, ensuring proper resource cleanup:
with SQLJob(credentials) as job: results = job.query_and_run("SELECT * FROM table")# Connection automatically closed when exiting the with block
Query
Query Class Structure
The Query
class handles SQL query execution and result management:
class Query(Generic[T]): global_query_list = [] # Class variable tracking all queries
def __init__(self, job, query, opts): # Initialization logic
# Query execution methods def prepare_sql_execute(self): # Prepare and execute SQL
def run(self, rows_to_fetch=None): # Run the query
def fetch_more(self, rows_to_fetch=None): # Fetch additional results
def close(self): # Close the query
# Context management def __enter__(self): # Resource acquisition
def __exit__(self, exc_type, exc_value, traceback): # Resource cleanup
Query Initialization
The Query
is initialized with:
job
: A reference to the SQLJob managing the connectionquery
: The SQL query string to executeopts
: A QueryOptions object controlling execution behavior
During initialization, the Query:
- Stores references to the job and SQL
- Processes options (is_prepared, parameters, is_cl_command, should_auto_close, is_terse_results)
- Sets initial state to
NOT_YET_RUN
- Adds itself to the global query list
Query State Management
The Query
maintains an internal state tracking execution progress:
NOT_YET_RUN
: Initial stateRUN_MORE_DATA_AVAIL
: After execution with more results availableRUN_DONE
: After all results have been fetched or the query is closedERROR
: If an error occurs during execution
Query Execution
Query provides methods for executing SQL:
_execute_query
: Internal method that sends a query message and receives the responseprepare_sql_execute
: Prepares a SQL statement with parametersrun
: Executes the query with optional row count limit- Handles different query types (SQL vs. CL command)
- Updates query state based on response
- Captures correlation ID for future fetch operations
fetch_more
: Retrieves additional rows when more results are available
Data Fetching
The data fetching protocol uses a continuation-based approach:
- Initial query execution returns a limited number of rows
- If more rows are available, the
is_done
flag is set to false - Subsequent
fetch_more
calls use the correlation ID to retrieve more rows - When all data is retrieved, the
is_done
flag is set to true
Query Resource Management
Query implements the context manager protocol (__enter__
and __exit__
) to ensure proper resource cleanup:
with job.query("SELECT * FROM table") as query: results = query.run() # If more results available while query.state != QueryState.RUN_DONE: more_results = query.fetch_more()# Query automatically closed when exiting the with block
Data Types
DaemonServer
The DaemonServer
dataclass encapsulates server connection parameters:
@dataclassclass DaemonServer: host: str # Server hostname user: str # Username password: str # Password port: Optional[Union[str, int]] # Server port ignoreUnauthorized: Optional[bool] # Whether to ignore TLS verification errors ca: Optional[Union[str, bytes]] # Certificate Authority for TLS verification
QueryOptions
The QueryOptions
dataclass configures query execution behavior:
@dataclassclass QueryOptions: isTerseResults: Optional[bool] # Whether to return simplified results isClCommand: Optional[bool] # Whether the query is a CL command parameters: Optional[List[Any]] # Query parameters for prepared statements autoClose: Optional[bool] # Whether to auto-close the query
QueryState
The QueryState
enum tracks the execution state of a query:
class QueryState(Enum): NOT_YET_RUN = (1,) # Query not executed yet RUN_MORE_DATA_AVAIL = (2,) # Query executed, more data available RUN_DONE = (3,) # Query execution complete ERROR = 4 # Error occurred during execution
JobStatus
The JobStatus
enum tracks the state of a SQLJob:
class JobStatus(Enum): NotStarted = "notStarted" # Initial state Ready = "ready" # Connected and ready Busy = "busy" # Executing a query Ended = "ended" # Connection closed
WebSocket Protocol
The Mapepire client communicates with the server using a JSON-based WebSocket protocol.
Connection Messages
Connection establishment:
{ "id": "[unique_id]", "type": "connect", "technique": "tcp", "application": "Python Client", "props": "[connection_properties]"}
Connection response:
{ "id": "[unique_id]", "success": true, "job": "[job_id]", "sql_rc": 0, "sql_state": ""}
Query Messages
SQL Query:
{ "id": "[unique_id]", "type": "sql", "sql": "[sql_statement]", "terse": false, "rows": 100}
Prepared Statement:
{ "id": "[unique_id]", "type": "prepare_sql_execute", "sql": "[sql_statement]", "rows": 100, "parameters": ["[param1]", "[param2]", ...]}
Fetch More:
{ "id": "[unique_id]", "cont_id": "[correlation_id]", "type": "sqlmore", "sql": "[sql_statement]", "rows": 100}
Close Query:
{ "id": "[unique_id]", "cont_id": "[correlation_id]", "type": "sqlclose"}
CL Command:
{ "id": "[unique_id]", "type": "cl", "terse": false, "cmd": "[cl_command]"}
Response Handling
All server responses follow a common structure with specialized fields:
{ "id": "[unique_id]", "success": true|false, "sql_rc": 0, "sql_state": "", "error": "[error_message]", "is_done": true|false, "has_results": true|false, "update_count": 0, "metadata": { "column_count": 3, "columns": [ { "display_size": 10, "label": "COLUMN1", "name": "COLUMN1", "type": "INTEGER" }, ... ] }, "data": [ ["row1_col1", "row1_col2", ...], ["row2_col1", "row2_col2", ...], ... ]}
Error Handling
The Mapepire client implements error handling through several mechanisms:
- The
@handle_ws_errors
decorator catches and processes WebSocket-related exceptions - Connection failures update the job status and raise exceptions with error details
- Query execution failures update the query state to
ERROR
and raise exceptions - The context manager protocol ensures resources are properly cleaned up even when errors occur
When implementing clients in other languages, it is important to maintain similar error handling patterns to ensure robustness and resource cleanup.
This is an outline of the Python reference archetecture for Mapepire
. The core components of the reference archetecture are the SQLJob
and Query
classes. The SQLJob
class manages the WebSocket connection to the server and provides methods to create and run queries. The Query
class manages the state of the query, sends it to the server, and fetches results.
Core Functions
SQL Job
High-Level Overview
The SQLJob
“class-like” object is designed to manage the client connection to the mapepire-server
. It handles the creation of unique identifiers for queries, manages WebSocket connections for sending and receiving data, and provides methods to create and run SQL queries. The class ensures that queries are executed in a controlled manner, maintaining their state and handling errors appropriately.
Class Definitions and Functions
class SQLJob: def __init__(self, options: Dict[Any, Any] = {}) -> None: """ Initialize a new instance of the class.
Args: options (Dict[Any, Any], optional): A dictionary of options for the job. Defaults to an empty dictionary.
Attributes: options (Dict[Any, Any]): Stores the options passed during initialization. _unique_id_counter (int): Counter for generating unique IDs. _reponse_emitter: Placeholder for a response emitter, initially set to None. _status (JobStatus): The current status of the job, initially set to NotStarted. _trace_file: Placeholder for a trace file, initially set to None. _is_tracing_channeldata (bool): Flag indicating whether channel data tracing is enabled, initially set to True. __unique_id: A unique identifier generated for the job. id (Optional[str]): An optional identifier for the job, initially set to None. """ self.options = options self._unique_id_counter: int = 0 self._reponse_emitter = None self._status: JobStatus = JobStatus.NotStarted self._trace_file = None self._is_tracing_channeldata: bool = True
self.__unique_id = self._get_unique_id("sqljob") self.id: Optional[str] = None
_get_unique_id(self, prefix: str = "id") -> str
-
Parameters:
- prefix (
str
): A string containing the prefix for the unique identifier.
- prefix (
-
Returns: (
str
) A unique identifier for a query.This function should update a counter for the number of queries executed and return a unique identifier for the query.
_get_channel(self, db2_server: DaemonServer) -> WebSocket
-
Parameters:
- db2_server (
Dict[str, Any]
): A dictionary-like object containing the connection information for the DB2 server. Here is a sample definition of theDaemonServer
class:
@dataclassclass DaemonServer:host: struser: strpassword: strport: intignoreUnauthorized: Optional[bool] = Noneca: Optional[Union[str, bytes]] = None - db2_server (
-
Returns: (
WebSocket
) A WebSocket connection to the DB2 server.This function is responsible for setting up a secure WebSocket connection to the
mapepire-server
. It constructs the connection URI, prepares the necessary headers for authentication, configures SSL options, and finally establishes the connection. The function returns the WebSocket object, which can then be used for communication with the server.
send(self, content: Any)
-
Parameters:
- content (
Any
): Any object that can be serialized into a JSON string.
- content (
-
Returns: None
This function is responsible for sending data to the
mapepire-server
over a WebSocket connection.
connect(self, db2_server: DaemonServer) -> Dict[Any, Any]
-
Parameters:
- db2_server (
DaemonServer
): A dictionary-like object containing the connection information for the DB2 server. Here is a sample definition of theDaemonServer
class:
@dataclassclass DaemonServer:host: struser: strpassword: strport: intignoreUnauthorized: Optional[bool] = Noneca: Optional[Union[str, bytes]] = None - db2_server (
-
Returns: (
Dict[str, Any]
) A dictionary-like object containing the response from the server.This function is responsible for establishing a connection to the
mapepire-server
. It calls the_get_channel(db2_server)
function to create a WebSocket connection, sends the connection request to the server, and waits for a response. The function returns the response from the server, which contains information about the connection status.
query(self, sql: str, opts: Optional[Union[Dict[str, Any], QueryOptions]] = None)
-
Parameters:
- sql (
str
): A string containing the SQL query to be executed. - opts (
Optional[Union[Dict[str, Any], QueryOptions]]
): A dictionary-like object containing additional options for the query. Here is a sample definition of theQueryOptions
class:
@dataclassclass QueryOptions:isTerseResults: Optional[bool] = NoneisClCommand: Optional[bool] = Noneparameters: Optional[List[str]] = NoneautoClose: Optional[bool] = None - sql (
-
Returns: (
Query
) AQuery
object representing the query to be executed.This function is responsible for creating a
Query
object that represents the SQL query to be executed. It constructs the query object with the provided SQL statement and any additional options. The function returns theQuery
object, which can then be used to run the query.
query_and_run(self, sql: str, opts: Optional[Union[Dict[str, Any], QueryOptions]] = None)
-
Parameters:
- sql (
str
): A string containing the SQL query to be executed. - opts (
Optional[Union[Dict[str, Any], QueryOptions]]
): A dictionary-like object containing additional options for the query.
- sql (
-
Returns: (
Query
) AQuery
object representing the query to be executed.This function is a convenience method that combines the
query
andrun
functions. It creates aQuery
object with the provided SQL statement and options, then immediately runs the query. The function returns theQuery
object, which can be used to fetch more data or close the query.
close(self)
-
Parameters: None
-
Returns: None
This function is responsible for closing the WebSocket connection to the DB2 server. It sends a close request to the server and waits for a response. Once the connection is closed, the function cleans up any resources associated with the connection.
Query
High-Level Overview
The Query
class-like object is designed to manage and execute SQL queries, handle their states, and fetch additional data. The Query
object inherits an SQLJob
instance and uses it’s WebSocket connection to send and receive data. See the Query Class definition below.
Class Definitions and Functions
class Query(Generic[T]): global_query_list: List["Query[Any]"] = []
def __init__(self, job: SQLJob, query: str, opts: QueryOptions) -> None: """ Initialize a new instance of the Query class.
Args: job (SQLJob): The SQL job associated with this query. query (str): The SQL query string. opts (QueryOptions): Options for configuring the query.
Attributes: job (SQLJob): Stores the SQL job associated with this query. is_prepared (bool): Indicates whether the query is prepared based on the presence of parameters. parameters (Optional[List[str]]): The parameters for the query, if any. sql (str): The SQL query string. is_cl_command (Optional[bool]): Indicates if the query is a command line command. should_auto_close (Optional[bool]): Indicates if the query should auto-close. is_terse_results (Optional[bool]): Indicates if the query should return terse results. _rows_to_fetch (int): The number of rows to fetch, default is 100. state (QueryState): The current state of the query, initially set to NOT_YET_RUN.
Class Attributes: global_query_list (List["Query[Any]"]): A global list of all Query instances. """ self.job = job self.is_prepared: bool = True if opts.parameters is not None else False self.parameters: Optional[List[str]] = opts.parameters self.sql: str = query self.is_cl_command: Optional[bool] = opts.isClCommand self.should_auto_close: Optional[bool] = opts.autoClose self.is_terse_results: Optional[bool] = opts.isTerseResults
self._rows_to_fetch: int = 100 self.state: QueryState = QueryState.NOT_YET_RUN
Query.global_query_list.append(self)
run(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]
-
Parameters:
- rows_to_fetch (
Optional[int]
): An integer specifying the number of rows to fetch from the query result.
- rows_to_fetch (
-
Returns: (
Dict[str, Any]
) A dictionary-like object containing the response from the server.This function is responsible for executing the query and fetching the initial set of results. It sends the query to the server, waits for a response, and returns the result. If
rows_to_fetch
is specified, the function fetches the specified number of rows from the result set.
fetch_more(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]
-
Parameters:
- rows_to_fetch (
Optional[int]
): An integer specifying the number of rows to fetch from the query result.
- rows_to_fetch (
-
Returns: (
Dict[str, Any]
) A dictionary-like object containing the response from the server.This function is responsible for fetching additional rows from the query result. It sends a request to the server to fetch more rows, waits for a response, and returns the result.
Query Objects
A query object is a dictionary-like object that represents a SQL query to be executed. It contains the SQL statement, any additional options, and the unique identifier for the query. The query object is used to manage the state of the query, send it to the server, and fetch results.
Here is a sample definition of a query object:
query_object = { "id": self.job._get_unique_id("query"), "type": "prepare_sql_execute" if self.is_prepared else "sql", "sql": self.sql, "terse": self.is_terse_results, "rows": rows_to_fetch, "parameters": self.parameters,}
Here is a sample definition of a query object for a cl command:
query_object = { "id": self.job._get_unique_id("clcommand"), "type": "cl", "terse": self.is_terse_results, "cmd": self.sql,}
here is a sample definition of a query object for fetch more:
query_object = { "id": self.job._get_unique_id("fetchMore"), "cont_id": self._correlation_id, "type": "sqlmore", "sql": self.sql, "rows": rows_to_fetch,}
Example usage in python
creds = DaemonServer( host="localhost", port=8085, user=user, password=pw, ignoreUnauthorized=True,)
job = SQLJob()res = job.connect(creds)query = job.query("select * from sample.employee")result = query.run(rows_to_fetch=5)job.close()