基于rxjava的集合并发

简介

  • 本文主要是讲基于rxjava包装的一个针对集合做并发操作的工具类
  • rxjava文档地址

依赖

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version></version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version></version>
</dependency>

支持四类function

  • 传入数据List,返回处理之后的List(不保证顺序)
  • 传入数据List,返回处理之后的数据Map
  • 传入数据List,分组大小,返回处理之后的List(不保证顺序)
  • 传入数据List,分组大小,返回处理之后的数据Map

function接口

public interface List2List<I,R> {
    List<R> call(List<I> list);
}

public interface List2Map<I, K, V>{
    Map<K, V> call(List<I> list);
}

public interface Object2List<I, R> {
    List<R> call(I i);
}

public interface Object2Map<I, K, V> {
    Map<K, V> call(I i);
}

线程池及工厂方法

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
public class ObservableHelper {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 5, SECONDS,
            new LinkedBlockingQueue<Runnable>());

    private ObservableHelper(){}

    private static class ClassHolder{
        private static ObservableHelper observableHelper = new ObservableHelper();
    }

    public static ObservableHelper INSTANCE(){
        return ClassHolder.observableHelper;
    }

    public <T> Observable<T> createObservable(final Func0<T> func) {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                try {
                    subscriber.onNext(func.call());
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.from(executor)).cache();
    }
}

ObservableExecutor 并行处理器

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
public class ObservableExecutor {

    private ObservableHelper helper = ObservableHelper.INSTANCE();

    private static class ExecutorHolder {
        private static ObservableExecutor executor = new ObservableExecutor();
    }

    private ObservableExecutor(){}

    public static ObservableExecutor INSTANCE() {
        return ExecutorHolder.executor;
    }

    /**
     * 传入List, 对List中的每一项分配一条线程做处理,返回Map结构,自己需要实现Object2Map接口
     * @param input
     * @param func
     * @param <I>
     * @param <K>
     * @param <V>
     * @return
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, final Object2Map<I, K, V> func) {
        if(input==null || input.size()==0){
            return Collections.emptyMap();
        }
        Map<K, V> resultMap = Maps.newHashMap();
        Observable.from(input).flatMap(new Func1<I, Observable<Map<K, V>>>() {
            @Override
            public Observable<Map<K, V>> call(final I i) {
                return helper.createObservable(new Func0<Map<K, V>>() {
                    @Override
                    public Map<K, V> call() {
                        return func.call(i);
                    }
                }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                    @Override
                    public Map<K, V> call(Throwable throwable) {
                        return Collections.emptyMap();
                    }
                });
            }
        }).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
            @Override
            public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
                acc.putAll(map);
                return acc;
            }
        }).toBlocking().first();
        return resultMap;
    }

    /**
     * 传入List,针对List中每一项分配一条线程做处理,返回List结构,自己实现Object2List接口
     * @param input
     * @param func
     * @param <I>
     * @param <R>
     * @return
     */
    public <I, R> List<R> executeObservable(List<I> input, final Object2List<I, R> func) {
        if(input==null || input.size()==0){
            return Collections.emptyList();
        }
        List<R> result = Lists.newLinkedList();
        Observable.from(input).flatMap(new Func1<I, Observable<List<R>>>() {
            @Override
            public Observable<List<R>> call(final I i) {
                return helper.createObservable(new Func0<List<R>>() {
                    @Override
                    public List<R> call() {
                        return func.call(i);
                    }
                }).onErrorReturn(new Func1<Throwable, List<R>>() {
                    @Override
                    public List<R> call(Throwable throwable) {
                        return Collections.emptyList();
                    }
                });
            }
        }).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
            @Override
            public List<R> call(List<R> acc, List<R> list) {
                acc.addAll(list);
                return acc;
            }
        }).toBlocking().first();
        return result;
    }

    /**
     * 传入List数据,分组大小,对List数据做分组以后,为每一组分配一个线程做处理,返回Map结构,自己实现List2Map接口
     * @param input
     * @param partitionSize
     * @param functionRMap
     * @param <I>
     * @param <K>
     * @param <V>
     * @return
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, int partitionSize,
            final List2Map<I, K, V> functionRMap) {
        if(input==null || input.size()==0){
            return Collections.emptyMap();
        }
        if (partitionSize <= 0)
            partitionSize = 10;
        List<List<I>> lists = Lists.partition(input, partitionSize);
        Map<K, V> resultMap = Maps.newHashMap();
        Observable.from(lists).flatMap(new Func1<List<I>, Observable<Map<K, V>>>() {
            @Override
            public Observable<Map<K, V>> call(final List<I> list) {
                return helper.createObservable(new Func0<Map<K, V>>() {
                    @Override
                    public Map<K, V> call() {
                        return functionRMap.call(list);
                    }
                }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                    @Override
                    public Map<K, V> call(Throwable throwable) {
                        return Collections.emptyMap();
                    }
                });
            }
        }).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
            @Override
            public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
                acc.putAll(map);
                return acc;
            }
        }).toBlocking().first();
        return resultMap;
    }

    /**
     * 传入List数据,分组大小,为每一组数据分配一条线程处理,返回List结构数据,自己实现List2List接口
     * @param input
     * @param partitionSize
     * @param func
     * @param <I>
     * @param <R>
     * @return
     */
    public <I, R> List<R> executeObservable(List<I> input, int partitionSize,
            final List2List<I, R> func) {
        if(input==null || input.size()==0){
            return Collections.emptyList();
        }
        List<R> result = Lists.newLinkedList();
        if (partitionSize <= 0)
            partitionSize = 10;
        List<List<I>> partitions = Lists.partition(input, partitionSize);
        Observable.from(partitions).flatMap(new Func1<List<I>, Observable<List<R>>>() {
            @Override public Observable<List<R>> call(final List<I> is) {
                return helper.createObservable(new Func0<List<R>>() {
                    @Override public List<R> call() {
                        return func.call(is);
                    }
                }).onErrorReturn(new Func1<Throwable, List<R>>() {
                    @Override public List<R> call(Throwable throwable) {
                        return Collections.emptyList();
                    }
                });
            }
        }).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
            @Override public List<R> call(List<R> acc, List<R> list) {
                acc.addAll(list);
                return acc;
            }
        }).toBlocking().first();
        return result;
    }
}

demos

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-23
 * @since mobile-oppkit-server 1.0
 */
public class Demo {
    private static ObservableExecutor observableExecutor = ObservableExecutor.INSTANCE();

    public static void main(String[] args) {

        List<Integer> numbers = Lists.newLinkedList();
        for(int i=500;i<=1000;i++){
            numbers.add(i);
        }

        Map<Integer, String> result1 = observableExecutor.executeObservable(numbers,
            new Object2Map<Integer, Integer, String>() {
                @Override
                public Map<Integer, String> call(Integer integer) {
                    Map<Integer, String> map = Maps.newHashMap();
                    map.put(integer, String.valueOf(integer*integer%1000));
                    return map;
                }
            });

        System.out.println(result1);
        /**
         * 将数字转换为字符串,每个线程处理一个数字转换,并以list的结构返回
         */
        List<String> result2 = observableExecutor.executeObservable(numbers,
            new Object2List<Integer, String>() {
                @Override
                public List<String> call(Integer integer) {
                    List<String> result = Lists.newLinkedList();
                    result.add(String.valueOf(integer*integer%1000));
                    return result;
                }
            });

        System.out.println(result2);

        /**
         * 将数字转换为字符串,对 numbers 分组,每组2个元素,每个线程处理一组,并以map结构返回
         */
        Map<Integer, String> result3 = observableExecutor.executeObservable(numbers, 2,
            new List2Map<Integer, Integer, String>() {
                @Override
                public Map<Integer, String> call(List<Integer> list) {
                    Map<Integer, String> map = Maps.newHashMap();
                    for (Integer integer : list) {
                        map.put(integer, String.valueOf(integer*integer%1000));
                    }
                    return map;
                }
            });
        System.out.println(result3);

        /**
         * 将数字转换为字符串 对 numbers 分组,每组2个元素,每个线程处理一组,并以list结构返回
         */
        List<String> result4 = observableExecutor.executeObservable(numbers, 2,
            new List2List<Integer, String>() {
                @Override
                public List<String> call(List<Integer> list) {
                    List<String> result = Lists.newLinkedList(Lists.transform(list,
                        new Function<Integer, String>() {
                            @Override
                            public String apply(Integer input) {
                                return String.valueOf(input*input%1000);
                            }
                        }));
                    return result;
                }
            });
        System.out.println(result4);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,559评论 18 139
  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,825评论 0 10
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,257评论 25 707
  • 商婷婷曾经是我为之骄傲的一个学生。 商婷婷是10级学生,大学期间叱咤风云,她让我骄傲的不是成绩。学的会计专业,却对...
    古古飞阅读 1,783评论 13 35
  • 一、保证不旷课,不缺交作业,课前认真准备,上课认真听讲,课下认真自习! 二、确保人身安全,没事不出校门,不单独...
    少村阅读 165评论 0 0