Deleting Elasticsearch Records Older than X Number of Days

Requirements: For a specific index, we need to delete all records that are older than a given timestamp.

How? We will use the Java Elasticsearch client (the High Level REST Client)

Let's do it:

Add the required Maven dependencies to your pom.xml:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.10.1</version>
</dependency>

<!-- The high level REST client version should match the elasticsearch core version above -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.1</version>
</dependency>


We will write the code in a non-blocking manner because our Elasticsearch database is quite big. Your index could hold a huge amount of data, and if we were to wait synchronously we might hit the REST timeout and never get a response back, so it is best to use the async API.

If you use the async API, keep in mind that the calling @Service and/or @Component needs to stay running, otherwise you will never receive your response. So definitely DON'T call this code from inside main.
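One way to satisfy that is to trigger the cleanup from a long-lived Spring bean, for example a scheduled service. The sketch below is purely illustrative; the class and method names are made up and are not part of the code that follows:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

// Hypothetical wrapper: a Spring-managed bean lives as long as the application
// context, so the async delete-by-query callback can still be delivered to it.
// @Scheduled requires @EnableScheduling on a configuration class to fire.
@Service
public class IndexCleanupService {

    // Run the cleanup once a day at 01:00.
    @Scheduled(cron = "0 0 1 * * *")
    public void scheduledCleanup() {
        cleanupIndexesOlderThan(30);
    }

    public void cleanupIndexesOlderThan(int days) {
        // The delete-by-query code shown below goes here.
    }
}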

In this example, I am using localhost as the address, but this is just an example. If you are using Spring Boot, you would autowire the client instead:

    @Autowired
    private RestHighLevelClient client;
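For the autowired field to resolve, a RestHighLevelClient bean needs to exist in the application context. A minimal sketch of such a configuration, assuming the same localhost address used in the example below (the class name is just illustrative):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Example configuration: exposes the high level client as a Spring bean
// so it can be @Autowired into services.
@Configuration
public class ElasticsearchClientConfig {

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(HttpHost.create("http://localhost:9200")));
    }
}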

We are going to delete all records whose @timestamp field is less than or equal to (lte) a date 30 days in the past. Effectively, anything older than 30 days will be removed from the specific index you provide.

// Build the client; in Spring Boot you would @Autowired this instead (see above).
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(HttpHost.create("http://localhost:9200")));

// We want to delete records older than 30 days.
ZonedDateTime cutoff = ZonedDateTime.now().minusDays(30);

// Callback invoked when the async delete-by-query completes or fails.
Supplier<ActionListener<BulkByScrollResponse>> actionListenerSupplier = () -> new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkResponse) {
        // bulkResponse.getDeleted() holds the number of documents deleted.
        System.out.println("cleanupIndexesOlderThan Response: " + bulkResponse);
    }

    @Override
    public void onFailure(Exception e) {
        System.out.println("cleanupIndexesOlderThan Exception: " + e);
    }
};

// Match every document whose @timestamp is on or before the cutoff (truncated to midnight UTC).
QueryBuilder rangeQuery = QueryBuilders.rangeQuery("@timestamp")
        .lte(cutoff.toInstant().truncatedTo(ChronoUnit.DAYS));

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest("YourIndexNameHere");
deleteByQueryRequest.setConflicts("proceed");                    // keep going on version conflicts
deleteByQueryRequest.setQuery(QueryBuilders.boolQuery().must(rangeQuery));
deleteByQueryRequest.setBatchSize(200);                          // documents processed per scroll batch
deleteByQueryRequest.setScroll(TimeValue.timeValueMinutes(10));  // keep the scroll context alive

// Fire the delete-by-query asynchronously; the listener above receives the result.
client.deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, actionListenerSupplier.get());

The query will look like this when you run it:

Query{
   "bool" : {
     "must" : [
       {
         "range" : {
           "@timestamp" : {
             "from" : null,
             "to" : "2020-11-16T00:00:00.000Z",
             "include_lower" : true,
             "include_upper" : true,
             "boost" : 1.0
           }
         }
       }
     ],
     "adjust_pure_negative" : true,
     "boost" : 1.0
   }
 }, BatchSize:200, Context-Alive-Time:10

*** Once this reached production, we saw approximately 60 million records per one-day range, and since we had only a single index, the deletion logic above was not good enough.

What we ended up doing was creating daily indexes (nice_index_20201230, nice_index_20201231, etc.), changing our application logic, and updating all search queries to use wildcards, so the code would look for nice_index* (a rough sketch is shown below). We then deleted the old dated indexes through a separate lifecycle management job in AWS.
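As a rough illustration only (the nice_index prefix is from the example above, but the class and helper names are hypothetical, not the actual production code), resolving the daily index name and searching across all days with a wildcard could look something like this:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Example only: writes go to today's dated index, reads span all of them via a wildcard.
public class DailyIndexNaming {

    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyyMMdd");

    // e.g. nice_index_20201231 - the index a new document would be written to today
    public static String todaysIndex() {
        return "nice_index_" + LocalDate.now().format(SUFFIX);
    }

    // Search request that matches every daily index via the nice_index* wildcard
    public static SearchRequest searchAllDays() {
        SearchRequest request = new SearchRequest("nice_index*");
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
        return request;
    }
}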
