Hadoop Reading Notes (5): Weather Station Analysis Demo Code


I. Background

The weather station example analyzes a batch of complex data. Here is a sample of the records to be analyzed:

0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999

The data is stored in input.txt.

Each record contains the year and the temperature reading.

The goal is to extract the year and the temperature from each record.
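As a quick sanity check, the fixed-width offsets that the mapper below relies on can be verified directly against the sample record (a minimal standalone sketch; the class name is made up for illustration):

public class RecordOffsetCheck {
    public static void main(String[] args) {
        String line = "0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9-00111+99999999999";
        // Characters 15-18 (0-based) hold the observation year.
        System.out.println(line.substring(15, 19)); // 1950
        // Characters 87-91 hold the signed air temperature in tenths of a degree Celsius.
        System.out.println(line.substring(87, 92)); // -0011, i.e. -1.1 degrees C
    }
}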

 

II. Steps

1. Download hadoop-0.20.1

cd hadoop-0.20.1/conf/ and edit the following configuration files:

core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

 hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

 mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>

Configuration is complete.

 

cd bin

./hadoop namenode -format

./start-all.sh
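With the pseudo-distributed configuration above, running jps afterwards should list the NameNode, DataNode, SecondaryNameNode, JobTracker and TaskTracker processes, confirming the daemons started.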

 

2. My pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<artifactId>balance</artifactId>
		<groupId>com.yajun</groupId>
		<version>1.0-SNAPSHOT</version>
	</parent>
	<groupId>com.yajun.hadoop</groupId>
	<artifactId>balance.hadoop</artifactId>
	<version>1.0-SNAPSHOT</version>
	<name>balance.hadoop</name>
	<url>http://maven.apache.org</url>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.7</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.mockito</groupId>
			<artifactId>mockito-core</artifactId>
			<version>1.8.2</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.mahout.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
			<version>0.20.1</version>
		</dependency>
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>commons-httpclient</groupId>
			<artifactId>commons-httpclient</artifactId>
			<version>3.0</version>
		</dependency>
		<dependency>
			<groupId>commons-cli</groupId>
			<artifactId>commons-cli</artifactId>
			<version>1.2</version>
		</dependency>
	</dependencies>
</project>

Use the pom above to build the Eclipse development environment with Maven.
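In practice this is presumably done with the maven-eclipse-plugin, e.g. running mvn eclipse:eclipse in the module directory to generate the .project and .classpath files that Eclipse imports.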

 

3. Write the code

Analysis code (the Map part)

package com.yajun.hadoop.temperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * The example from the Hadoop book: extracts the year and the temperature data.
 * 
 * @author txy
 */
public class MaxTemperatureMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        // extract the year (characters 15-18)
        String year = line.substring(15, 19);
        // extract the temperature, including its sign, in tenths of a degree Celsius (characters 87-91)
        String temp = line.substring(87, 92);
        if (!missing(temp)) {
            int airTemperature = Integer.parseInt(temp);
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }

    /**
     * If the extracted temperature reads +9999, the reading is treated as missing.
     * 
     * @param temp the raw temperature string, including its sign
     * @return true if the temperature reading is missing
     */
    private boolean missing(String temp) {
        return temp.equals("+9999");
    }

}
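Since the pom above already declares JUnit and Mockito as test dependencies, the mapper can be unit-tested without a running cluster. A minimal sketch (hypothetical test class, using the sample record from the background section and the old mapred API):

package com.yajun.hadoop.temperature;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.junit.Test;

public class MaxTemperatureMapperTest {

    @Test
    @SuppressWarnings("unchecked")
    public void extractsYearAndTemperatureFromSampleRecord() throws Exception {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9-00111+99999999999");
        // Mock the collector so we can verify what the mapper emits.
        OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);

        new MaxTemperatureMapper().map(new LongWritable(1), value, output, Reporter.NULL);

        // Expect the year as key and the temperature (-1.1 degrees C, stored as -11) as value.
        verify(output).collect(new Text("1950"), new IntWritable(-11));
    }
}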

 

Output code (the Reduce part)

package com.yajun.hadoop.temperature;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Emits the highest temperature recorded for each year.
 * 
 * @author txy
 */
public class MaxTemperatureReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}
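The reducer can be tested the same way (again a hypothetical test class, assuming the JUnit/Mockito test dependencies from the pom):

package com.yajun.hadoop.temperature;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.junit.Test;

public class MaxTemperatureReducerTest {

    @Test
    @SuppressWarnings("unchecked")
    public void returnsMaximumIntegerInValues() throws Exception {
        Iterator<IntWritable> values =
                Arrays.asList(new IntWritable(10), new IntWritable(5)).iterator();
        OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);

        new MaxTemperatureReducer().reduce(new Text("1950"), values, output, Reporter.NULL);

        // Only the largest value (10) should be emitted for the year.
        verify(output).collect(new Text("1950"), new IntWritable(10));
    }
}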

 

Driver code that runs the whole job

package com.yajun.hadoop.temperature;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass()
                    .getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Max temperature");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}

 

4. Set up the Hadoop plugin in Eclipse (if you don't have the plugin installed, it is easy to get: download it from https://issues.apache.org/jira/browse/MAPREDUCE-1262 and drop it into Eclipse's dropins directory).
Configure the plugin with the same addresses used in the Hadoop configuration above.


 

5. Run the code

First, copy input.txt into HDFS:

 

./hadoop fs -put /home/txy/work/balanceofworld/balance/balance.hadoop/src/main/resources/temperature/input.txt /user/txy/src/main/resources/temperature/input.txt
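To confirm the upload, ./hadoop fs -ls /user/txy/src/main/resources/temperature/ should list input.txt.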

 

When running MaxTemperatureDriver, two command-line arguments need to be set:

1. Input path: src/main/resources/temperature/input.txt (maps to /user/txy/src/main/resources/temperature/input.txt in HDFS)

2. Output path: src/main/resources/temperature/output.txt (maps to /user/txy/src/main/resources/temperature/output.txt in HDFS)

 

Then just right-click in Eclipse and run it on Hadoop, haha.
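Once the job finishes, the result should be readable with something like ./hadoop fs -cat /user/txy/src/main/resources/temperature/output.txt/part-00000 (assuming the default single reduce output file); with only the sample record above, the output would be 1950 and -11, tab-separated.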
