Real-Time Processing of Tens of Millions of Big Data
NOTE

All sensitive information has been removed. Since some of the corporate technologies involved are not open-source, only non-confidential portions are described as examples within the business context.

Foreign Function & Memory API#

The Java Foreign Function & Memory API (FFM API) was introduced as a preview feature in JDK 19 under JEP 424 and became stable in JDK 22. From that release onward, the API is expected to remain consistent without significant changes. Historically, the Java Platform has offered a wealth of APIs that let library and application developers interact with the world outside the JVM. Whether accessing remote data (JDBC), invoking network services (HttpClient), serving remote clients (NIO), or communicating with local processes (Unix domain sockets), Java APIs have consistently provided convenient, reliable, and non-intrusive abstractions. However, Java developers have long faced significant challenges when accessing off-heap memory.

Off-heap Memory#

Memory data stored outside the Java runtime is referred to as off-heap memory, contrasted with on-heap memory, which is managed by the Garbage Collector (GC). While the performance of on-heap memory access and garbage collection is already highly optimized for most typical Java applications, the performance of accessing and managing off-heap memory is critical for popular Java libraries like gRPC, TensorFlow, Ignite, Lucene, and Netty.

Using off-heap memory eliminates GC overhead and unpredictability, while enabling serialization and deserialization of file data structures through memory-mapped files (e.g., MappedByteBuffer). For instance, gRPC maintains off-heap memory to cache thread and global data at a low level. Channel data is stored in off-heap memory for the entire TCP connection lifecycle, releasing memory only when the connection closes. This design avoids GC-induced stalls in high-concurrency and long-lived connection scenarios, preventing numerous strong references from accumulating on the heap, which could trigger frequent and disruptive GC cycles. By reducing heap memory pressure and GC pauses under high concurrency, performance and resource utilization are significantly improved.

However, this approach introduces a challenge: when a connection malfunctions (e.g., hangs or experiences network failure), gRPC requests sent through the connection may not receive TCP acknowledgments. This causes subsequent requests to pile up in TCP and off-heap memory queues. Eventually, thread-local caches for the TCP connection fill up, triggering the allocation of global caches. The result is unbounded off-heap memory growth, which, when fully exhausted, leads to DirectOOM errors for new requests. Before Java 19, there was no satisfactory solution for accessing off-heap data.

In earlier versions of Java, developers could allocate off-heap memory using the ByteBuffer API to create direct byte buffers. However, these buffers were constrained by a 2GB size limit and were not promptly released. These limitations stemmed from the API’s original design purpose: not only for off-heap memory access but also for exchanging large data chunks in character encoding, decoding, and some I/O operations. This design compromise led to numerous enhancement requests for off-heap memory (4496703, 6558368, 4837564, 5029431) being unmet for years.
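
For context, here is a minimal illustrative sketch of the pre-FFM direct-buffer approach and where it falls short (the buffer size is arbitrary):

import java.nio.ByteBuffer;

// allocateDirect takes an int capacity, so a single direct buffer is limited to
// Integer.MAX_VALUE (~2GB) bytes
ByteBuffer direct = ByteBuffer.allocateDirect(64 * 1024 * 1024); // 64MB of off-heap memory
direct.putLong(0, 42L);      // write a long at absolute offset 0
long v = direct.getLong(0);  // read it back
// there is no supported way to free the buffer eagerly: the native memory is only
// reclaimed once the buffer object itself is garbage-collected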

Unsafe#

The sun.misc.Unsafe API offers operations for accessing both on-heap and off-heap memory data. Its memory access operations are defined as intrinsic functions in the HotSpot JVM and benefit from JIT optimizations, making them highly efficient. However, using Unsafe is extremely risky because it allows unrestricted access to any memory location. This can lead to issues like JVM crashes caused by accessing freed memory (e.g., Dangling Pointer). As a result, Java strongly advises against using Unsafe.
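
For illustration, a minimal sketch of how off-heap memory is typically allocated and released through Unsafe, obtained reflectively since the class is not intended for application code:

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeSketch {
    public static void main(String[] args) throws Exception {
        // obtain the Unsafe instance reflectively (not a supported API)
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        long address = unsafe.allocateMemory(16);    // 16 bytes of off-heap memory
        unsafe.putLong(address, 42L);                // write directly to native memory
        System.out.println(unsafe.getLong(address)); // read it back
        unsafe.freeMemory(address);                  // must be released manually: forgetting this leaks,
                                                     // and use-after-free or double-free can crash the JVM
    }
}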

Some developers resort to using JNI (Java Native Interface) to call native libraries for off-heap memory access. However, this approach incurs significant performance overhead because JNI method calls cannot benefit from JIT optimizations (such as inlining). The transition from Java to JNI is orders of magnitude slower than direct memory access.

This dilemma leaves Java developers with a tough choice when dealing with off-heap data: should they opt for the safe but inefficient approach (ByteBuffer), or compromise safety for higher performance (Unsafe)? What developers truly need is a native API that provides safe access to off-heap memory while leveraging JIT optimizations for performance.

Foreign Functions#

Since Java 1.1, JNI (Java Native Interface) has supported calling native code, but it comes with several limitations:

  • Complex Artifacts: JNI involves multiple cumbersome artifacts: the Java API (methods), C header files derived from the Java API, and the C implementation required for invoking the native library. Java developers must juggle multiple toolchains to keep platform-specific artifacts in sync, which becomes particularly burdensome when the native library evolves rapidly.
  • Limited Interoperability: JNI can only interoperate with libraries using the operating system and CPU conventions, typically written in C or C++. It cannot call functions using alternative calling conventions.
  • Type System Inconsistency: JNI does not unify the Java and C type systems. Aggregated data in Java is represented as objects, while in C, it is represented as structs. As a result, any Java object passed to a native method must have its fields parsed manually by native code. To address this, developers sometimes compress data into a single object (e.g., a byte array or direct byte buffer). However, more often than not, developers use the Unsafe API to allocate off-heap memory and pass its address to native methods to avoid JNI’s slower Java object handling—an approach fraught with safety risks.

Although several frameworks, such as JNA, JNR, and JavaCPP, have emerged to address JNI’s shortcomings, they still fall short compared to languages that offer native interoperation more seamlessly. For example, Python’s ctypes module dynamically wraps functions from native libraries without requiring additional code. Similarly, Rust provides tools that can automatically generate native bindings from C/C++ header files.

The Java team believes that developers should have access to a supported API that allows them to directly utilize any native library without the need for cumbersome “glue code” or JNI’s bulky design. Method handles were introduced in Java 7 to support dynamic languages on the JVM, and they envisioned leveraging method handles to expose native code. This approach would significantly simplify the task of writing, building, and distributing Java libraries that depend on native libraries.

Moreover, an API capable of modeling external functions (native code) and external memory (off-heap data) would provide a robust foundation for third-party native interoperability frameworks.

Different from Truffle Framework#

Anyone familiar with GraalVM’s cross-platform compilation capabilities is likely acquainted with the Truffle Framework, which also supports interoperability between Java and external languages such as C or C++. However, the Truffle Framework works by running guest languages through language implementations installed locally via GraalVM’s gu tool: each implementation parses the source code into an abstract syntax tree (AST) that GraalVM can work with, and GraalVM then compiles these ASTs into highly optimized machine code.

Unlike embedding interpreters for other languages directly (e.g., embedding a C interpreter in CPython to execute Python code), Truffle avoids the overhead of cross-language interpreter execution. This approach allows GraalVM to not only run Java efficiently but also execute code written in other languages while enabling cross-language optimization, ultimately converting everything into high-performance native machine code.

From a functional perspective, Truffle operates fundamentally differently from the Foreign Function mechanism.

FFM API#

The Foreign Function and Memory API (FFM API) defines a set of classes and interfaces that allow client code in libraries and applications to enable various functionalities:

  • Allocate external memory (MemorySegment, MemoryAddress, SegmentAllocator)
  • Manipulate and access structured external memory (MemoryLayout, VarHandle)
  • Control allocation and deallocation of external memory (MemorySession)
  • Invoke external functions (Linker, FunctionDescriptor, SymbolLookup)
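
As an illustration, here is a minimal sketch of a foreign-function call using the finalized (JDK 22) names, where Arena has replaced MemorySession for lifecycle control. It looks up the C library’s strlen, allocates a C string off-heap, and invokes the function through a downcall handle:

import java.lang.foreign.Arena;
import java.lang.foreign.FunctionDescriptor;
import java.lang.foreign.Linker;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.lang.invoke.MethodHandle;

public class StrlenSketch {
    public static void main(String[] args) throws Throwable {
        Linker linker = Linker.nativeLinker();
        // locate strlen in the standard C library
        MemorySegment strlenAddr = linker.defaultLookup().find("strlen").orElseThrow();
        // describe its signature: size_t strlen(const char *s)
        MethodHandle strlen = linker.downcallHandle(
                strlenAddr, FunctionDescriptor.of(ValueLayout.JAVA_LONG, ValueLayout.ADDRESS));

        try (Arena arena = Arena.ofConfined()) {                     // controls the native memory's lifetime
            MemorySegment cString = arena.allocateFrom("Hello FFM"); // copy a NUL-terminated string off-heap
            long length = (long) strlen.invokeExact(cString);
            System.out.println(length);                              // prints 9
        }
    }
}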

Basic Business Context#

Business Background#

The Medical Research Platform is a specialized system designed for leading domestic hospitals to facilitate the sampling and analysis of scientific research files. It aims to support data collection, storage, and analysis requirements in hospital research activities. The platform integrates real-time collection of imaging or monitoring files such as CT, DR, MRI, and ECG from hospital systems, linking them with corresponding operator, physician, or graduate student identities before securely storing them in a unified database. This ensures robust data traceability and secure management. Furthermore, the platform offers end-to-end functions for the aggregation, summarization, and analysis of research materials and data, providing researchers with an efficient solution for handling various types of medical data.

For patient case files, the platform middleware extracts each characteristic of the case as independent data entries before storage and writes them to local files (e.g., in .txt or .csv formats). Researchers can submit tasks via the platform’s frontend to perform feature summarization and analysis of the locally stored case data. These tasks demand real-time responses, requiring the platform’s backend to exhibit the fastest possible processing capabilities.

For a typical large-scale hospital, approximately 15,000 new cases are recorded daily. Each case contains 5 to 20 data points, resulting in 75,000 to 300,000 data entries generated daily. Annually, the number of data entries grows by approximately 68.44 million (roughly 187,500 entries per day on average × 365 days).

When the system experiences peak QPS while processing such massive data, the server’s load capacity is heavily strained. This necessitates urgent optimization and upgrades to the data analysis solution to enhance the platform’s performance and ensure rapid response times.

The following steps outline the team’s efforts to address these challenges:

  1. Architecture Review and Optimization
    • Comprehensive review of existing data processing workflows.
    • Re-architecting the data pipeline to handle high-concurrency scenarios.
  2. Backend Optimization
    • Implementing more efficient algorithms for feature extraction and analysis.
    • Introducing memory-mapped file processing for faster access to local case data.
  3. Scaling for Growth
    • Designing scalable storage solutions to accommodate increasing data volumes.
    • Employing caching mechanisms to reduce query latency during high-traffic periods.
  4. Performance Enhancements
    • Leveraging multi-threading and parallel processing to improve throughput.
    • Deploying rate-limiting mechanisms to prevent overload during peak usage.

By adopting these measures, the platform is expected to achieve faster response times, maintain stable operations during peak usage, and provide researchers with reliable, efficient tools for medical data analysis.

Solution Selection#

For a big data processing scenario with high real-time requirements, choosing between a Hadoop cluster or FFM API + Unsafe + GraalVM Native Image (Native Plan) each has its pros and cons. The team needs a detailed analysis from the perspectives of cost, development difficulty, future maintenance difficulty, and system stability:

  1. Hadoop Cluster (preferably 3 nodes)
    • Economic Cost:
      • Hardware Costs: If the node is configured with 64GB memory, 8-core CPU, and 2TB storage, the cost per node ranges from 3000 to 8000 CNY. If high data persistence is required, the number of storage nodes may increase.
      • Network Costs: Data communication between the Hadoop cluster’s data nodes is frequent, and to ensure fast data transfer within the cluster, it’s typically recommended to use networks with gigabit or higher bandwidth. The cost for each node’s switch and networking equipment is around 800 CNY.
      • Operational Costs: Hadoop requires dedicated operations personnel to manage cluster health, node status, and expansion plans, with an estimated monthly salary of 8000 to 15000 CNY per person.
    • Implementation Difficulty:
      • Architecture Difficulty: Hadoop clusters typically use the open-source Hadoop framework, which has extensive community support. Enterprise-grade Hadoop distributions (such as Cloudera, Hortonworks) can be chosen for higher stability.
      • Development Difficulty: The development difficulty is moderate. Hadoop and its ecosystem components (such as Hive, Spark, HBase) provide comprehensive tools and APIs for distributed data processing, with mature development documentation and examples.
      • Maintenance Difficulty: Cluster management may require a dedicated operations team, especially in areas like expansion, performance optimization, and ensuring high availability. Maintenance also involves monitoring the health of the cluster and data node status. While challenging, there are many stable monitoring and management tools available in the Hadoop ecosystem.
    • System Stability: Hadoop clusters are relatively stable in big data processing, suitable for scenarios requiring high throughput rather than extremely low latency. For high-concurrency write and read operations, Hadoop’s distributed storage and computing framework ensures good scalability and stability.
    • Real-Time Limitations: Hadoop clusters (especially MapReduce) are more suitable for batch processing tasks and are less ideal for real-time processing with extremely low latency. Although frameworks like Spark Streaming can provide near-real-time processing, the latency is still higher than in-memory processing.
  2. Native Plan
    • Economic Cost:
      • Hardware Costs: Requires multi-core, large memory support. A configuration with 32GB memory, 12-core 3 GHz Intel CPU, and 1TB SSD (for storing real-time computation cache files) costs around 5000 CNY.
      • Network Costs: Integrated within the program, no frequent network I/O, thus no network cost.
      • Operational Costs: Maintained by personnel responsible for this program. Since the FFM API is stable and unlikely to undergo significant changes, the maintenance cost is relatively low.
    • Implementation Difficulty:
      • Architecture Difficulty: Runs on a single machine, with no architecture complexity.
      • Development Difficulty: FFM API and Unsafe require developers to have high-level knowledge of low-level memory management and concurrency control, making development more difficult. Incorrect use of Unsafe and FFM API can lead to memory leaks or system crashes, raising the development threshold.
      • Maintenance Difficulty: Involves off-heap memory management and manual memory release, which may cause hard-to-trace issues like memory leaks or dangling pointers. Full control over memory allocation and release is necessary, and regular performance and security checks are required.
    • System Stability: The FFM API was stabilized in JDK 22, and the team develops on JDK 23. The FFM API supports both active (explicit) and passive (GC-driven) memory release, reducing memory-management difficulty compared with older JDK versions and ensuring good system stability (a minimal sketch of both release styles follows this list).
    • Real-Time Limitations: This solution excels in real-time performance and response speed but comes with higher development and maintenance difficulty, making it suitable for teams with rich low-level technical experience.
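
To make the stability point concrete, here is a minimal sketch of active (explicit) versus passive (GC-driven) release of off-heap memory with the finalized FFM API; the sizes are arbitrary:

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

// active release: a confined arena frees all of its segments deterministically on close()
try (Arena arena = Arena.ofConfined()) {
    MemorySegment segment = arena.allocate(1024);   // 1 KiB of off-heap memory
    segment.set(ValueLayout.JAVA_LONG, 0, 42L);
}   // memory released here

// passive release: an automatic arena's segments are reclaimed by the garbage collector
Arena auto = Arena.ofAuto();
MemorySegment lazy = auto.allocate(1024);           // freed some time after it becomes unreachable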

Given the team’s background, as all business operations are independently developed via GraalVM 23 and AWS Serverless, with strong development advantages on the GraalVM platform and extensive experience in K8S architecture, MySQL optimization, and Kafka architecture, the preferred choice is the Native Plan.

Implementation and Optimization#

File Fragmentation#

In traditional Hadoop big data processing cluster systems, the MapReduce paradigm is commonly used to split, process, and summarize large files, rather than handling files via multi-threading on a single machine. The main reasons for this are as follows:

  1. Limitations of Processing Large Files:
    • Single-machine resources (CPU, memory, disk I/O, etc.) are limited. For large files (e.g., data on the terabyte scale), multi-threading on a single machine is often inefficient, especially when memory is insufficient or I/O load is high.
    • A cluster can provide more computing and storage resources than a single machine, allowing multiple file blocks to be processed simultaneously, making large file handling more scalable.
  2. Parallelism and Scalability of Distributed Computing:
    • Hadoop achieves high parallelism by splitting data and distributing it across different nodes in the cluster. This allows data processing speed to increase linearly as the number of cluster nodes grows.
    • The design of MapReduce allows the cluster to dynamically scale (by adding more nodes), speeding up processing. In contrast, multi-threading on a single machine is limited by hardware resource constraints.
  3. Reducing Single Points of Failure and Improving Fault Tolerance:
    • In a cluster environment, data is split and stored across multiple nodes. Hadoop’s HDFS (Hadoop Distributed File System) provides data replication, so even if a node fails, copies of the data on other nodes can take over, ensuring the task completes smoothly.
    • With multi-threading on a single machine, any failure of a thread or process can cause the entire task to fail, leading to poor fault tolerance.
  4. Data Locality and Network I/O Optimization:
    • Hadoop’s design follows the principle of “bringing computation to the data location,” meaning that computational tasks are executed on the node where the data is stored, avoiding data transfer over the network and reducing network I/O. Each node processes only its local file block, reducing data transfer overhead.
    • In this way, network I/O mainly occurs during the reduce phase, while the map phase is mostly local, thus minimizing overall network load.
  5. Task Scheduling and Load Balancing:
    • Hadoop’s MapReduce task scheduler can assign data blocks to nodes with lower load, optimizing cluster resource utilization. This dynamic scheduling provides significant advantages in a distributed environment, while single-machine multi-threading can only balance tasks among a limited number of CPU cores, with limited load-balancing capability.

Does the Native Plan also require file splitting like Hadoop clusters? It is important to clarify that the Native Plan utilizes Java’s direct (off-heap) memory, which resides in RAM: files are loaded into memory all at once and processed there. The approach is memory-oriented, as the average read latencies of the various storage layers make clear:

+-----------------------+-------------------+
| Level (Bottom to Top) | Average Read Time |
+-----------------------+-------------------+
| Register              | 0.3 ns            |
| L1 Cache              | 0.9 ns            |
| L2 Cache              | 2.8 ns            |
| L3 Cache              | 12.9 ns           |
| RAM                   | 120 ns            |
| SSD                   | 50~150 µs         |
| HDD                   | 1~10 ms           |
+-----------------------+-------------------+

In data analysis and processing applications, RAM read speeds are much faster than SSD and HDD. Therefore, if there is enough memory, loading a large file into RAM all at once for processing is a more efficient choice. It has been verified that the size of any file to be processed will not exceed 18GB, meaning that with servers equipped with 32GB or more of RAM, the entire file can safely be loaded into memory, thus utilizing RAM’s high read/write speed to boost processing efficiency.

Using Native technology allows for fast data reading and analysis without relying on file splitting or distributed clusters. Unlike Hadoop, which distributes I/O pressure through splitting and clustering, directly loading files into memory eliminates the data transfer and coordination overhead between multiple nodes, avoiding performance bottlenecks caused by network I/O limitations. In this way, data analysis and processing are fully managed by a single server, ensuring that system resources are fully utilized to achieve the fastest processing speeds.

Hosting data analysis applications on a single server not only simplifies the system architecture but also ensures efficient use of CPU and memory resources, allowing file processing speeds to reach their peak. This method is especially suitable in environments with sufficient memory, as concentrating all computing resources on a single server maximizes hardware performance, achieving the fastest possible file loading, parsing, and analysis with minimal latency.

Memory Mapping#

Before JDK 19, memory mapping could only be done through MappedByteBuffer, and there was no supported way to release (clean) the mapped memory promptly; doing so required manual, unsupported workarounds.

From JDK 19 onward (as a preview, finalized in JDK 22), an FFM-aware overload of FileChannel::map can map a file region into memory: you pass the mapping mode (here FileChannel.MapMode.READ_ONLY), a starting offset, a size (obtained via FileChannel::size), and a java.lang.foreign.Arena that governs the mapping’s lifecycle. The method returns a java.lang.foreign.MemorySegment through which the mapped file is accessed.

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// try-with-resources; FILE holds the path to the case-data file,
// and the enclosing method declares throws IOException
try (var fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) {
    final long fileSize = fileChannel.size();
    // map the entire file read-only; the global Arena keeps the mapping alive for the program's lifetime
    final MemorySegment segment = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global());
    // compute the start and end addresses of the mapped file in native memory
    final long fileStart = segment.address();
    final long fileEnd = fileStart + fileSize;
}

Memory Addressing#

When reading file data into memory, to fully leverage the parallel processing capabilities of multi-threading, each thread processes a specific region of memory, achieving division of labor. Specifically, after the file data is loaded into RAM, each thread obtains the starting and ending addresses of a memory data block, and processes and analyzes the data in parallel within its independent memory space. This ensures that all threads maximize CPU resource utilization, speeding up the overall processing.

Each thread independently calculates its task result. After completion, the results are collected and merged to produce the global statistical analysis. This parallel mechanism significantly enhances the speed and efficiency of processing large files, especially on a single high-performance server. Unlike MapReduce, there are no network transfer overheads or file fragment management delays between distributed nodes. In the Native solution, all operations are performed in local memory, eliminating network I/O and the potential latency caused by distributed file systems, achieving low-latency, high-performance local processing.

In Native mode, data processing depends not only on the parallel mechanism of multi-threading but also requires fine-grained manipulation of memory data. Since the file is fully loaded into memory, each thread can directly access data blocks via memory addresses and perform precise control and calculations on memory values using bitwise operations (such as masking, complement, and shifting). This memory address-level operation is similar to fine-grained control of pointers and memory in C++, allowing for extremely high performance and flexibility. For example, bitmasking can efficiently filter specific bits of data, while shifting operations can quickly compute or extract data, enabling precise and efficient analysis and processing. This approach makes Native mode more accurate in memory processing, providing deep control over data reading and computation, thereby achieving rapid analysis and processing of large-scale data.

Take a 12-core CPU as an example, and assume the format of a single patient record is: <Patient Name>;<Body Temperature, Celsius, 1 decimal>;<Blood Pressure, mmHg, positive integer>;<Spinal Tap Cell Count, cells/L, positive integer>;<Blood Test, cTnT (ng/mL), 2 decimals>;<Blood Test, CEA (μg/L), positive integer>, with the spinal tap cell count being on the order of 10^6. In this environment, fine-grained thread arrays can be used to execute tasks (a minimal sketch follows the list below), avoiding the overhead of thread pool management and further reducing garbage collection (GC) overhead caused by Java’s heap memory. This thread management approach also offers several other advantages:

  1. Threads created via thread arrays are independent, allowing for more direct control of each thread’s execution.
  2. In high-performance or low-latency scenarios, creating a small number of independent threads and controlling them more precisely is preferable to relying on thread pool scheduling.
  3. After tasks are completed, all threads naturally exit without occupying resources for long periods.
  4. Using a thread pool introduces additional management overhead, such as task queues, idle thread waiting, and thread recycling. For a small number of short-lived threads, using thread arrays directly avoids these redundant issues.
  5. The thread array implementation directly starts threads and runs tasks, eliminating the need for task queues and waiting mechanisms in thread pools, reducing such overhead.
  6. The main advantage of a thread pool is thread reuse, but in this business case, each thread’s task is independent and does not need reuse, making the thread pool’s reuse mechanism unnecessary.
  7. For one-time execution and small-scale concurrent tasks with a fixed number of threads, using a thread array is usually sufficient and more efficient.
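
The following is a minimal, hypothetical sketch of this thread-array approach; processRange and merge are assumed helpers (not part of the platform’s actual code), fileSize comes from the mapping step above, and the enclosing method handles InterruptedException:

int threads = 12;                                  // one worker per core on a 12-core CPU
long chunk = fileSize / threads;
Thread[] workers = new Thread[threads];
Result[] partials = new Result[threads];

for (int i = 0; i < threads; i++) {
    final int id = i;
    final long from = id * chunk;
    final long to = (id == threads - 1) ? fileSize : from + chunk;
    // each thread scans its own region of the mapped file and produces a partial result
    workers[id] = new Thread(() -> partials[id] = processRange(from, to));
    workers[id].start();
}
for (Thread worker : workers) worker.join();       // all threads exit naturally once their task completes
Result total = merge(partials);                    // combine per-thread results into the global statistics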

To facilitate the verification of subsequent code, we provide some simple test data. The test data is small, so the provided code examples do not use multi-threading or memory segmentation addressing. Readers interested in this topic can further explore it on their own.

Data Analysis#

Data Segmentation#

In this case, the file content is loaded into memory through FileChannel, and it is mapped to a MemorySegment object in the program for efficient offset-addressing operations. The test data is ASCII-encoded (1 byte per character), and within the MemorySegment, each read operation corresponds to 8 bytes (64 bits). Using direct bitwise operations for data processing is more efficient than methods based on String or extensive conditional checks.

During the processing, the data is divided into tasks of 256 bytes (which can be adjusted based on actual business needs), encapsulated in a thread-safe Segment class. It is important to note the following:

  • Each data segment does not necessarily start or end at the boundary of a complete data unit (e.g., a newline character: \n). A specific data structure can be represented as follows:
    +------------+--------+----------------------------------+--------+
    | head       | headBR | -                                | tailBR |
    +------------+--------+----------------------------------+--------+
    | characters | \n     | characters containing semicolons | \n     |
    +------------+--------+----------------------------------+--------+
    • head: The starting position of the data segment.
    • headBR: The starting position of the first complete data unit within the segment.
    • tailBR: The ending position of the last complete data unit within the segment.
  • The getNearestBRMask method is used to search for the nearest newline character, either forward or backward, ensuring that the beginning and end of the segment correspond to the start and end of complete data units.
private static long getNearestBRMask(long bytePos) {
   while (bytePos < Segment.SEGMENT_TOTAL_BYTES) {
      // get 8 bytes (64 bits) after the position of currBytePos
      long currentWord = Segment.SEGMENT.get(ValueLayout.JAVA_LONG, bytePos);
      // calculate the newline character mask
      long mask = (currentWord ^ 0x0A0A0A0A0A0A0A0AL) - 0x0101010101010101L;
      mask &= ~currentWord & 0x8080808080808080L;
      // convert mask to byte indices
      if (mask != 0) return bytePos + (Long.numberOfTrailingZeros(mask) >>> 3);
      // addressing continues in steps of 8 bytes
      bytePos += 8;
   }
   return Segment.SEGMENT_TOTAL_BYTES;
}

With getNearestBRMask in place, the boundaries of each Segment (head, headBR, tailBR) can be computed in its constructor:

@ThreadSafe
Segment(final AtomicLong cursor) {
    if (SEGMENT == null) throw new Error("Segment hasn't been loaded.");
    this.head = cursor.addAndGet(SEGMENT_SIZE) - SEGMENT_SIZE;
    this.headBR = this.head == 0 ? 0 : getNearestBRMask(this.head) + 1;
    this.tailBR = getNearestBRMask(Math.min(SEGMENT_TOTAL_BYTES - 1, this.head + SEGMENT_SIZE));
}

The final AtomicLong cursor determines which 256-byte slice of the MemorySegment this particular Segment covers. Using an atomic counter lets multiple threads claim slices concurrently without overlapping or skipping regions.

Meanwhile, MemorySegment offers two ways to read data, both of which are inlined and optimized by the JIT compiler, but they differ in how data is addressed:

  1. get(AddressLayout layout, long offset)
    • Locates based on byte offset.
    • Requires the offset to be a multiple of 8 for memory alignment.
  2. getAtIndex(AddressLayout layout, long index)
    • Locates based on a logical index and the layout byte size (AddressLayout).
    • Does not require the index to be a multiple of 8.

Here, the get method is preferred for its stability and accuracy in addressing. This addressing method is more akin to a sliding window algorithm, where a window of 8 bytes starts searching from each index position that is a multiple of 8. This approach not only leverages memory alignment and bit operations to improve efficiency but also avoids excessive string operations and conditional checks. It is known that the significant data delimiter in each segment is a semicolon (0x3B), and using the principle of calculating the newline character mask, a similar mask can be designed to calculate the positions of semicolons within the 8-byte window.

// get the mask of semicolon in a word
private static long getSemicolonMask(final long word) {
    final long input = word ^ 0x3B3B3B3B3B3B3B3BL;
    return (input - 0x0101010101010101L) & ~input & 0x8080808080808080L;
}

However, a segment of text may contain multiple semicolons (small data within an 8-byte segment) or none at all (such as a patient’s name that is longer than 8 bytes). Extracting the small data from each segment and organizing it properly is a crucial step for subsequent data analysis.

Here, we can use the mask filtering method and recursion to help us extract the required valid data. By using the 0xFF mask and its inverse, we can filter the necessary data. For example, in the small segment Joe;36.0;91;81;0.15, the first 8 bytes (Joe;36.0) form the binary word, and the semicolon mask seMask obtained after applying getSemicolonMask is:

       J        o        e        ;        3        6        .        0
01001010 01101111 01100101 00111011 00110011 00110110 00101110 00110000 word
00000000 00000000 00000000 10000000 00000000 00000000 00000000 00000000 seMask

The set bit of seMask indicates the position of the semicolon byte. To convert this into the corresponding filter mask, we need the index of the byte holding that bit. The expression Long.numberOfLeadingZeros(seMask) >>> 3 computes it directly: here the result is 3 (counting bytes from the high end, with J at index 0, the semicolon sits at index 3). An array of 0xFF masks can then be indexed by this value:

private static final long[] MASK = new long[] {0xFFFFFFFFFFFFFFFFL, 0xFFFFFFFFFFFFFFL, 0xFFFFFFFFFFFFL, 0xFFFFFFFFFFL,
        0xFFFFFFFFL, 0xFFFFFFL, 0xFFFFL, 0xFFL, 0x00L};

To retain only the content before the semicolon, we can first extract the mask that keeps only the semicolon and everything after it, then invert it, and finally perform a bitwise AND operation to filter out the required part (capturedWord):

// Determine the position of the semicolon byte
final int leadingSemicolon = Long.numberOfLeadingZeros(seMask) >>> 3;
// Use the mask to capture the portion before the semicolon (excluding the semicolon)
final long capturedWord = ~MASK[leadingSemicolon] & word;

At the binary level, they execute as follows:

         J        o        e        ;        3        6        .        0
  01001010 01101111 01100101 00111011 00110011 00110110 00101110 00110000 word
  00000000 00000000 00000000 11111111 11111111 11111111 11111111 11111111 MASK
  11111111 11111111 11111111 00000000 00000000 00000000 00000000 00000000 ~MASK

  01001010 01101111 01100101 00111011 00110011 00110110 00101110 00110000 word
& 11111111 11111111 11111111 00000000 00000000 00000000 00000000 00000000 ~MASK
-------------------------------------------------------------------------
  01001010 01101111 01100101 00000000 00000000 00000000 00000000 00000000 capturedWord
=        J        o        e

After this round of operations, the characters before the semicolon can be directly extracted. Finally, using the recursive approach, after each extraction, the remaining content is updated in word, and the above process is repeated to extract all the content between each semicolon. The specific code is as follows:

private static void analyzeBlock(String block) {
   final long[] results = new long[1 << 3]; // a block occupies 8 bytes and holds at most 6 data items
   int cursor = 0;
   long word = Long.parseUnsignedLong(block, 2); // parse the binary-string representation of the 8-byte window
   while (cursor < results.length) {
      long semicolonMask = getSemicolonMask(word); // mask marking the semicolon positions in word
      if (semicolonMask != 0) {
         final int leadingSemicolon = Long.numberOfLeadingZeros(semicolonMask) >>> 3; // byte index of the semicolon
         final long capturedWord = ~MASK[leadingSemicolon] & word; // capture the part before the semicolon (excluding it)
         word = MASK[leadingSemicolon + 1] & word; // keep the part after the semicolon (excluding it)
         results[cursor] = capturedWord; // store the captured bytes
         cursor++;
      } else {
         results[7] = word; // if the word contains no semicolon, store it whole
         break;
      }
   }
   // print the saved results for verification
   for (long result : results) {
      System.out.print(Util.longToAscii(result));
   }
}
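
As a hypothetical usage check, the 8-byte window "Joe;36.0" from the example above can be packed into a big-endian long (matching the bit layout shown earlier) and fed to analyzeBlock; the exact printed output depends on the author’s Util.longToAscii helper:

import java.nio.charset.StandardCharsets;

long word = 0;
for (byte b : "Joe;36.0".getBytes(StandardCharsets.US_ASCII)) {
    word = (word << 8) | (b & 0xFF);     // builds 0x4A6F653B33362E30
}
analyzeBlock(Long.toBinaryString(word)); // "Joe" is captured first; the remainder "36.0" lands in the fallback slot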

Analysis Workflow#

Now that the data segmentation work is complete, there are still some issues to consider:

  1. How to determine if the patient’s name spans across segments (with part of it at the end of one segment and part at the beginning of the next) and how to concatenate it.
  2. How to determine the location of the patient’s name and retrieve a certain number of small data points to instantiate and store them as objects.
  3. The data is still stored in the form of long values. How to properly convert these values for decimals and integers.

The solutions to the first two issues are quite varied, and there is no significant difference in efficiency between them (checking for newlines and performing forward/backward segment searches). Here, we will focus on the more complex issue 3, particularly for decimal numbers. The key characteristic of decimal numbers is that they will always contain a decimal point (binary: 00101110). Using this advantage, we can quickly differentiate between integer and decimal parts. The specific implementation method is as follows:

private static long scanNumber(Scanner scanPtr) {
   // Read a long (i.e., 8 bytes) from the current scan pointer as a digit byte sequence.
   long numberWord = scanPtr.getLongAt(scanPtr.pos() + 1);

   // Find the position of the decimal separator (dot) using the mask 0x10101000L:
   // ASCII digits '0'-'9' (0x30-0x39) have bit 4 set in each byte, while '.' (0x2E) does not,
   // so negating numberWord and AND-ing with the mask leaves a bit only at the dot's byte.
   // Long.numberOfTrailingZeros then yields that bit's position.
   int decimalSepPos = Long.numberOfTrailingZeros(~numberWord & 0x10101000L);

   // Call convertIntoNumber method to convert the decimal point and the numeric part into a long.
   long number = convertIntoNumber(decimalSepPos, numberWord);

   // Update the scan pointer position, skipping the length of the current number (decimal part position + 4 bytes).
   scanPtr.add((decimalSepPos >>> 3) + 4);

   // Return the converted number.
   return number;
}

// Convert ASCII encoded digits into integer numbers (avoiding if statements to prevent extra overhead).
private static long convertIntoNumber(int decimalSepPos, long numberWord) {
   // Calculate the decimal point position offset and align it to the standard position.
   int shift = 28 - decimalSepPos;

   // Determine the sign (signed = -1 for negative numbers, signed = 0 for positive numbers):
   // bit 4 of the lowest byte is clear for '-' (0x2D) but set for ASCII digits, so shifting
   // it to the top bit and arithmetic-shifting back sign-extends it across the whole word.
   long signed = (~numberWord << 59) >> 63;

   // Build a mask that zeroes the byte holding the '-' sign when the number is negative,
   // leaving the word unchanged for positive numbers.
   long designMask = ~(signed & 0xFF);

   // Align the number to a specific position, retain the valid numeric part, 
   // and convert ASCII digits to corresponding values (0-9).
   long digits = ((numberWord & designMask) << shift) & 0x0F000F0F00L;

   // The digits format looks like: 0xUU00TTHH00, 
   // where UU represents the unit digit, TT represents the tens, and HH represents the hundreds.

   // Calculate the absolute value of the number:
   // 0xUU00TTHH00 * (100 * 0x1000000 + 10 * 0x10000 + 1)
   // = 0x000000UU00TTHH00 + 0x00UU00TTHH000000 * 10 + 0xUU00TTHH00000000 * 100
   long absValue = ((digits * 0x640a0001) >>> 32) & 0x3FF;

   // Return the actual value (restore the sign using XOR and subtraction; if signed = 0, no change; if signed = -1, negate the result).
   return (absValue ^ signed) - signed;
}
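
As a hypothetical check of convertIntoNumber, assuming the scanner reads the window as a little-endian long (as on x86), the 8-byte window "36.0;Joe" becomes 0x656F4A3B302E3633L, the decimal point is found at bit offset 20, and the method returns the value scaled to an integer:

long numberWord = 0x656F4A3B302E3633L;                                     // "36.0;Joe" read little-endian
int decimalSepPos = Long.numberOfTrailingZeros(~numberWord & 0x10101000L); // 20: the dot sits in byte 2
long value = convertIntoNumber(decimalSepPos, numberWord);
System.out.println(value);                                                 // 360, i.e. 36.0 scaled by 10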

Now that the necessary data analysis tasks are completed, any further computations (sum, average, min, max) can be done at the array level. However, a faster approach would be to directly update the result array during the object instantiation process:

private void accumulate(Result other) {
   if (other.min < min) { // track the minimum value
      min = other.min;
   }
   if (other.max > max) { // track the maximum value
      max = other.max;
   }
   sum += other.sum; // accumulate the sum
   count += other.count; // accumulate the count of entries of this type
}

Solution Evaluation#

In a standalone deployment scenario, this self-developed architecture fully exploits the potential of modern hardware. Through deep optimization based on GraalVM Native Image and JDK 23 FFM API, it establishes a high-performance data analysis platform centered on direct memory-level operations and efficient bitwise computation, eliminating the need for distributed systems.

For example, leveraging an AMD 12-core 3GHz processor, the architecture maximizes hardware capabilities, performing billions of bitwise operations and hundreds of millions of arithmetic operations per second. It conducts precise analytical tasks directly on memory data, inherently avoiding the burdens of garbage collection (GC), extensive memory allocation, and deallocation typical of traditional Java applications. Nearly all resources are allocated to core computational tasks.

Key Performance Characteristics#

  • Bitwise Optimization: By replacing traditional string parsing and comparisons with high-performance byte- and bit-based operations, the solution significantly reduces CPU cycles. Memory access and computation are fully aligned, further enhancing execution efficiency. Even under extreme conditions, the platform efficiently handles 1–2TB of daily data analysis demands, keeping the latency for a single 15GB data analysis strictly within 30 seconds, as expected by operators. Multithreaded parallelism further reduces overall processing time.
  • Advantages over Distributed Frameworks: Unlike distributed frameworks such as Flink and Hadoop, this solution operates entirely on a single machine. It avoids additional hardware nodes and eliminates the overhead associated with network communication and task coordination. Local memory operation ensures ultra-low data access latency, bypassing inefficiencies from distributed storage systems like HDFS. Common bottlenecks in distributed systems, such as data sharding coordination, node recovery, and network delays, are entirely absent.
  • GraalVM Native Image and JDK 23 FFM API: GraalVM Native Image shifts runtime performance optimization to the compilation phase. With precise static analysis and compilation optimizations, it achieves execution efficiency akin to native code while removing traditional JVM overhead. Using the FFM API in JDK 23, memory operations are refined to the byte level, eliminating unnecessary intermediate layer calls, resulting in a more direct and efficient data processing path.
  • Performance and Cost Efficiency: This architecture leverages extreme hardware computation capabilities, including billions of bitwise operations and millions of arithmetic calculations, achieving a level of efficiency and stability beyond the reach of distributed systems. It outperforms distributed frameworks like Flink and Hadoop in terms of performance while entirely avoiding the high hardware and operational costs of distributed architectures. This makes it the optimal solution for single-machine data analysis.

References#

[1] Oracle. (2023). Foreign Function and Memory API. Oracle Java Platform, Standard Edition 21 Documentation. Retrieved from https://docs.oracle.com/en/java/javase/21/core/foreign-function-and-memory-api.html

[2] OpenJDK. (2022). JEP 424: Foreign Function & Memory API (Preview). OpenJDK Project. Retrieved from https://openjdk.org/jeps/424

[3] OpenJDK. (2021). JEP 412: Foreign Function & Memory API (Incubator). OpenJDK Project. Retrieved from https://openjdk.org/jeps/412
