pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

在單個(gè)后臺(tái)線程定期修改它的同時(shí)讀取 Map

Concurrently reading a Map while a single background thread regularly modifies it(在單個(gè)后臺(tái)線程定期修改它的同時(shí)讀取 Map)
本文介紹了在單個(gè)后臺(tái)線程定期修改它的同時(shí)讀取 Map的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

我有一個(gè)類,我在 updateLiveSockets() 方法內(nèi)每 30 秒從單個(gè)后臺(tái)線程填充地圖 liveSocketsByDatacenter ,然后我有一個(gè)方法 getNextSocket() 將被多個(gè)讀取器線程調(diào)用以獲取可用的活動(dòng)套接字,該套接字使用相同的映射來(lái)獲取此信息.

I have a class in which I am populating a map liveSocketsByDatacenter from a single background thread every 30 seconds inside updateLiveSockets() method and then I have a method getNextSocket() which will be called by multiple reader threads to get a live socket available which uses the same map to get this information.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the modifier synchronized to prevent concurrent modification
  // it is needed because to build the new map we first need to get the
  // old one so both must be done atomically to prevent concistency issues
  private synchronized void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

正如你在我的課堂上看到的那樣:

As you can see in my class:

  • 從每 30 秒運(yùn)行一次的單個(gè)后臺(tái)線程,我用 updateLiveSockets() 方法中的所有活動(dòng)套接字填充 liveSocketsByDatacenter 映射.
  • 然后從多個(gè)線程中,我調(diào)用 getNextSocket() 方法給我一個(gè)可用的活動(dòng)套接字,它使用 liveSocketsByDatacenter 映射來(lái)獲取所需的信息.
  • From a single background thread which runs every 30 seconds, I populate liveSocketsByDatacenter map with all the live sockets in updateLiveSockets() method.
  • And then from multiple threads, I call the getNextSocket() method to give me a live socket available which uses a liveSocketsByDatacenter map to get the required information.

我的代碼運(yùn)行良好,沒(méi)有任何問(wèn)題,我想看看是否有更好或更有效的方法來(lái)編寫(xiě)它.我還想就線程安全問(wèn)題或任何競(jìng)爭(zhēng)條件(如果有的話)發(fā)表意見(jiàn),但到目前為止我還沒(méi)有看到任何問(wèn)題,但我可能是錯(cuò)的.

I have my code working fine without any issues and wanted to see if there is any better or more efficient way to write this. I also wanted to get an opinion on thread safety issues or any race conditions if any are there, but so far I haven't seen any but I could be wrong.

我最擔(dān)心的是 updateLiveSockets() 方法和 getLiveSocketX() 方法.我在 LINE A 迭代 liveSockets 這是 SocketHolderList 然后創(chuàng)建一個(gè)新的 SocketHolder 對(duì)象并添加到另一個(gè)新列表.這里可以嗎?

I am mostly worried about updateLiveSockets() method and getLiveSocketX() method. I am iterating liveSockets which is a List of SocketHolder at LINE A and then making a new SocketHolder object and adding to another new list. Is this ok here?

注意: SocketHolder 是一個(gè)不可變的類.你可以忽略我擁有的 ZeroMQ 東西.

Note: SocketHolder is an immutable class. And you can ignore ZeroMQ stuff I have.

推薦答案

您使用以下同步技術(shù).

  1. 帶有實(shí)時(shí)套接字?jǐn)?shù)據(jù)的地圖位于原子引用后面,這允許安全地切換地圖.
  2. updateLiveSockets() 方法是同步的(隱含在此),這將防止兩個(gè)線程同時(shí)切換地圖.
  3. 如果在 getNextSocket() 方法期間發(fā)生切換,請(qǐng)?jiān)谑褂玫貓D時(shí)對(duì)地圖進(jìn)行本地引用以避免混淆.
  1. The map with live socket data is behind an atomic reference, this allows safely switching the map.
  2. The updateLiveSockets() method is synchronized (implicitly on this), this will prevent switching the map by two threads simultaneously.
  3. You make a local reference to the map when using it to avoid mixups if the switch happens during the getNextSocket() method.

它像現(xiàn)在一樣是線程安全的嗎?

線程安全始終取決于共享可變數(shù)據(jù)是否正確同步.在這種情況下,共享的可變數(shù)據(jù)是數(shù)據(jù)中心到其 SocketHolders 列表的映射.

Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.

地圖位于AtomicReference 中,并且制作本地副本以供使用這一事實(shí)足以在地圖上進(jìn)行同步.您的方法采用地圖的一個(gè)版本并使用它,由于 AtomicReference 的性質(zhì),切換版本是線程安全的.這也可以通過(guò)將成員字段設(shè)置為地圖 volatile 來(lái)實(shí)現(xiàn),因?yàn)槟龅闹皇歉乱?您無(wú)需對(duì)其執(zhí)行任何檢查然后執(zhí)行操作).

The fact that the map is in an AtomicReference, and making a local copy for use is enough synchronization on the map. Your methods take a version of the map and use that, switching versions is thread safe due to the nature of AtomicReference. This could also have been achieved with just making the member field for the map volatile, as all you do is update the reference (you don't do any check-then-act operations on it).

由于 scheduleAtFixedRate() 保證傳遞的 Runnable 不會(huì)與自身并發(fā)運(yùn)行,所以 updateLiveSockets() 上的 synchronized 不是必需的,但是,它也不會(huì)造成任何真正的傷害.

As scheduleAtFixedRate() guarantees that the passed Runnable will not be run concurrently with itself, the synchronized on updateLiveSockets() is not needed, however, it also doesn't do any real harm.

所以是的,這個(gè)類是線程安全的,因?yàn)樗?

So yes, this class is thread safe, as it is.

但是,SocketHolder 是否可以被多個(gè)線程同時(shí)使用并不完全清楚.事實(shí)上,這個(gè)類只是試圖通過(guò)選擇一個(gè)隨機(jī)的活動(dòng)來(lái)最小化 SocketHolder 的并發(fā)使用(盡管不需要打亂整個(gè)數(shù)組來(lái)選擇一個(gè)隨機(jī)索引).它實(shí)際上并沒(méi)有阻止并發(fā)使用.

However, it's not entirely clear if a SocketHolder can be used by multiple threads simultaneously. As it is, this class just tries to minimize concurrent use of SocketHolders by picking a random live one (no need to shuffle the entire array to pick one random index though). It does nothing to actually prevent concurrent use.

可以提高效率嗎?

我相信它可以.查看 updateLiveSockets() 方法時(shí),它似乎構(gòu)建了完全相同的映射,除了 SocketHolder 可能具有不同的 isLive 標(biāo)志.這使我得出結(jié)論,與其切換整個(gè)地圖,我只想切換地圖中的每個(gè)列表.為了以線程安全的方式更改映射中的條目,我可以使用 ConcurrentHashMap.

I believe it can. When looking at the updateLiveSockets() method, it seems it builds the exact same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, i just want to switch each of the lists in the map. And for changing entries in a map in a thread safe manner, I can just use ConcurrentHashMap.

如果我使用 ConcurrentHashMap,并且不切換映射,而是切換映射中的值,我可以擺脫 AtomicReference.

If I use a ConcurrentHashMap, and don't switch the map, but rather, the values in the map, I can get rid of the AtomicReference.

要更改映射,我可以構(gòu)建新列表并將其直接放入地圖中.這樣更高效,因?yàn)槲野l(fā)布數(shù)據(jù)更快,創(chuàng)建的對(duì)象更少,而我的同步只是建立在現(xiàn)成的組件上,這有利于可讀性.

To change the mapping I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner, and I create fewer objects, while my synchronization just builds on ready made components, which benefits readability.

這是我的構(gòu)建(為了簡(jiǎn)潔,省略了一些不太相關(guān)的部分)

Here's my build (omitted some parts that were less relevant, for brevity)

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
    private final ZContext ctx = new ZContext();

    // ...

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
      }
    }

    // ...      

    // this method will be called by multiple threads to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    // is there any concurrency or thread safety issue or race condition here?
    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // no need to make this synchronized
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
      }
    }

}

這篇關(guān)于在單個(gè)后臺(tái)線程定期修改它的同時(shí)讀取 Map的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!

【網(wǎng)站聲明】本站部分內(nèi)容來(lái)源于互聯(lián)網(wǎng),旨在幫助大家更快的解決問(wèn)題,如果有圖片或者內(nèi)容侵犯了您的權(quán)益,請(qǐng)聯(lián)系我們刪除處理,感謝您的支持!

相關(guān)文檔推薦

Convert List of Strings into Map using Java-8 Streams API(使用 Java-8 Streams API 將字符串列表轉(zhuǎn)換為 Map)
Getting data from JSON(從 JSON 獲取數(shù)據(jù))
java linkedhashmap iteration(javalinkedhashmap迭代)
Converting a list of objects to Map(將對(duì)象列表轉(zhuǎn)換為 Map)
Create a HashMap with a fixed Key corresponding to a HashSet. point of departure(用一個(gè)固定的Key對(duì)應(yīng)一個(gè)HashSet創(chuàng)建一個(gè)HashMap.出發(fā)點(diǎn))
HttpMessageConverter exception : RestClientException: Could not write request: no suitable HttpMessageConverter found(HttpMessageConverter 異常:RestClientException:無(wú)法寫(xiě)入請(qǐng)求:找不到合適的 HttpMessageConverter) - IT屋-程序員
主站蜘蛛池模板: 北京发电车出租-发电机租赁公司-柴油发电机厂家 - 北京明旺盛安机电设备有限公司 | 沈阳液压泵_沈阳液压阀_沈阳液压站-沈阳海德太科液压设备有限公司 | 银川美容培训-美睫美甲培训-彩妆纹绣培训-新娘化妆-学化妆-宁夏倍莱妮职业技能培训学校有限公司 临时厕所租赁_玻璃钢厕所租赁_蹲式|坐式厕所出租-北京慧海通 | 冷热冲击试验箱_温度冲击试验箱价格_冷热冲击箱排名_林频厂家 | 电力电子产业网| PCB设计,PCB抄板,电路板打样,PCBA加工-深圳市宏力捷电子有限公司 | 活动策划,舞台搭建,活动策划公司-首选美湖上海活动策划公司 | 威廉希尔WilliamHill·足球(中国)体育官方网站 | 铝箔袋,铝箔袋厂家,东莞铝箔袋,防静电铝箔袋,防静电屏蔽袋,防静电真空袋,真空袋-东莞铭晋让您的产品与众不同 | 山东聚盛新型材料有限公司-纳米防腐隔热彩铝板和纳米防腐隔热板以及钛锡板、PVDF氟膜板供应商 | app开发|app开发公司|小程序开发|物联网开发||北京网站制作|--前潮网络 | 净水器代理,净水器招商,净水器加盟-FineSky德国法兹全屋净水 | IWIS链条代理-ALPS耦合透镜-硅烷预处理剂-上海顶楚电子有限公司 lcd条形屏-液晶长条屏-户外广告屏-条形智能显示屏-深圳市条形智能电子有限公司 | 导电银胶_LED封装导电银胶_半导体封装导电胶厂家-上海腾烁 | 酒糟烘干机-豆渣烘干机-薯渣烘干机-糟渣烘干设备厂家-焦作市真节能环保设备科技有限公司 | 江苏全风,高压风机,全风环保风机,全风环形高压风机,防爆高压风机厂家-江苏全风环保科技有限公司(官网) | 劳动法网-专业的劳动法和劳动争议仲裁服务网 | 动物解剖台-成蚊接触筒-标本工具箱-负压实验台-北京哲成科技有限公司 | 液压压力机,液压折弯机,液压剪板机,模锻液压机-鲁南新力机床有限公司 | 滑板场地施工_极限运动场地设计_滑板公园建造_盐城天人极限运动场地建设有限公司 | 湖南自考_湖南自学考试 | 磁力去毛刺机_去毛刺磁力抛光机_磁力光饰机_磁力滚抛机_精密金属零件去毛刺机厂家-冠古科技 | 六维力传感器_三维力传感器_二维力传感器-南京神源生智能科技有限公司 | 上海单片机培训|重庆曙海培训分支机构—CortexM3+uC/OS培训班,北京linux培训,Windows驱动开发培训|上海IC版图设计,西安linux培训,北京汽车电子EMC培训,ARM培训,MTK培训,Android培训 | 胶原检测试剂盒,弹性蛋白检测试剂盒,类克ELISA试剂盒,阿达木单抗ELISA试剂盒-北京群晓科苑生物技术有限公司 | 称重传感器,测力传感器,拉压力传感器,压力变送器,扭矩传感器,南京凯基特电气有限公司 | 定量包装秤,吨袋包装称,伸缩溜管,全自动包装秤,码垛机器人,无锡市邦尧机械工程有限公司 | 干式磁选机_湿式磁选机_粉体除铁器-潍坊国铭矿山设备有限公司 | 范秘书_懂你的范文小秘书 | 环氧树脂地坪_防静电地坪漆_环氧地坪漆涂料厂家-地壹涂料地坪漆 环球电气之家-中国专业电气电子产品行业服务网站! | 活性炭-蜂窝-椰壳-柱状-粉状活性炭-河南唐达净水材料有限公司 | 安平县鑫川金属丝网制品有限公司,声屏障,高速声屏障,百叶孔声屏障,大弧形声屏障,凹凸穿孔声屏障,铁路声屏障,顶部弧形声屏障,玻璃钢吸音板 | 电地暖-电采暖-发热膜-石墨烯电热膜品牌加盟-暖季地暖厂家 | 铝合金重力铸造_铝合金翻砂铸造_铝铸件厂家-东莞市铝得旺五金制品有限公司 | 模切之家-专注服务模切行业的B2B平台! | 济南画室培训-美术高考培训-山东艺霖艺术培训画室 | LHH药品稳定性试验箱-BPS系列恒温恒湿箱-意大利超低温冰箱-上海一恒科学仪器有限公司 | 自动配料系统_称重配料控制系统厂家 | 软膜天花_软膜灯箱_首选乐创品牌_一站式天花软膜材料供应商! | RV减速机-蜗轮蜗杆减速机-洗车机减速机-减速机厂家-艾思捷 | 【直乐】河北石家庄脊柱侧弯医院_治疗椎间盘突出哪家医院好_骨科脊柱外科专业医院_治疗抽动症/关节病骨伤权威医院|排行-直乐矫形中医医院 |