Subscription
An application subscribes to data by creating a topic and then calling the subscribe method on the session.
Subscribing to Data
session.subscribe(<subscription-type>, <subscription-listener>, <topics>);
| Method Argument | Description |
|---|---|
| subscription type | One of the following options: LIVE, CACHED, CACHED_AND_LIVE (see details below) |
| subscription listener | A callback invoked once the subscription is complete and, where cached data was requested, all cached data has been provided |
| topics | One or more topics for which data is being requested |
Details of Subscription Types:
- LIVE - live data, which is data being published on the platform at the present time
- CACHED - a snapshot of data that has been stored in the cache
- CACHED_AND_LIVE - both cached and live data will be provided. The API automatically ensures that in-flight live messages always take precedence over cached data.
Examples:
session.subscribe(SubscriptionType.LIVE, new LoggingSubscriptionListener("data"), topic);
session.subscribe(SubscriptionType.CACHED_AND_LIVE, topic1, topic2);
Subscription Listener
The application is called back through the SubscriptionListener interface. This interface has one method:
void complete(long count);
This signals to the application that the subscription has been carried out and, where cached data was requested, that all cached data has been provided. The count argument indicates the number of data items provided where a cache interaction took place.
This event can be used as a trigger to perform further work. For example, it may be necessary to subscribe to all Trade data when an application is starting up and hold back its processing until all available Trades are received. In this case, when the complete method is called, the application can use this event to trigger the next phase of work.
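The startup scenario described above can be sketched with a latch that is released when complete is called. This is a minimal illustration, not part of the API: the SubscriptionListener interface is redeclared locally as a stand-in with the single method shown above, and StartupGate, its await and received methods, and the simulated callback thread are hypothetical names introduced for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Local stand-in mirroring the one-method interface described in this section.
interface SubscriptionListener {
    void complete(long count);
}

// Hypothetical helper: lets a startup thread wait until the subscription completes.
class StartupGate implements SubscriptionListener {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile long received = -1;

    @Override
    public void complete(long count) {
        received = count;   // record the count before releasing any waiter
        done.countDown();
    }

    // Returns true if complete() was called within the timeout, false otherwise.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long received() {
        return received;
    }
}

class StartupGateDemo {
    public static void main(String[] args) {
        StartupGate gate = new StartupGate();
        // In an application the gate would be passed as the subscription listener;
        // here a thread simulates the callback arriving.
        new Thread(() -> gate.complete(42)).start();
        if (gate.await(5, TimeUnit.SECONDS)) {
            System.out.println("Subscription complete, items received: " + gate.received());
            // the next phase of work would start here
        } else {
            System.out.println("Timed out waiting for the subscription to complete");
        }
    }
}
```

Using a timeout rather than an unbounded wait keeps the application from hanging at startup if the callback never arrives.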
Topics
Topics.create(<data-type>, <source>, <group>, <id>);
| Method Argument | Description |
|---|---|
| data-type (optional) | DTO class type or SBE Decoder class type. Not required for Flexible Messaging (Maps, POJOs). |
| source | The source of interest. This can be wildcarded using Topic.SOURCE_WILDCARD |
| group | The group of interest. This can be wildcarded using Topic.GROUP_WILDCARD |
| id | The id of interest. This can be wildcarded using Topic.ID_WILDCARD |
Examples:
// notice that no type needs to be provided when using Flexible Messaging
Topic topic = Topics.create("source", "group", "id");
// notice that the DTO type is specified as the first argument
Topic topic = Topics.create(Price.class, "price-source", "price-group", "price-id");
// notice that the SBE decoder type is specified as the first argument
Topic topic = Topics.create(RfqDecoder.class, "rfq-source", "rfq-group", "rfq-id");
Topic Wildcards
In the examples above, very specific topics are being created, which would identify a single item of data. It is possible to subscribe to data more generically by supplying wildcards at any level.
Topic topic = Topics.create(Price.class, Topic.SOURCE_WILDCARD, Topic.GROUP_WILDCARD, Topic.ID_WILDCARD);
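Wildcards can also be mixed with specific values, for example a fixed source with any group and any id. The following self-contained sketch models that idea only; it is not the platform's matching logic. TopicPattern, its matches method and the "*" marker are hypothetical, and "*" is not claimed to be the value of Topic.SOURCE_WILDCARD, Topic.GROUP_WILDCARD or Topic.ID_WILDCARD.

```java
import java.util.Objects;

// Hypothetical model of a three-level topic pattern (source, group, id).
class TopicPattern {
    // Assumed wildcard marker, used for illustration only.
    static final String WILDCARD = "*";

    private final String source;
    private final String group;
    private final String id;

    TopicPattern(String source, String group, String id) {
        this.source = Objects.requireNonNull(source);
        this.group = Objects.requireNonNull(group);
        this.id = Objects.requireNonNull(id);
    }

    // A concrete topic matches when every level is equal or wildcarded in the pattern.
    boolean matches(String source, String group, String id) {
        return levelMatches(this.source, source)
                && levelMatches(this.group, group)
                && levelMatches(this.id, id);
    }

    private static boolean levelMatches(String pattern, String value) {
        return WILDCARD.equals(pattern) || pattern.equals(value);
    }
}

class TopicPatternDemo {
    public static void main(String[] args) {
        // Fixed source, any group, any id.
        TopicPattern allFromSource =
                new TopicPattern("price-source", TopicPattern.WILDCARD, TopicPattern.WILDCARD);
        System.out.println(allFromSource.matches("price-source", "price-group", "price-id")); // true
        System.out.println(allFromSource.matches("rfq-source", "rfq-group", "rfq-id"));       // false
    }
}
```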
Message Routing
When messages are received in a process, they are routed based on their type.
Use the register method on the session to define routing for a given data type.
session.register(<data-type>, <message-processor>);
session.register(<data-type>, <message-processor>, <voided-message-processor>);
| Method Argument | Description |
|---|---|
| data-type | A Java Map, Java POJO, DTO or SBE type |
| message-processor | The processor to which messages of the specified type should be routed |
| voided-message-processor | The processor where voided messages will be routed. |
In the first case, where a voided-message-processor isn't specified, any voided messages will be routed to a default handler which logs the message. This avoids accidental processing of voided messages. See Message Flags for more details on voided messages.
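The routing rules above can be pictured as a lookup table keyed by type, with a fallback for voided messages. The sketch below is a simplified model under stated assumptions, not the session's implementation: TypeRouter, route and defaultVoided are hypothetical names, processors are modelled as single-argument consumers, the voided flag is passed explicitly, and a list stands in for the default handler's log.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of type-based routing with a default voided-message handler.
class TypeRouter {
    private final Map<Class<?>, Consumer<Object>> processors = new LinkedHashMap<>();
    private final Map<Class<?>, Consumer<Object>> voidedProcessors = new LinkedHashMap<>();
    // Stand-in for the default handler: voided messages end up here instead of a log.
    final List<Object> defaultVoided = new ArrayList<>();

    <T> void register(Class<T> type, Consumer<? super T> processor) {
        processors.put(type, message -> processor.accept(type.cast(message)));
    }

    <T> void register(Class<T> type, Consumer<? super T> processor,
                      Consumer<? super T> voidedProcessor) {
        register(type, processor);
        voidedProcessors.put(type, message -> voidedProcessor.accept(type.cast(message)));
    }

    // Returns true if the message was handled by a processor or the default voided handler.
    boolean route(Object message, boolean voided) {
        Consumer<Object> handler = find(voided ? voidedProcessors : processors, message);
        if (handler != null) {
            handler.accept(message);
            return true;
        }
        if (voided) {
            defaultVoided.add(message); // never reaches the normal processor
            return true;
        }
        return false;
    }

    // First registered type that the message is an instance of (covers subtypes).
    private static Consumer<Object> find(Map<Class<?>, Consumer<Object>> table, Object message) {
        for (Map.Entry<Class<?>, Consumer<Object>> entry : table.entrySet()) {
            if (entry.getKey().isInstance(message)) {
                return entry.getValue();
            }
        }
        return null;
    }
}

class TypeRouterDemo {
    public static void main(String[] args) {
        TypeRouter router = new TypeRouter();
        router.register(String.class, s -> System.out.println("processed: " + s));
        router.route("price update", false);   // goes to the registered processor
        router.route("stale update", true);    // goes to the default voided handler
        System.out.println("default voided handler saw: " + router.defaultVoided);
    }
}
```

Keeping voided messages in a separate table means a processor registered with the two-argument form never sees them, which is the safety property the default handler is described as providing.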
Java Map Routing
// register a handler for Java Map messages (all derivatives of java.util.Map)
session.register(Map.class, this::processMap);
...
// all derivatives of type java.util.Map will be routed here...
private void processMap(Map<String, Object> map, KeyHeader keyHeader) {
    log.info("Received Map: {}, keyHeader: {}", map, keyHeader);
}
Java Object Routing
// register a handler for a Java POJO called SamplePojo
session.register(SamplePojo.class, this::processSamplePojo);
...
// all messages of type SamplePojo will be routed here...
private void processSamplePojo(SamplePojo samplePojo, KeyHeader keyHeader) {
    log.info("Received Pojo: {}, keyHeader: {}", samplePojo, keyHeader);
}
DTO Routing
// register a handler for Price messages
session.register(Price.class, this::processPrice);
...
// all messages of type Price will be routed here...
private void processPrice(Price price) {
    log.info("Received Price DTO: {}", price);
}
SBE Routing
// register a handler for SBE Rfq messages
session.register(RfqDecoder.class, this::processRfq);
...
// all messages of type SBE Rfq will be routed here...
private void processRfq(RfqDecoder rfq) {
    log.info("Received RFQ SBE: {}", rfq);
}