在MapReduce处理数据时根据ip获取国家名称和国家码等信息

在MapReduce处理数据时根据ip获取国家名称和国家码等信息

问题描述:

在 Windows 下可以获取数据,在 Linux 开发环境下却获取不到数据,这是为什么?代码如下:

package com.ctsig.cdn.log.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.Reader;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

  • @copyright:
  • @create: 2018-05-14 17:54
    */
    public class IP2CCUtil {
    private static Logger logger = LoggerFactory.getLogger(IpAddressService.class);

    public String getCountry(String ipAddress) {
    File database = new File("/home/hadoop/GeoLite2-City.mmdb");

    if(isIP(ipAddress)) {
        try {
            InetAddress address = InetAddress.getByName(ipAddress);
            Reader reader = new Reader(database);
            JsonNode response = reader.get(address);
            JsonNode country = response.get("country");
            reader.close();
            return String.format("%s %s",
                    // 国家编码
                    country.get("iso_code").asText(),
                    // 国家英文名
                    country.get("names").get("en").asText());
        }catch (UnknownHostException e1) {
            e1.printStackTrace();
        }catch (IOException e) {
            logger.debug("未查到地址ip为:"+ipAddress+"的国家码和国家名等信息!", e);
        } 
    }
    return null;
    

    }

    /**

    • 判断是否是有效的IP *
    • @param ip IP
    • @return true or false */ public static boolean isIP(String ip) { return ip.matches("\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"); }

    public static void main(String[] args) {
    String result = new IP2CCUtil().getCountry("83.83.205.108") ;
    if (result != null) {
    String code = result.split(" ")[0] ;
    String name = result.split(" ")[1] ;
    System.out.println(result);
    System.out.println("Country Code: "+code);
    System.out.println("Country Name: "+name);
    }

    }
    }

package com.ctsig.cdn.log;

import com.ctsig.cdn.log.util.DealDate;
import com.ctsig.cdn.log.util.IP2CCUtil;
import com.ctsig.cdn.log.util.PropsUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class CleanData extends Configured implements Tool {
private static Logger logger = LoggerFactory.getLogger(CleanData.class);

/**
 * Mapper that tags every input line with the prefix of the file it came from.
 *
 * <p>The prefix is the part of the input file name before the first underscore.
 * Each record is emitted under an empty {@link Text} key with the value
 * {@code "<prefix> <original line>"}.
 */
public static class CleanMapper extends Mapper<Object, Text, Text, CleanBean> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // The file name of the current split identifies the source of the line.
        FileSplit split = (FileSplit) context.getInputSplit();
        String sourceFile = split.getPath().getName();
        String prefix = sourceFile.split("_")[0];

        context.write(new Text(), new CleanBean(prefix + " " + value.toString()));
    }
}

/**
 * Reducer that converts tagged raw log lines into {@code \001}-separated records and
 * writes them through {@link MultipleOutputs}, using the record's date ({@code yyyyMMdd})
 * as the base output name.
 *
 * <p>The first whitespace-separated token of each line (added by the mapper) selects the
 * layout: {@code swiftserve}, {@code tata}, or the default layout for everything else.
 * Lines that do not match the expected layout are logged and skipped. I/O failures while
 * writing are propagated instead of being reported as malformed lines.
 */
public static class CleanReducer extends Reducer<Text, CleanBean, Text, CleanBean> {
    /** Separator between the fields of an output record. */
    private static final String FIELD_SEPARATOR = "\001";
    /** Layout of all timestamps handled here, both parsed and written. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Zone of the written timestamps; the log sources are converted to GMT+8. */
    private static final TimeZone TARGET_ZONE = TimeZone.getTimeZone("GMT+8");
    /** Placeholder for values that are not available. */
    private static final String UNKNOWN = "-";

    private MultipleOutputs<Text, CleanBean> multipleOutputs;
    private IP2CCUtil ip2CCUtil;

    /** Output name and record text produced from one input line. */
    private static final class CleanedRecord {
        private final String outputName;
        private final String record;

        private CleanedRecord(String outputName, String record) {
            this.outputName = outputName;
            this.record = record;
        }
    }

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, CleanBean>(context);
        ip2CCUtil = new IP2CCUtil();
    }

    /**
     * Cleans every line of the group and writes the ones that could be parsed.
     *
     * @throws IOException          if writing a cleaned record fails
     * @throws InterruptedException if writing a cleaned record is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<CleanBean> values, Context context)
            throws IOException, InterruptedException {

        for (CleanBean value : values) {
            String raw = value.getLine();
            String[] line = raw.split("\\s+");
            if (line.length == 0) {
                // Only whitespace: there is no source tag to dispatch on.
                logBadLine(raw, null);
                continue;
            }

            CleanedRecord cleaned;
            if (line[0].equals("swiftserve")) {
                cleaned = cleanSwiftserve(line, raw);
            } else if (line[0].equals("tata")) {
                cleaned = cleanTata(line, raw);
            } else {
                cleaned = cleanDefault(line, raw);
            }

            // Written outside the parsing helpers so that write failures are not swallowed.
            if (cleaned != null) {
                multipleOutputs.write(key, new CleanBean(cleaned.record), cleaned.outputName);
            }
        }
    }

    /** Builds the record for a {@code swiftserve} line, or returns null when the line is unusable. */
    private CleanedRecord cleanSwiftserve(String[] line, String raw) {
        if (line.length < 12) {
            logBadLine(raw, null);
            return null;
        }
        try {
            String[] country = lookupCountry(line[1]);
            String[] dateTime = convertToTargetZone(DealDate.Swiftservedate(line[4]), "GMT+0000");

            String userAgent = UNKNOWN;
            String hit = "-1";
            String[] quoted = raw.split("\"");
            if (quoted.length < 12) {
                // Reported, but the record is still written with the default user agent and hit flag.
                logBadLine(raw, null);
            } else {
                userAgent = quoted[5];
                hit = quoted[11].indexOf("HIT") != -1 ? "HIT" : "MISS";
            }

            String[] urlParts = line[6].split("/");
            if (urlParts.length < 3) {
                logBadLine(raw, null);
                return null;
            }
            String domain = urlParts[2];

            String record = joinFields(dateTime[0], dateTime[1], line[1], country[1], country[0], domain,
                    line[5].replace("\"", ""), line[6].replace("\"", ""), line[7],
                    line[10], hit, line[8], line[11].replace("\"", ""),
                    userAgent);
            return new CleanedRecord(outputNameFor(dateTime[0]), record);
        } catch (Exception e) {
            // Any failure while picking the line apart means the line has an unexpected layout.
            logBadLine(raw, e);
            return null;
        }
    }

    /** Builds the record for a {@code tata} line, or returns null when the line is unusable. */
    private CleanedRecord cleanTata(String[] line, String raw) {
        if (line.length < 12) {
            logBadLine(raw, null);
            return null;
        }
        try {
            String[] country = lookupCountry(line[1]);
            // The zone offset of the timestamp is carried in the following token, e.g. "+0000]".
            String sourceZone = "GMT" + line[5].replace("]", "");
            String[] dateTime = convertToTargetZone(DealDate.Tatadate(line[4]), sourceZone);

            String[] quoted = raw.split("\"");
            if (quoted.length < 7) {
                logBadLine(raw, null);
                return null;
            }
            String timeTaken = quoted[6];
            String userAgent = quoted[5];

            String record = joinFields(dateTime[0], dateTime[1], line[1], country[1], country[0], line[2],
                    line[6].replace("\"", ""), line[7], line[9],
                    line[10], "-1", timeTaken.replace(" ", ""),
                    line[11].replace("\"", ""), userAgent);
            return new CleanedRecord(outputNameFor(dateTime[0]), record);
        } catch (Exception e) {
            logBadLine(raw, e);
            return null;
        }
    }

    /** Builds the record for a line of the default layout, or returns null when the line is unusable. */
    private CleanedRecord cleanDefault(String[] line, String raw) {
        if (line.length < 9) {
            logBadLine(raw, null);
            return null;
        }
        try {
            String ip = line[3];
            if (ip.length() == 0) {
                return null;
            }

            // The last character of the second-to-last token encodes the cache result.
            String hitField = line[line.length - 2];
            String hit = hitFromCode(hitField.charAt(hitField.length() - 1));

            String[] country = lookupCountry(ip);
            String[] dateTime = convertToTargetZone(line[1] + " " + line[2], "GMT+0000");

            String[] quoted = raw.split("\"");
            if (quoted.length < 4) {
                logBadLine(raw, null);
                return null;
            }
            String referer = quoted[1];
            String userAgent = quoted[3];

            String record = joinFields(dateTime[0], dateTime[1], line[3], country[1],
                    country[0], line[0], line[4],
                    line[5], line[6], line[7],
                    hit, line[8], referer, userAgent);
            return new CleanedRecord(outputNameFor(dateTime[0]), record);
        } catch (Exception e) {
            logBadLine(raw, e);
            return null;
        }
    }

    /** Maps the cache result digit to the hit flag: 0/3 is a miss, 1/2 is a hit, anything else unknown. */
    private static String hitFromCode(char code) {
        switch (code) {
            case '0':
            case '3':
                return "MISS";
            case '1':
            case '2':
                return "HIT";
            default:
                return "-1";
        }
    }

    /**
     * Looks up the country of an address.
     *
     * @return a two element array {@code {countryCode, countryName}}; both are {@code "-"}
     *         when the lookup yields nothing or fails
     */
    private String[] lookupCountry(String ip) {
        String code = UNKNOWN;
        String name = UNKNOWN;
        try {
            String found = ip2CCUtil.getCountry(ip);
            if (found != null) {
                // Limit 2: the code is the first token, the rest is the name (which may contain spaces).
                String[] parts = found.split(" ", 2);
                if (parts.length == 2) {
                    code = parts[0];
                    name = parts[1];
                }
            }
        } catch (RuntimeException e) {
            // A failed lookup must not discard an otherwise valid log line.
            logger.warn("Country lookup failed for ip {}", ip, e);
        }
        return new String[] {code, name};
    }

    /**
     * Re-expresses a timestamp given in {@code sourceZoneId} in the target zone.
     *
     * @return a two element array {@code {date, time}} ({@code yyyy-MM-dd}, {@code HH:mm:ss})
     * @throws ParseException if {@code dateTime} does not match {@code yyyy-MM-dd HH:mm:ss}
     */
    private static String[] convertToTargetZone(String dateTime, String sourceZoneId) throws ParseException {
        SimpleDateFormat parser = new SimpleDateFormat(DATE_TIME_PATTERN);
        parser.setTimeZone(TimeZone.getTimeZone(sourceZoneId));
        Date instant = parser.parse(dateTime);

        // Set explicitly so the result does not depend on the default zone of the task JVM.
        SimpleDateFormat formatter = new SimpleDateFormat(DATE_TIME_PATTERN);
        formatter.setTimeZone(TARGET_ZONE);
        return formatter.format(instant).split(" ");
    }

    /** Turns a {@code yyyy-MM-dd} date into the {@code yyyyMMdd} base output name. */
    private static String outputNameFor(String date) {
        return date.replace("-", "");
    }

    /** Concatenates the fields with {@link #FIELD_SEPARATOR} between them. */
    private static String joinFields(String... fields) {
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                joined.append(FIELD_SEPARATOR);
            }
            joined.append(fields[i]);
        }
        return joined.toString();
    }

    /** Reports a line that does not have the expected layout; the cause is only logged at debug level. */
    private static void logBadLine(String raw, Exception cause) {
        logger.info("错误的日志数据格式:" + raw);
        if (cause != null) {
            logger.debug("Cause of the malformed line report", cause);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}


/**
 * Configures and runs the cleaning job.
 *
 * <p>If the output directory already contains a {@code _SUCCESS} file the job is
 * considered done and nothing is run; otherwise an existing output directory is
 * deleted before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 on success or when the output is already complete, 1 when the job fails,
 *         2 when the arguments are missing
 * @throws Exception if the file system cannot be accessed or the job cannot be submitted
 */
@Override
public int run(String[] args) throws Exception {
    if (args == null || args.length < 2) {
        logger.error("Usage: CleanData <input path> <output path>");
        return 2;
    }

    // Use the configuration injected by ToolRunner so generic options (-D, -conf, ...) take effect.
    Configuration conf = getConf();
    if (conf == null) {
        conf = new Configuration();
    }
    // Works around java.io.IOException: No FileSystem for scheme: hdfs
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

    Path output = new Path(args[1]);
    FileSystem fs = output.getFileSystem(conf);

    // A _SUCCESS marker means this output was already produced by an earlier run.
    if (fs.exists(new Path(output, "_SUCCESS"))) {
        logger.info("该天的数据已经清洗完成!");
        return 0;
    }
    // Remove leftovers of an incomplete run; the job refuses to start if the directory exists.
    if (fs.exists(output)) {
        fs.delete(output, true);
    }

    Job job = Job.getInstance(conf);
    job.setJarByClass(CleanData.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, output);

    job.setMapperClass(CleanMapper.class);
    job.setReducerClass(CleanReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CleanBean.class);

    // The output format is set through LazyOutputFormat instead of job.setOutputFormatClass.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
}


/**
 * Writable wrapper around one line of log text.
 *
 * <p>The line is serialized with {@link Text#writeString} / {@link Text#readString}
 * rather than {@code DataOutput.writeUTF}, which rejects strings whose encoded form
 * is longer than 65535 bytes. A {@code null} line is serialized and printed as the
 * empty string.
 */
public static class CleanBean implements Writable {

    /** Text of the line. */
    private String line;

    /** Creates an empty bean; required so the framework can instantiate it before {@link #readFields}. */
    public CleanBean() {
        super();
    }

    /**
     * @param line text of the line
     */
    public CleanBean(String line) {
        super();
        this.line = line;
    }

    /** @return text of the line, {@code null} if none was set */
    public String getLine() {
        return line;
    }

    /** @param line text of the line */
    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, line == null ? "" : line);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        line = Text.readString(in);
    }

    /** @return text of the line, or the empty string if none was set */
    @Override
    public String toString() {
        return line == null ? "" : line;
    }
}

/**
 * Command line entry point.
 *
 * <p>Reads the HDFS URL and the origin/cleaned base paths from {@code config.properties},
 * appends {@code args[0]} to both and runs the job on
 * {@code <url><origin>/tata/<args[0]>} with output {@code <url><cleaned>/<args[0]>}.
 * The process exits with the job's exit code, or with 2 when the argument is missing.
 *
 * @param args {@code args[0]} is the sub-directory to process
 * @throws Exception if the configuration cannot be read or the job fails to run
 */
public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        System.err.println("Usage: CleanData <sub-directory>");
        System.exit(2);
        return;
    }

    // PropsUtil is used here because PropertyReader raised a NullPointerException on the server.
    PropsUtil r = new PropsUtil("config.properties");
    String hdfsUrl = r.getString("hdfs.default.url");
    String originPath = r.getString("hdfs.default.origin.path");
    String cleanedPath = r.getString("hdfs.default.cleaned.path");

    // Input path and output path of the job.
    String[] ag = {hdfsUrl + originPath + "/tata/" + args[0],
            hdfsUrl + cleanedPath + "/" + args[0]};

    int ec = ToolRunner.run(new Configuration(), new CleanData(), ag);
    System.exit(ec);
}

}

在 Linux 下单独执行 IP2CCUtil 可以获取数据,但不知为什么一执行 CleanData,countryCode 和 countryName 两个变量就获取不到值,仍然是默认的“-”。