Apache2
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
h2_bucket_beam.h File Reference

Go to the source code of this file.

Data Structures

struct  h2_blist
 
struct  h2_beam_lock
 
struct  h2_bproxy_list
 
struct  h2_bucket_beam
 

Macros

#define H2_BLIST_INIT(b)   APR_RING_INIT(&(b)->list, apr_bucket, link);
 
#define H2_BLIST_SENTINEL(b)   APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
 
#define H2_BLIST_EMPTY(b)   APR_RING_EMPTY(&(b)->list, apr_bucket, link)
 
#define H2_BLIST_FIRST(b)   APR_RING_FIRST(&(b)->list)
 
#define H2_BLIST_LAST(b)   APR_RING_LAST(&(b)->list)
 
#define H2_BLIST_INSERT_HEAD(b, e)
 
#define H2_BLIST_INSERT_TAIL(b, e)
 
#define H2_BLIST_CONCAT(a, b)
 
#define H2_BLIST_PREPEND(a, b)
 

Typedefs

typedef void h2_beam_mutex_leave (void *ctx, struct apr_thread_mutex_t *lock)
 
typedef struct h2_bucket_beam h2_bucket_beam
 
typedef apr_status_t h2_beam_mutex_enter (void *ctx, h2_beam_lock *pbl)
 
typedef void h2_beam_io_callback (void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
 
typedef struct h2_beam_proxy h2_beam_proxy
 
typedef int h2_beam_can_beam_callback (void *ctx, h2_bucket_beam *beam, apr_file_t *file)
 
typedef apr_bucket * h2_bucket_beamer (h2_bucket_beam *beam, apr_bucket_brigade *dest, const apr_bucket *src)
 

Enumerations

enum  h2_beam_owner_t { H2_BEAM_OWNER_SEND, H2_BEAM_OWNER_RECV }
 

Functions

apr_size_t h2_util_bl_print (char *buffer, apr_size_t bmax, const char *tag, const char *sep, h2_blist *bl)
 
int h2_beam_no_files (void *ctx, h2_bucket_beam *beam, apr_file_t *file)
 
apr_status_t h2_beam_create (h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, apr_size_t buffer_size)
 
apr_status_t h2_beam_destroy (h2_bucket_beam *beam)
 
apr_status_t h2_beam_send (h2_bucket_beam *beam, apr_bucket_brigade *bb, apr_read_type_e block)
 
apr_status_t h2_beam_receive (h2_bucket_beam *beam, apr_bucket_brigade *green_buckets, apr_read_type_e block, apr_off_t readbytes)
 
int h2_beam_empty (h2_bucket_beam *beam)
 
int h2_beam_holds_proxies (h2_bucket_beam *beam)
 
void h2_beam_abort (h2_bucket_beam *beam)
 
apr_status_t h2_beam_close (h2_bucket_beam *beam)
 
apr_status_t h2_beam_wait_empty (h2_bucket_beam *beam, apr_read_type_e block)
 
void h2_beam_mutex_set (h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, struct apr_thread_cond_t *cond, void *m_ctx)
 
void h2_beam_timeout_set (h2_bucket_beam *beam, apr_interval_time_t timeout)
 
apr_interval_time_t h2_beam_timeout_get (h2_bucket_beam *beam)
 
void h2_beam_buffer_size_set (h2_bucket_beam *beam, apr_size_t buffer_size)
 
apr_size_t h2_beam_buffer_size_get (h2_bucket_beam *beam)
 
void h2_beam_on_consumed (h2_bucket_beam *beam, h2_beam_io_callback *cb, void *ctx)
 
void h2_beam_on_produced (h2_bucket_beam *beam, h2_beam_io_callback *cb, void *ctx)
 
void h2_beam_on_file_beam (h2_bucket_beam *beam, h2_beam_can_beam_callback *cb, void *ctx)
 
apr_off_t h2_beam_get_buffered (h2_bucket_beam *beam)
 
apr_off_t h2_beam_get_mem_used (h2_bucket_beam *beam)
 
int h2_beam_was_received (h2_bucket_beam *beam)
 
apr_size_t h2_beam_get_files_beamed (h2_bucket_beam *beam)
 
void h2_register_bucket_beamer (h2_bucket_beamer *beamer)
 

Macro Definition Documentation

#define H2_BLIST_CONCAT (   a,
  b 
)
Value:
do { \
APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \
} while (0)
Definition: apr_buckets.h:224
#define APR_RING_CONCAT(h1, h2, elem, link)
Definition: apr_ring.h:332
#define H2_BLIST_EMPTY (   b)    APR_RING_EMPTY(&(b)->list, apr_bucket, link)
#define H2_BLIST_FIRST (   b)    APR_RING_FIRST(&(b)->list)
#define H2_BLIST_INIT (   b)    APR_RING_INIT(&(b)->list, apr_bucket, link);
#define H2_BLIST_INSERT_HEAD (   b,
  e 
)
Value:
do { \
apr_bucket *ap__b = (e); \
APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \
} while (0)
#define APR_RING_INSERT_HEAD(hp, nep, elem, link)
Definition: apr_ring.h:311
Definition: apr_buckets.h:224
struct apr_bucket apr_bucket
Definition: apr_buckets.h:121
#define H2_BLIST_INSERT_TAIL (   b,
  e 
)
Value:
do { \
apr_bucket *ap__b = (e); \
APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \
} while (0)
#define APR_RING_INSERT_TAIL(hp, nep, elem, link)
Definition: apr_ring.h:322
Definition: apr_buckets.h:224
struct apr_bucket apr_bucket
Definition: apr_buckets.h:121
#define H2_BLIST_LAST (   b)    APR_RING_LAST(&(b)->list)
#define H2_BLIST_PREPEND (   a,
  b 
)
Value:
do { \
APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
} while (0)
Definition: apr_buckets.h:224
#define APR_RING_PREPEND(h1, h2, elem, link)
Definition: apr_ring.h:348
#define H2_BLIST_SENTINEL (   b)    APR_RING_SENTINEL(&(b)->list, apr_bucket, link)

Typedef Documentation

typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl)
typedef void h2_beam_mutex_leave(void *ctx, struct apr_thread_mutex_t *lock)

A h2_bucket_beam solves the task of transferring buckets, esp. their data, across threads with zero buffer copies.

When a thread, let's call it the red thread, wants to send buckets to another, the green thread, it creates a h2_bucket_beam and adds buckets via the h2_beam_send(). It gives the beam to the green thread which then can receive buckets into its own brigade via h2_beam_receive().

Sending and receiving can happen concurrently, if a thread mutex is set for the beam, see h2_beam_mutex_set.

The beam can limit the amount of data it accepts via the buffer_size. This can also be adjusted during its lifetime. When the beam not only gets a mutex but also a condition variable (in h2_beam_mutex_set()), sends and receives can be done blocking. A timeout can be set for such blocks.

Care needs to be taken when terminating the beam. The beam registers at the pool it was created with and will clean up after itself. However, if received buckets do still exist, already freed memory might be accessed. The beam performs an assertion on this condition.

The proper way of shutting down a beam is to first make sure there are no more green buckets out there, then clean up the beam to purge any red buckets that may still exist and then, possibly, terminate the beam itself (or the pool it was created with).

The following restrictions apply to bucket transport:

  • only EOS and FLUSH meta buckets are copied through. All other meta buckets are kept in the beam's hold.
  • all kinds of data buckets are transported through:
    • transient buckets are converted to heap ones on send
    • heap and pool buckets require no extra handling
    • buckets with indeterminate length are read on send
    • file buckets will transfer the file itself into a new bucket, if allowed
    • all other buckets are read on send to make sure data is present

This assures that when the red thread sends its red buckets, the data is made accessible while still on the red side. The red bucket then enters the beam's hold storage. When the green thread calls receive, red buckets in the hold are wrapped into special beam buckets. Beam buckets on read present the data directly from the internal red one, but otherwise live on the green side. When a beam bucket gets destroyed, it notifies its beam that the corresponding red bucket from the hold may be destroyed. Since the destruction of green buckets happens in the green thread, any corresponding red bucket cannot immediately be destroyed, as that would result in race conditions. Instead, the beam transfers such red buckets from the hold to the purge storage. Next time there is a call from the red side, the buckets in purge will be deleted.

There are callbacks that can be registered with a beam:

  • a "consumed" callback that gets called on the red side with the amount of data that has been received by the green side. The amount is a delta from the last callback invocation. The red side can trigger these callbacks by calling h2_beam_send() with a NULL brigade.
  • a "can_beam_file" callback that can prohibit the transfer of file handles through the beam. This will cause file buckets to be read on send, and their data will then be transported just like a heap bucket would. When no callback is registered, no restrictions apply and all files are passed through. File handles transferred to the green side will stay there until the receiving brigade's pool is destroyed/cleared. If the pool lives very long or if many different files are beamed, the process might run out of available file handles.

The name "beam" of course is inspired by good old transporter technology where humans are kept inside the transporter's memory buffers until the transmission is complete. Star gates use a similar trick.

typedef struct h2_beam_proxy h2_beam_proxy
typedef apr_bucket* h2_bucket_beamer(h2_bucket_beam *beam, apr_bucket_brigade *dest, const apr_bucket *src)

Enumeration Type Documentation

Enumerator
H2_BEAM_OWNER_SEND 
H2_BEAM_OWNER_RECV 

Function Documentation

void h2_beam_abort ( h2_bucket_beam *  beam)

Abort the beam. Will clean up any buffered buckets and answer all sends and receives with APR_ECONNABORTED.

Call from the sender side only.

apr_size_t h2_beam_buffer_size_get ( h2_bucket_beam *  beam)
void h2_beam_buffer_size_set ( h2_bucket_beam *  beam,
apr_size_t  buffer_size 
)

Set/get the maximum buffer size for beam data (memory footprint).

apr_status_t h2_beam_close ( h2_bucket_beam *  beam)

Close the beam. Sending an EOS bucket serves the same purpose.

Call from the sender side only.

apr_status_t h2_beam_create ( h2_bucket_beam **  pbeam,
apr_pool_t *  pool,
int  id,
const char *  tag,
h2_beam_owner_t  owner,
apr_size_t  buffer_size 
)

Creates a new bucket beam for transfer of buckets across threads.

The pool the beam is created with will be protected by the given mutex and will be used in multiple threads. It needs a pool allocator that is only used inside that same mutex.

Parameters
pbeam: will hold the created beam on return
pool: pool owning the beam, beam will clean up when pool is released
id: identifier of the beam
tag: tag identifying beam for logging
owner: whether the beam is owned by the sender or receiver, i.e. whether the pool owner is using this beam for sending or receiving
buffer_size: maximum memory footprint of buckets buffered in beam, or 0 for no limitation
apr_status_t h2_beam_destroy ( h2_bucket_beam *  beam)

Destroys the beam immediately without cleanup.

int h2_beam_empty ( h2_bucket_beam *  beam)

Determine if beam is empty.

apr_off_t h2_beam_get_buffered ( h2_bucket_beam *  beam)

Get the amount of bytes currently buffered in the beam (unread).

apr_size_t h2_beam_get_files_beamed ( h2_bucket_beam *  beam)
apr_off_t h2_beam_get_mem_used ( h2_bucket_beam *  beam)

Get the memory used by the buffered buckets, approximately.

int h2_beam_holds_proxies ( h2_bucket_beam *  beam)

Determine if beam has handed out proxy buckets that are not destroyed.

void h2_beam_mutex_set ( h2_bucket_beam *  beam,
h2_beam_mutex_enter  m_enter,
struct apr_thread_cond_t *  cond,
void *  m_ctx 
)
int h2_beam_no_files ( void *  ctx,
h2_bucket_beam *  beam,
apr_file_t *  file 
)

Will deny all transfer of apr_file_t across the beam and force a data copy instead.

void h2_beam_on_consumed ( h2_bucket_beam *  beam,
h2_beam_io_callback *  cb,
void *  ctx 
)

Register a callback to be invoked on the sender side with the amount of bytes that have been consumed by the receiver, since the last callback invocation or reset.

Parameters
beam: the beam to set the callback on
cb: the callback or NULL
ctx: the context to use in callback invocation

Call from the sender side, callbacks invoked on sender side.

void h2_beam_on_file_beam ( h2_bucket_beam *  beam,
h2_beam_can_beam_callback *  cb,
void *  ctx 
)
void h2_beam_on_produced ( h2_bucket_beam *  beam,
h2_beam_io_callback *  cb,
void *  ctx 
)

Register a callback to be invoked on the receiver side with the amount of bytes that have been produced by the sender, since the last callback invocation or reset.

Parameters
beam: the beam to set the callback on
cb: the callback or NULL
ctx: the context to use in callback invocation

Call from the receiver side, callbacks invoked on receiver side.

apr_status_t h2_beam_receive ( h2_bucket_beam *  beam,
apr_bucket_brigade *  green_buckets,
apr_read_type_e  block,
apr_off_t  readbytes 
)

Receive buckets from the beam into the given brigade. Will return APR_EOF when reading past an EOS bucket. Reads can be blocking until data is available or the beam has been closed. Non-blocking calls return APR_EAGAIN if no data is available.

Call from the receiver side only.

apr_status_t h2_beam_send ( h2_bucket_beam *  beam,
apr_bucket_brigade *  bb,
apr_read_type_e  block 
)

Send buckets from the given brigade through the beam. Will hold buckets internally as long as they have not been processed by the receiving side. All accepted buckets are removed from the given brigade. Will return with APR_EAGAIN on non-blocking sends when not all buckets could be accepted.

Call from the sender side only.

apr_interval_time_t h2_beam_timeout_get ( h2_bucket_beam *  beam)
void h2_beam_timeout_set ( h2_bucket_beam *  beam,
apr_interval_time_t  timeout 
)

Set/get the timeout for blocking read/write operations. Only works if a mutex has been set for the beam.

apr_status_t h2_beam_wait_empty ( h2_bucket_beam *  beam,
apr_read_type_e  block 
)

Return APR_SUCCESS when all buckets in transit have been handled. When called with APR_BLOCK_READ and a mutex set, will wait until the green side has consumed all data. Otherwise APR_EAGAIN is returned. With clear_buffers set, any queued data is discarded. If a timeout is set on the beam, waiting might also time out and return APR_TIMEUP.

Call from the sender side only.

int h2_beam_was_received ( h2_bucket_beam *  beam)

Return != 0 iff (some) data from the beam has been received.

void h2_register_bucket_beamer ( h2_bucket_beamer *  beamer)
apr_size_t h2_util_bl_print ( char *  buffer,
apr_size_t  bmax,
const char *  tag,
const char *  sep,
h2_blist *  bl 
)

Print the buckets in the list into the buffer (type and lengths).

Parameters
buffer: the buffer to print into
bmax: max number of characters to place in buffer, incl. trailing 0
tag: tag string for this bucket list
sep: separator to use
bl: the bucket list to print
Returns
number of characters printed