Skip to content

Client Reference Architecture

This document provides a detailed reference architecture for the client module in the Mapepire Python implementation. It is intended to serve as a guide for implementing Mapepire clients in other programming languages.

Table of Contents

Overview

The Mapepire Python client module provides a WebSocket-based interface to connect to and interact with Mapepire servers. The architecture follows a job-based model where:

  1. A SQLJob establishes and maintains the WebSocket connection to the server
  2. The Query class handles the execution of SQL queries and management of their results
  3. Both classes work together to provide a complete client implementation for Mapepire

This reference architecture aims to provide sufficient details for implementing compatible clients in other programming languages.

SQLJob

SQLJob Class Structure

The SQLJob class extends the BaseJob abstract class, providing concrete implementations for establishing connections and executing queries.

class SQLJob(BaseJob):
def __init__(self, creds=None, options={}, **kwargs):
# Initialization logic
# Connection management methods
def connect(self, db2_server, **kwargs):
# Connection logic
def close(self):
# Close connection logic
# Query methods
def query(self, sql, opts=None):
# Create Query object
def query_and_run(self, sql, opts=None, **kwargs):
# Create and execute Query
def get_status(self):
# Return job status
# Context management
def __enter__(self):
# Resource acquisition
def __exit__(self, *args, **kwargs):
# Resource cleanup

SQLJob Initialization

The SQLJob is initialized with:

  • creds: Server credentials (optional during initialization, required before connection)
  • options: Dictionary of connection options
  • **kwargs: Additional keyword arguments

During initialization, the SQLJob:

  1. Calls the parent class initializer
  2. Initializes the WebSocket connection to None
  3. Sets up internal state tracking (unique ID counter, response handling, job status)
  4. Generates a unique identifier for this job instance

Connection Management

The SQLJob manages WebSocket connections to the Mapepire server:

  1. _get_channel: Creates a WebSocket connection to the server
  2. connect: Establishes a connection with the provided credentials
    • Parses connection input (dictionary, path to config file, or DaemonServer object)
    • Creates a WebSocket channel
    • Sends a connection message to the server
    • Processes the response and updates job status
  3. close: Closes the connection and sets job status to Ended

The connection protocol uses a JSON message format:

{
"id": "[unique_id]",
"type": "connect",
"technique": "tcp",
"application": "Python Client",
"props": "[connection_props]"
}

Query Operations

SQLJob provides methods for creating and executing queries:

  1. query: Creates a new Query object with the given SQL and options
    • Returns a configured Query object but does not execute it
  2. query_and_run: Creates a Query object and immediately executes it
    • Returns the query results
    • Handles any exceptions that occur during execution

Job Status Management

The SQLJob maintains an internal state tracking the connection status:

  • NotStarted: Initial state or after failed connection
  • Ready: After successful connection
  • Busy: (Not explicitly set in code)
  • Ended: After connection closure

Resource Management

SQLJob implements the context manager protocol (__enter__ and __exit__) to allow usage with Python’s with statement, ensuring proper resource cleanup:

with SQLJob(credentials) as job:
results = job.query_and_run("SELECT * FROM table")
# Connection automatically closed when exiting the with block

Query

Query Class Structure

The Query class handles SQL query execution and result management:

class Query(Generic[T]):
global_query_list = [] # Class variable tracking all queries
def __init__(self, job, query, opts):
# Initialization logic
# Query execution methods
def prepare_sql_execute(self):
# Prepare and execute SQL
def run(self, rows_to_fetch=None):
# Run the query
def fetch_more(self, rows_to_fetch=None):
# Fetch additional results
def close(self):
# Close the query
# Context management
def __enter__(self):
# Resource acquisition
def __exit__(self, exc_type, exc_value, traceback):
# Resource cleanup

Query Initialization

The Query is initialized with:

  • job: A reference to the SQLJob managing the connection
  • query: The SQL query string to execute
  • opts: A QueryOptions object controlling execution behavior

During initialization, the Query:

  1. Stores references to the job and SQL
  2. Processes options (is_prepared, parameters, is_cl_command, should_auto_close, is_terse_results)
  3. Sets initial state to NOT_YET_RUN
  4. Adds itself to the global query list

Query State Management

The Query maintains an internal state tracking execution progress:

  • NOT_YET_RUN: Initial state
  • RUN_MORE_DATA_AVAIL: After execution with more results available
  • RUN_DONE: After all results have been fetched or the query is closed
  • ERROR: If an error occurs during execution

Query Execution

Query provides methods for executing SQL:

  1. _execute_query: Internal method that sends a query message and receives the response
  2. prepare_sql_execute: Prepares a SQL statement with parameters
  3. run: Executes the query with optional row count limit
    • Handles different query types (SQL vs. CL command)
    • Updates query state based on response
    • Captures correlation ID for future fetch operations
  4. fetch_more: Retrieves additional rows when more results are available

Data Fetching

The data fetching protocol uses a continuation-based approach:

  1. Initial query execution returns a limited number of rows
  2. If more rows are available, the is_done flag is set to false
  3. Subsequent fetch_more calls use the correlation ID to retrieve more rows
  4. When all data is retrieved, the is_done flag is set to true

Query Resource Management

Query implements the context manager protocol (__enter__ and __exit__) to ensure proper resource cleanup:

with job.query("SELECT * FROM table") as query:
results = query.run()
# If more results available
while query.state != QueryState.RUN_DONE:
more_results = query.fetch_more()
# Query automatically closed when exiting the with block

Data Types

DaemonServer

The DaemonServer dataclass encapsulates server connection parameters:

@dataclass
class DaemonServer:
host: str # Server hostname
user: str # Username
password: str # Password
port: Optional[Union[str, int]] # Server port
ignoreUnauthorized: Optional[bool] # Whether to ignore TLS verification errors
ca: Optional[Union[str, bytes]] # Certificate Authority for TLS verification

QueryOptions

The QueryOptions dataclass configures query execution behavior:

@dataclass
class QueryOptions:
isTerseResults: Optional[bool] # Whether to return simplified results
isClCommand: Optional[bool] # Whether the query is a CL command
parameters: Optional[List[Any]] # Query parameters for prepared statements
autoClose: Optional[bool] # Whether to auto-close the query

QueryState

The QueryState enum tracks the execution state of a query:

class QueryState(Enum):
NOT_YET_RUN = (1,) # Query not executed yet
RUN_MORE_DATA_AVAIL = (2,) # Query executed, more data available
RUN_DONE = (3,) # Query execution complete
ERROR = 4 # Error occurred during execution

JobStatus

The JobStatus enum tracks the state of a SQLJob:

class JobStatus(Enum):
NotStarted = "notStarted" # Initial state
Ready = "ready" # Connected and ready
Busy = "busy" # Executing a query
Ended = "ended" # Connection closed

WebSocket Protocol

The Mapepire client communicates with the server using a JSON-based WebSocket protocol.

Connection Messages

Connection establishment:

{
"id": "[unique_id]",
"type": "connect",
"technique": "tcp",
"application": "Python Client",
"props": "[connection_properties]"
}

Connection response:

{
"id": "[unique_id]",
"success": true,
"job": "[job_id]",
"sql_rc": 0,
"sql_state": ""
}

Query Messages

SQL Query:

{
"id": "[unique_id]",
"type": "sql",
"sql": "[sql_statement]",
"terse": false,
"rows": 100
}

Prepared Statement:

{
"id": "[unique_id]",
"type": "prepare_sql_execute",
"sql": "[sql_statement]",
"rows": 100,
"parameters": ["[param1]", "[param2]", ...]
}

Fetch More:

{
"id": "[unique_id]",
"cont_id": "[correlation_id]",
"type": "sqlmore",
"sql": "[sql_statement]",
"rows": 100
}

Close Query:

{
"id": "[unique_id]",
"cont_id": "[correlation_id]",
"type": "sqlclose"
}

CL Command:

{
"id": "[unique_id]",
"type": "cl",
"terse": false,
"cmd": "[cl_command]"
}

Response Handling

All server responses follow a common structure with specialized fields:

{
"id": "[unique_id]",
"success": true|false,
"sql_rc": 0,
"sql_state": "",
"error": "[error_message]",
"is_done": true|false,
"has_results": true|false,
"update_count": 0,
"metadata": {
"column_count": 3,
"columns": [
{
"display_size": 10,
"label": "COLUMN1",
"name": "COLUMN1",
"type": "INTEGER"
},
...
]
},
"data": [
["row1_col1", "row1_col2", ...],
["row2_col1", "row2_col2", ...],
...
]
}

Error Handling

The Mapepire client implements error handling through several mechanisms:

  1. The @handle_ws_errors decorator catches and processes WebSocket-related exceptions
  2. Connection failures update the job status and raise exceptions with error details
  3. Query execution failures update the query state to ERROR and raise exceptions
  4. The context manager protocol ensures resources are properly cleaned up even when errors occur

When implementing clients in other languages, it is important to maintain similar error handling patterns to ensure robustness and resource cleanup.

This is an outline of the Python reference archetecture for Mapepire. The core components of the reference archetecture are the SQLJob and Query classes. The SQLJob class manages the WebSocket connection to the server and provides methods to create and run queries. The Query class manages the state of the query, sends it to the server, and fetches results.

Core Functions

SQL Job

High-Level Overview

The SQLJob “class-like” object is designed to manage the client connection to the mapepire-server. It handles the creation of unique identifiers for queries, manages WebSocket connections for sending and receiving data, and provides methods to create and run SQL queries. The class ensures that queries are executed in a controlled manner, maintaining their state and handling errors appropriately.

Class Definitions and Functions

class SQLJob:
def __init__(self, options: Dict[Any, Any] = {}) -> None:
"""
Initialize a new instance of the class.
Args:
options (Dict[Any, Any], optional): A dictionary of options for the job. Defaults to an empty dictionary.
Attributes:
options (Dict[Any, Any]): Stores the options passed during initialization.
_unique_id_counter (int): Counter for generating unique IDs.
_reponse_emitter: Placeholder for a response emitter, initially set to None.
_status (JobStatus): The current status of the job, initially set to NotStarted.
_trace_file: Placeholder for a trace file, initially set to None.
_is_tracing_channeldata (bool): Flag indicating whether channel data tracing is enabled, initially set to True.
__unique_id: A unique identifier generated for the job.
id (Optional[str]): An optional identifier for the job, initially set to None.
"""
self.options = options
self._unique_id_counter: int = 0
self._reponse_emitter = None
self._status: JobStatus = JobStatus.NotStarted
self._trace_file = None
self._is_tracing_channeldata: bool = True
self.__unique_id = self._get_unique_id("sqljob")
self.id: Optional[str] = None

_get_unique_id(self, prefix: str = "id") -> str

  • Parameters:

    • prefix (str): A string containing the prefix for the unique identifier.
  • Returns: (str) A unique identifier for a query.

    This function should update a counter for the number of queries executed and return a unique identifier for the query.

_get_channel(self, db2_server: DaemonServer) -> WebSocket

  • Parameters:

    • db2_server (Dict[str, Any]): A dictionary-like object containing the connection information for the DB2 server. Here is a sample definition of the DaemonServer class:
    @dataclass
    class DaemonServer:
    host: str
    user: str
    password: str
    port: int
    ignoreUnauthorized: Optional[bool] = None
    ca: Optional[Union[str, bytes]] = None
  • Returns: (WebSocket) A WebSocket connection to the DB2 server.

    This function is responsible for setting up a secure WebSocket connection to the mapepire-server. It constructs the connection URI, prepares the necessary headers for authentication, configures SSL options, and finally establishes the connection. The function returns the WebSocket object, which can then be used for communication with the server.

send(self, content: Any)

  • Parameters:

    • content (Any): Any object that can be serialized into a JSON string.
  • Returns: None

    This function is responsible for sending data to the mapepire-server over a WebSocket connection.

connect(self, db2_server: DaemonServer) -> Dict[Any, Any]

  • Parameters:

    • db2_server (DaemonServer): A dictionary-like object containing the connection information for the DB2 server. Here is a sample definition of the DaemonServer class:
    @dataclass
    class DaemonServer:
    host: str
    user: str
    password: str
    port: int
    ignoreUnauthorized: Optional[bool] = None
    ca: Optional[Union[str, bytes]] = None
  • Returns: (Dict[str, Any]) A dictionary-like object containing the response from the server.

    This function is responsible for establishing a connection to the mapepire-server. It calls the _get_channel(db2_server) function to create a WebSocket connection, sends the connection request to the server, and waits for a response. The function returns the response from the server, which contains information about the connection status.

query(self, sql: str, opts: Optional[Union[Dict[str, Any], QueryOptions]] = None)

  • Parameters:

    • sql (str): A string containing the SQL query to be executed.
    • opts (Optional[Union[Dict[str, Any], QueryOptions]]): A dictionary-like object containing additional options for the query. Here is a sample definition of the QueryOptions class:
    @dataclass
    class QueryOptions:
    isTerseResults: Optional[bool] = None
    isClCommand: Optional[bool] = None
    parameters: Optional[List[str]] = None
    autoClose: Optional[bool] = None
  • Returns: (Query) A Query object representing the query to be executed.

    This function is responsible for creating a Query object that represents the SQL query to be executed. It constructs the query object with the provided SQL statement and any additional options. The function returns the Query object, which can then be used to run the query.

query_and_run(self, sql: str, opts: Optional[Union[Dict[str, Any], QueryOptions]] = None)

  • Parameters:

    • sql (str): A string containing the SQL query to be executed.
    • opts (Optional[Union[Dict[str, Any], QueryOptions]]): A dictionary-like object containing additional options for the query.
  • Returns: (Query) A Query object representing the query to be executed.

    This function is a convenience method that combines the query and run functions. It creates a Query object with the provided SQL statement and options, then immediately runs the query. The function returns the Query object, which can be used to fetch more data or close the query.

close(self)

  • Parameters: None

  • Returns: None

    This function is responsible for closing the WebSocket connection to the DB2 server. It sends a close request to the server and waits for a response. Once the connection is closed, the function cleans up any resources associated with the connection.

Query

High-Level Overview

The Query class-like object is designed to manage and execute SQL queries, handle their states, and fetch additional data. The Query object inherits an SQLJob instance and uses it’s WebSocket connection to send and receive data. See the Query Class definition below.

Class Definitions and Functions

class Query(Generic[T]):
global_query_list: List["Query[Any]"] = []
def __init__(self, job: SQLJob, query: str, opts: QueryOptions) -> None:
"""
Initialize a new instance of the Query class.
Args:
job (SQLJob): The SQL job associated with this query.
query (str): The SQL query string.
opts (QueryOptions): Options for configuring the query.
Attributes:
job (SQLJob): Stores the SQL job associated with this query.
is_prepared (bool): Indicates whether the query is prepared based on the presence of parameters.
parameters (Optional[List[str]]): The parameters for the query, if any.
sql (str): The SQL query string.
is_cl_command (Optional[bool]): Indicates if the query is a command line command.
should_auto_close (Optional[bool]): Indicates if the query should auto-close.
is_terse_results (Optional[bool]): Indicates if the query should return terse results.
_rows_to_fetch (int): The number of rows to fetch, default is 100.
state (QueryState): The current state of the query, initially set to NOT_YET_RUN.
Class Attributes:
global_query_list (List["Query[Any]"]): A global list of all Query instances.
"""
self.job = job
self.is_prepared: bool = True if opts.parameters is not None else False
self.parameters: Optional[List[str]] = opts.parameters
self.sql: str = query
self.is_cl_command: Optional[bool] = opts.isClCommand
self.should_auto_close: Optional[bool] = opts.autoClose
self.is_terse_results: Optional[bool] = opts.isTerseResults
self._rows_to_fetch: int = 100
self.state: QueryState = QueryState.NOT_YET_RUN
Query.global_query_list.append(self)

run(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]

  • Parameters:

    • rows_to_fetch (Optional[int]): An integer specifying the number of rows to fetch from the query result.
  • Returns: (Dict[str, Any]) A dictionary-like object containing the response from the server.

    This function is responsible for executing the query and fetching the initial set of results. It sends the query to the server, waits for a response, and returns the result. If rows_to_fetch is specified, the function fetches the specified number of rows from the result set.

fetch_more(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]

  • Parameters:

    • rows_to_fetch (Optional[int]): An integer specifying the number of rows to fetch from the query result.
  • Returns: (Dict[str, Any]) A dictionary-like object containing the response from the server.

    This function is responsible for fetching additional rows from the query result. It sends a request to the server to fetch more rows, waits for a response, and returns the result.

Query Objects

A query object is a dictionary-like object that represents a SQL query to be executed. It contains the SQL statement, any additional options, and the unique identifier for the query. The query object is used to manage the state of the query, send it to the server, and fetch results.

Here is a sample definition of a query object:

query_object = {
"id": self.job._get_unique_id("query"),
"type": "prepare_sql_execute" if self.is_prepared else "sql",
"sql": self.sql,
"terse": self.is_terse_results,
"rows": rows_to_fetch,
"parameters": self.parameters,
}

Here is a sample definition of a query object for a cl command:

query_object = {
"id": self.job._get_unique_id("clcommand"),
"type": "cl",
"terse": self.is_terse_results,
"cmd": self.sql,
}

here is a sample definition of a query object for fetch more:

query_object = {
"id": self.job._get_unique_id("fetchMore"),
"cont_id": self._correlation_id,
"type": "sqlmore",
"sql": self.sql,
"rows": rows_to_fetch,
}

Example usage in python

creds = DaemonServer(
host="localhost",
port=8085,
user=user,
password=pw,
ignoreUnauthorized=True,
)
job = SQLJob()
res = job.connect(creds)
query = job.query("select * from sample.employee")
result = query.run(rows_to_fetch=5)
job.close()