
Subscription

An application can subscribe to data by creating a topic and then calling the subscribe method on the session.

Subscribing to Data

Subscribe Syntax
session.subscribe(<subscription-type>, <subscription-listener>, <topics>);
Method Argument | Description
subscription type | One of the following options: LIVE, CACHED, CACHED_AND_LIVE (see details below)
subscription listener | A callback invoked once the subscription is complete and all requested cached data has been provided
topics | One or more topics for which data is being requested

Details of Subscription Types:

  • LIVE - live data, which is data being published on the platform at the present time
  • CACHED - a snapshot of data that has been stored in the cache
  • CACHED_AND_LIVE - both cached and live data are provided. The API automatically ensures that in-flight live messages always take precedence over cached data.
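
The precedence rule for CACHED_AND_LIVE can be pictured with a small, self-contained sketch. It is only an illustration of the idea and not the platform's implementation: LatestValueStore, onLive and onCached are hypothetical names, and the assumption is that data is keyed by a simple string.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the platform's implementation.
final class LatestValueStore {
    private final Map<String, String> values = new HashMap<>();
    private final Set<String> liveKeys = new HashSet<>();

    // A live update always wins and marks the key as having live data.
    void onLive(String key, String value) {
        values.put(key, value);
        liveKeys.add(key);
    }

    // A cached value is applied only if no live update has been seen for the key.
    void onCached(String key, String value) {
        if (!liveKeys.contains(key)) {
            values.put(key, value);
        }
    }

    String get(String key) {
        return values.get(key);
    }
}

class PrecedenceExample {
    public static void main(String[] args) {
        LatestValueStore store = new LatestValueStore();
        store.onLive("EURUSD", "1.0852");   // live message arrives first
        store.onCached("EURUSD", "1.0840"); // older snapshot value is ignored
        store.onCached("GBPUSD", "1.2710"); // no live value yet, so the snapshot is used
        System.out.println(store.get("EURUSD")); // prints 1.0852
        System.out.println(store.get("GBPUSD")); // prints 1.2710
    }
}
```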

Examples:

Subscribe to live data for a topic, applying the identifier 'data'
session.subscribe(SubscriptionType.LIVE, new LoggingSubscriptionListener("data"), topic);
Subscribe to both cached and live data for topic1 and topic2 with a default NO_OP SubscriptionListener
session.subscribe(SubscriptionType.CACHED_AND_LIVE, topic1, topic2);

Subscription Listener

The SubscriptionListener passed to subscribe will be called back. This interface has one method:

void complete(long count);

This signals to the application that the subscription has been carried out and, where cached data was requested, that all cached data has been provided. The count argument indicates how many items of data were provided where a cache interaction took place.

This event can be used as a trigger to perform further work. For example, it may be necessary to subscribe to all Trade data when an application is starting up and hold back its processing until all available Trades are received. In this case, when the complete method is called, the application can use this event to trigger the next phase of work.
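
One way to hold back start-up processing until complete is called is to wrap a latch in a listener. The sketch below is self-contained and makes some assumptions: the SubscriptionListener interface is redeclared locally as a stand-in (only the complete(long) signature comes from this page), CompletionGate is a hypothetical helper, and a background thread simulates the platform invoking the callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the platform's listener interface; only the complete(long)
// signature is taken from the documentation above.
interface SubscriptionListener {
    void complete(long count);
}

// Hypothetical helper: lets a start-up thread block until complete() is called.
final class CompletionGate implements SubscriptionListener {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicLong received = new AtomicLong(-1);

    @Override
    public void complete(long count) {
        received.set(count);
        done.countDown();
    }

    // Returns true if complete() was called before the timeout elapsed.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    // True once complete() has been called.
    boolean isComplete() {
        return done.getCount() == 0;
    }

    // Count reported by complete(), or -1 if it has not been called yet.
    long count() {
        return received.get();
    }
}

class CompletionGateExample {
    public static void main(String[] args) throws InterruptedException {
        CompletionGate gate = new CompletionGate();

        // In a real application the gate would be passed to session.subscribe(...).
        // Here a background thread simulates the platform invoking the callback.
        new Thread(() -> gate.complete(3)).start();

        if (gate.await(5, TimeUnit.SECONDS)) {
            System.out.println("Items received: " + gate.count());
            // ... trigger the next phase of work here ...
        } else {
            System.out.println("Timed out waiting for the subscription to complete");
        }
    }
}
```

Using a timeout on await is a design choice in this sketch, so that a start-up sequence does not block forever if the callback never arrives.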

Topics

Topic Creation
Topics.create(<data-type>, <source>, <group>, <id>);
Method Argument | Description
data-type (optional) | DTO class type or SBE Decoder class type. Not required for Flexible Messaging (Maps, POJOs).
source | The source of interest. This can be wildcarded using Topic.SOURCE_WILDCARD
group | The group of interest. This can be wildcarded using Topic.GROUP_WILDCARD
id | The id of interest. This can be wildcarded using Topic.ID_WILDCARD

Examples:

Create a topic for Flexible Messaging (e.g. Maps, POJOs)
// notice that no type needs to be provided when using Flexible Messaging
Topic topic = Topics.create("source", "group", "id");
Create a topic for Price Dto messages
// notice that the DTO type is specified as the first argument
Topic topic = Topics.create(Price.class, "price-source", "price-group", "price-id");
Create a topic for Rfq SBE messages
// notice that the SBE decoder type is specified as the first argument
Topic topic = Topics.create(RfqDecoder.class, "rfq-source", "rfq-group", "rfq-id");

Topic Wildcards

In the examples above, very specific topics are being created, which would identify a single item of data. It is possible to subscribe to data more generically by supplying wildcards at any level.

Create a topic for all Price DTO messages
Topic topic = Topics.create(Price.class, Topic.SOURCE_WILDCARD, Topic.GROUP_WILDCARD, Topic.ID_WILDCARD);
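
To make the effect of a wildcard at a single level concrete, here is a minimal, self-contained sketch of level-by-level matching. It does not use the platform's Topic class: TopicPattern is a hypothetical stand-in, and the "*" value used for the wildcard is an assumption, since the actual values of the Topic wildcard constants are not given here.

```java
// Hypothetical stand-in for a topic; the real Topic class and the values of
// its wildcard constants are not shown in this document.
final class TopicPattern {
    static final String WILDCARD = "*"; // assumed placeholder value

    private final String source;
    private final String group;
    private final String id;

    TopicPattern(String source, String group, String id) {
        this.source = source;
        this.group = group;
        this.id = id;
    }

    // A message matches when every level is either wildcarded or equal.
    boolean matches(String source, String group, String id) {
        return levelMatches(this.source, source)
                && levelMatches(this.group, group)
                && levelMatches(this.id, id);
    }

    private static boolean levelMatches(String pattern, String actual) {
        return WILDCARD.equals(pattern) || pattern.equals(actual);
    }
}

class WildcardExample {
    public static void main(String[] args) {
        // Wildcard the group and id levels, but pin the source.
        TopicPattern allFromSource =
                new TopicPattern("price-source", TopicPattern.WILDCARD, TopicPattern.WILDCARD);

        System.out.println(allFromSource.matches("price-source", "fx", "EURUSD")); // prints true
        System.out.println(allFromSource.matches("other-source", "fx", "EURUSD")); // prints false
    }
}
```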

Message Routing

When messages are received in a process, they are routed based on their type.

Use the register method on the session to define routing for a given data type.

Message Routing Syntax
session.register(<data-type>, <message-processor>);
session.register(<data-type>, <message-processor>, <voided-message-processor>);
Method Argument | Description
data-type | A Java Map, Java POJO, DTO or SBE type
message-processor | The processor to which messages of the specified type should be routed
voided-message-processor | The processor to which voided messages will be routed

In the first case, where a voided-message-processor isn't specified, any voided messages will be routed to a default handler which logs the message. This avoids accidental processing of voided messages. See Message Flags for more details on voided messages.
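
The routing behaviour described above can be sketched with a small type-keyed dispatcher. This is a rough illustration under stated assumptions, not the session's actual implementation: TypeRouter, dispatch and defaultVoidedLog are hypothetical names, the voided flag is passed in explicitly, and the default handler records a line in a list rather than writing to a logger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical router; not the platform's session implementation.
final class TypeRouter {
    private final Map<Class<?>, Consumer<Object>> processors = new LinkedHashMap<>();
    private final Map<Class<?>, Consumer<Object>> voidedProcessors = new LinkedHashMap<>();
    // Stands in for the default handler's log output.
    final List<String> defaultVoidedLog = new ArrayList<>();

    <T> void register(Class<T> type, Consumer<? super T> processor) {
        processors.put(type, msg -> processor.accept(type.cast(msg)));
    }

    <T> void register(Class<T> type, Consumer<? super T> processor,
                      Consumer<? super T> voidedProcessor) {
        register(type, processor);
        voidedProcessors.put(type, msg -> voidedProcessor.accept(type.cast(msg)));
    }

    void dispatch(Object message, boolean voided) {
        Consumer<Object> target = find(voided ? voidedProcessors : processors, message);
        if (voided && target == null) {
            // Default handling: record the voided message instead of processing it.
            defaultVoidedLog.add("Voided message ignored: " + message);
            return;
        }
        if (target != null) {
            target.accept(message);
        }
    }

    // Picks the first registration whose type the message is an instance of,
    // so subclasses and implementations are routed to it as well.
    private static Consumer<Object> find(Map<Class<?>, Consumer<Object>> table, Object message) {
        for (Map.Entry<Class<?>, Consumer<Object>> entry : table.entrySet()) {
            if (entry.getKey().isInstance(message)) {
                return entry.getValue();
            }
        }
        return null;
    }
}

class RoutingExample {
    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        TypeRouter router = new TypeRouter();
        router.register(String.class, processed::add);
        router.register(Map.class, m -> processed.add("map " + m));

        router.dispatch("trade-1", false);                      // routed to the String processor
        router.dispatch(new HashMap<String, Object>(), false);  // HashMap is routed as a Map
        router.dispatch("trade-2", true);                       // voided, no voided processor

        System.out.println(processed);               // prints [trade-1, map {}]
        System.out.println(router.defaultVoidedLog); // prints [Voided message ignored: trade-2]
    }
}
```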

Java Map Routing

// register a handler for Java Map messages (all derivatives of java.util.Map)
session.register(Map.class, this::processMap);

...

// all derivatives of type java.util.Map will be routed here...
private void processMap(Map<String, Object> map, KeyHeader keyHeader) {
    log.info("Received Map: {}, keyHeader: {}", map, keyHeader);
}

Java Object Routing

// register a handler for a Java POJO called SamplePojo
session.register(SamplePojo.class, this::processSamplePojo);

...

// all messages of type SamplePojo will be routed here...
private void processSamplePojo(SamplePojo samplePojo, KeyHeader keyHeader) {
    log.info("Received Pojo: {}, keyHeader: {}", samplePojo, keyHeader);
}

DTO Routing

// register a handler for Price messages
session.register(Price.class, this::processPrice);

...

// all messages of type Price will be routed here...
private void processPrice(Price price) {
    log.info("Received Price DTO: {}", price);
}

SBE Routing

// register a handler for SBE Rfq messages
session.register(RfqDecoder.class, this::processRfq);

...

// all messages of type SBE Rfq will be routed here...
private void processRfq(RfqDecoder rfq) {
    log.info("Received RFQ SBE: {}", rfq);
}