6. Hadoop I/O Operations
Published: 2019-05-23



Preface

This post collects the Java code involved in Hadoop's I/O operations. All of the examples come from Hadoop: The Definitive Guide.

Compressing a data stream with CompressionCodec

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {

    public static void main(String[] args) throws Exception {
        // The codec class name is passed as the first command-line argument
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
            ReflectionUtils.newInstance(codecClass, conf);

        // Wrap standard output in a compressing stream and copy stdin into it
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
    }
}
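As a quick check (a usage sketch following the book's example, assuming the compiled class is on the Hadoop classpath), compress a line of text with GzipCodec and pipe the result back through gunzip:

% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -
Text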

Decompressing a file with a CompressionCodec inferred from its extension

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class FileDecompressor {

    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.err.println("No codec found for " + uri);
            System.exit(1);
        }

        String outputUri =
            CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }
}
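A usage sketch (the file name file.gz is illustrative): running the program on a gzip-compressed file writes a decompressed copy whose name has the .gz suffix removed:

% hadoop FileDecompressor file.gz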

Using a compressor pool (CodecPool) to compress data read from standard input and write it to standard output

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledStreamCompressor {

    public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
            ReflectionUtils.newInstance(codecClass, conf);

        Compressor compressor = null;
        try {
            // Borrow a Compressor from the pool instead of creating a new one
            compressor = CodecPool.getCompressor(codec);
            CompressionOutputStream out =
                codec.createOutputStream(System.out, compressor);
            IOUtils.copyBytes(System.in, out, 4096, false);
            out.finish();
        } finally {
            // Always return the Compressor so it can be reused
            CodecPool.returnCompressor(compressor);
        }
    }
}
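It is invoked the same way as StreamCompressor above; the difference is that the Compressor is borrowed from and returned to CodecPool, which avoids repeatedly allocating compressor objects when compression is performed many times in one JVM. A usage sketch, under the same assumptions as before:

% echo "Text" | hadoop PooledStreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -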

Using compression in MapReduce

The MapReduce job used here is the max-temperature job from an earlier post, modified to compress its output.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCompression {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression " +
                "<input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Compress the job output with gzip
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
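A usage sketch following the book (the input and output paths are illustrative); because the output is compressed with GzipCodec, each reducer's part file ends with a .gz extension:

% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output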

Iterating over the characters in a Text object

import java.nio.ByteBuffer;

import org.apache.hadoop.io.Text;

public class TextIterator {

    public static void main(String[] args) {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
        int cp;
        while (buf.hasRemaining() && (cp = Text.bytesToCodePoint(buf)) != -1) {
            System.out.println(Integer.toHexString(cp));
        }
    }
}
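Running the program prints one code point per line in hex: 41, df, 6771, and 10400. The last line shows that the surrogate pair \uD801\uDC00 is returned as a single supplementary code point (U+10400) rather than as two separate chars.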

A Writable implementation that stores a pair of Text objects

import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }

    public static class Comparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public Comparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
                if (cmp != 0) {
                    return cmp;
                }
                return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                               b2, s2 + firstL2, l2 - firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    static {
        WritableComparator.define(TextPair.class, new Comparator());
    }

    public static class FirstComparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public FirstComparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).first.compareTo(((TextPair) b).first);
            }
            return super.compare(a, b);
        }
    }
}
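Two details of this implementation are worth noting. The static block registers Comparator as the default raw comparator for TextPair, so MapReduce can sort serialized TextPair records by comparing their bytes directly, without deserializing them; the vint-size arithmetic locates the boundary between the two serialized Text fields. FirstComparator compares only the first Text of each pair, which is useful when grouping or partitioning by the first field, for example in a secondary sort.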

Writing a SequenceFile

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {

    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path,
                key.getClass(), value.getClass());

            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
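A usage sketch (numbers.seq is an illustrative path, local or on HDFS): the bracketed number printed for each record is the byte offset at which that record is about to be written:

% hadoop SequenceFileWriteDemo numbers.seq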

Reading a SequenceFile

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable)
                ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable)
                ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition();
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
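A usage sketch reading the file written above (again, numbers.seq is illustrative): each line shows the record's starting offset, a '*' if a sync point immediately precedes the record, and then the key and value; the key and value classes are recovered from the file's header via reflection, so no type arguments are needed:

% hadoop SequenceFileReadDemo numbers.seq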

Reposted from: http://bpcen.baihongyu.com/
