Skip to content

Instantly share code, notes, and snippets.

@dvdhrm
Created October 22, 2016 15:08
Show Gist options
  • Save dvdhrm/2333f776b2bb39456e88141f72ffb303 to your computer and use it in GitHub Desktop.
bus1 transactions
#ifndef __BUS1_TX_H
#define __BUS1_TX_H
/*
* Copyright (C) 2013-2016 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation; either version 2.1 of the License, or (at
* your option) any later version.
*/
/**
* DOC: Transactions
*
* XXX
*/
#include <linux/err.h>
#include <linux/kernel.h>
#include <linux/kref.h>
#include "peer.h"
#include "util/active.h"
#include "util/queue.h"
/**
 * enum bus1_tx_bits - bit numbers used in the flags field of &struct bus1_tx
 * @BUS1_TX_BIT_SEALED:		transaction is sealed; the stage paths check
 *				this bit and refuse (or WARN on) further
 *				staging attempts
 * @BUS1_TX_BIT_SYNCING:	set while the sync round of bus1_tx_commit()
 *				runs; bus1_tx_settle() waits for it to clear
 */
enum bus1_tx_bits {
BUS1_TX_BIT_SEALED,
BUS1_TX_BIT_SYNCING,
};
/**
 * struct bus1_tx - transaction context
 * @origin:	peer that originates this transaction
 * @todo:	singly-linked list of queue-nodes not yet staged; must be
 *		drained before bus1_tx_commit() (WARN_ON otherwise)
 * @sync:	singly-linked list of synchronously staged queue-nodes
 * @async:	singly-linked list of asynchronously staged queue-nodes
 * @flags:	bit-field of BUS1_TX_BIT_* flags
 * @sync_ts:	timestamp returned by the latest bus1_queue_stage() on @sync
 * @async_ts:	timestamp returned by the latest bus1_queue_stage() on @async
 */
struct bus1_tx {
struct bus1_peer *origin;
struct bus1_queue_node *todo;
struct bus1_queue_node *sync;
struct bus1_queue_node *async;
unsigned long flags;
u64 sync_ts;
u64 async_ts;
};
/**
* bus1_tx_init() - XXX
*/
static inline void bus1_tx_init(struct bus1_tx *tx, struct bus1_peer *origin)
{
tx->origin = origin;
tx->todo = NULL;
tx->sync = NULL;
tx->async = NULL;
tx->flags = 0;
tx->sync_ts = 0;
tx->async_ts = 0;
}
/**
 * bus1_tx_deinit() - deinitialize transaction context
 * @tx:		transaction context to operate on
 *
 * This deinitializes a transaction context previously initialized via
 * bus1_tx_init(). All lists must have been flushed by the caller at this
 * point; a left-over entry triggers a WARN_ON.
 */
static inline void bus1_tx_deinit(struct bus1_tx *tx)
{
WARN_ON(tx->todo);
WARN_ON(tx->sync);
WARN_ON(tx->async);
}
/* internal helper */
static inline void bus1_tx_push(struct bus1_tx *tx,
struct bus1_queue_node **list,
struct bus1_queue_node *qnode)
{
/*
* Push @qnode onto one of the lists in @tx (specified as @list). Note
* that each list has different locking/ordering requirements, so use
* one of the wrappers around this to access each list.
*
* Whenever something is pushed on a list, we make sure it has the tx
* set as group. Furthermore, we tell lockdep that its peer was
* released. This is required to allow holding hundreds of peers in a
* multicast without exceeding the lockdep limits of allowed locks held
* in parallel.
* Note that pushing a qnode on a list consumes the qnode together with
* its set owner. The caller must not access it, except by popping it
* from the list or using one of the internal list-iterators. In other
* words, we say that a caller must be aware of lockdep limitations
* whenever they hold an unlimited number of peers. However, if they
* make sure they only ever hold a fixed number, but use transaction
* lists to stash them, the transaction lists make sure to properly
* avoid lockdep limitations.
*/
WARN_ON(qnode->group || qnode->next || qnode == *list);
qnode->group = tx;
qnode->next = *list;
*list = qnode;
if (qnode->owner)
bus1_active_lockdep_released(&qnode->owner->active);
}
/* internal helper */
static inline struct bus1_queue_node *
bus1_tx_pop(struct bus1_tx *tx, struct bus1_queue_node **list)
{
	struct bus1_queue_node *head = *list;

	/*
	 * Unlink and return the first entry of a list on a transaction.
	 * Different lists have different locking requirements, so use one of
	 * the wrappers rather than this internal helper.
	 *
	 * Lockdep must be told about the re-acquired peer when handing the
	 * entry back; see bus1_tx_push() for the rationale.
	 */
	if (!head)
		return NULL;

	*list = head->next;
	head->next = NULL;
	if (head->owner)
		bus1_active_lockdep_acquired(&head->owner->active);

	return head;
}
/**
 * bus1_tx_push_todo() - push queue-node onto the todo-list
 * @tx:		transaction context to operate on
 * @qnode:	queue-node to push
 *
 * This pushes @qnode onto the todo-list of @tx. The entry is consumed,
 * together with its owner reference; see bus1_tx_push() for details.
 */
static inline void bus1_tx_push_todo(struct bus1_tx *tx,
struct bus1_queue_node *qnode)
{
bus1_tx_push(tx, &tx->todo, qnode);
}
/**
 * bus1_tx_pop_todo() - pop queue-node from the todo-list
 * @tx:		transaction context to operate on
 *
 * This pops the first entry off the todo-list of @tx, re-acquiring the
 * lockdep reference on its owner; see bus1_tx_pop() for details.
 *
 * Return: Popped queue-node, or NULL if the todo-list is empty.
 */
static inline struct bus1_queue_node *bus1_tx_pop_todo(struct bus1_tx *tx)
{
return bus1_tx_pop(tx, &tx->todo);
}
#endif /* __BUS1_TX_H */
/*
* Copyright (C) 2013-2016 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation; either version 2.1 of the License, or (at
* your option) any later version.
*/
#include <linux/err.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include "node.h"
#include "peer.h"
#include "util.h"
#include "util/active.h"
#include "util/queue.h"
/*
 * Start iterating a singly-linked list with head-element @list. @iter is
 * set to the first entry, and the peer of that entry is returned with its
 * lockdep reference *acquired* (an ownerless entry belongs to the origin
 * of @tx). You *must* run the iteration to completion via bus1_tx_next(),
 * otherwise lockdep references leak. IOW: don't bail out of the loop with
 * 'break'.
 *
 * Intended usage:
 *
 * for (peer = bus1_tx_first(tx, &tx->foo, &qnode);
 * qnode;
 * peer = bus1_tx_next(tx, &qnode))
 * bar();
 */
static struct bus1_peer *bus1_tx_first(struct bus1_tx *tx,
				       struct bus1_queue_node *list,
				       struct bus1_queue_node **iter)
{
	*iter = list;

	if (!list)
		return NULL;
	if (!list->owner)
		return tx->origin;

	bus1_active_lockdep_acquired(&list->owner->active);
	return list->owner;
}
/*
 * Advance an iteration started via bus1_tx_first(). The lockdep reference
 * on the current entry's owner is dropped before moving on; the returned
 * information follows the same rules as bus1_tx_first().
 */
static struct bus1_peer *bus1_tx_next(struct bus1_tx *tx,
				      struct bus1_queue_node **iter)
{
	struct bus1_queue_node *pos = *iter;

	if (pos->owner)
		bus1_active_lockdep_released(&pos->owner->active);

	return bus1_tx_first(tx, pos->next, iter);
}
/**
* bus1_tx_stage_async() - XXX
*/
bool bus1_tx_stage_async(struct bus1_tx *tx, struct bus1_queue_node *qnode)
{
WARN_ON(bus1_queue_node_is_queued(qnode));
lockdep_assert_held(&tx->origin->data.lock);
lockdep_assert_held(&(qnode->owner ?: tx->origin)->data.lock);
if (test_bit(BUS1_TX_BIT_SEALED, &tx->flags))
return false;
bus1_tx_push(tx, &tx->async, qnode);
tx->async_ts = bus1_queue_stage(&qnode->owner->data.queue,
&qnode->owner->waitq,
qnode, tx->async_ts);
return true;
}
/**
* bus1_tx_stage_sync() - XXX
*/
void bus1_tx_stage_sync(struct bus1_tx *tx,
struct bus1_queue_node *qnode)
{
WARN_ON(test_bit(BUS1_TX_BIT_SEALED, &tx->flags));
WARN_ON(bus1_queue_node_is_queued(qnode));
lockdep_assert_held(&(qnode->owner ?: tx->origin)->data.lock);
bus1_tx_push(tx, &tx->sync, qnode);
tx->sync_ts = bus1_queue_stage(&qnode->owner->data.queue,
&qnode->owner->waitq,
qnode, tx->sync_ts);
}
/**
 * bus1_tx_settle() - wait for an in-flight commit to finish syncing
 * @tx:		transaction context to operate on
 *
 * This waits until BUS1_TX_BIT_SYNCING is cleared on @tx, i.e., until the
 * sync round of bus1_tx_commit() has finished and woken up any waiters.
 *
 * Return: 0 on success, non-zero if the killable wait was interrupted.
 */
int bus1_tx_settle(struct bus1_tx *tx)
{
/*
* Settling is required to reliably return information about destroyed
* nodes. If such information is not required, there is also no need to
* settle. Hence, we can safely use TASK_KILLABLE rather than
* TASK_UNINTERRUPTIBLE, as long as the caller makes sure not to return
* node information on error.
*/
return wait_on_bit(&tx->flags, BUS1_TX_BIT_SYNCING, TASK_KILLABLE);
}
/**
 * bus1_tx_commit() - commit transaction
 * @tx:		transaction context to operate on
 *
 * This seals @tx and commits all staged entries with a final commit
 * timestamp. The todo-list must have been flushed by the caller
 * (WARN_ON otherwise), and a transaction can only be committed once.
 */
void bus1_tx_commit(struct bus1_tx *tx)
{
	struct bus1_queue_node *qnode, **tail;
	struct bus1_peer *peer, *owner, *origin = tx->origin;
	u64 ts;

	WARN_ON(tx->todo);
	if (WARN_ON(test_bit(BUS1_TX_BIT_SEALED, &tx->flags)))
		return;

	/*
	 * Acquire the final commit timestamp on the origin queue, then mark
	 * the transaction as syncing and sealed, so no further entries can
	 * be staged on it.
	 */
	mutex_lock(&origin->data.lock);
	bus1_queue_sync(&origin->data.queue, max(tx->sync_ts, tx->async_ts));
	ts = bus1_queue_tick(&origin->data.queue);
	WARN_ON(test_and_set_bit(BUS1_TX_BIT_SYNCING, &tx->flags));
	WARN_ON(test_and_set_bit(BUS1_TX_BIT_SEALED, &tx->flags));
	mutex_unlock(&origin->data.lock);

	/*
	 * Sync round: propagate the commit timestamp to every destination
	 * queue on the sync-list. Entries staged asynchronously in the
	 * meantime are appended to the tail and picked up by re-running the
	 * loop, until no new entries show up.
	 *
	 * (Fixed: this must start at &tx->sync; the previous 'list->sync'
	 * referenced an identifier that does not exist in this scope.)
	 */
	tail = &tx->sync;
	do {
		for (peer = bus1_tx_first(tx, *tail, &qnode);
		     qnode;
		     peer = bus1_tx_next(tx, &qnode)) {
			tail = &qnode->next;
			mutex_lock(&peer->data.lock);
			bus1_queue_sync(&peer->data.queue, ts);
			mutex_unlock(&peer->data.lock);
		}

		/* append async-list to the tail of the previous list */
		*tail = tx->async;
		tx->async = NULL;
	} while (*tail);

	WARN_ON(!test_and_clear_bit(BUS1_TX_BIT_SYNCING, &tx->flags));
	wake_up_bit(&tx->flags, BUS1_TX_BIT_SYNCING);

	/*
	 * Commit round: pop each entry off the sync-list and commit it on
	 * its destination queue with the final timestamp. The swap() takes
	 * over the entry's owner reference (clearing qnode->owner), which is
	 * then released after the commit.
	 */
	owner = NULL;
	while ((qnode = bus1_tx_pop(tx, &tx->sync))) {
		swap(owner, qnode->owner);
		peer = owner ?: tx->origin;

		mutex_lock(&peer->data.lock);
		bus1_queue_commit_staged(&peer->data.queue, &peer->waitq,
					 qnode, ts);
		mutex_unlock(&peer->data.lock);

		owner = bus1_peer_release(owner);
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment