class ConsumerRef < ServerSide::Model(:consumer_refs) def self.signaled_node_ids(channel_id) transaction do node_ids, ref_ids = [], [] filter(:channel_id => channel_id, :signaled => true).each do |r| ref_ids << r[:id] node_ids << r[:node_id] end filter(:id => ref_ids).update(:signaled => false) node_ids end end def self.signal_by_node_id(node_id) filter(:node_id => node_id).update(:signaled => true) end def self.signal_by_channel_id(channel_id) filter(:channel_id => channel_id).update(:signaled => true) end def self.remove_by_channel_id(channel_id) filter(:channel_id => channel_id).delete end def self.minimum_sample_rate(node_id) filter(:node_id => node_id).minimum(:sample_rate) end def self.add_or_update(channel_id, node_id, sample_rate) ref = find(:channel_id => channel_id, :node_id => node_id) if ref if ref[:sample_rate] != sample_rate ref.set(:sample_rate => sample_rate, :signaled => true) end else ref = create( :channel_id => channel_id, :node_id => node_id, :sample_rate => sample_rate :signaled => true ) end ref end end class Channel < ServerSide::Model(:channels) def self.compile_consumer_events(channel_id) ConsumerRef.signaled_node_ids(channel_id).inject([]) do |events, node_id| node = Node[node_id] events << ChannelEvent.state_changed(node) end end before_delete do transaction do ConsumerRef.delete_by_channel_id(id) ProducerRef.delete_by_channel_id(id) Node.filter(:producer_id => id).update(:producer_id => nil) ...etc... end end one_to_many :produced_nodes, :class => Node, :key => {:producer_id => :id} def consumed_nodes Node.join(:consumer_refs, :node_id => :id). filter(:consumer_refs__channel_id => @pkey) end def self.stale filter(:stamp.LESS_THAN(Time.now - STALE_PERIOD)) end def self.delete_stale db.transaction {stale.delete} end def touch set(:stamp => Time.ticks) end def touch_produced_nodes t = Time.now produced_nodes.each {|n| n.touch(t)} end def signal_subscribed_nodes ConsumerNodeRef.signal_by_channel_id(id) end def subscribe(paths, sample_rate = nil) Node.filter(:path => paths).each do |node| ConsumerNodeRef.add_or_update(id, node.id, sample_rate) node.update_tracking end end def mount(path) Node.mount_subtree(path, id) end end ################################################################################