Spring Integration using JSON serialization

Posted at — Jul 10, 2024

In my previous blog post, Transactional Outbox pattern with Spring Boot, I compared Spring Integration with Spring Modulith for implementing the microservices outbox pattern. I mentioned that a drawback of Spring Integration is that it uses Java serialization, so the data in the database is not readable with standard database tooling.

In this blog post, I will show how to configure Spring Integration to use Jackson instead, so that the data in the database is readable JSON. This can be convenient for debugging and troubleshooting. The Spring Integration documentation has a small section on Custom Message Insertion that explains the basic idea for supporting JSON, but I ran into some challenges getting it fully working.

For context, this is the setup of Spring Integration using Java serialization:

@Configuration
public class SpringIntegrationConfiguration {

  private static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";

  @Bean
  JdbcChannelMessageStore jdbcChannelMessageStore(
      DataSource dataSource) {
    JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());
    return jdbcChannelMessageStore;
  }
}
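To make the readability problem concrete, here is a minimal sketch using only the JDK. It is not taken from the example project: the MailMessage record and the javaSerialize helper are hypothetical stand-ins. It shows that standard Java serialization produces a binary stream (starting with the magic number 0xACED) rather than text that database tooling can display.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class JavaSerializationDemo {

  // Hypothetical stand-in for a message payload; not the class from the example project.
  record MailMessage(String subject, String body, String to) implements Serializable {}

  // Serializes a value with standard Java serialization and returns the raw bytes.
  static byte[] javaSerialize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) {
    byte[] data = javaSerialize(
        new MailMessage("Order 8 completed", "Your order is registered.", "test@example.com"));
    // Every Java serialization stream starts with the magic number 0xACED.
    System.out.printf("first bytes: %02X %02X%n", data[0], data[1]);
    // Interpreted as text, the string values are embedded in binary framing.
    System.out.println(new String(data, StandardCharsets.ISO_8859_1));
  }
}
```

Running main prints the two magic bytes followed by a mostly unreadable dump, which is roughly what you see when you inspect a byte column with a database client.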

To support serialization to and deserialization from JSON, we have to create custom implementations of two classes: ChannelMessageStorePreparedStatementSetter and MessageRowMapper.

This is the code for the ChannelMessageStorePreparedStatementSetter:

private static class JacksonMessageStorePreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter { // (1)

  private final ObjectMapper objectMapper;

  public JacksonMessageStorePreparedStatementSetter(ObjectMapper objectMapper) {
    this.objectMapper = objectMapper;
  }

  @Override
  public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId, String region,
      boolean priorityEnabled) throws SQLException {
    super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
    try {
      String json = objectMapper.writeValueAsString(requestMessage); // (2)
      preparedStatement.setObject(6, json, java.sql.Types.OTHER); // (3)
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Unable to store message", e);
    }
  }
}
(1) Extend the default ChannelMessageStorePreparedStatementSetter.
(2) Use the injected ObjectMapper to get the JSON.
(3) Write the JSON string to the database. The java.sql.Types.OTHER is there to ensure we are using the JSON database type of PostgreSQL.

To support saving JSON instead of bytes to the database, we have to update the Flyway script that generates the Spring Integration tables:

CREATE TABLE _spring_integration_CHANNEL_MESSAGE
(
    MESSAGE_ID           CHAR(36)     NOT NULL,
    GROUP_KEY            CHAR(36)     NOT NULL,
    CREATED_DATE         BIGINT       NOT NULL,
    MESSAGE_PRIORITY     BIGINT,
    MESSAGE_SEQUENCE     BIGINT       NOT NULL DEFAULT NEXTVAL('_spring_integration_MESSAGE_SEQ'),
    MESSAGE_BYTES        JSON, -- (1)
    REGION               VARCHAR(100) NOT NULL,
    CONSTRAINT _spring_integration_CHANNEL_MESSAGE_PK PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
(1) Use the JSON data type for the MESSAGE_BYTES column.

To read the JSON, we create a subclass of MessageRowMapper:

private static class JacksonMessageRowMapper extends MessageRowMapper {

  private final ObjectMapper objectMapper;

  public JacksonMessageRowMapper(ObjectMapper objectMapper) {
    super(null, null);
    this.objectMapper = objectMapper;
  }

  @Override
  public Message<?> mapRow(ResultSet rs, int rowNum) throws SQLException {
    try {
      String s = rs.getString(rs.findColumn("MESSAGE_BYTES")); // (1)
      return objectMapper.readValue(s, new TypeReference<>() {}); // (2)
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Unable to read message", e);
    }
  }
}
(1) Get the JSON string from the MESSAGE_BYTES column in the database.
(2) Convert the string back to a Message<?> object via the ObjectMapper.

After creating both classes, we need to expose them as Spring beans and use them in our JdbcChannelMessageStore configuration:

@Configuration
public class SpringIntegrationConfiguration {

  private static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";
  private final ObjectMapper springIntegrationObjectMapper;

  public SpringIntegrationConfiguration() {
    springIntegrationObjectMapper = JacksonJsonUtils.messagingAwareMapper(
        "com.wimdeblauwe.examples.transactional_outbox_spring_integration_json"); // (1)
  }

  @Bean
  JdbcChannelMessageStore jdbcChannelMessageStore(
      DataSource dataSource,
      ChannelMessageStorePreparedStatementSetter preparedStatementSetter,
      MessageRowMapper messageRowMapper) {
    JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());
    jdbcChannelMessageStore.setPreparedStatementSetter(preparedStatementSetter);
    jdbcChannelMessageStore.setMessageRowMapper(messageRowMapper);
    return jdbcChannelMessageStore;
  }

  @Bean
  ChannelMessageStorePreparedStatementSetter channelMessageStorePreparedStatementSetter() {
    return new JacksonMessageStorePreparedStatementSetter(springIntegrationObjectMapper);
  }

  @Bean
  MessageRowMapper messageRowMapper() {
    return new JacksonMessageRowMapper(springIntegrationObjectMapper);
  }

  // inner classes JacksonMessageStorePreparedStatementSetter and JacksonMessageRowMapper omitted
}
(1) Spring Integration has a JacksonJsonUtils class that provides a Jackson ObjectMapper that knows how to properly serialize Message objects to JSON. We use the factory method messagingAwareMapper() to create a new instance, passing in our root package so that classes in that package (or its sub-packages) can be deserialized.
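The root package passed to messagingAwareMapper() works as an allow-list for deserialization. The following is a simplified sketch of that idea using only the JDK. It is not how Spring Integration implements the check internally, and the isTrusted helper is a hypothetical name used for illustration. It accepts a class when its package equals a trusted package or is a sub-package of one.

```java
import java.util.List;

class TrustedPackagesSketch {

  // Hypothetical helper: true when the class lives in a trusted package or one of its sub-packages.
  static boolean isTrusted(String className, List<String> trustedPackages) {
    int lastDot = className.lastIndexOf('.');
    String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
    for (String trusted : trustedPackages) {
      // Appending "." avoids matching look-alike packages that merely share a prefix.
      if (packageName.equals(trusted) || packageName.startsWith(trusted + ".")) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> trusted =
        List.of("com.wimdeblauwe.examples.transactional_outbox_spring_integration_json");
    System.out.println(isTrusted(
        "com.wimdeblauwe.examples.transactional_outbox_spring_integration_json.infrastructure.mail.MailMessage",
        trusted));
    System.out.println(isTrusted("com.example.other.Gadget", trusted));
  }
}
```

The practical consequence is that every payload class you store must live under a package you passed to the factory method. Otherwise, reading the message back from the database is expected to fail.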

Note that I am not exposing the ObjectMapper as a Spring bean, as that would override the default ObjectMapper in the Spring Boot application. For that reason, I create it in the constructor and pass it manually to our two beans.

With this configuration in place, we can again test using the test endpoint. The database now has a JSON version of our message (formatted for clarity):

{
  "@class": "org.springframework.messaging.support.GenericMessage",
  "payload": {
    "@class": "com.wimdeblauwe.examples.transactional_outbox_spring_integration_json.infrastructure.mail.MailMessage",
    "subject": "Order 8 completed",
    "body": "Your order is registered in our system and will be processed.",
    "to": "test@example.com"
  },
  "headers": {
    "@class": "java.util.HashMap",
    "replyChannel": "nullChannel",
    "errorChannel": "",
    "id": [
      "java.util.UUID",
      "a831fd04-72e7-9c4e-f49c-f2ab3f785928"
    ],
    "timestamp": [
      "java.lang.Long",
      1720604864891
    ]
  }
}
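When reading such a row during troubleshooting, note that the id and timestamp headers are stored as type-tagged pairs. The timestamp is the message creation time in milliseconds since the Unix epoch. As a small sketch (the class and method names are hypothetical, and the values are copied from the JSON above), the JDK can turn both into something easier to reason about:

```java
import java.time.Instant;
import java.util.UUID;

class MessageHeaderValues {

  // Parses the string form of the "id" header into a UUID.
  static UUID parseId(String raw) {
    return UUID.fromString(raw);
  }

  // Converts the "timestamp" header (epoch milliseconds) into an Instant in UTC.
  static Instant parseTimestamp(long epochMillis) {
    return Instant.ofEpochMilli(epochMillis);
  }

  public static void main(String[] args) {
    System.out.println(parseId("a831fd04-72e7-9c4e-f49c-f2ab3f785928"));
    // prints 2024-07-10T09:47:44.891Z
    System.out.println(parseTimestamp(1720604864891L));
  }
}
```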

Conclusion

It is possible to use Jackson to serialize Spring Integration messages if you want a more readable format in your database.

See transactional-outbox-spring-integration-json on GitHub for the full sources of this example.

If you have any questions or remarks, feel free to post a comment at GitHub discussions.
