We use the join field shared by the two tables as the Mapper's output key, which guarantees that the values handed to a single reducer call are exactly the matching order and production records.
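For illustration, assume two tab-separated input files whose names the mapper will use to tell the tables apart (the sample rows here are hypothetical, not from the original data set):

order.txt (orderId, productionId, amount):
1001	01	1
1002	02	2
1003	01	3

production.txt (productionId, productionName):
01	apple
02	banana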
Order.java:
package top.gujm.reducejoin;
import lombok.*;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Order implements Writable {
    private String orderId;
    private String productionId;
    private String productionName;
    private Integer amount;
    private Flag flag;
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // First write the fields that both order and production records carry
        dataOutput.writeUTF(flag.name());
        dataOutput.writeUTF(productionId);
        // Branch on the flag so we never try to write a null field
        if (flag == Flag.ALL) {
            dataOutput.writeUTF(orderId);
            dataOutput.writeInt(amount);
            dataOutput.writeUTF(productionName);
        } else if (flag == Flag.ORDER) {
            dataOutput.writeUTF(orderId);
            dataOutput.writeInt(amount);
        } else {
            dataOutput.writeUTF(productionName);
        }
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read the shared fields first, in the same order write() emitted them
        flag = Flag.valueOf(dataInput.readUTF());
        productionId = dataInput.readUTF();
        // Then read the remaining fields according to the flag
        if (flag == Flag.ALL) {
            orderId = dataInput.readUTF();
            amount = dataInput.readInt();
            productionName = dataInput.readUTF();
        } else if (flag == Flag.ORDER) {
            orderId = dataInput.readUTF();
            amount = dataInput.readInt();
        } else {
            productionName = dataInput.readUTF();
        }
    }
}
    @Override
    public String toString() {
        if (flag == Flag.ALL) {
            return orderId + "\t" + productionName + "\t" + amount;
        } else if (flag == Flag.ORDER) {
            return orderId + "\t" + productionId + "\t" + amount;
        } else {
            return productionId + "\t" + productionName;
        }
    }
    // Flags which table a record came from, or that it has been joined
    public enum Flag {
        /**
         * Order record
         */
        ORDER,
        /**
         * Production record
         */
        PRODUCTION,
        /**
         * Joined record carrying fields from both tables
         */
        ALL
    }
}
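Note that write() and readFields() must agree on the field order for every flag value. A minimal round-trip check (the OrderRoundTrip class is my own sketch, not part of the job) makes that easy to verify locally:

package top.gujm.reducejoin;

import java.io.*;

public class OrderRoundTrip {
    public static void main(String[] args) throws IOException {
        // An ORDER-flagged record: orderId, productionId, productionName, amount, flag
        Order in = new Order("1001", "01", null, 1, Order.Flag.ORDER);

        // Serialize with write(), then deserialize into a fresh object
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));
        Order out = new Order();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // expected: 1001	01	1
    }
}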
ReduceJoinMapper.java:
package top.gujm.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Order> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Use the file name of the current split to tell the two tables apart
        FileSplit fs = (FileSplit) context.getInputSplit();
        String fileName = fs.getPath().getName();
        // Split the tab-separated line into fields
        String[] fields = value.toString().split("\t");
        // Populate an Order object according to the source table
        Order order = new Order();
        if (fileName.startsWith("order")) {
            // Order record: orderId, productionId, amount
            order.setOrderId(fields[0]);
            order.setProductionId(fields[1]);
            order.setAmount(Integer.parseInt(fields[2]));
            order.setFlag(Order.Flag.ORDER);
        } else {
            // Production record: productionId, productionName
            order.setProductionId(fields[0]);
            order.setProductionName(fields[1]);
            order.setFlag(Order.Flag.PRODUCTION);
        }
        // Key by productionId so matching records meet in the same reduce call
        context.write(new Text(order.getProductionId()), order);
    }
}
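With the hypothetical sample above, all records sharing productionId 01 are grouped into one reduce call: two ORDER values (orders 1001 and 1003) and one PRODUCTION value carrying the name apple. The reducer below copies the product name onto each buffered order and emits the joined rows 1001	apple	1 and 1003	apple	3.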
ReduceJoinReduce.java:
package top.gujm.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
public class ReduceJoinReduce extends Reducer<Text, Order, Order, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Order> values, Context context) throws IOException, InterruptedException {
        List<Order> orders = new ArrayList<>();
        Order production = null;
        for (Order o : values) {
            // Hadoop reuses the same Order instance across iterations of values,
            // so each record must be copied into a fresh object before it is kept
            if (o.getFlag() == Order.Flag.ORDER) {
                Order ot = new Order();
                try {
                    BeanUtils.copyProperties(ot, o);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orders.add(ot);
            } else {
                production = new Order();
                try {
                    BeanUtils.copyProperties(production, o);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // Guard against orders with no matching production record, which would
        // otherwise throw a NullPointerException below
        if (production == null) {
            return;
        }
        for (Order o : orders) {
            o.setProductionName(production.getProductionName());
            o.setFlag(Order.Flag.ALL);
            context.write(o, NullWritable.get());
        }
    }
}
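To actually run the job you also need a driver that wires the three classes together. A minimal sketch (the class name and the assumption that args[0] is an input directory containing both files, and args[1] the output path, are mine):

package top.gujm.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoinDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ReduceJoinDriver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReduce.class);

        // The mapper emits <productionId, Order>
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Order.class);

        // The reducer emits <Order, NullWritable>
        job.setOutputKeyClass(Order.class);
        job.setOutputValueClass(NullWritable.class);

        // args[0]: directory holding order.txt and production.txt; args[1]: output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}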