HCatalog Input Output Format

The HCatInputFormat interface is used to read data from HCatalog-managed tables stored on HDFS, while the HCatOutputFormat interface is used to write the resulting data back to such tables after processing with a MapReduce job.

HCatInputFormat -

The HCatInputFormat is used with MapReduce jobs to read data from HCatalog-managed tables. It exposes a Hadoop 0.20 MapReduce API for reading data.

The HCatInputFormat API includes the following methods:

  • setInput
  • setOutputSchema
  • getTableSchema

Key methods (see the sketch after this list):

  • setInput(job, dbName, tableName), or the Configuration-based overload: specify the source table; partition metadata is injected into the job configuration.
  • setOutputSchema(...): define which columns to fetch; if omitted, all columns are retrieved.
  • getTableSchema(context): fetches the full schema of the input table.
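
A minimal driver-side sketch of these calls (a sketch only: exact signatures vary across HCatalog releases, and this uses the older org.apache.hcatalog API that the full example later in this section also uses; "default", "employee", and the job name are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class ReadDriverSketch {
  public static Job configureInput(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "read-from-hcatalog");

    // Point the job at the source table; the null filter means "read all partitions".
    HCatInputFormat.setInput(job, InputJobInfo.create("default", "employee", null));
    job.setInputFormatClass(HCatInputFormat.class);

    // The table schema is now available from the job configuration.
    HCatSchema schema = HCatInputFormat.getTableSchema(job);

    // Optional projection: pass a reduced schema to read only some columns.
    // If setOutputSchema is never called, all columns are returned.
    HCatInputFormat.setOutputSchema(job, schema);
    return job;
  }
}
```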

How to Use:

  • Create an InputJobInfo object pointing to the source table (and optionally partitions).
  • Call HCatInputFormat.setInput(...) with that info.
  • Optionally call setOutputSchema(...) to control projection.
  • In your Mapper<_, HCatRecord>, use HCatalog's record reader and schema for processing (a skeleton Mapper follows this list).
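
A skeleton of such a Mapper (a sketch only; the column positions and Java types shown are assumptions and must be adjusted to your table's schema):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hcatalog.data.HCatRecord;

public class HCatReadMapper
    extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {

  @Override
  protected void map(WritableComparable key, HCatRecord value, Context context)
      throws IOException, InterruptedException {
    // HCatRecord exposes columns by position; the casts below assume
    // column 0 is a string and column 1 an integer.
    String name = (String) value.get(0);   // first column
    Integer age = (Integer) value.get(1);  // second column
    context.write(new Text(name), new IntWritable(age));
  }
}
```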

HCatOutputFormat -

HCatOutputFormat is used with MapReduce jobs to write data to HCatalog-managed tables. It exposes a Hadoop 0.20 MapReduce API for writing data to a table. A MapReduce job uses HCatOutputFormat to write its output; the default OutputFormat configured for the table is used, and the new partition is published to the table after the job completes.


The HCatOutputFormat API includes the following methods:

  • setOutput
  • setSchema
  • getTableSchema

Key methods:

  • setOutput(conf, credentials, OutputJobInfo): required first step; configures where and how to write.
  • setSchema(conf, HCatSchema): specify the schema for the output; defaults to the table schema if not provided.
  • getRecordWriter(context): returns Hadoop’s writer for emitting data.
  • getOutputCommitter(context): ensures output is finalized correctly.

The first call on HCatOutputFormat must be setOutput; any other call made first throws an exception saying the output format is not initialized.
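
A driver-side sketch of this ordering (a sketch only, using the older org.apache.hcatalog API that the full example below also uses; the database and table names are placeholders):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class WriteDriverSketch {
  public static void configureOutput(Job job) throws Exception {
    // setOutput must be the first HCatOutputFormat call on the job.
    // The null third argument means no static partition values are supplied.
    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create("default", "employee_out", null));

    // Reuse the target table's own schema for the records being written.
    HCatSchema schema = HCatOutputFormat.getTableSchema(job);
    HCatOutputFormat.setSchema(job, schema);

    job.setOutputFormatClass(HCatOutputFormat.class);
  }
}
```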

Full Example:

This MapReduce job (called GroupByAge) reads from a Hive/HCatalog table whose second column is an integer (e.g., "age"), groups rows by that column, and counts occurrences. It is equivalent to:

SELECT col1, COUNT(*) 
FROM input_table 
GROUP BY col1;

Input sample (values of the second column): {3, 3, 5, 5, 7, 9}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;

import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

  public static class Map
      extends Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {

    @Override
    protected void map(WritableComparable key, HCatRecord val, Context ctx)
        throws IOException, InterruptedException {
      int age = (Integer) val.get(1);  // second column
      ctx.write(new IntWritable(age), new IntWritable(1));
    }
  }

  public static class Reduce
      extends Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord> {

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> vals, Context ctx)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : vals) {
        sum++;  // count one occurrence of this age per value
      }
      HCatRecord rec = new DefaultHCatRecord(2);
      rec.set(0, key.get());
      rec.set(1, sum);
      ctx.write(null, rec);  // key is ignored; HCatOutputFormat writes the HCatRecord
    }
  }

  @Override public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    args = new GenericOptionsParser(conf, args).getRemainingArgs();
    String metastoreURI = args[0];      // e.g. thrift://host:9083 (not used below; the metastore is resolved from the job configuration)
    String inputTable = args[1];
    String outputTable = args[2];
    String db = null;

    String princ = System.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
    if (princ != null) {
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, princ);
    }

    Job job = Job.getInstance(conf, "GroupByAge");

    // ─── Input ───
    HCatInputFormat.setInput(job, InputJobInfo.create(db, inputTable, null));
    job.setInputFormatClass(HCatInputFormat.class);

    // ─── Job setup ───
    job.setJarByClass(GroupByAge.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(WritableComparable.class);
    job.setOutputValueClass(DefaultHCatRecord.class);

    // ─── Output ───
    HCatOutputFormat.setOutput(job, OutputJobInfo.create(db, outputTable, null));  // must be the first HCatOutputFormat call
    HCatSchema outSchema = HCatOutputFormat.getTableSchema(job);
    HCatOutputFormat.setSchema(job, outSchema);
    job.setOutputFormatClass(HCatOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new GroupByAge(), args);
    System.exit(res);
  }
}

Deployment Steps:

### A. Upload JARs to HDFS

These JARs must be available for localization on the YARN nodes (version numbers may vary):

```bash
hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
```

Set the `LIB_JARS` env variable:

```bash
export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,\
hdfs:///tmp/hive-metastore-0.10.0.jar,...
```

### B. Compile and Package

```bash
javac -classpath "$HCAT_HOME/share/hcatalog/*:$HADOOP_CLASSPATH" -d . GroupByAge.java
jar cvf GroupByAge.jar com/your/package/path/*.class
```

### C. Run the Job

```bash
hadoop jar GroupByAge.jar \
  com.your.package.path.GroupByAge \
  -libjars $LIB_JARS \
  thrift://metastore-host:9083 \
  input_table \
  output_table
```

The input_table must already exist and contain data; the output_table must also be created in advance (the job publishes a new partition to it). After the job finishes, check HDFS or Hive for the output files (part-*).

Output:

3, 2
5, 2
7, 1
9, 1