Linux Kernel  3.7.1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
name_distr.c
Go to the documentation of this file.
1 /*
2  * net/tipc/name_distr.c: TIPC name distribution code
3  *
4  * Copyright (c) 2000-2006, Ericsson AB
5  * Copyright (c) 2005, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in the
15  * documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "link.h"
39 #include "name_distr.h"
40 
/* Size in bytes of one struct distr_item as carried in a message payload */
#define ITEM_SIZE	(sizeof(struct distr_item))
42 
/*
 * struct distr_item - one publication entry carried in a name distribution
 * message payload.
 *
 * NOTE(review): the member declarations are absent from this listing (the
 * embedded numbering jumps from 61 to 67).  Elsewhere in this file,
 * publ_to_item() stores htonl()-converted values into members named type,
 * lower, upper, ref and key, and tipc_named_recv() reads them back through
 * ntohl().  Restore the declarations from the original file before building.
 */
61 struct distr_item {
67 };
68 
/*
 * struct publ_list - a list of publications together with its bookkeeping.
 *
 * list is the head that publications are linked onto through their
 * local_list member.
 *
 * NOTE(review): one member line is absent from this listing (the embedded
 * numbering jumps from 75 to 77).  The initialisers and users in this file
 * reference a member named size that is incremented and decremented as
 * entries are added and removed; its declaration presumably belongs here --
 * confirm against the original file.
 */
74 struct publ_list {
75  struct list_head list;
77 };
78 
79 static struct publ_list publ_zone = {
80  .list = LIST_HEAD_INIT(publ_zone.list),
81  .size = 0,
82 };
83 
84 static struct publ_list publ_cluster = {
85  .list = LIST_HEAD_INIT(publ_cluster.list),
86  .size = 0,
87 };
88 
89 static struct publ_list publ_node = {
90  .list = LIST_HEAD_INIT(publ_node.list),
91  .size = 0,
92 };
93 
94 static struct publ_list *publ_lists[] = {
95  NULL,
96  &publ_zone, /* publ_lists[TIPC_ZONE_SCOPE] */
97  &publ_cluster, /* publ_lists[TIPC_CLUSTER_SCOPE] */
98  &publ_node /* publ_lists[TIPC_NODE_SCOPE] */
99 };
100 
101 
105 static void publ_to_item(struct distr_item *i, struct publication *p)
106 {
107  i->type = htonl(p->type);
108  i->lower = htonl(p->lower);
109  i->upper = htonl(p->upper);
110  i->ref = htonl(p->ref);
111  i->key = htonl(p->key);
112 }
113 
117 static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
118 {
119  struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size);
120  struct tipc_msg *msg;
121 
122  if (buf != NULL) {
123  msg = buf_msg(buf);
124  tipc_msg_init(msg, NAME_DISTRIBUTOR, type, INT_H_SIZE, dest);
125  msg_set_size(msg, INT_H_SIZE + size);
126  }
127  return buf;
128 }
129 
130 static void named_cluster_distribute(struct sk_buff *buf)
131 {
132  struct sk_buff *buf_copy;
133  struct tipc_node *n_ptr;
134 
135  list_for_each_entry(n_ptr, &tipc_node_list, list) {
136  if (tipc_node_active_links(n_ptr)) {
137  buf_copy = skb_copy(buf, GFP_ATOMIC);
138  if (!buf_copy)
139  break;
140  msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
141  tipc_link_send(buf_copy, n_ptr->addr, n_ptr->addr);
142  }
143  }
144 
145  kfree_skb(buf);
146 }
147 
151 void tipc_named_publish(struct publication *publ)
152 {
153  struct sk_buff *buf;
154  struct distr_item *item;
155 
156  list_add_tail(&publ->local_list, &publ_lists[publ->scope]->list);
157  publ_lists[publ->scope]->size++;
158 
159  if (publ->scope == TIPC_NODE_SCOPE)
160  return;
161 
162  buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0);
163  if (!buf) {
164  pr_warn("Publication distribution failure\n");
165  return;
166  }
167 
168  item = (struct distr_item *)msg_data(buf_msg(buf));
169  publ_to_item(item, publ);
170  named_cluster_distribute(buf);
171 }
172 
/*
 * NOTE(review): the signature line of this function is absent from this
 * listing (the embedded numbering jumps from 172 to 177).  The body uses a
 * pointer named publ and builds a WITHDRAWAL message, so this is presumably
 * the withdrawal counterpart of tipc_named_publish() -- confirm against the
 * original file and restore the signature.
 *
 * The body unlinks publ from its per-scope list and decrements that list's
 * size.  Unless the publication has node scope, it then builds a one-item
 * WITHDRAWAL message and hands it to named_cluster_distribute().  A failed
 * buffer allocation is only logged; the list changes are not rolled back.
 */
177 {
178  struct sk_buff *buf;
179  struct distr_item *item;
180 
181  list_del(&publ->local_list);
182  publ_lists[publ->scope]->size--;
183 
 /* node-scope publications are not distributed */
184  if (publ->scope == TIPC_NODE_SCOPE)
185  return;
186 
 /* payload holds exactly one item; destination 0 is passed here */
187  buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0);
188  if (!buf) {
189  pr_warn("Withdrawal distribution failure\n");
190  return;
191  }
192 
193  item = (struct distr_item *)msg_data(buf_msg(buf));
194  publ_to_item(item, publ);
 /* named_cluster_distribute() releases buf itself */
195  named_cluster_distribute(buf);
196 }
197 
198 /*
199  * named_distribute - prepare name info for bulk distribution to another node
200  */
201 static void named_distribute(struct list_head *message_list, u32 node,
202  struct publ_list *pls, u32 max_item_buf)
203 {
204  struct publication *publ;
205  struct sk_buff *buf = NULL;
206  struct distr_item *item = NULL;
207  u32 left = 0;
208  u32 rest = pls->size * ITEM_SIZE;
209 
210  list_for_each_entry(publ, &pls->list, local_list) {
211  if (!buf) {
212  left = (rest <= max_item_buf) ? rest : max_item_buf;
213  rest -= left;
214  buf = named_prepare_buf(PUBLICATION, left, node);
215  if (!buf) {
216  pr_warn("Bulk publication failure\n");
217  return;
218  }
219  item = (struct distr_item *)msg_data(buf_msg(buf));
220  }
221  publ_to_item(item, publ);
222  item++;
223  left -= ITEM_SIZE;
224  if (!left) {
225  list_add_tail((struct list_head *)buf, message_list);
226  buf = NULL;
227  }
228  }
229 }
230 
/*
 * tipc_named_node_up - build and send the bulk publication messages for a node
 * @nodearg: node address, carried in an unsigned long and narrowed to u32
 *
 * The per-message payload limit is derived from max_pkt of the node's first
 * active link: the space left after INT_H_SIZE, rounded down to a whole
 * number of ITEM_SIZE items.  When the node is not found or has no such
 * link the limit stays 0 and nothing is sent.  Otherwise messages are built
 * from publ_cluster and then publ_zone (publ_node is not included) and the
 * resulting list is handed to tipc_link_send_names().
 *
 * NOTE(review): the embedded listing numbers skip 243, 253, 260 and 263, so
 * four original lines are absent here.  Their positions bracket the node
 * lookup and the two named_distribute() calls -- presumably locking; restore
 * them from the original file.
 */
234 void tipc_named_node_up(unsigned long nodearg)
235 {
236  struct tipc_node *n_ptr;
237  struct tipc_link *l_ptr;
238  struct list_head message_list;
239  u32 node = (u32)nodearg;
240  u32 max_item_buf = 0;
241 
242  /* compute maximum amount of publication data to send per message */
244  n_ptr = tipc_node_find(node);
245  if (n_ptr) {
246  tipc_node_lock(n_ptr);
247  l_ptr = n_ptr->active_links[0];
248  if (l_ptr)
 /* divide then multiply: rounds down to a whole number of items */
249  max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
250  ITEM_SIZE) * ITEM_SIZE;
251  tipc_node_unlock(n_ptr);
252  }
 /* still 0 when no node, no link, or no room for even one item */
254  if (!max_item_buf)
255  return;
256 
257  /* create list of publication messages, then send them as a unit */
258  INIT_LIST_HEAD(&message_list);
259 
261  named_distribute(&message_list, node, &publ_cluster, max_item_buf);
262  named_distribute(&message_list, node, &publ_zone, max_item_buf);
264 
265  tipc_link_send_names(&message_list, (u32)node);
266 }
267 
/*
 * named_purge_publ - remove a publication from the name table and free it
 * @publ: publication to remove; its type, lower, node, ref and key members
 *        identify the name table entry
 *
 * Calls tipc_nametbl_remove_publ() with tipc_nametbl_lock write-locked,
 * logs an error when the pointer it returns is not @publ, and passes the
 * returned pointer to kfree().  tipc_named_recv() passes this function as a
 * callback argument; judging by the error text it presumably runs when the
 * publishing node has failed.
 *
 * NOTE(review): listing line 282 is absent.  As the text stands, `if (p)`
 * governs write_unlock_bh(), which would leave the lock held when p is NULL.
 * The statement the condition originally guarded is not visible here;
 * restore it from the original file before building.
 */
274 static void named_purge_publ(struct publication *publ)
275 {
276  struct publication *p;
277 
278  write_lock_bh(&tipc_nametbl_lock);
279  p = tipc_nametbl_remove_publ(publ->type, publ->lower,
280  publ->node, publ->ref, publ->key);
281  if (p)
283  write_unlock_bh(&tipc_nametbl_lock);
284 
 /* the entry removed from the table should be the one we were given */
285  if (p != publ) {
286  pr_err("Unable to remove publication from failed node\n"
287  " (type=%u, lower=%u, node=0x%x, ref=%u, key=%u)\n",
288  publ->type, publ->lower, publ->node, publ->ref,
289  publ->key);
290  }
291 
 /* frees whatever the table returned, which may differ from publ */
292  kfree(p);
293 }
294 
/*
 * tipc_named_recv - process a received name distribution message
 * @buf: received message buffer; released with kfree_skb() at the end
 *
 * The payload is treated as an array of msg_data_sz(msg) / ITEM_SIZE
 * distr_item entries whose members are converted with ntohl().  The message
 * type is checked once per item:
 *  - PUBLICATION: the item is inserted with tipc_nametbl_insert_publ(),
 *    using msg_orignode(msg) as the node argument;
 *  - WITHDRAWAL: the item is removed with tipc_nametbl_remove_publ() and the
 *    returned publication freed, or an error is logged if none was found;
 *  - anything else: a warning is logged for each item.
 *
 * NOTE(review): the embedded listing numbers skip 305, 311, 316, 319, 330
 * and 344, so six original lines are absent.  Among them are one argument of
 * the insert call and the start of the call that takes
 * (msg_orignode(msg), publ, ..., named_purge_publ).  Restore them from the
 * original file before building.
 */
298 void tipc_named_recv(struct sk_buff *buf)
299 {
300  struct publication *publ;
301  struct tipc_msg *msg = buf_msg(buf);
302  struct distr_item *item = (struct distr_item *)msg_data(msg);
 /* integer division: any trailing partial item is ignored */
303  u32 count = msg_data_sz(msg) / ITEM_SIZE;
304 
306  while (count--) {
307  if (msg_type(msg) == PUBLICATION) {
308  publ = tipc_nametbl_insert_publ(ntohl(item->type),
309  ntohl(item->lower),
310  ntohl(item->upper),
312  msg_orignode(msg),
313  ntohl(item->ref),
314  ntohl(item->key));
315  if (publ) {
 /* NOTE(review): the callee and first argument of this call are on the absent line 316 */
317  msg_orignode(msg),
318  publ,
320  named_purge_publ);
321  }
322  } else if (msg_type(msg) == WITHDRAWAL) {
323  publ = tipc_nametbl_remove_publ(ntohl(item->type),
324  ntohl(item->lower),
325  msg_orignode(msg),
326  ntohl(item->ref),
327  ntohl(item->key));
328 
329  if (publ) {
331  kfree(publ);
332  } else {
333  pr_err("Unable to remove publication by node 0x%x\n"
334  " (type=%u, lower=%u, ref=%u, key=%u)\n",
335  msg_orignode(msg), ntohl(item->type),
336  ntohl(item->lower), ntohl(item->ref),
337  ntohl(item->key));
338  }
339  } else {
340  pr_warn("Unrecognized name table message received\n");
341  }
342  item++;
343  }
345  kfree_skb(buf);
346 }
347 
/*
 * NOTE(review): the signature line of this function and listing lines 360
 * and 366 are absent (the embedded numbering jumps from 347 to 356, from 359
 * to 361 and from 365 to 367).  The two absent interior lines bracket the
 * loop -- presumably locking; restore them and the signature from the
 * original file.
 *
 * The visible body walks the publication list of every scope from
 * TIPC_ZONE_SCOPE through TIPC_NODE_SCOPE and sets each publication's node
 * member to tipc_own_addr.
 */
356 {
357  struct publication *publ;
358  int scope;
359 
361 
362  for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++)
363  list_for_each_entry(publ, &publ_lists[scope]->list, local_list)
364  publ->node = tipc_own_addr;
365 
367 }