Typed messaging on Kafka without Schema Registry using Medium

Bithaus Medium is an abstraction layer on top of Kafka (and other messaging brokers) that lets you use simple message classes to communicate between your services, with no additional schema definition and no schema registry.

Github: https://github.com/bithauschile/bithaus-medium

This project is the result of more than a decade of experience with distributed systems in real-world situations, covering the development, integration and maintenance of mission-critical solutions where time to market and technology complexity matter.

Features:

  • Topics can carry more than one type of message.
  • A simple Java message class is all that is needed to define the schema.
  • No SERDES to deal with.
  • The JSON wire format makes it easy to interoperate with almost any technology.

Total order among different types of messages

In most stream processing designs, a topic carries only a single message type because a single schema is bound to it. While this is good for some use cases, such as data analytics, there are real-world cases where you need total ordering among different types of events.

Medium allows total ordering within a partition across different types of events.

One common scenario that requires this design is a stock exchange trading system. In real-time trading, the outcome of a trade depends on the order in which the instructions given by brokers and clients are executed.

Trading outcome depends on total ordering

In this example, Client 1 sends a new buy order that should trade with the sell order currently in the market. If the system cannot guarantee total ordering of the instructions, Client 2 could win the trade instead, resulting in an unfair situation.

Total ordering is also required for the correct execution of instructions.

Another, more common scenario is that the order of the instructions is crucial for executing client instructions correctly: you can only cancel an order that has already been placed; otherwise you will receive an error and your order will still be alive.

When only a single message type is allowed per topic, a common workaround is an envelope message that can contain multiple message types, as sketched below. While this pattern works well for rule-checking systems (e.g. monitoring tools), because it allows complex formulas over a single record, it is a poor fit for transactional pipelines, where simple event definitions keep the processing logic simple.
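
For illustration only (these class names are not part of Medium), the envelope workaround usually looks something like this: one registered schema whose optional fields carry whichever payload is present, leaving every consumer to check which field is non-null.

// Sketch of the "envelope" workaround: one schema, many optional payloads.
public class OrderEnvelope {

    private NewOrder newOrder;       // set when the event is a new order
    private CancelOrder cancelOrder; // set when the event is a cancellation
    private OrderFill fill;          // set when the event is a trade execution

    // Consumers must inspect which field is non-null before acting,
    // which pushes routing logic into every application.
}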

Small teams benefit from replacing the Schema Registry with the Project’s Message Library

When we started in the Kafka world, we were amazed by total ordering within a topic and by message retention. Those features allowed us to imagine all kinds of new applications that could solve complex problems with little coding.

But one thing slowed down the development and deployment of projects that were constantly changing: the Schema Registry. In Java, you have to define the schema first in a non-functional file. From there, you generate the Java class that holds the defined attributes, plus the SERDES (serializer/deserializer). The result is an unreadable Java class that has to be regenerated every time a change is made, producing a new schema registry ID. Repeat this process enough times and the risk of getting messages rejected because of schema problems increases dramatically.
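
For contrast, this is roughly the setup being described above. It is only an illustrative sketch: NewOrderAvro stands in for a class generated from an .avsc file, and the serializer and schema.registry.url property come from Confluent's kafka-avro-serializer library.

// Typical Schema Registry setup (not Medium): generated class + Avro serializer.
// ProducerConfig and StringSerializer come from kafka-clients,
// KafkaAvroSerializer from io.confluent:kafka-avro-serializer.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

// NewOrderAvro is a hypothetical class generated from an .avsc schema file.
Producer<String, NewOrderAvro> producer = new KafkaProducer<>(props);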

At Bithaus, and in other teams I have worked with, there is usually a Java project containing all the messages that a system handles. It works like an SDK, forcing every application and developer to use the same semantics. Messages look like this:

class NewOrder extends MediumMessage {

  private String orderId;
  private String symbol;
  private Side side;
  private BigDecimal quantity;
  private BigDecimal price;
  
  (...) // Constructor, getters, setters
}
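
The Side type referenced above is not shown here; it would presumably be a small enum living in the same message library, along the lines of:

// Hypothetical companion type for NewOrder (not shown in the article).
public enum Side {
    BUY,
    SELL
}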

This way of representing the schema of the data that the system handles is all the team needs. It is simple, straightforward, and present in every application that needs to interact with the messaging. No additional schema definition or SERDES creation is required.

Kafka Producer/Consumer API with Medium

Medium Messaging Service allows you to interact with the Producer/Consumer APIs in a very simple way.

Map configMap = ... // Uses Kafka style configuration
MediumMessagingServiceConfig config = new MediumMessagingServiceConfig(configMap);
MediumMessagingService instance = new MediumMessagingService(config);

instance.addMessageListener(NewOrder.class, new MediumMessageListener<NewOrder>() {
    @Override
    public String getName() {
        return "name-for-debug";
    }

    @Override
    public void onMessage(NewOrder message) throws MediumMessageListenerException {          
        System.out.println("NewOrder: " + message.toString() );
    } 
});

NewOrder newOrder = new NewOrder(...);
instance.send(newOrder);

// or if you know the topic name
instance.send(newOrder, "topic1");   
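
Because Medium routes by message class rather than by a single topic schema, several types can flow through the same service and topic. As a sketch, using a hypothetical CancelOrder message (not part of the library) and the same API shown above:

// Hypothetical second message type sharing the same topic as NewOrder.
class CancelOrder extends MediumMessage {

  private String orderId;

  (...) // Constructor, getters, setters
}

instance.addMessageListener(CancelOrder.class, new MediumMessageListener<CancelOrder>() {
    @Override
    public String getName() {
        return "cancel-listener";
    }

    @Override
    public void onMessage(CancelOrder message) throws MediumMessageListenerException {
        System.out.println("CancelOrder: " + message.toString());
    }
});

// NewOrder and CancelOrder instances can now be sent to the same topic;
// each listener receives only its own type, in partition order.
CancelOrder cancel = new CancelOrder(...);
instance.send(cancel, "topic1");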

Using Kafka Streams

Medium also provides Kafka Streams integration to help manage SERDES in stream processing and state stores. The following example declares a stream processor that reads TestMessage events from the upstream topology and writes OtherMessage events to the downstream topology.

public static class TestMediumProcessor extends MediumProcessor<TestMessage, OtherMessage> {

    private KeyValueStore<String, String> stateStore;

    @Override
    public void init(Map<String, Object> configMap) {
        // storeName, badDataSet and deadLetterSet are assumed to be defined
        // elsewhere in the enclosing (test) class.
        this.stateStore = this.getStateStore(storeName);

        // Hooks for messages that cannot be parsed or processed.
        this.setBadDataConsumer((t) -> { badDataSet.add(t); });
        this.setDeadLetterConsumer((t) -> { deadLetterSet.add(t); });
    }

    @Override
    public Collection<OtherMessage> onMessage(TestMessage message) {

        // Keep the latest payload per key in the state store.
        this.stateStore.put(message.getMetadata().getKey(), message.getData());
        this.stateStore.flush();

        // Emit one downstream message per input message.
        OtherMessage other = new OtherMessage(message.getData() + "-OK");

        return Collections.singleton(other);
    }

    public static ProcessorSupplier<String, String, String, String> supplier() {
        return TestMediumProcessor::new;
    }

    @Override
    public Class<TestMessage> getInputMessageClass() {
        return TestMessage.class;
    }
}
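
How this processor is wired into a topology is not shown here. As a rough sketch only, assuming TestMediumProcessor behaves like a regular Processor API processor with String keys and values (as the supplier() signature suggests) and that "test-store" is the store name resolved in init(), plain Kafka Streams wiring would look roughly like this; Medium may provide its own helpers for this step.

// Illustrative Kafka Streams Topology wiring (topic and store names are assumptions).
Topology topology = new Topology();

topology.addSource("test-source", "test-messages-topic");
topology.addProcessor("test-processor", TestMediumProcessor.supplier(), "test-source");

// State store used by the processor; the name must match the one looked up in init().
topology.addStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("test-store"),
                Serdes.String(),
                Serdes.String()),
        "test-processor");

topology.addSink("other-sink", "other-messages-topic", "test-processor");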

Not only Kafka

Medium Messaging Service can interact with all kinds of underlying messaging/event technologies. If you are using Azure Service Bus, you can use the following driver:

https://github.com/bithauschile/bithaus-medium-azure

Conclusion

Having schemas on our data streams is a good thing: it allows us to coordinate teams better and improves data quality. Medium provides a simple way of defining data structures just by declaring message classes, removes the need for a custom SERDE for every schema, and allows multiple message types to flow within a topic.

If you are trying Medium, please let us know! 🙂 Feedback and feature requests are very welcome.

Published by jmakuc

Coder for life, solution architect, founder of Bithaus Software and Datadog partner for Chile. Come in, let's have an espresso!
