KeySession

init

KeySession(application_name: str,
api_key: str,
url: str,
auto_start: bool = True,
session_state_listener: Callable[[KeySessionState, str], None] | None = None,
timeout: int = 10,
auto_refresh_schema: bool = True,
exit_on_state_error: bool = True):

Initialize a new KeySession for connecting to the KeySquare platform.

KeySession is the main entry point for interacting with the KeySquare platform, providing methods to subscribe, snapshot, and publish data. The session handles authentication, connection management, and schema synchronization automatically.

Parameters

application_name : str
The name of your application as registered with the KeySquare platform. This is used for authentication and auditing purposes.

api_key : str
The API key for authenticating with the KeySquare platform. This should be obtained from your KeySquare administrator.

url : str
The URL of the KeySquare platform in the format "hostname:port". Example: "localhost:6800" or "keyaccess.company.com:6800"

auto_start : bool, optional
Whether to automatically start the connection upon initialization. If False, you must call start() manually. Default is True.

session_state_listener : Callable[[KeySessionState, str], None], optional
A callback function to handle session state changes. The function receives the new state and an optional message. Default is None.

timeout : int, optional
The default timeout, in seconds, for starting the session and for data requests. Default is 10.

auto_refresh_schema : bool, optional
Whether to automatically refresh schema definitions when schema updates are received from the platform. Default is True.

exit_on_state_error : bool, optional
Whether to raise exceptions when the session enters an error state. If False, errors will be logged but not raised. Default is True.

Raises

RuntimeError
If the session fails to connect when auto_start is True, or if there are authentication or network issues during initialization.

Examples

# Basic usage with auto-start
ks = KeySession(
application_name="my-trading-app",
api_key="edcfcdd2-66f0-4fb3-8171-eb655bf56f1c",
url="keyaccess.company.com:6800"
)

# Manual start with custom timeout
ks = KeySession(
application_name="my-app",
api_key="your-api-key",
url="localhost:6800",
auto_start=False,
timeout=30
)
ks.start()

# With custom state listener
def state_handler(state, message):
    print(f"Session state changed to: {state} - {message}")

ks = KeySession(
application_name="monitoring-app",
api_key="your-api-key",
url="keyaccess.company.com:6800",
session_state_listener=state_handler
)
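
A listener does not have to print; it can also keep a history of transitions for later inspection. The following is a minimal sketch, not part of the library: FakeState is a stand-in for KeySessionState so the example runs without a connection, and StateRecorder is a hypothetical helper name. Whether the listener is invoked from a background thread is an assumption, so the sketch guards its list with a lock.

```python
import threading
from enum import Enum, auto


class FakeState(Enum):
    """Stand-in for KeySessionState so the sketch runs without the library."""
    CONNECTING = auto()
    CONNECTED = auto()
    ERROR = auto()


class StateRecorder:
    """Callable with the (state, message) listener shape that keeps a history."""

    def __init__(self):
        self._lock = threading.Lock()
        self.history = []

    def __call__(self, state, message):
        # Assumption: the listener may run on another thread, so guard the list.
        with self._lock:
            self.history.append((state, message))

    def last_state(self):
        """Return the most recent state, or None if nothing was recorded."""
        with self._lock:
            return self.history[-1][0] if self.history else None


recorder = StateRecorder()
# Simulated transitions; a real session would call the listener itself.
recorder(FakeState.CONNECTING, "dialing")
recorder(FakeState.CONNECTED, "")
print(recorder.last_state())
```

With the real library, an instance like recorder would be passed as session_state_listener=recorder.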

KeySession Public Methods

Connection Management

start
Start the KeySession and establish connection to the KeySquare platform. Initializes connection, starts heartbeat publisher, and sets up default message processors.

stop
Stop the KeySession and disconnect from the KeySquare platform. Gracefully shuts down by stopping heartbeat publisher and cleaning up all resources. Note: once stopped, the client should not be restarted.

stop_sync
Synchronously stop the KeySession. Provides a synchronous way to stop the session, useful in notebooks or when the async stop() might hang. Note: once stopped, the client should not be restarted.

status
Get the current status of the KeySession connection. Returns KeySessionState indicating if session is connected, disconnected, or in error state.

Data Operations

snapshot
Gets a snapshot of data for the requested data type from the KeySquare platform. Blocks until all snapshot data is received and returns DataFrame or list.

subscribe
Subscribes to live data from the KeySquare platform. Creates a persistent subscription that calls the callback function for each new update received.

unsubscribe
Unsubscribes from a data subscription and cleans up associated resources. Only sends platform unsubscribe if this is the last subscription for the topic.

publish
Publish data to the KeySquare platform. Sends data and blocks until publication acknowledgment is received, ensuring reliable delivery.

Schema and Discovery

catalog
Get the catalog of available data types and schemas on the KeySquare platform. Returns list of (schema_name, data_type_name) tuples for discovering available data.

sample
Get a sample message structure for the specified data type. Returns sample KeyMessage showing expected structure and field types.

Advanced Features

edit
Send field update requests to modify published data on the KeySquare platform. Allows applications to request modifications to previously published data via patch messages.

store
[EXPERIMENTAL] Store data in the KeySquare message store for persistence. Unlike publish(), saves data for long-term persistence and later retrieval.

set_up_schema_refresh
Set up automatic schema refresh when schema updates are received. Creates subscription to schema update notifications for automatic synchronization.

get_session_data
Get structured session details for the current KeySquare connection. Provides comprehensive connection metadata including URLs, hostnames, ports, and status.

Class Attributes

unlock_experimental_features
A boolean flag to enable experimental features (like store())

Connection Management

start

start(self)

Start the KeySession and establish connection to the KeySquare platform.

This method initializes the connection, starts the heartbeat publisher, sets up default message processors, and optionally configures schema refresh subscriptions. The method will block until the connection is established or times out.

Notes

  • This method is automatically called if auto_start=True in __init__
  • The session must be started before calling snapshot, subscribe, or publish
  • Default subscriptions for acknowledgments are automatically created
  • Schema refresh subscription is created if auto_refresh_schema=True

Raises

RuntimeError
If the session fails to start within the timeout period, or if there are connection or authentication issues.

Example

# Manual start after initialization with auto_start=False
ks = KeySession(
application_name="my-app",
api_key="your-api-key",
url="localhost:6800",
auto_start=False
)
await ks.start() # or ks.start() if using run_async decorator

# Check status after starting
print(f"Session status: {ks.status()}")
# Session status: KeySessionState.CONNECTED

See Also

stop - Stop the session and clean up resources
status - Check the current session state

stop

stop(self)

Stop the KeySession and disconnect from the KeySquare platform.

This method gracefully shuts down the session by stopping the heartbeat publisher, disconnecting from the platform, and cleaning up all resources. The method will block until the shutdown is complete or times out.

Notes

  • All active subscriptions will be automatically unsubscribed
  • The session should not be restarted through start; create a new KeySession instead
  • This method is automatically called when the KeySession object is garbage collected
  • Use stop_sync() for synchronous stopping in notebooks or when async hangs

Raises

RuntimeError
If the session components fail to stop within the timeout period.

Example

ks = KeySession(
application_name="my-app",
api_key="your-api-key",
url="localhost:6800"
)

# Do some work with the session
data = ks.snapshot("Price")

# Clean shutdown
await ks.stop()

# Verify session is stopped
print(f"Session status: {ks.status()}")
# Session status: KeySessionState.DISCONNECTED

See Also

stop_sync - Synchronous version of stop for use in notebooks
start - Start the session connection
status - Check the current session state

stop_sync

stop_sync(self)

Synchronously stop the KeySession and disconnect from the KeySquare platform.

This method provides a synchronous way to stop the KeySession, useful in notebooks or environments where the async stop() method might hang. It performs the same cleanup as stop() but without async/await.

Notes

  • Use this method in Jupyter notebooks or when async stop() hangs
  • All active subscriptions will be automatically unsubscribed
  • Once stopped, the session should not be restarted through start; create a new KeySession instead
  • This method blocks the calling thread until shutdown is complete

Raises

RuntimeError
If the session components fail to stop within the timeout period.

Example

# In a Jupyter notebook or synchronous context
ks = KeySession(
application_name="notebook-app",
api_key="your-api-key",
url="localhost:6800"
)

# Do some analysis
prices = ks.snapshot("Price")
print(f"Retrieved {len(prices)} price records")

# Synchronous cleanup - good for notebooks
ks.stop_sync()
print("Session stopped successfully")
# Session stopped successfully

# Alternative: use in a try/finally block
ks = KeySession("my-app", "api-key", "localhost:6800")
try:
    data = ks.snapshot("Instrument")
finally:
    ks.stop_sync()  # Ensure cleanup even if errors occur
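
The try/finally pattern above can be wrapped in a small context manager so that cleanup is not forgotten. This is a sketch under stated assumptions: managed_session is a hypothetical helper, not a library function, and StubSession is a stand-in that exposes only the two methods the sketch touches so that it runs without a platform connection.

```python
from contextlib import contextmanager


@contextmanager
def managed_session(factory):
    """Create a session via factory() and always call stop_sync() on exit."""
    session = factory()
    try:
        yield session
    finally:
        session.stop_sync()


class StubSession:
    """Hypothetical stand-in for KeySession, used only to exercise the helper."""

    def __init__(self):
        self.stopped = False

    def snapshot(self, data_type):
        raise RuntimeError(f"simulated failure fetching {data_type}")

    def stop_sync(self):
        self.stopped = True


stub = StubSession()
try:
    with managed_session(lambda: stub) as ks:
        ks.snapshot("Instrument")
except RuntimeError as exc:
    print(f"snapshot failed: {exc}")

# stop_sync() ran even though snapshot() raised.
print(stub.stopped)
```

With the real class, the factory could be lambda: KeySession("my-app", "api-key", "localhost:6800").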

See Also

stop - Asynchronous version of stop
start - Start the session connection

status

status(self) ‑> keysquarepy.key_session_manager.KeySessionState

Get the current status of the KeySession connection.

Returns the current state of the session, which indicates whether the session is connected, disconnected, or in an error state.

Returns

KeySessionState
The current state of the session. See KeySessionState for detailed information about all possible states including:

  • START: Initial state when session is created
  • REGISTERING/REGISTERED: Authentication phases
  • CONNECTING: Establishing gRPC connection
  • DOWNLOADING_SCHEMA/SCHEMA_DOWNLOADED: Schema synchronization
  • CONNECTED: Ready for operations
  • DISCONNECTED: Not connected to platform
  • REGISTRATION_FAILED/SCHEMA_DOWNLOAD_FAILED: Specific error states
  • ERROR: General error state

Example

ks = KeySession(
application_name="my-app",
api_key="your-api-key",
url="localhost:6800"
)

# Check if session is ready for operations
if ks.status() == KeySessionState.CONNECTED:
    print("Session is ready!")
    data = ks.snapshot("Price")
else:
    print(f"Session not ready: {ks.status()}")

# Monitor session status
status = ks.status()
print(f"Current session state: {status}")
# Current session state: KeySessionState.CONNECTED
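
When a session is created elsewhere and the caller only needs to know when it becomes usable, status can be polled with a deadline. The helper below is a hypothetical sketch, not a library function; it takes any zero-argument callable so that it can be exercised here with a fake status function that returns plain strings instead of KeySessionState values.

```python
import time
from typing import Callable


def wait_for_state(get_status: Callable[[], object], target: object,
                   timeout: float = 10.0, poll_interval: float = 0.05) -> bool:
    """Poll get_status() until it equals target or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if get_status() == target:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Stand-in for ks.status: reports "CONNECTING" twice, then "CONNECTED".
_states = iter(["CONNECTING", "CONNECTING", "CONNECTED"])
_last = ["CONNECTING"]


def fake_status():
    _last[0] = next(_states, _last[0])
    return _last[0]


ready = wait_for_state(fake_status, "CONNECTED", timeout=2.0, poll_interval=0.01)
print(ready)
```

With the real library, the call would look like wait_for_state(ks.status, KeySessionState.CONNECTED).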

See Also

start - Start the session connection
stop - Stop the session connection

Data Operations

snapshot

snapshot(self, 
data_type: str,
schema: str = '',
source: str = '',
group: str = '',
id: str = '',
return_type: Literal['pd', 'list'] = 'pd',
timeout: float = 10)

Gets a snapshot of data for the requested data type from the KeySquare platform.

Notes

  • Application name supplied must be permissioned to get data for the requested type.
  • Requested data type must be defined on the platform. This is typically defined by the data publisher.
  • This method will block until all snapshot data is received.

Parameters

data_type : str
The type of data to get. This is the name of the type as defined by the data publisher. An exception will be raised if the data type is not found.

schema : str, optional
The semantic schema name of the data type. Consider setting this if type names overlap across schemas to avoid conflict.

source : str, optional
The source of the data. This is set by the data publisher and is usually specific to type. By default, this is set to blank to get data from any source.

group : str, optional
The group of the data. This is set by the data publisher to group similar data together and is usually specific to type. By default, this is set to blank to get data from any group.

id : str, optional
The id of the data. This is set by the data publisher to uniquely identify the data. By default, this is set to blank to get data from any id. Set this to a specific id to get data for a single row.

return_type : Literal['pd', 'list'], optional
The type of the return value. This can be either 'pd' (default) for a pandas DataFrame or 'list' for a list of dictionaries.

timeout : float, optional
The time in seconds to wait for the data. This is set to 10 seconds by default.

Returns

list or DataFrame
The data received from the KeySquare platform. The return type is determined by the return_type parameter.

Raises

RuntimeError
If the KeySession is not started before calling this method.

ValueError
If no schema definition is found for the data type and schema.

Example

result = ks.snapshot("Price")
print(f"Received type=[Price] return_type=[{type(result).__name__}], count=[{len(result)}]")
# Received type=[Price] return_type=[DataFrame], count=[5]
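
With return_type='list' the result is a list of dictionaries, which can be handled with plain Python. The sketch below indexes such rows by one field. The rows and their field names ("symbol", "price") are made up for illustration, and index_by is a hypothetical helper; the actual fields depend on the schema of the data type.

```python
def index_by(rows, key):
    """Map each row's value for key to the row; later rows overwrite earlier ones."""
    return {row[key]: row for row in rows if key in row}


# Illustrative rows shaped like a list-of-dicts snapshot result.
rows = [
    {"symbol": "AAPL", "price": 150.25},
    {"symbol": "MSFT", "price": 380.50},
    {"symbol": "AAPL", "price": 151.00},
]

by_symbol = index_by(rows, "symbol")
print(sorted(by_symbol))
```

With the real library, rows would come from ks.snapshot("Price", return_type="list").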

subscribe

subscribe(self,
data_type: str,
callback: Callable[[KeyMessage], None],
voided_callback: Callable[[KeyMessage], None] = lambda message: logging.info(
f"Following voided message discarded: {message}"),
schema: str = '',
source: str = ksm.SOURCE_WILDCARD,
group: str = ksm.GROUP_WILDCARD,
id: str = ksm.ID_WILDCARD,
subscription_type: SubscriptionType = SubscriptionType.CACHED_AND_LIVE,
timeout: float = 10) -> Subscription:

Subscribes to live data from the KeySquare platform.

Notes

  • Application name supplied must be permissioned to get data for the requested type.
  • Requested data type must be defined on the platform. This is typically defined by the data publisher.
  • The callback function will be called for each new update received.
  • This method will block until all snapshot data is received
  • The subscription will be kept alive until the unsubscribe method is called.

Parameters

data_type : str
The type of data to get. This is the name of the type as defined by the data publisher. An exception will be raised if the data type is not found.

callback : Callable[[KeyMessage], None]
A callback function to process the data as it is received. This function will be called for each new update received. The callback receives a KeyMessage object.

voided_callback : Callable[[KeyMessage], None], optional
A callback function to process voided messages. If not provided, voided messages will be logged and discarded. The callback receives a KeyMessage object.

schema : str, optional
The semantic schema name of the data type. Consider setting this if type names overlap across schemas to avoid conflict.

source : str, optional
The source of the data. This is set by the data publisher and is usually specific to type. By default, this is set to ksm.SOURCE_WILDCARD to get data from any source.

group : str, optional
The group of the data. This is set by the data publisher to group similar data together and is usually specific to type. By default, this is set to ksm.GROUP_WILDCARD to get data from any group.

id : str, optional
The id of the data. This is set by the data publisher to uniquely identify the data. By default, this is set to ksm.ID_WILDCARD to get data from any id. Set this to a specific id to get data for a single row.

Returns

Subscription
The subscription object which can be used to unsubscribe from the data.

Raises

RuntimeError
If the KeySession is not started before calling this method.

ValueError
If no schema definition is found for the data type and schema, or if no callback is provided.

Example

import json

subscription = ks.subscribe(
    "Price",
    callback=lambda message: print(f"Received message: {json.dumps(message, indent=2)}")
)
# Received message: ...
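
Because the callback is called for each update, it is often kept short and used only to hand messages to a queue that other code drains at its own pace. The sketch below illustrates that pattern with the standard library; on_price and drain are hypothetical names, the dictionaries stand in for KeyMessage objects, and the assumption that callbacks may arrive on a different thread is the reason a thread-safe queue is used.

```python
import queue

updates = queue.Queue()


def on_price(message):
    """Callback: enqueue the message and return immediately."""
    updates.put(message)


def drain(q, max_items=100):
    """Remove and return up to max_items messages without blocking."""
    items = []
    while len(items) < max_items:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items


# Simulated deliveries; a real session would invoke on_price itself.
on_price({"symbol": "AAPL", "price": 150.25})
on_price({"symbol": "AAPL", "price": 150.30})

batch = drain(updates)
print(len(batch))
```

With the real library, the callback would be registered as ks.subscribe("Price", callback=on_price).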

unsubscribe

unsubscribe(self, unsub: Subscription)

Unsubscribes from a data subscription and cleans up associated resources.

This method removes the subscription from the session's active subscriptions. It sends an unsubscribe request to the platform only if this is the last subscription for the given topic (no other subscriptions exist for the same schema, template, source, group, and id combination).

Notes

  • Multiple subscriptions can share the same topic but have different subscription IDs
  • The unsubscribe request to the platform is only done when the last subscription for a topic is removed
    • This optimizes network traffic by avoiding redundant unsubscribe requests
  • Message processors (callbacks) are always unregistered regardless of platform unsubscribe

Parameters

unsub : Subscription
The subscription object to unsubscribe from. This should be the subscription object returned from a previous subscribe() call.

Raises

RuntimeError
If the subscription ID is not found in the active subscriptions for the topic.

Example

# Subscribe to data
sub1 = ks.subscribe("Price", callback=lambda msg: print(f"Sub1: {msg}"))
sub2 = ks.subscribe("Price", callback=lambda msg: print(f"Sub2: {msg}"))

# Unsubscribe first subscription - no platform unsubscribe sent
ks.unsubscribe(sub1) # Only removes processor, topic still has sub2

# Unsubscribe last subscription - platform unsubscribe sent
ks.unsubscribe(sub2) # Removes processor AND sends platform unsubscribe
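
The "last subscription for a topic" rule can be pictured as a per-topic set of subscription IDs. The following sketch models that bookkeeping only; it is not the library's implementation. TopicRegistry, its attributes, and the tuple used as a topic key are hypothetical.

```python
from collections import defaultdict
from itertools import count


class TopicRegistry:
    """Toy model: track subscription ids per topic, unsubscribe only on the last."""

    def __init__(self):
        self._subs = defaultdict(set)      # topic -> active subscription ids
        self._ids = count(1)
        self.platform_unsubscribes = []    # topics for which a request would be sent

    def subscribe(self, topic):
        sub_id = next(self._ids)
        self._subs[topic].add(sub_id)
        return topic, sub_id

    def unsubscribe(self, subscription):
        topic, sub_id = subscription
        if sub_id not in self._subs.get(topic, set()):
            raise RuntimeError(f"subscription {sub_id} not found for topic {topic}")
        self._subs[topic].remove(sub_id)
        if not self._subs[topic]:
            # Last subscription for this topic: this is where the platform is told.
            del self._subs[topic]
            self.platform_unsubscribes.append(topic)


registry = TopicRegistry()
topic = ("", "Price", "*", "*", "*")  # hypothetical (schema, type, source, group, id)
sub1 = registry.subscribe(topic)
sub2 = registry.subscribe(topic)

registry.unsubscribe(sub1)
after_first = list(registry.platform_unsubscribes)

registry.unsubscribe(sub2)
after_second = list(registry.platform_unsubscribes)
print(after_first, after_second)
```

Unsubscribing an ID that is no longer registered raises RuntimeError in this model, mirroring the Raises section above.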

See Also

subscribe - Method to create subscriptions

publish

publish(self,
data_type: str,
id: str,
data: dict,
schema: str = '',
source: str = '',
group: str = 'NONE',
timeout: float = 10,
raise_error_on_failure: bool = True,
key_header: KeyHeader = None) -> dict

Publish data to the KeySquare platform.

Sends data to the KeySquare platform for the specified data type. The method blocks until a publication acknowledgment is received from the platform, ensuring reliable delivery. Your application must have publish permissions for the specified data type.

Notes

  • Application must have permission to publish data for the requested type
  • Data type must be defined on the platform (cannot create new types via Python)
  • Method blocks until publication acknowledgment is received
  • Data validation is the responsibility of the publisher
  • Use sample() to understand expected data structure

Parameters

data_type : str
The name of the data type to publish. Must match an existing type definition on the platform. Use catalog() to see available types.

id : str
The unique identifier for this data record. This becomes the topic ID and is used to uniquely identify the data on the platform. Any business identifiers should be included in the data dictionary.

data : dict
The data payload to publish as a dictionary of key-value pairs. Structure should match the schema definition for the data type. Use sample() to see the expected structure.

schema : str, optional
The semantic schema name of the data type. Specify this if multiple schemas contain data types with the same name. Default is empty string.

source : str, optional
The source identifier for this data. Defaults to the application name if not specified. Used for data provenance and routing.

group : str, optional
The group identifier for organizing related data. Default is "NONE". Used for logical grouping of data from the same source.

timeout : float, optional
Maximum time in seconds to wait for publication acknowledgment. Default is 10 seconds.

raise_error_on_failure : bool, optional
Whether to raise an exception if publication fails. If False, errors will be logged but not raised. Default is True.

key_header : KeyHeader, optional
Additional header fields to include with the published message. Allows customization of message metadata beyond defaults.

Returns

dict
The publication acknowledgment received from the platform, containing status information and any error details.

Raises

ValueError
If the data type and schema combination is not found in platform schema definitions.

RuntimeError
If the session is not started, or if publication fails when raise_error_on_failure is True.

TimeoutError
If publication acknowledgment is not received within timeout period.

PermissionError
If the application lacks permission to publish the specified data type.

Example

ks = KeySession(
application_name="trading-app",
api_key="your-api-key",
url="localhost:6800"
)

# Basic price publication
price_data = {
"symbol": "AAPL",
"price": 150.25,
"currency": "USD",
"timestamp": "2024-01-15T10:30:00Z"
}
ack = ks.publish("Price", "AAPL", price_data)
print(f"Publication status: {ack.get('status', 'Unknown')}")
# Publication status: OK

# Publication with custom grouping
trade_data = {
"tradeId": "TRD123456",
"symbol": "MSFT",
"quantity": 100,
"price": 380.50
}
ack = ks.publish(
data_type="Trade",
id="TRD123456",
data=trade_data,
group="EQUITY_TRADES",
source="TRADING_DESK_1"
)
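
When raise_error_on_failure is False, the caller has to inspect each acknowledgment. The sketch below publishes a batch and collects the IDs that were not acknowledged with status "OK". It rests on assumptions: publish_all is a hypothetical helper, fake_publish stands in for ks.publish, the "REJECTED" value is invented for the simulation, and it is assumed that a failed publication still returns an acknowledgment dictionary.

```python
from typing import Callable, Iterable


def publish_all(publish: Callable[..., dict], data_type: str,
                records: Iterable[tuple]) -> list:
    """Publish (id, data) pairs; return the ids whose ack status was not "OK"."""
    failed = []
    for record_id, data in records:
        ack = publish(data_type, record_id, data, raise_error_on_failure=False)
        if ack.get("status", "Unknown") != "OK":
            failed.append(record_id)
    return failed


def fake_publish(data_type, id, data, raise_error_on_failure=True):
    """Stand-in for ks.publish: rejects records that lack a "price" field."""
    # "REJECTED" is a made-up status used only for this simulation.
    return {"status": "OK" if "price" in data else "REJECTED"}


failed_ids = publish_all(fake_publish, "Price", [
    ("AAPL", {"symbol": "AAPL", "price": 150.25}),
    ("MSFT", {"symbol": "MSFT"}),
])
print(failed_ids)
```

With the real library, ks.publish would be passed in place of fake_publish.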

See Also

sample - Get sample message structure for a data type
catalog - Get available data types and schemas
subscribe - Subscribe to live data updates