Java 8 - Streams and Collectors (With examples)


Hi, I am Malathi Boggavarapu. I work at Volvo Group and live in Gothenburg, Sweden. I have been working with Java for several years and have broad experience and knowledge across various technologies.

This course will teach you about a Java 8 feature: Streams and Collectors.

Streams and Collectors

Stream: a new concept in Java 8. It provides a way to efficiently process large amounts of data as well as smaller amounts of data. Data can also be processed in parallel to leverage the computing power of multi-core CPUs; parallel processing has to be requested explicitly, for example by calling parallelStream() instead of stream().
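A minimal sketch of parallel processing, assuming a plain List<Integer> as the data source (the class and method names are illustrative only). The same sum is computed once with stream() and once with parallelStream(); the result is identical, only the way the work is scheduled differs.

```java
import java.util.ArrayList;
import java.util.List;

class ParallelSumDemo {
    // Builds the list 1, 2, ..., n
    static List<Integer> numbers(int n) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            list.add(i);
        }
        return list;
    }

    // Sequential stream: processed by the calling thread
    static long sequentialSum(List<Integer> list) {
        return list.stream().mapToLong(Integer::longValue).sum();
    }

    // Parallel stream: the work may be split across several threads of the common ForkJoinPool
    static long parallelSum(List<Integer> list) {
        return list.parallelStream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> list = numbers(1_000_000);
        System.out.println(sequentialSum(list)); // 500000500000
        System.out.println(parallelSum(list));   // 500000500000
    }
}
```

Parallel streams only pay off for large data sets or expensive per-element work; for small lists the splitting overhead usually dominates.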

What is a Stream?


- An object on which one can define operations, such as map, filter or reduce operations.
- An object that does not hold any data. For example, a Collection holds data and I perform basic operations on it; a Stream does not hold any data itself.
- An object that should not change the data it processes. Because the processing may be distributed across multiple CPUs, we don't want to be bothered with visibility issues involving atomic variables, volatile variables or synchronization.
- An object able to process data in one pass. For example, if we define a stream on a collection of Person objects and chain three operations (map, filter and reduce), then to be processed efficiently the data should not be stored in any intermediate collection; it should be processed in one pass.
- An object that is optimized from an algorithmic point of view and able to process data in parallel.
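The "one pass" property can be observed with a small sketch. The log list below exists only to record the order in which filter and map see each element (a side effect used purely for illustration; real pipelines should avoid it). On a sequential stream each element travels through the whole pipeline before the next one starts, so no intermediate collection is built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OnePassDemo {
    // Records the order in which the operations are applied to each element
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        List<Integer> result = Arrays.asList(1, 2, 3).stream()
                .filter(i -> { log.add("filter " + i); return i != 2; }) // drops 2
                .map(i -> { log.add("map " + i); return i * 10; })
                .collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        // Prints: filter 1, map 1, filter 2, filter 3, map 3, result [10, 30]
        trace().forEach(System.out::println);
    }
}
```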

How can we build a Stream?

A stream() method has been added to the Collection interface:

List<Person> persons = ...;
Stream<Person> stream = persons.stream();
stream.forEach(person -> System.out.println(person));

Filter operation

List<Person> persons = ...;
Stream<Person> stream = persons.stream();
Stream<Person> filtered = stream.filter(person -> person.getAge() > 20);
Predicate<Person> p = person -> person.getAge() > 20; // the same condition stored as a Predicate
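A runnable version of the filter example. The Person class is a hypothetical minimal stand-in, since the post does not show its definition; only getName() and getAge() are assumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterDemo {
    // Hypothetical minimal Person class (not defined in the original post)
    static class Person {
        private final String name;
        private final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    static List<String> namesOlderThan20(List<Person> persons) {
        Predicate<Person> p = person -> person.getAge() > 20;
        return persons.stream()
                .filter(p)               // keeps only the persons for which p.test(person) is true
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
                new Person("Anna", 18), new Person("Bob", 25), new Person("Carl", 42));
        System.out.println(namesOlderThan20(persons)); // [Bob, Carl]
    }
}
```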

Predicate is a functional interface in the java.util.function package. Here is the Predicate interface with its default and static methods:

@FunctionalInterface
public interface Predicate<T> {
   boolean test(T t);
   default Predicate<T> and(Predicate<? super T> other) {..}
   default Predicate<T> or(Predicate<? super T> other) {..}
   default Predicate<T> negate() {..}
   static <T> Predicate<T> isEqual(Object targetRef) {..}
}
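A small sketch of how the default methods combine predicates. The two predicates LONG_WORD and STARTS_WITH_T are illustrative names, not part of the JDK.

```java
import java.util.function.Predicate;

class PredicateCompositionDemo {
    static final Predicate<String> LONG_WORD = s -> s.length() > 3;
    static final Predicate<String> STARTS_WITH_T = s -> s.startsWith("t");

    public static void main(String[] args) {
        Predicate<String> both = LONG_WORD.and(STARTS_WITH_T);  // true only if both tests pass
        Predicate<String> either = LONG_WORD.or(STARTS_WITH_T); // true if at least one test passes
        Predicate<String> shortWord = LONG_WORD.negate();       // inverts the result

        System.out.println(both.test("three"));                   // true
        System.out.println(both.test("two"));                     // false
        System.out.println(either.test("two"));                   // true
        System.out.println(shortWord.test("one"));                // true
        System.out.println(Predicate.isEqual("two").test("two")); // true
    }
}
```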

Example

Predicate<String> p = Predicate.isEqual("two");
Stream<String> stream1 = Stream.of("one", "two", "three");
Stream<String> stream2 = stream1.filter(p);

'of' is a static method of the Stream interface and is one way to create a Stream.

Example

// A stream can be consumed only once, so a new stream is created for each pipeline
Predicate<String> p1 = s -> s.length() > 3;
Predicate<String> p2 = Predicate.isEqual("two");
Predicate<String> p3 = Predicate.isEqual("three");

Stream.of("one", "two", "three", "four", "five").filter(p1).forEach(System.out::println); // prints only the Strings "three", "four", "five"
Stream.of("one", "two", "three", "four", "five").filter(p2).forEach(System.out::println); // prints only the String "two"
Stream.of("one", "two", "three", "four", "five").filter(p2.or(p3)).forEach(System.out::println); // prints only the Strings "two" and "three"
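Because a stream can only be consumed once, one convenient pattern is to keep a Supplier that builds a fresh stream on each call. This sketch (WORDS and filtered are illustrative names) collects the results into lists instead of printing them, so they are easy to check.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FilterStringsDemo {
    // Each call to WORDS.get() creates a new, unconsumed stream
    static final Supplier<Stream<String>> WORDS =
            () -> Stream.of("one", "two", "three", "four", "five");

    static List<String> filtered(Predicate<String> p) {
        return WORDS.get().filter(p).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<String> p1 = s -> s.length() > 3;
        Predicate<String> p2 = Predicate.isEqual("two");
        Predicate<String> p3 = Predicate.isEqual("three");
        System.out.println(filtered(p1));        // [three, four, five]
        System.out.println(filtered(p2));        // [two]
        System.out.println(filtered(p2.or(p3))); // [two, three]
    }
}
```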

- Calls to the methods of a Stream are lazy. For example, a call to the filter method is lazy because it only declares an operation on a given stream; it does not process any data.
- All the methods of Stream that return another Stream are lazy, for the reason stated above.
- An operation that returns a Stream (such as filter) is called an intermediate operation. The Javadoc of the Stream interface clearly states whether each method is an intermediate operation or a final (terminal) operation.
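A minimal sketch of this laziness. The log list records every call to the filter lambda (a side effect used only to make the behaviour visible). Declaring the pipeline alone records nothing; adding a terminal operation (collect here) triggers the processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazinessDemo {
    static List<String> runWithoutTerminal() {
        List<String> log = new ArrayList<>();
        Stream<String> declared = Stream.of("one", "two", "three")
                .filter(w -> { log.add(w); return w.length() > 3; });
        // No terminal operation on 'declared': the filter lambda never runs
        return log;
    }

    static List<String> runWithTerminal() {
        List<String> log = new ArrayList<>();
        List<String> result = Stream.of("one", "two", "three")
                .filter(w -> { log.add(w); return w.length() > 3; })
                .collect(Collectors.toList()); // terminal operation: triggers the processing
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runWithoutTerminal()); // []
        System.out.println(runWithTerminal());    // [one, two, three, result [three]]
    }
}
```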

peek method: this method returns a stream, whereas forEach does not return anything (its return type is void).

List<Person> result = new ArrayList<>();
List<Person> persons = ...;
persons.stream().peek(System.out::println).filter(person -> person.getAge() > 20).peek(result::add);

The above code does not print anything and does not add any element to the ArrayList, because peek and filter are intermediate operations: they only declare the processing. Nothing happens until a final (terminal) operation such as forEach is called.

For example, let us change the above piece of code as follows:

List<Person> result = new ArrayList<>();
List<Person> persons = ...;
persons.stream().peek(System.out::println).filter(person -> person.getAge() > 20).forEach(result::add);

Here peek prints every person, filter keeps only the persons older than 20, and forEach finally adds them to the result ArrayList. Because forEach is a terminal (final) operation, the data is actually processed this time.
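A runnable sketch of the peek/forEach pipeline above, with a hypothetical minimal Person class whose toString() makes the peek output readable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PeekDemo {
    // Hypothetical minimal Person class (not defined in the original post)
    static class Person {
        private final String name;
        private final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
        int getAge() { return age; }
        @Override public String toString() { return name + " (" + age + ")"; }
    }

    static List<Person> olderThan20(List<Person> persons) {
        List<Person> result = new ArrayList<>();
        persons.stream()
               .peek(System.out::println)              // prints every person
               .filter(person -> person.getAge() > 20) // keeps the persons older than 20
               .forEach(result::add);                  // terminal operation: processing happens here
        return result;
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(new Person("Anna", 18), new Person("Bob", 25));
        System.out.println("Result: " + olderThan20(persons));
    }
}
```

Adding to an external list from forEach works for this sequential example, but collect(Collectors.toList()) is the usual choice, especially with parallel streams, because ArrayList is not thread-safe.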

Mapping operations

The map operation returns a stream and does not process any data: it is an intermediate operation. It takes a Function as its argument, which takes an object and returns another object:

@FunctionalInterface
public interface Function<T, R>{
  R apply(T t);
}

The Function interface also has two default methods, compose and andThen, and a static method called identity.
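A short sketch of the difference between andThen and compose; TIMES_TWO and PLUS_THREE are illustrative names. f.andThen(g) applies f first and then g, while f.compose(g) applies g first and then f.

```java
import java.util.function.Function;

class FunctionCompositionDemo {
    static final Function<Integer, Integer> TIMES_TWO = x -> x * 2;
    static final Function<Integer, Integer> PLUS_THREE = x -> x + 3;

    public static void main(String[] args) {
        // andThen: TIMES_TWO first, then PLUS_THREE, i.e. (5 * 2) + 3
        System.out.println(TIMES_TWO.andThen(PLUS_THREE).apply(5)); // 13
        // compose: PLUS_THREE first, then TIMES_TWO, i.e. (5 + 3) * 2
        System.out.println(TIMES_TWO.compose(PLUS_THREE).apply(5)); // 16
        // identity: returns its argument unchanged
        System.out.println(Function.<Integer>identity().apply(5));  // 5
    }
}
```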

Example

List<Person> list = ....
Stream<Person> stream = list.stream();
Stream<String> names = stream.map(person -> person.getName());

flatMap operation
-----------------
The flatMap operation takes a function that accepts an object as its argument and returns a Stream.
Example

List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
List<Integer> list2 = Arrays.asList(2, 4, 6);
List<Integer> list3 = Arrays.asList(3, 5, 7);

List<List<Integer>> list = Arrays.asList(list1, list2, list3);
Function<List<Integer>, Integer>  size = List::size;
Function<List<Integer>, Stream<Integer>> flatMapper = s -> s.stream();
list.stream().map(flatMapper).forEach(System.out::println);

The above code uses map, so it returns a stream of streams (Stream<Stream<Integer>>). The output prints 3 Stream objects (their toString() representation) rather than the integers. Try it!

list.stream().flatMap(flatMapper).forEach(System.out::println);

If we use flatMap instead of map, as shown above, the 3 streams are flattened into a single stream and each of the 13 integers is printed. Try the above example.
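The same lists can be used to compare map and flatMap without relying on printed output. This sketch counts the elements produced by map and collects the elements produced by flatMap into a list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapDemo {
    static final List<List<Integer>> LISTS = Arrays.asList(
            Arrays.asList(1, 2, 3, 4, 5, 6, 7),
            Arrays.asList(2, 4, 6),
            Arrays.asList(3, 5, 7));

    static final Function<List<Integer>, Stream<Integer>> FLAT_MAPPER = List::stream;

    // map: one element per inner list, and each element is itself a Stream<Integer>
    static long mapCount() {
        return LISTS.stream().map(FLAT_MAPPER).count();
    }

    // flatMap: the inner streams are concatenated into a single Stream<Integer>
    static List<Integer> flattened() {
        return LISTS.stream().flatMap(FLAT_MAPPER).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapCount());  // 3
        System.out.println(flattened()); // [1, 2, 3, 4, 5, 6, 7, 2, 4, 6, 3, 5, 7]
    }
}
```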

Reduction

Two kinds of reduction are included in the Stream API. The first kind is the basic reduction, similar to the classical SQL aggregation operations such as min, max, sum, etc.

How does it work?

List<Integer> ages = ...;
Stream<Integer> stream = ages.stream();
Integer sum = stream.reduce(0, (age1, age2) -> age1 + age2);

1st argument: the identity element of the reduction operation
2nd argument: the reduction operation, of type BinaryOperator<T>

BinaryOperator is a special case of BiFunction. A BiFunction takes two objects of types T and U and returns an object of type R; a BinaryOperator is a BiFunction whose two arguments and result all have the same type T.

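@FunctionalInterface
public interface BiFunction<T, U, R> {
      R apply(T t, U u);
}

@FunctionalInterface
public interface BinaryOperator<T> extends BiFunction<T, T, T> {
      // no new abstract method: apply(T t1, T t2) is inherited from BiFunction
}

Since a BinaryOperator takes two arguments, two special cases have to be defined: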

What happens if the stream is empty?
- The reduction of an empty stream is the identity element.
What happens if the stream has only one element?
- The reduction is that element (combined with the identity element, which leaves it unchanged).

Example 1

Stream<Integer> stream = Stream.empty();
BinaryOperator<Integer> sum = (i1, i2) -> i1 + i2;
Integer id = 0;
int red = stream.reduce(id, sum);
System.out.println(red);

The above code prints 0: the stream is empty, so reduce simply returns the identity element.

Example 2

Stream<Integer> stream = Stream.of(1);
BinaryOperator<Integer> sum = (i1, i2) -> i1 + i2;
Integer id = 0;
int red = stream.reduce(id, sum);
System.out.println(red);

The above code prints 1: the stream contains only one value, so reduce returns that element combined with the identity element (0 + 1).

Example 3

Stream<Integer> stream = Stream.of(1, 2, 3, 4);
BinaryOperator<Integer> sum = (i1, i2) -> i1 + i2;
Integer id = 0;
int red = stream.reduce(id, sum);
System.out.println(red);

The above code prints 10, of course (1 + 2 + 3 + 4).
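The three examples above can be folded into one runnable sketch; the sum helper is an illustrative name that wraps reduce with 0 as the identity element.

```java
import java.util.Arrays;
import java.util.function.BinaryOperator;

class ReduceDemo {
    static final BinaryOperator<Integer> SUM = (i1, i2) -> i1 + i2;

    // Reduces the values with 0 as the identity element
    static int sum(Integer... values) {
        return Arrays.stream(values).reduce(0, SUM);
    }

    public static void main(String[] args) {
        System.out.println(sum());           // 0: empty stream, the identity element is returned
        System.out.println(sum(1));          // 1: 0 + 1
        System.out.println(sum(1, 2, 3, 4)); // 10
    }
}
```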

Reduction as max operation
--------------------------------

BinaryOperator<Integer> max = (i1, i2) -> i1 > i2 ? i1 : i2;

The problem is that the max operation does not have an identity element, yet reduce(identity, operator) expects one as its first argument. And when the stream is empty, there is no meaningful value to return. This is why the max method of Stream returns an Optional:

List<Integer> ages = ...;
Stream<Integer> stream = ages.stream();
Optional<Integer> max = stream.max(Comparator.naturalOrder());

Optional is a new concept in Java 8. It is similar to a wrapper type such as Integer, Float or Double. The difference is that a wrapper type always holds a value, whereas an Optional means "there might be no result".
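A runnable sketch of the max reduction discussed above. It uses the same kind of BinaryOperator with the reduce overload that takes no identity element (which returns an Optional), and compares it with Stream.max. The ages are made-up sample values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

class MaxDemo {
    static final BinaryOperator<Integer> MAX = (i1, i2) -> i1 > i2 ? i1 : i2;

    // reduce without an identity element: returns an empty Optional for an empty stream
    static Optional<Integer> maxWithReduce(List<Integer> ages) {
        return ages.stream().reduce(MAX);
    }

    // Stream.max gives the same result using a Comparator
    static Optional<Integer> maxWithComparator(List<Integer> ages) {
        return ages.stream().max(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(25, 42, 18);
        System.out.println(maxWithReduce(ages));                                  // Optional[42]
        System.out.println(maxWithComparator(ages));                              // Optional[42]
        System.out.println(maxWithComparator(Collections.<Integer>emptyList()));  // Optional.empty
    }
}
```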

How to use Optional?
isPresent() returns true if the Optional contains a value.

Optional<String> opt = ...;
if(opt.isPresent()){
   String s = opt.get(); // Used to get the value
}else{
 ....
}

The orElse() method combines the calls to isPresent() and get(): if the Optional is empty, it returns the default value passed as its argument. (Calling get() directly on an empty Optional throws a NoSuchElementException.)

Example:
String s = opt.orElse("");  // If the Optional is empty, the empty string is returned as the default value

orElseThrow(MyException::new): we can use this method to throw an exception if the Optional is empty. The exception is built lazily: MyException::new is a Supplier, so the exception object is only created when the Optional is actually empty.
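A small sketch of orElse and orElseThrow. MyException is the hypothetical exception named in the text, defined here as an unchecked exception so the example compiles; orEmpty and orFail are illustrative helper names.

```java
import java.util.Optional;

class OptionalDemo {
    // Hypothetical exception type, for illustration only
    static class MyException extends RuntimeException {
        MyException() { super("no value"); }
    }

    // Returns the value, or the empty string if the Optional is empty
    static String orEmpty(Optional<String> opt) {
        return opt.orElse("");
    }

    // MyException::new is a Supplier: the exception is only created if opt is empty
    static String orFail(Optional<String> opt) {
        return opt.orElseThrow(MyException::new);
    }

    public static void main(String[] args) {
        System.out.println(orEmpty(Optional.of("hello")));       // hello
        System.out.println(orEmpty(Optional.empty()).isEmpty()); // true
        try {
            orFail(Optional.empty());
        } catch (MyException e) {
            System.out.println("caught: " + e.getMessage());     // caught: no value
        }
    }
}
```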

Reductions are terminal operations. They trigger processing of the data.

Example of Terminal operation - Reduction

List<Person> persons = ...;
Optional<Integer> minAge = persons.stream()
                                  .map(person -> person.getAge())  // Stream<Integer>
                                  .filter(age -> age > 20)         // Stream<Integer>
                                  .min(Comparator.naturalOrder()); // Terminal operation

boolean allShort = persons.stream()
                          .map(person -> person.getLastName())
                          .allMatch(lastName -> lastName.length() < 20); // Terminal operation
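A runnable version of both terminal operations above, assuming a hypothetical minimal Person class with getLastName() and getAge(); the names and ages are sample data.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class TerminalOpsDemo {
    // Hypothetical minimal Person class (not defined in the original post)
    static class Person {
        private final String lastName;
        private final int age;
        Person(String lastName, int age) { this.lastName = lastName; this.age = age; }
        String getLastName() { return lastName; }
        int getAge() { return age; }
    }

    static Optional<Integer> minAgeAbove20(List<Person> persons) {
        return persons.stream()
                .map(Person::getAge)             // Stream<Integer>
                .filter(age -> age > 20)         // Stream<Integer>
                .min(Comparator.naturalOrder()); // terminal operation
    }

    static boolean allLastNamesShorterThan20(List<Person> persons) {
        return persons.stream()
                .map(Person::getLastName)
                .allMatch(lastName -> lastName.length() < 20); // terminal operation
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
                new Person("Smith", 18), new Person("Jones", 30), new Person("Brown", 25));
        System.out.println(minAgeAbove20(persons));             // Optional[25]
        System.out.println(allLastNamesShorterThan20(persons)); // true
    }
}
```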

Collectors


This is the second kind of reduction.
The Javadoc calls it mutable reduction: the stream is reduced into a mutable container, i.e. all the elements of the stream are added to that container.

Example

List<Person> persons = ...;
String result = persons.stream().filter(person -> person.getAge() > 20)
                                                   .map(Person::getLastName)
                                                   .collect(Collectors.joining(","));


The result is a String containing the last names of the persons older than 20, separated by commas. We can also collect into a List or a Map.
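A runnable sketch of the joining example, plus the same pipeline collected into a List with Collectors.toList(). The Person class is a hypothetical minimal stand-in.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoiningDemo {
    // Hypothetical minimal Person class (not defined in the original post)
    static class Person {
        private final String lastName;
        private final int age;
        Person(String lastName, int age) { this.lastName = lastName; this.age = age; }
        String getLastName() { return lastName; }
        int getAge() { return age; }
    }

    static String lastNamesOlderThan20(List<Person> persons) {
        return persons.stream()
                .filter(person -> person.getAge() > 20)
                .map(Person::getLastName)
                .collect(Collectors.joining(","));
    }

    static List<String> lastNamesOlderThan20AsList(List<Person> persons) {
        return persons.stream()
                .filter(person -> person.getAge() > 20)
                .map(Person::getLastName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
                new Person("Smith", 18), new Person("Jones", 30), new Person("Brown", 25));
        System.out.println(lastNamesOlderThan20(persons));       // Jones,Brown
        System.out.println(lastNamesOlderThan20AsList(persons)); // [Jones, Brown]
    }
}
```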

Examples of collecting into a Map

Example:1

List<Person> persons = ...;
Map<Integer, List<Person>> result = persons.stream().filter(person -> person.getAge() > 20)
                                                  .collect(Collectors.groupingBy(Person::getAge));

The result is a Map containing the persons older than 20:
    - The keys are the ages of the people.
    - The values are the lists of the people of that age.
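A runnable sketch of Example 1, assuming a hypothetical minimal Person class whose toString() returns the last name. Looking values up with get() avoids depending on the iteration order of the returned map.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingByDemo {
    // Hypothetical minimal Person class (not defined in the original post)
    static class Person {
        private final String lastName;
        private final int age;
        Person(String lastName, int age) { this.lastName = lastName; this.age = age; }
        int getAge() { return age; }
        @Override public String toString() { return lastName; }
    }

    static Map<Integer, List<Person>> byAge(List<Person> persons) {
        return persons.stream()
                .filter(person -> person.getAge() > 20)
                .collect(Collectors.groupingBy(Person::getAge)); // key: age, value: list of persons
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(new Person("Smith", 18), new Person("Jones", 30),
                new Person("Brown", 30), new Person("Green", 25));
        Map<Integer, List<Person>> result = byAge(persons);
        System.out.println(result.get(30));         // [Jones, Brown]
        System.out.println(result.get(25));         // [Green]
        System.out.println(result.containsKey(18)); // false: filtered out before grouping
    }
}
```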

Example:2

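Map<Integer, Long> result = persons.stream().filter(person -> person.getAge() > 20)
                                            .collect(Collectors.groupingBy(Person::getAge,
                                                                           Collectors.counting())); // the downstream collector

Collectors.counting() is a downstream collector: instead of building a list of persons for each age, it simply counts the number of persons of each age, so the values of the map are Long counts.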
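A runnable sketch of Example 2 with the counting() downstream collector, again assuming a hypothetical minimal Person class and sample data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CountingDemo {
    // Hypothetical minimal Person class (not defined in the original post)
    static class Person {
        private final String lastName;
        private final int age;
        Person(String lastName, int age) { this.lastName = lastName; this.age = age; }
        int getAge() { return age; }
    }

    static Map<Integer, Long> countByAge(List<Person> persons) {
        return persons.stream()
                .filter(person -> person.getAge() > 20)
                .collect(Collectors.groupingBy(Person::getAge,
                                               Collectors.counting())); // downstream collector
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(new Person("Smith", 18), new Person("Jones", 30),
                new Person("Brown", 30), new Person("Green", 25));
        Map<Integer, Long> result = countByAge(persons);
        System.out.println(result.get(30)); // 2
        System.out.println(result.get(25)); // 1
    }
}
```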

A stream cannot be reused. Once a stream has been used to process a set of data (that is, once a terminal operation has been called on it), we cannot use the same stream again: doing so throws an IllegalStateException. We should create a new stream instead.
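A minimal sketch showing what happens when the same stream is consumed twice. The second terminal operation throws an IllegalStateException (in the JDK its message reads "stream has already been operated upon or closed").

```java
import java.util.stream.Stream;

class StreamReuseDemo {
    // Returns true if the second terminal operation on the same stream fails
    static boolean secondUseFails() {
        Stream<String> stream = Stream.of("one", "two", "three");
        stream.forEach(s -> { });     // first terminal operation: the stream is consumed
        try {
            stream.forEach(s -> { }); // second use of the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
    }
}
```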
