Overview
This is an interview-style question. However, it is harder than one you would get in an interview, as it would take a while to think the problem through.
The task is to determine how you might sort a very large file which does not fit in memory, in a way that takes into account the impact of I/O bandwidth on the solution.
The Question
Write a utility to sort a file with up to 1 trillion 64-bit floating point values. Assume there is 12 TB of free space in the file system that the system property "java.io.tmpdir" points to.
Consider how your solution might take advantage of a server with more CPUs or memory. The number of open files will be limited, e.g. to 1024.
Show how you might estimate the time it would take to sort this file.
An Answer
In this solution, the 8 TB file (1 trillion times 8 bytes) is sorted in memory in parts, and each sorted part is written to its own file. There are too many of these parts to open at once, so they are merge sorted into a smaller number of larger files, which are then merge sorted into the final sorted file.
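The merge step can be sketched as a k-way merge driven by a priority queue that always yields the smallest head value among the open files. This is a minimal sketch, assuming each sorted part is a file of raw big-endian doubles (the format `DataOutputStream` writes); the class `KWayMerge` and its helpers `merge`, `write` and `read` are hypothetical names. Only one buffered value per input is held in memory, but every input needs an open file handle, which is why the number of parts merged at once is bounded by the open-files limit.

```java
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.*;
import java.util.*;

public class KWayMerge {
    /** One open input file plus the value currently at its head. */
    private static final class Run {
        final DataInputStream in;
        double head;

        Run(DataInputStream in) { this.in = in; }

        /** Reads the next value into head; returns false at end of file. */
        boolean advance() throws IOException {
            try {
                head = in.readDouble();
                return true;
            } catch (EOFException e) {
                return false;
            }
        }
    }

    /** Merges already-sorted files of raw doubles into one sorted output file. */
    public static void merge(List<Path> inputs, Path output) throws IOException {
        PriorityQueue<Run> queue = new PriorityQueue<>((a, b) -> Double.compare(a.head, b.head));
        List<DataInputStream> opened = new ArrayList<>();
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(output), 1 << 16))) {
            for (Path p : inputs) {
                DataInputStream in = new DataInputStream(
                        new BufferedInputStream(Files.newInputStream(p), 1 << 16));
                opened.add(in);
                Run run = new Run(in);
                if (run.advance()) queue.add(run); // skip empty inputs
            }
            while (!queue.isEmpty()) {
                Run smallest = queue.poll();
                out.writeDouble(smallest.head);
                if (smallest.advance()) queue.add(smallest); // re-insert with its next value
            }
        } finally {
            for (DataInputStream in : opened) in.close();
        }
    }

    static void write(Path p, double... values) throws IOException {
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(p))) {
            for (double v : values) out.writeDouble(v);
        }
    }

    static double[] read(Path p) throws IOException {
        byte[] bytes = Files.readAllBytes(p);
        double[] result = new double[bytes.length / 8];
        ByteBuffer.wrap(bytes).asDoubleBuffer().get(result); // big-endian, like DataOutputStream
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path a = Files.createTempFile("run", ".bin");
        Path b = Files.createTempFile("run", ".bin");
        Path out = Files.createTempFile("merged", ".bin");
        write(a, 1.0, 4.0, 9.0);
        write(b, 2.0, 3.0, 10.0);
        merge(List.of(a, b), out);
        System.out.println(Arrays.toString(read(out))); // prints [1.0, 2.0, 3.0, 4.0, 9.0, 10.0]
        for (Path p : List.of(a, b, out)) Files.delete(p);
    }
}
```

Each value costs one O(log k) heap operation for k inputs, so the merge itself is cheap; its running time is dominated by how fast the inputs can be read and the output written.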
This means that all the data is read and written three times. Suppose you have a high-performance disk subsystem which can sustain 2 GB/sec reads and 1 GB/sec writes, and a server which can process 60 GB/sec of data. Moving 24 TB (8 TB times three) would then take about 200 mins for reads, 400 mins for writes and 400 sec for processing. If these can be performed concurrently, the sort will take about 400 mins; however, it is more likely that these times will add up and it will take about 10 hours. The only way to know is to test it.
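The estimate can be reproduced with a few lines of arithmetic. This is a minimal sketch: `SortEstimate`, `dataPasses` and `seconds` are hypothetical names, and the 1 GiB block size and 16-processor server are assumptions chosen for illustration, not figures from the text. `dataPasses` mirrors the merge loop of the program below (merge batches of `openFilesPerThread` files until no more than the open-files limit remain, then do a final merge), which is where the "three times" comes from.

```java
public class SortEstimate {
    /** How many times every byte is read and written, mirroring the merge loop in DoubleSort.main. */
    static int dataPasses(long fileBytes, long blockBytes, int openFilesLimit, int openFilesPerThread) {
        long files = (fileBytes + blockBytes - 1) / blockBytes; // pass 1 writes one sorted file per block
        int passes = 1;
        while (files > openFilesLimit) {                         // each round merges batches of files
            files = (files + openFilesPerThread - 1) / openFilesPerThread;
            passes++;
        }
        return passes + 1;                                       // the final merge into the output file
    }

    /** Seconds needed to move the given number of GB at the given GB/s rate. */
    static double seconds(double gigabytes, double gbPerSecond) {
        return gigabytes / gbPerSecond;
    }

    public static void main(String[] args) {
        long fileBytes = 1_000_000_000_000L * 8; // 1 trillion doubles = 8 TB
        int processors = 16;                      // assumed server size
        int passes = dataPasses(fileBytes, 1L << 30, 900, 900 / processors); // assumed 1 GiB blocks
        double movedGB = fileBytes / 1e9 * passes;

        double readSec = seconds(movedGB, 2);   // 2 GB/s sustained reads
        double writeSec = seconds(movedGB, 1);  // 1 GB/s sustained writes
        double cpuSec = seconds(movedGB, 60);   // 60 GB/s processing
        double overlapped = Math.max(readSec, Math.max(writeSec, cpuSec)); // best case: fully concurrent
        double sequential = readSec + writeSec + cpuSec;                    // likely case: times add up

        System.out.printf("passes=%d, reads %.0f min, writes %.0f min, processing %.0f s%n",
                passes, readSec / 60, writeSec / 60, cpuSec);
        System.out.printf("overlapped ~%.0f min, sequential ~%.1f hours%n",
                overlapped / 60, sequential / 3600);
    }
}
```

With these assumptions the 8 TB file splits into 7,451 blocks; one intermediate round reduces them to 134 files, so there are three passes, and the printed times match the 200 min, 400 min and 400 sec above, with about 10.1 hours if nothing overlaps.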
| How the data is shuffled in and out of memory and the performance of the disks are more important than the processing done in the CPU. For data sets in memory, the number of comparisons is critical; however, for larger data sets, it is disk usage that matters. |
import java.lang.management.ManagementFactory;
import java.util.Collection;

public class DoubleSort {
    private static final int THREADS_PER_PROCESSOR = Integer.getInteger("threads.per.processor", 4);
    private static final int OPEN_FILES_LIMIT = Integer.getInteger("open.files.limit", 900);

    public static void main(String... args) {
        if (args.length < 1 || args.length > 2) {
            System.err.println("Usage: java " + DoubleSort.class.getName() + " filename [work directory]");
            System.exit(1); // without this, args[0] below throws ArrayIndexOutOfBoundsException
        }
        String filename = args[0];
        String workdir = args.length > 1 ? args[1] : System.getProperty("java.io.tmpdir");

        // use the number of processors to determine how many jobs to perform concurrently.
        int processors = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
        // as the application will spend most of its time reading and writing data, use about 4 threads per CPU.
        int concurrentJobs = processors * THREADS_PER_PROCESSOR;

        // determine how much of the file to load into memory for each job.
        long memorySize = Runtime.getRuntime().maxMemory();
        long targetBlockSize = memorySize / concurrentJobs / 2; // to ensure we use no more than half of max memory.
        // pick the largest power of two that does not exceed targetBlockSize, clamped to 256 bytes .. 1 GiB.
        int blockSize = 256;
        for (int i = 8; i < 31; i++) {
            blockSize = 1 << i;
            if ((long) blockSize * 2 > targetBlockSize) break; // widen to long so 2^30 * 2 cannot overflow
        }

        Collection<String> sortedPartitions = sortFileIntoBlocks(filename, workdir, concurrentJobs, blockSize);

        // keep merging batches until few enough files remain to open them all at once.
        int openFilesPerThread = OPEN_FILES_LIMIT / processors;
        while (sortedPartitions.size() > OPEN_FILES_LIMIT)
            sortedPartitions = mergeSortInBatches(sortedPartitions, openFilesPerThread, processors);
        mergeSort(sortedPartitions, filename, processors);
    }

    // pseudo-code.
    private static Collection<String> sortFileIntoBlocks(String filename, String workdir, int concurrentJobs, long blockSize) {
        // Create an Executor with a thread pool of concurrentJobs threads.
        // Each task should read and sort one partition of the file of blockSize bytes (blockSize / 8 double values).
        // return a collection of the sorted partition files created.
        // shutdown the executor.
        throw new UnsupportedOperationException();
    }

    // pseudo-code.
    private static Collection<String> mergeSortInBatches(Collection<String> sortedPartitions, int openFilesPerThread, int concurrentJobs) {
        // Create an Executor with a thread pool of concurrentJobs threads.
        // Each task should merge openFilesPerThread sorted files at a time, deleting files which have been merged.
        // return a collection of the merged partition files created.
        // shutdown the executor.
        throw new UnsupportedOperationException();
    }

    // pseudo-code.
    private static void mergeSort(Collection<String> sortedPartitions, String filename, int concurrentJobs) {
        // for each concurrent job, merge sort double values from a portion of the sortedPartitions.
        // have another thread taking results via a Queue to write these to the filename.
        // this is done so that the application is unlikely to stall while one of the partitions is waiting to be read.
        // this part is likely to be limited by the speed the disk subsystem can push data to disk once the write-cache has been exhausted.
        // delete all sources.
        throw new UnsupportedOperationException();
    }
}
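One possible way to fill in the `sortFileIntoBlocks` pseudo-code is sketched below. It is a minimal sketch under stated assumptions, not the author's implementation: the input is raw big-endian doubles, the file size and `blockSize` are multiples of 8 bytes, partition files are named `part-N.bin` in `workdir` (a hypothetical naming scheme), and the hypothetical class `BlockSorter` declares checked exceptions that the original stub does not. Each task does a positional `FileChannel` read of its block, sorts it with `Arrays.sort`, and writes it out. Each task holds the block twice (as bytes and as a `double[]`), so it needs about 2 × blockSize of heap, which is the headroom the "half of max memory" rule in `main` leaves.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockSorter {
    /** Splits the file into blockSize-byte blocks, sorts each in memory and writes it to its own file. */
    public static List<String> sortFileIntoBlocks(String filename, String workdir, int concurrentJobs, long blockSize)
            throws IOException, InterruptedException, ExecutionException {
        Path source = Paths.get(filename);
        long fileSize = Files.size(source);
        ExecutorService executor = Executors.newFixedThreadPool(concurrentJobs);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (long offset = 0, part = 0; offset < fileSize; offset += blockSize, part++) {
                final long start = offset;
                final long length = Math.min(blockSize, fileSize - offset);
                final Path partition = Paths.get(workdir, "part-" + part + ".bin");
                futures.add(executor.submit(() -> sortBlock(source, start, length, partition)));
            }
            List<String> partitions = new ArrayList<>();
            for (Future<String> f : futures) partitions.add(f.get()); // rethrows any task failure
            return partitions;
        } finally {
            executor.shutdown();
        }
    }

    /** Reads length bytes at start, sorts them as doubles and writes the sorted block to partition. */
    private static String sortBlock(Path source, long start, long length, Path partition) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate((int) length); // blockSize is capped at 2^30 by DoubleSort.main
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            while (buffer.hasRemaining()) {
                if (in.read(buffer, start + buffer.position()) < 0) break; // unexpected end of file
            }
        }
        buffer.flip();
        double[] values = new double[buffer.remaining() / 8];
        buffer.asDoubleBuffer().get(values);
        Arrays.sort(values); // same total order as Double.compare: -0.0 before 0.0, NaN last
        buffer.clear();
        buffer.asDoubleBuffer().put(values);
        buffer.limit(values.length * 8);
        try (FileChannel out = FileChannel.open(partition, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)) {
            while (buffer.hasRemaining()) out.write(buffer);
        }
        return partition.toString();
    }
}
```

Because `Arrays.sort(double[])` uses the same ordering as `Double.compare`, any later merge step should compare with `Double.compare` too, so that -0.0 and NaN values end up in a consistent position.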
If anyone is interested, I might have time to write the full implementation.
Comments (2)
Dec 23, 2008
Anonymous says:
yes, please write all code
Jan 29, 2009
Anonymous says:
Have you found time to write the full implementation? Can you please post the full implementation?