HCatalog Input Output Format
The HCatInputFormat interface is used with MapReduce jobs to read data from HCatalog-managed tables, while the HCatOutputFormat interface is used to write the resulting data back to HCatalog-managed tables once the job has processed it.
HCatInputFormat -
The HCatInputFormat reads data from HCatalog-managed tables using MapReduce jobs. It exposes a Hadoop 0.20 MapReduce API for reading data.
The HCatInputFormat API includes the following methods -
- setInput
- setOutputSchema
- getTableSchema
Key methods:
- setInput(job, dbName, tableName) (or the Configuration-based overload): specifies the source table; its metadata, including partitions, is injected into the job configuration.
- setOutputSchema(...): defines which columns to fetch; if omitted, all columns are retrieved.
- getTableSchema(context): fetches the full schema of the input table.
How to Use:
- Create an InputJobInfo object pointing to the source table (and optionally partitions).
- Call HCatInputFormat.setInput(...) with that info.
- Optionally call setOutputSchema(...) to control projection.
- In your Mapper<_, HCatRecord>, use HCat’s record reader and schema for processing.
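Putting these steps together, a minimal driver-side sketch could look as follows. Only the input wiring is shown, the table name employee is hypothetical, and the complete job further below wires up the mapper, reducer, and output as well.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class ReadSetupSketch {
    // Driver-side input wiring only; mapper, reducer, and output setup are omitted.
    public static Job configureInput() throws Exception {
        Job job = Job.getInstance(new Configuration(), "read-employee");

        // Point the job at the source table. A null database name means the
        // default database; a null filter means "read all partitions".
        HCatInputFormat.setInput(job, InputJobInfo.create(null, "employee", null));
        job.setInputFormatClass(HCatInputFormat.class);

        // Optional projection: if setOutputSchema is never called, every column
        // of the table is handed to the mapper. Passing the full table schema
        // back, as done here, is equivalent to that default.
        HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);
        HCatInputFormat.setOutputSchema(job, tableSchema);
        return job;
    }
}
```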
HCatOutputFormat -
HCatOutputFormat is used to write data to HCatalog-managed tables with MapReduce jobs. It exposes a Hadoop 0.20 MapReduce API for writing data to a table. A MapReduce job uses HCatOutputFormat to write its output; the default OutputFormat configured for the table is used, and the new partition is published to the table after the job completes.
The HCatOutputFormat API includes the following methods -
- setOutput
- setSchema
- getTableSchema
Key methods:
- setOutput(conf, credentials, OutputJobInfo): required first step; configures where and how to write.
- setSchema(conf, HCatSchema): specifies the schema for the output; defaults to the table schema if not provided.
- getRecordWriter(context): returns Hadoop’s record writer for emitting data.
- getOutputCommitter(context): ensures the output is finalized correctly.
The first call on HCatOutputFormat must be setOutput; any other call made first will throw an exception saying the output format is not initialized.
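The required ordering is easiest to see in a small driver-side sketch. The destination table employee_summary is hypothetical, and the Job-based variants of the calls (the ones used in the full example below) are assumed.

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class WriteSetupSketch {
    // Driver-side output wiring only, applied to an already-created Job.
    public static void configureOutput(Job job) throws Exception {
        // setOutput must be the very first HCatOutputFormat call; the null
        // partition map here targets an unpartitioned table.
        HCatOutputFormat.setOutput(job,
                OutputJobInfo.create(null, "employee_summary", null));

        // Reuse the destination table's own schema for the emitted records;
        // a hand-built HCatSchema could be passed to setSchema instead.
        HCatSchema schema = HCatOutputFormat.getTableSchema(job);
        HCatOutputFormat.setSchema(job, schema);
        job.setOutputFormatClass(HCatOutputFormat.class);
    }
}
```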
Full Example:
This MapReduce job (called GroupByAge) reads from a Hive/HCatalog table whose second column is an integer (e.g., “age”), groups rows by that column, and counts occurrences, which is equivalent to:
SELECT age, COUNT(*)
FROM input_table
GROUP BY age;
Input sample (values of the age column): {3, 3, 5, 5, 7, 9}
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
public class GroupByAge extends Configured implements Tool {
public static class Map extends
Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable>
{
@Override protected void
map(WritableComparable key, HCatRecord val, Context ctx)
throws IOException, InterruptedException {
int age = (Integer) val.get(1); // second column
ctx.write(new IntWritable(age), new IntWritable(1));
}
}
public static class Reduce extends
Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord> {
@Override protected void
reduce(IntWritable key, Iterable<IntWritable> vals, Context ctx)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable v : vals) {
sum++;
}
HCatRecord rec = new DefaultHCatRecord(2);
rec.set(0, key.get());
rec.set(1, sum);
ctx.write(null, rec);
}
}
@Override public int run(String[] args) throws Exception {
Configuration conf = getConf();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String metastoreURI = args[0]; // e.g. thrift://host:9083
String inputTable = args[1];
String outputTable = args[2];
String db = null; // null means the default database
// Use the metastore URI passed on the command line and propagate the
// Kerberos principal if one was supplied as a system property.
conf.set("hive.metastore.uris", metastoreURI);
String princ = System.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
if (princ != null) {
conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, princ);
}
Job job = Job.getInstance(conf, "GroupByAge");
// ─── Input ───
HCatInputFormat.setInput(job, InputJobInfo.create(db, inputTable, null));
job.setInputFormatClass(HCatInputFormat.class);
// ─── Job setup ───
job.setJarByClass(GroupByAge.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(WritableComparable.class);
job.setOutputValueClass(DefaultHCatRecord.class);
// ─── Output ───
// setOutput must be the first HCatOutputFormat call
HCatOutputFormat.setOutput(job, OutputJobInfo.create(db, outputTable, null));
HCatSchema outSchema = HCatOutputFormat.getTableSchema(job);
HCatOutputFormat.setSchema(job, outSchema);
job.setOutputFormatClass(HCatOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new GroupByAge(), args);
System.exit(res);
}
}
Deployment Steps:
### A. Upload JARs to HDFS

These are required so they can be localized on the YARN nodes (version numbers may vary):

```bash
hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
```

Set the `LIB_JARS` environment variable:

```bash
export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,hdfs:///tmp/hive-metastore-0.10.0.jar,...
```
### B. Compile and Package
javac -classpath "$HCAT_HOME/share/hcatalog/*:$HADOOP_CLASSPATH" -d . GroupByAge.java
jar cvf GroupByAge.jar com/your/package/path/*.class
### C. Run the Job
hadoop jar GroupByAge.jar \
    com.your.package.path.GroupByAge \
    -libjars $LIB_JARS \
    thrift://metastore-host:9083 \
    input_table \
    output_table
Note that -libjars is a generic Hadoop option and must appear before the application arguments so that GenericOptionsParser can pick it up.
input_table must already exist and contain data; output_table must be created beforehand (the job publishes a new partition to it). After the job finishes, check HDFS or Hive for the output files (part-*).
Output
3, 2
5, 2
7, 1
9, 1