Convert Figma logo to code with AI

elastic logoelasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop

1,930
988
1,930
109

Top Related Projects

40,184

Apache Spark - A unified analytics engine for large-scale data processing

5,524

Apache Hive

14,703

Apache Hadoop

23,929

Apache Flink

7,828

Apache Beam is a unified programming model for Batch and Streaming data processing.

Quick Overview

Elasticsearch for Apache Hadoop is a library that allows Hadoop jobs to interact with Elasticsearch. It provides native integration between Elasticsearch and Apache Hadoop, Apache Hive, Apache Spark, and Apache Storm, enabling big data processing and analytics with Elasticsearch as both source and sink.

Pros

  • Seamless integration with popular big data frameworks
  • High performance and scalability for large datasets
  • Support for various data formats and types
  • Easy-to-use API and configuration options

Cons

  • Requires understanding of both Elasticsearch and Hadoop ecosystems
  • May introduce additional complexity to data pipelines
  • Limited documentation for advanced use cases
  • Potential version compatibility issues between Elasticsearch and Hadoop components

Code Examples

  1. Reading data from Elasticsearch in Spark:
import org.elasticsearch.spark._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ES-Spark Example").getOrCreate()
val esRDD = spark.sparkContext.esRDD("index/type")
esRDD.take(10).foreach(println)
  1. Writing data to Elasticsearch in Hadoop MapReduce:
import org.elasticsearch.hadoop.mr.EsOutputFormat;

job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputValueClass(MapWritable.class);
EsOutputFormat.setResource(conf, "index/type");
  1. Querying Elasticsearch in Hive:
CREATE EXTERNAL TABLE es_table (id STRING, name STRING, age INT)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index/type', 'es.query' = '?q=age:[30 TO 40]');

SELECT * FROM es_table LIMIT 10;

Getting Started

  1. Add the Elasticsearch-Hadoop dependency to your project:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.17.9</version>
</dependency>
  1. Configure Elasticsearch connection in your Hadoop/Spark/Hive job:
es.nodes=localhost
es.port=9200
es.resource=index/type
  1. Use the appropriate API for your framework (e.g., JavaEsSparkSQL for Spark, EsInputFormat/EsOutputFormat for Hadoop MapReduce) to read from or write to Elasticsearch.

Competitor Comparisons

40,184

Apache Spark - A unified analytics engine for large-scale data processing

Pros of Spark

  • More versatile and general-purpose big data processing framework
  • Supports a wider range of data sources and sinks
  • Offers advanced analytics capabilities like machine learning and graph processing

Cons of Spark

  • Steeper learning curve and more complex setup
  • Higher resource requirements for cluster deployment
  • May be overkill for simpler Elasticsearch-specific tasks

Code Comparison

Elasticsearch-Hadoop (Reading from Elasticsearch):

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "index/type");

Spark (Reading from Elasticsearch):

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("index/type")

Summary

Elasticsearch-Hadoop is specifically designed for Elasticsearch integration, offering seamless connectivity and optimized performance for Elasticsearch-related tasks. It's simpler to set up and use for Elasticsearch-specific operations.

Spark, on the other hand, is a more comprehensive data processing framework with broader capabilities. It can work with various data sources, including Elasticsearch, and provides advanced analytics features. However, it may require more resources and expertise to set up and use effectively.

Choose Elasticsearch-Hadoop for focused Elasticsearch integration, and Spark for more diverse and complex big data processing needs that may include Elasticsearch alongside other data sources and advanced analytics requirements.

5,524

Apache Hive

Pros of Hive

  • Mature and widely adopted data warehousing solution for Hadoop ecosystems
  • Supports a SQL-like query language (HiveQL) for easier data analysis
  • Integrates well with other Apache Hadoop projects

Cons of Hive

  • Can be slower for real-time queries compared to Elasticsearch
  • Requires more setup and configuration for Hadoop ecosystem
  • Less flexible for handling unstructured or semi-structured data

Code Comparison

Hive query example:

SELECT customer_id, SUM(order_total)
FROM orders
GROUP BY customer_id
HAVING SUM(order_total) > 1000;

Elasticsearch-Hadoop query example:

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "orders");
JavaPairRDD<String, Double> customerTotals = esRDD.mapToPair(tuple -> {
    Map<String, Object> source = tuple._2;
    return new Tuple2<>((String) source.get("customer_id"), (Double) source.get("order_total"));
}).reduceByKey((a, b) -> a + b).filter(tuple -> tuple._2 > 1000);

Summary

Hive is a robust data warehousing solution for Hadoop ecosystems, offering SQL-like querying capabilities. Elasticsearch-Hadoop, on the other hand, provides faster real-time queries and better handling of unstructured data. The choice between the two depends on specific use cases, existing infrastructure, and performance requirements.

14,703

Apache Hadoop

Pros of Hadoop

  • More comprehensive ecosystem for big data processing and storage
  • Wider industry adoption and community support
  • Supports a broader range of data processing use cases beyond Elasticsearch integration

Cons of Hadoop

  • Steeper learning curve and more complex setup
  • Potentially slower performance for specific Elasticsearch-related tasks
  • Requires more resources and infrastructure to run effectively

Code Comparison

Hadoop (Java):

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);

Elasticsearch-Hadoop (Java):

Configuration conf = new Configuration();
conf.set("es.nodes", "localhost:9200");
conf.set("es.resource", "index/type");
Job job = Job.getInstance(conf);
job.setOutputFormatClass(EsOutputFormat.class);

Both repositories provide Java-based APIs for working with big data, but Elasticsearch-Hadoop focuses specifically on integrating Hadoop with Elasticsearch, while Hadoop offers a more general-purpose framework for distributed data processing. Elasticsearch-Hadoop simplifies the process of reading from and writing to Elasticsearch within Hadoop jobs, whereas Hadoop requires additional configuration and code to achieve similar functionality.

23,929

Apache Flink

Pros of Flink

  • More versatile for general-purpose stream and batch processing
  • Supports a wider range of data sources and sinks
  • Offers advanced features like stateful computations and event time processing

Cons of Flink

  • Steeper learning curve due to its broader scope
  • May be overkill for simple Elasticsearch integration tasks
  • Requires more setup and configuration for basic use cases

Code Comparison

Flink (Java):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("input.txt");
DataStream<Tuple2<String, Integer>> counts = text
    .flatMap(new Tokenizer())
    .keyBy(0)
    .sum(1);
counts.print();

Elasticsearch-Hadoop (Java):

JavaPairRDD<String, String> esRDD = JavaEsSpark.esRDD(sc, "index/type");
JavaRDD<Map<String, Object>> javaRDD = esRDD.map(new Function<Tuple2<String, String>, Map<String, Object>>() {
    public Map<String, Object> call(Tuple2<String, String> tuple) throws Exception {
        return new ObjectMapper().readValue(tuple._2(), Map.class);
    }
});

Summary

While Elasticsearch-Hadoop focuses specifically on Elasticsearch integration with Hadoop ecosystems, Flink offers a more comprehensive data processing framework. Flink is better suited for complex stream processing tasks but may be excessive for simple Elasticsearch operations. Elasticsearch-Hadoop provides a more straightforward solution for Elasticsearch-specific use cases within Hadoop environments.

7,828

Apache Beam is a unified programming model for Batch and Streaming data processing.

Pros of Beam

  • Supports multiple programming languages (Java, Python, Go)
  • Provides a unified programming model for batch and streaming data processing
  • Offers a wide range of built-in transforms and I/O connectors

Cons of Beam

  • Steeper learning curve due to its more complex architecture
  • May have higher overhead for simpler data processing tasks
  • Less specialized for Elasticsearch integration compared to Elasticsearch-Hadoop

Code Comparison

Elasticsearch-Hadoop (Scala):

import org.elasticsearch.spark._
val rdd = sc.esRDD("index/type")

Beam (Java):

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
PCollection<String> output = pipeline
    .apply(ElasticsearchIO.read().withConnectionConfiguration(config));

Summary

Beam is a more versatile data processing framework supporting multiple languages and processing models, while Elasticsearch-Hadoop is specifically tailored for Elasticsearch integration. Beam offers greater flexibility but may be more complex for simple tasks, whereas Elasticsearch-Hadoop provides a more straightforward solution for Elasticsearch-specific operations.

Convert Figma logo designs to code with AI

Visual Copilot

Introducing Visual Copilot: A new AI model to turn Figma designs to high quality code using your components.

Try Visual Copilot

README

Elasticsearch Hadoop Build Status

Elasticsearch real-time search and analytics natively integrated with Hadoop. Supports Map/Reduce, Apache Hive, and Apache Spark.

See project page and documentation for detailed information.

Requirements

Elasticsearch (1.x or higher (2.x highly recommended)) cluster accessible through REST. That's it! Significant effort has been invested to create a small, dependency-free, self-contained jar that can be downloaded and put to use without any dependencies. Simply make it available to your job classpath and you're set. For a certain library, see the dedicated chapter.

ES-Hadoop 6.x and higher are compatible with Elasticsearch 1.X, 2.X, 5.X, and 6.X

ES-Hadoop 5.x and higher are compatible with Elasticsearch 1.X, 2.X and 5.X

ES-Hadoop 2.2.x and higher are compatible with Elasticsearch 1.X and 2.X

ES-Hadoop 2.0.x and 2.1.x are compatible with Elasticsearch 1.X only

Installation

Stable Release (currently 8.15.1)

Available through any Maven-compatible tool:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>8.15.1</version>
</dependency>

or as a stand-alone ZIP.

Development Snapshot

Grab the latest nightly build from the repository again through Maven:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>9.0.0-SNAPSHOT</version>
</dependency>
<repositories>
  <repository>
    <id>sonatype-oss</id>
    <url>http://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

or build the project yourself.

We do build and test the code on each commit.

Supported Hadoop Versions

Running against Hadoop 1.x is deprecated in 5.5 and will no longer be tested against in 6.0. ES-Hadoop is developed for and tested against Hadoop 2.x and YARN. More information in this section.

Feedback / Q&A

We're interested in your feedback! You can find us on the User mailing list - please append [Hadoop] to the post subject to filter it out. For more details, see the community page.

Online Documentation

The latest reference documentation is available online on the project home page. Below the README contains basic usage instructions at a glance.

Usage

Configuration Properties

All configuration properties start with es prefix. Note that the es.internal namespace is reserved for the library internal use and should not be used by the user at any point. The properties are read mainly from the Hadoop configuration but the user can specify (some of) them directly depending on the library used.

Required

es.resource=<ES resource location, relative to the host/port specified above>

Essential

es.query=<uri or query dsl query>              # defaults to {"query":{"match_all":{}}}
es.nodes=<ES host address>                     # defaults to localhost
es.port=<ES REST port>                         # defaults to 9200

The full list is available here

Map/Reduce

For basic, low-level or performance-sensitive environments, ES-Hadoop provides dedicated InputFormat and OutputFormat that read and write data to Elasticsearch. To use them, add the es-hadoop jar to your job classpath (either by bundling the library along - it's ~300kB and there are no-dependencies), using the DistributedCache or by provisioning the cluster manually. See the documentation for more information.

Note that es-hadoop supports both the so-called 'old' and the 'new' API through its EsInputFormat and EsOutputFormat classes.

'Old' (org.apache.hadoop.mapred) API

Reading

To read data from ES, configure the EsInputFormat on your job configuration along with the relevant properties:

JobConf conf = new JobConf();
conf.setInputFormat(EsInputFormat.class);
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");             // replace this with the relevant query
...
JobClient.runJob(conf);

Writing

Same configuration template can be used for writing but using EsOuputFormat:

JobConf conf = new JobConf();
conf.setOutputFormat(EsOutputFormat.class);
conf.set("es.resource", "radio/artists"); // index or indices used for storing data
...
JobClient.runJob(conf);

'New' (org.apache.hadoop.mapreduce) API

Reading

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");             // replace this with the relevant query
Job job = new Job(conf)
job.setInputFormatClass(EsInputFormat.class);
...
job.waitForCompletion(true);

Writing

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists"); // index or indices used for storing data
Job job = new Job(conf)
job.setOutputFormatClass(EsOutputFormat.class);
...
job.waitForCompletion(true);

Apache Hive

ES-Hadoop provides a Hive storage handler for Elasticsearch, meaning one can define an external table on top of ES.

Add es-hadoop-.jar to hive.aux.jars.path or register it manually in your Hive script (recommended):

ADD JAR /path_to_jar/es-hadoop-<version>.jar;

Reading

To read data from ES, define a table backed by the desired index:

CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists', 'es.query' = '?q=me*');

The fields defined in the table are mapped to the JSON when communicating with Elasticsearch. Notice the use of TBLPROPERTIES to define the location, that is the query used for reading from this table.

Once defined, the table can be used just like any other:

SELECT * FROM artists;

Writing

To write data, a similar definition is used but with a different es.resource:

CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists');

Any data passed to the table is then passed down to Elasticsearch; for example considering a table s, mapped to a TSV/CSV file, one can index it to Elasticsearch like this:

INSERT OVERWRITE TABLE artists
    SELECT NULL, s.name, named_struct('url', s.url, 'picture', s.picture) FROM source s;

As one can note, currently the reading and writing are treated separately but we're working on unifying the two and automatically translating HiveQL to Elasticsearch queries.

Apache Spark

ES-Hadoop provides native (Java and Scala) integration with Spark: for reading a dedicated RDD and for writing, methods that work on any RDD. Spark SQL is also supported

Scala

Reading

To read data from ES, create a dedicated RDD and specify the query as an argument:

import org.elasticsearch.spark._

..
val conf = ...
val sc = new SparkContext(conf)
sc.esRDD("radio/artists", "?q=me*")

Spark SQL

import org.elasticsearch.spark.sql._

// DataFrame schema automatically inferred
val df = sqlContext.read.format("es").load("buckethead/albums")

// operations get pushed down and translated at runtime to Elasticsearch QueryDSL
val playlist = df.filter(df("category").equalTo("pikes").and(df("year").geq(2016)))

Writing

Import the org.elasticsearch.spark._ package to gain savetoEs methods on your RDDs:

import org.elasticsearch.spark._

val conf = ...
val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

Spark SQL

import org.elasticsearch.spark.sql._

val df = sqlContext.read.json("examples/people.json")
df.saveToEs("spark/people")

Java

In a Java environment, use the org.elasticsearch.spark.rdd.java.api package, in particular the JavaEsSpark class.

Reading

To read data from ES, create a dedicated RDD and specify the query as an argument.

import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");

Spark SQL

SQLContext sql = new SQLContext(sc);
DataFrame df = sql.read().format("es").load("buckethead/albums");
DataFrame playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2016)))

Writing

Use JavaEsSpark to index any RDD to Elasticsearch:

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");

Spark SQL

import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

DataFrame df = sqlContext.read.json("examples/people.json")
JavaEsSparkSQL.saveToEs(df, "spark/docs")

Building the source

Elasticsearch Hadoop uses Gradle for its build system and it is not required to have it installed on your machine. By default (gradlew), it automatically builds the package and runs the unit tests. For integration testing, use the integrationTests task. See gradlew tasks for more information.

To create a distributable zip, run gradlew distZip from the command line; once completed you will find the jar in build/libs.

To build the project, JVM 8 (Oracle one is recommended) or higher is required.

License

This project is released under version 2.0 of the Apache License

Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright
ownership. Elasticsearch licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.