Skip to content

Take full advantage of the concurrency of FileChannel #560

@jixuan1989

Description

@jixuan1989

Because FileChannel supports concurrent reads, we can use this feature to decouple the file read module.

The attachment is a test for FileChannel:

  1. File Channel native concurrent read;
  2. MappedByteBuffer
  3. Open many file channels for one file.

Results show that File Channel's native concurrency is good enough.

import org.apache.commons.lang3.RandomStringUtils;
import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Micro-benchmark comparing three ways of reading the same file from many threads:
 * <ol>
 *   <li>a single {@link MappedByteBuffer} shared by all tasks,</li>
 *   <li>a single {@link FileChannel} shared by all tasks (positional reads),</li>
 *   <li>one dedicated {@link FileChannel} per task.</li>
 * </ol>
 * Each task re-reads one 1000-byte record and compares it with a reference copy that was
 * read sequentially up front; any mismatching record is printed.
 */
public class Test2 {
    static String filename = "test.txt";

    /** Size in bytes of one record read by each task. */
    private static final int RECORD_SIZE = 1000;
    /** Number of records that are read and verified in every round. */
    private static final int RECORD_COUNT = 2000;
    /** Number of worker threads used by each benchmark. */
    private static final int THREAD_COUNT = 100;
    /** Number of bytes mapped into memory; a long literal so the product can never overflow. */
    private static final long MAPPED_BYTES = 2000L * 1000000L;
    /** Charset used both when generating the file and when decoding records. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /**
     * Runs the three benchmarks in sequence and prints the elapsed milliseconds of each.
     *
     * @param args unused
     * @throws IOException if the test file cannot be opened, mapped or read
     * @throws InterruptedException if the thread is interrupted while waiting for a pool
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        //write();
        int round1 = 5;
        Path path = Paths.get(filename);
        try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
            // Reference data every task compares against.
            String[] result = new String[RECORD_COUNT];
            for (int i = 0; i < result.length; i++) {
                result[i] = read1000(i * RECORD_SIZE, fileChannel);
            }

            //for mapped byte buffer
            long time1 = System.currentTimeMillis();
            MappedByteBuffer data = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, MAPPED_BYTES);
            System.out.println("map 2GB data, time cost:" + (System.currentTimeMillis() - time1));
            ExecutorService service2 = Executors.newFixedThreadPool(THREAD_COUNT);
            long time2 = System.currentTimeMillis();
            for (int i = 0; i < round1; i++) {
                for (int j = 0; j < RECORD_COUNT; j++) {
                    service2.submit(new MyThread2(j, data, result[j]));
                }
            }
            awaitAndReport(service2, time2);

            //for concurrent file channel
            ExecutorService service = Executors.newFixedThreadPool(THREAD_COUNT);
            long time = System.currentTimeMillis();
            for (int i = 0; i < round1; i++) {
                for (int j = 0; j < RECORD_COUNT; j++) {
                    service.submit(new MyThread(j, fileChannel, result[j]));
                }
            }
            awaitAndReport(service, time);

            //for open many file channels
            // Sized to exactly the number of channels that get opened, so the close loop
            // below never meets an unused (null) slot.
            FileChannel[] channels = new FileChannel[round1 * RECORD_COUNT];
            try {
                for (int k = 0; k < channels.length; k++) {
                    channels[k] = FileChannel.open(path, StandardOpenOption.READ);
                }
                ExecutorService service3 = Executors.newFixedThreadPool(THREAD_COUNT);
                long time3 = System.currentTimeMillis();
                for (int i = 0; i < round1; i++) {
                    for (int j = 0; j < RECORD_COUNT; j++) {
                        service3.submit(new MyThread(j, channels[i * RECORD_COUNT + j], result[j]));
                    }
                }
                awaitAndReport(service3, time3);
            } finally {
                // Slots stay null if opening failed part-way through.
                for (FileChannel channel : channels) {
                    if (channel != null) {
                        channel.close();
                    }
                }
            }

            // Release the mapping explicitly. This is done only on the normal path, after
            // every pool that reads the buffer has terminated.
            time1 = System.currentTimeMillis();
            Cleaner cl = ((DirectBuffer) data).cleaner();
            if (cl != null) {
                cl.clean();
            }
            System.out.println("clean 2GB data, time cost:" + (System.currentTimeMillis() - time1));
        }
    }

    /**
     * Shuts the pool down, waits for all submitted tasks and prints the elapsed time.
     *
     * @param service pool whose tasks have all been submitted
     * @param startMillis value of {@link System#currentTimeMillis()} taken before submission
     * @throws InterruptedException if interrupted while waiting for termination
     */
    private static void awaitAndReport(ExecutorService service, long startMillis)
            throws InterruptedException {
        service.shutdown();
        service.awaitTermination(1000000L, TimeUnit.SECONDS);
        System.out.println((System.currentTimeMillis() - startMillis));
    }

    /**
     * Reads one 1000-byte record from the channel using positional reads, which do not
     * touch the channel's own position.
     *
     * @param offset absolute file position of the first byte of the record
     * @param fileChannel channel opened for reading
     * @return the record decoded as UTF-8
     * @throws IOException if reading fails or the file ends before 1000 bytes were read
     */
    public static String read1000(int offset, FileChannel fileChannel)throws IOException{
        ByteBuffer buffer = ByteBuffer.allocate(RECORD_SIZE);
        long position = offset;
        // A single read may deliver fewer bytes than requested, so keep reading until the
        // buffer is full or the end of the file is reached.
        while (buffer.hasRemaining()) {
            int count = fileChannel.read(buffer, position);
            if (count < 0) {
                break;
            }
            position += count;
        }
        if (buffer.hasRemaining()) {
            throw new IOException("Unexpected end of file: read " + buffer.position()
                    + " of " + RECORD_SIZE + " bytes at offset " + offset);
        }
        buffer.flip();
        byte[] bytes = new byte[RECORD_SIZE];
        buffer.get(bytes, 0, bytes.length);
        return new String(bytes, UTF8);
    }

    /**
     * Reads one 1000-byte record from a (mapped) buffer without modifying the buffer's own
     * position, so the same buffer can be handed to several tasks.
     *
     * @param offset position inside {@code data} of the first byte of the record
     * @param data buffer holding the file content
     * @return the record decoded as UTF-8
     * @throws IOException never thrown by this implementation; kept in the signature so
     *         existing callers that catch it keep compiling
     */
    public static String read1000fromByteBuffer(int offset, ByteBuffer data)throws IOException{
        byte[] bytes = new byte[RECORD_SIZE];
        // The int arguments of get(byte[], int, int) index the destination array, not the
        // buffer, so the source position has to be set on the private duplicate first.
        ByteBuffer view = data.duplicate();
        view.position(offset);
        view.get(bytes, 0, bytes.length);
        return new String(bytes, UTF8);
    }

    /**
     * Appends 2000 chunks of 1,000,000 random alphabetic characters to the test file,
     * creating the file if necessary, and prints the chunk index after each chunk.
     *
     * @throws IOException if the file cannot be opened or written
     */
    public static void write() throws IOException {
        Path path = Paths.get(filename);
        // CREATE already covers the "file does not exist yet" case.
        try (FileChannel fileChannel =
                FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < 2000; i++) {
                // Encode with the same charset the readers decode with.
                ByteBuffer chunk =
                        ByteBuffer.wrap(RandomStringUtils.randomAlphabetic(1000000).getBytes(UTF8));
                // write() is allowed to consume only part of the buffer.
                while (chunk.hasRemaining()) {
                    fileChannel.write(chunk);
                }
                System.out.println(i);
            }
        }
    }


    /**
     * Task that reads record {@code num} through a {@link FileChannel} and prints it when
     * it differs from the expected content.
     */
    public static class MyThread implements  Runnable{
        int num;
        FileChannel channel;
        String result;

        /**
         * @param num index of the record to read
         * @param channel channel to read from
         * @param result expected content of the record
         */
        MyThread(int num, FileChannel channel, String result){
            this.num=num;
            this.channel=channel;
            this.result=result;
        }

        @Override
        public void run(){
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000(num * RECORD_SIZE, channel);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException | RuntimeException e) {
                // Runtime failures would otherwise disappear inside the Future returned by
                // submit(), which main never inspects.
                e.printStackTrace();
            }
        }
    }

    /**
     * for mapped byte buffer: task that reads record {@code num} from a shared buffer and
     * prints it when it differs from the expected content.
     */
    public static class MyThread2 implements  Runnable{
        int num;
        ByteBuffer data;
        String result;

        /**
         * @param num index of the record to read
         * @param data buffer to read from
         * @param result expected content of the record
         */
        MyThread2(int num, ByteBuffer data, String result){
            this.num=num;
            this.data=data;
            this.result=result;
        }

        @Override
        public void run(){
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000fromByteBuffer(num * RECORD_SIZE, data);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException | RuntimeException e) {
                // Runtime failures would otherwise disappear inside the Future returned by
                // submit(), which main never inspects.
                e.printStackTrace();
            }
        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Maven build for the Test2 FileChannel benchmark shown above. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Placeholder coordinates for this throwaway test project. -->
    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <!-- commons-lang3 supplies RandomStringUtils, which Test2.write() uses to generate the test file. -->
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>

</dependencies>
<build>
    <plugins>
        <!-- Compile with Java 8 source and target levels. -->
        <!-- NOTE(review): Test2 imports the JDK-internal sun.misc.Cleaner; presumably this is why the build is pinned to 1.8 - confirm. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>

</build>
</project>

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions