Camel Aggregator EIP

Camel With Aggregator EIP with simple steps.


In this tutorial we learn about the Aggregator EIP in Camel. The Aggregator EIP combines related messages into one single aggregate message. An aggregator needs three settings:

  • Correlation identifier: an expression that tells Camel which messages are related.
  • Completion condition: a condition that determines when the aggregate message is sent.
  • Aggregation strategy: an AggregationStrategy that specifies how messages are combined.
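
To make the three settings concrete before touching Camel, here is a minimal plain-Java sketch of the same idea. MiniAggregator and its methods are hypothetical names made up for this illustration and are not Camel classes; the sketch only assumes that a group completes after a fixed number of messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for an aggregator; this is not a Camel class.
class MiniAggregator {
    private final int completionSize;                             // completion condition
    private final BinaryOperator<String> strategy;                // aggregation strategy
    private final Map<Object, String> pending = new HashMap<>();  // partial aggregate per correlation key
    private final Map<Object, Integer> counts = new HashMap<>();  // messages seen per correlation key
    private final List<String> completed = new ArrayList<>();     // aggregates that were sent out

    MiniAggregator(int completionSize, BinaryOperator<String> strategy) {
        this.completionSize = completionSize;
        this.strategy = strategy;
    }

    // The correlation key decides which group the body joins.
    void send(Object correlationKey, String body) {
        // For the first message of a group the strategy sees null as the old value.
        String combined = strategy.apply(pending.get(correlationKey), body);
        int count = counts.merge(correlationKey, 1, Integer::sum);
        if (count == completionSize) {
            // Completion condition met: emit the aggregate and forget the group.
            completed.add(combined);
            pending.remove(correlationKey);
            counts.remove(correlationKey);
        } else {
            pending.put(correlationKey, combined);
        }
    }

    List<String> completed() {
        return completed;
    }

    static List<String> demo() {
        MiniAggregator aggregator = new MiniAggregator(
                4, (old, next) -> old == null ? next : old + " " + next);
        aggregator.send(1, "one");
        aggregator.send(1, "two");
        aggregator.send(2, "three"); // different key, so it stays in its own pending group
        aggregator.send(1, "four");
        aggregator.send(1, "five");
        return aggregator.completed();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [one two four five]
    }
}
```

Only the group for key 1 reaches four messages, so it is the only aggregate that is emitted; the message with key 2 keeps waiting.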

Example

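In our example we will send the messages one, two, three, four and five. Messages one, two, four and five share the same correlation identifier and will be combined into a single message; message three has a different identifier and stays in its own group. A test case will validate the output. So let us start.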

  • In Eclipse, go to MyFirstCamelProject, which we created on an earlier page, and right-click on src/main/java to create a package: New -> Package.



  • Give it any name, for example com.preparationforinterview, and click Finish.



  • Right-click on the package and create a Java class, for example MyRoute.



  • Add the following dependencies to the pom.xml file. You can use any later version.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>MyFirstCamelProject</groupId>
    	<artifactId>MyFirstCamelProject</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencies>
    		<dependency>
    			<groupId>org.apache.camel</groupId>
    			<artifactId>camel-test</artifactId>
    			<version>2.5.0</version>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>4.8.1</version>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    </project>
    
  • Run mvn install from a command prompt or, if you are using Eclipse, right-click on pom.xml and run Maven install. It will download the Camel jars.

  • Copy the code below into MyRoute.

    package com.preparationforinterview;
    
    import org.apache.camel.builder.RouteBuilder;
    
    public class MyRoute extends RouteBuilder {
    
    	@Override
    	public void configure() throws Exception {
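    		from("direct:start")
    				// log each incoming message and its correlation key
    				.log("Sending ${body} with correlation key ${header.Id}")
    				// group messages by the value of the "Id" header
    				.aggregate(header("Id"), new MyAggregationStrategy())
    				// release a group once it holds 4 messages
    				.completionSize(4)
    				.log("After completion condition, message is sent to out ${body}")
    				.to("mock:output");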
    
    	}
    
    }
    
    

    Explanation

    To create a route in Camel we extend the RouteBuilder class and override the configure method, where we define the route. The route above accepts messages from the in-memory direct:start endpoint and logs each one. It then calls the aggregate method. The first parameter is the correlation identifier, which describes which messages are related; here it is the Id header. The second parameter is the AggregationStrategy, which defines how messages are combined. The aggregator also needs a completion condition so that it knows when to send the aggregate message out; here completionSize(4) completes a group after four messages.

  • Right-click on the package and create one more Java class, for example MyAggregationStrategy.



  • Copy the code below into MyAggregationStrategy.

    package com.preparationforinterview;
    
    import org.apache.camel.Exchange;
    import org.apache.camel.processor.aggregate.AggregationStrategy;
    
    public class MyAggregationStrategy implements AggregationStrategy {
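    	public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    		// first message of a group: there is no previous aggregate yet
    		if (oldExchange == null) {
    			return newExchange;
    		}
    		// later messages: append the new body to the aggregate built so far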
    		String oldMessageBody = oldExchange.getIn().getBody(String.class);
    		String newMessageBody = newExchange.getIn().getBody(String.class);
    		String combinedBody = oldMessageBody +" " + newMessageBody;
    		oldExchange.getIn().setBody(combinedBody);
    		return oldExchange;
    	}
    }
    

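    Explanation

    To create an aggregation strategy in Camel we implement the AggregationStrategy interface and override its aggregate method, which defines how messages are combined. For the first message of a group oldExchange is null, so the strategy returns newExchange unchanged; for later messages it appends the new body to the old one, separated by a space.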
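
    The pairwise calls can be traced with a small plain-Java sketch. StrategyTrace is a hypothetical helper written for this illustration; it applies the same rule as MyAggregationStrategy to plain strings instead of Exchange objects, so it runs without Camel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; mirrors the rule of MyAggregationStrategy on plain strings.
class StrategyTrace {

    // Same logic as the strategy above: keep the first body, then append with a space.
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody + " " + newBody;
    }

    // Feeds the bodies through aggregate one by one and records every call.
    static List<String> trace(String... bodies) {
        List<String> steps = new ArrayList<>();
        String current = null; // no aggregate exists before the first message
        for (String body : bodies) {
            String result = aggregate(current, body);
            steps.add("aggregate(" + current + ", " + body + ") -> " + result);
            current = result;
        }
        return steps;
    }

    public static void main(String[] args) {
        // prints:
        // aggregate(null, one) -> one
        // aggregate(one, two) -> one two
        // aggregate(one two, four) -> one two four
        // aggregate(one two four, five) -> one two four five
        trace("one", "two", "four", "five").forEach(System.out::println);
    }
}
```

    Each result becomes the old value of the next call, which is why the strategy only ever needs to know how to merge two messages.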

    To test the aggregator above we will create one class, AggregatorTest.java.



  • At the end the package com.preparationforinterview will contain MyRoute.java, MyAggregationStrategy.java and AggregatorTest.java.


  • Copy the code below into AggregatorTest.java.

    package com.preparationforinterview;
    
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.mock.MockEndpoint;
    import org.apache.camel.test.junit4.CamelTestSupport;
    import org.junit.Test;
    
    public class AggregatorTest extends CamelTestSupport {
    	@Override
    	protected RouteBuilder createRouteBuilder() throws Exception {
    		return new MyRoute();
    	}
    
    	@Test
    	public void testAggregation() throws Exception {
    		MockEndpoint mock = getMockEndpoint("mock:output");
    		// "three" is sent with Id 2, so it is not part of the group for Id 1
    		mock.expectedBodiesReceived("one two four five");
    		template.sendBodyAndHeader("direct:start", "one", "Id", 1);
    		template.sendBodyAndHeader("direct:start", "two", "Id", 1);
    		template.sendBodyAndHeader("direct:start", "three", "Id", 2);
    		template.sendBodyAndHeader("direct:start", "four", "Id", 1);
    		template.sendBodyAndHeader("direct:start", "five", "Id", 1);
    		// fails the test if the mock endpoint did not receive the expected body
    		assertMockEndpointsSatisfied();
    
    
    	}
    }
    

    Explanation

    To use the Camel test kit we extend CamelTestSupport, which is built on top of JUnit. We override createRouteBuilder and return an instance of our route class, in our case MyRoute. To test the route we create a method annotated with @Test. In that method we get the mock endpoint and set an expectation on it with expectedBodiesReceived. After the expectation has been set, we send the messages to the direct:start endpoint and set the Id header on each one. The template object comes from CamelTestSupport. Finally, assertMockEndpointsSatisfied checks the expectation against what the mock endpoint actually received.

    To check the output, right-click on AggregatorTest.java and run it. The test case should pass.


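    To run the test from a command prompt, go to the project location and run the command below. AggregatorTest is a JUnit test without a main method, so it is run through mvn test rather than exec:java. This assumes the three classes are under src/test/java, because the pom.xml above declares the Camel dependency with test scope.
    mvn test -Dtest=AggregatorTest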




© 2019 PreparationForInterview.com