久久久久久久av_日韩在线中文_看一级毛片视频_日本精品二区_成人深夜福利视频_武道仙尊动漫在线观看

在單個后臺線程定期修改它的同時讀取 Map

Concurrently reading a Map while a single background thread regularly modifies it(在單個后臺線程定期修改它的同時讀取 Map)
本文介紹了在單個后臺線程定期修改它的同時讀取 Map的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

我有一個類,我在 updateLiveSockets() 方法內每 30 秒從單個后臺線程填充地圖 liveSocketsByDatacenter ,然后我有一個方法 getNextSocket() 將被多個讀取器線程調用以獲取可用的活動套接字,該套接字使用相同的映射來獲取此信息.

I have a class in which I am populating a map liveSocketsByDatacenter from a single background thread every 30 seconds inside updateLiveSockets() method and then I have a method getNextSocket() which will be called by multiple reader threads to get a live socket available which uses the same map to get this information.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the modifier synchronized to prevent concurrent modification
  // it is needed because to build the new map we first need to get the
  // old one so both must be done atomically to prevent concistency issues
  private synchronized void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

正如你在我的課堂上看到的那樣:

As you can see in my class:

  • 從每 30 秒運行一次的單個后臺線程,我用 updateLiveSockets() 方法中的所有活動套接字填充 liveSocketsByDatacenter 映射.
  • 然后從多個線程中,我調用 getNextSocket() 方法給我一個可用的活動套接字,它使用 liveSocketsByDatacenter 映射來獲取所需的信息.
  • From a single background thread which runs every 30 seconds, I populate liveSocketsByDatacenter map with all the live sockets in updateLiveSockets() method.
  • And then from multiple threads, I call the getNextSocket() method to give me a live socket available which uses a liveSocketsByDatacenter map to get the required information.

我的代碼運行良好,沒有任何問題,我想看看是否有更好或更有效的方法來編寫它.我還想就線程安全問題或任何競爭條件(如果有的話)發表意見,但到目前為止我還沒有看到任何問題,但我可能是錯的.

I have my code working fine without any issues and wanted to see if there is any better or more efficient way to write this. I also wanted to get an opinion on thread safety issues or any race conditions if any are there, but so far I haven't seen any but I could be wrong.

我最擔心的是 updateLiveSockets() 方法和 getLiveSocketX() 方法.我在 LINE A 迭代 liveSockets 這是 SocketHolderList 然后創建一個新的 SocketHolder 對象并添加到另一個新列表.這里可以嗎?

I am mostly worried about updateLiveSockets() method and getLiveSocketX() method. I am iterating liveSockets which is a List of SocketHolder at LINE A and then making a new SocketHolder object and adding to another new list. Is this ok here?

注意: SocketHolder 是一個不可變的類.你可以忽略我擁有的 ZeroMQ 東西.

Note: SocketHolder is an immutable class. And you can ignore ZeroMQ stuff I have.

推薦答案

您使用以下同步技術.

  1. 帶有實時套接字數據的地圖位于原子引用后面,這允許安全地切換地圖.
  2. updateLiveSockets() 方法是同步的(隱含在此),這將防止兩個線程同時切換地圖.
  3. 如果在 getNextSocket() 方法期間發生切換,請在使用地圖時對地圖進行本地引用以避免混淆.
  1. The map with live socket data is behind an atomic reference, this allows safely switching the map.
  2. The updateLiveSockets() method is synchronized (implicitly on this), this will prevent switching the map by two threads simultaneously.
  3. You make a local reference to the map when using it to avoid mixups if the switch happens during the getNextSocket() method.

它像現在一樣是線程安全的嗎?

線程安全始終取決于共享可變數據是否正確同步.在這種情況下,共享的可變數據是數據中心到其 SocketHolders 列表的映射.

Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.

地圖位于AtomicReference 中,并且制作本地副本以供使用這一事實足以在地圖上進行同步.您的方法采用地圖的一個版本并使用它,由于 AtomicReference 的性質,切換版本是線程安全的.這也可以通過將成員字段設置為地圖 volatile 來實現,因為您所做的只是更新引用(您無需對其執行任何檢查然后執行操作).

The fact that the map is in an AtomicReference, and making a local copy for use is enough synchronization on the map. Your methods take a version of the map and use that, switching versions is thread safe due to the nature of AtomicReference. This could also have been achieved with just making the member field for the map volatile, as all you do is update the reference (you don't do any check-then-act operations on it).

由于 scheduleAtFixedRate() 保證傳遞的 Runnable 不會與自身并發運行,所以 updateLiveSockets() 上的 synchronized 不是必需的,但是,它也不會造成任何真正的傷害.

As scheduleAtFixedRate() guarantees that the passed Runnable will not be run concurrently with itself, the synchronized on updateLiveSockets() is not needed, however, it also doesn't do any real harm.

所以是的,這個類是線程安全的,因為它是.

So yes, this class is thread safe, as it is.

但是,SocketHolder 是否可以被多個線程同時使用并不完全清楚.事實上,這個類只是試圖通過選擇一個隨機的活動來最小化 SocketHolder 的并發使用(盡管不需要打亂整個數組來選擇一個隨機索引).它實際上并沒有阻止并發使用.

However, it's not entirely clear if a SocketHolder can be used by multiple threads simultaneously. As it is, this class just tries to minimize concurrent use of SocketHolders by picking a random live one (no need to shuffle the entire array to pick one random index though). It does nothing to actually prevent concurrent use.

可以提高效率嗎?

我相信它可以.查看 updateLiveSockets() 方法時,它似乎構建了完全相同的映射,除了 SocketHolder 可能具有不同的 isLive 標志.這使我得出結論,與其切換整個地圖,我只想切換地圖中的每個列表.為了以線程安全的方式更改映射中的條目,我可以使用 ConcurrentHashMap.

I believe it can. When looking at the updateLiveSockets() method, it seems it builds the exact same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, i just want to switch each of the lists in the map. And for changing entries in a map in a thread safe manner, I can just use ConcurrentHashMap.

如果我使用 ConcurrentHashMap,并且不切換映射,而是切換映射中的值,我可以擺脫 AtomicReference.

If I use a ConcurrentHashMap, and don't switch the map, but rather, the values in the map, I can get rid of the AtomicReference.

要更改映射,我可以構建新列表并將其直接放入地圖中.這樣更高效,因為我發布數據更快,創建的對象更少,而我的同步只是建立在現成的組件上,這有利于可讀性.

To change the mapping I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner, and I create fewer objects, while my synchronization just builds on ready made components, which benefits readability.

這是我的構建(為了簡潔,省略了一些不太相關的部分)

Here's my build (omitted some parts that were less relevant, for brevity)

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
    private final ZContext ctx = new ZContext();

    // ...

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
      }
    }

    // ...      

    // this method will be called by multiple threads to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    // is there any concurrency or thread safety issue or race condition here?
    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // no need to make this synchronized
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
      }
    }

}

這篇關于在單個后臺線程定期修改它的同時讀取 Map的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Convert List of Strings into Map using Java-8 Streams API(使用 Java-8 Streams API 將字符串列表轉換為 Map)
Getting data from JSON(從 JSON 獲取數據)
java linkedhashmap iteration(javalinkedhashmap迭代)
Converting a list of objects to Map(將對象列表轉換為 Map)
Create a HashMap with a fixed Key corresponding to a HashSet. point of departure(用一個固定的Key對應一個HashSet創建一個HashMap.出發點)
HttpMessageConverter exception : RestClientException: Could not write request: no suitable HttpMessageConverter found(HttpMessageConverter 異常:RestClientException:無法寫入請求:找不到合適的 HttpMessageConverter) - IT屋-程序員
主站蜘蛛池模板: 玖操| 亚洲精品在线观看网站 | 在线精品一区二区 | 性一交一乱一透一a级 | 精精精精xxxx免费视频 | 亚洲精品中文字幕在线观看 | 欧美视频一区 | 精品视频一区二区 | 黑人巨大精品欧美一区二区免费 | 亚洲国产一区二区三区 | 亚洲精品免费在线观看 | 不卡在线一区 | 欧美综合在线视频 | 亚洲人成人一区二区在线观看 | 国产ts一区 | 精品国产乱码久久久久久丨区2区 | 日韩欧美国产成人一区二区 | 99精品热视频 | 成人精品视频在线 | 亚洲视频中文字幕 | 午夜影院在线 | 亚洲综合二区 | 国产精品久久久精品 | 欧洲一区二区视频 | 国产成人免费 | 日本免费视频在线观看 | 成人在线中文字幕 | 欧美精品一区二区三区一线天视频 | 久久亚洲欧美日韩精品专区 | a视频在线 | 天天拍天天色 | 亚洲精品日韩在线 | 欧美成人一级视频 | 久久久久久久一区 | 日韩欧美在线观看视频 | 久久久久久久一区二区三区 | 久久精品久久久久久 | a久久 | 日韩1区2区 | 国产精品明星裸体写真集 | 在线观看免费观看在线91 |