Dataset Partitioning

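Sqoop turns a data migration job into corresponding Hadoop jobs, and a Hadoop job works on partitioned input: each map task processes a different slice of the data.
So how does Sqoop partition the dataset?
That is the job of the following class.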

/**
 * This allows connector to define how input data from the FROM source can be partitioned.
 * The number of data partitions also determines the degree of parallelism.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Partitioner<LinkConfiguration, FromJobConfiguration> {

  /**
   * Partition input data into partitions.
   *
   * Each partition will be then processed in separate extractor.
   *
   * @param context Partitioner context object
   * @param linkConfiguration link configuration object
   * @param fromJobConfiguration FROM job configuration object
   * @return list of partitions, each of which is processed by a separate extractor
   */
  public abstract List<Partition> getPartitions(PartitionerContext context,
      LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration);
}

Below is the partitioning implementation of the JDBC connector.


public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);


  private long numberPartitions;
  private String partitionColumnName;
  private int partitionColumnType;
  private String partitionMinValue;
  private String partitionMaxValue;
  private Boolean allowNullValueInPartitionColumn;

  @Override
  public List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfig,
      FromJobConfiguration fromJobConfig) {
    List<Partition> partitions = new LinkedList<Partition>();

    numberPartitions = context.getMaxPartitions();
    partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
    partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
    partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
    partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);

    allowNullValueInPartitionColumn = fromJobConfig.fromJobConfig.allowNullValueInPartitionColumn;
    if (allowNullValueInPartitionColumn == null) {
      allowNullValueInPartitionColumn = false;
    }

    if (partitionMinValue == null && partitionMaxValue == null) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(partitionColumnName + " IS NULL");
      partitions.add(partition);
      return partitions;
    }

    if (allowNullValueInPartitionColumn) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(partitionColumnName + " IS NULL");
      partitions.add(partition);
      numberPartitions -= 1;
    }

    switch (partitionColumnType) {
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.INTEGER:
    case Types.BIGINT:
      // Integer column
      partitions.addAll(partitionIntegerColumn());
      break;

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      // Floating point column
      partitions.addAll(partitionFloatingPointColumn());
      break;

    case Types.NUMERIC:
    case Types.DECIMAL:
      // Decimal column
      partitions.addAll(partitionNumericColumn());
      break;

    case Types.BIT:
    case Types.BOOLEAN:
      // Boolean column
      return partitionBooleanColumn();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      // Date time column
      partitions.addAll(partitionDateTimeColumn());
      break;

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      // Text column
      partitions.addAll(partitionTextColumn());
      break;

    default:
      throw new SqoopException(
          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
          String.valueOf(partitionColumnType));
    }

    return partitions;
  }

  protected List<Partition> partitionDateTimeColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    long minDateValue = 0;
    long maxDateValue = 0;
    SimpleDateFormat sdf = null;
    switch(partitionColumnType) {
      case Types.DATE:
        sdf = new SimpleDateFormat("yyyy-MM-dd");
        minDateValue = Date.valueOf(partitionMinValue).getTime();
        maxDateValue = Date.valueOf(partitionMaxValue).getTime();
        break;
      case Types.TIME:
        sdf = new SimpleDateFormat("HH:mm:ss");
        minDateValue = Time.valueOf(partitionMinValue).getTime();
        maxDateValue = Time.valueOf(partitionMaxValue).getTime();
        break;
      case Types.TIMESTAMP:
        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
        break;
    }


    minDateValue += TimeZone.getDefault().getOffset(minDateValue);
    maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);

    sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

    long interval =  (maxDateValue - minDateValue) / numberPartitions;
    long remainder = (maxDateValue - minDateValue) % numberPartitions;

    if (interval == 0) {
      numberPartitions = (int)remainder;
    }

    long lowerBound;
    long upperBound = minDateValue;

    Object objLB = null;
    Object objUB = null;

    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      upperBound += (i <= remainder) ? 1 : 0;

      switch(partitionColumnType) {
        case Types.DATE:
          objLB = new Date(lowerBound);
          objUB = new Date(upperBound);
          break;
        case Types.TIME:
          objLB = new Time(lowerBound);
          objUB = new Time(upperBound);

          break;
        case Types.TIMESTAMP:
          objLB = new Timestamp(lowerBound);
          objUB = new Timestamp(upperBound);
          break;
      }

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructDateConditions(sdf, objLB, objUB, false));
      partitions.add(partition);
    }

    switch(partitionColumnType) {
      case Types.DATE:
        objLB = new Date(upperBound);
        objUB = new Date(maxDateValue);
        break;
      case Types.TIME:
        objLB = new Time(upperBound);
        objUB = new Time(maxDateValue);
        break;
      case Types.TIMESTAMP:
        objLB = new Timestamp(upperBound);
        objUB = new Timestamp(maxDateValue);
        break;
    }


    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructDateConditions(sdf, objLB, objUB, true));
    partitions.add(partition);
    return partitions;
  }

  protected List<Partition> partitionTextColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    String minStringValue = null;
    String maxStringValue = null;

    // Remove common prefix if any as it does not affect outcome.
    int maxPrefixLen = Math.min(partitionMinValue.length(),
        partitionMaxValue.length());
    // Calculate common prefix length
    int cpLen = 0;

    for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
      char c1 = partitionMinValue.charAt(cpLen);
      char c2 = partitionMaxValue.charAt(cpLen);
      if (c1 != c2) {
        break;
      }
    }

    // The common prefix has length 'cpLen'. Extract it from both.
    String prefix = partitionMinValue.substring(0, cpLen);
    minStringValue = partitionMinValue.substring(cpLen);
    maxStringValue = partitionMaxValue.substring(cpLen);

    BigDecimal minStringBD = textToBigDecimal(minStringValue);
    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);

    // Having one single value means that we can create only one single split
    if(minStringBD.equals(maxStringBD)) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructTextConditions(prefix, 0, 0,
        partitionMinValue, partitionMaxValue, true, true));
      partitions.add(partition);
      return partitions;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
        new BigDecimal(numberPartitions));
    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
      splitSize = NUMERIC_MIN_INCREMENT;
    }

    BigDecimal curVal = minStringBD;

    int parts = 0;

    while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) {
      splitPoints.add(curVal);
      curVal = curVal.add(splitSize);
      // bigDecimalToText approximates to next comparison location.
      // Make sure we are still in range
      String text = bigDecimalToText(curVal);
      curVal = textToBigDecimal(text);
      ++parts;
    }

    if (splitPoints.size() == 0
        || splitPoints.get(0).compareTo(minStringBD) != 0) {
      splitPoints.add(0, minStringBD);
    }

    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
        || splitPoints.size() == 1) {
      splitPoints.add(maxStringBD);
    }

    // Turn the split points into a set of string intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructTextConditions(prefix, start, end,
        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
      partitions.add(partition);

      start = end;
    }

    return partitions;
  }


  protected List<Partition> partitionIntegerColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    long minValue = partitionMinValue == null ? Long.MIN_VALUE
      : Long.parseLong(partitionMinValue);
    long maxValue = Long.parseLong(partitionMaxValue);

    long interval =  (maxValue - minValue) / numberPartitions;
    long remainder = (maxValue - minValue) % numberPartitions;

    if (interval == 0) {
      numberPartitions = (int)remainder;
    }

    long lowerBound;
    long upperBound = minValue;
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      upperBound += (i <= remainder) ? 1 : 0;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }

    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  protected List<Partition> partitionFloatingPointColumn() {
    List<Partition> partitions = new LinkedList<Partition>();


    // Note: Double.MIN_VALUE is the smallest positive double, not the most negative one.
    double minValue = partitionMinValue == null ? Double.MIN_VALUE
      : Double.parseDouble(partitionMinValue);
    double maxValue = Double.parseDouble(partitionMaxValue);

    double interval =  (maxValue - minValue) / numberPartitions;

    double lowerBound;
    double upperBound = minValue;
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }

    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  protected List<Partition> partitionNumericColumn() {
    List<Partition> partitions = new LinkedList<Partition>();
    // Having one end in null is not supported
    if (partitionMinValue == null || partitionMaxValue == null) {
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
    }

    BigDecimal minValue = new BigDecimal(partitionMinValue);
    BigDecimal maxValue = new BigDecimal(partitionMaxValue);

    // Having one single value means that we can create only one single split
    if(minValue.equals(maxValue)) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructConditions(minValue));
      partitions.add(partition);
      return partitions;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));

    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
      splitSize = NUMERIC_MIN_INCREMENT;
    }

    BigDecimal curVal = minValue;

    while (curVal.compareTo(maxValue) <= 0) {
      splitPoints.add(curVal);
      curVal = curVal.add(splitSize);
    }

    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
      splitPoints.remove(splitPoints.size() - 1);
      splitPoints.add(maxValue);
    }

    // Turn the split points into a set of intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
      partitions.add(partition);

      start = end;
    }

    return partitions;
  }

  protected  List<Partition> partitionBooleanColumn() {
    List<Partition> partitions = new LinkedList<Partition>();


    Boolean minValue = parseBooleanValue(partitionMinValue);
    Boolean maxValue = parseBooleanValue(partitionMaxValue);

    StringBuilder conditions = new StringBuilder();

    // Having one single value means that we can create only one single split
    if(minValue.equals(maxValue)) {
      GenericJdbcPartition partition = new GenericJdbcPartition();

      conditions.append(partitionColumnName).append(" = ")
          .append(maxValue);
      partition.setConditions(conditions.toString());
      partitions.add(partition);
      return partitions;
    }

    GenericJdbcPartition partition = new GenericJdbcPartition();

    if (partitionMinValue == null) {
      conditions = new StringBuilder();
      conditions.append(partitionColumnName).append(" IS NULL");
      partition.setConditions(conditions.toString());
      partitions.add(partition);
    }
    partition = new GenericJdbcPartition();
    conditions = new StringBuilder();
    conditions.append(partitionColumnName).append(" = TRUE");
    partition.setConditions(conditions.toString());
    partitions.add(partition);
    partition = new GenericJdbcPartition();
    conditions = new StringBuilder();
    conditions.append(partitionColumnName).append(" = FALSE");
    partition.setConditions(conditions.toString());
    partitions.add(partition);
    return partitions;
  }

  private Boolean parseBooleanValue(String value) {
    if (value == null) {
      return null;
    }
    if (value.equals("1")) {
      return Boolean.TRUE;
    } else if (value.equals("0")) {
      return Boolean.FALSE;
    } else {
      return Boolean.parseBoolean(value);
    }
  }

  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
    try {
      return numerator.divide(denominator);
    } catch (ArithmeticException ae) {
      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
    }
  }

  protected String constructConditions(
      Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append(lowerBound);
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append(upperBound);
    return conditions.toString();
  }

  protected String constructConditions(Object value) {
    return new StringBuilder()
      .append(partitionColumnName)
      .append(" = ")
      .append(value)
      .toString()
     ;
  }

  protected String constructDateConditions(SimpleDateFormat sdf,
      Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
    return conditions.toString();
  }

  protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
      String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
    String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
    conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
    return conditions.toString();
  }

  /**
   *  Converts a string to a BigDecimal representation in Base 2^21 format.
   *  The maximum Unicode code point value defined is 10FFFF.  Although
   *  not all database system support UTF16 and mostly we expect UCS2
   *  characters only, for completeness, we assume that all the unicode
   *  characters are supported.
   *  Given a string 's' containing characters s_0, s_1,..s_n,
   *  the string is interpreted as the number: 0.s_0 s_1 s_2 s_3 s_48)
   *  This can be split and each split point can be converted back to
   *  a string value for comparison purposes.   The number of characters
   *  is restricted to prevent repeating fractions and rounding errors
   *  towards the higher fraction positions.
   */
  private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000);
  private static final int MAX_CHARS_TO_CONVERT = 4;

  private BigDecimal textToBigDecimal(String str) {
    BigDecimal result = BigDecimal.ZERO;
    BigDecimal divisor = UNITS_BASE;

    int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);

    for (int n = 0; n < len; ) {
      int codePoint = str.codePointAt(n);
      n += Character.charCount(codePoint);
      BigDecimal val = divide(new BigDecimal(codePoint), divisor);
      result = result.add(val);
      divisor = divisor.multiply(UNITS_BASE);
    }

    return result;
  }

  private String bigDecimalToText(BigDecimal bd) {
    BigDecimal curVal = bd.stripTrailingZeros();
    StringBuilder sb = new StringBuilder();

    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
      curVal = curVal.multiply(UNITS_BASE);
      int cp = curVal.intValue();
      if (0 >= cp) {
        break;
      }

      if (!Character.isDefined(cp)) {
        int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp;
        // We are guaranteed to find at least one character
        while(!Character.isDefined(t_cp)) {
          ++t_cp;
          if (t_cp == cp) {
            break;
          }
          if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0)  {
            t_cp = 1;
          }
        }
        cp = t_cp;
      }
      curVal = curVal.subtract(new BigDecimal(cp));
      sb.append(Character.toChars(cp));
    }

    return sb.toString();
  }

}
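
The text-column case is the least obvious part of GenericJdbcPartitioner, so here is a minimal sketch of the idea behind textToBigDecimal: each code point becomes one "digit" of a fraction in base 2^21, which keeps the strings in their original order and lets the range be split numerically. The class name TextFractionSketch and the method name toFraction are made up for this illustration; only the base and the four-character limit are taken from the listing above.

```java
import java.math.BigDecimal;

// Hypothetical stand-alone sketch, not part of Sqoop.
class TextFractionSketch {
  // Same constants as UNITS_BASE and MAX_CHARS_TO_CONVERT in the listing above.
  static final BigDecimal BASE = new BigDecimal(0x200000); // 2^21
  static final int MAX_CHARS = 4;

  // Interprets the leading characters of s as the fraction 0.s_0 s_1 s_2 ... in base 2^21.
  static BigDecimal toFraction(String s) {
    BigDecimal result = BigDecimal.ZERO;
    BigDecimal divisor = BASE;
    int len = Math.min(s.length(), MAX_CHARS);
    for (int n = 0; n < len; ) {
      int codePoint = s.codePointAt(n);
      n += Character.charCount(codePoint);
      // Dividing by a power of two always terminates in decimal,
      // so the exact divide() cannot throw here.
      result = result.add(new BigDecimal(codePoint).divide(divisor));
      divisor = divisor.multiply(BASE);
    }
    return result;
  }

  public static void main(String[] args) {
    // "a" < "ab" < "b" as strings, and the fractions keep that order.
    System.out.println(toFraction("a"));
    System.out.println(toFraction("ab"));
    System.out.println(toFraction("b"));
  }
}
```

Because only the first four characters are converted, strings that differ later than that map to the same fraction. That is presumably why the first and last conditions in partitionTextColumn use the original min and max strings instead of the converted ones.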

In GenericJdbcPartitioner, how the data is partitioned depends on the data type of the partition column. The following types are supported:

TINYINT
SMALLINT
INTEGER
BIGINT
REAL
FLOAT
DOUBLE
NUMERIC
DECIMAL
BIT
BOOLEAN
DATE
TIME
TIMESTAMP
CHAR
VARCHAR
LONGVARCHAR
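
The list above falls into six groups, one per branch of the switch in getPartitions. As a rough sketch, the grouping can be written as a lookup on java.sql.Types; the class name PartitionStrategySketch and the returned labels are invented for this illustration and do not exist in Sqoop.

```java
import java.sql.Types;

// Hypothetical stand-alone sketch, not part of Sqoop.
class PartitionStrategySketch {
  // Returns a label for the strategy that handles the given JDBC type.
  static String strategyFor(int sqlType) {
    switch (sqlType) {
      case Types.TINYINT:
      case Types.SMALLINT:
      case Types.INTEGER:
      case Types.BIGINT:
        return "integer";
      case Types.REAL:
      case Types.FLOAT:
      case Types.DOUBLE:
        return "floating-point";
      case Types.NUMERIC:
      case Types.DECIMAL:
        return "decimal";
      case Types.BIT:
      case Types.BOOLEAN:
        return "boolean";
      case Types.DATE:
      case Types.TIME:
      case Types.TIMESTAMP:
        return "date-time";
      case Types.CHAR:
      case Types.VARCHAR:
      case Types.LONGVARCHAR:
        return "text";
      default:
        // The real class throws a SqoopException for unsupported types.
        throw new IllegalArgumentException("Unsupported partition column type: " + sqlType);
    }
  }

  public static void main(String[] args) {
    System.out.println(strategyFor(Types.BIGINT));
    System.out.println(strategyFor(Types.VARCHAR));
  }
}
```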

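By default, the partition column is the primary key.
The width of each partition is computed as:
(upper boundary - lower boundary) / (max partitions)
This yields a lower and an upper boundary for each partition. The Extractor then uses these boundaries as the condition when it extracts the data.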
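
To make the formula concrete, here is a small stand-alone sketch that repeats the interval and remainder arithmetic of partitionIntegerColumn and prints the resulting conditions. The class name IntegerRangeSketch, the column name id and the sample range 1 to 100 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch, not part of Sqoop.
class IntegerRangeSketch {

  // Builds one WHERE-clause fragment per partition for an integer column.
  static List<String> partition(String column, long min, long max, long maxPartitions) {
    List<String> conditions = new ArrayList<>();
    long interval = (max - min) / maxPartitions;
    long remainder = (max - min) % maxPartitions;
    long parts = maxPartitions;
    if (interval == 0) {
      // Fewer distinct values than partitions: shrink the partition count.
      parts = remainder;
    }
    long upper = min;
    for (long i = 1; i < parts; i++) {
      long lower = upper;
      // The first 'remainder' partitions are one wider to absorb the leftover.
      upper = lower + interval + (i <= remainder ? 1 : 0);
      conditions.add(lower + " <= " + column + " AND " + column + " < " + upper);
    }
    // The last partition is closed on both ends so that max itself is included.
    conditions.add(upper + " <= " + column + " AND " + column + " <= " + max);
    return conditions;
  }

  public static void main(String[] args) {
    // Prints:
    // 1 <= id AND id < 26
    // 26 <= id AND id < 51
    // 51 <= id AND id < 76
    // 76 <= id AND id <= 100
    for (String condition : partition("id", 1, 100, 4)) {
      System.out.println(condition);
    }
  }
}
```

Each printed line corresponds to one partition, and therefore to the condition that one extractor would apply.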
