Event Driven Architecture: How to Perform Contract Testing in Kafka/PubSub

In the previous article, we discussed how to use Spring Cloud Contract to write contract tests. In this article, we will look at how to do contract testing in an Event Driven Architecture system.

Table of Contents

  1. Introduction to Consumer Contract Testing
  2. Consumer-Driven Contract Testing using Pact.js
  3. Consumer-Driven Contract Testing using Pact Java
  4. Consumer-Driven Contract Testing using Spring Cloud Contracts
  5. You’re here → Event Driven Architecture: How to Perform Contract Testing in Kafka/PubSub
  6. Integrating Contract Testing in Build Pipelines

What is Event Driven Architecture?

Event Driven Architecture is a software architecture and model for application design. It is a highly popular distributed asynchronous architecture pattern used to produce highly scalable applications. An event-driven architecture is loosely coupled because event producers don’t know which consumers are listening for an event, and an event doesn’t know the consequences of its occurrence.

What’s An Event?

An event is any significant occurrence or change in the state of system hardware or software. An event is also the message or notification the system sends to notify other parts of the system that the change has taken place.
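For example, an eCommerce site might publish an event like the following when an order is placed (a hypothetical illustration; the field names are ours, not from any specific system):

{
  "eventType": "OrderPlaced",
  "timestamp": "2020-05-17T21:26:00+05:30",
  "payload": {
    "orderId": "A-1001",
    "total": 42.50
  }
}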

Event Driven Architecture has three key components:

  1. Event producer
  2. Event router
  3. Event consumer

A producer publishes an event to the event router, which filters the event and pushes it to the appropriate consumers. Because producer and consumer are highly decoupled, the system is scalable, testable, and performant, and each part can be deployed independently.

How Does Event Driven Architecture Work?

Here is an example of a wonderful illustration by AWS on how Event-Driven Architecture works for an eCommerce site:

[Diagram: AWS illustration of an event-driven architecture for an eCommerce site]

Event Driven Architecture Models

An event-driven architecture may be based on either a pub/sub model or an event stream model.

Pub/sub model

Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events. More details about the Pub/Sub model can be read here.

Event streaming model

In the event streaming model, events are written to a log. Consumers can read from any part of the event stream and join it at any time. More details about streaming architecture can be read here.
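To make "join at any time" concrete, below is a minimal sketch of a plain Kafka consumer that attaches to a partition of the log and rewinds to the beginning (assuming the local broker and the testing topic used later in this article):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // Attach to a partition of the log and rewind to the oldest record: a
    // late-joining consumer can re-read events published before it started.
    TopicPartition partition = new TopicPartition("testing", 0);
    consumer.assign(Collections.singletonList(partition));
    consumer.seekToBeginning(Collections.singletonList(partition));

    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}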

In a distributed asynchronous architecture, different message queues use different protocols, whereas in HTTP-based microservices all the services communicate over HTTP alone. We therefore need to ensure that the messages exchanged between producer and consumer over the message queue comply with a contract. The contract test at the consumer end generates a pact file, and the same is verified by the message provider, which generates the correct message.

Sample Project Using Spring Kafka

To demonstrate consumer-driven contract testing in an asynchronous event-driven application, we developed a sample producer and consumer using Spring Kafka.

Apache Kafka

Apache Kafka is a distributed data streaming platform that is a popular event processing choice. It can handle publishing, subscribing to, storing, and processing event streams in real-time.

Spring Kafka

The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions.

It provides a ‘template’ as a high-level abstraction for sending messages. It also supports message-driven POJOs with @KafkaListener annotations and a listener container.

Contract Testing in Kafka - Message Queue

To start the server, we can follow the instructions mentioned here. Download the latest version of Kafka from here.

Kafka uses ZooKeeper, so you first need to start a ZooKeeper server if you don’t already have one running. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.

> bin/zookeeper-server-start.sh config/zookeeper.properties

Now start the Kafka server:

> bin/kafka-server-start.sh config/server.properties

❗ Note: Topics are created automatically from Spring Kafka modules.
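For instance, a minimal sketch of how a Spring Kafka module might declare such a topic (the bean below is illustrative; Spring’s KafkaAdmin registers NewTopic beans with the broker on startup):

@Bean
public NewTopic testingTopic() {
    // Topic "testing" with 1 partition and replication factor 1 -- enough
    // for the single-node local broker started above.
    return new NewTopic("testing", 1, (short) 1);
}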

The Date Producer Spring Kafka module produces a message and publishes it to a Kafka topic, and the Date Consumer Spring Kafka module consumes it. To perform consumer-driven contract testing between the date producer and date consumer modules, we once again picked Pact to write consumer-driven contracts. Feel free to give other streaming platforms a try and share your feedback in the comments section. Spring Cloud Contract also supports contract tests when Kafka is used for streaming messages between producer and consumer.

Producing a Contract

Consumer-Driven Contract testing begins with a consumer defining the contract. Before we start writing code, we have to add the following dependency to our project:

<dependency>
    <groupId>au.com.dius</groupId>
    <artifactId>pact-jvm-consumer-junit5</artifactId>
    <version>4.0.9</version>
    <scope>test</scope>
</dependency>

Consumer tests start with creating message expectations. Below is the message we expect to receive from the queue, published there by the producer. Everything is self-explanatory in the MessagePactBuilder below.

@Pact(consumer = "dateConsumerKafka")
public MessagePact validDateMessageFromKafkaProvider(MessagePactBuilder builder) {
    return builder
            .expectsToReceive("valid date from kafka provider")
            .withContent(LambdaDsl.newJsonBody((object) -> {
                object.dateExpression("localDate", "^\\d{4}-\\d{2}-\\d{2}$", "yyyy-MM-dd");
                object.booleanType("isLeapYear", true);
            }).build())
            .toPact();
}

Below is a sample test that deserializes the message from the handler and validates the expectations. It uses ObjectMapper from the Jackson library to read the value from the message and deserialize it into the expected class. This is the same way the actual message gets deserialized.

@SneakyThrows
@Test
@PactTestFor(pactMethod = "validDateMessageFromKafkaProvider")
public void testValidDateFromProvider(List<Message> messages) {
    assertThat(messages).isNotEmpty();
    assertThat(new ObjectMapper().readValue(new String(messages.get(0).contentsAsBytes()), ConsumerDateInfo.class))
            .hasFieldOrProperty("localDate")
            .hasFieldOrPropertyWithValue("isLeapYear", true);
}

The last thing we need to add before we run the tests is the annotations that let the test class know we want to bring up the Spring context and enable Pact. ProviderType needs to be set to ASYNCH in the @PactTestFor annotation, along with the actual provider name.

@ExtendWith(PactConsumerTestExt.class)
@PactTestFor(providerName = "dateProviderKafka", providerType = ProviderType.ASYNCH)
public class DateConsumerTest {

The Maven command to execute DateConsumerTest is below:

mvn clean -Dtest=DateConsumerTest test -pl date-consumer-kafka

Results are below:

[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.example.consumer.pact.DateConsumerTest
21:26:00.136 [main] DEBUG au.com.dius.pact.consumer.junit5.PactConsumerTestExt - Found @PactTestFor annotation on test method
21:26:00.140 [main] DEBUG au.com.dius.pact.consumer.junit5.PactConsumerTestExt - Found @PactTestFor annotation on test class
21:26:00.141 [main] DEBUG au.com.dius.pact.consumer.junit5.PactConsumerTestExt - providerInfo = ProviderInfo(providerName=dateProviderKafka, hostInterface=, port=, pactVersion=null, providerType=ASYNCH)
21:26:00.144 [main] DEBUG au.com.dius.pact.consumer.junit5.PactConsumerTestExt - Looking for @Pact method named 'validDateMessageFromKafkaProvider' for provider 'dateProviderKafka'
21:26:00.146 [main] DEBUG au.com.dius.pact.consumer.junit5.PactConsumerTestExt - Invoking method 'validDateMessageFromKafkaProvider' to get Pact for the test 'testValidDateFromProvider'
21:26:00.457 [main] DEBUG au.com.dius.pact.consumer.junit5.PactConsumerTestExt - Writing pact dateConsumerKafka -> dateProviderKafka to file target/pacts/dateConsumerKafka-dateProviderKafka.json
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.401 s - in com.example.consumer.pact.DateConsumerTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  6.434 s
[INFO] Finished at: 2020-05-17T21:26:00+05:30
[INFO] ------------------------------------------------------------------------
➜  ContractTestingBoilerplate git:(master) ✗

Apart from the verification of our test case, a JSON file containing the contract has been generated in the target directory (target/pacts).

{
  "consumer": {
    "name": "dateConsumerKafka"
  },
  "provider": {
    "name": "dateProviderKafka"
  },
  "messages": [
    {
      "description": "valid date from kafka provider",
      "metaData": {
        "contentType": "application/json"
      },
      "contents": {
        "isLeapYear": true,
        "localDate": "2000-01-31"
      },
      "matchingRules": {
        "body": {
          "$.localDate": {
            "matchers": [
              {
                "match": "date",
                "date": "yyyy-MM-dd"
              }
            ],
            "combine": "AND"
          },
          "$.isLeapYear": {
            "matchers": [
              {
                "match": "type"
              }
            ],
            "combine": "AND"
          }
        }
      }
    }
  ],
  "metadata": {
    "pactSpecification": {
      "version": "3.0.0"
    },
    "pact-jvm": {
      "version": "4.0.9"
    }
  }
}

Every message entry has:

  • Description – what the interaction represents.
  • Metadata – e.g., the content type of the message.
  • Contents – the actual contents of the message produced by the producer.
  • Matching rules – how each field is matched during verification.

So far, everything has happened at the consumer end. We now need to share the contract with the producer of the message so that the producer can verify it.

Like any messaging-based application, the consumer needs to create a receiver to handle the published messages. The consumer is nothing more than a simple POJO that defines a method for receiving messages. In the example below we named the method receive().

The @KafkaListener annotation creates a ConcurrentMessageListenerContainer message listener container behind the scenes for each annotated method. For this to work, a factory bean named kafkaListenerContainerFactory is expected, which we will configure in the next section. @KafkaListener takes the name of the topic to listen to.

@Service
public class Consumer {

    public ConsumerDateInfo consumerDateInfo;

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    @SneakyThrows
    @KafkaListener(topics = "testing")
    public void receive(ConsumerDateInfo consumerDateInfo) {
        String consumerInfo = new ObjectMapper().writeValueAsString(consumerDateInfo);
        LOGGER.info("received consumer='{}'", consumerInfo);
        this.consumerDateInfo = consumerDateInfo;
    }
}

ConsumerConfiguration is the class where we set the initial configuration and deserialization parameters.

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}
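The kafkaListenerContainerFactory bean that @KafkaListener expects is not shown above; a minimal sketch of how it could be wired on top of the consumerConfigs() bean (our assumption of the remaining configuration, not the project’s exact code) is:

@Bean
public ConsumerFactory<String, ConsumerDateInfo> consumerFactory() {
    // Reuses the properties above; keys are plain strings, values are JSON
    // deserialized into ConsumerDateInfo.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(ConsumerDateInfo.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerDateInfo> kafkaListenerContainerFactory() {
    // The factory bean @KafkaListener looks up (by name) to build its
    // message listener container.
    ConcurrentKafkaListenerContainerFactory<String, ConsumerDateInfo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}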

Now it’s time for the producer to verify the contract messages shared via the Pact Broker.

Verifying the Contract

In our case, the provider is a simple Spring Kafka application. For sending messages we will use the KafkaTemplate, which wraps a Producer and provides convenient methods for sending data to Kafka topics. The template provides asynchronous send methods that return a ListenableFuture.

@Autowired
private KafkaTemplate<String, ProducerDateInfo> kafkaTemplate;

@Value(value = "${kafka.topic}")
private String topicName;

public void send() {
    LocalDate localDate = LocalDate.now();
    kafkaTemplate.send(topicName, generateMessage(localDate));
}

@NotNull
public ProducerDateInfo generateMessage(LocalDate localDate) {
    return new ProducerDateInfo(localDate.toString(), localDate.isLeapYear());
}

ProducerConfiguration is the class where we set the initial configuration and serialization parameters.

@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

@Value(value = "${kafka.topic}")
private String topicName;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return props;
}
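The KafkaTemplate autowired earlier also needs a producer factory; a minimal sketch of the remaining wiring on top of the producerConfigs() bean (again our assumption, not the project’s exact code) is:

@Bean
public ProducerFactory<String, ProducerDateInfo> producerFactory() {
    // Builds Kafka producers from the serialization settings above.
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, ProducerDateInfo> kafkaTemplate() {
    // The template the producer service autowires to send messages.
    return new KafkaTemplate<>(producerFactory());
}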

The Date Producer Spring Kafka module also exposes a REST endpoint to publish messages through it.

@RestController
@RequestMapping(value = "/kafka")
public class DateProducerController {

    private final Producer producer;

    @Autowired
    DateProducerController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic() {
        this.producer.send();
    }
}

First, we have to add a dependency to the Pact provider library:

<dependency>
    <groupId>au.com.dius</groupId>
    <artifactId>pact-jvm-provider-junit5</artifactId>
    <version>4.0.10</version>
</dependency>

Pact will pretend to be a message queue and get the producer to publish the appropriate message, which is then matched against the published pact file. Instead of HttpTarget, we use AmqpTarget to drive the behavior described above. Pact-JVM will look for a @PactVerifyProvider method whose description matches the one in the pact file. A sample producer test looks like this:

@RunWith(PactRunner.class)
@Provider("dateProviderKafka")
@Consumer("dateConsumerKafka")
@PactBroker(host = "localhost", port = "8282")
public class DateProducerTest {

    @TestTarget
    public final Target target = new AmqpTarget();

    @SneakyThrows
    @PactVerifyProvider("valid date from kafka provider")
    public String verifyDateInformationMessage() {
        return new ObjectMapper().writeValueAsString(new Producer().generateMessage(LocalDate.now()));
    }
}

The Maven command to execute the above DateProducerTest, with its output, is below:

mvn clean -Dtest=DateProducerTest test -pl date-producer-kafka
[INFO] Running com.example.producer.pact.DateProducerTest

Verifying a pact between dateConsumerKafka and dateProviderKafka
  valid date from kafka provider
21:22:55.193 [main] DEBUG au.com.dius.pact.provider.junit.target.AmqpTarget - Classloader = null
21:22:55.194 [main] DEBUG au.com.dius.pact.provider.ProviderVerifier - projectClasspath = []
21:22:57.635 [main] DEBUG au.com.dius.pact.provider.ProviderVerifier - found class public class com.example.producer.pact.DateProducerTest
21:22:57.637 [main] DEBUG au.com.dius.pact.provider.ProviderVerifier - found method [@au.com.dius.pact.provider.PactVerifyProvider("valid date from kafka provider") public java.lang.String verifyDateInformationMessage()]
21:22:57.648 [main] DEBUG au.com.dius.pact.provider.ProviderVerifier - Found methods = [public java.lang.String com.example.producer.pact.DateProducerTest.verifyDateInformationMessage()]
    generates a message which
21:22:57.814 [main] DEBUG au.com.dius.pact.core.matchers.JsonBodyMatcher - compareValues: Matcher defined for path [$, isLeapYear]
21:22:57.817 [main] DEBUG au.com.dius.pact.core.matchers.MatcherExecutor - comparing type of true to true at [$, isLeapYear]
21:22:57.817 [main] DEBUG au.com.dius.pact.core.matchers.JsonBodyMatcher - compareValues: Matcher defined for path [$, localDate]
21:22:57.817 [main] DEBUG au.com.dius.pact.core.matchers.MatcherExecutor - comparing "2020-05-17" to date pattern yyyy-MM-dd at [$, localDate]
      has a matching body (OK)
      has matching metadata (OK)
21:22:57.831 [main] DEBUG au.com.dius.pact.provider.DefaultTestResultAccumulator - Received test result 'au.com.dius.pact.core.pactbroker.TestResult$Ok@502f8b57' for Pact dateProviderKafka-dateConsumerKafka and valid date from kafka provider
21:22:57.833 [main] DEBUG au.com.dius.pact.provider.DefaultTestResultAccumulator - Number of interactions #1 and results: [au.com.dius.pact.core.pactbroker.TestResult$Ok@502f8b57]
21:22:57.833 [main] DEBUG au.com.dius.pact.provider.DefaultTestResultAccumulator - All interactions for Pact dateProviderKafka-dateConsumerKafka have a verification result
21:22:57.833 [main] WARN au.com.dius.pact.provider.DefaultTestResultAccumulator - Skipping publishing of verification results as it has been disabled (pact.verifier.publishResults is not 'true')
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.305 s - in com.example.producer.pact.DateProducerTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  12.809 s
[INFO] Finished at: 2020-05-17T21:22:58+05:30
[INFO] ------------------------------------------------------------------------
➜  ContractTestingBoilerplate git:(master) ✗

By default, publishing of verification results is disabled; it can be enabled through the Maven plugin or by setting the pact.verifier.publishResults system property, as hinted in the log output above.
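For example, the system property named in the log output can be passed straight to Maven (assuming a Pact Broker is configured for the test):

mvn clean -Dtest=DateProducerTest -Dpact.verifier.publishResults=true test -pl date-producer-kafka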

The entire working code snippets can be found here.

In the next chapter, we will see how to integrate contract testing to build pipelines. Stay tuned 🙂

Happy Testing 😎
Srinivasan Sekar & Sai Krishna

About the author

Srinivasan Sekar

Srinivasan Sekar is a Lead Consultant at ThoughtWorks. He loves contributing to Open Source. He is an Appium Member and Selenium Contributor as well. He worked extensively on testing various Mobile and Web Applications. He specializes in building automation frameworks. He has also spoken at various conferences including SeleniumConf, AppiumConf, SLASSCOM, BelgradeTestConf, QuestForQualityConf, and FOSDEM.
