Scaling aggregates with dataflow: A case study in real-time aggregation

Modern file storage systems host billions of files and serve millions of users, which creates a constant challenge: maintaining real-time accuracy of folder sizes and item counts while handling massive write volumes from file uploads, updates, moves, copies, and deletions.

At Box, we moved beyond standard streaming patterns, which struggled with our hierarchical data and mixed traffic loads, to explore the custom windowing strategies — specifically a hybrid session-gap approach — that finally solved the problem.

This article details Box’s journey optimizing a real-time aggregation pipeline at this scale.

We’ll cover the legacy system’s bottlenecks; our iterative Dataflow solutions; a detailed breakdown of the results across performance, cost, and user experience metrics; and the specific engineering lessons learned along the way.

Key takeaways:

  • File activity at Box’s scale creates a major aggregation challenge because every upload, move, copy, or deletion can trigger updates across many parent folders
  • A simple windowing model was not enough, so the team designed a hybrid approach that balanced database efficiency with fast, user-visible updates
  • The final Dataflow-based solution significantly reduced write traffic and infrastructure cost while also improving the consistency and responsiveness of folder-size updates

Core operations and their impact

At scale, file storage systems must handle diverse operations, each with cascading effects across the folder hierarchy.

Fan-out in filesystem hierarchy contexts refers to the cascading effect where an operation on a file or folder triggers updates at each level of its ancestor folders. For instance, when a file is uploaded to /folder1/folder2/folder3, not only does folder3 get updated, but so do folder2 and folder1. This means attributes like size and file count are incremented for all ancestor folders. Similarly, operations such as moving, copying, or deleting files propagate changes up through their respective hierarchies.
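
To make the fan-out concrete, here is a minimal sketch of how a single upload can be expanded into one update per ancestor folder. The FolderDelta type and method shape are illustrative placeholders, not Box’s actual schema:

// Illustrative sketch only: fan out one upload into a delta per ancestor folder.
record FolderDelta(
    long folderId, long sizeDelta, long fileCountDelta, java.time.Instant modifiedAt, long modifiedBy) {}

static List<FolderDelta> fanOutUpload(List<Long> ancestorFolderIds, long fileSizeBytes, long userId) {
  java.time.Instant now = java.time.Instant.now();
  List<FolderDelta> deltas = new ArrayList<>();
  for (long folderId : ancestorFolderIds) {   // e.g. folder1, folder2, folder3
    deltas.add(new FolderDelta(
        folderId,
        fileSizeBytes,   // size increment for this ancestor
        1,               // file count increment
        now,             // timestamp update
        userId));        // last-modified-by update
  }
  return deltas;
}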

File uploads: At peak, Box receives more than 4,000 file uploads per second. Each upload triggers several updates:

  • Size increment: Increases the size of all ancestor folders along the file’s path
  • File count increment: Increases the file count in each ancestor folder
  • Timestamp update: Updates the timestamps for all ancestor folders
  • User ID update: Updates the last modification entry with the user ID for all ancestor folders

Version updates: A smaller volume of files receives new versions. While these adjust sizes and update timestamps, they don’t change file counts.

File moves: Approximately 200 files are moved between folders every second. Moves are more complex, affecting both source and destination paths:

  • Size adjustments: Both source and destination folders experience size changes
  • Count adjustments: Files are added to destination folders and removed from source folders
  • Timestamp updates: All affected folders’ timestamps are updated

Deletions: Like uploads, deletions impact both size and file count.

Restores: Similar to deletions, restores affect both size and file count.

Copies: Copying files resembles moving files but with an essential difference:

  • Original files remain: The original file persists in the source location, so only the destination hierarchy needs size and count updates

Legacy system

The legacy system suffered from several critical limitations:

  • Frequent database replication and availability issues: Heavy write operations coupled with folder fan-outs overwhelmed the database
  • Ineffective pre-aggregation: Memcached-based pre-aggregation, intended to serialize updates to the same folder, failed to effectively reduce database load
  • Rate limiting: Due to ineffective pre-aggregation of database updates, a rate limiting mechanism was introduced to prevent database overload and protect against hotspotting issues
  • Job failures: Concurrent uploads from the same user or to the same folder triggered rate limiting, leading to job retries and delays ranging from minutes to days; job failures due to token expiration added further strain to the system

The need for optimization

To address these issues and achieve a scalable, reliable real-time aggregation pipeline, we defined the following success criteria:

  • Aggregation window: Folder size updates should reflect changes within five minutes, ensuring a near-real-time user experience. Non-busy folders (not receiving a constant stream of updates) should reflect changes with sub-minute latency
  • Reliability of the real-time pipeline: 99% accuracy for live updates and 99.9% accuracy for 24-hour summaries are critical
  • Database load: Significant reduction in write traffic to prevent database overload

Given our team’s experience building low-scale Dataflow jobs, we decided to explore Dataflow at a much higher scale.

Dataflow introduction

Dataflow is built on the Apache Beam programming model, which provides a unified approach for defining both batch and streaming pipelines. With Dataflow, the focus shifts to the logical composition of data processing jobs, while leaving the low-level details of distributed processing to the platform.

At its core, a Dataflow pipeline represents a series of computations, encompassing reading input data, transforming it, and writing output data. These pipelines utilize PCollections, which are flexible datasets that can be either bounded (fixed size) or unbounded (continuously updating). Transforms within these pipelines perform various operations, such as filtering, mapping, grouping by key, and aggregating elements, enabling real-time insights without manual orchestration of parallel processes.
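
For readers new to Beam, a minimal streaming pipeline sketch that reads from Pub/Sub, applies an element-wise transform, and writes the results back out looks like this (the project, subscription, and topic names are placeholders):

Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

pipeline
    // Unbounded PCollection: messages keep arriving from the subscription.
    .apply("ReadEvents",
        PubsubIO.readStrings().fromSubscription("projects/my-project/subscriptions/my-sub"))
    // Element-wise transform on each message.
    .apply("Normalize",
        MapElements.into(TypeDescriptors.strings()).via((String message) -> message.trim()))
    // Write the transformed messages to an output topic.
    .apply("WriteResults",
        PubsubIO.writeStrings().to("projects/my-project/topics/my-topic"));

pipeline.run();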

Event time vs processing time

Every event has a timestamp associated with its message, usually tied to publish time. In this article, we’ll call this event time.

Alternatively, you can use processing time, where every event is output with the current timestamp. This can be accomplished using: out.outputWithTimestamp(update, Instant.now()).
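
For example, a DoFn that re-stamps each element with the current wall-clock time could look like the following sketch (FolderUpdate is a placeholder for the job’s internal update type):

static class AssignProcessingTimeFn extends DoFn<FolderUpdate, FolderUpdate> {
  @ProcessElement
  public void processElement(@Element FolderUpdate update, OutputReceiver<FolderUpdate> out) {
    // Re-emit the element stamped with the current processing time, so downstream windowing
    // groups elements by when they were processed rather than when they were published.
    out.outputWithTimestamp(update, Instant.now());
  }
}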

Consider a scenario with a backlog of Pub/Sub messages:

  • Event 1, published at 15:30
  • Event 2, published at 15:40
  • Event 3, published at 15:50

If we window events into five-minute windows based on event time, the following windows are created:

  • From 15:30 to 15:35 with one event
  • From 15:40 to 15:45 with one event
  • From 15:50 to 15:55 with one event

Using processing time, events are re-emitted as:

  • Event 1, published at 15:30, processing time 16:33:01 (current time)
  • Event 2, published at 15:40, processing time 16:33:03 (current time)
  • Event 3, published at 15:50, processing time 16:33:07 (current time)

If we window events based on processing time, only a single window is created.

We opted to process events based on processing time to mitigate potential spikes from the backlog of messages after job restarts. Since our aggregation requirements did not depend on time windowing (e.g., “how many uploads happened between time X and Y”), but rather on maintaining up-to-date folder aggregates, using processing time was a suitable and straightforward choice.

Watermark

Per the Basics of the Beam model, a watermark is a guess as to when all data in a certain window is expected to have arrived. This is needed because data isn’t always guaranteed to arrive in a pipeline in time order or at predictable intervals.

Custom WindowFn and MergeCandidate

In the following sections of this article, we will be modifying the standard windowing mechanism and creating our own.

To create a custom windowing mechanism with WindowFn, extend WindowFn (or a more specific subclass) and implement its abstract methods. The two that matter here are assignWindows(), which determines which windows each element belongs to based on its timestamp, and mergeWindows(), which defines how overlapping windows should be combined. (WindowFn also requires windowCoder(), isCompatible(), and getDefaultWindowMappingFn(); these are omitted from the excerpts below.)

The window merging mechanism is based on the MergeOverlappingIntervalWindows.MergeCandidate, later referred to as MergeCandidate, which allows us to control how a window is expanded or merged.

Dataflow-based solutions: A step-by-step approach

Architecture overview

To reuse existing components as much as possible, the Dataflow job was introduced as an aggregation step between Pub/Sub topics.

Job overview

The job consists of four logical components:

  • Parse and fan-out: Read events in the original schema into the job and use the fan-out mechanism to propagate updates for each folder in the tree
  • Window and key by folder ID: Assign each update to the correct time window and folder’s bucket
  • Combine by folder ID: Combine multiple updates on a single folder level into one update
  • Convert to final update: Convert internal representation into the original schema

Here’s an example of this flow, illustrating a simplified job that uses five-minute fixed windows:

[Diagram: simplified job flow with five-minute fixed windows]
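
As a rough sketch of the job’s shape, the four components can be wired together as follows. The transform names, DoFns, and combine function here are illustrative placeholders, not the production implementation:

pipeline
    // 1. Parse and fan-out: read raw events and emit one update per ancestor folder.
    .apply("ReadEvents", PubsubIO.readStrings().fromSubscription(inputSubscription))
    .apply("ParseAndFanOut", ParDo.of(new ParseAndFanOutFn()))   // emits KV<folderId, FolderUpdate>
    // 2. Window and key by folder ID (five-minute fixed windows in this simplified example).
    .apply("Window",
        Window.<KV<String, FolderUpdate>>into(FixedWindows.of(Duration.standardMinutes(5)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
    // 3. Combine by folder ID: collapse all updates for a folder within the window into one.
    .apply("CombineByFolder", Combine.perKey(new MergeFolderUpdatesFn()))
    // 4. Convert to the final update schema and publish.
    .apply("ToOutputSchema", ParDo.of(new ToOutputMessageFn()))
    .apply("WriteUpdates", PubsubIO.writeStrings().to(outputTopic));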

We experimented with several windowing and triggering strategies in Dataflow. Here are the key lessons we learned from each approach.

Windowing strategy: Session with an early trigger

Our initial approach aimed to utilize:

  • Session windows with a five-minute gap duration: Sessions.withGapDuration(Duration.standardMinutes(5))
  • Triggered early after five minutes of processing time (time as it passes while your pipeline is executing) by: AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
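
Put together, this first configuration can be expressed roughly as the following sketch (the key and value types and the updates variable are placeholders; one common way to attach the early firing is withEarlyFirings):

PCollection<KV<String, FolderUpdate>> windowed =
    updates.apply("SessionWindow",
        Window.<KV<String, FolderUpdate>>into(Sessions.withGapDuration(Duration.standardMinutes(5)))
            // Fire an early pane five minutes of processing time after the first element,
            // then fire again when the watermark passes the end of the session.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(5))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());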

While this approach initially showed promise, we realized that standard Sessions windows were ill-suited for our needs because they lack a maximum duration cap. By design, session windows extend indefinitely as long as new data arrives within the gap duration, which means active folders could delay aggregation well beyond our five-minute target.

To address this limitation, we decided to explore a watermark-based mechanism to gain more control over the windowing details and ensure predictable aggregation.

Windowing strategy: Fixed-length session window

We decided to leverage the watermark mechanism to track session length. This eliminated the need for the AfterProcessingTime triggering mechanism, simplifying triggering to: Repeatedly.forever(AfterWatermark.pastEndOfWindow())
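
Applied to the pipeline, the windowing step then becomes something like the following sketch, using the FixedSessionWindowFn described below (types, variable names, and the five-minute duration are illustrative):

PCollection<KV<String, FolderUpdate>> windowed =
    updates.apply("FixedSessionWindow",
        Window.<KV<String, FolderUpdate>>into(new FixedSessionWindowFn(Duration.standardMinutes(5)))
            // No early processing-time firing: emit when the watermark passes the end of the window.
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());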

FixedSessionWindowFn:

The FixedSessionWindowFn class assigns each element to its own window of fixed duration, starting at the element’s event time. If multiple elements arrive close enough together (within the maximum session duration), their windows will be merged, similar to session windows but with a strict upper bound on window length. This ensures that no window can extend beyond the configured maximum session duration, regardless of how many elements arrive.

public class FixedSessionWindowFn extends WindowFn<Object, IntervalWindow> {
  private final Duration maxSessionDuration;

  public FixedSessionWindowFn(Duration maxSessionDuration) {
    this.maxSessionDuration = maxSessionDuration;
  }

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Create a window per element; overlapping windows within the max session duration get merged.
    // IntervalWindow(Instant start, ReadableDuration size) spans [start, start + size).
    return Arrays.asList(new IntervalWindow(c.timestamp(), maxSessionDuration));
  }

  @Override
  public void mergeWindows(MergeContext c) throws Exception {
    FixedSessionWindowMerge.mergeWindows(c, maxSessionDuration);
  }

  // windowCoder(), isCompatible(), and getDefaultWindowMappingFn() overrides omitted for brevity.
}

FixedSessionWindowMerge.MergeCandidate:

A key aspect of the solution is the modified merge strategy, which prevents windows from spanning beyond their fixed duration. This ensures that data is accurately grouped within the defined time boundaries.

private static class MergeCandidate {
  private IntervalWindow currentWindow;
  private final List<IntervalWindow> parts = new ArrayList<>();

  public boolean intersects(IntervalWindow window) {
    return currentWindow == null || currentWindow.intersects(window);
  }

  public void add(IntervalWindow window) {
    if (currentWindow == null) {
      currentWindow = window;
    }
    // Do not span (expand the start/end of) the current window, since sessions have a fixed duration.
    // The IntervalWindow end acts as the watermark indicator for the Dataflow job, so it is important
    // not to expand it when merging windows.
    parts.add(window);
  }
}

The results demonstrate the effectiveness of the fixed-length session window approach. The windowing mechanism operates as expected, accurately grouping data within the specified time frames. The implementation also leads to a reduction in database load, providing significant benefits for performance and resource optimization.

The fixed-length session window approach worked well for automation-based events, efficiently aggregating many rapid updates (such as autosaving a document every 30 seconds) into a single database write. However, for typical users who made sparse updates and expected folder sizes to update within a sub-minute interval, this strategy led to a poor user experience due to noticeable delays in reflecting changes.

Windowing strategy: Fixed-length session window with max gap

This final solution incorporated a maximum gap parameter into the fixed-length session windows. This means a window is closed and emitted by whichever of the following occurs first:

  • Maximum session length: the time since the arrival of the first element, after which the session window will be terminated
  • Maximum gap: the time gap in seconds after which two events corresponding to the same key will be considered different sessions

This balanced user experience with database performance, allowing for more frequent updates while preventing excessive load.

SessionWindowWithMaxDuration:

Each element is initially assigned to its own window, starting at the event’s timestamp and extending for the maximum allowed gap duration. When multiple elements for the same key arrive close together (within the configured gap), their windows are merged into a single session. However, this merged session is never allowed to exceed the maximum session duration.

public class SessionWindowWithMaxDuration extends WindowFn<Object, IntervalWindow> {
  private final Duration maxGapDuration;
  private final Duration maxSessionDuration;

  public SessionWindowWithMaxDuration(Duration maxGapDuration, Duration maxSessionDuration) {
    this.maxGapDuration = maxGapDuration;
    this.maxSessionDuration = maxSessionDuration;
  }

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Create a window per element; overlapping windows within the max gap duration are merged,
    // up to the max session length.
    return Arrays.asList(new IntervalWindow(c.timestamp(), maxGapDuration));
  }

  @Override
  public void mergeWindows(MergeContext c) throws Exception {
    SessionWindowWithMaxDurationMerge.mergeWindows(c, maxSessionDuration);
  }

  // windowCoder(), isCompatible(), and getDefaultWindowMappingFn() overrides omitted for brevity.
}

SessionWindowWithMaxDurationMerge:

The two main methods are intersects, which checks if a new window overlaps with the current session window, and add, which merges a new window into the session. If there is no current window, add simply sets it. Otherwise, it expands the session window to include the new window, but only up to the maximum allowed session duration. The new window is then added to the session parts.

private static class MergeCandidate {
  private final Duration maxSessionDuration;
  private IntervalWindow currentWindow;
  private final List<IntervalWindow> parts = new ArrayList<>();

  MergeCandidate(Duration maxSessionDuration) {
    this.maxSessionDuration = maxSessionDuration;
  }

  public boolean intersects(IntervalWindow window) {
    return currentWindow == null || currentWindow.intersects(window);
  }

  public void add(IntervalWindow window) {
    if (currentWindow == null) {
      currentWindow = window;
    } else {
      // Expand the session window to cover the new window, but never beyond the
      // maximum session duration measured from the session start.
      Instant currentSessionStart = currentWindow.start();
      Instant currentSessionMaxEnd = currentSessionStart.plus(maxSessionDuration);
      Instant otherSessionEnd = window.end();
      currentWindow =
          new IntervalWindow(
              currentSessionStart,
              new Instant(
                  Math.min(currentSessionMaxEnd.getMillis(), otherSessionEnd.getMillis())));
    }
    parts.add(window);
  }
}
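
Putting the final strategy together, the windowing step can be configured with both parameters, shown here with the 35-second gap and five-minute maximum session length mentioned in the results below (a sketch; types and variable names are placeholders):

PCollection<KV<String, FolderUpdate>> windowed =
    updates.apply("SessionWithMaxDuration",
        Window.<KV<String, FolderUpdate>>into(
                new SessionWindowWithMaxDuration(
                    Duration.standardSeconds(35),    // maximum gap between events in one session
                    Duration.standardMinutes(5)))    // maximum overall session length
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());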

Results

The final solution achieved expected results across multiple dimensions:

Performance metrics:

  • Database write traffic reduction: A significant reduction in database write traffic, from 24k/s to 6-8k/s during peak times (~67% reduction)
  • System reliability: Achieved target accuracy for live updates (accuracy measurement was performed separately and is not detailed in this article)

Cost impact:

  • Infrastructure savings: ~$300k per year in reduced database infrastructure costs
  • Operational efficiency: Reduction in database update failures and retries, significantly reducing operational overhead

User experience:

  • Improved responsiveness: Real-time updates became more efficient, reflecting changes for sessions with a maximum length of five minutes or within a 35-second gap, whichever occurs first
  • Reduced inconsistencies: Eliminated the majority of folder-size discrepancies that users previously experienced during high-traffic periods

Key learnings

This optimization process highlights several specific insights for engineers implementing similar aggregation solutions:

Windowing strategy

  • Hybrid approaches for mixed workloads: Systems with both bursty automation and sparse human activity benefit from hybrid windowing strategies (like our max session duration + gap timeout) rather than one-size-fits-all approaches
  • Processing time over event time: For aggregation use cases that don’t require historical time-based analytics, processing time simplifies implementation and reduces complexity around late-arriving data
  • Custom windowing functions: Don’t hesitate to implement custom windowing logic when standard patterns don’t fit your use case; the flexibility often justifies the additional complexity

Architecture and operations

  • Monitor fan-out patterns: In hierarchical systems, understanding fan-out is crucial for capacity planning. A single user action can translate into orders-of-magnitude more database writes (e.g., 4,000 uploads resulting in ~24,000 writes)
  • Decouple aggregation from transactional writes: Using Pub/Sub as an intermediary layer allowed us to optimize aggregation independently of the core file operations
  • Embrace eventual consistency: For non-critical aggregates, accepting short-term inconsistency in exchange for system stability and cost reduction is often the right trade-off

Conclusion

Through a multi-step optimization process, incorporating Dataflow and effective windowing strategies, we transformed a failing legacy aggregation system into a robust, cost-effective solution that reduced database load by 67% while improving user experience.

The $300k annual savings and improved reliability demonstrated that strategic application of streaming aggregation can deliver significant business value. However, the process underscores that these optimizations require patience, measurement, and willingness to adapt based on real-world performance data.

Most importantly, this case study illustrates that cost reduction and user experience improvements are not mutually exclusive; with the right architectural approach, you can achieve both.

Additional resources