Java nio: неполная передача файла

Я пытаюсь передать большие видеофайлы с клиентов на сервер с помощью java NIO. Кажется, мне нужно использовать NIO, потому что файлы, которые я хочу отправить, намного больше, чем кажущийся предел размера файла с обычным вводом-выводом, который составляет около 2 ГБ ... мои видеофайлы были размером до 50 ГБ каждый. Прямо сейчас я просто пытаюсь создать небольшую программу, чтобы понять концепции. позже он будет добавлен в более крупную программу.

Моя проблема заключается в том, что на сервере сохраняются только первые несколько сотен килобайт файла. Каждый раз, когда я запускаю его, на сервере сохраняются разные данные. Может ли кто-нибудь помочь мне с решением? (и любые другие предложения, которые у вас могут быть... NIO для меня новичок) СПАСИБО!

Вот как это работает:

У клиента будет набор файлов для отправки на сервер. Клиент установит соединение с сервером, и сервер ответит, что готов. Клиент отправляет информацию о заголовке файла. Затем сервер говорит, что готов принять содержимое файла. Затем клиент отправляет содержимое файла. Когда файл полностью передан, он повторяется со следующим файлом до тех пор, пока больше не потребуется отправлять файлы.

Основной клиент

public static void main(String[] args) throws Throwable {
    FileSender fileSender = new FileSender("localhost", 7146);
    fileSender.addFileToSend(new File("C:\\url\\to\\file1.jpg"));
    fileSender.addFileToSend(new File("C:\\url\\to\\file2.jpg"));
    fileSender.addFileToSend(new File("C:\\url\\to\\file3.jpg"));
    fileSender.sendFiles();
}

FileSender

private static String serverAddress;
private static int port;
private static Charset charSet = Charset.forName(System.getProperty("file.encoding"));

private SocketChannel server = null;
private File file;
private RandomAccessFile aFile;
private FileChannel fileChannel;
private long filesize, transmittedSoFar;
private int current;
private ByteBuffer buffer = ByteBuffer.allocate(131072); //128k
private ByteBuffer responseBuffer;
private CharBuffer charBuffer;
private CharsetDecoder charDecoder = charSet.newDecoder();
private Selector selector;
private ArrayList<File> filesToSend = new ArrayList<>(0);
private int fileCountTracker = 0;

FileSender(String serverAddress, int port) {
    FileSender.serverAddress = serverAddress;
    FileSender.port = port;
}

public void sendFiles() {
    try {
        server = SocketChannel.open();
        server.connect(new InetSocketAddress(serverAddress, port));
        server.configureBlocking(false);
        System.out.println("Connected to Server");
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_READ);
        waitForResponse();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

void waitForResponse() throws Exception {
    //TODO: track time. abort loop after 10 sec? 30 sec?
    while (true) {
        System.out.println("waiting for a response from server");
        selector.select();
        Set<SelectionKey> readyKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            iterator.remove();
            if (key.isReadable()) {
                responseBuffer = ByteBuffer.allocate(16);
                server.read(responseBuffer);
                responseBuffer.flip();

                try {
                    charBuffer = charDecoder.decode(responseBuffer);
                    responseBuffer.clear();
                    String response = charBuffer.toString();
                    System.out.println(response);
                    if (response.startsWith("[readyForHeader]")) {
                        System.out.println("Received response: ready for header");
                        sendHeader();
                    }
                    else if (response.startsWith("[readyForBody]")) {
                        System.out.println("Received response: ready for body");
                        sendData();
                    }
                    else {
                        System.out.println("unknown response");
                        System.out.println(response);
                    }
                } catch(Exception e) {
                    System.out.println("error decoding file info");
                    System.out.println(e.getMessage());
                    return;
                }
            }
        }
    }
}

public void addFileToSend(File file) {
    filesToSend.add(file);
}

void sendHeader() {
    System.out.println("Tracker: "+fileCountTracker);
    try {
        if (filesToSend.size() > fileCountTracker) { //still more files to send
            System.out.println("a file exists at this array index");
            this.file = filesToSend.get(fileCountTracker);
            filesize = file.length();
            aFile = new RandomAccessFile(file, "r");
            transmittedSoFar = 0;

            //generate file info buffers to send to server
            byte[] fileInfoBytes = getFileMeta(file);
            ByteBuffer lengthBuffer = ByteBuffer.allocate(4); //length of file info
            lengthBuffer.putInt(0, fileInfoBytes.length);
            System.out.println("Source info length: "+fileInfoBytes.length);
            ByteBuffer infoBuffer = ByteBuffer.wrap(fileInfoBytes); //file info data

            //send file info buffers
            sendByteBuffer(lengthBuffer);
            sendByteBuffer(infoBuffer);
        } else {
            System.out.println("sending zero to indicate no more files");
            ByteBuffer lengthBuffer = ByteBuffer.allocate(4); //length of file info
            lengthBuffer.putInt(0, 0); //tell server sending zero bytes. server will end connection
            sendByteBuffer(lengthBuffer);
            terminate();
        }


    }
    catch (Exception e) {
        e.getMessage();
        terminate();
    }
}

void sendData() {
    try {
        fileChannel = aFile.getChannel();

        while ((current = fileChannel.read(buffer)) > 0 || buffer.position() > 0) {
            transmittedSoFar = transmittedSoFar + (long)current;
            System.out.println(Math.round(transmittedSoFar*100/filesize)+" "+transmittedSoFar);
            buffer.flip();
            server.write(buffer);
            buffer.compact();
        }
        System.out.println("End of file reached..");
        aFile.close();
    } catch (FileNotFoundException e) {
        System.out.println("FILE NOT FOUND EXCEPTION");
        e.getMessage();
    } catch (IOException e) {
        System.out.println("IO EXCEPTION");
        e.getMessage();
    }
    fileCountTracker++;
}

byte[] getFileMeta(File file) throws IOException {
    StringBuffer fileInfo = new StringBuffer();

    BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);

    fileInfo.append(file.getName() + "\n");
    fileInfo.append(file.length() + "\n");
    fileInfo.append(attr.creationTime() + "\n");

    byte[] infoBytes = fileInfo.toString().getBytes();

    return infoBytes;
}

void sendByteBuffer(ByteBuffer bb) throws IOException {
    System.out.println("sending: "+bb.toString());
    server.write(bb);
    bb.rewind();
}

void terminate() {
    try {
        server.close();
        System.out.println("Connection closed");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Главный сервер

public static void main(String[] args) throws Throwable {
    FileReceiver fileReceiver = new FileReceiver(7146);
    fileReceiver.initReceive();
}

FileReceiver

static Charset charSet = Charset.forName(System.getProperty("file.encoding"));
static final Pattern pattern = Pattern.compile("[\n]");//new line
static int port;
static BytesTypeToReceive bytesType;

ServerSocketChannel server;
SocketChannel client;

ByteBuffer byteBuffer, responseBuffer;
CharBuffer charBuffer;
CharsetDecoder charDecoder = charSet.newDecoder();
RandomAccessFile aFile = null;
String fileInfo[];
int headerLength;
long remaining;
Selector selector;

public FileReceiver(int port) {
    FileReceiver.port = port;
}

public void initReceive() {
    try {
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.socket().bind(new InetSocketAddress(port));
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
        waitForResponse();
    } catch (Exception e) {
        close();
        e.printStackTrace();
    }
}

void waitForResponse() throws Exception {
    while (true) {
        System.out.println("Waiting for data from client");
        int selCount = selector.select();
        System.out.println("selector count: "+selCount);
        Set<SelectionKey> readyKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            iterator.remove();
            if (key.isReadable()) {
                if (bytesType == BytesTypeToReceive.HEADER) {
                    receiveHeader();
                } else {
                    receiveBody();
                }
            } else if (key.isAcceptable()) {
                client = server.accept();
                System.out.println("Connection established...." + client.getRemoteAddress());
                client.configureBlocking(false);
                bytesType = BytesTypeToReceive.HEADER;
                client.register(selector, SelectionKey.OP_READ);
                sendResponse("[readyForHeader]");
            }
        }
        Thread.sleep(250);
    }
}

private void receiveHeader() {
    System.out.println("Receiving header data");
    byteBuffer = ByteBuffer.allocate(4);

    try {
        //read length
        while (byteBuffer.remaining() > 0) client.read(byteBuffer);
        System.out.println("what is this? "+byteBuffer.toString());
        byteBuffer.rewind();
        System.out.println("and this? "+byteBuffer.toString());
        System.out.println("Info length is " + byteBuffer.getInt(0));

        if (byteBuffer.getInt(0) == 0) {
            System.out.println("no more files. end connection");
            throw new IOException();
        }

        //resize to size indicated in first buffer
        byteBuffer = ByteBuffer.allocate(byteBuffer.getInt(0));

        //read file info
        while (byteBuffer.remaining() > 0) client.read(byteBuffer);
        byteBuffer.flip();

        //decode file info
        try {
            charBuffer = charDecoder.decode(byteBuffer);
            byteBuffer.clear();
             System.out.println(charBuffer.toString());
        } catch(Exception e) {
            System.out.println("error decoding file info");
            return;
        }
        fileInfo = pattern.split(charBuffer);

        System.out.println("info0: "+fileInfo[0]);
        System.out.println("info1: "+fileInfo[1]);

        remaining = Long.parseLong(fileInfo[1]);

        bytesType = BytesTypeToReceive.BODY;
        //tell client ready for file data
        sendResponse("[readyForBody]");
    } catch (Exception e) {
        System.out.println("Exception for checkForData. No more data?");
        System.out.println(e.getMessage());
    }
}

/**
 * Reads the bytes from socket and writes to file
 *
 * @param socketChannel
 */
//private void readFileFromSocket(SocketChannel socketChannel, int infoLength) {
private void receiveBody() throws Exception {
    int current;
    System.out.println("About to receive "+remaining+" bytes.");
    try {
        //read file data
        aFile = new RandomAccessFile("C:\\folder\\to\\save\\to\\"+fileInfo[0], "rw");
        byteBuffer = ByteBuffer.allocate(131072);
        FileChannel fileChannel = aFile.getChannel();
        while (((current = client.read(byteBuffer)) > 0 || byteBuffer.position() > 0) && remaining > 0) {
            remaining = remaining - (long)current;
            System.out.println(current+" "+remaining);  
            byteBuffer.flip();
            fileChannel.write(byteBuffer);
            byteBuffer.compact();
        }
        fileChannel.close();
        aFile.close();      
        System.out.println(current +" - End of file");
        bytesType = BytesTypeToReceive.HEADER;
        sendResponse("[readyForHeader]");
    } catch (FileNotFoundException e) {
        System.out.println("FILE NOT FOUND EXCEPTION");
        e.getMessage();
    } catch (IOException e) {
        System.out.println("IO EXCEPTION");
        e.getMessage();
    } catch (InterruptedException e) {
        System.out.println("INTERRUPTED EXCEPTION");
        e.getMessage();
    }
}
void sendResponse(String response) throws Exception {
    System.out.println("Sending response: "+response);
    byte[] data = response.getBytes("UTF-8");
    responseBuffer = ByteBuffer.wrap(data);
    client.write(responseBuffer);
    responseBuffer.rewind();

}
public void close() {
    try {
        client.close();
        server.close();
        System.out.println("connection closed");
    } catch (IOException e) {
        e.printStackTrace();
    }

}

person kevinric7    schedule 20.10.2015    source источник
comment
Вы можете копировать файлы размером более 2 г со стандартными классами ввода-вывода, но вам нужно делать это частями, а не пытаться читать/записывать весь буфер в один большой массив, как это делалось до NIO.   -  person MadProgrammer    schedule 20.10.2015
comment
@MadProgrammer В моем кругу такого никогда не было :-)   -  person user207421    schedule 20.10.2015
comment
@EJP Что? Вы никогда не использовали InputStream/OutputStream и не читали/не записывали их в byte[]? Вам повезло, вы, должно быть, пропустили Java 1.3 тогда   -  person MadProgrammer    schedule 20.10.2015
comment
@MadProgrammer Непонятно, о чем именно вы говорите. Я никогда не использовал InputStream для чтения всего файла в память. Я не понимаю, при чем здесь Java 1.3. InputStream API не изменился по крайней мере с версии 1.1.2, и в нем нет ничего, что считывало бы целые файлы в память без цикла.   -  person user207421    schedule 20.10.2015
comment
@EJP Я не говорил о чтении всего файла в память (это какое-то сумасшествие: P), я говорил об использовании небольшого буфера byte[] (массив) для чтения/записи небольших фрагментов, а NIO API был представлен в версии 1.4. , так что до этого это был один из способов чтения/записи файлов. Используя этот метод, вы можете читать/записывать файлы любого размера, это было моей точкой зрения. Но, если OP хочет использовать NIO API, у меня нет проблем с этим, я просто хотел исправить замечание о том, что другие API были ограничены 2 ГБ.   -  person MadProgrammer    schedule 20.10.2015
comment
@MadProgrammer Ну, это все еще так же ясно, как грязь. Все точки доступа ограничены 2 ГБ за раз, так как это максимальный размер массива byte[]. Вы должны зацикливаться, будь то java.io,, NIO, асинхронный ввод-вывод, что угодно. Единственным исключением являются API-интерфейсы transferTo/From(), где вам все еще нужно зацикливаться, но единицей передачи является long, а не int, или, аналогично, методы scatter/gather NIO. И OP зацикливается и не использует массивы размером с файл. Действительно суть ускользает от меня.   -  person user207421    schedule 20.10.2015
comment
@ejp Сделайте byte[], может быть, 4k byte[] buffer = new byte[4096];, используйте InputStream#read(byte[]), int bytesRead = is.read(buffer), запишите указанный буфер через OutputStream(byte[], int, int), повторите до тех пор, пока bytesRead не станет -1 ... или в этом случае вы достигнете известного размера файла: P - который вы указали, был отправил неправильно :Р   -  person MadProgrammer    schedule 20.10.2015
comment
спасибо за предложение разбить файл. Однако я также слышал, что каналы nio работают быстрее, чем потоки, поскольку потоки идут побайтно, а канал отправляет кусками. если есть заметная потеря производительности без использования nio, то я действительно предпочел бы не использовать его ... тем более, что мы говорим об огромных видеофайлах.   -  person kevinric7    schedule 20.10.2015
comment
@MadProgrammer Код, который вы разместили, точно соответствует описанию в моем предыдущем комментарии. Я до сих пор понятия не имею, что вы пытаетесь сделать здесь. Мы, кажется, в яростном согласии, что вы всегда должны копировать файлы кусками.   -  person user207421    schedule 16.11.2015
comment
@EJP Круто, я буду жить с этим: P   -  person MadProgrammer    schedule 16.11.2015


Ответы (3)


Это может быть проблемой:

while (((current = client.read(byteBuffer)) > 0

Поскольку ваш сокет настроен неблокирующим, и у вас нет выбора на входе, он быстро потребляет входящие данные и останавливается, поскольку чтение возвращает -1 или 0.

На самом деле клиент имеет аналогичную проблему, но он просто сжигает процессор при попытке отправить данные в перегруженный сокет.

person Zbynek Vyskovsky - kvr000    schedule 20.10.2015
comment
я заменил условие while на простое while (осталось › 0) и поместил client.read внутрь цикла. мне придется играть с клавишами выбора чтения / записи, чтобы он не сжигал процессор, но, по крайней мере, файлы, скопированные полностью. СПАСИБО! - person kevinric7; 21.10.2015

Единственным ограничением размера файла здесь является ваш собственный код. Вы отправляете размер файла как int, в 4 байта.

Используйте длинный.

NB вам не нужно выделять огромные буферы. Вам даже не нужно отправлять размер файла, кроме как для проверки. Вы можете использовать размеры буфера порядка 32-64 КБ. Вы копируете код — это хорошо.

person user207421    schedule 20.10.2015
comment
4-байтовый буфер содержит информацию о длине заголовка, а не сам размер файла. размер файла смешивается с другой информацией, такой как имя файла ... и в будущем я, вероятно, добавлю больше метаданных. В прошлом я пробовал меньшие буферы вместо 128 КБ, но тогда результирующий файл еще меньше. - person kevinric7; 20.10.2015

Проблема здесь:

server.configureBlocking (ложь);

сервер должен находиться в режиме блокировки, потому что вы не должны записывать данные в буфер, когда сокет все еще читает буфер формы данных.

person MADAO    schedule 30.12.2016