How to implement the splitter and aggregator patterns with Apache Camel

I have found that Apache Camel is a good way to load data from log files into a database. Read on to see how I did it using the Splitter and Aggregator patterns.


Problem

We have huge log files for a project (months of system logging); a single file can be 12 MB.
These files share a common comma-delimited (CSV) format, and every record in a file holds logging data about a logged-in user and the actions they performed in the system.

Example:

username1,fname1,lname1,email1,action1,start-time1,end-time1
username2,fname2,lname2,email2,action2,start-time2,end-time2
username3,fname3,lname3,email3,action3,start-time3,end-time3
username4,fname4,lname4,email4,action4,start-time4,end-time4
username5,fname5,lname5,email5,action5,start-time5,end-time5
username6,fname6,lname6,email6,action6,start-time6,end-time6
.
.
.
// A single file can contain up to 150,000 records
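To make the record layout concrete, here is a minimal plain-Java sketch that parses one line into a value object. The class name LogEntry and its field names are hypothetical (they only mirror the seven columns above), and the sketch assumes no field contains a comma or quotes; in the actual route this mapping is done by Camel Bindy.

```java
/** Hypothetical value object for one log line; field names mirror the sample columns. */
final class LogEntry {
    final String username, firstName, lastName, email, action, startTime, endTime;

    private LogEntry(String[] f) {
        this.username = f[0];
        this.firstName = f[1];
        this.lastName = f[2];
        this.email = f[3];
        this.action = f[4];
        this.startTime = f[5];
        this.endTime = f[6];
    }

    /** Parses one comma-delimited line; assumes no quoted fields or embedded commas. */
    static LogEntry parse(String line) {
        // trim() drops a trailing '\r' from Windows line endings;
        // limit -1 keeps trailing empty fields so an empty end-time still counts as a column
        String[] fields = line.trim().split(",", -1);
        if (fields.length != 7) {
            throw new IllegalArgumentException(
                "Expected 7 fields but found " + fields.length + ": " + line);
        }
        return new LogEntry(fields);
    }
}
```

Throwing on a wrong column count, rather than silently skipping the line, is a design choice of this sketch; a loader could equally catch the exception and route the line to an error file.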

What we want is to transfer these records from the large text files into a database, so that searching the data becomes faster and more comprehensive.

Solution

The first thing that came to mind was the Apache Camel framework.
Camel supports most of the "Enterprise Integration Patterns" from the excellent book by Gregor Hohpe and Bobby Woolf. I recommend reading the book; it is available on Amazon.

In this solution I used two of these patterns: the Splitter and the Aggregator.

The application reads the files one by one from a configured input directory, unmarshals each line of a file into a Java object, and finally saves these objects to a database. The Spring framework provides bean wiring and inversion of control, so the developer can switch between different implementations, for example by injecting "MysqlPersistService" or "MongoPersistService" wherever a "PersistService" is required.
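Stripped of Camel, the per-file flow described above comes down to this: read lines lazily, drop the ones too short to be records, and hand the rest to the persistence layer in fixed-size batches. The sketch below is only an illustration of that idea. The batch size (900) and the 30-character minimum are taken from the route shown later in this post. The class name BatchPipeline and the Consumer that stands in for PersistService are hypothetical, not part of the sample project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical plain-Java analogue of split -> filter -> aggregate -> save. */
final class BatchPipeline {

    /**
     * Streams lines from source and passes them to sink in batches of batchSize.
     * Returns the number of lines that passed the length filter.
     */
    static int process(Reader source, int batchSize, int minLength,
                       Consumer<List<String>> sink) throws IOException {
        int kept = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            // "Splitter": one line at a time, never the whole file in memory
            while ((line = reader.readLine()) != null) {
                // "Filter": keep only lines longer than minLength, like ${body.length} > 30
                if (line.length() <= minLength) {
                    continue;
                }
                batch.add(line);
                kept++;
                // "Aggregator" completion by size
                if (batch.size() == batchSize) {
                    sink.accept(batch);
                    batch = new ArrayList<>(batchSize); // sink may keep the old list
                }
            }
        }
        // End of input: flush the final, smaller batch
        // (the Camel route relies on a completion timeout for this instead)
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
        return kept;
    }
}
```

For example, `BatchPipeline.process(new java.io.FileReader(path), 900, 30, batch -> service.save(batch))` would push a 2,000-line file out as batches of 900, 900 and 200, where `service.save` is a placeholder for whatever the persistence layer exposes.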

To download a sample of the solution please click here.

Understanding Important Classes

 

LogRoute.java

package com.logger.rout;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import com.logger.rout.service.CsvToModelMapper;
import com.logger.rout.service.PersistService;

@Component
public class LogRoute extends RouteBuilder {
        private static final long BATCH_TIME_OUT = 3000L;

        private static final int MAX_RECORDS = 900;

        @Value("${log.file.input}")
        private String folderPath;

        @Value("${log.file.output}")
        private String folderPathout;

        @Autowired
        private CsvToModelMapper csvToModelConverter;

        @Autowired
        @Qualifier("mysqlPersistService")
        private PersistService persistService;

        @Override
        public void configure() throws Exception {
                // Poll the input folder and send each file to both branches below in parallel
                from("file:" + folderPath)
                .log("Start processing ....")
                .multicast()
                .parallelProcessing()
                .to("direct:database-save", "direct:move-to-out")
                .end();

                from("direct:database-save")
                                .log("Start saving to database ....")
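                                // Splitter: stream the file line by line instead of loading it whole
                                .split()
                                .tokenize("\n")
                                .streaming()
                                // Skip blank or truncated lines
                                .filter(simple("${body.length} > 30"))
                                // Bindy maps each CSV line onto the annotated CSVRecord class (source not shown in this post)
                                .unmarshal()
                                .bindy(BindyType.Csv, CSVRecord.class)
                                .bean(csvToModelConverter, "convertToMysqlModel")
                                // Aggregator: a single group (constant key), completed by size or by inactivity
                                .aggregate(constant(true), batchAggregationStrategy())
                                .completionPredicate(batchSizePredicate())
                                .completionTimeout(BATCH_TIME_OUT)
                                // Save each completed batch
                                .bean(persistService)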
                                .log("End saving to database ....")
                                .end();

                from("direct:move-to-out")
                                // Rename the copy to <name>-<timestamp>.<ext> to mark it as processed
                                .setHeader(Exchange.FILE_NAME,
                                                simple("${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.${file:ext}"))
                                .to("file:" + folderPathout)
                                .end();

        }

        @Bean
        public AggregationStrategy batchAggregationStrategy() {
                return new ArrayListAggregationStrategy();
        }

        @Bean
        public Predicate batchSizePredicate() {
                return new BatchSizePredicate(MAX_RECORDS);
        }

}

This class defines the route, that is, the flow of data from the beginning of the route to the end. It:
1- Reads each CSV file from the "IN" directory.
2- Splits the file content and streams it as a sequence of messages, one per line, using the Splitter pattern.
3- Skips lines of 30 characters or fewer, then unmarshals every remaining CSV line into a Java object.
4- Aggregates every 900 messages into a List of objects using the Aggregator pattern.
5- Saves each List to the database as one batch.
6- In parallel with steps 2 to 5, writes a copy of the file to the "OUT" directory, appending a timestamp to its name to mark it as processed.
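Step 6 relies on the Simple expression `${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.${file:ext}` to name the copy. As a rough illustration of what that produces, here is a plain-Java equivalent. The helper class OutputFileNames and its method are hypothetical. The sketch splits on the last dot, so Camel's own handling of multi-dot names such as archive.tar.gz, or of names without an extension, may differ.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper mirroring the route's "${file:name.noext}-${date:now:...}.${file:ext}" rename. */
final class OutputFileNames {
    // Same pattern as the route: year, month, day, hour, minute, second, millisecond
    private static final DateTimeFormatter STAMP =
        DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    static String timestampedName(String fileName, LocalDateTime now) {
        String stamp = STAMP.format(now);
        int dot = fileName.lastIndexOf('.');
        if (dot <= 0) {
            // No extension (or a dot-file such as ".log"): just append the timestamp
            return fileName + "-" + stamp;
        }
        // Insert the timestamp between the base name and the extension
        return fileName.substring(0, dot) + "-" + stamp + fileName.substring(dot);
    }
}
```

Including milliseconds in the stamp makes it unlikely that two copies of the same input file overwrite each other in the "OUT" directory.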

ArrayListAggregationStrategy.java

package com.logger.rout;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class ArrayListAggregationStrategy implements AggregationStrategy {

        @Override
        @SuppressWarnings("unchecked")
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                Object newBody = newExchange.getIn().getBody();
                if (oldExchange == null) {
                        // First message of a new batch: start a fresh list carried by this exchange
                        List<Object> list = new ArrayList<Object>();
                        list.add(newBody);
                        newExchange.getIn().setBody(list);
                        return newExchange;
                }
                // Later messages: append to the list already held by the aggregated exchange
                List<Object> list = oldExchange.getIn().getBody(List.class);
                list.add(newBody);
                return oldExchange;
        }

}

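This class collects the body of every incoming Exchange into a single List. For the first message of a batch (when oldExchange is null) it creates a new List and stores it in the new Exchange; for every following message it appends the body to the List held by the old Exchange. It applies no conditions of its own: deciding when a batch is complete is left to the completion predicate and the timeout.
Please click here to read more about Camel Exchanges.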
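The null check on oldExchange reflects the aggregation contract. The first message of a group arrives with no previous result, and every later call receives whatever the previous call returned. The sketch below models that contract without Camel, using a plain List in place of the Exchange. The names ListFold, aggregate and foldAll are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the aggregation contract, with a List<T> standing in for the Exchange. */
final class ListFold {

    /** First call: previous is null, so start a new list. Later calls: append to the running list. */
    static <T> List<T> aggregate(List<T> previous, T next) {
        if (previous == null) {
            List<T> list = new ArrayList<>();
            list.add(next);
            return list;
        }
        previous.add(next);
        return previous;
    }

    /** Feeds items through aggregate() one at a time, the way the aggregator feeds each message. */
    static <T> List<T> foldAll(Iterable<T> items) {
        List<T> result = null;
        for (T item : items) {
            result = aggregate(result, item);
        }
        return result; // still null if there were no items
    }
}
```

Returning the same list on every call, as ArrayListAggregationStrategy does with oldExchange, avoids copying the batch for each of the 900 messages.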

BatchSizePredicate.java

package com.logger.rout;

import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;

public class BatchSizePredicate implements Predicate {

        private final int size;

        public BatchSizePredicate(int size) {
                this.size = size;
        }

        @Override
        public boolean matches(Exchange exchange) {
                if (exchange == null) {
                        return false;
                }
                // Complete the batch once the aggregated list holds `size` records
                List<?> list = exchange.getIn().getBody(List.class);
                return list != null && list.size() >= size;
        }

}

This class ends the current aggregation once the aggregated List reaches the configured size (900 records in LogRoute); the next message then starts a new aggregation.
The last batch of a file usually holds fewer records than that, so this predicate never fires for it. To flush that remainder, the route also uses a completion condition built into Camel, "completionTimeout(BATCH_TIME_OUT)": if no new message arrives for BATCH_TIME_OUT (3 seconds), the partial batch is completed and saved. Please check LogRoute.java for details.
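To show how the two completion conditions work together, here is a single-threaded sketch of a batcher. It completes a batch either when the batch reaches a given size or when no message has arrived for a given time. This is not how Camel implements it: Camel checks for timed-out groups in the background, whereas here the caller passes the current time in explicitly so the behaviour is deterministic. The class name TimedBatcher is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batcher: completes on size (like BatchSizePredicate) or on inactivity (like completionTimeout). */
final class TimedBatcher<T> {
    private final int maxSize;
    private final long timeoutMillis;
    private final Consumer<List<T>> sink;
    private List<T> batch = new ArrayList<>();
    private long lastArrivalMillis;

    TimedBatcher(int maxSize, long timeoutMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    /** Adds one message; emits the batch as soon as it reaches maxSize. */
    void offer(T item, long nowMillis) {
        batch.add(item);
        lastArrivalMillis = nowMillis;
        if (batch.size() >= maxSize) {
            flush();
        }
    }

    /** Meant to be called periodically; emits a partial batch once it has been idle for timeoutMillis. */
    void checkTimeout(long nowMillis) {
        if (!batch.isEmpty() && nowMillis - lastArrivalMillis >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        sink.accept(batch);
        batch = new ArrayList<>();
    }
}
```

With a size of 900 and a timeout of 3,000 ms, a file of 1,000 lines yields one batch of 900 as soon as the 900th line arrives. The remaining 100 are emitted only after 3 seconds without new lines, which mirrors the route's MAX_RECORDS and BATCH_TIME_OUT settings.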

RouteConfiguration.java

package com.logger.rout;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spring.javaconfig.CamelConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RouteConfiguration extends CamelConfiguration {
        @Autowired
        LogRoute logRoute;

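        @Override
        public List<RouteBuilder> routes() {
                // Register the Spring-managed LogRoute with the Camel context
                List<RouteBuilder> routeBuilders = new ArrayList<RouteBuilder>();
                routeBuilders.add(logRoute);
                return routeBuilders;
        }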

}

This Java-based configuration autowires the Spring components and starts Camel with the routes returned by routes().

Service classes
These classes are used by the route to save each aggregated List of records to a database (MySQL or MongoDB).
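The service classes themselves are not listed here. As a rough sketch of the design described earlier (one PersistService interface with interchangeable implementations), here is a plain-Java version. The method name save, the class InMemoryPersistService and the name-based PersistServiceRegistry are all hypothetical. In the sample project, Spring picks the implementation through @Qualifier("mysqlPersistService") instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical contract: persist one aggregated batch of records. */
interface PersistService {
    void save(List<?> batch);
}

/** Hypothetical stand-in for MysqlPersistService / MongoPersistService that keeps records in memory. */
final class InMemoryPersistService implements PersistService {
    private final List<Object> stored = new ArrayList<>();
    private int batches;

    @Override
    public void save(List<?> batch) {
        // A database-backed version might issue one batched INSERT (MySQL) or insertMany (MongoDB) here
        stored.addAll(batch);
        batches++;
    }

    List<Object> stored() { return Collections.unmodifiableList(stored); }

    int batchCount() { return batches; }
}

/** Picks an implementation by name, roughly as @Qualifier picks a bean; callers see only the interface. */
final class PersistServiceRegistry {
    private final Map<String, PersistService> services = new HashMap<>();

    void register(String name, PersistService service) {
        services.put(name, service);
    }

    PersistService get(String name) {
        PersistService service = services.get(name);
        if (service == null) {
            throw new IllegalArgumentException("No PersistService named " + name);
        }
        return service;
    }
}
```

Because the caller depends only on the interface, switching from MySQL to MongoDB means registering (or, with Spring, qualifying) a different implementation. The routing code stays the same.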

Conclusion

In this post I provided a sample solution for converting CSV files into records saved in a database, using a web application that pulls files from a specific directory. Apache Camel made it easy to implement this solution in only a few lines of code, using the RouteBuilder and processor implementations such as the Splitter and the Aggregator.

3 comments

  1. Hi!

    I have the same use case. But how do you implement a transaction per file? I have tried many configurations, but it only rolls back the last records committed and not all the records of the file.

    Thanks for your answer

  2. In this case I don't implement transactions at the file level; the transaction is implemented at the service level. However, I think you can add ".transacted(ref)" before you split the file into lines and provide your transaction reference.

    Alternatively, rather than rolling back, you can send the list of failing messages to a dead letter destination, for example using onException.
