> **Documentation index:** fetch the complete documentation index at https://mintlify.com/angr/angr/llms.txt.
> Use this file to discover all available pages before exploring further.
State plugins are modular components that attach to SimState to provide additional functionality. They’re used for tracking open files, managing heap state, implementing POSIX semantics, and storing custom analysis data.
Overview
Plugins are the primary extension mechanism for SimState. angr uses plugins internally for:
Memory and registers (state.memory, state.regs)
POSIX file system (state.posix)
Call stack tracking (state.callstack)
Execution history (state.history)
Breakpoints (state.inspect)
Solver interface (state.solver)
Plugin Architecture
Base Class
All plugins inherit from SimStatePlugin (angr/state_plugins/plugin.py:30):
from angr.state_plugins import SimStatePlugin
class SimStatePlugin:
    """Base class for state plugins.

    Outline of the hooks a plugin can provide. ``copy``, ``merge`` and
    ``widen`` raise ``NotImplementedError`` here, so a subclass has to
    supply its own versions before those operations can be used.
    """

    def __init__(self):
        self.state = None  # Set when plugin is registered

    def set_state(self, state):
        """Called when plugin is attached to a state."""
        # Keep whatever ``state._get_weakref()`` returns, not ``state`` itself.
        self.state = state._get_weakref()

    def copy(self, memo):
        """Create a copy for state branching."""
        raise NotImplementedError

    def merge(self, others, merge_conditions, common_ancestor=None):
        """Merge with other plugin instances."""
        raise NotImplementedError

    def widen(self, others):
        """Widen for static analysis."""
        raise NotImplementedError
Plugin Lifecycle
Plugin is attached to a state:
class MyPlugin(SimStatePlugin):
    """Minimal plugin that carries one dictionary per state."""

    def __init__(self):
        super().__init__()
        self.data = dict()


# Attach an instance to the state under the name 'my_plugin'
state.register_plugin('my_plugin', MyPlugin())

# The plugin is now reachable as an attribute of the state
state.my_plugin.data.update({'key': 'value'})
When state branches, plugin is copied:
@SimStatePlugin.memo
def copy(self, memo):
    """Return a separate ``MyPlugin`` carrying a shallow copy of ``data``."""
    duplicate = MyPlugin()
    # A fresh dict, so later writes on one branch do not show up on the other
    duplicate.data = {**self.data}
    return duplicate
When states merge, plugins merge:
def merge(self, others, merge_conditions, common_ancestor=None):
    """Fold the ``data`` of every plugin in ``others`` into this plugin.

    Later entries win on key collisions. ``merge_conditions`` and
    ``common_ancestor`` are accepted but not used. Always returns ``True``.
    """
    for incoming in (peer.data for peer in others):
        self.data.update(incoming)
    return True
Creating Custom Plugins
Basic Plugin
from angr.state_plugins import SimStatePlugin
import claripy
class TaintTracker(SimStatePlugin):
    """State plugin that remembers which concrete addresses are tainted."""

    def __init__(self):
        super().__init__()
        self.tainted_addrs = set()
        self.taint_sources = {}

    def mark_tainted(self, addr, source):
        """Record ``source`` as the taint origin of every value ``addr`` may take."""
        solver = self.state.solver
        if solver.symbolic(addr):
            # Symbolic address: enumerate at most 256 concrete solutions
            candidates = solver.eval_upto(addr, 256)
        else:
            candidates = (solver.eval(addr),)

        for concrete_addr in candidates:
            self.tainted_addrs.add(concrete_addr)
            self.taint_sources[concrete_addr] = source

    def is_tainted(self, addr):
        """Return True when ``addr`` (or one of its solutions) is tainted."""
        solver = self.state.solver
        if not solver.symbolic(addr):
            return solver.eval(addr) in self.tainted_addrs
        # Symbolic address: tainted as soon as one of up to 256 solutions is
        return any(
            candidate in self.tainted_addrs
            for candidate in solver.eval_upto(addr, 256)
        )

    @SimStatePlugin.memo
    def copy(self, memo):
        """Return a tracker with its own copies of both containers."""
        duplicate = TaintTracker()
        duplicate.tainted_addrs = set(self.tainted_addrs)
        duplicate.taint_sources = dict(self.taint_sources)
        return duplicate

    def merge(self, others, merge_conditions, common_ancestor=None):
        """Absorb the taint records of ``others``; always returns True."""
        # Union of tainted addresses
        for peer in others:
            self.tainted_addrs.update(peer.tainted_addrs)
            self.taint_sources.update(peer.taint_sources)
        return True

    def widen(self, others):
        """Widen by delegating to ``merge`` with no merge conditions."""
        return self.merge(others, None)


# Register as default plugin
from angr.sim_state import SimState

SimState.register_default('taint', TaintTracker)
Using the Plugin
import angr
project = angr.Project('/bin/ls')
state = project.factory.entry_state()

# Plugin is automatically available
state.taint.mark_tainted(0x804a000, 'stdin')

# The value held in RAX is looked up as an address in the taint set
if state.taint.is_tainted(state.regs.rax):
    print("RAX is tainted!")
Real-World Example: Allocation Tracker
from angr.state_plugins import SimStatePlugin
import claripy
class AllocationTracker(SimStatePlugin):
    """Track heap allocations and detect issues.

    Allocations are keyed by concrete address. Problems (double free, free
    of an unknown pointer, out-of-bounds access, use-after-free, access to
    untracked memory) are reported with ``print`` and signalled to the
    caller through a ``False`` return value.
    """

    def __init__(self):
        super().__init__()
        self.allocations = {}  # addr -> allocation info
        self.freed = set()  # addresses whose current allocation was freed
        self.allocation_counter = 0  # allocations recorded so far; next id

    def allocate(self, addr, size, location):
        """Record an allocation of ``size`` bytes at ``addr``.

        ``addr`` and ``size`` are concretised with the state's solver.
        ``location`` is where the allocation happened and is formatted as
        hexadecimal in the log line.
        """
        addr_concrete = self.state.solver.eval(addr)
        size_concrete = self.state.solver.eval(size)
        alloc_id = self.allocation_counter

        self.allocations[addr_concrete] = {
            'id': alloc_id,
            'size': size_concrete,
            'location': location,  # Where allocated
            'state': self.state.copy(),  # Snapshot state
        }
        self.allocation_counter += 1

        # The allocator may hand out an address that was freed earlier. The
        # new allocation is live, so freeing it later is not a double free.
        self.freed.discard(addr_concrete)

        # Log the id that was stored, so it matches the id printed by free()
        print(f"[ALLOC {alloc_id}] "
              f"{addr_concrete:#x} size={size_concrete} at {location:#x}")

    def free(self, addr, location):
        """Record a free operation.

        Returns ``True`` when ``addr`` is a live tracked allocation, and
        ``False`` (after printing an error) for a double free or a pointer
        that was never recorded.
        """
        addr_concrete = self.state.solver.eval(addr)

        # Check for double-free
        if addr_concrete in self.freed:
            print(f"[ERROR] Double-free at {location:#x}")
            print(f"  Address: {addr_concrete:#x}")
            return False

        # Check if address was allocated
        if addr_concrete not in self.allocations:
            print(f"[ERROR] Free of unallocated memory at {location:#x}")
            print(f"  Address: {addr_concrete:#x}")
            return False

        # Mark as freed; the record is kept so use-after-free can be detected
        self.freed.add(addr_concrete)
        alloc_info = self.allocations[addr_concrete]
        print(f"[FREE {alloc_info['id']}] "
              f"{addr_concrete:#x} allocated at {alloc_info['location']:#x}")
        return True

    def check_access(self, addr, size, access_type):
        """Check if memory access is valid.

        Returns ``True`` only when the access starts inside a tracked
        allocation, does not run past its end, and the allocation has not
        been freed. ``access_type`` is used only in the error message for
        untracked memory.
        """
        addr_concrete = self.state.solver.eval(addr)
        size_concrete = self.state.solver.eval(size)

        # Check each allocation
        for alloc_addr, info in self.allocations.items():
            alloc_end = alloc_addr + info['size']
            if not alloc_addr <= addr_concrete < alloc_end:
                continue

            # Access starts within this allocation
            if addr_concrete + size_concrete > alloc_end:
                print(f"[ERROR] Buffer overflow in allocation {info['id']}")
                print(f"  Allocation: {alloc_addr:#x} size={info['size']}")
                print(f"  Access: {addr_concrete:#x} size={size_concrete}")
                return False

            # Check use-after-free
            if alloc_addr in self.freed:
                print(f"[ERROR] Use-after-free of allocation {info['id']}")
                return False

            return True

        print(f"[ERROR] {access_type} of unallocated memory {addr_concrete:#x}")
        return False

    def get_stats(self):
        """Get allocation statistics as a dict of counters."""
        # Recorded addresses that are not currently marked as freed
        live = self.allocations.keys() - self.freed
        return {
            'total_allocations': self.allocation_counter,
            'active_allocations': len(live),
            'freed_allocations': len(self.freed),
            'leaked_allocations': len(live),
        }

    @SimStatePlugin.memo
    def copy(self, memo):
        """Return a tracker with its own containers and the same counter."""
        c = AllocationTracker()
        # Shallow copy: the per-allocation info dicts are shared, which is
        # fine because they are never modified after being stored.
        c.allocations = dict(self.allocations)
        c.freed = set(self.freed)
        c.allocation_counter = self.allocation_counter
        return c

    def merge(self, others, merge_conditions, common_ancestor=None):
        """Fall back to the common ancestor's records when one is supplied.

        ``others`` and ``merge_conditions`` are not consulted. Without a
        common ancestor this plugin's records are left untouched. Always
        returns ``True``.
        """
        if common_ancestor is not None:
            self.allocations = dict(common_ancestor.allocations)
            self.freed = set(common_ancestor.freed)
        return True

    def widen(self, others):
        """Widen by delegating to ``merge`` with no merge conditions."""
        return self.merge(others, None)


SimState.register_default('alloc_tracker', AllocationTracker)
Hooking malloc/free
import angr
from angr import SimProcedure
class TrackedMalloc(SimProcedure):
    """``malloc`` replacement that records each allocation in the tracker."""

    def run(self, size):
        # Allocate memory through the heap plugin's libc-style hook, which is
        # available regardless of which heap implementation the state uses.
        addr = self.state.heap._malloc(size)

        # Track allocation
        self.state.alloc_tracker.allocate(addr, size, self.state.addr)

        return addr


class TrackedFree(SimProcedure):
    """``free`` replacement that records each release in the tracker."""

    def run(self, ptr):
        # Track free (the tracker prints its own diagnostics on misuse)
        self.state.alloc_tracker.free(ptr, self.state.addr)

        # Actually free, again through the libc-style hook
        self.state.heap._free(ptr)

        return 0


project.hook_symbol('malloc', TrackedMalloc())
project.hook_symbol('free', TrackedFree())

# Run analysis
simgr = project.factory.simulation_manager()
simgr.run()

# Get statistics
for state in simgr.deadended:
    stats = state.alloc_tracker.get_stats()
    print(f"Allocation stats: {stats}")
Built-in Plugins
POSIX Plugin
Provides POSIX file system and I/O (angr/state_plugins/posix.py:77):
# Access file descriptors
state.posix.fd[0]  # stdin
state.posix.fd[1]  # stdout
state.posix.fd[2]  # stderr

# Open files
# NOTE(review): confirm what type the second (flags) argument must have in
# the installed angr version.
fd = state.posix.open(b'/path/to/file', 'r')

# Read/write
# NOTE(review): verify these call signatures against the installed angr
# version before relying on them.
data = state.posix.read(fd, 100)
state.posix.write(1, b'output', 6)

# Get output
stdout = state.posix.dumps(1)  # Get stdout content
stderr = state.posix.dumps(2)  # Get stderr content
Inspect Plugin
Breakpoint and instrumentation support (angr/state_plugins/inspect.py:230):
# Add breakpoint on memory read
def on_read(state):
    """Print the address of the memory read that triggered the breakpoint."""
    addr = state.inspect.mem_read_address
    print(f"Reading from {addr}")


state.inspect.b(
    'mem_read',
    when=angr.BP_BEFORE,
    action=on_read,
)

# Breakpoint on specific address
state.inspect.b(
    'mem_write',
    when=angr.BP_AFTER,
    mem_write_address=0x804a000,
    action=lambda s: print("Wrote to target!"),
)
Available Breakpoint Events
# Event names accepted as the first argument of ``state.inspect.b``
event_types = {
    'mem_read',       # Memory read
    'mem_write',      # Memory write
    'reg_read',       # Register read
    'reg_write',      # Register write
    'tmp_read',       # VEX temp read
    'tmp_write',      # VEX temp write
    'expr',           # Expression evaluation
    'statement',      # VEX statement
    'instruction',    # Instruction execution
    'irsb',           # Basic block
    'constraints',    # Constraint added
    'call',           # Function call
    'return',         # Function return
    'simprocedure',   # SimProcedure execution
    'syscall',        # System call
    'exit',           # State exit
    'fork',           # State fork
}
Callstack Plugin
Track function calls and returns:
# Access call stack
for frame in state.callstack:
    print(f"Function: {frame.func_addr:#x}")
    print(f"Return to: {frame.ret_addr:#x}")
    print(f"Stack pointer: {frame.stack_ptr:#x}")

# Current function
current_func = state.callstack.current_function_address

# Call depth
depth = len(state.callstack)
History Plugin
Track execution history:
# Access history
for addr in state.history.bbl_addrs:
    print(f"Executed block: {addr:#x}")

# Recent actions
for action in state.history.actions:
    print(f"Action: {action}")

# Jump history
for jump in state.history.jump_sources:
    print(f"Jumped from: {jump:#x}")

# Block count: number of basic blocks executed, not instructions
total_blocks = state.history.block_count
Advanced Plugin Patterns
Plugin with State Inspection
class InstrumentedPlugin(SimStatePlugin):
    """Plugin that monitors state operations.

    Installs ``mem_read`` / ``mem_write`` breakpoints and appends one record
    (address, expression, current location) per event to ``mem_reads`` and
    ``mem_writes``.
    """

    def __init__(self):
        super().__init__()
        self.mem_reads = []
        self.mem_writes = []

    def init_state(self):
        """Called when plugin is first added to state"""
        # Add inspection breakpoints
        self.state.inspect.b(
            'mem_read',
            when=angr.BP_AFTER,
            action=self._on_read,
        )
        self.state.inspect.b(
            'mem_write',
            when=angr.BP_AFTER,
            action=self._on_write,
        )

    def _log_for(self, state):
        """Return the InstrumentedPlugin that belongs to ``state``.

        The breakpoint actions are bound methods of the plugin instance that
        installed them. When a state is copied, the copied breakpoints still
        call that instance, so events from the copy would land in the
        parent's lists. Looking the plugin up on the state that fired keeps
        each state's log separate. Falls back to ``self`` if none is found.
        """
        for plugin in state.plugins.values():
            if isinstance(plugin, InstrumentedPlugin):
                return plugin
        return self

    def _on_read(self, state):
        """Breakpoint action: log one memory read on the firing state."""
        self._log_for(state).mem_reads.append({
            'addr': state.inspect.mem_read_address,
            'expr': state.inspect.mem_read_expr,
            'location': state.addr,
        })

    def _on_write(self, state):
        """Breakpoint action: log one memory write on the firing state."""
        self._log_for(state).mem_writes.append({
            'addr': state.inspect.mem_write_address,
            'expr': state.inspect.mem_write_expr,
            'location': state.addr,
        })

    @SimStatePlugin.memo
    def copy(self, memo):
        """Return a plugin with its own copies of both logs."""
        c = InstrumentedPlugin()
        c.mem_reads = list(self.mem_reads)
        c.mem_writes = list(self.mem_writes)
        return c

    def merge(self, others, merge_conditions, common_ancestor=None):
        """Append the logs of ``others`` to this plugin; always returns True."""
        # Merge operation logs
        for other in others:
            self.mem_reads.extend(other.mem_reads)
            self.mem_writes.extend(other.mem_writes)
        return True
Plugin with Globals Storage
class PersistentPlugin(SimStatePlugin):
    """Plugin that persists data across states.

    The plugin holds no data of its own: everything is stored under the
    ``'persistent_data'`` key of ``state.globals``.
    """

    def __init__(self):
        super().__init__()

    def store_global(self, key, value):
        """Store ``value`` under ``key`` in the persistent dictionary."""
        # Test for the *key*: ``hasattr`` would look for an attribute on the
        # globals plugin, never find it, and wipe the dictionary every call.
        if 'persistent_data' not in self.state.globals:
            self.state.globals['persistent_data'] = {}
        self.state.globals['persistent_data'][key] = value

    def get_global(self, key, default=None):
        """Retrieve persistent data, or ``default`` when ``key`` is absent."""
        persistent = self.state.globals.get('persistent_data', {})
        return persistent.get(key, default)

    @SimStatePlugin.memo
    def copy(self, memo):
        """Return a fresh, empty plugin instance."""
        # Nothing to copy here: the data lives in state.globals, not on
        # this plugin.
        return PersistentPlugin()

    def merge(self, others, merge_conditions, common_ancestor=None):
        """Do nothing and report that nothing was merged."""
        return False  # No merging needed
Best Practices
Use @SimStatePlugin.memo Decorator
Always use the memo decorator on copy() to handle circular references:
@SimStatePlugin.memo
def copy(self, memo):
    """Duplicate the plugin, deep-copying ``data`` with the supplied ``memo``."""
    clone = MyPlugin()
    # ``copy`` here is the standard-library module; the method is meant to
    # live inside the plugin class, where its own name does not shadow it.
    clone.data = copy.deepcopy(self.data, memo)
    return clone
Always check for symbolic values before evaluating:
def process_addr(self, addr):
    """Template for handling an address that may be symbolic or concrete."""
    solver = self.state.solver

    if not solver.symbolic(addr):
        # Handle concrete case
        concrete_addr = solver.eval(addr)
        return

    # Handle symbolic case: walk at most 256 candidate solutions
    for concrete_addr in solver.eval_upto(addr, 256):
        # Process each possibility
        pass
Use init_state() for Setup
Perform initialization that needs the state in `init_state()`, not in `__init__()`:
def init_state(self):
    """Install the ``mem_read`` breakpoint once the plugin has a state."""
    # Now we have access to self.state
    inspector = self.state.inspect
    inspector.b('mem_read', action=self.handler)
Implement Merge Carefully
Merge should produce a conservative overapproximation:
def merge(self, others, merge_conditions, common_ancestor=None):
    """Grow ``tracked_data`` with the ``tracked_data`` of every other plugin.

    ``merge_conditions`` and ``common_ancestor`` are accepted but not used.
    Always returns ``True``.
    """
    # Union for over-approximation
    incoming = [peer.tracked_data for peer in others]
    self.tracked_data.update(*incoming)
    return True
Reference
Plugin Methods
| Method | Purpose | Required |
| --- | --- | --- |
| `__init__()` | Initialize plugin | Yes |
| `copy(memo)` | Create copy for branching | Yes |
| `merge(others, conditions, ancestor)` | Merge plugin instances | Yes |
| `set_state(state)` | Attach to state | No (default OK) |
| `init_state()` | Initialize with state access | No |
| `widen(others)` | Widen for static analysis | No |
Common Built-in Plugins
| Plugin | Access | Purpose |
| --- | --- | --- |
| memory | `state.memory` | Memory operations |
| registers | `state.regs` | Register access |
| posix | `state.posix` | POSIX I/O and filesystem |
| solver | `state.solver` | Constraint solving |
| inspect | `state.inspect` | Breakpoints |
| callstack | `state.callstack` | Call stack tracking |
| history | `state.history` | Execution history |
| globals | `state.globals` | Global storage |
| libc | `state.libc` | libc state |
| heap | `state.heap` | Heap allocator |