WHEN Concurrency and Parallelism

Concurrency Model

WHEN provides two distinct execution models for handling concurrent operations:

  1. Cooperative Concurrency: Multiple blocks share the main thread
  2. Parallel Execution: Blocks run in separate OS threads

Cooperative Concurrency

By default, all blocks run cooperatively in the main thread, with the interpreter managing execution scheduling.

Basic Cooperative Execution

counter = 0
background_work = 0

fo main_task():
    counter = counter + 1
    print(f"Main task: {counter}")
    sleep(0.5)

fo background():
    background_work = background_work + 1
    print(f"Background: {background_work}")
    sleep(0.8)

main:
    when counter == 0:
        main_task.start()
        background.start()

    # Both blocks share execution time
    when counter >= 10:
        main_task.stop()
        background.stop()
        exit()

Cooperative Scheduling

  • The interpreter executes one iteration of each running block per main loop
  • Blocks yield control after each iteration
  • Execution order is deterministic and predictable
  • No thread synchronization needed

Benefits of Cooperative Model

  • Predictable execution: No race conditions
  • Simple debugging: Single-threaded execution path
  • Low overhead: No thread creation/switching costs
  • Shared state: Direct access to all variables

Parallel Execution

Use the parallel keyword to run blocks in separate threads for true parallelism.

Parallel Forever Blocks

result = None
computation_done = False

parallel fo heavy_computation():
    # CPU-intensive work in separate thread
    result = fibonacci(35)
    computation_done = True
    break

fo ui_updates():
    # UI remains responsive
    print("Working...")
    sleep(1)

    when computation_done:
        print(f"Computation result: {result}")
        break

main:
    heavy_computation.start()
    ui_updates.start()

Parallel Declarative Blocks

progress = 0
batch_results = []

parallel de batch_processor(100):
    # Process items in parallel
    item_result = process_item(batch_processor.current_iteration)
    batch_results.append(item_result)
    progress = progress + 1

fo progress_monitor():
    print(f"Progress: {progress}/100")
    sleep(1)

    when progress >= 100:
        print("Batch processing complete!")
        break

main:
    batch_processor.start()
    progress_monitor.start()

Thread Safety

WHEN automatically handles thread synchronization for shared variables.

Safe Variable Access

shared_counter = 0
worker_count = 4

parallel fo worker():
    # Safe increment operation
    shared_counter = shared_counter + 1
    print(f"Worker incremented counter to {shared_counter}")
    sleep(1)

    when shared_counter >= 20:
        break

main:
    # Start multiple parallel workers
    i = 0
    when i < worker_count:
        worker.start()
        i = i + 1

Thread-Safe Collections

shared_queue = []
processing_complete = False

parallel fo producer():
    item_count = 0
    when item_count < 50:
        shared_queue.append(f"item_{item_count}")
        item_count = item_count + 1
        sleep(0.1)

    when item_count >= 50:
        processing_complete = True
        break

parallel fo consumer():
    when len(shared_queue) > 0:
        item = shared_queue.pop(0)
        print(f"Processing {item}")
        sleep(0.2)

    when processing_complete and len(shared_queue) == 0:
        break

Communication Patterns

Producer-Consumer

buffer = []
buffer_size = 10
producer_done = False

parallel fo producer():
    count = 0
    when count < 100 and len(buffer) < buffer_size:
        buffer.append(f"data_{count}")
        count = count + 1
        sleep(0.05)

    when count >= 100:
        producer_done = True
        break

parallel fo consumer():
    when len(buffer) > 0:
        item = buffer.pop(0)
        print(f"Consumed: {item}")
        sleep(0.1)

    when producer_done and len(buffer) == 0:
        print("All items consumed")
        break

Event-Driven Communication

events = []
event_handlers = {
    "user_input": [],
    "system_event": [],
    "error": []
}

parallel fo event_generator():
    # Generate events from external sources
    event = check_for_events()
    when event:
        events.append(event)
    sleep(0.01)

fo event_dispatcher():
    when len(events) > 0:
        event = events.pop(0)
        event_type = event.get("type", "unknown")

        when event_type in event_handlers:
            handle_event(event)

Synchronization Primitives

Barriers and Checkpoints

worker_ready = [False, False, False]
all_ready = False

parallel fo worker_1():
    # Do initialization work
    initialize_worker_1()
    worker_ready[0] = True

    # Wait for all workers
    when all_ready:
        start_main_work()

parallel fo worker_2():
    initialize_worker_2()
    worker_ready[1] = True

    when all_ready:
        start_main_work()

fo coordinator():
    when all(worker_ready):
        all_ready = True
        print("All workers ready, starting main work")

Conditional Synchronization

phase_1_complete = False
phase_2_complete = False
all_phases_done = False

parallel fo phase_1_worker():
    # Phase 1 work
    do_phase_1_work()
    phase_1_complete = True

parallel fo phase_2_worker():
    # Wait for phase 1
    when phase_1_complete:
        do_phase_2_work()
        phase_2_complete = True

fo coordinator():
    when phase_1_complete and phase_2_complete:
        all_phases_done = True
        print("All phases complete")

Performance Considerations

When to Use Parallel Blocks

  • CPU-intensive operations: Mathematical calculations, data processing
  • I/O-bound operations: File operations, network requests
  • Independent tasks: Operations that don't depend on shared state
  • Background services: Monitoring, logging, maintenance tasks

When to Use Cooperative Blocks

  • UI updates: Rendering, user interaction handling
  • Game loops: Frame updates, animation
  • State machines: Sequential logic, workflow management
  • Simple coordination: Orchestrating other blocks

Threading Overhead

# Good: Long-running parallel work
parallel fo data_cruncher():
    result = expensive_calculation()  # Takes seconds
    print(f"Result: {result}")

# Avoid: Short parallel operations
# parallel fo quick_task():
#     x = x + 1  # Too lightweight for threading overhead

Best Practices

1. Minimize Shared State

# Good: Minimal shared communication
results = []

parallel fo worker(worker_id):
    local_result = do_work(worker_id)
    results.append(local_result)

# Avoid: Heavy shared state manipulation
# shared_dict = {}
# parallel fo worker():
#     shared_dict[key] = complex_operation(shared_dict)

2. Use Appropriate Block Types

# CPU-bound: Use parallel DE blocks
parallel de cpu_intensive(num_items):
    process_item(cpu_intensive.current_iteration)

# Coordination: Use cooperative FO blocks
fo coordinator():
    manage_workflow()
    sleep(0.1)

3. Handle Graceful Shutdown

shutdown_requested = False

parallel fo long_running_task():
    when not shutdown_requested:
        do_chunk_of_work()
        sleep(0.1)

    when shutdown_requested:
        cleanup_resources()
        break

main:
    when user_requests_exit():
        shutdown_requested = True
        wait_for_cleanup()
        exit()

4. Error Handling in Parallel Contexts

error_occurred = False
error_message = ""

parallel fo risky_operation():
    result = potentially_failing_operation()

    when not result["success"]:
        error_occurred = True
        error_message = result["error"]
        break

fo error_handler():
    when error_occurred:
        print(f"Error in parallel operation: {error_message}")
        handle_error()
        error_occurred = False

Monitoring and Profiling

Performance Metrics

start_time = time.time()
operations_completed = 0
operations_per_second = 0

parallel fo performance_worker():
    do_operation()
    operations_completed = operations_completed + 1

fo performance_monitor():
    elapsed = time.time() - start_time
    when elapsed > 0:
        operations_per_second = operations_completed / elapsed
        print(f"Performance: {operations_per_second:.2f} ops/sec")
    sleep(1)

Resource Usage

import psutil

fo resource_monitor():
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent

    print(f"CPU: {cpu_percent}%, Memory: {memory_percent}%")

    when cpu_percent > 90:
        print("High CPU usage detected")

    sleep(5)