当前位置:  开发笔记 > 编程语言 > 正文

Java流并行化的可视化

如何解决《Java流并行化的可视化》经验,为你挑选了2个好方法。

通常,并不十分清楚并行流如何将输入分成块以及块连接的顺序.有没有办法可视化任何流源的整个过程,以更好地了解正在发生的事情?假设我创建了一个这样的流:

Stream stream = IntStream.range(0, 100).boxed().parallel();

我想看到一些树状的结构:

             [0..99]
         _____/   \_____
        |               |
     [0..49]         [50..99]
    __/   \__        __/  \__
   |         |      |        |
[0..24]  [25..49] [50..74] [75..99]

这意味着整个输入范围[0..99]被拆分为范围,[0..49][50..99]范围又进一步分裂.当然这样的图应该反映Stream API的实际工作,所以如果我用这样的流执行一些实际操作,则应该以相同的方式执行拆分.



1> Tagir Valeev..:

当前流API实现使用收集器组合器以与先前拆分的方式完全相同的方式组合中间结果.另外,拆分策略取决于源和公共池并行水平,但不依赖于使用精确还原操作(同为reduce,collect,forEach,count,等).依靠这一点,创建可视化收集器并不是很困难:

public static Collector> parallelVisualize() {
    class Range {
        private String first, last;
        private Range left, right;

        void accept(Object obj) {
            if (first == null)
                first = obj.toString();
            else
                last = obj.toString();
        }

        Range combine(Range that) {
            Range p = new Range();
            p.first = first == null ? that.first : first;
            p.last = Stream
                    .of(that.last, that.first, this.last, this.first)
                    .filter(Objects::nonNull).findFirst().orElse(null);
            p.left = this;
            p.right = that;
            return p;
        }

        String pad(String s, int left, int len) {
            if (len == s.length())
                return s;
            char[] result = new char[len];
            Arrays.fill(result, ' ');
            s.getChars(0, s.length(), result, left);
            return new String(result);
        }

        public List finish() {
            String cur = toString();
            if (left == null) {
                return Collections.singletonList(cur);
            }
            List l = left.finish();
            List r = right.finish();
            int len1 = l.get(0).length();
            int len2 = r.get(0).length();
            int totalLen = len1 + len2 + 1;
            int leftAdd = 0;
            if (cur.length() < totalLen) {
                cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
            } else {
                leftAdd = (cur.length() - totalLen) / 2;
                totalLen = cur.length();
            }
            List result = new ArrayList<>();
            result.add(cur);

            char[] dashes = new char[totalLen];
            Arrays.fill(dashes, ' ');
            Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
                    + leftAdd, '_');
            int mid = totalLen / 2;
            dashes[mid] = '/';
            dashes[mid + 1] = '\\';
            result.add(new String(dashes));

            Arrays.fill(dashes, ' ');
            dashes[len1 / 2 + leftAdd] = '|';
            dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
            result.add(new String(dashes));

            int maxSize = Math.max(l.size(), r.size());
            for (int i = 0; i < maxSize; i++) {
                String lstr = l.size() > i ? l.get(i) : String.format("%"
                        + len1 + "s", "");
                String rstr = r.size() > i ? r.get(i) : String.format("%"
                        + len2 + "s", "");
                result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
            }
            return result;
        }

        public String toString() {
            if (first == null)
                return "(empty)";
            else if (last == null)
                return "[" + first + "]";
            return "[" + first + ".." + last + "]";
        }
    }
    return Collector.of(Range::new, Range::accept, Range::combine,
            Range::finish);
}

这是使用4核机器的这个收集器获得的一些有趣的结果(结果将在机器上具有不同数量的不同availableProcessors()).

拆分简单范围:

IntStream.range(0, 100)
        .boxed().parallel().collect(parallelVisualize())
        .forEach(System.out::println);

甚至分成16个任务:

                                                                  [0..99]                                                                   
                                   ___________________________________/\________________________________                                    
                                  |                                                                     |                                   
                              [0..49]                                                               [50..99]                                
                 _________________/\______________                                     _________________/\________________                  
                |                                 |                                   |                                   |                 
            [0..24]                           [25..49]                            [50..74]                            [75..99]              
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______         
       |               |                 |                 |                 |                 |                 |                 |        
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]     
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___    
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

拆分两个流串联:

IntStream
        .concat(IntStream.range(0, 10), IntStream.range(10, 100))
        .boxed().parallel().collect(parallelVisualize())
        .forEach(System.out::println);

如您所见,首先拆分取消连接流:

                                                                           [0..99]                                                                           
       _______________________________________________________________________/\_____                                                                        
      |                                                                              |                                                                       
   [0..9]                                                                        [10..99]                                                                    
    __/\__                                        ___________________________________/\__________________________________                                    
   |      |                                      |                                                                       |                                   
[0..4] [5..9]                                [10..54]                                                                [55..99]                                
                                _________________/\________________                                     _________________/\________________                  
                               |                                   |                                   |                                   |                 
                           [10..31]                            [32..54]                            [55..76]                            [77..99]              
                       ________/\_______                   ________/\_______                   ________/\_______                   ________/\_______         
                      |                 |                 |                 |                 |                 |                 |                 |        
                  [10..20]          [21..31]          [32..42]          [43..54]          [55..65]          [66..76]          [77..87]          [88..99]     
                   ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___    
                  |        |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   
              [10..14] [15..20] [21..25] [26..31] [32..36] [37..42] [43..48] [49..54] [55..59] [60..65] [66..70] [71..76] [77..81] [82..87] [88..93] [94..99]

在串联之前执行中间操作(boxed())的两个流连接的拆分:

Stream.concat(IntStream.range(0, 50).boxed().parallel(), IntStream.range(50, 100).boxed())
        .collect(parallelVisualize())
        .forEach(System.out::println);

如果其中一个输入流在连接之前没有变为并行模式,则它根本拒绝拆分:

                                   [0..99]                                   
                                   ___/\_________________________________    
                                  |                                      |   
                              [0..49]                                [50..99]
                 _________________/\______________                           
                |                                 |                          
            [0..24]                           [25..49]                       
        ________/\_____                   ________/\_______                  
       |               |                 |                 |                 
   [0..11]         [12..24]          [25..36]          [37..49]              
    ___/\_          ___/\___          ___/\___          ___/\___             
   |      |        |        |        |        |        |        |            
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49]         

拆分平面图:

Stream.of(0, 50)
        .flatMap(start -> IntStream.range(start, start+50).boxed().parallel())
        .parallel().collect(parallelVisualize())
        .forEach(System.out::println);

平面映射从不在嵌套流内并行化:

    [0..99]     
    ____/\__    
   |        |   
[0..49] [50..99]

来自7000个元素的未知大小的迭代器的流(请参阅上面的答案):

StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(
                IntStream.range(0, 7000).iterator(),
                Spliterator.ORDERED), true)
        .collect(parallelVisualize()).forEach(System.out::println);

分裂真的很糟糕,每个人都在等待最大的部分[3072..6143]:

                       [0..6999]                        
     _______________________/\___                       
    |                            |                      
[0..1023]                  [1024..6999]                 
                 ________________/\____                 
                |                      |                
          [1024..3071]           [3072..6999]           
                              _________/\_____          
                             |                |         
                       [3072..6143]     [6144..6999]    
                                           ___/\____    
                                          |         |   
                                    [6144..6999] (empty)

已知大小的迭代器源:

StreamSupport
        .stream(Spliterators.spliterator(IntStream.range(0, 7000)
                .iterator(), 7000, Spliterator.ORDERED), true)
        .collect(parallelVisualize()).forEach(System.out::println);

提供尺寸可以更好地解锁进一步的分裂:

                                                                                                    [0..6999]                                                                                                     
           ______________________________________________________________________________________________/\________                                                                                               
          |                                                                                                        |                                                                                              
     [0..1023]                                                                                               [1024..6999]                                                                                         
     _____/\__                                 ____________________________________________________________________/\________________________                                                                     
    |         |                               |                                                                                              |                                                                    
[0..511] [512..1023]                    [1024..3071]                                                                                   [3072..6999]                                                               
                                  ____________/\___________                                                                  ________________/\__________________________________________________                 
                                 |                         |                                                                |                                                                    |                
                           [1024..2047]              [2048..3071]                                                     [3072..6143]                                                         [6144..6999]           
                            _____/\_____              _____/\_____                                 _________________________/\________________________                                        ___/\___________    
                           |            |            |            |                               |                                                   |                                      |                |   
                     [1024..1535] [1536..2047] [2048..2559] [2560..3071]                    [3072..4607]                                        [4608..6143]                           [6144..6999]        (empty)
                                                                                      ____________/\___________                           ____________/\___________                     _____/\_____              
                                                                                     |                         |                         |                         |                   |            |             
                                                                               [3072..3839]              [3840..4607]              [4608..5375]              [5376..6143]        [6144..6571] [6572..6999]        
                                                                                _____/\_____              _____/\_____              _____/\_____              _____/\_____                                        
                                                                               |            |            |            |            |            |            |            |                                       
                                                                         [3072..3455] [3456..3839] [3840..4223] [4224..4607] [4608..4991] [4992..5375] [5376..5759] [5760..6143]                                  

这种收集器的进一步改进可以生成图形图像(如svg),跟踪处理每个节点的线程,显示每个组的元素数量等等.如果你愿意,可以使用它.


很好的分析和图形.

2> Holger..:

我想通过一个解决方案来增强Tagir的优秀答案,该解决方案用于监视端的分割,甚至是中间操作(当前流API实现强加了一些限制):

public static  Stream proxy(Stream src) {
    Class> sClass=(Class)Stream.class;
    Class> spClass=(Class)Spliterator.class;
    return proxy(src, sClass, spClass, StreamSupport::stream);
}
public static IntStream proxy(IntStream src) {
    return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
}
public static LongStream proxy(LongStream src) {
    return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
}
public static DoubleStream proxy(DoubleStream src) {
    return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
}
static final Object EMPTY=new StringBuilder("empty");
static , Sp extends Spliterator> S proxy(
        S src, Class sc, Class spc, BiFunction f) {

    final class Node implements InvocationHandler,Runnable,
        Consumer, IntConsumer, LongConsumer, DoubleConsumer {
        final Class type;
        Spliterator src;
        Object first=EMPTY, last=EMPTY;
        Node left, right;
        Object currConsumer;
        public Node(Spliterator src, Class type) {
            this.src = src;
            this.type=type;
        }
        private void value(Object t) {
            if(first==EMPTY) first=t;
            last=t;
        }
        public void accept(Object t) {
            value(t); ((Consumer)currConsumer).accept(t);
        }
        public void accept(int t) {
            value(t); ((IntConsumer)currConsumer).accept(t);
        }
        public void accept(long t) {
            value(t); ((LongConsumer)currConsumer).accept(t);
        }
        public void accept(double t) {
            value(t); ((DoubleConsumer)currConsumer).accept(t);
        }
        public void run() {
            System.out.println();
            finish().forEach(System.out::println);
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Node curr=this; while(curr.right!=null) curr=curr.right;
            if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {
                curr.currConsumer=args[0];
                args[0]=curr;
            }
            if(method.getName().equals("trySplit")) {
                Spliterator s=curr.src.trySplit();
                if(s==null) return null;
                Node pfx=new Node<>(s, type);
                pfx.left=curr.left; curr.left=pfx;
                curr.right=new Node<>(curr.src, type);
                src=null;
                return pfx.create();
            }
            return method.invoke(curr.src, args);
        }
        Object create() {
            return Proxy.newProxyInstance(null, new Class[]{type}, this);
        }
        String pad(String s, int left, int len) {
            if (len == s.length())
                return s;
            char[] result = new char[len];
            Arrays.fill(result, ' ');
            s.getChars(0, s.length(), result, left);
            return new String(result);
        }
        public List finish() {
            String cur = toString();
            if (left == null) {
                return Collections.singletonList(cur);
            }
            List l = left.finish();
            List r = right.finish();
            int len1 = l.get(0).length();
            int len2 = r.get(0).length();
            int totalLen = len1 + len2 + 1;
            int leftAdd = 0;
            if (cur.length() < totalLen) {
                cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
            } else {
                leftAdd = (cur.length() - totalLen) / 2;
                totalLen = cur.length();
            }
            List result = new ArrayList<>();
            result.add(cur);

            char[] dashes = new char[totalLen];
            Arrays.fill(dashes, ' ');
            Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
                    + leftAdd, '_');
            int mid = totalLen / 2;
            dashes[mid] = '/';
            dashes[mid + 1] = '\\';
            result.add(new String(dashes));

            Arrays.fill(dashes, ' ');
            dashes[len1 / 2 + leftAdd] = '|';
            dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
            result.add(new String(dashes));

            int maxSize = Math.max(l.size(), r.size());
            for (int i = 0; i < maxSize; i++) {
                String lstr = l.size() > i ? l.get(i) : String.format("%"
                        + len1 + "s", "");
                String rstr = r.size() > i ? r.get(i) : String.format("%"
                        + len2 + "s", "");
                result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
            }
            return result;
        }
        private Object first() {
            if(left==null) return first;
            Object o=left.first();
            if(o==EMPTY) o=right.first();
            return o;
        }
        private Object last() {
            if(right==null) return last;
            Object o=right.last();
            if(o==EMPTY) o=left.last();
            return o;
        }
        public String toString() {
            Object o=first(), p=last();
            return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");
        }
    }
    Node n=new Node<>(src.spliterator(), spc);
    Sp sp=(Sp)Proxy.newProxyInstance(null, new Class[]{n.type}, n);
    return f.apply(sp, true).onClose(n);
}


它允许使用代理包装spliterator,该代理将监视拆分操作和遇到的对象.块处理的逻辑类似于Tagir,事实上,我复制了他的结果打印例程.

您可以传入流的源或已附加相同操作的流.(在后一种情况下,您应该尽早申请.parallel()流).正如Tagir所解释的,在大多数情况下,拆分行为取决于源和配置的并行性,因此,在大多数情况下,监视中间状态可能会更改值,但不会更改已处理的块:

try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
    is.filter(i -> i/20%2==0)
      .mapToObj(ix->"\""+ix+'"')
      .forEach(s->{});
}

将打印

                                                                  [0..99]                                                                   
                                   ___________________________________/\________________________________                                    
                                  |                                                                     |                                   
                              [0..49]                                                               [50..99]                                
                 _________________/\______________                                     _________________/\________________                  
                |                                 |                                   |                                   |                 
            [0..24]                           [25..49]                            [50..74]                            [75..99]              
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______         
       |               |                 |                 |                 |                 |                 |                 |        
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]     
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___    
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

try(Stream s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
      .mapToObj(ix->"\""+ix+'"'))) {
    s.forEach(str->{});
}

将打印

                                                                                   ["0".."99"]                                                                                    
                                              ___________________________________________/\___________________________________________                                            
                                             |                                                                                        |                                           
                                       ["0".."49"]                                                                              ["50".."99"]                                      
                         ____________________/\______________________                                           ______________________/\___________________                       
                        |                                            |                                         |                                           |                      
                  ["0".."19"]                                  ["40".."49"]                              ["50".."59"]                                ["80".."99"]                 
            ____________/\_________                      ____________/\______                           _______/\___________                   ____________/\________             
           |                       |                    |                    |                         |                    |                 |                      |            
     ["0".."11"]             ["12".."19"]            (empty)           ["40".."49"]              ["50".."59"]            (empty)        ["80".."86"]           ["87".."99"]       
      _____/\___              _____/\_____           ___/\__            _____/\_____              _____/\_____           ___/\__         _____/\__              _____/\_____      
     |          |            |            |         |       |          |            |            |            |         |       |       |         |            |            |     
["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]

正如我们在这里看到的,我们正在监视结果,.filter(…).mapToObj(…)但是块明确地由源确定,可能根据过滤器的条件在下游产生空块.

请注意,我们可以将源监控与Tagir的收集器监控结合起来:

try(IntStream s=proxy(IntStream.range(0, 100))) {
    s.parallel().filter(i -> i/20%2==0)
     .boxed().collect(parallelVisualize())
     .forEach(System.out::println);
}

这将打印(请注意collect首先打印输出):

                                                              [0..99]                                                               
                                  ________________________________/\_______________________________                                 
                                 |                                                                 |                                
                             [0..49]                                                           [50..99]                             
                 ________________/\______________                                   _______________/\_______________                
                |                                |                                 |                                |               
            [0..19]                          [40..49]                          [50..59]                         [80..99]            
        ________/\_____                  ________/\______                   _______/\_______                ________/\_____         
       |               |                |                |                 |                |              |               |        
   [0..11]         [12..19]          (empty)         [40..49]          [50..59]          (empty)       [80..86]        [87..99]     
    ___/\_          ___/\___         ___/\__          ___/\___          ___/\___         ___/\__        ___/\_          ___/\___    
   |      |        |        |       |       |        |        |        |        |       |       |      |      |        |        |   
[0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]

                                                                  [0..99]                                                                   
                                   ___________________________________/\________________________________                                    
                                  |                                                                     |                                   
                              [0..49]                                                               [50..99]                                
                 _________________/\______________                                     _________________/\________________                  
                |                                 |                                   |                                   |                 
            [0..24]                           [25..49]                            [50..74]                            [75..99]              
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______         
       |               |                 |                 |                 |                 |                 |                 |        
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]     
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___    
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

我们可以清楚地看到处理的块如何匹配,但是在过滤之后,一些块具有较少的元素,其中一些是完全空的.

这是展示的地方,两种监测方式可以产生显着差异:

try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
    is.boxed()
      .collect(parallelVisualize())
      .forEach(System.out::println);
}
                                                                                                [0.0..99.0]                                                                                                 
                                                   ___________________________________________________/\________________________________________________                                                    
                                                  |                                                                                                     |                                                   
                                            [0.0..49.0]                                                                                           [50.0..99.0]                                              
                         _________________________/\______________________                                                     _________________________/\________________________                          
                        |                                                 |                                                   |                                                   |                         
                  [0.0..24.0]                                       [25.0..49.0]                                        [50.0..74.0]                                        [75.0..99.0]                    
            ____________/\_________                           ____________/\___________                           ____________/\___________                           ____________/\___________             
           |                       |                         |                         |                         |                         |                         |                         |            
     [0.0..11.0]             [12.0..24.0]              [25.0..36.0]              [37.0..49.0]              [50.0..61.0]              [62.0..74.0]              [75.0..86.0]              [87.0..99.0]       
      _____/\___              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____      
     |          |            |            |            |            |            |            |            |            |            |            |            |            |            |            |     
[0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]

                             [0.0..10239.0]                              
       _____________________________/\_____                              
      |                                    |                             
[0.0..1023.0]                      [1024.0..10239.0]                     
                       ____________________/\_______                     
                      |                             |                    
              [1024.0..3071.0]             [3072.0..10239.0]             
                                        ____________/\______             
                                       |                    |            
                               [3072.0..6143.0]     [6144.0..10239.0]    
                                                         ___/\_______    
                                                        |            |   
                                                [6144.0..10239.0] (empty)

这证明了Tagir已经解释过的,未知大小的流分裂得很差,甚至事实limit(…)提供了良好估计的可能性(实际上,无限+限制在理论上是可预测的),实现并没有利用它.

使用批量大小将源拆分为块1024,1024在每次拆分后增加,创建超出范围的块limit.我们还可以看到每次分离前缀的方式.

但是当我们查看终端分割输出时,我们可以看到这些多余的块之间已经被丢弃,并且第一个块的另一个分裂发生了.由于这个块是由第一个拆分中的默认实现填充的中间数组的后端,我们在源处没有注意到它,但我们可以在终端操作中看到该数组已被拆分(不出所料)很平衡.

所以我们需要两种监控方式来全面了解......


非常好的补充,非常感谢!
推荐阅读
夏晶阳--艺术
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有